Newer
Older
# -*- coding: utf-8 -*-
"""
Description :
-------------
PTRMS converter based on CAMS project
Usage :
-------
Usage: python3 PtrmsConverter.py [options] <conf-file> <infiles>
Author :
--------
CDS AERIS-ICARE Aurélien Chauvigné
License :
---------
This file must be used under the terms of the CeCILL.
This source file is licensed as described in the file COPYING, which
you should have received as part of this distribution. The terms
are also available at
http://www.cecill.info/licences/Licence_CeCILL_V2-en.txt
History :
--------
v0.0.0 : 2023/09/12
- creation
"""
import calendar
from optparse import OptionParser
import numpy
# Add the parent of this file's directory to the module search path
# (presumably so the project packages imported below can be resolved).
pkgdir = os.path.dirname(__file__)
d = os.path.abspath(os.path.join(pkgdir, ".."))
if d not in sys.path:
    sys.path.append(d)
from converter import Converter
from product.AMESProduct import AMESProduct
from iointerface.AMESReader import AMESReader
from misc import Standardizer, TimeConverter
from uploader.DataSeeker import DataSeeker
import json
import re
from algo.ptrms_lib import ptrms_lib
# Version string of this converter.
__VERSION__ = "0.0.0"
# Application name: base name of this file without its extension.
__APP__ = os.path.splitext(os.path.basename(__file__))[0]
# When true, progress and error messages are printed to stdout.
__DEBUG__ = True
# When true, an exception raised while processing a level is re-raised.
__THROW__ = True
# NOTE(review): not read anywhere in the visible part of this file.
__SHOW_BAD_INPUTS__ = False
# When true, the written output file is opened with the `kate` editor.
__SHOW_OUTPUTS__ = False
# NOTE(review): not read anywhere in the visible part of this file.
__ENABLE_BASELINE_CORRECTION_L0__ = True
class PtrmsConverter(Converter.Converter):
"""
PTRMS data converter
"""
# Shared DataSeeker instance; the keys of its `cfg` mapping are used as the
# product IDs when the constructor is called with prod_id=None.
data_seeker = DataSeeker()
# acquisition conditions (taken from the Standardizer module)
acq_temp = Standardizer.T_STD # temperature in K
acq_pres = Standardizer.P_STD # pressure in hPa
# fill values
# - concentrations: returned by get_fill() and listed by get_vars_fill()
var_fills_conc = 999999.999
# - environmental variables: returned by get_fill() for var_ids_env members
var_fills_env = 9999.999
# - QA flag assigned in process_lvl() when a value equals var_fills_conc
var_fills_qa = 0.999
# - value that get_invalid() treats as "missing" in the input data
var_fills_conc_native = -99 # fill value in native files
def __init__(self, prod_id):
    """
    @brief constructor
    @param prod_id product ID to initialise the converter with; when None,
                   every key of the data seeker configuration is used
    """
    if prod_id is None:
        prod_ids = list(self.data_seeker.cfg.keys())
    else:
        prod_ids = [prod_id]
    # The base-class initialiser is run once per product ID, in order.
    for pid in prod_ids:
        Converter.Converter.__init__(self, pid, AMESProduct)
def process(self, infiles, conf_file, cache_dir, outdir, levels, hour=None):
    """
    @brief convert a list of files, one pass per requested level
    @param infiles the files to convert
    @param conf_file configuration file, forwarded to process_lvl
    @param cache_dir cache directory, forwarded to process_lvl
    @param outdir output directory (made absolute before use)
    @param levels iterable of levels to process
    @param hour forwarded unchanged to process_lvl
    """
    outdir = os.path.abspath(outdir)
    banner = "*" * 20
    for lvl in levels:
        start_time = time.time()
        if __DEBUG__:
            print(banner + " LEVEL %d " % lvl + banner)
        try:
            self.process_lvl(infiles, conf_file, cache_dir, outdir, lvl, hour)
        except Exception as e:
            if __DEBUG__:
                print(" > " + str(e) + " : " + os.linesep + "ABORT")
            # With __THROW__ disabled the failure is only reported and the
            # next level is still attempted.
            if __THROW__:
                raise
        if __DEBUG__:
            elapsed = time.strftime(
                "%H:%M:%S ", time.gmtime(time.time() - start_time)
            )
            print(banner + " LEVEL %d" % lvl + " : " + elapsed + banner)
