Skip to content
Snippets Groups Projects
Commit 86d11e80 authored by Christoffer Stoll
Browse files

bug: fill in missing failed on large imports

parent 90eb8498
No related branches found
No related tags found
No related merge requests found
......@@ -61,8 +61,8 @@ def ReScale(use_scalingpoint, oc_id, zero, span, gas, timestamp, old_timestamp=N
def __minmax__(oc_id, timestamp, old_timestamp):
sql = """
select
min(extract(epoch from f.timestamp)*1000) as min,
select
min(extract(epoch from f.timestamp)*1000) as min,
max(extract(epoch from t.timestamp)*1000) as max
from
(
......@@ -74,7 +74,7 @@ def __minmax__(oc_id, timestamp, old_timestamp):
order by s.timestamp desc
limit 1
) f
FULL OUTER JOIN
FULL OUTER JOIN
(
select s.id, s.timestamp
from scaling_points s
......@@ -84,7 +84,7 @@ def __minmax__(oc_id, timestamp, old_timestamp):
order by s.timestamp asc
limit 1
) t ON t.id = f.next
--order by t.timestamp, f.timestamp
--order by t.timestamp, f.timestamp
"""
minmax = Db.fetchone(sql, {'oc_id': oc_id, 'timestamp': timestamp, 'old_timestamp': old_timestamp})
if minmax is None:
......@@ -94,7 +94,7 @@ def __minmax__(oc_id, timestamp, old_timestamp):
def __get_imported_observations(oc_id, min=None, max=None):
sql = """
select o.id, o.sampling_point_id, o.begin_position, o.end_position, o.verification_flag, o.validation_flag, o.import_value::DOUBLE PRECISION, o.import_value::DOUBLE PRECISION as value, o.from_time, o.to_time, extract(epoch from o.to_time)*1000 as to_epoch , extract(epoch from o.from_time)*1000 as from_epoch
select o.id, o.sampling_point_id, o.begin_position, o.end_position, o.verification_flag, o.validation_flag, o.import_value::DOUBLE PRECISION, o.import_value::DOUBLE PRECISION as value, o.from_time, o.to_time, extract(epoch from o.to_time)*1000 as to_epoch , extract(epoch from o.from_time)*1000 as from_epoch
from observations o, observing_capabilities oc
where o.sampling_point_id = oc.sampling_point_id
and oc.id = %(oc_id)s
......@@ -139,33 +139,33 @@ def __scalingpoints__(sampling_point_id, scalingpoint=None):
(select -1 as id, %(oc_id)s as oc_id, %(zero_point)s as zero_point, %(span_value)s as span_value, %(gas_concentration)s as gas_concentration, %(timestamp)s as timestamp, 'generated' as createdby)
union
select * from scaling_points where timestamp <> %(old_timestamp)s
)
)
"""
elif scalingpoint["use_scalingpoint"] == False:
with_sql = """
with scaling_points as (
with scaling_points as (
select * from scaling_points where timestamp <> %(timestamp)s
)
)
"""
model = scalingpoint
model["sampling_point_id"] = sampling_point_id
sql = """
select
f.oc_id as f_oc_id,
f.zero_point as f_zero_point,
f.span_value as f_span_value,
select
f.oc_id as f_oc_id,
f.zero_point as f_zero_point,
f.span_value as f_span_value,
f.gas_concentration as f_gas_concentration,
f.timestamp as f_timestamp,
extract(epoch from f.timestamp)*1000 as f_timestamp_number,
t.oc_id as t_oc_id,
t.zero_point as t_zero_point,
t.span_value as t_span_value,
t.oc_id as t_oc_id,
t.zero_point as t_zero_point,
t.span_value as t_span_value,
t.gas_concentration as t_gas_concentration,
t.timestamp as t_timestamp,
extract(epoch from t.timestamp)*1000 as t_timestamp_number
from
(
select a.*, lead(a.id) over (order by a.timestamp asc) as next
......@@ -173,7 +173,7 @@ def __scalingpoints__(sampling_point_id, scalingpoint=None):
where b.sampling_point_id = %(sampling_point_id)s
and a.oc_id = b.id
) f
FULL OUTER JOIN
FULL OUTER JOIN
(
select a.*
from scaling_points a, observing_capabilities b
......@@ -233,7 +233,7 @@ def Calculate(values):
def __calculated_timeseries__():
sql = """
select oc_pri.sampling_point_id spo_pri, oc_sec.sampling_point_id spo_sec, oc_res.sampling_point_id spo_res, cs.*
from calculated_series cs, observing_capabilities oc_pri, observing_capabilities oc_sec, observing_capabilities oc_res
from calculated_series cs, observing_capabilities oc_pri, observing_capabilities oc_sec, observing_capabilities oc_res
where oc_pri.id = cs.primary
and oc_sec.id = cs.secondary
and oc_res.id = cs.result
......@@ -293,12 +293,12 @@ def __converted_timeseries__():
def FillInMissing(values):
tz = __validate_and_get_tz__(values)
timeseries = __timeseries__(values)
observations = Enumerable(values)
missing_values = Enumerable([])
missing_values = []
for t in timeseries:
dates = observations.where(lambda x: x["sampling_point_id"] == t["sampling_point_id"]).select(lambda x: U.to_datetime_ignore_tz(x["begin_position"]))
from_time = dates.min() if t["to_time"] == None else dates.min() if t["to_time"] > dates.min() else t["to_time"]
to_time = dates.max() if t["to_time"] < dates.max() else t["to_time"]
dates = list(map(lambda x: U.to_datetime_ignore_tz(x["begin_position"]), filter(lambda x: x["sampling_point_id"] == t["sampling_point_id"], values)))
from_time = min(dates) if t["to_time"] == None else min(dates) if t["to_time"] > min(dates) else t["to_time"]
to_time = max(dates) if t["to_time"] == None else max(dates) if t["to_time"] < max(dates) else t["to_time"]
existing_dates = __existing_dates__(t["sampling_point_id"], from_time, to_time)
current_date = from_time
......@@ -309,7 +309,7 @@ def FillInMissing(values):
stop = True
break
if not existing_dates.contains(current_date, lambda x: x) and not Enumerable(values).select(lambda x: U.to_datetime_ignore_tz(x["begin_position"])).contains(current_date, lambda x: x):
if not any(current_date == U.to_datetime_ignore_tz(d["begin_position"]) for d in existing_dates) and not any(current_date == d for d in dates):
v = {
"sampling_point_id": t["sampling_point_id"],
"begin_position": U.add_seconds_to_date(current_date, 0) + tz,
......@@ -321,11 +321,10 @@ def FillInMissing(values):
}
missing_values.append(v)
missing_values = missing_values.to_list()
if len(missing_values) > 0:
for v in missing_values:
observations.add(v)
return observations.to_list()
values.add(v)
return values
def __validate_and_get_tz__(values):
......@@ -345,8 +344,7 @@ def __timeseries__(values):
where c.timestep = t.id
and c.sampling_point_id in %(sp)s
"""
autovalidated_series = Db.fetchall(sql, {"sp": sp})
return Enumerable(autovalidated_series)
return Db.fetchall(sql, {"sp": sp})
# Gets timeseries
......@@ -358,7 +356,7 @@ def __existing_dates__(sampling_point_id, dt_from, dt_to):
and o.from_time >= %(dt_from)s
and o.from_time < %(dt_to)s
"""
return Enumerable(Db.fetchall(sql, {"sp": sampling_point_id, "dt_from": dt_from, "dt_to": dt_to}))
return Db.fetchall(sql, {"sp": sampling_point_id, "dt_from": dt_from, "dt_to": dt_to})
############
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment