Skip to content
Snippets Groups Projects
PtrmsConverter.py 22.5 KiB
Newer Older
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
# -*- coding: utf-8 -*-

"""
Description :
-------------
    PTRMS converter based on CAMS project

Usage :
-------
    Usage: python3 PtrmsConverter.py [options] <infiles>

Author :
--------
    CDS AERIS-ICARE Aurélien Chauvigné

License :
---------
    This file must be used under the terms of the CeCILL.
    This source file is licensed as described in the file COPYING, which
    you should have received as part of this distribution.  The terms
    are also available at
    http://www.cecill.info/licences/Licence_CeCILL_V2-en.txt

History :
--------
    v0.0.0 : 2023/09/12
    - creation
"""
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
import sys
import os.path
import time
from datetime import datetime
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
import calendar
from optparse import OptionParser
import numpy
import pandas as pd
pkgdir = os.path.dirname(__file__)
d = os.path.abspath(os.path.join(pkgdir, ".."))
if d not in sys.path:
    sys.path.append(os.path.join(d))

from converter import Converter
from product.AMESProduct import AMESProduct
from iointerface.AMESReader import AMESReader
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
from product.Product import Product
from misc import Standardizer, TimeConverter
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
from uploader.DataSeeker import DataSeeker
import json
import re

from algo.ptrms_lib import ptrms_lib
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
__VERSION__ = "0.0.0"
__APP__ = os.path.splitext(os.path.basename(__file__))[0]
__DEBUG__ = True
__THROW__ = True
__SHOW_BAD_INPUTS__ = False
__SHOW_OUTPUTS__ = False
__ENABLE_BASELINE_CORRECTION_L0__ = True