def process_lvl(self, infiles, conf_file, cache_dir, outdir, lvl, hour):
"""
@brief convert a list of files for one level and write the output file
@param infiles the files to convert
@param conf_file configuration file handed to ptrms_lib
@param cache_dir cache directory path
@param outdir output directory
@param lvl level being produced (used to build the header)
@param hour not used in the visible body of this method
@return the full text (header + data lines) written to the output file
"""
# NOTE(review): this method reached review with its indentation stripped,
# some lines missing and line-number residue mixed in; the code below is
# left exactly as found and the gaps are flagged where they occur.
# The first input file provides the metadata and the year.
self.reader = AMESReader(infiles[0])
year = int(self.reader.attributes["Startdate"][0:4])
ds_names = self.reader.get_ds_names()
indata = Converter.Converter.load_indata(
self, infiles, ds_names, reader_interface=AMESReader
)
# Lists filled while classifying the input variables below.
self.var_ids_mz = []
self.var_ids_env_tag = []
self.var_ids_env = []
# NOTE(review): the loop header that binds `var_id` and the opening `if` of
# this branch chain are missing here; `Product` is also not imported in the
# visible part of the file. Both acquisition-time keys receive the same
# column as written — confirm against the original file.
indata[Product.START_ACQ_ID] = indata[var_id]
indata[Product.END_ACQ_ID] = indata[var_id]
elif "status" in var_id:
indata["status"] = indata[var_id]
elif "temperature" in var_id and "Location=inlet" in var_id:
self.var_ids_env.append(var_id)
self.var_ids_env_tag.append("T_inlet")
elif "pressure" in var_id and "Location=inlet" in var_id:
self.var_ids_env.append(var_id)
self.var_ids_env_tag.append("P_inlet")
elif "count_rate" in var_id and "numflag" not in var_id:
# the mass number is the digit run enclosed by underscores in the name
m = re.search(r"_[0-9]+_", var_id)
mass_num = m.group(0).replace("_", "")
indata["mz_" + mass_num] = indata[var_id]
self.var_ids_mz.append("mz_" + mass_num)
# NOTE(review): the branch header that leads into the compound-name lookup
# below appears to be missing.
if "methanal" in var_id:
mass_num = "31"
elif "methanol" in var_id:
mass_num = "33"
elif "acetonitrile" in var_id:
mass_num = "42"
elif "ethanal" in var_id:
mass_num = "45"
elif "dimethylsulfide" in var_id:
mass_num = "63"
elif "methyl_acetate" in var_id:
mass_num = "75"
elif "benzene" in var_id:
mass_num = "79"
elif "monoterpenes" in var_id:
mass_num = "137"
indata["mz_" + mass_num] = indata[var_id]
self.var_ids_mz.append("mz_" + mass_num)
# Species to output are those flagged "actris_submit" in the configuration.
prog_ptrms_lib = ptrms_lib(cache_dir, outdir, conf_file)
self.lev1_mz = [
s
for s in prog_ptrms_lib.conf["cols_species"]
if prog_ptrms_lib.conf["species"][s]["actris_submit"]
]
self.lev1_mz_name = [
prog_ptrms_lib.conf["species"][s]["actris_name"]
for s in prog_ptrms_lib.conf["cols_species"]
if prog_ptrms_lib.conf["species"][s]["actris_submit"]
]
# NOTE(review): the parenthesis opened on the next line is never closed in
# the text as found; `levels=[0]` is passed whatever `lvl` is.
ppb_data, lod, rsd, accuracy, uncertainty, raw_data_filtered = (
prog_ptrms_lib.process(indata, year, levels=[0])
# group outputs
outdata = ppb_data
outdata[Product.START_ACQ_ID] = raw_data_filtered[Product.START_ACQ_ID]
outdata[Product.END_ACQ_ID] = raw_data_filtered[Product.END_ACQ_ID]
for var_id in self.var_ids_env:
outdata[var_id] = raw_data_filtered[var_id]
# only the third buffer (per-value QA flags) is used further down
indata_fill, indata_nan, invalid_acq_val = self.get_invalid(outdata)
v_start_acq_in = outdata[Product.START_ACQ_ID]
t_start = min(v_start_acq_in)
# get header
fname, sout, _ = self.get_header(t_start, lvl, v_start_acq_in, year)[:3]
# sort according start_acq
# NOTE(review): the sort key is read via Product.START_ACQ_ID while the
# columns are reordered via the literal keys "start_acq" / "end_acq";
# presumably these are the same keys — confirm.
i_sort = numpy.argsort(outdata[Product.START_ACQ_ID])
outdata["start_acq"] = [outdata["start_acq"][i] for i in i_sort]
outdata["end_acq"] = [outdata["end_acq"][i] for i in i_sort]
for var_id in self.var_ids_env + self.lev1_mz:
outdata[var_id] = [outdata[var_id][i] for i in i_sort]
invalid_acq_val[var_id] = numpy.array(
[invalid_acq_val[var_id][i] for i in i_sort]
)
# outdata = outdata.sort_values(by=['start_acq'])
# outdata= sorted(outdata,key=lambda x:x["start_acq"].max(axis=0))
# --- write data to output
if __DEBUG__:
print("--- Write outputs ---")
# --- write data to output
sz = len(v_start_acq_in)
for i in range(sz):
# - time vars
line = "{0:<20.6f}{1:<20.6f}".format(
outdata["start_acq"][i], outdata["end_acq"][i]
)
# concentrations coef
for var_id in self.var_ids_env + self.lev1_mz:
# NOTE(review): the bare integers that follow look like line-number residue
# from the extraction, not program statements.
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
conc = outdata[var_id][i]
qa = 0.0
if conc == self.var_fills_conc:
qa = self.var_fills_qa
# as written, this assignment replaces whatever qa was set to just above
qa = invalid_acq_val[var_id][i]
# both branches format the value identically
if var_id in self.var_ids_env:
line += "{0:<20.3f}".format(conc)
else:
line += "{0:<20.3f}".format(conc)
line += "{0:<20.3f}".format(qa)
sout += line.strip() + os.linesep
# write out data
# the output goes to <outdir>/<year>/<fname>; the directory is created
outdir_year = "/".join([outdir, str(year)])
os.makedirs(outdir_year, exist_ok=True)
outfname = "/".join([outdir, str(year), fname])
f = open(outfname, "w")
try:
f.writelines(sout)
if __DEBUG__:
print(" > output file %s wrote" % outfname)
finally:
f.close()
# optional: open the result in the `kate` editor
if __SHOW_OUTPUTS__:
os.system("kate %s &" % outfname)
return sout
def get_fill(self, var_id):
    """
    @brief return the fill value to use for the given variable
    @param var_id a variable ID
    @return 9.999 for the QA flags variable, the environmental fill value
            for members of var_ids_env, the concentration fill value
            otherwise
    """
    if var_id == self.QA_FLAGS_ID:
        return 9.999
    if var_id in self.var_ids_env:
        return self.var_fills_env
    # NOTE(review): the condition that guarded this return was lost from the
    # source under review; it is assumed here to cover every remaining
    # (concentration) variable — confirm against the original file.
    return self.var_fills_conc
def get_invalid(self, indata):
    """
    @brief flag the invalid values of every environmental and species variable
    @param indata data buffer; missing values (native fill or NaN) are
                  replaced in place by the fill value from get_fill()
    @return 3 dictionaries keyed by variable ID : mask of invalid values,
            mask of NaNs, QA flag per value (0 valid, 0.999 missing,
            0.459 out of range)
    """
    invalid_databuf = {}
    nan_databuf = {}
    invalid_val = {}
    # input data fill value
    infill = self.var_fills_conc_native
    for var_id in self.var_ids_env + self.lev1_mz:
        data = numpy.array(indata[var_id])
        is_nan_data = numpy.isnan(data)
        is_infill = data == infill
        # NOTE(review): the IDs stored in var_ids_env by process_lvl contain
        # "temperature"/"pressure", so these two comparisons do not look like
        # they can match — confirm the intended IDs.
        if var_id == "T inlet":
            is_extreme = (data < 273) | (data > 350)
        elif var_id == "p det":
            is_extreme = (data < 200) | (data > 1200)
        else:
            is_extreme = data > 100000
        is_missing = is_infill | is_nan_data
        nan_databuf[var_id] = is_nan_data
        invalid_databuf[var_id] = is_missing | is_extreme
        # - replace native fill value and NaN by the output fill value
        if is_missing.any():
            fill = self.get_fill(var_id)
            indata[var_id] = [
                fill if missing else value
                for value, missing in zip(indata[var_id], is_missing)
            ]
        # - set QA value; the out-of-range flag is applied last and wins
        flags = numpy.zeros(data.shape)
        flags[is_missing] = 0.999
        flags[is_extreme] = 0.459
        invalid_val[var_id] = flags
    return invalid_databuf, nan_databuf, invalid_val
def get_nvars(self, lvl):
    """
    @brief return the number of output variables declared in the header
    @param lvl level of the product (not used by the computation)
    @return the number of output variables
    """
    # NOTE(review): one slot is counted for "status" although no status
    # column is written by the visible output code — confirm.
    nvars = 1  # end time
    nvars += 1  # status
    nvars += 2 * len(self.var_ids_env)  # T, P and their QA flags
    nvars += 2 * len(self.lev1_mz)  # each species and its QA flag
    return nvars
def get_time_tags(self, lvl, v_start_acq, year):
    """
    @brief return the time resolution tags used to fill the header
    @param lvl level of the product
    @param v_start_acq start acquisition time vector (not used here)
    @param year year of the product (not used here)
    @return (period_tag, dt_tag, dt, dt_raw_tag), dt being the sample
            duration in decimal days
    @throw ValueError for an unknown level or, for levels 0 and 1, a
           sample duration whose unit is not one of mn, s, h, d
    """
    dt_raw_tag = self.reader.attributes["Resolution code"]
    dt_tag = self.reader.attributes["Sample duration"]
    if lvl in (0, 1):
        # (unit suffix, number of such units in one day)
        units_per_day = (("mn", 24 * 60), ("s", 24 * 60 * 60), ("h", 24), ("d", 1))
        for suffix, per_day in units_per_day:
            if dt_tag.endswith(suffix):
                dt = float(dt_tag[: -len(suffix)]) / float(per_day)
                break
        else:
            # previously `dt` was simply left unbound in this case
            raise ValueError("Unsupported sample duration %r" % dt_tag)
        period_tag = "1d"  # NRT mode
    elif lvl == 2:
        dt_tag = "1h"
        dt = 1 / float(24)  # 1h in decimal day unit
        period_tag = "1y"
    else:
        raise ValueError("Invalid level %d" % lvl)
    return (period_tag, dt_tag, dt, dt_raw_tag)
def get_vars_fill(self, lvl):
    """
    @brief constructs the list of variable fill values separated by spaces
    @param lvl level of the product (not used by the computation)
    @return the list of variable fill values as a string : end time, then
            a (value, QA flag) pair for temperature, for pressure and for
            each species of lev1_mz
    """
    qa_fill = "9.999"
    fills = ["9999.999999"]  # end time
    # T and P use the environmental fill value, which is the value that
    # get_fill() puts in the data for the environmental variables.
    fills += [str(self.var_fills_env), qa_fill]  # T
    fills += [str(self.var_fills_env), qa_fill]  # P
    # One pair per species, so that the list matches the variables
    # described by get_vars_desc().
    for _ in self.lev1_mz:
        fills += [str(self.var_fills_conc), qa_fill]
    return " ".join(fills)
def get_vars_desc(self, lvl):
    """
    @brief constructs the variables long names and column names
    @param lvl level (not used by the computation)
    @return (vars_long_name, vars_tag) : the long names, one per line, and
            the column tags, each padded to 20 characters
    """
    # long names : temperature, pressure, then every submitted species,
    # each followed by its numflag line
    long_names = [
        "{0:s}, {1:s}, Location=inlet, Matrix=instrument".format("temperature", "K"),
        "numflag_temperature, no unit",
        "{0:s}, {1:s}, Location=inlet, Matrix=instrument".format("pressure", "hPa"),
        "numflag_pressure, no unit",
    ]
    for name in self.lev1_mz_name:
        long_names.append(name + ", ppb")
        long_names.append("numflag_" + name + ", no unit")
    vars_long_name = os.linesep.join(long_names).strip()

    # tags : the flag tag is prefixed for environmental variables and
    # suffixed for species
    tags = []
    for tag in self.var_ids_env_tag:
        tag = tag.replace(" ", "_")
        tags += [tag, "numflag_" + tag]
    for var_id in self.lev1_mz:
        tags += [var_id, var_id + "_numflag"]
    vars_tag = "".join("{0:<20s}".format(tag) for tag in tags).strip()
    return (vars_long_name, vars_tag)
def get_header(self, t_start, lvl, v_start_acq, year):
    """
    @brief constructs the output file name and the file header
    @param t_start earliest start acquisition time
    @param lvl level of the product
    @param v_start_acq start acquisition time vector
    @param year year of the product
    @return (fname, header, start_day, ts_day_start)
    """
    start_time, start_day = self.get_acq_time_tags(t_start, lvl, year)
    day = datetime.strptime(start_day + " 00:00:00", "%Y %m %d 00:00:00")
    ts_day_start = calendar.timegm(day.timetuple())
    # - production time
    prod_time, rev_day = self.get_prod_time_tags()

    # set metadata from the attributes of the input file
    attrs = self.reader.attributes
    self.station = attrs["Station code"]
    self.instrument = attrs["Instrument type"]
    self.matrix = attrs["Matrix"]
    self.instr_name = attrs["Instrument name"]
    self.model = attrs["Instrument model"]
    self.brand = attrs["Instrument manufacturer"]
    self.sn = attrs["Instrument serial number"]
    self.method_ref = attrs["Method ref"]
    self.lab = attrs["Laboratory code"]
    self.platform = attrs["Platform code"]
    self.gaw_id = attrs["Station GAW-ID"]
    self.wdca_id = attrs["Station WDCA-ID"]
    self.site = attrs["Station name"]
    self.latitude = attrs["Measurement latitude"]
    self.longitude = attrs["Measurement longitude"]
    self.altitude = attrs["Measurement altitude"]
    self.meas_height = attrs["Measurement height"]
    self.wmo_land_use = attrs["Station land use"]
    self.wmo_setting = attrs["Station setting"]
    self.gaw_type = attrs["Station GAW type"]
    self.wmo_region = attrs["Station WMO region"]
    self.component = "ion_concentrations"
    self.normal_comment_line = 58
    self.unit = "ppb"

    # - output filename
    fname = self.get_outfname(start_time, prod_time, lvl, v_start_acq, year)
    fname_ext = "nas"
    # - time resolution tags
    period_tag, dt_tag, dt, dt_raw_tag = self.get_time_tags(lvl, v_start_acq, year)

    # set type code : TU = regularly spaced times, TI = irregular.
    # Records are sometimes missing; the step is then not constant, so the
    # code becomes TI and dt is set to 0.
    set_type_code = "TU"
    if lvl in (0, 1):
        steps = numpy.diff(v_start_acq)
        if steps.size > 0 and not numpy.all(steps == steps[0]):
            set_type_code = "TI"
            dt = 0

    # - number of variables, scale factors, fill values, names and tags
    nvars = self.get_nvars(lvl)
    scale_factors = ("1. " * nvars).strip()
    vars_fill = self.get_vars_fill(lvl)
    vars_long_name, vars_tag = self.get_vars_desc(lvl)

    # standard conditions of acquisition
    # NOTE(review): the "Volume std. temperature" / "Volume std. pressure"
    # attributes used to be read here but were always overwritten by the
    # class defaults below, so those reads were dropped.
    std_temp = self.acq_temp
    std_pres = self.acq_pres
    # NOTE(review): levels are integers everywhere else in this class, so
    # this string comparison presumably never matches — confirm the intent
    # before changing it, as it alters the header content.
    if lvl in ["1", "2"]:
        std_temp = "%.2f" % Standardizer.T_STD
        std_pres = "%.2f" % Standardizer.P_STD

    # number of header lines : fixed part + one line per variable
    header_size = self.normal_comment_line + 14 + nvars

    # --- fill-in the header
    header = Converter.Converter.get_header(
        self,
        lvl,
        header_size=header_size,
        start_day=start_day,
        rev_day=rev_day,
        start_time=start_time,
        prod_time=prod_time,
        fname=fname,
        fname_ext=fname_ext,
        lab=self.lab,
        station=self.station,
        platform=self.platform,
        gaw_id=self.gaw_id,
        wdca_id=self.wdca_id,
        site=self.site,
        latitude=self.latitude,
        longitude=self.longitude,
        altitude=self.altitude,
        meas_altitude=self.altitude + self.meas_height,
        std_temp=std_temp,
        std_pres=std_pres,
        wmo_land_use=self.wmo_land_use,
        wmo_setting=self.wmo_setting,
        gaw_type=self.gaw_type,
        wmo_region=self.wmo_region,
        period_tag=period_tag,
        dt_tag=dt_tag,
        dt=dt,
        dt_raw_tag=dt_raw_tag,
        submitter=self.submitter,
        component=self.component,
        matrix=self.matrix,
        unit=self.unit,
        instrument=self.instrument,
        brand=self.brand,
        model=self.model,
        sn=self.sn,
        nvars=nvars,
        scale_factors=scale_factors,
        vars_fill=vars_fill,
        vars_long_name=vars_long_name,
        vars_tag=vars_tag,
        set_type_code=set_type_code,
        method_ref=self.method_ref,
        instr_name=self.instr_name,
        normal_comment_line=self.normal_comment_line,
    )
    return fname, header, start_day, ts_day_start
def get_parser(app, SRC_DIR):
    """
    Sets the available program options and their default values, then
    parses the command line.

    @param app application name shown in the usage message
    @param SRC_DIR base directory used for the default output and cache
                   directories
    @return (infiles, conf, (options, args)) where (options, args) is the
            result of OptionParser.parse_args()
    @throw ValueError if fewer than two positional arguments are given
    """
    parser = OptionParser()
    parser.usage = "python3 " + app + ".py [options] <conf-file> <infiles>" + os.linesep
    parser.usage += os.linesep
    parser.usage += "with :" + os.linesep
    parser.usage += (
        "\t<infiles> full path to the trace gazes file(s) to convert" + os.linesep +
        "\t<conf-file> full path of the configuration file" + os.linesep
    )
    parser.add_option(
        "-p",
        "--prod-id",
        action="store",
        default=None,
        type="str",
        dest="prod_id",
        help="Product_id as <STATION>_<INSTRUMENT>_<LEVEL>",
    )
    parser.add_option(
        "-o",
        "--out-dir",
        action="store",
        default=SRC_DIR + "/tests/results",
        type="str",
        dest="outdir",
        help="Output directory",
    )
    parser.add_option(
        "-d",
        "--cache_dir",
        action="store",
        default=SRC_DIR + "/tests/cache/",
        type="str",
        dest="cache_dir",
        help="Directory of the temporary files",
    )
    options, args = parser.parse_args()
    # check the number of command line arguments
    # NOTE(review): the parsing and the guard were missing from the source
    # under review; the positional order (<conf-file> first, then the input
    # files) and the minimum of two arguments are taken from the usage
    # string above — confirm against the original file.
    if len(args) < 2:
        raise ValueError("Missing command-line arguments")
    conf = args[0]
    infiles = args[1:]
    return infiles, conf, (options, args)
# Script entry point.
# NOTE(review): the guard, the `SRC_DIR = os.path.join(` opener and the
# construction of `prog` were missing from the source under review and are
# reconstructed here from the surviving lines — confirm against the
# original file.
if __name__ == "__main__":
    SRC_DIR = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "..", ".."
    )
    infiles, conf, (options, arg) = PtrmsConverter.get_parser(__APP__, SRC_DIR)
    prog = PtrmsConverter(options.prod_id)
    prog.process(infiles, conf, options.cache_dir, options.outdir, levels=[1])