# Source code for d6tcollect

'''

Much like websites, this library collects anonymous usage statistics.
It ONLY collects import and function call events. It does NOT collect any of your data.
Example: {'profile': 'prod', 'package': 'd6tmodule', 'module': 'd6tmodule.utils', 'classModule': 'd6tmodule.utils.MyClass', 'class': 'MyClass',
    'function': 'MyClass0.myfunction_1', 'functionModule': 'd6tmodule.utils.MyClass.myfunction_1', 'event': 'call', 'params': {'args': 1, 'kwargs': 'another'}}
For privacy notice see https://www.databolt.tech/index-terms.html#privacy

'''

import functools
import hashlib
import json
import os
import queue
import sqlite3
import sys
import threading
import types
import urllib.request
import uuid
from datetime import datetime, timedelta
from pathlib import Path

# Master switch read by the `collect` decorators: when False they call the
# wrapped function directly without recording an event.
submit = True
# When True, `_request` swallows any error raised while sending an event.
ignore_errors = True
# Label copied into every payload under the 'profile' key.
profile = 'prod'

# Collection server. Override with the D6TCOLLECT_SVR environment variable,
# e.g. D6TCOLLECT_SVR=http://localhost:8080 for local testing.
host = os.environ.get('D6TCOLLECT_SVR', 'https://d6tcollect.databolt.tech')
# Path appended to `host` to build the request URL.
endpoint = '/v1/api/collect'
# Value of the "Source" HTTP header sent with each request.
source = 'd6tcollect'


def create_db():
    """Create the local event database if needed and return its path.

    The database file is ``~/.d6tcollect/collect.sqlite``. The folder and the
    three tables (``events``, ``events_submitted``, ``date_submitted``) are
    only created when missing, so calling this repeatedly is harmless.

    Returns:
        str: Filesystem path of the SQLite database file.
    """
    collect_folder = ".d6tcollect"
    db_name = "collect.sqlite"
    db_dir = Path.home() / collect_folder
    db_dir.mkdir(parents=True, exist_ok=True)

    db_path = str(db_dir / db_name)
    events_table = """CREATE TABLE IF NOT EXISTS events (
            id text PRIMARY KEY,
            date text NOT NULL,
            payload text NOT NULL,
            submit integer NOT NULL
        );"""
    events_submitted_table = """CREATE TABLE IF NOT EXISTS events_submitted (
            id integer PRIMARY KEY,
            date_submit text NOT NULL,
            payload text NOT NULL
        );"""
    date_submitted_table = """CREATE TABLE IF NOT EXISTS date_submitted (
            id integer PRIMARY KEY,
            date text NOT NULL,
            datetime text NOT NULL
        );"""

    conn = sqlite3.connect(db_path)
    try:
        # `with conn` only commits/rolls back the transaction; it does not
        # close the connection, hence the explicit close() below.
        with conn:
            for ddl in (events_table, events_submitted_table,
                        date_submitted_table):
                conn.execute(ddl)
    finally:
        conn.close()

    return db_path


# Create the database at import time and keep its path. Note that
# get_connection() calls create_db() again rather than reading this constant.
DB_PATH = create_db()


def daily_summary_sent():
    """Tell whether a submission has already been recorded for today.

    Looks for a row in ``date_submitted`` whose ``date`` equals today's local
    date in ISO format (YYYY-MM-DD).

    Returns:
        bool: True if at least one such row exists, False otherwise.
    """
    select_statement = """ 
        select date from date_submitted where date=?
    """
    conn = get_connection()
    try:
        # One row is enough to answer the question; no need to fetch them all.
        row = conn.execute(
            select_statement, (datetime.now().date().isoformat(),)
        ).fetchone()
    finally:
        # The sqlite3 connection context manager does not close the
        # connection, so close it explicitly.
        conn.close()

    return row is not None


def get_payloads():
    """Return every pending event stored in the ``events`` table.

    Returns:
        list[tuple]: One ``(date, payload)`` tuple per stored event, in the
        order SQLite returns them; an empty list when nothing is pending.
    """
    select_statement = """ 
        select date, payload from events
    """
    conn = get_connection()
    try:
        return conn.execute(select_statement).fetchall()
    finally:
        # The sqlite3 connection context manager does not close the
        # connection, so close it explicitly.
        conn.close()


