#!/usr/bin/env python3
# -*- coding: utf8 -*-
from io import StringIO
import os
import subprocess
import sys
import types
from .exceptions import (DeliveryTransportError, DeliveryPackingError)
from .pickle import pickle, unpickle, ModulePickle
class DeliveryBox(object):
    """Container for data exchange

    A box carries the callable and its inputs to the executing side and the
    results back. Only attributes that were actually assigned on an instance
    live in its ``__dict__``; everything else falls back to the class-level
    defaults below. ``__str__`` and ``__eq__`` work on the instance
    ``__dict__`` only.
    """

    # OUTPUT VALUES
    stdout = None
    stderr = None
    return_value = None
    exception = None

    # INPUT VALUES
    instance = None
    func = None
    args = None
    kwargs = None
    # Immutable class-level fallbacks. Every instance gets its own mutable
    # sets in ``__init__``; a shared ``set()`` here would be modified in place
    # by ``box.modules |= ...`` and leak module names between all boxes.
    modules = frozenset()
    pickled_modules = frozenset()

    # Instances define ``__eq__`` on mutable state and are not hashable.
    __hash__ = None

    def __init__(self):
        # Per-instance containers so in-place updates never touch other boxes.
        self.modules = set()
        self.pickled_modules = set()

    def __str__(self):
        """Return one ``name: value`` line per attribute set on the instance"""
        return "\n".join("{:15s}: {}".format(key, value)
                         for (key, value) in vars(self).items())

    def __eq__(self, other):
        """Compare the instance attributes of two boxes

        :param other: Object to compare with
        :return: `True` if both instance dictionaries are equal;
                 `NotImplemented` if `other` is not a :py:obj:`DeliveryBox`
        """
        if not isinstance(other, DeliveryBox):
            return NotImplemented
        return vars(self) == vars(other)
