Newer
Older
# -*- coding: utf-8 -*-
"""
Description :
-------------
PTRMS converter based on CAMS project
Usage :
-------
Usage: python3 PtrmsConverter.py [options] <conf-file> <infiles>
Author :
--------
CDS AERIS-ICARE Aurélien Chauvigné
License :
---------
This file must be used under the terms of the CeCILL.
This source file is licensed as described in the file COPYING, which
you should have received as part of this distribution. The terms
are also available at
http://www.cecill.info/licences/Licence_CeCILL_V2-en.txt
History :
--------
v0.0.0 : 2023/09/12
- creation
"""
import calendar
from optparse import OptionParser
import numpy
# Append the parent directory of this file to sys.path (only once), presumably
# so that the project packages imported below can be resolved.
# NOTE(review): `os` and `sys` are used here but their imports are not visible
# in this excerpt — confirm they are imported above.
pkgdir = os.path.dirname(__file__)
d = os.path.abspath(os.path.join(pkgdir, ".."))
if d not in sys.path:
sys.path.append(os.path.join(d))
from converter import Converter
from product.AMESProduct import AMESProduct
from iointerface.AMESReader import AMESReader
from misc import Standardizer, TimeConverter
from uploader.DataSeeker import DataSeeker
import json
import re
from algo.ptrms_lib import ptrms_lib
# Program version and application name (this file's base name without extension).
__VERSION__ = "0.0.0"
__APP__ = os.path.splitext(os.path.basename(__file__))[0]
# When True, progress / error messages are printed by the converter.
__DEBUG__ = True
# When True, exceptions caught in PtrmsConverter.process are re-raised.
__THROW__ = True
# NOTE(review): not referenced in the visible part of this file.
__SHOW_BAD_INPUTS__ = False
# When True, the written output file is opened with the `kate` editor.
__SHOW_OUTPUTS__ = False
# NOTE(review): not referenced in the visible part of this file.
__ENABLE_BASELINE_CORRECTION_L0__ = True
class PtrmsConverter(Converter.Converter):
# NOTE(review): the docstring below mentions NOX, but the module header and the
# class name indicate a PTR-MS converter — probably a copy/paste leftover.
"""
Puy NOX data converter
"""
# shared DataSeeker instance; its `cfg` keys are used as product IDs in __init__
data_seeker = DataSeeker()
# acquisition conditions
acq_temp = Standardizer.T_STD # temperature in K
acq_pres = Standardizer.P_STD # pressure in hPa
# fill values
# fill value returned by get_fill for species concentrations and their std
var_fills_conc = 999999.999
# fill value returned by get_fill for the environmental variables
var_fills_env = 9999.999
# QA flag written when a value equals its fill value
var_fills_qa = 0.999
var_fills_conc_native = -99 # fill value in native files
def __init__(self, prod_id):
    """
    @brief initialise the converter for one product ID, or for every product ID
           configured in the data seeker when none is given
    @param prod_id product ID to process, or None to use all the keys of
           data_seeker.cfg
    """
    if prod_id is not None:
        selected_ids = [prod_id]
    else:
        # no explicit product : take every product ID known by the data seeker
        selected_ids = list(self.data_seeker.cfg.keys())
    # the parent constructor is invoked once per selected product ID
    for current_id in selected_ids:
        Converter.Converter.__init__(self, current_id, AMESProduct)