def move_payloads(payloads, delete_original_payloads=True):
    """Archive events into ``events_submitted`` and optionally drop the originals.

    Args:
        payloads: Sequence of ``(date, payload)`` tuples as returned by
            ``get_payloads``.
        delete_original_payloads: When True, delete each row of ``events``
            matching the given date and payload after archiving it.
    """
    insert_statement_submitted = """
        insert into events_submitted(date_submit, payload)
        values(?, ?)
    """

    delete_statement_events = """ 
        delete from events where date=? and payload=?
    """
    # Bind the timestamp as text instead of a datetime object: the implicit
    # sqlite3 datetime adapter is deprecated since Python 3.12. The " "
    # separator reproduces the format that adapter used to write.
    submitted_at = datetime.now().isoformat(" ")
    archived_rows = [(submitted_at, row[1]) for row in payloads]

    conn = get_connection()
    try:
        # `with conn` commits on success and rolls back on error, so the
        # insert and the delete are applied together or not at all.
        with conn:
            conn.executemany(insert_statement_submitted, archived_rows)
            if delete_original_payloads:
                conn.executemany(delete_statement_events, payloads)
    finally:
        conn.close()


def today():
    """Return the current local date as a ``YYYY-MM-DD`` string."""
    current_date = datetime.now().date()
    return current_date.isoformat()


def insert_date_submitted():
    """Record in ``date_submitted`` that a submission happened now.

    Stores the local date (YYYY-MM-DD) and the full ISO timestamp in one row.
    """
    # Take a single reading of the clock so the date and datetime columns can
    # never disagree when the call happens around midnight.
    now = datetime.now()
    date_time_submitted = now.isoformat()
    date_submitted = now.date().isoformat()

    insert_statement = """
        insert into date_submitted(date, datetime)
        values(?, ?)
    """
    conn = get_connection()
    try:
        with conn:  # commits on success
            conn.execute(insert_statement,
                         (date_submitted, date_time_submitted))
    finally:
        # The sqlite3 connection context manager does not close the
        # connection, so close it explicitly.
        conn.close()


# Unbounded queue (maxsize <= 0 means no size limit). Nothing in this file
# currently enqueues to it: the put() in send_daily_summary is commented out.
# Presumably meant as the input of DailySubmission -- TODO confirm.
daily_submits_queue = queue.Queue(maxsize=-1)


def DailySubmission(q):
    """Send queued database rows until the ``"done"`` sentinel is received.

    Args:
        q: Queue yielding ``(date, payload)`` rows; the string ``"done"``
            stops the loop. Blocks while the queue is empty.
    """
    # iter(callable, sentinel) keeps calling q.get() until it returns "done".
    for row in iter(q.get, "done"):
        _request(row[1], from_db=True)


def send_daily_summary(forced=False):
    """Send all locally stored events to the server, at most once per day.

    Every pending event is sent synchronously, then archived into
    ``events_submitted`` and removed from ``events``; finally today's date is
    recorded so later calls on the same day are no-ops.

    Events are archived whether or not the request succeeded: sending happens
    in a helper thread and, with ``ignore_errors`` set, failures are swallowed.

    Args:
        forced: When True, send even if a submission was already recorded
            today. The ``submit`` opt-out flag still takes precedence.
    """
    # Honour the module-level opt-out, like the `collect` decorators do.
    if not submit:
        return
    if daily_summary_sent() and not forced:
        return

    payloads = get_payloads()
    if not payloads:
        # Nothing pending: skip the archive step and do not mark the day as
        # submitted, so events recorded later today still get sent.
        return

    for payload in payloads:
        _submit(payload[1], put_in_queue=False, from_db=True)
    move_payloads(payloads, delete_original_payloads=True)
    insert_date_submitted()


# Unbounded queue of event payloads (maxsize <= 0 means no size limit):
# _submit() enqueues, the background Writer thread consumes.
payload_queue = queue.Queue(maxsize=-1)


def get_connection():
    """Open and return a new SQLite connection to the collection database.

    ``create_db`` is invoked on every call, so the folder and tables are
    (re)created if they are missing. The caller owns the returned connection.
    """
    return sqlite3.connect(create_db())


def insert_event(id, date, payload, submitted):
    """Store one event in the ``events`` table.

    The row is silently skipped when an event with the same ``id`` already
    exists (``INSERT or IGNORE``).

    Args:
        id: Primary key of the event.
        date: Timestamp string stored in the ``date`` column.
        payload: JSON-serialisable object; values json cannot encode are
            stringified. Stored as UTF-8 encoded JSON bytes.
        submitted: Truthy if the event was already sent; stored as 1 or 0.
    """
    submit_flag = 1 if submitted else 0
    encoded = json.dumps(payload, default=str).encode('utf-8')
    insert_statement = """ 
        INSERT or IGNORE INTO events(id, date, payload, submit)
        VALUES (?, ?, ?, ?)
        """
    conn = get_connection()
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(insert_statement, (id, date, encoded, submit_flag))
    finally:
        # The sqlite3 connection context manager does not close the
        # connection, so close it explicitly.
        conn.close()