class DeliveryBoy(object):
    """Operator for calling the new process and handling input/output

    When called, the decorated function and the non-standard modules stored in
    its `__globals__` attribute are pickled and passed via the transport
    command to the newly started python process.

    If an exception is raised during execution of the decorated function, this
    exception is pickled and reraised.

    If `async_` is `False`, STDOUT, STDERR and the return value of the
    decorated function are returned upon calling the decorated function.
    Otherwise only the process ID of the process started by this class is
    returned (the transport command, which in turn starts the interpreter).
    Without a transport the callable is run directly in this process and the
    asynchronous option is ignored.

    After execution, STDOUT and STDERR written during execution of the
    callable are written to STDOUT and STDERR of the main process. This
    applies only to synchronous execution!

    :param func: Function object that is called in the new process
    :type func: callable
    :param transport: Transport command
    :type transport: str
    :param transport_params: Additional arguments for the transport command.
    :type transport_params: list
    :param executable: The python executable to be called.
                       Default: `sys.executable`.
    :type executable: Absolute path of python interpreter
    :param async_: If set to `True`, this process will not wait for the
                   process called via the transport command to finish.
                   Default: `False`. This parameter used to be spelled
                   `async`, which is a reserved word since Python 3.7; the
                   old spelling is still honoured when passed through
                   keyword unpacking (``**{"async": True}``).
    :type async_: bool
    :param discard_excess: If set to `False`, all output written to STDOUT by
                           the new process that is not redirected gets pre- or
                           appended accordingly to the delivery box.
                           Default: `True`
    :type discard_excess: bool
    :return: Return value of the decorated callable
    :raises deliveryboy.exceptions.DeliveryPackingError: if decorated callable
        is not supported, if a module cannot be added to the delivery box
    :raises deliveryboy.exceptions.DeliveryTransportError: if calling the
        transport or executable fail (e.g. command not found, exit code not
        equal zero).
    """

    def __init__(self, func, transport=None, transport_params=None,
                 executable=sys.executable, async_=False, discard_excess=True,
                 **params):
        # Legacy spelling: ``async`` cannot be written as a keyword any more,
        # but it may still arrive through ``**{"async": ...}``.
        if "async" in params:
            async_ = params.pop("async")
        self.func = func
        self.params = params
        self.async_ = async_
        self.discard_excess = discard_excess
        self.executable = executable
        self.transport = transport
        # Copy into a fresh list: avoids a shared mutable default and lets
        # callers pass any iterable of arguments.
        self.transport_params = list(transport_params or [])
        self.inbox = DeliveryBox()
        self.outbox = None

    def __call__(self, *args, **kwargs):
        """Pack the box, run the delivery and hand back the result

        :return: Return value of the callable, or the process ID when running
                 asynchronously via a transport
        """
        self._pack_box(args, kwargs)
        response = self._run_delivery()
        if self.transport:
            if self.async_:
                # Nothing to unpickle yet: ``response`` is the child's PID.
                return response
            self.outbox, prefix, suffix = unpickle(response[0],
                                                   self.discard_excess)
            if prefix or suffix:
                # Any of the three parts may be empty/None; presumably all
                # are text, as they are concatenated with the captured STDOUT.
                self.outbox.stdout = ((prefix or "")
                                      + (self.outbox.stdout or "")
                                      + (suffix or ""))
        self._pipe_stdout_err()
        self._reraise()
        return self.outbox.return_value

    def __get__(self, obj, classobj=None):
        """Descriptor hook: remember the instance a decorated method is
        accessed on so it can be passed as first argument to the callable.

        Note that the instance is stored on the shared inbox and this very
        object is returned, i.e. the most recent instance access wins.
        """
        if obj is not None:
            self.inbox.instance = obj
        return self

    def _pack_box(self, args, kwargs):
        """Pack callable, arguments and modules

        :param args: Arguments to be passed to the callable
        :type args: list
        :param kwargs: Arguments to be passed to the callable
        :type kwargs: dict
        :raises deliveryboy.exceptions.DeliveryPackingError: if the callable
            is not a plain python function
        """
        self.inbox.args = args
        self.inbox.kwargs = kwargs
        if not isinstance(self.func, types.FunctionType):
            raise DeliveryPackingError(
                "This type of callable is not supported"
            )
        # Only the code object travels; globals are rebuilt on the other side.
        self.inbox.func = self.func.__code__
        self._pack_box_modules()

    def _pack_box_modules(self):
        """Add modules to box for pickling

        Modules referenced in the function's globals are split into those that
        are expected to be importable on the other side (no `__file__`, or
        located below one of the system/virtualenv paths) and the remaining
        ones, which are handed to :py:obj:`ModulePickle`.

        The sets are rebuilt from scratch on every call. Accumulating into the
        previous `inbox.modules` would make every module look "already
        handled" from the second call on, so nothing would be pickled.
        """
        allmodules = [(k, v) for (k, v) in self.func.__globals__.items()
                      if isinstance(v, types.ModuleType)
                      and not k.startswith("__")]
        venv = os.environ.get("VIRTUAL_ENV", None)
        # Empty entries must go: every path starts with "".
        path = [p for p in sys.path[1:] if p]
        if venv:
            path = [p for p in path if not p.startswith(venv)]
            path.append(venv)
        try:
            # Handle builtins and modules from virtual env
            # Start with those that have no __file__ attribute
            importable = {k for (k, v) in allmodules
                          if getattr(v, '__file__', None) is None}
            # Then add those from the system paths. `__file__` may exist but
            # be None (namespace packages), hence the `or ''`.
            prefixes = tuple(path)
            importable |= {
                k for (k, v) in allmodules
                if (getattr(v, '__file__', None) or '').startswith(prefixes)
            }
        except Exception as error:
            raise DeliveryPackingError(
                "Cannot pack built-in/venv modules",
                real_exception=error
            ) from error
        # TODO: This breaks availability of imported submodules
        mod_pickle = ModulePickle(modules=[v for (k, v) in allmodules
                                           if k not in importable])
        self.inbox.pickled_modules = mod_pickle.pickle()
        # Every module name is imported on the other side, pickled or not.
        self.inbox.modules = {k for (k, v) in allmodules}

    def _run_delivery(self):
        """Executes the actual transport/executable

        If `transport` is `None`, it and `transport_params` will be omitted
        from the command line. In this case the callable is run directly.
        Also, in this case the asynchronous option is ignored.

        :return: `(stdout, stderr)` of the child process for synchronous
                 transport, the child's process ID for asynchronous transport,
                 `None` when the callable was run directly
        :raises deliveryboy.exceptions.DeliveryTransportError: if the child
            process cannot be started or exits with a non-zero code
        """
        if not self.transport:
            self.outbox = execute(self.inbox)
            return None
        cmd = [self.transport, ] + self.transport_params + [
            self.executable, "-m", "deliveryboy", pickle(self.inbox)
        ]
        try:
            child_process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
        except Exception as error:
            raise DeliveryTransportError(real_exception=error) from error
        if self.async_:
            # NOTE: the pipes of an asynchronous child are never read here.
            return child_process.pid
        response = child_process.communicate()
        self._handle_call_error(response, child_process.returncode)
        return response

    def _handle_call_error(self, response, returncode):
        """Raise if the child process reported failure

        :param response: `(stdout, stderr)` as returned by `communicate()`
        :param returncode: Exit code of the child process
        :raises deliveryboy.exceptions.DeliveryTransportError: if `returncode`
            is not zero
        """
        if returncode:
            raise DeliveryTransportError(
                "Child process exited with {}: {}".format(
                    # Never let undecodable bytes mask the real error.
                    returncode, response[1].decode("utf8", errors="replace")
                ))

    def _pipe_stdout_err(self):
        """Redirect STDOUT and STDERR from delivered callable"""
        for stream in ["stdout", "stderr"]:
            if isinstance(self.outbox, DeliveryBox) \
                    and getattr(self.outbox, stream, None):
                print(
                    getattr(self.outbox, stream),
                    file=getattr(sys, stream)
                )

    def _reraise(self):
        """Re-raises an exception originating from the callable"""
        if self.outbox and isinstance(self.outbox.exception, Exception):
            raise self.outbox.exception
