Newer
Older
# -*- coding: utf-8 -*-
"""
Description :
-------------
PTRMS converter based on CAMS project
Usage :
-------
Usage: python3 PtrmsConverter.py [options] <conf-file> <infiles>
Author :
--------
CDS AERIS-ICARE Aurélien Chauvigné
License :
---------
This file must be used under the terms of the CeCILL.
This source file is licensed as described in the file COPYING, which
you should have received as part of this distribution. The terms
are also available at
http://www.cecill.info/licences/Licence_CeCILL_V2-en.txt
History :
--------
v0.0.0 : 2023/09/12
- creation
"""
import calendar
from optparse import OptionParser
import numpy
pkgdir = os.path.dirname(__file__)
d = os.path.abspath(os.path.join(pkgdir, ".."))
if d not in sys.path:
sys.path.append(os.path.join(d))
from converter import Converter
from product.AMESProduct import AMESProduct
from iointerface.AMESReader import AMESReader
from misc import Standardizer, TimeConverter
from uploader.DataSeeker import DataSeeker
import json
import re
from algo.ptrms_lib import ptrms_lib
# version string of this converter
__VERSION__ = "0.0.0"
# application name : base name of this file without its extension
__APP__ = os.path.splitext(os.path.basename(__file__))[0]
# when True, progress banners, error messages and write notices are printed
__DEBUG__ = True
# when True, an exception caught while processing a level is re-raised
__THROW__ = True
# NOTE(review): not referenced in the visible part of this file
__SHOW_BAD_INPUTS__ = False
# when True, the written output file is opened with the `kate` editor
__SHOW_OUTPUTS__ = False
# NOTE(review): not referenced in the visible part of this file
__ENABLE_BASELINE_CORRECTION_L0__ = True
class PtrmsConverter(Converter.Converter):
"""
PTRMS data converter
"""
data_seeker = DataSeeker()
# acquisition conditions
acq_temp = Standardizer.T_STD # temperature in K
acq_pres = Standardizer.P_STD # pressure in hPa
# fill values
var_fills_conc = 999999.999
var_fills_env = 9999.999
var_fills_qa = 0.999
var_fills_conc_native = -99 # fill value in native files
def __init__(self, prod_id):
    """
    @brief constructor
    @param prod_id product ID used to initialise the base converter ; when
                   None, every key of the data seeker configuration is used,
                   one base-class initialisation per key
    """
    # - process all enabled product IDs set in data seeker
    selected_ids = (
        list(self.data_seeker.cfg.keys()) if prod_id is None else [prod_id]
    )
    for current_id in selected_ids:
        Converter.Converter.__init__(self, current_id, AMESProduct)
def process(self, infiles, conf_file, cache_dir, outdir, levels, hour=None):
    """
    @brief convert a list of files, one pass per requested level
    @param infiles the files to convert
    @param conf_file configuration file, forwarded to process_lvl
    @param cache_dir cache directory path, forwarded to process_lvl
    @param outdir output directory (made absolute before use)
    @param levels processed levels
    @param hour forwarded unchanged to process_lvl
    """
    outdir = os.path.abspath(outdir)
    stars = "*" * 20
    for lvl in levels:
        start_time = time.time()
        if __DEBUG__:
            print(stars + " LEVEL %d " % lvl + stars)
        try:
            self.process_lvl(infiles, conf_file, cache_dir, outdir, lvl, hour)
        except Exception as e:
            # the failure is reported and/or propagated depending on the
            # module-level switches ; with both off it is silently dropped
            if __DEBUG__:
                print(" > " + str(e) + " : " + os.linesep + "ABORT")
            if __THROW__:
                raise
        if __DEBUG__:
            # elapsed wall-clock time of this level, rendered as HH:MM:SS
            elapsed = time.strftime(
                "%H:%M:%S ", time.gmtime(time.time() - start_time)
            )
            print(stars + " LEVEL %d" % lvl + " : " + elapsed + stars)
