# importing.py
import io
import time
from typing import Any

import pandas as pd
from pandas import DataFrame

from core.data.processing.calculation import Calculating
from core.data.processing.common import Common
from core.data.processing.converting import Converting
from core.data.processing.filling import Filling
from core.data.processing.flagging import Flagging
from core.data.processing.scaling import Scaling
from core.printcol import printcol
from core.utils import U


class Importing:

    @staticmethod
    def Import(cursor: Any, df_values: DataFrame):

        # IMPORT FLOW
        # - Validate datatypes.
        # - Set the import value.
        # - Add additional timeseries info to the values.
        # - Verify there are no duplicate times, no calculated series, etc.
        # - Scale values where possible.
        # - Calculate new values where possible.
        # - Convert values if a conversion factor is present.
        # - Fill in missing values based on the timestep.
        # - Flag the values.
        # - Upsert the result into observations.
        # (The expected input columns are sketched below.)
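        # Expected input columns, as consumed by verify_values and upsert:
        #   sampling_point_id, begin_position, end_position, value,
        #   verification_flag, validation_flag
        # Columns such as has_timeserie_info, ts_is_calculated and ts_timestep
        # are presumably added by Common.add_timeserie_info.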

        Common.validate_dataframe(df_values)
        Importing.set_import_value(df_values)
        df_values = Common.add_timeserie_info(cursor, df_values)
        Importing.verify_values(cursor, df_values)
        Scaling.Scale(cursor, df_values)
        df_values = Importing.process_scaled_values(cursor, df_values)
        Importing.upsert(cursor, df_values)
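
    # Usage sketch (hypothetical names; assumes a psycopg2-style connection
    # whose cursor supports copy_from):
    #
    #   import psycopg2
    #   with psycopg2.connect("dbname=observations") as conn:
    #       with conn.cursor() as cur:
    #           Importing.Import(cur, df_values)
    #   # psycopg2's connection context manager commits on successful exit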

    @staticmethod
    def process_scaled_values(cursor: Any, df_values: DataFrame, doFlagging: bool = True):
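        # Post-scaling pipeline: derive calculated values, apply conversion
        # factors, fill gaps based on the timestep, and optionally flag.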
        df_values = Calculating.calculate(cursor, df_values)  # TODO: rethink this function for better performance
        Converting.convert(cursor, df_values)
        df_values = Filling.fillinmissing(cursor, df_values)  # TODO: use pandas more actively for better performance?

        if doFlagging:
            Flagging.flag(cursor, df_values)
        return df_values

    @staticmethod
    def set_import_value(df_values: DataFrame):
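        # Keep a copy of the raw value in import_value; scaled_value starts
        # empty and is presumably populated by the scaling step.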
        bench = time.perf_counter()
        df_values["import_value"] = df_values["value"]
        df_values["scaled_value"] = None
        printcol(f"- Setting import value took {time.perf_counter() - bench} seconds")

    @staticmethod
    def verify_values(cursor: Any, df_values: DataFrame):
        bench = time.perf_counter()
        # Check for duplicate datetimes
        if df_values[['sampling_point_id', 'begin_position', 'end_position']].duplicated().any():
            raise Exception("A timeseries cannot contain duplicate datetimes")

        # Check that every value has a samplingpoint with timeseries info
        if df_values["has_timeserie_info"].eq(False).any():
            raise Exception("Not enough timeseries info to continue")

        # Check that no values belong to a calculated timeseries
        if df_values["ts_is_calculated"].eq(True).any():
            raise Exception("Calculated values cannot be imported")

        # Check that end_position - begin_position matches the samplingpoint
        # timestep. Skipped when every ts_timestep is -1.
        durations = (df_values.end_position - df_values.begin_position) / pd.Timedelta(seconds=1)
        expected = df_values.apply(lambda x: U.actual_timestep(x.begin_position, x.ts_timestep), axis=1)
        if not (durations == expected).all() and not (df_values.ts_timestep == -1).all():
            raise Exception("The difference between end_position and begin_position must equal the samplingpoint timestep")

        # Check if any are verified values
        # if len(df_values[df_values["verification_flag"] == 1]) > 0:
        #     raise Exception("Imported values cannot be verified")

        # Check that all values share the same UTC offset
        begin_offsets = df_values["begin_position"].apply(lambda t: t.utcoffset().total_seconds())
        end_offsets = df_values["end_position"].apply(lambda t: t.utcoffset().total_seconds())
        if begin_offsets.nunique() > 1 or end_offsets.nunique() > 1:
            raise Exception("All values must have the same timezone")

        printcol(f"- Verifying values took {time.perf_counter() - bench} seconds")

    @staticmethod
    def upsert(cursor: Any, df_values: DataFrame):
        # Use COPY FROM to load the data into a temp table, then update the
        # matching rows in observations; rows that were not updated are
        # inserted. Done this way for performance: Db.executemany is too slow
        # with a lot of data.

        bench = time.perf_counter()
        sql = """
            WITH updates AS (
                UPDATE observations as t 
                SET
                    value = s.value,
                    verification_flag = s.verification_flag,
                    validation_flag = s.validation_flag,
                    touched = now(),
                    import_value = s.import_value,
                    scaled_value = s.scaled_value
                FROM source s
                WHERE t.sampling_point_id = s.sampling_point_id
                AND t.begin_position = s.begin_position
                AND t.end_position = s.end_position
                RETURNING t.sampling_point_id, t.begin_position, t.end_position
            )
            INSERT INTO observations (sampling_point_id, begin_position, end_position, value, verification_flag, validation_flag, import_value,scaled_value, touched)
            SELECT v.*, now() as touched
            FROM source v
            WHERE NOT EXISTS (
                SELECT 1
                FROM updates u
                WHERE u.sampling_point_id = v.sampling_point_id
                AND u.begin_position = v.begin_position
                and u.end_position = v.end_position 
            )
        """

        buf = Importing._data2io(df_values)
        cols = ('sampling_point_id', 'begin_position', 'end_position', 'value', 'verification_flag', 'validation_flag', 'import_value', 'scaled_value')
        cursor.execute(
            'CREATE TEMP TABLE source('
            'sampling_point_id varchar(100), begin_position varchar(25), end_position varchar(25), '
            'value numeric(255,5), verification_flag integer, validation_flag integer, '
            'import_value numeric(255,5), scaled_value numeric(255,5)'
            ') ON COMMIT DROP;'
        )
        cursor.copy_from(buf, 'source', columns=cols, null='None')
        cursor.execute(sql)
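        # ON COMMIT DROP removes the temp table when the transaction ends; the
        # explicit DROP below also allows repeated runs within one transaction.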
        cursor.execute('DROP TABLE source')

        printcol(f"- Importing to db values took {time.perf_counter() - bench} seconds")

    @staticmethod
    def _data2io(df_values: DataFrame):
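        # Serialize rows to a tab-separated buffer for copy_from; the f-string
        # renders None as the literal 'None', which null='None' in upsert maps
        # back to SQL NULL.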
        si = io.StringIO()
        for row in df_values.to_dict("records"):
            si.write(
                f"{row['sampling_point_id']}\t"
                f"{row['begin_position'].isoformat()}\t"
                f"{row['end_position'].isoformat()}\t"
                f"{row['value']}\t"
                f"{row['verification_flag']}\t"
                f"{row['validation_flag']}\t"
                f"{row['import_value']}\t"
                f"{row['scaled_value']}\n"
            )
        si.seek(0)
        return si
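
# Minimal input sketch (hypothetical values; begin/end positions must be
# tz-aware, since verify_values calls .utcoffset() on them):
#
#   df = pd.DataFrame({
#       'sampling_point_id': ['SP-1'],
#       'begin_position': [pd.Timestamp('2024-01-01T00:00:00+00:00')],
#       'end_position': [pd.Timestamp('2024-01-01T01:00:00+00:00')],
#       'value': [1.0],
#       'verification_flag': [2],
#       'validation_flag': [1],
#   })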