def process(self, infiles, conf_file, cache_dir, outdir, levels, hour=None):
"""
@brief convert a list of files
@param infiles the files to convert
@param outdir output directory
@param levels processed level
@param year year of the eventual yearly synthesis
"""
# NOTE(review): the docstring lists a `year` parameter that is not in the
# signature, and does not describe `conf_file`, `cache_dir` or `hour`.
outdir = os.path.abspath(outdir)
# NOTE(review): `lvl` and `start_time` are used below and the `except` has no
# visible `try:` — the lines defining them (presumably a timer start and a
# loop over `levels`) are not visible in this excerpt.
print("*" * 20 + " LEVEL %s " % lvl + "*" * 20)
# delegate the conversion of this level to process_lvl
self.process_lvl(infiles, conf_file, cache_dir, outdir, lvl, hour)
except Exception as e:
# report the failure when debugging; re-raise only when __THROW__ is set
if __DEBUG__:
msg = " > " + str(e) + " : " + os.linesep + "ABORT"
print(msg)
if __THROW__:
raise
# print the time elapsed since `start_time`, formatted as HH:MM:SS
if __DEBUG__:
print(
"*" * 20
+ " : "
+ time.strftime("%H:%M:%S ", time.gmtime(time.time() - start_time))
+ "*" * 20
)
def process_lvl(self, infiles, conf_file, cache_dir, outdir, lvl, hour):
"""
@brief convert a list of files
@param infiles the files to convert
@param cache_dir cache directory path
@param outdir output directory
@param year year of the eventual yearly synthesis
"""
# NOTE(review): the docstring lists `year`, which is not a parameter (it is read
# from the first input file below), and omits `conf_file`, `lvl` and `hour`.
# NOTE(review): this method is visibly incomplete in this excerpt — several loop
# headers, conditions and assignments are missing; see the notes below.
# open the first input file; the year is the first 4 characters of its
# "Startdate" attribute
self.reader = AMESReader(infiles[0])
year = int(self.reader.attributes["Startdate"][0:4])
ds_names = self.reader.get_ds_names()
# load the datasets of all input files through the parent converter
indata = Converter.Converter.load_indata(
self, infiles, ds_names, reader_interface=AMESReader
)
# names of the m/z variables, of the environmental variables and of their tags
self.var_ids_mz = []
self.var_ids_env_tag = []
self.var_ids_env = []
# NOTE(review): the loop over the variable IDs and the first `if` of the chain
# below are not visible; `var_id` is presumably the loop variable.
indata[Product.START_ACQ_ID] = indata[var_id]
indata[Product.END_ACQ_ID] = indata[var_id]
elif "status" in var_id:
indata["status"] = indata[var_id]
elif "temperature" in var_id and "Location=inlet" in var_id:
# inlet temperature : keep the dataset name and tag it "T_inlet"
self.var_ids_env.append(var_id)
self.var_ids_env_tag.append("T_inlet")
elif "pressure" in var_id and "Location=inlet" in var_id:
# inlet pressure : keep the dataset name and tag it "P_inlet"
self.var_ids_env.append(var_id)
self.var_ids_env_tag.append("P_inlet")
elif "count_rate" in var_id and "numflag" not in var_id:
# the mass number is the run of digits enclosed by underscores in the name;
# the data are re-keyed as "mz_<mass number>"
m = re.search(r"_[0-9]+_", var_id)
mass_num = m.group(0).replace("_", "")
indata["mz_" + mass_num] = indata[var_id]
self.var_ids_mz.append("mz_" + mass_num)
# NOTE(review): the branch header that introduces the compound-name mapping
# below is not visible. If no name matches, `mass_num` keeps its previous value.
if "methanal" in var_id:
mass_num = "31"
elif "methanol" in var_id:
mass_num = "33"
elif "acetonitrile" in var_id:
mass_num = "42"
elif "ethanal" in var_id:
mass_num = "45"
elif "dimethylsulfide" in var_id:
mass_num = "63"
elif "methyl_acetate" in var_id:
mass_num = "75"
elif "benzene" in var_id:
mass_num = "79"
elif "monoterpenes" in var_id:
mass_num = "137"
indata["mz_" + mass_num] = indata[var_id]
self.var_ids_mz.append("mz_" + mass_num)
# PTR-MS processing library, configured from conf_file
prog_ptrms_lib = ptrms_lib(cache_dir, outdir, conf_file)
# species flagged "actris_submit" in the configuration, in "cols_species" order
self.lev1_mz = [
s
for s in prog_ptrms_lib.conf["cols_species"]
if prog_ptrms_lib.conf["species"][s]["actris_submit"]
]
# matching numflag variable names
self.var_ids_flag = [
s + '_numflag'
for s in prog_ptrms_lib.conf["cols_species"]
if prog_ptrms_lib.conf["species"][s]["actris_submit"]
]
# matching ACTRIS names
self.lev1_mz_name = [
prog_ptrms_lib.conf["species"][s]["actris_name"]
for s in prog_ptrms_lib.conf["cols_species"]
if prog_ptrms_lib.conf["species"][s]["actris_submit"]
]
# NOTE(review): the parenthesis opened on the next line is never closed in this
# excerpt — at least one line is missing.
ppb_data, lod, rsd, accuracy, uncertainty, raw_data_filtered = (
prog_ptrms_lib.process(indata, year, levels=[0])
self.var_ids_std = [var_id + "_std" for var_id in self.lev1_mz]
# from here on `indata` holds the processed data; the time axes are taken
# from the filtered raw data
indata = ppb_data
indata[Product.START_ACQ_ID] = raw_data_filtered[Product.START_ACQ_ID]
indata[Product.END_ACQ_ID] = raw_data_filtered[Product.END_ACQ_ID]
# first start time and last end time of the filtered data, as datetime objects
t_start = datetime(year,
TimeConverter.doy2dt(year, min(raw_data_filtered[Product.START_ACQ_ID])).month,
TimeConverter.doy2dt(year, min(raw_data_filtered[Product.START_ACQ_ID])).day,
TimeConverter.doy2dt(year, min(raw_data_filtered[Product.START_ACQ_ID])).hour,
TimeConverter.doy2dt(year, min(raw_data_filtered[Product.START_ACQ_ID])).minute,
TimeConverter.doy2dt(year, min(raw_data_filtered[Product.START_ACQ_ID])).second)
t_end = datetime(year,
TimeConverter.doy2dt(year, max(raw_data_filtered[Product.END_ACQ_ID])).month,
TimeConverter.doy2dt(year, max(raw_data_filtered[Product.END_ACQ_ID])).day,
TimeConverter.doy2dt(year, max(raw_data_filtered[Product.END_ACQ_ID])).hour,
TimeConverter.doy2dt(year, max(raw_data_filtered[Product.END_ACQ_ID])).minute,
TimeConverter.doy2dt(year, max(raw_data_filtered[Product.END_ACQ_ID])).second)
for var_id in self.var_ids_env:
indata[var_id] = raw_data_filtered[var_id]
# - set output time axis of level 2 products
dt = 3600 # hour by hour
# NOTE(review): `t_start` and `t_end` are datetime instances (built above), so
# calling them would raise TypeError; this was probably meant to be
# `.replace(minute=0, second=0)` — confirm.
d_start = t_start(minutes=0, seconds=0)
d_end = t_end(minutes=0, seconds=0) + timedelta(hours=1)
# d_end = datetime(year,
# TimeConverter.doy2dt(year, max(raw_data_filtered[Product.START_ACQ_ID])).month,
# TimeConverter.doy2dt(year, max(raw_data_filtered[Product.START_ACQ_ID])).day,
# TimeConverter.doy2dt(year, max(raw_data_filtered[Product.START_ACQ_ID])).hour,
# 0,
# 0) + timedelta(hours=1)
ts_start = calendar.timegm(d_start.timetuple())
# NOTE(review): `v_start_time`, `outdata`, `shape`, `sz` and `v_end_acq_out` are
# used below but their definitions are not visible in this excerpt.
# - end time of averaging range
v_end_time = v_start_time + dt
for var_id in self.lev1_mz + self.var_ids_flag + self.var_ids_std:
# NOTE(review): the beginning of the statement on the next line is missing
shape, dtype=numpy.float64) + self.get_fill(var_id)
v_start_acq_in = indata[Product.START_ACQ_ID]
v_start_acq_out = outdata[Product.START_ACQ_ID]
# `t_start` is rebound from a datetime to an epoch timestamp here
t_start = calendar.timegm(t_start.timetuple())
fname, sout, _ = self.get_header(t_start, lvl, v_start_acq_out, year)[:3]
# sort the input records by start acquisition time
i_sort = numpy.argsort(indata[Product.START_ACQ_ID])
indata["start_acq"] = [indata["start_acq"][i] for i in i_sort]
indata["end_acq"] = [indata["end_acq"][i] for i in i_sort]
for var_id in self.var_ids_env + self.lev1_mz:
indata[var_id] = [indata[var_id][i] for i in i_sort]
# masks of invalid values, masks of NaNs and per-sample QA values
indata_fill, indata_nan, invalid_acq_val = self.get_invalid(indata)
# --- processing : averaging for L2, setting QA flags and filter invalid values for all levels
# some aliases for an easier reading
v_start_acq_out = outdata["start_acq"]
# - averaging
for i in range(sz):
# select records in out time range
ts_min = TimeConverter.ts2dt(v_start_acq_out[i])
ts_max = TimeConverter.ts2dt(v_end_acq_out[i])
# NOTE(review): `.index` is compared to datetimes, so `v_start_acq_in` is
# presumably a pandas object here — confirm.
s = ((v_start_acq_in.index >= ts_min) & (v_start_acq_in.index <= ts_max))
if numpy.any(s):
# NOTE(review): the loop over the species (`var_id`) is not visible here
# -> computes means and percentiles for concentration
self.set_stats_mean(
indata[var_id], indata_fill[var_id], s, i, outdata[var_id], self.get_fill(var_id))
# -> stddev
_var_id = "{0:s}_{1:s}".format(var_id, "std")
self.set_stats_std(
indata[var_id], indata_fill[var_id], s, i, outdata[_var_id])
# a valid std is doubled; get_vars_desc labels this column
# "expanded uncertainty 2sigma"
if outdata[_var_id][i] != self.get_fill(_var_id):
outdata[_var_id][i] = outdata[_var_id][i] * 2
# numflag of the averaged value : 0 when valid, 0.999 when NaN or fill
for var_id, var_id_flag in zip(self.lev1_mz,self.var_ids_flag):
data = outdata[var_id][i]
fill = self.get_fill(var_id)
is_valid = ~numpy.isnan(data) & (data != fill)
if is_valid:
outdata[var_id_flag][i] = 0.
else:
outdata[var_id_flag][i] = 0.999
if lvl in ["1.5"]:
# express the output times relative to January 1st of `year`, divided by
# 86400 (presumably seconds -> days; depends on TimeConverter.dt_to_ts)
dt_start_date = TimeConverter.dt_to_ts(datetime(year, 1, 1))
outdata["start_acq"] = numpy.array(
[(d - dt_start_date) / 86400.0 for d in outdata["start_acq"]])
outdata["end_acq"] = numpy.array(
[(d - dt_start_date) / 86400.0 for d in outdata["end_acq"]])
# convert ppb to ppt
# NOTE(review): the whole array is multiplied, including entries holding the
# fill value, which then no longer equal `var_fills_conc` in the write loop
# below — confirm this is intended.
for var_id, var_id_flag in zip(self.lev1_mz, self.var_ids_flag):
if any(outdata[var_id][outdata[var_id_flag] == 0]):
outdata[var_id] *= 1000
# --- write data to output
if __DEBUG__:
print("--- Write outputs ---")
# --- write data to output
for i in range(sz):
# - time vars
line = "{0:<20.6f}{1:<20.6f}".format(
outdata["start_acq"][i], outdata["end_acq"][i]
)
# environmental variables
if lvl in ["0", "1"]:
for var_id in self.var_ids_env:
data = outdata[var_id][i]
line += "{0:<20.3f}".format(data)
# QA flag : fill flag when the value is the fill value, otherwise the
# per-sample QA value computed by get_invalid
qa = 0.0
if data == self.get_fill(var_id):
qa = self.var_fills_qa
else:
qa = invalid_acq_val[var_id][i]
line += "{0:<20.3f}".format(qa)
# NOTE(review): the loop over the species and the definition of `conc` are
# not visible; the line appending `qa` for the species is not visible either.
line += "{0:<20.3f}".format(conc)
if lvl == "1.5":
line += "{0:<20.3f}".format(outdata[var_id + '_std'][i])
qa = 0.0
if conc == self.var_fills_conc:
qa = self.var_fills_qa
else:
if lvl in ["0", "1"]:
qa = invalid_acq_val[var_id][i]
else :
qa = outdata[var_id + '_numflag'][i]
sout += line.strip() + os.linesep
# write out data
# the output file goes to <outdir>/<year>/<fname>
outdir_year = "/".join([outdir, str(year)])
os.makedirs(outdir_year, exist_ok=True)
outfname = "/".join([outdir, str(year), fname])
f = open(outfname, "w")
try:
f.writelines(sout)
if __DEBUG__:
print(" > output file %s wrote" % outfname)
finally:
f.close()
# optionally open the written file in the `kate` editor
if __SHOW_OUTPUTS__:
os.system("kate %s &" % outfname)
return sout
def get_fill(self, var_id):
"""
@brief return the fill value to use for the given variable
@param var_id a variable ID
@return the fill value
"""
# NOTE(review): the body of the first branch (the value returned for QA flag
# variables) is not visible in this excerpt.
if var_id in [self.QA_FLAGS_ID] + self.var_ids_flag:
# species concentrations and their standard deviations
elif var_id in self.lev1_mz + self.var_ids_std:
return self.var_fills_conc
# environmental variables
elif var_id in self.var_ids_env:
return self.var_fills_env
# any other variable ID falls through and yields None
def get_invalid(self, indata):
    """
    @brief identify the invalid samples of the environmental and species variables
    @param indata input data buffer (variable ID -> sequence of values) ; the native
           fill values and the NaNs it contains are replaced IN PLACE by the output
           fill value of the variable (see get_fill)
    @return 3 dictionaries keyed by variable ID :
            - boolean mask of the invalid samples (missing or out of range)
            - boolean mask of the NaN samples
            - QA value per sample : 0 valid, 0.999 missing, 0.459 out of range
    """
    invalid_databuf = {}
    nan_databuf = {}
    invalid_val = {}
    # input data fill value
    infill = self.var_fills_conc_native
    env_ids = self.var_ids_env
    # --- environmental property and concentration automatic QA
    for var_id in env_ids + self.lev1_mz:
        data = numpy.array(indata[var_id])
        is_nan_data = numpy.isnan(data)
        is_infill = data == infill
        is_missing = is_infill | is_nan_data
        # The environmental IDs are the dataset names of the input file, so they
        # are recognised by substring. The historical exact names are still
        # accepted: on their own they never matched a dataset name, which left
        # the two range checks below unused.
        # NOTE(review): the "p det" limits are applied to the pressure variable;
        # confirm that these limits suit the inlet pressure.
        is_env = var_id in env_ids
        if var_id == "T inlet" or (is_env and "temperature" in var_id):
            is_extreme = (data < 273) | (data > 350)
        elif var_id == "p det" or (is_env and "pressure" in var_id):
            is_extreme = (data < 200) | (data > 1200)
        else:
            is_extreme = data > 100000
        # a missing sample must keep the "missing" QA value : without this, the
        # native fill value would fall below the lower limits and be reported
        # as out of range
        is_extreme = is_extreme & ~is_missing
        nan_databuf[var_id] = is_nan_data
        invalid_databuf[var_id] = is_missing | is_extreme
        # - replace native fill value by ebas one
        if is_missing.any():
            fill = self.get_fill(var_id)
            indata[var_id] = [
                fill if missing else value
                for value, missing in zip(indata[var_id], is_missing)
            ]
        # - set QA value
        qa = numpy.zeros(data.shape)
        qa[is_missing] = 0.999
        qa[is_extreme] = 0.459
        invalid_val[var_id] = qa
    return invalid_databuf, nan_databuf, invalid_val
