Source code for d6tflow.pipes

import d6tflow
import d6tcollect

try:
    import d6tpipe
except:
    ModuleNotFoundError("Optional requirement d6tpipe not found, install with `pip install d6tpipe`")


#@d6tcollect.collect
[docs]def init(default_pipe_name, profile=None, local_pipe=False, local_api=False, reset=False, api=None, set_dir=True, api_args=None, pipe_args=None): """ Initialize d6tpipe Args: default_pipe_name (str): name of pipe to store results. Override by setting Task.pipe attribute profile (str): name of d6tpipe profile to get api if api not provided local_pipe (bool): use `PipeLocal()` local_api (bool): use `APILocal()` reset (bool): reset api and pipe connection api (obj): d6tpipe api object. if not provided will be loaded set_dir (bool): if True, set d6tflow directory to default pipe directory api_args (dir): arguments to pass to api pipe_args (dir): arguments to pass to pipe """ if not d6tflow.settings.isinitpipe or reset: d6tflow.cache.pipe_default_name = default_pipe_name api_args = {} if api_args is None else api_args pipe_args = {} if pipe_args is None else pipe_args if local_pipe: pipe_ = d6tpipe.PipeLocal(default_pipe_name, profile=profile) else: if api is None: if local_api: d6tflow.cache.api = d6tpipe.APILocal(profile=profile, **api_args) else: d6tflow.cache.api = d6tpipe.APIClient(profile=profile, **api_args) else: d6tflow.cache.api = api pipe_ = d6tpipe.Pipe(d6tflow.cache.api, default_pipe_name, **pipe_args) d6tflow.cache.pipes[default_pipe_name] = pipe_ if set_dir: d6tflow.settings.isinitpipe = False d6tflow.set_dir(pipe_.dir) d6tflow.settings.isinitpipe = True
def _assertinit(): if not d6tflow.settings.isinitpipe: raise RuntimeError('d6tpipe not initialized, run d6tflow.pipes.init()') #@d6tcollect.collect
[docs]def init_pipe(name=None, **kwargs): _assertinit() name = d6tflow.cache.pipe_default_name if name is None else name pipe_ = d6tpipe.Pipe(d6tflow.cache.api,name, **kwargs) d6tflow.cache.pipes[name] = pipe_
[docs]def get_pipe(name=None): """ Get a pipe Args: name (str): name of pipe Returns: obj: pipe object """ _assertinit() name = d6tflow.cache.pipe_default_name if name is None else name if name not in d6tflow.cache.pipes: init_pipe(name) return d6tflow.cache.pipes[name]
[docs]def get_dirpath(name=None): """ Get a pipe directory as Pathlib.Path Args: name (str): name of pipe """ return get_pipe(name).dirpath
[docs]def set_default(name): """ Set default pipe. Will also change d6tflow directory Args: name (str): name of pipe """ pipe_ = get_pipe(name) d6tflow.cache.pipe_default_name = name d6tflow.settings.isinitpipe = False d6tflow.set_dir(pipe_.dir) d6tflow.settings.isinitpipe = True
def _all_pipes(task, push=False): """ Get all pipes for all downstream tasks in a flow Args: task (obj): task object """ tasks = d6tflow.taskflow_upstream(task, only_complete=False) if push: pipenames = set([t.get_pipename() for t in tasks if not getattr(t, 'external', False)]) else: pipenames = set([t.get_pipename() for t in tasks]) return {name:get_pipe(name) for name in pipenames} @d6tcollect.collect def _all_runfun(task, fun, push=False, **kwargs): pipes = _all_pipes(task,push) def runfun(p): f = getattr(p, fun) return f(**kwargs) return {pipe.name: runfun(pipe) for pipe in pipes.values()}
[docs]def all_pull(task, **kwargs): """ Pull for all upstream tasks in a flow """ return _all_runfun(task, 'pull', **kwargs)
[docs]def all_pull_preview(task, **kwargs): """ Pull preview for all upstream tasks in a flow """ return _all_runfun(task, 'pull_preview', **kwargs)
[docs]def all_push(task, **kwargs): """ Push for all upstream tasks in a flow """ return _all_runfun(task, 'push', push=True, **kwargs)
[docs]def all_push_preview(task, **kwargs): """ Push preview for all upstream tasks in a flow """ return _all_runfun(task, 'push_preview', push=True, **kwargs)