def insert_payload(payload):
    """Persist a payload dict as a not-yet-submitted event.

    The event id is the MD5 hex digest of the string form of the payload's
    values, so two payloads with identical values get the same id.

    Args:
        payload: Event dictionary as built by ``init`` or the decorators.
    """
    digest = hashlib.md5(str(payload.values()).encode('utf-8'))
    event_id = digest.hexdigest()
    recorded_at = datetime.now().isoformat()
    insert_event(event_id, recorded_at, payload, False)


def Writer(payload_queue):
    """Background worker that persists queued payloads to the local database.

    Runs until the main thread is no longer alive or the ``"EXIT"`` sentinel
    is received, then flushes whatever is still queued.

    Args:
        payload_queue: Queue of payload dicts; the string ``"EXIT"`` asks the
            worker to stop.
    """
    while threading.main_thread().is_alive():
        try:
            # Wait with a timeout: a bare get() blocks forever on an empty
            # queue, so the liveness check above would never run again and
            # queue.Empty could never be raised.
            payload = payload_queue.get(timeout=1)
        except queue.Empty:
            continue

        if payload == "EXIT":
            break
        insert_payload(payload)

    # Flush events that were queued but not yet written.
    while not payload_queue.empty():
        payload = payload_queue.get()
        if payload != "EXIT":  # a stray sentinel is not an event
            insert_payload(payload)


# Start the Writer worker at import time. It is a daemon thread, so it does
# not keep the interpreter alive when the program exits.
_t = threading.Thread(target=Writer, args=(payload_queue,))
_t.daemon = True
_t.start()


def _request(payload, from_db):
    """POST one event to ``host + endpoint`` as JSON.

    Args:
        payload: When ``from_db`` is true, the already-serialised JSON body
            read back from the database (bytes, or str which is encoded as
            UTF-8). Otherwise a dict: a ``'uuid'`` key is added to it in
            place and it is serialised to JSON.
        from_db: Whether ``payload`` is an already-serialised database row.

    Raises:
        Exception: Any error from building or sending the request, but only
            when the module-level ``ignore_errors`` flag is false.
    """
    try:
        if from_db:
            # urllib requires bytes for POST data.
            body = payload.encode('utf-8') if isinstance(payload, str) else payload
        else:
            payload['uuid'] = str(uuid.UUID(int=uuid.getnode())).split('-')[-1]
            body = json.dumps(payload, default=str).encode('utf-8')

        req = urllib.request.Request(
            host + endpoint, data=body,
            headers={'content-type': 'application/json', "Source": source})
        # The response body is not used; the `with` block just makes sure the
        # HTTP response (and its socket) is closed.
        with urllib.request.urlopen(req, timeout=2):
            pass
    except Exception:
        if not ignore_errors:
            raise


def _submit(payload, put_in_queue=True, from_db=False):
    """Hand a payload to the writer queue, or send it right away.

    Args:
        payload: Event dict, or a serialised row when ``from_db`` is true.
        put_in_queue: When True, enqueue the payload on ``payload_queue`` and
            return immediately. When False, send it with ``_request`` in a
            helper thread and wait for that thread to finish.
        from_db: Forwarded to ``_request``.
    """
    if put_in_queue:
        payload_queue.put(payload)
        return

    sender = threading.Thread(
        target=_request, args=(payload, from_db), daemon=True)
    sender.start()
    # Block until the request has completed.
    sender.join()


def init(_module):
    """Record an 'import' event for a module.

    Does nothing when the module-level ``submit`` flag is false, matching the
    opt-out behaviour of the ``collect`` decorators.

    Args:
        _module: Dotted module name, e.g. ``'d6tmodule.utils'``. The part
            before the first dot is reported as the package.
    """
    if not submit:
        return

    payload = {
        'profile': profile,
        # str.split always returns at least one element, so no fallback is
        # needed for the top-level package name.
        'package': _module.split('.')[0],
        'module': _module,
        'event': 'import',
        'date_collect': today()
    }
    _submit(payload)