def process_lvl(self, infiles, conf_file, cache_dir, outdir, lvl, hour):
"""
@brief convert a list of files
@param infiles the files to convert
@param cache_dir cache directory path
@param outdir output directory
@param year year of the eventual yearly synthesis
"""
self.reader = AMESReader(infiles[0])
year = int(self.reader.attributes["Startdate"][0:4])
ds_names = self.reader.get_ds_names()
indata = Converter.Converter.load_indata(
self, infiles, ds_names, reader_interface=AMESReader
)
self.var_ids_mz = []
self.var_ids_env_tag = []
self.var_ids_env = []
indata[Product.START_ACQ_ID] = indata[var_id]
indata[Product.END_ACQ_ID] = indata[var_id]
elif "status" in var_id:
indata["status"] = indata[var_id]
elif "temperature" in var_id and "Location=inlet" in var_id:
self.var_ids_env.append(var_id)
self.var_ids_env_tag.append("T_inlet")
elif "pressure" in var_id and "Location=inlet" in var_id:
self.var_ids_env.append(var_id)
self.var_ids_env_tag.append("P_inlet")
elif "count_rate" in var_id and "numflag" not in var_id:
m = re.search(r"_[0-9]+_", var_id)
mass_num = m.group(0).replace("_", "")
indata["mz_" + mass_num] = indata[var_id]
self.var_ids_mz.append("mz_" + mass_num)
if "methanal" in var_id:
mass_num = "31"
elif "methanol" in var_id:
mass_num = "33"
elif "acetonitrile" in var_id:
mass_num = "42"
elif "ethanal" in var_id:
mass_num = "45"
elif "dimethylsulfide" in var_id:
mass_num = "63"
elif "methyl_acetate" in var_id:
mass_num = "75"
elif "benzene" in var_id:
mass_num = "79"
elif "monoterpenes" in var_id:
mass_num = "137"
indata["mz_" + mass_num] = indata[var_id]
self.var_ids_mz.append("mz_" + mass_num)
prog_ptrms_lib = ptrms_lib(cache_dir, outdir, conf_file)
self.lev1_mz = [
s
for s in prog_ptrms_lib.conf["cols_species"]
if prog_ptrms_lib.conf["species"][s]["actris_submit"]
]
self.var_ids_flag = [
s + '_numflag'
for s in prog_ptrms_lib.conf["cols_species"]
if prog_ptrms_lib.conf["species"][s]["actris_submit"]
]
self.lev1_mz_name = [
prog_ptrms_lib.conf["species"][s]["actris_name"]
for s in prog_ptrms_lib.conf["cols_species"]
if prog_ptrms_lib.conf["species"][s]["actris_submit"]
]
ppb_data, lod, rsd, accuracy, uncertainty, raw_data_filtered = (
prog_ptrms_lib.process(indata, year, levels=[0])
indata = ppb_data
indata[Product.START_ACQ_ID] = raw_data_filtered[Product.START_ACQ_ID]
indata[Product.END_ACQ_ID] = raw_data_filtered[Product.END_ACQ_ID]
for var_id in self.var_ids_env:
indata[var_id] = raw_data_filtered[var_id]
if lvl == 2:
# - set output time axis of level 2 products
dt = 3600 # hour by hour
d_start = datetime(year,
TimeConverter.doy2dt(year, min(raw_data_filtered[Product.START_ACQ_ID])).month,
TimeConverter.doy2dt(year, min(raw_data_filtered[Product.START_ACQ_ID])).day,
TimeConverter.doy2dt(year, min(raw_data_filtered[Product.START_ACQ_ID])).hour,
0,
0)
d_end = datetime(year,
TimeConverter.doy2dt(year, max(raw_data_filtered[Product.START_ACQ_ID])).month,
TimeConverter.doy2dt(year, max(raw_data_filtered[Product.START_ACQ_ID])).day,
TimeConverter.doy2dt(year, max(raw_data_filtered[Product.START_ACQ_ID])).hour,
0,
0) + timedelta(hours=1)
ts_start = calendar.timegm(d_start.timetuple())
# - end time of averaging range
v_end_time = v_start_time + dt
for var_id in self.var_ids_env + self.lev1_mz + self.var_ids_flag:
shape, dtype=numpy.float64) + self.get_fill(var_id)
else:
raise ValueError(
"ERROR: only level 0, 1 or 2 are requested for ACTRIS")
v_start_acq_in = indata[Product.START_ACQ_ID]
v_start_acq_out = outdata[Product.START_ACQ_ID]
t_start = min(v_start_acq_in)
# get header
fname, sout, _ = self.get_header(t_start, lvl, v_start_acq_out, year)[:3]
i_sort = numpy.argsort(indata[Product.START_ACQ_ID])
indata["start_acq"] = [indata["start_acq"][i] for i in i_sort]
indata["end_acq"] = [indata["end_acq"][i] for i in i_sort]
for var_id in self.var_ids_env + self.lev1_mz:
indata[var_id] = [indata[var_id][i] for i in i_sort]
indata_fill, indata_nan, invalid_acq_val = self.get_invalid(indata)
# --- processing : averaging for L2, setting QA flags and filter invalid values for all levels
# some aliases for an easier reading
v_start_acq_out = outdata["start_acq"]
if lvl == 2:
# - averaging
for i in range(sz):
# select records in out time range
ts_min = TimeConverter.ts2dt(v_start_acq_out[i])
ts_max = TimeConverter.ts2dt(v_end_acq_out[i])
s = ((v_start_acq_in.index >= ts_min) & (v_start_acq_in.index <= ts_max))
if numpy.any(s):
# -> computes means and percentiles for concentration
self.set_stats_mean(
indata[var_id], indata_fill[var_id], s, i, outdata[var_id], self.get_fill(var_id))
# -> set flags
for var_id, var_id_flag in zip(self.lev1_mz,self.var_ids_flag):
data = outdata[var_id][i]
fill = self.get_fill(var_id)
is_valid = ~numpy.isnan(data) & (data != fill)
if is_valid:
outdata[var_id_flag][i] = 0.
else:
outdata[var_id_flag][i] = 0.999
# ts to doy
dt_start_date = TimeConverter.dt_to_ts(datetime(year, 1, 1))
outdata["start_acq"] = numpy.array(
[(d - dt_start_date) / 86400.0 for d in outdata["start_acq"]])
outdata["end_acq"] = numpy.array(
[(d - dt_start_date) / 86400.0 for d in outdata["end_acq"]])
# --- write data to output
if __DEBUG__:
print("--- Write outputs ---")
# --- write data to output
for i in range(sz):
# - time vars
line = "{0:<20.6f}{1:<20.6f}".format(
outdata["start_acq"][i], outdata["end_acq"][i]
)
# concentrations coef
for var_id in self.var_ids_env + self.lev1_mz:
conc = outdata[var_id][i]
qa = 0.0
if conc == self.var_fills_conc:
qa = self.var_fills_qa
if lvl < 2:
qa = invalid_acq_val[var_id][i]
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
if var_id in self.var_ids_env:
line += "{0:<20.3f}".format(conc)
else:
line += "{0:<20.3f}".format(conc)
line += "{0:<20.3f}".format(qa)
sout += line.strip() + os.linesep
# write out data
outdir_year = "/".join([outdir, str(year)])
os.makedirs(outdir_year, exist_ok=True)
outfname = "/".join([outdir, str(year), fname])
f = open(outfname, "w")
try:
f.writelines(sout)
if __DEBUG__:
print(" > output file %s wrote" % outfname)
finally:
f.close()
if __SHOW_OUTPUTS__:
os.system("kate %s &" % outfname)
return sout
def get_fill(self, var_id):
    """
    @brief return the fill value to use for the given variable
    @param var_id a variable ID
    @return var_fills_env for the IDs listed in var_ids_env, var_fills_conc
            for every other ID (never None)
    """
    if var_id in [self.QA_FLAGS_ID] + self.var_ids_flag:
        return self.var_fills_conc
    if var_id in self.var_ids_env:
        return self.var_fills_env
    # Any other ID is treated as a concentration variable. Without this
    # fallback the method returned None for them, which breaks callers that
    # add the result to an array or format it as a float.
    return self.var_fills_conc
def get_invalid(self, indata):
    """
    @brief flag the unusable values of every environmental and concentration variable
    @param indata input data buffer ; missing values (native fill or NaN) are
                  replaced in place by the fill value given by get_fill
    @return 3 dictionaries keyed by variable ID : mask of invalid values
            (missing or extreme), mask of NaNs, QA value per record
            (0.999 for missing, 0.459 for extreme, 0 otherwise)
    """
    native_fill = self.var_fills_conc_native  # fill value of the input files
    invalid_masks = {}
    nan_masks = {}
    qa_values = {}
    # --- environmental property and concentration automatic QA
    for var_id in self.var_ids_env + self.lev1_mz:
        values = numpy.array(indata[var_id])
        is_nan = numpy.isnan(values)
        is_native_fill = values == native_fill
        is_missing = is_native_fill | is_nan
        # NOTE(review): the two ID-specific ranges only apply to variables
        # literally named "T inlet" / "p det" - confirm these IDs can occur
        if var_id == "T inlet":
            is_extreme = (values < 273) | (values > 350)
        elif var_id == "p det":
            is_extreme = (values < 200) | (values > 1200)
        else:
            is_extreme = values > 100000
        nan_masks[var_id] = is_nan
        invalid_masks[var_id] = is_missing | is_extreme
        # - replace native fill value by ebas one
        if any(is_missing):
            fill = self.get_fill(var_id)
            indata[var_id] = [
                fill if missing else d
                for d, missing in zip(indata[var_id], is_missing)
            ]
        # - set QA value ; extreme is written last so it wins over missing
        qa = numpy.zeros(values.shape)
        qa[is_missing] = 0.999
        qa[is_extreme] = 0.459
        qa_values[var_id] = qa
    return invalid_masks, nan_masks, qa_values
def get_nvars(self, lvl):
    """
    @brief return the number of output variables
    @param lvl level of the product (not used by the computation)
    @return the number of output variables
    """
    # every environmental variable (T, P) and every m/z species comes with
    # its own QA flag column, hence two columns each
    n_value_flag_pairs = len(self.var_ids_env) + len(self.lev1_mz)
    # two fixed columns : end time and status
    # NOTE(review): the visible writer emits no status column - confirm the
    # header template expects this extra count
    return 2 + 2 * n_value_flag_pairs
def get_time_tags(self, lvl, v_start_acq, year):
    """
    @brief return the time resolution, level, etc as requested by ACTRIS data format
    @param lvl level of the ACTRIS product
    @param v_start_acq start acquisition time vector (not used here)
    @param year year of the product (not used here)
    @return (period_tag, dt_tag, dt, dt_raw_tag) with dt in decimal day unit
    @throw ValueError if the level is not 0, 1 or 2, or if the sample
           duration read from the input file has an unsupported unit
    """
    dt_raw_tag = self.reader.attributes["Resolution code"]
    dt_tag = self.reader.attributes["Sample duration"]
    if lvl in (0, 1):
        # (unit suffix, number of such units in one day)
        units_per_day = (
            ("mn", 24 * 60),
            ("s", 24 * 60 * 60),
            ("h", 24),
            ("d", 1),
        )
        for suffix, per_day in units_per_day:
            if dt_tag.endswith(suffix):
                dt = float(dt_tag[: -len(suffix)]) / float(per_day)
                break
        else:
            # previously dt stayed unbound here and the return statement
            # failed with an UnboundLocalError
            raise ValueError("Unsupported sample duration %r" % dt_tag)
        period_tag = "1d"  # NRT mode
    elif lvl in (2,):
        dt_tag = "1h"
        dt = 1 / float(24)  # 1h in decimal day unit
        period_tag = "1y"
    else:
        raise ValueError("Invalid level %d" % lvl)
    return (period_tag, dt_tag, dt, dt_raw_tag)
def get_vars_fill(self, lvl):
    """
    @brief constructs the list of variable fill values separated by spaces
    @param lvl level of the product (not used by the computation)
    @return the list of variable fill values as a string : end time, then
            one value/flag pair per environmental variable, then one
            value/flag pair per m/z species
    """
    flag_fill = "9.999"
    fills = ["9999.999999"]  # end time
    # environmental columns are filled with var_fills_env in the data
    # (see get_fill), so the header has to declare that same value
    for _ in self.var_ids_env:
        fills += [str(self.var_fills_env), flag_fill]
    # one pair per submitted species ; a fixed single pair made the number
    # of fill values disagree with the number of declared variables
    for _ in self.lev1_mz:
        fills += [str(self.var_fills_conc), flag_fill]
    return " ".join(fills)
def get_vars_desc(self, lvl):
    """
    @brief constructs the variables long names and column names
    @param lvl level (not used by the computation)
    @return (vars_long_name, vars_tags) : one long name per line, and the
            column tags left-aligned on 20 characters
    """
    # long names : inlet temperature and pressure first, then each species,
    # every variable being followed by its numflag line
    env_pattern = "{0:s}, {1:s}, Location=inlet, Matrix=instrument"
    long_names = [
        env_pattern.format("temperature", "K"),
        "numflag_temperature, no unit",
        env_pattern.format("pressure", "hPa"),
        "numflag_pressure, no unit",
    ]
    for species in self.lev1_mz_name:
        long_names.append(species + ", ppb")
        long_names.append("numflag_" + species + ", no unit")
    vars_long_name = os.linesep.join(long_names).strip()
    # tags : spaces are replaced by underscores in environmental tags only
    column = "{0:<20s}"
    tags = [tag.replace(" ", "_") for tag in self.var_ids_env_tag]
    tags += list(self.lev1_mz)
    vars_tag = "".join(
        column.format(tag) + column.format(tag + "_numflag") for tag in tags
    ).strip()
    return (vars_long_name, vars_tag)
def get_header(self, t_start, lvl, v_start_acq, year):
    """
    @brief constructs the header file
    @param t_start start time, forwarded to get_acq_time_tags
    @param lvl level of the product : 1 -> no time averaging ; 2 -> average all the year by day range
    @param v_start_acq start acquisition time vector
    @param year year for L2 synthesis
    @return (output file name, filled header, start day tag,
            timestamp of the start day at 00:00:00)
    """
    start_time, start_day = self.get_acq_time_tags(t_start, lvl, year)
    d = datetime.strptime(start_day + " 00:00:00", "%Y %m %d 00:00:00")
    ts_day_start = calendar.timegm(d.timetuple())
    # - production time
    prod_time, rev_day = self.get_prod_time_tags()
    # set metadata based on level 0
    attrs = self.reader.attributes
    self.station = attrs["Station code"]
    self.instrument = attrs["Instrument type"]
    self.matrix = attrs["Matrix"]
    self.instr_name = attrs["Instrument name"]
    self.model = attrs["Instrument model"]
    self.brand = attrs["Instrument manufacturer"]
    self.sn = attrs["Instrument serial number"]
    self.method_ref = attrs["Method ref"]
    self.lab = attrs["Laboratory code"]
    self.platform = attrs["Platform code"]
    self.gaw_id = attrs["Station GAW-ID"]
    self.wdca_id = attrs["Station WDCA-ID"]
    self.site = attrs["Station name"]
    self.latitude = attrs["Measurement latitude"]
    self.longitude = attrs["Measurement longitude"]
    self.altitude = attrs["Measurement altitude"]
    self.meas_height = attrs["Measurement height"]
    # NOTE(review): these two values are overwritten further down by the
    # class-level acquisition conditions - confirm which source is intended
    std_temp = attrs["Volume std. temperature"]
    std_pres = attrs["Volume std. pressure"]
    self.wmo_land_use = attrs["Station land use"]
    self.wmo_setting = attrs["Station setting"]
    self.gaw_type = attrs["Station GAW type"]
    self.wmo_region = attrs["Station WMO region"]
    self.component = "ion_concentrations"
    self.normal_comment_line = 58
    self.unit = "ppb"
    # - output filename
    fname = self.get_outfname(start_time, prod_time, lvl, v_start_acq, year)
    fname_ext = "nas"
    # - time resolution tags
    period_tag, dt_tag, dt, dt_raw_tag = self.get_time_tags(lvl, v_start_acq, year)
    # set type code + dt value : TU time regularly spaced, TI : irregularly
    set_type_code = "TU"  # should always be
    if lvl in [0, 1]:
        diff = numpy.diff(v_start_acq)
        if (diff.size > 0) and not numpy.all(diff == diff[0]):
            set_type_code = "TI"
            dt = 0
    elif lvl in [2]:
        dt = 1 / float(24)  # 1h in decimal day unit
        period_tag = "1y"
    # - number of variables
    nvars = self.get_nvars(lvl)
    # - scale factors
    scale_factors = ("1. " * nvars).strip()
    # variables fill values
    vars_fill = self.get_vars_fill(lvl)
    # variable long names + tags
    vars_long_name, vars_tag = self.get_vars_desc(lvl)
    # standard conditions of acquisition
    std_temp = self.acq_temp
    std_pres = self.acq_pres
    # levels are handled as integers everywhere else in this class ; the
    # former string-only test never matched, so the formatting never ran
    if lvl in (1, 2, "1", "2"):
        std_temp = "%.2f" % Standardizer.T_STD
        std_pres = "%.2f" % Standardizer.P_STD
    # - number of lines of the header without vars
    sz_header_base = self.normal_comment_line + 14
    header_size = sz_header_base + nvars  # number of lines of the header
    # --- fill-in the header
    header = Converter.Converter.get_header(
        self,
        lvl,
        header_size=header_size,
        start_day=start_day,
        rev_day=rev_day,
        start_time=start_time,
        prod_time=prod_time,
        fname=fname,
        fname_ext=fname_ext,
        lab=self.lab,
        station=self.station,
        platform=self.platform,
        gaw_id=self.gaw_id,
        wdca_id=self.wdca_id,
        site=self.site,
        latitude=self.latitude,
        longitude=self.longitude,
        altitude=self.altitude,
        meas_altitude=self.altitude + self.meas_height,
        std_temp=std_temp,
        std_pres=std_pres,
        wmo_land_use=self.wmo_land_use,
        wmo_setting=self.wmo_setting,
        gaw_type=self.gaw_type,
        wmo_region=self.wmo_region,
        period_tag=period_tag,
        dt_tag=dt_tag,
        dt=dt,
        dt_raw_tag=dt_raw_tag,
        submitter=self.submitter,
        component=self.component,
        matrix=self.matrix,
        unit=self.unit,
        instrument=self.instrument,
        brand=self.brand,
        model=self.model,
        sn=self.sn,
        nvars=nvars,
        scale_factors=scale_factors,
        vars_fill=vars_fill,
        vars_long_name=vars_long_name,
        vars_tag=vars_tag,
        set_type_code=set_type_code,
        method_ref=self.method_ref,
        instr_name=self.instr_name,
        normal_comment_line=self.normal_comment_line,
    )
    return fname, header, start_day, ts_day_start
