Commit f175cbe6 authored by cst

added aggregation

parent f843916a
@@ -3,13 +3,14 @@ import csv
 from flask import Blueprint, request, abort, make_response
 from flask_login import login_required
 from web.helpers.responses import Responses
-from .timeseries_handler import TimeseriesHandler
-from .observation_handler import ObservationHandler
+from web.api.viewer.historical.timeseries_handler import TimeseriesHandler
+from web.api.viewer.historical.observation_handler import ObservationHandler
 from web.helpers.model_binder import ModelBinder as Binder
 from werkzeug.exceptions import InternalServerError

 historical = Blueprint("historical", __name__)

 @historical.route("/api/viewer/historical/timeseries", methods=['GET'])
 @login_required
 def timeseries():
@@ -19,37 +20,40 @@ def timeseries():
     except Exception as e:
         raise InternalServerError(description=str(e))
@historical.route("/api/viewer/historical/observations", methods=['POST'])
@login_required
def observations():
p = Binder.bind_and_validate(ObservationHandler.get_validation_rules())
try:
observation = ObservationHandler.handle(p)
return Responses.json(observation)
observations = ObservationHandler.handle(p)
return Responses.json(observations)
except Exception as e:
raise InternalServerError(description=str(e))
@historical.route("/api/viewer/historical/csv/timeseries", methods=['POST'])
@login_required
def timeseriesCsv():
try:
p = Binder.bind_and_validate(ObservationHandler.get_validation_rules())
observations = ObservationHandler.handleCsv(p)
si = StringIO()
si = StringIO()
cw = csv.writer(si)
cw.writerows(observations)
output = make_response(si.getvalue())
output.headers["Content-Disposition"] = "attachment; filename=export.csv"
output.headers["Content-type"] = "text/csv"
return output
return output
except Exception as e:
raise InternalServerError(description=str(e))
@historical.route("/api/viewer/historical/csv/pivot/timeseries", methods=['POST'])
@login_required
def timeseriesCsvPivot():
......@@ -58,14 +62,14 @@ def timeseriesCsvPivot():
observations = ObservationHandler.handleCsvPivot(p)
si = StringIO()
si = StringIO()
cw = csv.writer(si)
cw.writerows(observations)
output = make_response(si.getvalue())
output.headers["Content-Disposition"] = "attachment; filename=export.csv"
output.headers["Content-type"] = "text/csv"
return output
except Exception as e:
raise InternalServerError(description=str(e))
\ No newline at end of file
raise InternalServerError(description=str(e))
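
For orientation (not part of the commit): a minimal Python sketch of calling the reworked observations endpoint. The host, session cookie, and oc_id values are hypothetical, and it assumes Responses.json returns the handler's list unwrapped; per get_observations below, meantype = -1 requests the raw, unaggregated values.

import requests

# Hypothetical host, cookie, and ids; the payload fields mirror
# get_validation_rules(). oc_id may also be sent as a list --
# handle() normalizes a comma-separated string into one.
payload = {
    "oc_id": "42,43",
    "from": "2020-01-01T00:00",
    "to": "2020-02-01T00:00",
    "onlyValidValues": True,
    "meantype": -1,  # -1 = raw observations; other values select a MeanType
    "coverage": 75,
}
resp = requests.post("https://example.org/api/viewer/historical/observations",
                     json=payload, cookies={"session": "..."})
resp.raise_for_status()
for series in resp.json():
    print(series["station"], series["component"], len(series["values"]))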
@@ -4,6 +4,11 @@ from datetime import *
 from dateutil.relativedelta import *
 from dateutil.rrule import *
 from collections import OrderedDict
+from web.helpers.mean import Mean
+from web.helpers.mean import MeanType
+from web.helpers.mean import MeanValue
+from web.helpers.mapper import Mapper

 class ObservationHandler:

     @staticmethod
@@ -13,134 +18,98 @@ class ObservationHandler:
             {"name": "from", "required": True, "type": str},
             {"name": "to", "required": True, "type": str},
             {"name": "onlyValidValues", "required": True, "type": bool},
-            {"name": "viewAsBar", "required": True, "type": bool}
+            {"name": "meantype", "required": True, "type": int},
+            {"name": "coverage", "required": True, "type": int}
         ]
         return rules
     @staticmethod
     def handle(p):
-        sql = """
-            SELECT
-            CASE WHEN %(viewAsBar)s THEN 'bar' ELSE 'line' END as type,
-            s.name || ' - ' || po.notation as name,
-            array_agg(array[(extract(epoch from (aa.to_time))*1000)::double PRECISION, aa.value::double PRECISION]
-                order by aa.to_time asc) as data
-            FROM stations s, sampling_points sp, observing_capabilities oc, eea_pollutants po,
-            (
-                SELECT o.from_time, o.to_time,
-                    CASE WHEN (o.validation_flag < 1 OR o.value = -9900) AND %(onlyValidValues)s THEN
-                        NULL ELSE o.value END as value, oc.id
-                FROM observations o, observing_capabilities oc
-                WHERE 1=1
-                AND oc.sampling_point_id = o.sampling_point_id
-                AND oc.id in %(oc_id_tup)s
-                AND o.from_time >= %(from)s
-                AND o.to_time < %(to)s
-            ) aa
-            WHERE 1=1
-            AND aa.id = oc.id
-            AND oc.sampling_point_id = sp.id
-            AND sp.station_id = s.id
-            AND oc.pollutant = po.uri
-            GROUP BY s.name, sp.id, oc.pollutant, oc.id, po.notation
-        """
         # Make sure it's an array, not just a string
         if not isinstance(p["oc_id"], list):
             p["oc_id"] = p["oc_id"].split(',')
         p["oc_id_tup"] = tuple(p["oc_id"])
-        observation = Db.fetchall(sql, p)
-        return observation
+        observations = ObservationHandler.get_observations(p)
+        values = ObservationHandler.group_observations(p["oc_id"], observations)
+        return sorted(values, key=lambda i: i['station'])
     @staticmethod
-    def handleCsv(p):
+    def group_observations(ids, observations):
         values = []
         for oc in ids:
             v = list(filter(lambda x: x.OCId == oc, observations))
             if not v:
                 # guard: skip ids with no rows in range (v[0] would raise IndexError)
                 continue
             t = v[0]
             data = [{"datetime": d.DateTime, "val": d.Val} for d in v]
             o = {"station": t.Station, "component": t.Component, "unit": t.Unit, "timestep": t.Timestep, "values": data}
             values.append(o)
         return values
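
A quick illustration (hypothetical data, stand-in type, not part of the commit) of the contract group_observations expects: objects exposing OCId, Station, Component, Unit, Timestep, DateTime, and Val — the MeanValue shape produced by get_originals below.

from collections import namedtuple

from web.api.viewer.historical.observation_handler import ObservationHandler

# Stand-in for web.helpers.mean.MeanValue; the field names match the
# attributes group_observations reads.
Obs = namedtuple("Obs", "OCId Station Component Unit Timestep DateTime Val")

rows = [
    Obs("42", "Main St", "NO2", "ug/m3", "hour", "2020-01-01T01:00", 12.3),
    Obs("42", "Main St", "NO2", "ug/m3", "hour", "2020-01-01T02:00", 14.0),
]
series = ObservationHandler.group_observations(["42"], rows)
# -> [{"station": "Main St", "component": "NO2", "unit": "ug/m3", "timestep": "hour",
#      "values": [{"datetime": "2020-01-01T01:00", "val": 12.3}, ...]}]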
     @staticmethod
     def get_observations(p):
         if p["meantype"] == -1:
             observations = ObservationHandler.get_originals(p)
         else:
             # Mean.Aggregate comes from web.helpers.mean (not shown in this commit)
             observations = Mean.Aggregate(MeanType(p["meantype"]), p["oc_id_tup"], p["from"], p["to"], p["coverage"], 3, 3, True)
         return sorted(observations, key=lambda i: i.DateTime)
     @staticmethod
     def get_originals(p):
         sql = """
-            SELECT
-                s.name as station,
-                po.notation as component,
-                aa.fromtime,
-                aa.totime,
-                aa.value
-            FROM stations s, sampling_points sp, observing_capabilities oc, eea_pollutants po,
-            (
-                SELECT o.from_time as fromtime, o.to_time as totime,
-                    CASE WHEN (o.validation_flag < 1 OR o.value = -9900) AND %(onlyValidValues)s THEN
-                        NULL ELSE o.value END as value, oc.id
-                FROM observations o, observing_capabilities oc
-                WHERE 1=1
-                AND oc.sampling_point_id = o.sampling_point_id
-                AND oc.id in %(oc_id_tup)s
-                AND o.from_time >= %(from)s
-                AND o.to_time < %(to)s
-            ) aa
-            WHERE 1=1
-            AND aa.id = oc.id
-            AND oc.sampling_point_id = sp.id
-            AND sp.station_id = s.id
-            AND oc.pollutant = po.uri
+            SELECT
+                sta.name "Station",
+                po.notation "Component",
+                ti.timestep "Timestep",
+                con.notation "Unit",
+                oc.sampling_point_id "SamplingPointId",
+                oc.id "OCId",
+                100 "Coverage",
+                1 "Cnt",
+                o.to_time "DateTime",
+                CASE
+                    WHEN (o.validation_flag < 1 OR o.value = -9900) AND %(onlyValidValues)s THEN NULL
+                    ELSE ROUND(o.value, 3)::double PRECISION
+                END "Val"
+            FROM observations o, stations sta, sampling_points spo, observing_capabilities oc, eea_pollutants po, eea_times ti, eea_concentrations con
+            WHERE 1=1
+            AND sta.id = spo.station_id
+            AND spo.id = oc.sampling_point_id
+            AND oc.pollutant = po.uri
+            AND oc.timestep = ti.id
+            AND oc.concentration = con.id
+            AND oc.sampling_point_id = o.sampling_point_id
+            AND oc.id in %(oc_id_tup)s
+            AND o.from_time >= %(from)s
+            AND o.from_time < %(to)s
         """
         rows = Db.fetchall(sql, p)
         return Mapper.map_list_of_dict(rows, MeanValue)
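
Mapper and MeanValue come from web.helpers and are not shown in this commit. Judging only from how they are used here, a minimal hypothetical stand-in could look like the sketch below: map_list_of_dict turns each row dict into an object whose attributes carry the SQL column aliases ("Station", "DateTime", "Val", ...).

from types import SimpleNamespace

# Hypothetical equivalents of web.helpers.mapper.Mapper and
# web.helpers.mean.MeanValue, consistent with their use above.
class MeanValue(SimpleNamespace):
    """Attribute bag: MeanValue(Station="Main St", Val=12.3).Station etc."""

class Mapper:
    @staticmethod
    def map_list_of_dict(rows, cls):
        # one object per row; dict keys become attribute names
        return [cls(**row) for row in rows]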
     @staticmethod
     def handleCsv(p):
         # Make sure it's an array, not just a string
         if not isinstance(p["oc_id"], list):
             p["oc_id"] = p["oc_id"].split(',')
         p["oc_id_tup"] = tuple(p["oc_id"])
-        observations = Db.fetchall(sql, p)
+        observations = ObservationHandler.get_observations(p)
         result = []
-        keys = []
-        for key in observations[0].keys():
-            keys.append(key)
+        keys = ["SamplingPointId", "Station", "Component", "Unit", "Timestep", "DateTime", "Value", "Coverage", "Count"]
         result.append(keys)
-        for items in observations:
-            values = []
-            for val in items.values():
-                values.append(val)
+        for o in observations:
+            values = [o.SamplingPointId, o.Station, o.Component, o.Unit, o.Timestep, o.DateTime, o.Val, o.Coverage, o.Cnt]
             result.append(values)
         return result
-    @staticmethod
-    def daterange(start_date, end_date, timesteps):
-        timestep_rank = 100
-        result = None
-        fromdate = datetime.strptime(start_date, '%Y-%m-%dT%H:%M')  # 2020-01-01T00:00
-        todate = datetime.strptime(end_date, '%Y-%m-%dT%H:%M')  # 2020-01-01T00:00
-        for timestep_input in timesteps:
-            if timestep_input == "http://dd.eionet.europa.eu/vocabulary/uom/time/hour" or timestep_input == "http://dd.eionet.europa.eu/vocabulary/aq/primaryObservation/hour":
-                if timestep_rank > 1:
-                    result = list(rrule(freq=HOURLY, dtstart=fromdate, until=todate))
-            if timestep_input == "http://dd.eionet.europa.eu/vocabulary/uom/time/day" or timestep_input == "http://dd.eionet.europa.eu/vocabulary/aq/primaryObservation/day":
-                if timestep_rank > 2:
-                    result = list(rrule(freq=DAILY, dtstart=fromdate, until=todate))
-            if timestep_input == "http://dd.eionet.europa.eu/vocabulary/uom/time/week":
-                if timestep_rank > 3:
-                    result = list(rrule(freq=WEEKLY, dtstart=fromdate, until=todate))
-            if timestep_input == "http://dd.eionet.europa.eu/vocabulary/uom/time/month":
-                if timestep_rank > 4:
-                    result = list(rrule(freq=MONTHLY, dtstart=fromdate, until=todate))
-            if timestep_input == "http://dd.eionet.europa.eu/vocabulary/uom/time/year":
-                if timestep_rank > 5:
-                    result = list(rrule(freq=YEARLY, dtstart=fromdate, until=todate))
-        return result
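
With aggregation handled upstream, handleCsvPivot (below) derives its date axis from the observations' own DateTime values, so the daterange helper and its timestep-vocabulary ranking have no remaining callers and are dropped.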
     @staticmethod
     def handleCsvPivot(p):
         # Make sure it's an array, not just a string
@@ -148,71 +117,19 @@ class ObservationHandler:
             p["oc_id"] = p["oc_id"].split(',')
         p["oc_id_tup"] = tuple(p["oc_id"])
-        series = []
-        for id in p["oc_id"]:
-            p["id"] = id
-            sql = """
-                SELECT
-                    aa.fromtime,
-                    aa.totime,
-                    s.name as station,
-                    po.notation as component,
-                    aa.value,
-                    aa.timestep
-                FROM stations s, sampling_points sp, observing_capabilities oc, eea_pollutants po,
-                (
-                    SELECT o.from_time as fromtime, o.to_time as totime, oc.timestep,
-                        CASE WHEN (o.validation_flag < 1 OR o.value = -9900) AND %(onlyValidValues)s THEN
-                            NULL ELSE o.value END as value, oc.id
-                    FROM observations o, observing_capabilities oc
-                    WHERE 1=1
-                    AND oc.sampling_point_id = o.sampling_point_id
-                    AND oc.id = %(id)s
-                    AND o.from_time >= %(from)s
-                    AND o.to_time < %(to)s
-                ) aa
-                WHERE 1=1
-                AND aa.id = oc.id
-                AND oc.sampling_point_id = sp.id
-                AND sp.station_id = s.id
-                AND oc.pollutant = po.uri
-            """
-            series.append(Db.fetchall(sql, p))
-        timesteps = []
-        # find the smallest timestep and fill dates
-        for serie in series:
-            if serie:
-                for key, val in serie[0].items():
-                    if key == "timestep":
-                        timesteps.append(val)
-        result = []
-        for date in ObservationHandler.daterange(p["from"], p["to"], timesteps):
-            row = OrderedDict()
-            row["from"] = date
-            result.append(row)
-        station = ""
-        component = ""
+        observations = ObservationHandler.get_observations(p)
+        series = ObservationHandler.group_observations(p["oc_id"], observations)
+        dates = list(set([o.DateTime for o in observations]))
+        dates.sort()
+        result = [{"datetime": o} for o in dates]
         for serie in series:
-            if serie:
-                for key, val in serie[0].items():
-                    if key == "component":
-                        component = val
-                    elif key == "station":
-                        station = val
-                for observation in serie:
-                    o_fromtime = observation.get("fromtime")
-                    for row in result:
-                        fromdate = row.get("from")
-                        if fromdate == o_fromtime:
-                            row[station + " " + component] = str(observation.get("value"))
-                            break
+            for observation in serie["values"]:
+                for row in result:
+                    if row["datetime"] == observation["datetime"]:
+                        row[serie["station"] + " " + serie["component"]] = None if observation["val"] is None else str(observation["val"])
+                        break
         csvresult = []
@@ -221,16 +138,13 @@ class ObservationHandler:
         firstrow = result[0]
         for key in firstrow.keys():
             csvkeys.append(key)
         csvresult.append(csvkeys)
         for items in result:
             values = []
             for val in items.values():
                 values.append(val)
             csvresult.append(values)
         return csvresult
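
The pivot result is a plain list of rows ready for csv.writer: a header row with "datetime" plus one "<station> <component>" column per series, then one row per distinct DateTime. A runnable illustration with hypothetical data:

import csv
from io import StringIO

# Rows in the shape handleCsvPivot returns (hypothetical values);
# csv.writer renders None as an empty cell.
rows = [
    ["datetime", "Main St NO2", "Harbor PM10"],
    ["2020-01-01T01:00", "12.3", None],
    ["2020-01-01T02:00", "14.0", "21.5"],
]
si = StringIO()
csv.writer(si).writerows(rows)
print(si.getvalue())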
@@ -11,7 +11,7 @@
     "@fortawesome/free-solid-svg-icons": "^5.9.0",
     "@fortawesome/vue-fontawesome": "^0.1.6",
     "@handsontable/vue": "^4.1.1",
-    "apexcharts": "^3.8.5",
+    "apexcharts": "^3.23.1",
     "axios": "^0.19.0",
     "core-js": "^2.6.5",
     "d3": "^5.9.7",
@@ -22,7 +22,7 @@
     "v-click-outside": "^2.1.3",
     "v-tooltip": "^2.0.2",
     "vue": "^2.6.10",
-    "vue-apexcharts": "^1.4.0",
+    "vue-apexcharts": "^1.6.0",
     "vue-ctk-date-time-picker": "^2.1.1",
     "vue-property-decorator": "^8.2.1",
     "vue-router": "^3.0.6",
@@ -35,7 +35,7 @@
     "@ky-is/vue-cli-plugin-tailwind": "^2.0.0",
     "@vue/cli-plugin-babel": "^3.8.0",
     "@vue/cli-service": "^3.8.0",
-    "node-sass": "^4.9.0",
+    "node-sass": "^5.0.0",
     "postcss-preset-env": "^6.6.0",
     "sass-loader": "^7.1.0",
     "tailwindcss": "^1.0.1",
@@ -2,62 +2,62 @@ import axios from "axios";
 import ErrorParser from '../error.parser';

 const HistoricalService = {
     timeseries: async function () {
         const requestData = {
             method: "get",
             url: "/api/viewer/historical/timeseries",
         };
         try {
             const response = await axios(requestData);
             return response.data;
         } catch (error) {
             const message = ErrorParser.asMessage(error);
             throw new Error(message);
         }
     },
-    observations: async function (oc_id, from, to, onlyValidValues, viewAsBar) {
+    observations: async function (oc_id, from, to, onlyValidValues, viewAsBar, meantype, coverage) {
         const requestData = {
             method: "post",
             url: "/api/viewer/historical/observations",
-            data: { oc_id, from, to, onlyValidValues, viewAsBar }
+            data: { oc_id, from, to, onlyValidValues, viewAsBar, meantype, coverage }
         };
         console.log("REQUEST", requestData);
         try {
             const response = await axios(requestData);
             return response.data;
         } catch (error) {
             const message = ErrorParser.asMessage(error);
             throw new Error(message);
         }
     },
     timeSeriesCsv: async function (selection, pivot) {
         try {
             var url = "/api/viewer/historical/csv/timeseries"
             if (pivot) url = "/api/viewer/historical/csv/pivot/timeseries"
             await axios({
                 method: "post",
                 url: url,
                 responseType: 'blob',
                 data: selection
             }).then((response) => {
                 var fileURL = window.URL.createObjectURL(new Blob([response.data]));
                 var fileLink = document.createElement('a');
                 fileLink.href = fileURL;
                 fileLink.setAttribute('download', 'export.csv');
                 document.body.appendChild(fileLink);
                 fileLink.click();
             });
         } catch (error) {
             console.log(error);
             const message = error.response.data.message ? error.response.data.message : error.response.data.errors[0].message;
             throw new TimeseriesError(error.response.status, message);
         }
     }
 };
export default HistoricalService;
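
The same CSV export can be exercised from Python rather than through the browser helper above; a minimal sketch, again with hypothetical host, cookie, and ids, and a payload mirroring get_validation_rules():

import requests

selection = {
    "oc_id": "42,43",
    "from": "2020-01-01T00:00",
    "to": "2020-02-01T00:00",
    "onlyValidValues": True,
    "meantype": -1,
    "coverage": 75,
}
resp = requests.post("https://example.org/api/viewer/historical/csv/pivot/timeseries",
                     json=selection, cookies={"session": "..."})
resp.raise_for_status()
with open("export.csv", "wb") as f:
    f.write(resp.content)  # the blueprint streams the file as an attachment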
const Apexchart = {
    settings(id = "apexchart", showToolbar = true) {
        return {
            chart: {
                id: id,
                type: "line",
                toolbar: {
                    show: showToolbar,
                    tools: {
                        download: true,
                        selection: true,
                        zoom: true,
                        zoomin: true,
                        zoomout: true,
                        pan: true,
                    },
                    autoSelected: 'zoom'
                },
                animations: {
                    enabled: false,
                    animateGradually: {
                        enabled: false
                    },
                    dynamicAnimation: {
                        enabled: false
                    }
                }
            },
            theme: {
                palette: "palette3"
            },
            xaxis: {
                type: "category",
                labels: {
                    show: true,
                    rotate: -70,