Source code for astwro.exttools.Runner

# coding=utf-8
from __future__ import absolute_import, division, print_function
__metaclass__ = type

import os
import sys
import shutil
import hashlib
from copy import deepcopy
try:
    # noinspection PyCompatibility
    from StringIO import StringIO  # python2
except ImportError:
    from io import StringIO  # python3
import subprocess as sp
try:
    # noinspection PyUnresolvedReferences
    from subprocess import TimeoutExpired  # python 3 only
except ImportError:
    pass

#from . import logger as module_logger
from astwro.config.logger import logger as module_logger
from astwro.config import get_config
from astwro.utils import tmpdir, TmpDir
from .output_processors import StreamKeeper, OutputProvider


class Runner(object):
    """
    Base class for specific runners.

    Maintains underlying process lifetime,
    communication with process (streams, output processors chain), runner dir etc...
    """

    class RunnerException(Exception):
        """Exceptions raised by `Runner` and subclasses"""

        def __init__(self, message, runner):
            self.commandline = str(runner.executable) + ' ' + ' '.join(runner.arguments)
            self.stdin = runner.input
            self.stdout = runner.output
            self.stderr = runner.stderr
            super(Runner.RunnerException, self).__init__(message)

        def __str__(self):
            return (super(Runner.RunnerException,self).__str__()
                    + '\n>> Commandline:\n{}\n>> Stdin:\n{}\n>> Stdout:\n{}\n>> Stderr:\n{}\n'.format(
                        self.commandline, self.stdin, self.stdout, self.stderr
                    ))

    class ExitError(RunnerException):
        """Exceptions raised when underlying process returns error code on exit"""
        def __init__(self, message, runner, code):
            super(Runner.ExitError, self).__init__(message, runner)
            self.code = code
        def __str__(self):
            return (super(Runner.ExitError,self).__str__()
                    + '\n>> Process exit code: {}'.format(self.code))

    class NoFileError(RunnerException):
        def __init__(self, message, runner, filename):
            super(Runner.NoFileError, self).__init__(message, runner)
            self.filename = filename

    class RunnerValueError(ValueError, RunnerException):
        pass

    class RunnerTypeError(TypeError, RunnerException):
        pass

    raise_on_nonzero_exitcode = True
    preserve_process = False  # not implemented

    def __init__(self, dir=None, batch=False):
        """
        :param dir: path name or TmpDir object, in not provided new temp dir will be used
        :param bool batch:      whether Daophot have to work in batch mode.
        """
        self.logger = module_logger.getChild(type(self).__name__)
        self.executable = None
        self.arguments = []
        self.batch_mode = batch
        self.__stream_keeper = None

        self._prepare_dir(dir)
        self._reset()

    def _reset(self):
        """Resets runner without cleaning/changing runner dir
           allows execution of new sequence in same dir and files"""
        self.input = None
        self.output = None
        self.stderr = None
        self.returncode = None
        self.__process = None
        self.__commands = ''
        self.ext_output_files = set()

        if self.__stream_keeper is not None:
            self.__stream_keeper.stream = None # new chain containing only old StreamKeeper
        else:
            self.__stream_keeper = StreamKeeper(runner=self)   # new chain containing new StreamKeeper
        self.__processors_chain_last = self.__stream_keeper
        self.__processors_chain_first = None

    def __deepcopy__(self, memo):
        cls = self.__class__
        new = cls.__new__(cls)
        memo[id(self)] = new

        new.__stream_keeper = None
        new._reset()
        new.logger = self.logger
        new.executable = self.executable
        # new.output = self.output
        # new.stderr = self.stderr
        # new.returncode = self.returncode
        new.batch_mode = self.batch_mode
        new.arguments = self.arguments
        # new.__process = None
        # new.__commands = self.__commands
        # new.ext_output_files = set()

        # new.__processors_chain_last  = deepcopy(self.__processors_chain_last, memo) # copy chain
        # new.__processors_chain_first = memo[id(self.__processors_chain_first)]  # find StreamKeeper in copied chain
        # new.__stream_keeper          = memo[id(self.__stream_keeper)]  # find StreamKeeper in copied chain

        new.dir = deepcopy(self.dir, memo)
        return new

    def __del__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def clone(self):
        """Clones runner

        If *runner directory* was provided in constructor, clone will share the same dir, else, if
        *runner directory* is temp dir created implicitly by runner, clone will create it's own one, and
        content of *runner directory* will be copied from source to clone."""
        return deepcopy(self)

    def close(self):
        """Cleans things up."""
        self._on_exit()
        self.dir = None

    @property
    def mode(self):
        """Either "normal" or "batch". In batch mode, commands are not executed but collected
        on execution queue, then run together, in single process, one by one, 
        triggered by :py:meth:`run()` method"""
        return 'batch' if self.batch_mode else  'normal'

    @mode.setter
    def mode(self, m):
        if m.lowercase() == 'batch':
            self.batch_mode = True
        elif m.lowercase() == 'normal':
            self.batch_mode = False
        else:
            raise Runner.RunnerValueError('mode have to be either "normal" or "batch"', self)

    def _init_workdir_files(self, dir):
        pass

    def _update_executable(self, exe):
        """Find exe key in configuration and set as Runner executable"""
        if self.executable is None:
            self.executable = os.path.expanduser(get_config().get('executables', exe))


    def _prepare_dir(self, dir=None, init_files=True):
        if dir is None:
            dir = tmpdir(prefix='pydaophot_tmp')
        elif isinstance(dir, str):
            dir = tmpdir(use_existing=dir)
        elif not isinstance(dir, TmpDir):
            raise Runner.RunnerTypeError('dir must be either: TmpDir object, str, None', self)
        self.dir = dir
        if init_files:
            self._init_workdir_files(dir)

    def copy_to_runner_dir(self, source, filename=None):
        """Copies source file to  runner dir under name filename or the same
        as original if filename is None. Overwrites existing file."""
        dst = self.dir.path if filename is None else os.path.join(self.dir.path, filename)
        shutil.copy(source, dst)

    def link_to_runner_dir(self, source, link_filename=None):
        # type: (str, str) -> None
        """Creates symlink in  runner dir under name filename or the same
        as original if filename is None. Overwrites existing link.
        :param source: file patch  
        :param link_filename: worker dir link name, default: same as filename part of source"""
        source = self.expand_path(source)
        # if not os.path.isfile(source):
        #     raise IOError('Source file {} not found'.format(source))
        if link_filename is None:
            link_filename = os.path.basename(source)
        dest = os.path.join(self.dir.path, link_filename)
        try:
            os.remove(dest)
        except OSError:
            pass
        os.symlink(source, dest)

    def copy_from_runner_dir(self, filename, dest='./'):
        """Copies file: filename from runner dir. Overwrites existing file."""
        shutil.copy(os.path.join(self.dir.path, filename), dest)

    def link_from_runner_dir(self, filename, dest='./'):
        """Creates symlink in dest of file from runner dir.
        dest can be either file path for new symlink or directory.
        In second case name of symlink will be filename. Overwrites existing file."""
        if os.path.basename(dest) == '':
            dest = os.path.join(dest, filename)
        try:
            os.remove(dest)
        except OSError:
            pass
        os.symlink(os.path.join(self.dir.path, filename), dest)

    def file_from_runner_dir(self, filename):
        """Simply adds runner dir path into filename"""
        return os.path.join(self.dir.path, filename)

    def exists_in_runner_dir(self, filename):
        """Checks for filename existence in runner dir"""
        return os.path.exists(os.path.join(self.dir.path, filename))

    def rm_from_runner_dir(self, filename):
        """Removes (if exists) file filename from runner dir"""
        try:
            os.remove(os.path.join(self.dir.path, filename))
        except OSError:
            pass

    @staticmethod
    def expand_path(path):
        """Expand user ~ directory and finds absolute path."""
        if path is None:
            path = ''
        else:
            path = os.path.abspath(os.path.expanduser(path))
        return path

    def absolute_path(self, path):
        """Returns absolute path for filepath parameter, if :arg:path contain filename only, runner dir is added"""
        if os.path.basename(path) != path:  # not in runner directory
            absolute = self.expand_path(path)
        else:
            absolute = os.path.join(self.dir.path, path)
        return absolute


    @staticmethod
    def _runner_dir_file_name(filepath='', prefix='', suffix='', signature=None):
        # type: (str, str, str, str) -> str
        """Generates name used in Runner local dir for filepath

        Files processed by underlying process are always in it's working directory
          (runner directory). For files from other location in filesystem, copies
          or links in runner directory are maintained. Names of that files are prefixed
          with hash (shortened) of filepath to avoid collisions.
        """
        if signature is None:
            signature = filepath

        return prefix \
               + str(hashlib.md5(str(signature).encode()).hexdigest())[:6] \
               + '_' \
               + os.path.basename(filepath) + suffix

    def _prepare_output_file(self, data, preservefilename=False):
        # type: (str) -> (str, str)
        return self._prepare_io_file(data, output=True, preservefilename=preservefilename)

    def _prepare_input_file(self, path, preservefilename=False):
        # type: (str) -> (str, str)
        return self._prepare_io_file(path, output=False, preservefilename=preservefilename)

    def _prepare_io_file(self, path, output, preservefilename=False):
        # type: (str, bool) -> (str, str)
        """ make link for non-local input files in runner dir, gen runner dir filename """
        if not path:
            return '',''
        if os.path.dirname(os.path.abspath(path)) == self.dir.path: # path to runner dir provided, cut it
            path = os.path.basename(path)
        if os.path.basename(path) != path:  # not in runner directory
            absolute = self.expand_path(path)
            local = os.path.basename(path) if preservefilename else self._runner_dir_file_name(absolute)
            if output:
                # add to list of files to update after run
                self.ext_output_files.add(absolute)
            elif absolute not in self.ext_output_files:
                self.link_to_runner_dir(absolute, local)
                self.logger.debug("Linking input file into runner directory: {} <- {}".format(local, absolute))
        else:
            absolute = os.path.join(self.dir.path, path)
            local = path
        if output:
            # remove runner dir file if exist
            self.rm_from_runner_dir(local)

        return local, absolute

    def _pre_run(self, wait):
        pass

    def run(self, wait=True):
        """
        Execute commands queue.

        In the "normal" :meth:`mode <mode>` there is no need to call :meth:`run`, because all commands are
        executed immediately. In "batch" :meth:`mode <mode>`, commands  execution is queued and postponed
        until :meth:`.run`

        :param bool wait:
            If false,  :meth:`run` exits without waiting for finishing commands executions (asynchronous processing).
            Call :meth:`wait_for_results` before accessing results.
        :return: None
        """
        self._pre_run(wait)
        try:
            self.__process = sp.Popen([self.executable] + self.arguments,
                                      stdin=sp.PIPE,
                                      stdout=sp.PIPE,
                                      stderr=sp.PIPE,
                                      cwd=self.dir.path)
        except OSError as e:
            self.logger.error(
                'Executable: %s is expected in PATH, configure executable name/path in ~/pydaophot.cfg e.g.',
                self.executable)
            raise e
        self.logger.debug('STDIN:\n' + self.__commands)
        if wait:
            self.__communicate(self.__commands)
        else:
            if sys.version_info[0] > 2:  # python 3 has timeout in communicate
                try:
                    self.__communicate(self.__commands, timeout=0.01)
                except TimeoutExpired:
                    pass
            else:  # python 2 - write directly to stdin and close it to flush end generate EOF
                self.__process.stdin.write(self.__commands)
                self.__process.stdin.close()
                self.__process.stdin = None

    def is_ready_to_run(self):
        """
        Returns True if there are some commands waiting for run but process was not started yet

        :return: bool
        """
        return self.__commands and self.__process is None

    @property
    def running(self):
        """
        Whether if runner is running

        ``True`` If executable was started in async mode :meth:`run(wait=False) <run>`, and no output collected yet.

        .. Note::
            Even if executable has finished, output will not be collected and :meth:`running <running>` will
            return ``True`` until user asks for results or call :meth:`wait_for_results()`

        :return: bool
        """
        return self.__process is not None and self.output is None

    def has_finished_run(self):
        """
        Returns True if process has finished and output is available

        :return: bool
        """
        return self.output is not None

    def wait_for_results(self):
        """In the "batch" mode, waits for commands completion if :meth:`run(wait=False) <run>` was called """
        if self.running:
            self.__communicate()
        if self.is_ready_to_run():
            self.run(wait=True)

    def __communicate(self, inpt=None, timeout=None):
        i = inpt.encode(encoding='ascii') if inpt else None
        self.input = i
        o, e = self.__process.communicate(i, timeout=timeout) if timeout else self.__process.communicate(i)
        self.output = o.decode('ascii')
        self.stderr = e.decode('ascii')
        self.logger.debug('STDOUT:\n' + self.output)
        self.__stream_keeper.stream = StringIO(self.output)
        self.returncode = self.__process.returncode
        if self.returncode != 0:
            self.logger.warning('{} process finished with error code {}'.format(self.executable, self.returncode))
            if self.raise_on_nonzero_exitcode:
                raise Runner.ExitError('Execution failed, exit code {}'.format(self.returncode), self, self.returncode)
        # copy results - output files from runners directory to user specified path
        for f in self.ext_output_files:
            try:
                self.copy_from_runner_dir(self._runner_dir_file_name(f), f)
            except FileNotFoundError as e:
                msg = '{} process does not produce expected output file {}<--{}'.format(self.executable, f,
                                                                                        self._runner_dir_file_name(f))
                self.logger.error(msg)
                if self.raise_on_nonzero_exitcode:
                    raise Runner.NoFileError(msg, self, self.returncode)
        # fill chained processors buffers
        self.__processors_chain_last.get_output_stream()


    def _get_ready_for_commands(self):
        if self.running:
            self.wait_for_results()  # if running wait
        if self.has_finished_run():  # if was running reset and get ready for new process
            self._reset()

    def _insert_processing_step(self, std_in, output_processor=None, on_beginning=False):
        if on_beginning:
            self.__commands = std_in + self.__commands
        else:
            self.__commands += std_in
        if output_processor is not None:
            if not isinstance(output_processor, OutputProvider):
                raise Runner.RunnerTypeError('output_processor must OutputProvider subclass', self)
            output_processor.logger = self.logger
            #  chain organisation:
            # [stream_keeper]<-[processors_chain_first]<-[]<-[]<-[processors_chain_last]
            if on_beginning:
                output_processor._prev_in_chain = self.__stream_keeper
                if self.__processors_chain_first is not None:
                    self.__processors_chain_first._prev_in_chain = output_processor
                self.__processors_chain_first = output_processor
            else:
                output_processor._prev_in_chain = self.__processors_chain_last
                self.__processors_chain_last = output_processor
                if self.__processors_chain_first is None:
                    self.__processors_chain_first = output_processor
        return output_processor

    def _on_exit(self):
        pass