def get_parser(app, SRC_DIR):
"""
Sets the available program options, and their default values
"""
parser = OptionParser()
parser.usage = "python3 " + app + ".py [options] <conf-file> <infiles>" + os.linesep
parser.usage += os.linesep
parser.usage += "with :" + os.linesep
parser.usage += (
"\t<infiles> full path to the trace gazes file(s) to convert" + os.linesep +
"\t<conf-file> full path of the configuration file" + os.linesep
)
parser.add_option(
"-p",
"--prod-id",
action="store",
default=None,
type="str",
dest="prod_id",
help="Product_id as <STATION>_<INSTRUMENT>_<LEVEL>",
)
parser.add_option(
"-o",
"--out-dir",
action="store",
default=SRC_DIR + "/tests/results",
type="str",
dest="outdir",
help="Output directory",
)
parser.add_option(
"-d",
"--cache_dir",
action="store",
default=SRC_DIR + "/tests/cache/",
type="str",
dest="cache_dir",
help="Directory of the temporary files",
)
# check the number of command line arguments
raise ValueError("Missing command-line arguments")
return infiles, conf, parser.parse_args()
os.path.dirname(os.path.abspath(__file__)), "..", ".."
infiles, conf, (options, arg) = PtrmsConverter.get_parser(__APP__, SRC_DIR)
prog.process(infiles, conf, options.cache_dir, options.outdir, levels=[2])