# -*- coding: utf-8 -*-
"""
Description :
-------------
ACTRIS converter abstract class

Limitations :
-------------
-

Author :
--------
CGTD-ICARE/UDEV Nicolas PASCAL

License :
---------
This file must be used under the terms of the CeCILL.
This source file is licensed as described in the file COPYING, which
you should have received as part of this distribution. The terms
are also available at
http://www.cecill.info/licences/Licence_CeCILL_V2-en.txt

History :
--------
v0.0.0 : 2013/09/30 - creation
"""

import os
import sys
import time
from datetime import datetime
import calendar
from optparse import OptionParser

import numpy

pkgdir = os.path.dirname(__file__)
d = os.path.abspath(os.path.join(pkgdir, ".."))
if d not in sys.path:
    sys.path.append(os.path.join(d))

from misc.FileTools import check_file, check_dir
from misc.Percentile import percentile
from misc.TimeConverter import NSEC_DAY
from product.AcquisitionErrorLogger import AcquisitionErrorLogger
from product.Product import Product

__VERSION__ = "0.0.0"
__APP__ = os.path.splitext(os.path.basename(__file__))[0]

__DEBUG__ = True
__THROW__ = True
__SHOW_BAD_INPUTS__ = True
__SHOW_OUTPUTS__ = True


class Converter(object):
    """
    Generic interface for data conversion
    """
    QA_FLAGS_ID = "QA_Flags"
    QA_AGG_FLAGS_ID = "QA_Agg_Flags"  # aggregated flags, L2
    QA_FLAGS_VALID = 0.000
    QA_FLAGS_FILL = 9.999
    QA_FLAGS_MISSING = 0.999
    DEFAULT_FILL = 9999.999

    # specific attributes. Must be implemented by concrete classes
    component = None
    matrix = None
    instrument = None
    brand = None
    model = None
    sn = None
    unit = None
    component_extra_tag = None
    submitter = "Pascal, Nicolas, nicolas.pascal@univ-lille.fr, AERIS/ICARE Data and Services Center, AERIS/ICARE,, Universite Lille 1, Bat. M3 Extension - Avenue Carl Gauss, 59650, Villeneuve d'Ascq, France"

    def __init__(self, prod_id, reader_interface, year=None, period=None,
                 levels=[1, 2, 3], acq_err_log=None):
        """
        @brief constructor
        @param prod_id input product ID
        @param reader_interface data reader class
        @param year year of the eventual yearly synthesis
        @param period period id in case of configuration change over the year
        @param levels list of supported levels
        @param acq_err_log file that contains the history of acquisition errors
               with their description; can also be a list of strings
        """
        object.__init__(self)
        self.prod_id = prod_id
        self.reader_interface = reader_interface
        self.year = year
        self.period = period
        self.levels = levels
        self.acq_err_logger = None   # one log file only
        self.acq_err_loggers = None  # multiple logs for one dataset. Should be merged
        if acq_err_log is not None:
            if isinstance(acq_err_log, str):
                check_file(acq_err_log)
                self.acq_err_logger = AcquisitionErrorLogger(acq_err_log)
            elif isinstance(acq_err_log, list):
                # be careful, only the last file is set in acq_err_logger.
                # Needs a specific treatment
                self.acq_err_loggers = []
                for flog in acq_err_log:
                    check_file(flog)
                    self.acq_err_loggers.append(AcquisitionErrorLogger(flog))
        self.infiles = None

        # default items
        # self.instr_name = "{0:s}_{1:s}".format(
        #     self.brand, self.model)
        # self.method_ref = "{0:s}_{1:s}_analysis".format(
        #     self.lab, self.component)
        # remove dummy spaces
        # brand = self.brand.replace(" ", "_")
        # model = self.model.replace(" ", "_")
        # self.instr_name = "{instrument:s}_{brand:s}_{model:s}".format(
        #     instrument=self.instrument, brand=brand, model=model)
        # self.method_ref = "{lab:s}_{brand:s}_{model:s}_{gaw_id:s}".format(
        #     lab=self.lab, brand=brand, model=model, gaw_id=self.gaw_id)

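    # Illustrative constructor call (hypothetical subclass, reader class and
    # log files, not part of this module): the acquisition error log may be a
    # single path or a list of paths.
    #
    #   conv = MyConverter("my_prod_id", MyReader, year=2021,
    #                      acq_err_log=["acq_errors_a.log", "acq_errors_b.log"])
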
    def __str__(self):
        """
        print out object content
        """
        s = str(self.__class__) + os.linesep
        s += "levels=" + str(self.levels) + os.linesep
        return s

    def get_readers(self, infiles):
        """
        @brief return all the reader interfaces, instantiated with infiles
        @param infiles a list of input files
        @return the instantiated readers
        """
        return [self.reader_interface(infile) for infile in infiles]

    def build_outfname(self, site, start_time, prod_time, component, matrix,
                       component_extra_tag, period, dt, lvl):
        """
        @brief construct the standard ACTRIS filename for this granule and return it
        @param start_time start acquisition time in the format "%Y%m%d%H%M%S"
        @param prod_time production time in the format "%Y%m%d%H%M%S"
        @param lvl level of the product : 1 -> no time averaging ;
               2 -> average all the year by day range
        """
        # lvl_tag = ""
        # if lvl in [0, 1]:
        lvl_tag = "lev%s" % lvl
        if component_extra_tag is not None and len(component_extra_tag.strip()) > 0:
            component_extra_tag = "_" + component_extra_tag
        else:
            component_extra_tag = ""
        # fix eventual spaces
        instrument = self.instrument.replace(" ", "_")
        instr_name = self.instr_name.replace(" ", "_")
        method_ref = self.method_ref.replace(" ", "_")
        fname = "{site:s}.{start_time:s}.{prod_time:s}.{instrument:s}.{component:s}.{matrix:s}.{period:s}.{dt:s}.{lab:s}_{instr_name:s}.{method_ref:s}.{lvl_tag:s}.nas".format(
            site=site, start_time=start_time, prod_time=prod_time,
            instrument=instrument, component=component,
            component_extra_tag=component_extra_tag, matrix=matrix,
            period=period, dt=dt, method_ref=method_ref,
            instr_name=instr_name, lvl_tag=lvl_tag, lab=self.lab)
        return fname

    def get_outfname(self, start_time, prod_time, lvl, v_start_acq, year=None):
        """
        @brief construct the standard ACTRIS filename for this granule and return it
        @param start_time start acquisition time in the format "%Y%m%d%H%M%S"
        @param prod_time production time in the format "%Y%m%d%H%M%S"
        @param lvl level of the product : 1 -> no time averaging ;
               2 -> average all the year by day range
        """
        period_tag, dt_tag = self.get_time_tags(lvl, v_start_acq, year)[:2]
        # period_tag, dt_tag = self.get_time_tags(lvl, year)[:2]
        return self.build_outfname(self.station, start_time, prod_time,
                                   self.component, self.matrix,
                                   self.component_extra_tag, period_tag,
                                   dt_tag, lvl)

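    # Illustrative example (all field values hypothetical): with site="FR0020R",
    # start_time="20210101000000", prod_time="20220315120000",
    # instrument="nephelometer", component="aerosol_light_scattering_coefficient",
    # matrix="pm10", period="1y", dt="1h", lab="FR01L", instr_name="TSI_3563",
    # method_ref="FR01L_nephelometer_TSI_3563_lev2" and lvl=2, build_outfname()
    # assembles:
    #
    #   FR0020R.20210101000000.20220315120000.nephelometer.aerosol_light_scattering_coefficient.pm10.1y.1h.FR01L_TSI_3563.FR01L_nephelometer_TSI_3563_lev2.lev2.nas
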
    def get_acq_time_tags(self, t_start, lvl, year=None, use_real_start_time=False):
        """
        @brief set the acquisition time tags in use in output files
        @param t_start acquisition start time in UNIX epoch time
        @param lvl level of the output file
        @param year year of synthesis if level = 2
        @return (start_time, start_day) where start_time is the first
                measurement time ("%Y%m%d%H%M%S") and start_day is the
                reference time axis day ("%Y %m %d")
        """
        start_time = None
        start_day = None
        if (lvl == 2 or lvl == "2" or lvl == "2-15min") and not use_real_start_time:
            if year is None:
                raise ValueError(
                    "For level 2 synthesis, a year must be specified")
            # yearly data : start is the very first second of the year
            start_time = "%d0101000000" % year
            start_day = "%d 01 01" % year
        else:
            # advanced data : real first measurement time
            # dt = datetime.fromtimestamp(t_start)
            # start_time = time.strftime("%Y%m%d%H%M%S", dt.timetuple())
            # print("\nstart_time : ", start_time)
            # start_day = time.strftime("%Y 01 01", dt.timetuple())
            if year is not None:
                start_time = time.strftime(
                    "{0:s}%m%d%H%M%S".format(str(year)),
                    time.gmtime(numpy.round(t_start)))
                start_day = "{0:s} 01 01".format(str(year))
            else:
                start_time = time.strftime(
                    "%Y%m%d%H%M%S", time.gmtime(numpy.round(t_start)))
                start_day = time.strftime("%Y 01 01", time.gmtime(t_start))
        return start_time, start_day

    def get_prod_time_tags(self):
        """
        @brief set the production time tags in use in output files
        @return (prod_time, prod_day) where prod_time is the production time
                ("%Y%m%d%H%M%S") and prod_day is the production (ie revision)
                day ("%Y %m %d")
        """
        prod_time = time.strftime("%Y%m%d%H%M%S", time.gmtime(time.time()))
        prod_day = "%s %s %s" % (prod_time[:4], prod_time[4:6], prod_time[6:8])
        return prod_time, prod_day

    def get_header_tpl(self, lvl=None):
        """
        @brief return the header template to use
        @param lvl output data level. If None, don't search for a specific one
        @return the header as a string template
        """
        # - set the header template to use
        # if a destination product ID is not explicitly set,
        # it is considered that only one input is set for one output product
        if hasattr(self, "dst_prod_id"):
            tpl_prod_id = self.dst_prod_id
        else:
            tpl_prod_id = self.prod_id
        suffix = ".actris_header"
        # try it without level extension
        fheader = os.path.join(os.path.dirname(__file__), "..", "cfg",
                               "actris_header", tpl_prod_id + suffix)
        if not os.path.exists(fheader):
            # try a level specific one
            if lvl is not None:
                fheader += "_l%d" % lvl
        # - read it
        header = ""
        f = open(fheader)
        try:
            header = f.read()
        finally:
            f.close()
        return header

    def get_header(self, lvl=None, **kwargs):
        """
        @brief fill in the header template and return it as a string
        It replaces all tags in the template like "${tag}" or "$tag" by the
        corresponding value set in kwargs
        @param lvl requested data level
        @param kwargs dictionary of substitution strings
        """
        sout = self.get_header_tpl(lvl)
        # add level to substitutions
        if lvl is not None:
            kwargs["lvl"] = lvl
        if "instr_name" not in kwargs or kwargs["instr_name"] is None:
            # set a default instrument name
            instr_name = "{0:s}_{1:s}_{2:s}_ambient".format(
                str(kwargs["brand"]).replace(" ", "-"),
                str(kwargs["model"]).replace(" ", "-"),
                str(kwargs["gaw_id"]).replace(" ", "-"))
            kwargs["instr_name"] = instr_name
        if "method_ref" not in kwargs or kwargs["method_ref"] is None:
            # set a default method ref
            method_ref = "{0:s}_{1:s}_{2:s}_{3:s}_lev{4:s}".format(
                str(kwargs["lab"]).replace(" ", "-"),
                str(kwargs["instrument"]).replace(" ", "-"),
                str(kwargs["brand"]).replace(" ", "-"),
                str(kwargs["model"]).replace(" ", "-"),
                str(lvl))
            kwargs["method_ref"] = method_ref
        # process the substitutions
        # for k, v in kwargs.iteritems():
        for k, v in iter(list(kwargs.items())):
            sout = sout.replace("${" + k + "}", str(v))
            sout = sout.replace("$" + k, str(v))
        # print("\n sout : ", sout)
        return sout

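    # Illustrative example (hypothetical template content and keyword values):
    # a template line "Instrument type: ${instrument}" rendered with
    # get_header(2, instrument="nephelometer", brand="TSI", model="3563",
    # gaw_id="XYZ", lab="FR01L") becomes "Instrument type: nephelometer";
    # since instr_name is not given, it defaults to "TSI_3563_XYZ_ambient".
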
    def check_infiles(self, infiles, check_fname_year=True, prod_id=None):
        """
        @brief validate the given input files
        It checks if they all match the expected file pattern and if they are
        all of the same year as the first valid one. The files that do not
        match these criteria are ignored and a message is printed
        @param infiles a list of input files (or a string)
        @param check_fname_year if set, check if the filename has the same
               timestamp as the synthesis year. Should be disabled if one
               daily file does not contain exactly one day of data for instance
        @param prod_id if None, use the default one; if set, consider this
               custom one
        @return the list of valid input files
        """
        # make sure it is a list
        if isinstance(infiles, str):
            infiles = [infiles]
        # set the prod id to consider
        if prod_id is None:
            # use converter main product
            prod_id = self.prod_id
        # check pattern
        product_identifier = self.reader_interface.product_identifier
        v_out = []
        year = None
        for infile in iter(infiles):
            check_file(infile)
            f = os.path.basename(infile)
            m = product_identifier.get_match(f, prod_id)
            if m is None:
                print(
                    ("WARNING: file %s does not match product %s -> ignored" %
                     (infile, prod_id)))
            elif check_fname_year is True:
                _year = int(m.group("year"))
                if year is None:
                    # first valid file -> set the synthesis year
                    year = _year
                if _year != year:
                    print(
                        ("WARNING: file %s is not of the same year %d as previous files -> ignored" %
                         (infile, year)))
                else:
                    # file is valid
                    v_out.append(infile)
            else:
                v_out.append(infile)
        return v_out

    def load_indata(self, infiles, var_ids, reader_interface=None, databuf=None,
                    *args, **kwargs):
        """
        @brief load the requested datasets from the given input files,
        concatenate them file by file and return them in a buffer dictionary
        that uses variable IDs as keys and the read data as values
        @param infiles the list of files to read
        @param var_ids the list of variables to read
        @param reader_interface the reader to use. If None, use the default one
        @param databuf the output data buffer. If None, a new one is created;
               if not, new data are appended to it
        @param args other arguments to pass to the reader interface constructor
        @return a buffer dictionary
        """
        # make sure infiles is a list
        if isinstance(infiles, str):
            infiles = [infiles]
        # if no databuf set, init a new one
        if databuf is None:
            databuf = {}  # multifile accumulator
        # if no custom reader_interface passed as argument, use the default one
        if reader_interface is None:
            reader_interface = self.reader_interface
        file_databuf = {}  # one file accumulator
        for infile in iter(infiles):
            # print('file : ', infile)
            invalid_file = False
            file_databuf = {}
            try:
                # load variables of one file
                reader = reader_interface(infile)
                for var_id in iter(var_ids):
                    data = reader.get_data(var_id)
                    file_databuf[var_id] = data
            except Exception as e:
                msg = "WARNING: Error while reading file " + \
                    infile + " : " + str(e) + " -> ignored"
                print(msg)
                invalid_file = True
                # raise
            # accumulate multiple files data only if the whole file is clean
            if not invalid_file:
                for var_id in iter(var_ids):
                    if var_id not in databuf:
                        databuf[var_id] = file_databuf[var_id]
                    else:
                        # print databuf[var_id].shape
                        # print file_databuf[var_id].shape
                        databuf[var_id] = numpy.concatenate(
                            (databuf[var_id], file_databuf[var_id]))
        return databuf

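    # Illustrative sketch (hypothetical file names and variable IDs): read two
    # granules and concatenate their datasets into one buffer keyed by
    # variable ID.
    #
    #   databuf = conv.load_indata(["granule_a.lev0.nas", "granule_b.lev0.nas"],
    #                              ["start_time", "scattering_coef"])
    #   databuf["scattering_coef"]  # samples of both files, concatenated
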
    def trim_end_acq(self, v_start_acq, v_end_acq):
        """
        @brief trim the end of acquisition time to the next acquisition start
        if needed
        Sometimes an end of acquisition time can be bigger than the time of the
        next acquisition (mainly due to rounding effects or a time resolution
        setting change)
        @param v_start_acq acquisition start times, increasing order
        @param v_end_acq acquisition end times, increasing order
        @return the eventually corrected end_acq_time
        """
        # - delta that can be observed sometimes between 2 acquisitions in end_time
        v_dt = numpy.empty_like(v_start_acq)
        v_dt[:-1] = numpy.diff(v_start_acq)
        v_dt[-1] = v_dt[-2]  # hypothesis : same dt for the 2 last records
        # - change end acq time when the start of the next record is earlier
        is_end_acq_high = ((v_end_acq - v_start_acq) > v_dt)
        if numpy.any(is_end_acq_high):
            v_end_acq[is_end_acq_high] = v_start_acq[
                is_end_acq_high] + v_dt[is_end_acq_high]
        return v_end_acq

    def get_stats_mean(self, indata, s_infill, s_time):
        """
        @brief compute the mean of valid input values in a time subset
        @param indata native resolution input data
        @param s_infill mask of invalid input values
        @param s_time mask of input values that are in the time subset
        @return the mean of valid input values in the requested time range.
                None if none found
        """
        t = indata[s_time & ~s_infill & numpy.isfinite(indata)]
        # print("\n t : ", t)
        if t.size > 0:
            # print("\n outdata : ", t.mean())
            return t.mean()
        return None

    def set_stats_mean(self, indata, s_infill, s_time, iout, outdata,
                       outfill=None):
        """
        @brief compute the mean of valid input values in a time subset and fill
        the corresponding output record(s)
        @param indata native resolution input data
        @param s_infill mask of invalid input values
        @param s_time mask of input values that are in the time subset
        @param iout index of the output record
        @param outdata output array of means
        @return the mean of valid input values in the requested time range.
                None if none found
        """
        mean = self.get_stats_mean(indata, s_infill, s_time)
        if mean is not None:
            outdata[iout] = mean
        elif outfill is not None:
            # no data in time range and outfill value set
            outdata[iout] = outfill

    @staticmethod
    def get_valid_flags():
        """
        @brief return the list of validity flags
        """
        return numpy.array([0., 0.147, 0.559])

    @staticmethod
    def get_valid_l0_only_flags():
        """
        @brief return the list of validity flags that are valid only in level 0
        """
        return numpy.array([0.147])

    @staticmethod
    def get_missing_flags():
        """
        @brief return the list of missing data flags
        """
        return numpy.array([0.999, 0.980])

    @staticmethod
    def get_invalid_flags():
        """
        @brief return the list of invalid data flags (but not missing)
        """
        return numpy.array([0.456, 0.459, 0.452, 0.460, 0.659])

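    # Illustrative sketch (hypothetical arrays): fill the hourly mean of output
    # record iout, leaving the fill value when no valid sample falls in the hour.
    #
    #   s_time = (v_start_acq >= hour_start) & (v_start_acq < hour_end)
    #   conv.set_stats_mean(indata, s_infill, s_time, iout, hourly_means,
    #                       outfill=Converter.DEFAULT_FILL)
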
    def set_l2_flags(self, indata, s_time, iout, outdata,
                     outfill=QA_FLAGS_MISSING, infill=0.999):
        """
        @brief aggregate hourly level 2 flags, based on full resolution ones
        @param indata native resolution input data flags
        @param s_time mask of input values that are in the time subset
        @param iout index of the output record
        @param outdata output array of hourly flags
        @param outfill missing data flag
        @param infill input missing data flag
        """
        _trace_ = False
        if _trace_:
            print("--- set_l2_flags ---")
        # identify the number of triplets the output flag field can hold
        nfields = (str(outfill)[::-1].find(".")) // 3
        # print("nfields:", nfields)
        # unique flags in one time range
        if _trace_:
            print("indata[s_time]:", indata[s_time], end=" ")
        hourly_flags = numpy.unique(indata[s_time])
        if _trace_:
            print("hourly_flags:", hourly_flags, end=" ")
        # check if the current flag field is wide enough to handle all triplets
        is_non_zero_non_missing = (hourly_flags != 0) & (hourly_flags != infill)
        is_warning_level0_only = numpy.isin(hourly_flags,
                                            self.get_valid_l0_only_flags())
        if (hourly_flags[is_non_zero_non_missing].size > nfields):
            raise ValueError("Flag field ({:f}) is too small to handle all triplets {:s}".format(
                outfill, str(hourly_flags[is_non_zero_non_missing])))
        if _trace_:
            print("is_non_zero_non_missing:", hourly_flags[is_non_zero_non_missing])
        # print(outfill)
        hourly_flags = numpy.unique(indata[s_time])
        # is_missing = (hourly_flags == outfill)
        valid_flags = self.get_valid_flags()
        is_valid = numpy.isin(hourly_flags, valid_flags)
        if numpy.all(hourly_flags == infill):
            # no data -> empty
            outdata[iout] = outfill
        elif numpy.all(hourly_flags == 0.):
            # all valid -> 0
            outdata[iout] = 0.
        else:
            # general case, aggregate flags
            hourly_flags = hourly_flags[is_non_zero_non_missing &
                                        ~is_warning_level0_only]
            hourly_flags.sort()
            if _trace_:
                print("> general case ", hourly_flags)
            flag_val = "0."
            i_triplet = 0
            for val in hourly_flags:
                flag_val += "{:d}".format(int(round(val * 1000)))
            outdata[iout] = float(flag_val)
        if _trace_:
            print("iout:", iout, "out:", outdata[iout])

    def get_data_completeness_flag(self, v_start_acq_in, v_end_acq_in,
                                   v_start_acq_out, v_end_acq_out, is_valid):
        """
        @brief build the hourly data completeness flags, based on the valid
        full resolution time covered in each hour
        @param v_start_acq_in full resolution start time of acquisitions
        @param v_end_acq_in full resolution end time of acquisitions
        @param v_start_acq_out hourly start time of acquisitions
        @param v_end_acq_out hourly end time of acquisitions
        @param is_valid full res mask of valid acquisitions
        @return a vector of int flag values, hourly resolution
        """
        # print("is_valid", is_valid)
        outdata = numpy.zeros(v_start_acq_out.shape, dtype=int)
        sz = v_start_acq_out.size
        for i in range(sz):
            # select records in the output time range
            ts_min, ts_max = v_start_acq_out[i], v_end_acq_out[i]
            s = ((v_end_acq_in > ts_min) & (v_start_acq_in < ts_max))
            if numpy.any(s):
                # - reference period, mostly 1 hour
                ts_min = v_start_acq_out[i]
                ts_max = v_end_acq_out[i]
                coarse_res_coverage = ts_max - ts_min
                # - compute the period covered by valid measurements
                v_start_acq = v_start_acq_in[s & is_valid]
                v_end_acq = v_end_acq_in[s & is_valid]
                # trim to start/end of hour
                v_start_acq[v_start_acq < ts_min] = ts_min
                v_end_acq[v_end_acq > ts_max] = ts_max
                full_res_coverage = (v_end_acq - v_start_acq).sum()
                # - fraction of the coarse res period covered by valid measurements
                coverage = (full_res_coverage / coarse_res_coverage)
                # print("v_start_acq", v_start_acq)
                # print("v_end_acq", v_end_acq)
                # print("coarse_res_coverage", coarse_res_coverage)
                # print("full_res_coverage", full_res_coverage)
                # print("coverage", coverage)
                # - set flags, see https://projects.nilu.no/ccc/flags/
                #   390 less than 50%
                #   388 less than 66%
                #   392 less than 75%
                #   394 less than 90%
                flag_val = 0
                if coverage <= 0.50:
                    flag_val = 390
                elif coverage <= 0.66:
                    flag_val = 388
                elif coverage <= 0.75:
                    flag_val = 392
                elif coverage <= 0.90:
                    flag_val = 394
                outdata[i] = flag_val
        return outdata

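    # Illustrative examples (assumed flag values): an hour whose full resolution
    # flags are {0.000, 0.456} aggregates to 0.456 with the default outfill
    # (which holds a single triplet). For two distinct flags such as
    # {0.000, 0.456, 0.659}, outfill must carry at least two triplets
    # (e.g. 0.999999) and the aggregated value is 0.456659. For the
    # completeness flags, an hour covered at 60% by valid measurements gets
    # flag 388 ("less than 66%").
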
    def get_stats_min(self, indata, s_infill, s_time):
        """
        @brief compute the min of valid input values in a time subset
        @param indata native resolution input data
        @param s_infill mask of invalid input values
        @param s_time mask of input values that are in the time subset
        @return the min of valid input values in the requested time range.
                None if none found
        """
        t = indata[s_time & ~s_infill & numpy.isfinite(indata)]
        if t.size > 0:
            return t.min()
        return None

    def set_stats_min(self, indata, s_infill, s_time, iout, outdata):
        """
        @brief compute the min of valid input values in a time subset and fill
        the corresponding output record(s)
        @param indata native resolution input data
        @param s_infill mask of invalid input values
        @param s_time mask of input values that are in the time subset
        @param iout index of the output record
        @param outdata output array of mins
        @return the min of valid input values in the requested time range.
                None if none found
        """
        vmin = self.get_stats_min(indata, s_infill, s_time)
        if vmin is not None:
            outdata[iout] = vmin

    def get_stats_max(self, indata, s_infill, s_time):
        """
        @brief compute the max of valid input values in a time subset
        @param indata native resolution input data
        @param s_infill mask of invalid input values
        @param s_time mask of input values that are in the time subset
        @return the max of valid input values in the requested time range.
                None if none found
        """
        t = indata[s_time & ~s_infill & numpy.isfinite(indata)]
        if t.size > 0:
            return t.max()
        return None

    def set_stats_max(self, indata, s_infill, s_time, iout, outdata):
        """
        @brief compute the max of valid input values in a time subset and fill
        the corresponding output record(s)
        @param indata native resolution input data
        @param s_infill mask of invalid input values
        @param s_time mask of input values that are in the time subset
        @param iout index of the output record
        @param outdata output array of maxs
        @return the max of valid input values in the requested time range.
                None if none found
        """
        vmax = self.get_stats_max(indata, s_infill, s_time)
        if vmax is not None:
            outdata[iout] = vmax

    def get_stats_std(self, indata, s_infill, s_time):
        """
        @brief compute the standard deviation of valid input values in a time subset
        @param indata native resolution input data
        @param s_infill mask of invalid input values
        @param s_time mask of input values that are in the time subset
        @return the std of valid input values in the requested time range.
                None if none found
        """
        t = indata[s_time & ~s_infill & numpy.isfinite(indata)]
        if t.size > 0:
            return t.std()
        return None

    def set_stats_std(self, indata, s_infill, s_time, iout, outdata):
        """
        @brief compute the standard deviation of valid input values in a time
        subset and fill the corresponding output record(s)
        @param indata native resolution input data
        @param s_infill mask of invalid input values
        @param s_time mask of input values that are in the time subset
        @param iout index of the output record
        @param outdata output array of stds
        @return the std of valid input values in the requested time range.
                None if none found
        """
        vstd = self.get_stats_std(indata, s_infill, s_time)
        if vstd is not None:
            outdata[iout] = vstd

    def get_stats_pc(self, indata, s_infill, s_time, invalidate_unique_val=False):
        """
        @brief compute the 15.87 and 84.13 percentiles of valid input values in
        a time subset
        @param indata native resolution input data
        @param s_infill mask of invalid input values
        @param s_time mask of input values that are in the time subset
        @param invalidate_unique_val if True, return a fill value if only one
               value is in the time range
        @return the 15.87 and 84.13 percentiles of valid input values in the
                requested time range. (None, None) if none found
        """
        t = indata[s_time & ~s_infill & numpy.isfinite(indata)]
        if invalidate_unique_val and t.size == 1:
            return (None, None)
        if t.size > 0:
            return (percentile(t, 15.87), percentile(t, 84.13))
        return (None, None)

    def set_stats_pc(self, indata, s_infill, s_time, iout, outdata_pc16,
                     outdata_pc84, invalidate_unique_val=False):
        """
        @brief compute the 15.87 and 84.13 percentiles of valid input values in
        a time subset and fill the corresponding output record(s)
        @param indata native resolution input data
        @param s_infill mask of invalid input values
        @param s_time mask of input values that are in the time subset
        @param iout index of the output record
        @param outdata_pc16 output array of 15.87 percentiles
        @param outdata_pc84 output array of 84.13 percentiles
        @param invalidate_unique_val if True, return a fill value if only one
               value is in the time range
        @return the 15.87 and 84.13 percentiles of valid input values in the
                requested time range. (None, None) if none found
        """
        pc16, pc84 = self.get_stats_pc(
            indata, s_infill, s_time, invalidate_unique_val)
        if pc16 is not None:
            outdata_pc16[iout] = pc16
        # else:
        #     outdata_pc16[iout] = numpy.nan
        if pc84 is not None:
            outdata_pc84[iout] = pc84
        # else:
        #     outdata_pc84[iout] = numpy.nan

    def aggregate_flag_val(self, indata):
        """
        @brief aggregate L1 flag values into L2
        @param indata native resolution input flags list
        @return the aggregated flag value
        """
        # if indata.size == 0:
        #     # - no invalid in range
        #     return self.QA_FLAGS_MISSING
        # select only meaningful flags in the subset
        flag_vals = indata[numpy.isfinite(indata) &
                           (indata != 0.) &
                           (indata != self.QA_FLAGS_FILL)]
        if flag_vals.size == 0:
            # - all valid
            return self.QA_FLAGS_VALID
        # --- some relevant invalid found
        flag_vals = numpy.unique(flag_vals)
        flag_vals *= 1e3  # -> 0.XXX to XXX form
        # aggregate values
        agg_flag_val = 0
        for ival, val in enumerate(flag_vals):
            agg_flag_val += int(round(val)) * 10 ** (3 * ival)
        # -> XXX to 0.XXX form
        ndigit = len(str(agg_flag_val))
        agg_flag_val = agg_flag_val * 10 ** (-ndigit)
        return agg_flag_val

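    # Illustrative example (assumed flag values): for unique non-zero flags
    # 0.456 and 0.659, aggregate_flag_val() computes
    #   456 * 10**0 + 659 * 10**3 = 659456  ->  0.659456
    # while a subset containing only zeros returns QA_FLAGS_VALID (0.0).
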
    def get_invalid(self, indata, var_ids=None, end_time_included=False,
                    start_time_included=False):
        """
        @brief identify the invalid data values and return them in 2 dictionary
        buffers : one with invalids, one with NaNs
        By default data are set as invalid if NaN or negative
        @param indata input data buffer
        @param var_ids list of variables to check. If None, all variables that
               have been loaded in indata will be checked
        @return 2 buffers : one with the mask of invalids, one with the mask of NaNs
        """
        # - manually invalidated records
        invalid_acq = False
        if self.acq_err_logger is not None:
            invalid_acq = self.get_invalid_acq(
                indata[Product.START_ACQ_ID], indata[Product.END_ACQ_ID],
                end_time_included, start_time_included)
        # - automatic flags
        invalid_databuf = {}
        nan_databuf = {}
        if var_ids is None:
            var_ids = list(indata.keys())
        for var_id in iter(var_ids):
            data = indata[var_id]
            is_nan_data = numpy.isnan(data)
            nan_databuf[var_id] = is_nan_data
            invalid_databuf[var_id] = is_nan_data | (data < 0) | invalid_acq
        return invalid_databuf, nan_databuf

    def get_invalid_acq(self, v_start_acq, v_end_acq=None, end_time_included=False,
                        start_time_included=False, ignore_flags=None,
                        shift_errlog_times=None):
        """
        @brief build the mask and the values of acquisition errors as set in
        the error logger file
        @param v_start_acq vector of acquisition starts, using UNIX UTC EPOCH time
        @param v_end_acq vector of acquisition ends, using UNIX UTC EPOCH time
        @param end_time_included if True, the end time is considered as invalid too
        @param ignore_flags list of flags that must be ignored from the err log
        @param shift_errlog_times shift the times in the err log by this number
               of seconds
        @return mask of invalidated records
        """
        if self.acq_err_logger is not None:
            return self.acq_err_logger.get_invalid_acq(v_start_acq, v_end_acq,
                                                       end_time_included,
                                                       start_time_included,
                                                       ignore_flags,
                                                       shift_errlog_times)
        else:
            # all considered as valid
            return numpy.zeros(v_start_acq.shape, dtype=bool), \
                numpy.empty(v_start_acq.shape, dtype=float) + numpy.nan

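    # Illustrative sketch (hypothetical buffer and variable ID): samples that
    # are NaN, negative, or covered by the acquisition error log are flagged.
    #
    #   invalid_buf, nan_buf = conv.get_invalid(databuf, ["scattering_coef"])
    #   n_bad = invalid_buf["scattering_coef"].sum()
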
    @staticmethod
    def get_time_vars(start_day, v_start_acq_out, v_end_acq_out):
        """
        @brief compute the output time axis variables in decimal days
        @param start_day time axis origin in format "YYYY MM DD". Most of the
               time, the 1st second of a year
        @param v_start_acq_out start of acquisition range, unix epoch time
        @param v_end_acq_out end of acquisition range, unix epoch time
        @return (v_ts_start, v_ts_end, v_start_year, v_end_year) where :
                - v_ts_start, v_ts_end are the start/end of acquisition in
                  decimal days from ts_day_start;
                - v_start_year, v_end_year the matching years
        """
        # compute the file reference time (start of day) in seconds
        d = datetime.strptime(start_day + " 00:00:00Z", "%Y %m %d %H:%M:%SZ")
        ts_day_start = (calendar.timegm(d.timetuple()))
        # for start, end in zip(v_start_acq_out, v_end_acq_out):
        #     print "start/end", time.gmtime(start), time.gmtime(end)
        # - compute start time in decimal days from the file reference time
        # measure start
        v_ts_start = v_start_acq_out - ts_day_start  # sec from day start
        v_ts_start = v_ts_start / float(NSEC_DAY)  # to days
        # measure start year
        v_start_year = numpy.array(
            [time.gmtime(ts).tm_year for ts in v_start_acq_out], numpy.uint16)
        # - compute end time in decimal days from the file reference time
        # measure end
        v_ts_end = v_end_acq_out - ts_day_start  # sec from day start
        v_ts_end = v_ts_end / float(NSEC_DAY)  # to days
        # measure end year
        v_end_year = numpy.array(
            [time.gmtime(ts).tm_year for ts in v_end_acq_out], numpy.uint16)
        # make sure to control floating point rounding artefacts :
        # after the 7th decimal, we are under the second precision
        # v_ts_start -= (v_ts_start % 1e-7)
        # v_ts_end -= (v_ts_end % 1e-7)
        return (v_ts_start, v_ts_end, v_start_year, v_end_year)

    def get_time_axis_bounds(self, year, dt=3600):
        """
        @brief return the start_times/end_times limits for the given year with
        a time difference of @a dt. Leap years are handled
        @param year the year
        @param dt time lap in seconds. 3600 s (1 hour) by default
        @return (v_start_time, v_end_time) in UNIX EPOCH time
        """
        # - set the output time axis of level 2 products
        # number of days in the year
        ndays = 365
        if calendar.isleap(year):
            ndays += 1
        # - start time of the averaging range
        d_start = datetime(year, 1, 1, 0, 0, 0)
        ts_start = calendar.timegm(d_start.timetuple())
        v_start_time = numpy.arange(
            ts_start, ts_start + (ndays * NSEC_DAY), dt)
        # - end time of the averaging range
        v_end_time = v_start_time + dt
        return (v_start_time, v_end_time)

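    # Illustrative example: with start_day = "2021 01 01", a record starting at
    # 2021-01-02 12:00:00 UTC lies 1.5 decimal days after the reference;
    # get_time_axis_bounds(2021) returns 365 * 24 = 8760 hourly start/end pairs
    # (8784 for a leap year such as 2020).
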
    def check_set_type_code(self, lvl, v_start_acq, dt=None):
        """
        @brief identify the set type code and the time difference between 2 records
        set type code : TU -> time regularly spaced, TI -> irregularly spaced
        dt : time between 2 records in decimal day unit if regularly spaced,
             0 if irregularly spaced
        @param lvl level of product
        @param v_start_acq record time vector, unix epoch time
        @param dt presupposed dt. If None, try to guess it
        @return (set_type_code, dt)
        """
        # set type code + dt value : TU time regularly spaced, TI : irregularly
        set_type_code = "TU"  # should always be
        if lvl < 2:
            # sometimes, records are missing -> TI + dt not constant (so set to 0)
            # -> check constantness
            diff = numpy.diff(v_start_acq)
            if (diff.size == 0):
                raise ValueError(
                    "ERROR : Only one record in file. Can't determine dt between 2 records")
            if not numpy.all(diff == diff[0]):
                set_type_code = "TI"
                dt = 0
            elif dt is None:
                # guess dt
                dt = diff[0] / float(86400)
        else:
            # lvl 2
            dt = 0.041667  # 1 h in decimal day unit
        return (set_type_code, dt)

    def get_fill(self, var_id):
        """
        @brief return the fill value to use for the given variable
        @param var_id a variable ID
        @return the fill value
        """
        if var_id == self.QA_FLAGS_ID:
            return 9.999
        else:
            return self.DEFAULT_FILL

    def normalize_time_range(self, start_acq, end_acq, is_end_time_ref=False):
        """
        @brief make sure there is no overlap between record time ranges, mostly
        caused by rounding effects
        @param start_acq start of acquisition time vector
        @param end_acq end of acquisition time vector
        @return the (eventually) corrected acquisition range vectors
        """
        # if an end of acquisition time higher than the next record start time
        # is detected, set the end time to the next record start time
        if is_end_time_ref == False:
            diff_acq_range = end_acq - start_acq
            diff_start_acq = numpy.diff(start_acq)
            s_end_acq_too_high = diff_acq_range[:-1] > diff_start_acq
            if numpy.any(s_end_acq_too_high):
                end_acq[:-1][s_end_acq_too_high] = start_acq[:-1][s_end_acq_too_high] + \
                    diff_start_acq[s_end_acq_too_high]
        else:
            diff_acq_range = end_acq - start_acq
            diff_end_acq = numpy.diff(end_acq)
            s_start_acq_too_low = diff_acq_range[1:] > diff_end_acq
            if numpy.any(s_start_acq_too_low):
                start_acq[1:][s_start_acq_too_low] = end_acq[1:][s_start_acq_too_low] - \
                    diff_end_acq[s_start_acq_too_low]
        return start_acq, end_acq

    def process(self, infiles, outdir, year=None):
        """
        @brief convert a list of files
        @param infiles the files to convert
        @param outdir output directory
        @param year year of the eventual yearly synthesis
        """
        outdir = os.path.abspath(outdir)
        # validate input files
        infiles = self.check_infiles(infiles)
        # loop over output levels
        for lvl in [0, 1, 2]:
            # for lvl in [2]:
            start_time = time.time()
            if __DEBUG__:
                print(("*" * 20 + " LEVEL %d " % lvl + "*" * 20))
            try:
                self.process_lvl(infiles, outdir, lvl, year)
            except Exception as e:
                if __DEBUG__:
                    msg = " > " + str(e) + " : " + os.linesep + "ABORT"
                    print(msg)
                if __THROW__:
                    raise
            if __DEBUG__:
                print(("*" * 20 + " LEVEL %d" % lvl + " : " +
                       time.strftime("%H:%M:%S ",
                                     time.gmtime(time.time() - start_time)) +
                       "*" * 20))


def get_parser(app):
    """
    Set the available program options and their default values
    """
    parser = OptionParser()
    parser.usage = "python3 " + app + \
        ".py [options] <fname> <outdir>" + os.linesep
    parser.usage += os.linesep
    parser.usage += "with :" + os.linesep
    parser.usage += "\t<fname> full path to the file to convert" + os.linesep
    parser.usage += "\t<outdir> output directory where the ACTRIS files will be written" + \
        os.linesep
    parser.add_option("-y", "--year",
                      action="store", default=None, type="int", dest="year",
                      help="Year of the eventual Level 2 synthesis (Default : Disabled)")
    parser.add_option("-p", "--period",
                      action="store", default=None, type="int", dest="period",
                      help="Period in case of configuration change over the year (Default : Disabled)")
    # check the number of command line arguments
    if len(sys.argv) < 2:
        parser.print_help()
        raise ValueError("Missing command-line options")
    return parser

def get_args(app, check_infiles=True):
    """
    Check if the command line options are valid. If not, throw an exception
    describing the problem
    @param check_infiles if True, check file validity
    """
    parser = get_parser(app)
    (options, args) = parser.parse_args()
    infiles = args[:-1]
    infiles.sort()
    if check_infiles:
        for infile in iter(infiles):
            check_file(infile)
    outdir = args[-1]
    check_dir(outdir)
    return infiles, outdir, options
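
# Illustrative driver sketch (hypothetical subclass, reader class and product
# ID): concrete converters are expected to subclass Converter, define the class
# attributes (component, matrix, instrument, ...), implement process_lvl(), and
# be driven from a __main__ block along these lines:
#
#   if __name__ == "__main__":
#       infiles, outdir, options = get_args(__APP__)
#       conv = MyNephelometerConverter("my_prod_id", MyReader,
#                                      year=options.year,
#                                      period=options.period)
#       conv.process(infiles, outdir, year=options.year)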