class PtrmsConverter(Converter.Converter):
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
    """
    Puy NOX data converter
    """

    data_seeker = DataSeeker()

    # acquisition conditions
    acq_temp = Standardizer.T_STD  #  temperature in K
    acq_pres = Standardizer.P_STD  #  pressure in hPa

    # fill values
    var_fills_conc = 999999.999
    var_fills_env = 9999.999
    var_fills_qa = 0.999
    var_fills_conc_native = -99  # fill value in native files

    def __init__(self, prod_id):
        """
        @brief constructor
        """
        # - process all enabled product IDs set in data seeker
        if prod_id is None:
            prod_ids = [k for k in self.data_seeker.cfg.keys()]
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        else:
            prod_ids = [prod_id]

        for prod_id in prod_ids:
            Converter.Converter.__init__(self, prod_id, AMESProduct)
    def process(self, infiles, conf_file, cache_dir, outdir, levels, hour=None):
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        """
        @brief convert a list of files
        @param infiles the files to convert
        @param outdir output directory
        @param levels processed level
        @param year year of the eventual yearly synthesis
        """
        outdir = os.path.abspath(outdir)
        # level 0 only
        for lvl in levels:
            start_time = time.time()
            if __DEBUG__:
                print("*" * 20 + " LEVEL %d " % lvl + "*" * 20)
            try:
                self.process_lvl(infiles, conf_file, cache_dir, outdir, lvl, hour)
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
            except Exception as e:
                if __DEBUG__:
                    msg = " > " + str(e) + " : " + os.linesep + "ABORT"
                    print(msg)
                if __THROW__:
                    raise
            if __DEBUG__:
                print(
                    "*" * 20
                    + " LEVEL %d" % lvl
                    + " : "
                    + time.strftime("%H:%M:%S ", time.gmtime(time.time() - start_time))
                    + "*" * 20
                )

    def process_lvl(self, infiles, conf_file, cache_dir, outdir, lvl, hour):
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        """
        @brief convert a list of files
        @param infiles the files to convert
        @param cache_dir cache directory path
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        @param outdir output directory
        @param year year of the eventual yearly synthesis
        """
        # load data
        self.reader = AMESReader(infiles[0])
        year = int(self.reader.attributes["Startdate"][0:4])
        ds_names = self.reader.get_ds_names()
        indata = Converter.Converter.load_indata(
            self, infiles, ds_names, reader_interface=AMESReader
        )

        self.var_ids_mz = []
        self.var_ids_env_tag = []
        self.var_ids_env = []
        for var_id in ds_names:
            if "start_time" in var_id:
                indata[Product.START_ACQ_ID] = indata[var_id]
            elif "end_time" in var_id:
                indata[Product.END_ACQ_ID] = indata[var_id]
            elif "status" in var_id:
                indata["status"] = indata[var_id]
            elif "temperature" in var_id and "Location=inlet" in var_id:
                self.var_ids_env.append(var_id)
                self.var_ids_env_tag.append("T_inlet")
            elif "pressure" in var_id and "Location=inlet" in var_id:
                self.var_ids_env.append(var_id)
                self.var_ids_env_tag.append("P_inlet")
            elif "count_rate" in var_id and "numflag" not in var_id:
                m = re.search(r"_[0-9]+_", var_id)
                if m is not None:
                    mass_num = m.group(0).replace("_", "")
                    indata["mz_" + mass_num] = indata[var_id]
                    self.var_ids_mz.append("mz_" + mass_num)
                    if "methanal" in var_id:
                        mass_num = "31"
                    elif "methanol" in var_id:
                        mass_num = "33"
                    elif "acetonitrile" in var_id:
                        mass_num = "42"
                    elif "ethanal" in var_id:
                        mass_num = "45"
                    elif "dimethylsulfide" in var_id:
                        mass_num = "63"
                    elif "methyl_acetate" in var_id:
                        mass_num = "75"
                    elif "benzene" in var_id:
                        mass_num = "79"
                    elif "monoterpenes" in var_id:
                        mass_num = "137"
                    indata["mz_" + mass_num] = indata[var_id]
                    self.var_ids_mz.append("mz_" + mass_num)

        # convert to ppm concentration
        prog_ptrms_lib = ptrms_lib(cache_dir, outdir, conf_file)
        self.lev1_mz = [
            s
            for s in prog_ptrms_lib.conf["cols_species"]
            if prog_ptrms_lib.conf["species"][s]["actris_submit"]
        ]
        self.lev1_mz_name = [
            prog_ptrms_lib.conf["species"][s]["actris_name"]
            for s in prog_ptrms_lib.conf["cols_species"]
            if prog_ptrms_lib.conf["species"][s]["actris_submit"]
        ]
        ppb_data, lod, rsd, accuracy, uncertainty, raw_data_filtered = (
            prog_ptrms_lib.process(indata, year, levels=[0])
        # group outputs
        outdata = ppb_data
        outdata[Product.START_ACQ_ID] = raw_data_filtered[Product.START_ACQ_ID]
        outdata[Product.END_ACQ_ID] = raw_data_filtered[Product.END_ACQ_ID]
        for var_id in self.var_ids_env:
            outdata[var_id] = raw_data_filtered[var_id]
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        # identify invalid values
        indata_fill, indata_nan, invalid_acq_val = self.get_invalid(outdata)
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed

        # - acquisition time
        v_start_acq_in = outdata[Product.START_ACQ_ID]
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        t_start = min(v_start_acq_in)

        # get header
        fname, sout, _ = self.get_header(t_start, lvl, v_start_acq_in, year)[:3]

        # sort according start_acq
        i_sort = numpy.argsort(outdata[Product.START_ACQ_ID])
        outdata["start_acq"] = [outdata["start_acq"][i] for i in i_sort]
        outdata["end_acq"] = [outdata["end_acq"][i] for i in i_sort]
        for var_id in self.var_ids_env + self.lev1_mz:
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
            outdata[var_id] = [outdata[var_id][i] for i in i_sort]
            invalid_acq_val[var_id] = numpy.array(
                [invalid_acq_val[var_id][i] for i in i_sort]
            )
        # outdata = outdata.sort_values(by=['start_acq'])
        # outdata= sorted(outdata,key=lambda x:x["start_acq"].max(axis=0))

        # --- write data to output
        if __DEBUG__:
            print("--- Write outputs ---")

        # --- write data to output
        sz = len(v_start_acq_in)
        for i in range(sz):
            # - time vars
            line = "{0:<20.6f}{1:<20.6f}".format(
                outdata["start_acq"][i], outdata["end_acq"][i]
            )

            # concentrations coef
            for var_id in self.var_ids_env + self.lev1_mz:
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
                conc = outdata[var_id][i]

                qa = 0.0
                if conc == self.var_fills_conc:
                    qa = self.var_fills_qa

                qa = invalid_acq_val[var_id][i]
                if var_id in self.var_ids_env:
                    line += "{0:<20.3f}".format(conc)
                else:
                    line += "{0:<20.3f}".format(conc)

                line += "{0:<20.3f}".format(qa)
            sout += line.strip() + os.linesep

        # write out data
        outdir_year = "/".join([outdir, str(year)])
        os.makedirs(outdir_year, exist_ok=True)
        outfname = "/".join([outdir, str(year), fname])
        f = open(outfname, "w")
        try:
            f.writelines(sout)
            if __DEBUG__:
                print(" > output file %s wrote" % outfname)
        finally:
            f.close()
        if __SHOW_OUTPUTS__:
            os.system("kate %s &" % outfname)

        return sout

    def get_fill(self, var_id):
        """
        @brief return the fill value to use for the given variable
        @param var_id a variable ID
        @return the fill value
        """
        if var_id == self.QA_FLAGS_ID:
            return 9.999
        elif var_id in self.lev1_mz:
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
            return self.var_fills_conc
        elif var_id in self.var_ids_env:
            return self.var_fills_env

    def get_invalid(self, indata):
        """
        @brief identify the invalid data values and return it in 2 dictionary buffers : one with invalid, one with NaNs
        @param indata input data buffer
        @return 3 buffers : one with mask of invalids, one with mask of NaNs, one with the manually invalidated values
        """
        invalid_databuf = {}
        nan_databuf = {}
        invalid_val = {}

        # input data fill value
        infill = self.var_fills_conc_native

        # --- environmental property and concentration automatic QA
        for var_id in self.var_ids_env + self.lev1_mz:
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
            data = indata[var_id]
            # data = numpy.array([float(d.replace('E','e').replace(',','.')) for d in data])
            data = numpy.array(data)
            is_nan_data = numpy.isnan(data)
            nan_databuf[var_id] = is_nan_data
            is_infill = data == infill
            if var_id == "T inlet":
                is_extreme = (data < 273) | (data > 350)
            elif var_id == "p det":
                is_extreme = (data < 200) | (data > 1200)
            else:
                is_extreme = data > 100000

            is_invalid = is_nan_data | is_infill | is_extreme
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed

            invalid_databuf[var_id] = is_invalid
            # - replace native fill value by ebas one
            if any(is_infill) | any(is_nan_data):
                indata[var_id] = [
                    self.get_fill(var_id) if ((is_infill[i]) | (is_nan_data[i])) else d
                    for i, d in enumerate(indata[var_id])
                ]
            # - set QA value
            invalid_val[var_id] = numpy.zeros(data.shape)
            # invalid_val[var_id][is_invalid] = 0.980
            invalid_val[var_id][is_infill | is_nan_data] = 0.999
            invalid_val[var_id][is_extreme] = 0.459

        return invalid_databuf, nan_databuf, invalid_val

    def get_nvars(self, lvl):
        """
        @brief return the number of output variables
        @param lvl level of the product : 1 -> no time averaging ; 2 -> average all the year by day range
        @return the the number of output variables
        """
        nvars = 1  # end time
        nvars += 1  # status
        nvars += len(self.var_ids_env) * 2  # T, P and QA
        nvars += len(self.lev1_mz) * 2  # m/e and QA flag
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed

        return nvars

    def get_time_tags(self, lvl, v_start_acq, year):
        """
        @brief return the time resolution, level, etc as requested by ACTRIS data format
        @param lvl level of the ACTRIS product
        @return the time tags
        """
        dt_raw_tag = self.reader.attributes["Resolution code"]
        dt_tag = self.reader.attributes["Sample duration"]
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        if lvl in [0, 1]:
            if dt_tag.endswith("mn"):
                dt = float(dt_tag.replace("mn", "")) / float(
                    24 * 60
                )  # in decimal day unit
            if dt_tag.endswith("s"):
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
                dt = float(dt_tag.replace("s", "")) / float(24 * 60 * 60)
            period_tag = "1d"  # NRT mode
        elif lvl in [2]:
            dt_tag = "1h"
            dt = 1 / float(24)  # 1h in decimal day unit
            period_tag = "1y"
        else:
            raise ValueError("Invalid level %d" % lvl)
        return (period_tag, dt_tag, dt, dt_raw_tag)

    def get_vars_fill(self, lvl):
        """
        @brief constructs the list of variable fill values separated by spaces
        @param lvl level of the product : 1 -> no time averaging ; 2 -> average all the year by day range
        @return the list of variable fill values as a string
        """
        s = "9999.999999 "  # end time
        s += str(self.var_fills_conc) + " "  # T
        s += "9.999 "
        s += str(self.var_fills_conc) + " "  # P
        s += "9.999 "
        for me in self.lev1_mz:
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
            s += str(self.var_fills_conc)
            s += " "
            s += "9.999 "
        return s.strip()

    def get_vars_desc(self, lvl):
        """
        @brief constructs the variables long names and column names
        @param lvl level
        @return (vars_long_name, vars_tags)
        """
        vars_long_name = ""
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        vars_long_name += (
            "{0:s}, {1:s}, Location=inlet, Matrix=instrument".format("temperature", "K")
            + os.linesep
        )
        vars_long_name += "numflag_temperature, no unit" + os.linesep
        vars_long_name += (
            "{0:s}, {1:s}, Location=inlet, Matrix=instrument".format("pressure", "hPa")
            + os.linesep
        )
        vars_long_name += "numflag_pressure, no unit" + os.linesep
        for var_id in self.lev1_mz_name:
            vars_long_name += var_id + ", ppb" + os.linesep
            vars_long_name += "numflag_" + var_id + ", no unit" + os.linesep
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        vars_long_name = vars_long_name.strip()

        # tags
        vars_tag = ""

        for var_id in self.var_ids_env_tag:
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
            var_id = var_id.replace(" ", "_")
            vars_tag += "{0:<20s}".format(var_id)
            vars_tag += "{0:<20s}".format("numflag_" + var_id)

        for var_id in self.lev1_mz:
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
            vars_tag += "{0:<20s}".format(var_id)
            vars_tag += "{0:<20s}".format(var_id + "_numflag")
        vars_tag = vars_tag.strip()

        return (vars_long_name, vars_tag)

    def get_header(self, t_start, lvl, v_start_acq, year):
        """
        @brief constructs the header file
        @param t_startTrue
        @param lvl level of the product : 1 -> no time averaging ; 2 -> average all the year by day range
        @param v_start_acq start acquisition time vector
        @param year year for L2 synthesis
        @return the output file name and template header filled
        """
        start_time, start_day = self.get_acq_time_tags(t_start, lvl, year)
        d = datetime.strptime(start_day + " 00:00:00", "%Y %m %d 00:00:00")
        ts_day_start = calendar.timegm(d.timetuple())

        # - production time
        prod_time, rev_day = self.get_prod_time_tags()

        # set metdata based on level 0
        self.station = self.reader.attributes["Station code"]
        self.instrument = self.reader.attributes["Instrument type"]
        self.matrix = self.reader.attributes["Matrix"]
        self.instr_name = self.reader.attributes["Instrument name"]
        self.model = self.reader.attributes["Instrument model"]
        self.brand = self.reader.attributes["Instrument manufacturer"]
        self.sn = self.reader.attributes["Instrument serial number"]
        self.method_ref = self.reader.attributes["Method ref"]
        self.lab = self.reader.attributes["Laboratory code"]
        self.platform = self.reader.attributes["Platform code"]
        self.gaw_id = self.reader.attributes["Station GAW-ID"]
        self.wdca_id = self.reader.attributes["Station WDCA-ID"]
        self.site = self.reader.attributes["Station name"]
        self.latitude = self.reader.attributes["Measurement latitude"]
        self.longitude = self.reader.attributes["Measurement longitude"]
        self.altitude = self.reader.attributes["Measurement altitude"]
        self.meas_height = self.reader.attributes["Measurement height"]
        std_temp = self.reader.attributes["Volume std. temperature"]
        std_pres = self.reader.attributes["Volume std. pressure"]
        self.wmo_land_use = self.reader.attributes["Station land use"]
        self.wmo_setting = self.reader.attributes["Station setting"]
        self.gaw_type = self.reader.attributes["Station GAW type"]
        self.wmo_region = self.reader.attributes["Station WMO region"]

        # set metdata for level 1
        self.component = "ion_concentrations"
        self.normal_comment_line = 58
        self.unit = "ppb"
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        # - output filename
        fname = self.get_outfname(start_time, prod_time, lvl, v_start_acq, year)
        fname_ext = "nas"

        # - time resolution tags
        period_tag, dt_tag, dt, dt_raw_tag = self.get_time_tags(lvl, v_start_acq, year)

        # set type code + dt value : TU time regulary spaced, TI : irregulary
        set_type_code = "TU"  # should always be
        if lvl in [0, 1]:
            # default values
            #             dt_tag = dt_raw_tag
            # 5min in decimal day unit
            #             dt = int(dt_tag.replace("mn", "")) / float(24 * 60)
            # sometimes, records are missing -> TI + dt not constant (so set to 0)
            # -> check constantness
            diff = numpy.diff(v_start_acq)
            if (diff.size > 0) and not numpy.all(diff == diff[0]):
                set_type_code = "TI"
                dt = 0
        #         elif lvl in [2]:
        #             #             dt_tag = "1h"
        #             dt = 1 / float(24)  # 1h in decimal day unit
        #             period_tag = "1y"

        # - number of variables
        nvars = self.get_nvars(lvl)

        # - scale factors
        scale_factors = ("1. " * nvars).strip()

        # variables fill values
        vars_fill = self.get_vars_fill(lvl)

        # variable long names + tags
        vars_long_name, vars_tag = self.get_vars_desc(lvl)

        # standard conditions of acquisition
        std_temp = self.acq_temp
        std_pres = self.acq_pres
        if lvl in ["1", "2"]:
            std_temp = "%.2f" % Standardizer.T_STD
            std_pres = "%.2f" % Standardizer.P_STD

        # - main param unit depends of level
        sz_header_base = (
            self.normal_comment_line + 14
        )  # number of lines of the header without vars
        header_size = sz_header_base + nvars  # number of lines of the header

        # --- fill-in the header
        header = Converter.Converter.get_header(
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
            self,
            lvl,
            header_size=header_size,
            start_day=start_day,
            rev_day=rev_day,
            start_time=start_time,
            prod_time=prod_time,
            fname=fname,
            fname_ext=fname_ext,
            lab=self.lab,
            station=self.station,
            platform=self.platform,
            gaw_id=self.gaw_id,
            wdca_id=self.wdca_id,
            site=self.site,
            latitude=self.latitude,
            longitude=self.longitude,
            altitude=self.altitude,
            meas_altitude=self.altitude + self.meas_height,
            std_temp=std_temp,
            std_pres=std_pres,
            wmo_land_use=self.wmo_land_use,
            wmo_setting=self.wmo_setting,
            gaw_type=self.gaw_type,
            wmo_region=self.wmo_region,
            period_tag=period_tag,
            dt_tag=dt_tag,
            dt=dt,
            dt_raw_tag=dt_raw_tag,
            submitter=self.submitter,
            component=self.component,
            matrix=self.matrix,
            unit=self.unit,
            instrument=self.instrument,
            brand=self.brand,
            model=self.model,
            sn=self.sn,
            nvars=nvars,
            scale_factors=scale_factors,
            vars_fill=vars_fill,
            vars_long_name=vars_long_name,
            vars_tag=vars_tag,
            set_type_code=set_type_code,
            method_ref=self.method_ref,
            instr_name=self.instr_name,
            normal_comment_line=self.normal_comment_line,
        )

        return fname, header, start_day, ts_day_start

    def get_parser(app, SRC_DIR):
        """
        Sets the available program options, and their default values
        """
        parser = OptionParser()

        parser.usage = "python3 " + app + ".py [options] <conf-file> <infiles>" + os.linesep
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        parser.usage += os.linesep
        parser.usage += "with :" + os.linesep
        parser.usage += (
            "\t<infiles>  full path to the trace gazes file(s) to convert" + os.linesep +
            "\t<conf-file>  full path of the configuration file" + os.linesep
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
        )

        parser.add_option(
            "-p",
            "--prod-id",
            action="store",
            default=None,
            type="str",
            dest="prod_id",
            help="Product_id as <STATION>_<INSTRUMENT>_<LEVEL>",
        )

        parser.add_option(
            "-o",
            "--out-dir",
            action="store",
            default=SRC_DIR + "/tests/results",
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
            type="str",
            dest="outdir",
            help="Output directory",
        )

        parser.add_option(
            "-d",
            "--cache_dir",
            action="store",
            default=SRC_DIR + "/tests/cache/",
            type="str",
            dest="cache_dir",
            help="Directory of the temporary files",
        )

        infiles = [sys.argv[-1]]
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed

        # check the number of command line arguments
        if len(sys.argv) < 2:
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
            parser.print_help()
            raise ValueError("Missing command-line arguments")
        return infiles, conf, parser.parse_args()
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
def main():
    SRC_DIR = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "..", ".."
    infiles, conf, (options, arg) = PtrmsConverter.get_parser(__APP__, SRC_DIR)
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed
    prog = PtrmsConverter(options.prod_id)
    prog.process(infiles, conf, options.cache_dir, options.outdir, levels=[1])
Aurelien Chauvigne's avatar
Aurelien Chauvigne committed


if __name__ == "__main__":
    main()