def get_nvars(self, lvl):
    """
    @brief count the variables (columns after the start time) of the output file
    @param lvl level of the product, as a string ("0", "1" or "1.5")
    @return the number of output variables
    """
    total = 1  # end time
    total += 1 if lvl == "0" else 0  # status
    if lvl in ("0", "1"):
        total += 2 * len(self.var_ids_env)  # T, P and their QA flags
    n_species = len(self.lev1_mz)
    total += 2 * n_species  # m/e and QA flag
    if lvl == "1.5":
        total += n_species  # std
    return total
def get_time_tags(self, lvl, v_start_acq, year):
"""
@brief return the time resolution, level, etc as requested by ACTRIS data format
@param lvl level of the ACTRIS product
@return the time tags
"""
# resolution code and sample duration are read from the input file attributes
dt_raw_tag = self.reader.attributes["Resolution code"]
dt_tag = self.reader.attributes["Sample duration"]
# sample duration given in minutes ("mn" suffix)
if dt_tag.endswith("mn"):
dt = float(dt_tag.replace("mn", "")) / float(
24 * 60
) # in decimal day unit
# sample duration given in seconds ("s" suffix), converted to decimal days
if dt_tag.endswith("s"):
dt = float(dt_tag.replace("s", "")) / float(24 * 60 * 60)
# NOTE(review): the conditions selecting between the assignments below
# (presumably on `lvl`) are not visible in this excerpt; as shown, the `else:`
# has no matching `if` and the later assignments overwrite the earlier ones.
period_tag = "1d" # NRT mode
dt_tag = "1h"
dt = 1 / float(24) # 1h in decimal day unit
period_tag = "1y"
else:
return (period_tag, dt_tag, dt, dt_raw_tag)
def get_vars_fill(self, lvl):
"""
@brief constructs the list of variable fill values separated by spaces
@param lvl level of the product : 1 -> no time averaging ; 2 -> average all the year by day range
@return the list of variable fill values as a string
"""
# NOTE(review): get_nvars counts a number of variables that depends on `lvl` and
# on the number of species, so the level tests and the loop over the species
# that should surround the lines below are presumably missing from this excerpt.
s = "9999.999999 " # end time
# NOTE(review): the T and P entries use `var_fills_conc`, whereas get_fill
# returns `var_fills_env` for the environmental variables — confirm.
s += str(self.var_fills_conc) + " " # T
s += "9.999 "
s += str(self.var_fills_conc) + " " # P
s += "9.999 "
s += str(self.var_fills_conc)
s += " "
s += "9.999 "
# drop the trailing separator
return s.strip()
def get_vars_desc(self, lvl):
"""
@brief constructs the variables long names and column names
@param lvl level
@return (vars_long_name, vars_tags)
"""
# long names : one line per output variable
vars_long_name = ""
# inlet temperature and pressure, each followed by its numflag, for levels 0 and 1
if lvl in ["0", "1"]:
vars_long_name += (
"{0:s}, {1:s}, Location=inlet, Matrix=instrument".format("temperature", "K")
+ os.linesep
)
vars_long_name += "numflag temperature, no unit" + os.linesep
vars_long_name += (
"{0:s}, {1:s}, Location=inlet, Matrix=instrument".format("pressure", "hPa")
+ os.linesep
)
vars_long_name += "numflag pressure, no unit" + os.linesep
# NOTE(review): the loop header defining `var_id` for the lines below is not
# visible in this excerpt.
vars_long_name += var_id + ", pmol/mol" + os.linesep
if lvl == "1.5":
vars_long_name += var_id + ", pmol/mol, Statistics=expanded uncertainty 2sigma" + os.linesep
vars_long_name += "numflag " + var_id + ", no unit" + os.linesep
vars_long_name = vars_long_name.strip()
# tags
# column tags, each left-justified on 20 characters
vars_tag = ""
if lvl in ["0", "1"]:
for var_id in self.var_ids_env_tag:
var_id = var_id.replace(" ", "_")
vars_tag += "{0:<20s}".format(var_id)
vars_tag += "{0:<20s}".format(var_id + "_numflag")
for var_id in self.lev1_mz:
# NOTE(review): the line adding the tag of the species itself is not visible here
if lvl == "1.5":
vars_tag += "{0:<20s}".format(var_id + "_std")
vars_tag += "{0:<20s}".format(var_id + "_numflag")
vars_tag = vars_tag.strip()
return (vars_long_name, vars_tag)
def get_header(self, t_start, lvl, v_start_acq, year):
"""
@brief constructs the header file
@param t_startTrue
@param lvl level of the product : 1 -> no time averaging ; 2 -> average all the year by day range
@param v_start_acq start acquisition time vector
@param year year for L2 synthesis
@return the output file name and template header filled
"""
# NOTE(review): 4 values are returned (fname, header, start_day, ts_day_start),
# which is more than the docstring states.
# - acquisition time tags; `start_day` is parsed below as "%Y %m %d"
start_time, start_day = self.get_acq_time_tags(t_start, lvl, year)
d = datetime.strptime(start_day + " 00:00:00", "%Y %m %d 00:00:00")
ts_day_start = calendar.timegm(d.timetuple())
# - production time
prod_time, rev_day = self.get_prod_time_tags()
# set metdata based on level 0
# (every value below is copied from the attributes of the input file reader)
self.station = self.reader.attributes["Station code"]
self.instrument = self.reader.attributes["Instrument type"]
self.matrix = self.reader.attributes["Matrix"]
self.instr_name = self.reader.attributes["Instrument name"]
self.model = self.reader.attributes["Instrument model"]
self.brand = self.reader.attributes["Instrument manufacturer"]
self.sn = self.reader.attributes["Instrument serial number"]
self.method_ref = self.reader.attributes["Method ref"]
self.lab = self.reader.attributes["Laboratory code"]
self.platform = self.reader.attributes["Platform code"]
self.gaw_id = self.reader.attributes["Station GAW-ID"]
self.wdca_id = self.reader.attributes["Station WDCA-ID"]
self.site = self.reader.attributes["Station name"]
self.latitude = self.reader.attributes["Measurement latitude"]
self.longitude = self.reader.attributes["Measurement longitude"]
self.altitude = self.reader.attributes["Measurement altitude"]
self.meas_height = self.reader.attributes["Measurement height"]
# NOTE(review): these two values are overwritten further down before being used
std_temp = self.reader.attributes["Volume std. temperature"]
std_pres = self.reader.attributes["Volume std. pressure"]
self.wmo_land_use = self.reader.attributes["Station land use"]
self.wmo_setting = self.reader.attributes["Station setting"]
self.gaw_type = self.reader.attributes["Station GAW type"]
self.wmo_region = self.reader.attributes["Station WMO region"]
# fixed metadata of this product
self.component = "ion_concentrations"
self.normal_comment_line = 59
self.unit = "pmol/mol"
# - output filename
fname = self.get_outfname(start_time, prod_time, lvl, v_start_acq, year)
fname_ext = "nas"
# - time resolution tags
period_tag, dt_tag, dt, dt_raw_tag = self.get_time_tags(lvl, v_start_acq, year)
# set type code + dt value : TU time regulary spaced, TI : irregulary
set_type_code = "TU" # should always be
diff = numpy.diff(v_start_acq)
if (diff.size > 0) and not numpy.all(diff == diff[0]):
set_type_code = "TI"
dt = 0
# NOTE(review): as shown, the two assignments below unconditionally override
# the values computed above; a condition (presumably on `lvl`) may be missing
# from this excerpt.
dt = 1 / float(24) # 1h in decimal day unit
period_tag = "1y"
# - number of variables
nvars = self.get_nvars(lvl)
# - scale factors
scale_factors = ("1. " * nvars).strip()
# variables fill values
vars_fill = self.get_vars_fill(lvl)
# variable long names + tags
vars_long_name, vars_tag = self.get_vars_desc(lvl)
# standard conditions of acquisition
# (only the last pair of assignments, formatted with 2 decimals, is effective)
std_temp = self.acq_temp
std_pres = self.acq_pres
std_temp = "%.2f" % Standardizer.T_STD
std_pres = "%.2f" % Standardizer.P_STD
# - main param unit depends of level
sz_header_base = (
self.normal_comment_line + 14
) # number of lines of the header without vars
header_size = sz_header_base + nvars # number of lines of the header
# --- fill-in the header
# NOTE(review): the bare numbers 647..702 inside the call below are not valid
# arguments; they look like line-number residue from the tool this file was
# copied from and should be removed.
# NOTE(review): `self.submitter` is not set in the visible part of this class.
# NOTE(review): `meas_altitude` adds two reader attributes; if these are strings
# the `+` concatenates instead of summing — confirm their type.
header = Converter.Converter.get_header(
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
self,
lvl,
header_size=header_size,
start_day=start_day,
rev_day=rev_day,
start_time=start_time,
prod_time=prod_time,
fname=fname,
fname_ext=fname_ext,
lab=self.lab,
station=self.station,
platform=self.platform,
gaw_id=self.gaw_id,
wdca_id=self.wdca_id,
site=self.site,
latitude=self.latitude,
longitude=self.longitude,
altitude=self.altitude,
meas_altitude=self.altitude + self.meas_height,
std_temp=std_temp,
std_pres=std_pres,
wmo_land_use=self.wmo_land_use,
wmo_setting=self.wmo_setting,
gaw_type=self.gaw_type,
wmo_region=self.wmo_region,
period_tag=period_tag,
dt_tag=dt_tag,
dt=dt,
dt_raw_tag=dt_raw_tag,
submitter=self.submitter,
component=self.component,
matrix=self.matrix,
unit=self.unit,
instrument=self.instrument,
brand=self.brand,
model=self.model,
sn=self.sn,
nvars=nvars,
scale_factors=scale_factors,
vars_fill=vars_fill,
vars_long_name=vars_long_name,
vars_tag=vars_tag,
set_type_code=set_type_code,
method_ref=self.method_ref,
instr_name=self.instr_name,
normal_comment_line=self.normal_comment_line,
)
return fname, header, start_day, ts_day_start
def get_parser(app, SRC_DIR):
"""
Sets the available program options, and their default values
"""
# NOTE(review): this function takes no `self` and is called below as
# `PtrmsConverter.get_parser(...)`; a @staticmethod decorator is presumably
# missing from this excerpt.
parser = OptionParser()
# usage text : positional arguments are the configuration file, then the input files
parser.usage = "python3 " + app + ".py [options] <conf-file> <infiles>" + os.linesep
parser.usage += os.linesep
parser.usage += "with :" + os.linesep
# NOTE(review): the parenthesis opened on the next line is not closed in this excerpt
parser.usage += (
"\t<infiles> full path to the trace gazes file(s) to convert" + os.linesep +
"\t<conf-file> full path of the configuration file" + os.linesep
# NOTE(review): the default below is the integer 1, while the level tests in
# this file compare against strings ("0", "1", "1.5") — confirm.
parser.add_option(
"-l",
"--levels",
action="store",
default=1,
type="str",
dest="levels",
help="Processed levels",
)
parser.add_option(
"-p",
"--prod-id",
action="store",
default=None,
type="str",
dest="prod_id",
help="Product_id as <STATION>_<INSTRUMENT>_<LEVEL>",
)
# output and cache directories default to locations under SRC_DIR
parser.add_option(
"-o",
"--out-dir",
action="store",
default=SRC_DIR + "/tests/results",
type="str",
dest="outdir",
help="Output directory",
)
parser.add_option(
"-d",
"--cache_dir",
action="store",
default=SRC_DIR + "/tests/cache/",
type="str",
dest="cache_dir",
help="Directory of the temporary files",
)
# check the number of command line arguments
# NOTE(review): the condition guarding the `raise` and the definitions of
# `infiles` and `conf` are not visible in this excerpt.
raise ValueError("Missing command-line arguments")
return infiles, conf, parser.parse_args()
# Script entry point (fragment).
# NOTE(review): the lines defining `SRC_DIR` (the next line looks like the
# arguments of a path join going two directories up from this file) and `prog`
# (presumably a PtrmsConverter instance) are not visible in this excerpt.
os.path.dirname(os.path.abspath(__file__)), "..", ".."
# parse the command line : input files, configuration file, parsed options
infiles, conf, (options, arg) = PtrmsConverter.get_parser(__APP__, SRC_DIR)
# run the conversion for the requested levels
prog.process(infiles, conf, options.cache_dir, options.outdir, levels=options.levels)