# -*- coding: utf-8 -*-
# Copyright (C) 2010-2023 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# Python X2Go is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Python X2Go is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
# gevent_subprocess was found here: http://groups.google.com/group/gevent/browse_thread/thread/dba1a5d29e0a60ff
# Mark Visser <mjmvisser@gmail.com>
###
### Mark Visser, Sat, 20 Nov 2010 13:30:16 -0500
###
### <quote>
### Hi Mike,
###
### I hereby place that code snippet in the public domain, feel free to apply any license that is appropriate!
###
### cheers,
### -Mark
### </quote>
###
### Thus, I place myself as the copyright holder for code in this file
### for cases in that it is used in context of the X2Go project.
###
__package__ = 'x2go'
__name__ = 'x2go.gevent_subprocess'
"""Implementation of the standard :mod:`subprocess` module that spawns greenlets"""
import errno
import sys
import fcntl, os
_subprocess = __import__('subprocess')
from gevent import socket, select, hub
# identical to original
CalledProcessError = _subprocess.CalledProcessError
PIPE = _subprocess.PIPE
STDOUT = _subprocess.STDOUT
call = _subprocess.call
check_call = _subprocess.check_call
list2cmdline = _subprocess.list2cmdline
[docs]
class Popen(object):
def __init__(self, *args, **kwargs):
# delegate to an actual Popen object
self.__p = _subprocess.Popen(*args, **kwargs)
# make the file handles nonblocking
if self.stdin is not None:
fcntl.fcntl(self.stdin, fcntl.F_SETFL, os.O_NONBLOCK)
if self.stdout is not None:
fcntl.fcntl(self.stdout, fcntl.F_SETFL, os.O_NONBLOCK)
if self.stderr is not None:
fcntl.fcntl(self.stderr, fcntl.F_SETFL, os.O_NONBLOCK)
def __getattr__(self, name):
# delegate attribute lookup to the real Popen object
return getattr(self.__p, name)
def _write_pipe(self, f, input):
# writes the given input to f without blocking
if input:
bytes_total = len(input)
bytes_written = 0
while bytes_written < bytes_total:
try:
# f.write() doesn't return anything, so use os.write.
bytes_written += os.write(f.fileno(), input[bytes_written:])
except IOError as ex:
if ex[0] != errno.EAGAIN:
raise
sys.exc_clear()
socket.wait_write(f.fileno())
f.close()
def _read_pipe(self, f):
# reads output from f without blocking
# returns output
chunks = []
while True:
try:
chunk = f.read(4096)
if not chunk:
break
chunks.append(chunk)
except IOError as ex:
if ex[0] != errno.EAGAIN:
raise
sys.exc_clear()
socket.wait_read(f.fileno())
f.close()
return ''.join(chunks)
[docs]
def communicate(self, input=None):
# Optimization: If we are only using one pipe, or no pipe at
# all, using select() is unnecessary.
if [self.stdin, self.stdout, self.stderr].count(None) >= 2:
stdout = None
stderr = None
if self.stdin:
self._write_pipe(self.stdin, input)
elif self.stdout:
stdout = self._read_pipe(self.stdout)
elif self.stderr:
stderr = self._read_pipe(self.stderr)
self.wait()
return (stdout, stderr)
else:
return self._communicate(input)
def _communicate(self, input):
# identical to original... all the heavy lifting is done
# in gevent.select.select
read_set = []
write_set = []
stdout = None # Return
stderr = None # Return
if self.stdin:
# Flush stdin buffer.
self.stdin.flush()
if input:
write_set.append(self.stdin)
else:
self.stdin.close()
if self.stdout:
read_set.append(self.stdout)
stdout = []
if self.stderr:
read_set.append(self.stderr)
stderr = []
input_offset = 0
while read_set or write_set:
try:
rlist, wlist, xlist = select.select(read_set, write_set, [])
except select.error as e:
if e.args[0] == errno.EINTR:
continue
raise
if self.stdin in wlist:
# When select has indicated that the file is writable,
# we can write up to PIPE_BUF bytes without risk
# blocking. POSIX defines PIPE_BUF >= 512
bytes_written = os.write(self.stdin.fileno(), buffer(input, input_offset, 512))
input_offset += bytes_written
if input_offset >= len(input):
self.stdin.close()
write_set.remove(self.stdin)
if self.stdout in rlist:
data = os.read(self.stdout.fileno(), 1024)
if data == "":
self.stdout.close()
read_set.remove(self.stdout)
stdout.append(data)
if self.stderr in rlist:
data = os.read(self.stderr.fileno(), 1024)
if data == "":
self.stderr.close()
read_set.remove(self.stderr)
stderr.append(data)
# All data exchanged. Translate lists into strings.
if stdout is not None:
stdout = ''.join(stdout)
if stderr is not None:
stderr = ''.join(stderr)
# Translate newlines, if requested. We cannot let the file
# object do the translation: It is based on stdio, which is
# impossible to combine with select (unless forcing no
# buffering).
if self.universal_newlines and hasattr(file, 'newlines'):
if stdout:
stdout = self._translate_newlines(stdout)
if stderr:
stderr = self._translate_newlines(stderr)
self.wait()
return (stdout, stderr)
[docs]
def wait(self, check_interval=0.01):
# non-blocking, use hub.sleep
try:
while True:
status = self.poll()
if status >= 0:
return status
hub.sleep(check_interval)
except OSError as e:
if e.errno == errno.ECHILD:
# no child process, this happens if the child process
# already died and has been cleaned up
return -1
else:
raise