# Source code for d6tflow.targets

import luigi
import pandas as pd
import json
import pickle
import pathlib

#import d6tcollect

from d6tflow.cache import data as cache
import d6tflow.settings as settings
import d6tflow.utils
import d6tcollect

class CacheTarget(luigi.LocalTarget):
    """
    Saves to in-memory cache, loads to python object
    """

    def exists(self):
        # a target "exists" once something has been stored under its path key
        return self.path in cache

    def invalidate(self):
        # drop the cached entry, if any
        if self.path in cache:
            cache.pop(self.path)

    def load(self, cached=True):
        """
        Load from in-memory cache

        Returns: python object
        """
        if not self.exists():
            raise RuntimeError('Target does not exist, make sure task is complete')
        return cache.get(self.path)

    @d6tcollect._collectClass
    def save(self, df):
        """
        Save dataframe to in-memory cache

        Args:
            df (obj): pandas dataframe

        Returns: filename
        """
        cache[self.path] = df
        return self.path
class PdCacheTarget(CacheTarget):
    """In-memory cache target for pandas dataframes; identical behavior to CacheTarget."""
    pass
class _LocalPathTarget(luigi.LocalTarget):
    """
    Local target with `self.path` as `pathlib.Path()`
    """

    def __init__(self, path=None):
        super().__init__(path)
        # luigi stores the raw path string; replace it with a pathlib.Path
        self.path = pathlib.Path(path)
        # ensure the parent directory exists up front so later saves don't fail
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def exists(self):
        return self.path.exists()

    def invalidate(self):
        # delete the file if present; returns True when the target is gone
        if self.exists():
            self.path.unlink()
        return not self.exists()
class DataTarget(_LocalPathTarget):
    """
    Local target which saves in-memory data (eg dataframes) to persistent storage (eg files) and loads from storage to memory

    This is an abstract class that you should extend.
    """

    def load(self, fun, cached=False, **kwargs):
        """
        Runs a function to load data from storage into memory

        Args:
            fun (function): loading function
            cached (bool): keep data cached in memory
            **kwargs: arguments to pass to `fun`

        Returns: data object
        """
        if not self.exists():
            raise RuntimeError('Target does not exist, make sure task is complete')
        use_cache = cached or settings.cached
        # BUGFIX: the cache was previously written when EITHER `cached` or
        # `settings.cached` was true, but only read back when BOTH were true.
        # Read and write now use the same condition.
        if use_cache and self.path in cache:
            return cache.get(self.path)
        df = fun(self.path, **kwargs)
        if use_cache:
            cache[self.path] = df
        return df

    def save(self, df, fun, **kwargs):
        """
        Runs a function to save data from memory into storage

        Args:
            df (obj): data to save
            fun (str): name of the saving method on `df`, eg 'to_csv'
            **kwargs: arguments to pass to `fun`

        Returns: filename
        """
        # resolve the saver method on the data object, eg df.to_csv
        fun = getattr(df, fun)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        fun(self.path, **kwargs)
        return self.path
class CSVPandasTarget(DataTarget):
    """
    Saves to CSV, loads to pandas dataframe
    """

    def load(self, cached=False, **kwargs):
        """
        Load from csv to pandas dataframe

        Args:
            cached (bool): keep data cached in memory
            **kwargs: arguments to pass to pd.read_csv

        Returns: pandas dataframe
        """
        return super().load(pd.read_csv, cached, **kwargs)

    @d6tcollect._collectClass
    def save(self, df, **kwargs):
        """
        Save dataframe to csv

        Args:
            df (obj): pandas dataframe
            kwargs : additional arguments to pass to df.to_csv

        Returns: filename
        """
        # default to no index column; caller kwargs take precedence
        opts = dict(index=False)
        opts.update(kwargs)
        return super().save(df, 'to_csv', **opts)
class CSVGZPandasTarget(CSVPandasTarget):
    """
    Saves to CSV gzip, loads to pandas dataframe
    """

    @d6tcollect._collectClass
    def save(self, df, **kwargs):
        """
        Save dataframe to csv gzip

        Args:
            df (obj): pandas dataframe
            kwargs : additional arguments to pass to df.to_csv

        Returns: filename
        """
        # default to gzip compression without an index column; caller kwargs win
        opts = dict(index=False, compression='gzip')
        opts.update(kwargs)
        return super().save(df, 'to_csv', **opts)