def collect(func):
    """Decorator that records a 'call' event each time ``func`` is invoked.

    If ``func`` raises an ``Exception``, an additional 'exception' event with
    the exception type and message is recorded and the exception is re-raised.
    When the module-level ``submit`` flag is false, ``func`` is called directly
    and nothing is recorded.

    Only the number of positional arguments and the names of the keyword
    arguments are recorded, never their values.

    Args:
        func: Function to instrument.

    Returns:
        The wrapping function; it keeps ``func``'s name and docstring.
    """
    @functools.wraps(func)  # keep __name__, __doc__, etc. of the original
    def wrapper(*args, **kwargs):
        if not submit:
            return func(*args, **kwargs)

        payload = {
            'profile': profile,
            'package': func.__module__.split('.')[0],
            'module': func.__module__,
            'classModule': None,
            'class': None,
            'function': func.__qualname__,
            'functionModule': ".".join([func.__module__, func.__qualname__]),
            'event': 'call',
            'params': {'args': len(args), 'kwargs': ",".join(kwargs)},
            'date_collect': today()
        }
        payload['uuid'] = str(uuid.UUID(int=uuid.getnode())).split('-')[-1]
        _submit(payload)
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # Build a separate dict: `payload` has already been handed to the
            # writer thread and must not be mutated behind its back.
            # NOTE(review): str(e) may contain user data -- confirm this is
            # acceptable under the privacy notice.
            failure = dict(
                payload,
                event='exception',
                exceptionType=e.__class__.__name__,
                exceptionMsg=str(e),
            )
            _submit(failure)
            raise

    return wrapper


def get_og_class(cls, func):
    """Find the class in ``cls``'s MRO that originally implemented ``func``.

    The implementor is identified by name: the first component of
    ``func.__qualname__`` is compared with each class's ``__name__``.

    Returns:
        The first matching class in method-resolution order, or None when no
        class in the MRO has that name.
    """
    implementor_name = func.__qualname__.split(".")[0]
    candidates = (
        klass for klass in cls.__mro__ if klass.__name__ == implementor_name
    )
    return next(candidates, None)


def _collectClass(func):
    """Decorator that records a 'call' event each time a method is invoked.

    The class reported is the one in the instance's MRO whose name matches the
    first component of ``func.__qualname__`` (see ``get_og_class``). If the
    method raises an ``Exception``, an additional 'exception' event is
    recorded and the exception is re-raised. When the module-level ``submit``
    flag is false, or no implementing class is found, the method is called
    directly and nothing is recorded.

    Only the number of positional arguments and the names of the keyword
    arguments are recorded, never their values.

    Args:
        func: Plain function taking ``self`` as first argument.

    Returns:
        The wrapping function; it keeps ``func``'s name and docstring.
    """
    @functools.wraps(func)  # keep __name__, __doc__, etc. of the original
    def wrapper(self, *args, **kwargs):
        if not submit:
            return func(self, *args, **kwargs)
        og_class = get_og_class(self.__class__, func)

        if og_class is None:
            # This should never happen but just to be sure
            return func(self, *args, **kwargs)

        payload = {
            'profile': profile,
            'package': func.__module__.split('.')[0],
            'module': og_class.__module__,
            'classModule': ".".join([og_class.__module__, og_class.__qualname__]),
            'class': og_class.__qualname__,
            'function': func.__qualname__,
            'functionModule': ".".join([og_class.__module__, og_class.__name__, func.__name__]),
            'event': 'call',
            'params': {'args': len(args), 'kwargs': ",".join(kwargs)},
            'date_collect': today()
        }
        payload['uuid'] = str(uuid.UUID(int=uuid.getnode())).split('-')[-1]
        _submit(payload)
        try:
            return func(self, *args, **kwargs)
        except Exception as e:
            # Build a separate dict: `payload` has already been handed to the
            # writer thread and must not be mutated behind its back.
            failure = dict(
                payload,
                event='exception',
                exceptionType=e.__class__.__name__,
                exceptionMsg=str(e),
            )
            _submit(failure)
            raise

    return wrapper


class Collect(type):
    """Metaclass that instruments the public methods of the classes it creates.

    Every plain function in the class body whose name does not start with an
    underscore is wrapped with ``_collectClass``. Everything else (constants,
    properties, staticmethods, classmethods, private and dunder names) is left
    untouched, since the wrapper only makes sense for functions taking ``self``.
    """

    def __new__(cls, name, bases, namespace, **kwds):
        # NOTE: **kwds is accepted but, as before, not forwarded to
        # type.__new__.
        instrumented = {
            attr: (
                _collectClass(value)
                if isinstance(value, types.FunctionType)
                and not attr.startswith('_')
                else value
            )
            for attr, value in namespace.items()
        }
        return type.__new__(cls, name, bases, instrumented)


# Runs at import time: sends the events stored by earlier sessions, unless a
# submission has already been recorded for today.
send_daily_summary()