class DeliveryBoyDecorator(object):
    """Decorator factory for functions

    Decorated functions are pickled and passed to a newly started python
    process that is called via a transport command (e.g. sudo).

    All keyword arguments given to the constructor are stored and forwarded
    unchanged to :py:obj:`DeliveryBoy` when a function is decorated, e.g.:

    :param transport: Transport command
    :type transport: str
    :param executable: The python executable to be called.
                       Default: `sys.executable`.
    :type executable: Absolute path of python interpreter
    :param async: If set to `True`, this process will not wait for the process
                  called via the transport command to finish. Default: `False`
    :type async: bool
    """

    def __init__(self, **params):
        # Keep the options verbatim; they are only interpreted by DeliveryBoy.
        self.params = params

    def __call__(self, func, *args, **kwargs):
        """Wrap `func` in a :py:obj:`DeliveryBoy` using the stored options

        Extra positional and keyword arguments are accepted but not used.

        :param func: Function to be decorated
        :return: :py:obj:`DeliveryBoy` instance wrapping `func`
        """
        delivery_options = self.params
        return DeliveryBoy(func, **delivery_options)
def execute(inbox):
    """Setup the environment and execute the decorated callable

    The pickled modules are restored, every name in `inbox.modules` is
    imported and bound under that same name in this module's globals, and the
    code object in `inbox.func` is turned into a function running in these
    globals. While the callable runs, `sys.stdout` and `sys.stderr` are
    replaced by in-memory buffers; the original streams are always restored,
    also if the callable is missing or terminates with a non-`Exception`
    error such as `SystemExit`.

    :param inbox: :py:obj:`DeliveryBox` instance describing the call
    :return: :py:obj:`DeliveryBox` holding return value, exception (if any)
             and the captured STDOUT/STDERR text
    :raises deliveryboy.exception.DeliveryPackingError: If callable is missing
    """
    # Load pickled modules
    mod_pickle = ModulePickle(pickled=inbox.pickled_modules)
    mod_pickle.unpickle()
    try:
        # Import modules
        # NOTE(review): the names are imported as given; this presumably
        # fails for aliased imports (``import x as y``) - confirm with the
        # packing side.
        globals().update({x: __import__(x) for x in inbox.modules})

        # Validate before touching the streams so that a failure here cannot
        # leave the process with redirected STDOUT/STDERR.
        if not isinstance(inbox.func, types.CodeType):
            raise DeliveryPackingError("No callable to run in delivery box")
        func = types.FunctionType(inbox.func, globals())

        box = DeliveryBox()
        orig_stdout, orig_stderr = sys.stdout, sys.stderr
        captured_stdout, captured_stderr = StringIO(), StringIO()
        sys.stdout, sys.stderr = captured_stdout, captured_stderr
        try:
            if inbox.instance is not None:
                # Decorated method: the instance goes first.
                box.return_value = func(inbox.instance,
                                        *inbox.args, **inbox.kwargs)
            else:
                box.return_value = func(*inbox.args, **inbox.kwargs)
        except Exception as error:
            # Shipped back to the caller, which re-raises it.
            box.exception = error
        finally:
            sys.stdout, sys.stderr = orig_stdout, orig_stderr
        # Read from our own buffers; the callable may have rebound sys.stdout.
        box.stdout = captured_stdout.getvalue()
        box.stderr = captured_stderr.getvalue()
        return box
    finally:
        # Drop the reference explicitly, as the original code did on both of
        # its exit paths.
        del mod_pickle
def main():
    """Entry function for new process

    Reads the pickled inbox from the first command line argument, runs it
    through :py:func:`execute` and prints the pickled result box. If the
    argument is missing or cannot be unpickled, a box carrying that exception
    is printed instead.

    Input and output of this function are base64 encoded strings representing
    pickled :py:obj:`deliveryboy.core.DeliveryBox` objects.
    """
    inbox = None
    failure = None
    try:
        inbox = unpickle(bytes(sys.argv[1], "utf8"))[0]
    except Exception as error:
        failure = error

    if failure is None:
        # Errors raised by execute() itself are deliberately not caught here.
        box = execute(inbox)
    else:
        box = DeliveryBox()
        box.exception = failure
    print(pickle(box))