class ExcelPandasTarget(DataTarget):
    """
    Saves to Excel, loads to pandas dataframe
    """

    def load(self, cached=False, **kwargs):
        """
        Load from Excel to pandas dataframe

        Args:
            cached (bool): keep data cached in memory
            **kwargs: arguments to pass to pd.read_excel

        Returns: pandas dataframe
        """
        return super().load(pd.read_excel, cached, **kwargs)

    @d6tcollect._collectClass
    def save(self, df, **kwargs):
        """
        Save dataframe to Excel

        Args:
            df (obj): pandas dataframe
            kwargs : additional arguments to pass to df.to_excel

        Returns: filename
        """
        opts = {**{'index': False}, **kwargs}
        return super().save(df, 'to_excel', **opts)
class PqPandasTarget(DataTarget):
    """
    Saves to parquet, loads to pandas dataframe
    """

    def load(self, cached=False, **kwargs):
        """
        Load from parquet to pandas dataframe

        Args:
            cached (bool): keep data cached in memory
            **kwargs: arguments to pass to pd.read_parquet

        Returns: pandas dataframe
        """
        return super().load(pd.read_parquet, cached, **kwargs)

    @d6tcollect._collectClass
    def save(self, df, **kwargs):
        """
        Save dataframe to parquet

        Args:
            df (obj): pandas dataframe
            kwargs : additional arguments to pass to df.to_parquet

        Returns: filename
        """
        # gzip + pyarrow by default; caller kwargs take precedence
        opts = dict(compression='gzip', engine='pyarrow')
        opts.update(kwargs)
        return super().save(df, 'to_parquet', **opts)
class JsonTarget(DataTarget):
    """
    Saves to json, loads to dict
    """

    def load(self, cached=False, **kwargs):
        """
        Load from json to dict

        Args:
            cached (bool): keep data cached in memory
            **kwargs: arguments to pass to json.load

        Returns: dict
        """
        def read_json(path, **opts):
            with open(path, 'r') as fhandle:
                # BUGFIX: kwargs were documented as going to json.load but
                # were silently dropped; forward them now.
                df = json.load(fhandle, **opts)
            # save() wraps the payload as {'data': ...}; unwrap symmetrically
            return df['data']
        return super().load(read_json, cached, **kwargs)

    @d6tcollect._collectClass
    def save(self, dict_, **kwargs):
        """
        Save dict to json

        Args:
            dict_ (dict): python dict
            kwargs : additional arguments to pass to json.dump

        Returns: filename
        """
        def write_json(path, payload, **opts):
            with open(path, 'w') as fhandle:
                json.dump(payload, fhandle, **opts)
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # pretty-print with 4-space indent by default; caller kwargs win
        opts = {**{'indent': 4}, **kwargs}
        # wrap in {'data': ...} so load() can unwrap it
        write_json(self.path, {'data': dict_}, **opts)
        return self.path
class PickleTarget(DataTarget):
    """
    Saves to pickle, loads to python obj
    """

    def load(self, cached=False, **kwargs):
        """
        Load from pickle to obj

        Args:
            cached (bool): keep data cached in memory
            **kwargs: arguments to pass to pickle.load

        Returns: python object
        """
        # BUGFIX: the loader previously accepted only the path, so passing any
        # kwargs (documented as going to pickle.load) raised TypeError;
        # accept and forward them.
        def funload(path, **opts):
            with open(path, "rb") as fhandle:
                data = pickle.load(fhandle, **opts)
            return data
        return super().load(funload, cached, **kwargs)

    @d6tcollect._collectClass
    def save(self, obj, **kwargs):
        """
        Save obj to pickle

        Args:
            obj (obj): python object
            kwargs : additional arguments to pass to pickle.dump

        Returns: filename
        """
        self.path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.path, "wb") as fhandle:
            pickle.dump(obj, fhandle, **kwargs)
        return self.path