Commit b87fe6a2 authored by Espen Sollum's avatar Espen Sollum
Browse files

Merge branch 'empa' into espen

parents 936be021 0d6208ef
from __future__ import absolute_import
import os
from types import MethodType
from pycif.utils import path
from pycif.utils.check import verbose
from .read_conc import read_conc
from .calc_init import calc_init
requirements = {'domain': {'name': 'FLEXPART', 'version': 'std', 'empty': False}}
import os
import pandas as pd
import xarray as xr
import numpy as np
from datetime import datetime, timedelta
import pycif.utils.check as check
from pycif.utils.netcdf import readnc
from pycif.utils.dates import j2d
def calc_init(self, model, dir_initconc, file_initconc, **kwargs):
    """ Calculate background concentration contribution to observations
    Args:
        self: the background plugin
        model: the model plugin
        dir_initconc: directory holding the baseline concentration file
        file_initconc: file name, may contain strftime placeholders
    """
    # NOTE(review): this body references several names that are not defined
    # in this scope ('Dataset', 'station', 'row', 'obs_i', 'obs_bkg',
    # 'obs_bkgerr') -- it looks like an incomplete merge and will raise
    # NameError if called as-is. TODO: restore the missing loop/context.
    subsimu_dates = model.subsimu_dates
    # NOTE(review): 'path' shadows the 'pycif.utils.path' module imported
    # at file level.
    path = os.path.join(dir_initconc,file_initconc)
    # Expand strftime placeholders using the first sub-simulation date
    file_baseline = subsimu_dates[0].strftime(path)
    # Example path:
    #'/store/empa/em05/projects/VERIFY/TM5-4DVAR/BASELINES_ECMWF/data/TM5-to-FLE_v1_CH4_%Y_stations.nc'
    with Dataset(file_baseline) as bkg:
        # find the station index in the baseline file
        # station ids are stored as byte arrays; join into lowercase strings
        ids = [''.join([c.decode().lower() for c in name]) for name in bkg['identification'][:]]
        # argmax of a boolean list -> first matching station (0 if none match!)
        station_index = np.argmax([station in name for name in ids])
        # find the time index in the baseline file
        # 'time1' rows are (year, month, day, hour, minute) -- TODO confirm
        ts = np.array([pd.Timestamp(t[0],t[1],t[2],t[3],t[4]) for t in bkg['time1'][:]])
        try:
            time_index = np.where(ts == row.Index)[0][0]
            obs_bkg[obs_i] = bkg['CH4_c_trans'][time_index,station_index]
        except IndexError:
            # No baseline record matches this observation timestamp
            print('HJM ERROR: time not found', row.Index)
        # HJM TODO obs_bkgerr ?
        obs_bkgerr[obs_i] = 0
import os
import pandas as pd
import xarray as xr
import numpy as np
from datetime import datetime, timedelta
import pycif.utils.check as check
from pycif.utils.netcdf import readnc
from pycif.utils.dates import j2d
def read_conc(self, model, dir_initconc, file_initconc, **kwargs):
pass
......@@ -72,7 +72,8 @@ def read_conc(self, model, dir_initconc, file_initconc, **kwargs):
xmod_hor = xmod[:, :, :].interp(coords={'lon' : model.domain.lonc_glob,
'lat' : model.domain.latc_glob},
method='linear',
kwargs={'fill_value' : None})
kwargs={'fill_value' : "extrapolate"})
# Interpolate to model grid (vertically)
# Use nearest neighbor below or above height of available data
......@@ -82,7 +83,7 @@ def read_conc(self, model, dir_initconc, file_initconc, **kwargs):
xmod_ver = xmod_hor.interp(coords={'height' : height_temp},
method='linear',
kwargs={'fill_value' : None})
kwargs={'fill_value' : "extrapolate"})
xmod_ver['height'] = np.array(model.domain.heights)
......
......@@ -9,3 +9,4 @@ from . import flexpart
from pycif.utils.classes.background import Background
Background.register_plugin('FLEXPART', 'nc', flexpart)
Background.register_plugin('TM5-4DVAR', 'nc', flexpart)
......@@ -44,7 +44,7 @@ subroutine read_header(filename, numpoint, ibdate, ibtime, releases, &
! character(len=256) :: flexversion
character(len=29) :: flexversion
character(len=13) :: flexversion
logical :: lexist
integer :: nageclass, imethod, nspec
integer :: loutstep, loutaver, loutsample
......
......@@ -8,11 +8,12 @@ import numpy as np
import glob
import re
from datetime import datetime, timedelta
from netCDF4 import Dataset
from . import flexpart_header
from . import mod_flexpart
from pycif.utils.dates import j2d
from pycif.utils.check import verbose
def read_flexpart_dir(subdir, nested=True, **kwargs):
""" Reads all FLEXPART grid_time files from a directory
......@@ -75,25 +76,29 @@ def read_flexpart_grid(subdir, file_name, fp_header, **kwargs):
file_dates = os.path.join(subdir, 'dates')
grid = np.ndarray((fp_header.numx, fp_header.numy, fp_header.maxngrid))
path_file = os.path.join(subdir, file_name)
if '.nc' in file_name:
grid_fp, ngrid, gtime_dt = read_grid_nc(path_file, file_dates, fp_header)
else:
grid_fp, ngrid, gtime = mod_flexpart.read_grid(
path_file, file_dates, fp_header.numx, fp_header.numy, \
fp_header.maxngrid, fp_header.xshift, fp_header.ndgrid, fp_header.trajdays)
# Convert grid times to datetime format
gtime_dt = []
# Convert FLEXPART julian days to datetime
for i in range(len(gtime)):
if gtime[i] == 0.:
break
gtime_dt.append(j2d(gtime[i]))
grid_fp, ngrid, gtime = mod_flexpart.read_grid(
path_file, file_dates, fp_header.numx, fp_header.numy, \
fp_header.maxngrid, fp_header.xshift, fp_header.ndgrid, fp_header.trajdays)
# Convert from ppt to ppmv or ppbv
# Convert s.m3/kg to s.m2/kg and apply numerical scaling
grid_fp = grid_fp/(fp_header.outheight[0]*numscale)
# Convert grid times to datetime format
gtime_dt = []
# Convert FLEXPART julian days to datetime
for i in range(len(gtime)):
if gtime[i] == 0.:
break
gtime_dt.append(j2d(gtime[i]))
# Generate the footprint dates as a series (for debugging), adding one hour
# gtime_dt.append(datetime.strptime(re.findall(r'\d+', file_name)[0], '%Y%m%d%H%M%S'))
......@@ -108,6 +113,30 @@ def read_flexpart_grid(subdir, file_name, fp_header, **kwargs):
return grid_fp, gtime_dt, ngrid
def read_grid_nc(path_file, file_dates, fp_header):
    """Read a FLEXPART footprint grid from a netCDF 'grid_time' file.

    Args:
        path_file: full path to the netCDF grid_time file
        file_dates: unused here; kept for interface parity with the
            binary reader (mod_flexpart.read_grid)
        fp_header: FLEXPART header object; also unused here, same reason

    Returns:
        tuple (grid_fp, ngrid, gtime):
            grid_fp: array ordered (lon, lat, time), scaled by 1e12
            ngrid: number of time steps read
            gtime: list of datetimes, one per time step
    """
    with Dataset(path_file) as nc:
        # spec001 has dimension nageclass, numpoint, time, level, lat, lon
        # Assume that there is 1 numpoint/station per file
        shape = nc['spec001'].shape
        if shape[0]>1 or shape[1]>1:
            verbose('WARNING: There are more than 1 station in the flexpart file.')
        if shape[3]>1:
            verbose('INFO: Select the bottom layer of the grid')
        grid = nc['spec001'][0,0,:,0,:,:] # time, lat, lon
        # swap axes to get it to lon, lat, time
        grid_fp = np.swapaxes(grid,0,-1)
        # NOTE(review): times are sorted here but the grid slices are NOT
        # reordered to match -- this assumes the file's 'time' variable is
        # already ascending; confirm against the FLEXPART netCDF output.
        times = np.sort(nc['time'][:])
        # 'iedate' is the simulation end date; 'time' holds offsets in
        # seconds relative to it (negative for backward runs) -- TODO confirm
        enddate = datetime.strptime(nc.getncattr('iedate'),'%Y%m%d')
        gtime = []
        for t in times:
            gtime.append(enddate + timedelta(seconds=int(t)))
        grid_fp *= 1e12 #This is applied in mod_flexpart
        return grid_fp, len(gtime), gtime
def read_init(subdir, file_name, nx, ny, nz, **kwargs):
""" Reads FLEXPART grid_initial file.
......
......@@ -7,7 +7,7 @@ from pycif.utils import path
from pycif.utils.check import verbose
from pycif.utils.datastores.dump import dump_datastore, read_datastore
from .check import check_inputs
from netCDF4 import Dataset
def obsoper(self, inputs, mode,
run_id=0, datei=datetime.datetime(1979, 1, 1),
......@@ -73,36 +73,45 @@ def obsoper(self, inputs, mode,
if mode == 'fwd':
controlvect = inputs
obsvect.datastore.loc[:, 'sim'] = np.nan
# Various initializations
# Get flexpart headers
subdir = subsimu_dates[0].strftime("%Y%m")
fp_header_glob = model.utils.flexpart_header.Flexpartheader()
try:
fp_header_glob.read_header(
os.path.join(model.run_dir_glob, obsvect.datastore.head(1)['station'][0].decode().
upper(),subdir, 'header'))
except AttributeError as e:
fp_header_glob.read_header(
os.path.join(model.run_dir_glob, obsvect.datastore.head(1)['station'][0].
upper(),subdir, 'header'))
subdir_fmt = getattr(model, "subdir", "%Y%m")
subdir = subsimu_dates[0].strftime(subdir_fmt)
if model.plugin.nested:
fp_header_glob = model.utils.flexpart_header.Flexpartheader()
try:
fp_header_nest = model.utils.flexpart_header.Flexpartheader()
fp_header_nest.read_header(
os.path.join(model.run_dir_nest, obsvect.datastore.head(1)['station'][0].
decode().upper(), subdir, 'header_nest'))
fp_header_glob.read_header(
os.path.join(model.run_dir_glob, obsvect.datastore.head(1)['station'][0].decode().
upper(),subdir, 'header'))
except AttributeError as e:
fp_header_nest.read_header(
os.path.join(model.run_dir_nest, obsvect.datastore.head(1)['station'][0].
upper(), subdir, 'header_nest'))
fp_header_glob.read_header(
os.path.join(model.run_dir_glob, obsvect.datastore.head(1)['station'][0].
upper(),subdir, 'header'))
header_name = "header_nest"
else:
header_name = "header"
try:
fp_header_nest = model.utils.flexpart_header.Flexpartheader()
fp_header_nest.read_header(
os.path.join(model.run_dir_nest, obsvect.datastore.head(1)['station'][0].
decode().upper(), subdir, header_name))
except AttributeError as e:
fp_header_nest.read_header(
os.path.join(model.run_dir_nest, obsvect.datastore.head(1)['station'][0].
upper(), subdir, header_name))
# Trajectory life time
trajdays = fp_header_glob.trajdays
trajdays = fp_header_nest.trajdays
# Get the domain definition
species = getattr(controlvect.components, 'fluxes').parameters.attributes[0]
tracer = getattr(getattr(controlvect.components, 'fluxes').parameters, species)
......@@ -151,8 +160,9 @@ def obsoper(self, inputs, mode,
for obs_i, row in enumerate(obsvect.datastore.itertuples()):
# Subdirectory for FLEXPART footprints
subdir = row.Index.strftime("%Y%m")
subdir = row.Index.strftime(subdir_fmt)
# For debugging
obs_check[obs_i] = obs_i
......@@ -161,19 +171,32 @@ def obsoper(self, inputs, mode,
try:
runsubdir_nest = os.path.join(
model.run_dir_nest, station.decode().upper(), subdir)
runsubdir_glob = os.path.join(
model.run_dir_glob, station.decode().upper(), subdir)
if model.plugin.nested:
runsubdir_glob = os.path.join(
model.run_dir_glob, station.decode().upper(), subdir)
except AttributeError as e:
runsubdir_nest = os.path.join(
model.run_dir_nest, station.upper(), subdir)
runsubdir_glob = os.path.join(
model.run_dir_glob, station.upper(), subdir)
file_date = row.Index.strftime('%Y%m%d%H%M%S')
if model.plugin.nested:
runsubdir_glob = os.path.join(
model.run_dir_glob, station.upper(), subdir)
file_fmt = '%Y%m%d%H%M%S'
model_fmt = getattr(model, "format", 'bin')
if model_fmt == 'bin':
file_fmt += '_001'
elif model_fmt == 'nc':
file_fmt += '.nc'
if model.plugin.nested:
file_fmt_glob = 'grid_time_' + file_fmt
file_fmt = 'grid_time_nest_' + file_fmt
else:
file_fmt = 'grid_time_' + file_fmt
# Read nested grids
file_name = 'grid_time_nest_' + file_date + '_001'
file_name = row.Index.strftime(file_fmt)
# If file is not found, continue
if not os.path.isfile(os.path.join(runsubdir_nest, file_name)):
verbose("WARNING: file not found: " + os.path.join(runsubdir_nest, file_name))
......@@ -181,43 +204,48 @@ def obsoper(self, inputs, mode,
grid_nest, gtime, ngrid = model.utils.read.read_flexpart_grid(
runsubdir_nest, file_name, fp_header_nest, numscale=model.numscale)
grid_nest *= model.coeff*model.mmair/model.molarmass
# Read global footprints
# TODO read correction factor dry air
file_name = 'grid_time_' + file_date + '_001'
if not os.path.isfile(os.path.join(runsubdir_glob, file_name)):
verbose("Warning: file not found, ", os.path.join(runsubdir_glob, file_name))
continue
grid_glob, gtime_glob, ngrid_glob = model.utils.read.read_flexpart_grid(
runsubdir_glob, file_name, fp_header_glob, numscale=model.numscale)
if model.plugin.nested:
# Read global footprints
# TODO read correction factor dry air
grid_glob *= model.coeff*model.mmair/model.molarmass
file_name = row.Index.strftime(file_fmt_glob)
if not os.path.isfile(os.path.join(runsubdir_glob, file_name)):
verbose("Warning: file not found, ", os.path.join(runsubdir_glob, file_name))
continue
# Background contribution from fluxes outside domain
hbkg = np.sum(grid_glob[:, :, 0:ngrid_glob], axis=2)
hbkg[ix1:ix2, iy1:iy2] = 0.0
# Index to state vector
ind = np.argmin(np.abs(tracer.dates[0:-1] - gtime_glob[0]))
grid_glob, gtime_glob, ngrid_glob = model.utils.read.read_flexpart_grid(
runsubdir_glob, file_name, fp_header_glob, numscale=model.numscale)
grid_glob *= model.coeff*model.mmair/model.molarmass
# TODO: calculate on first iteration ,then store?
obs_bkg[obs_i] = np.sum(hbkg[:,:].T*controlvect.flx_bkg[ind,:,:])
obs_bkgerr[obs_i] = np.sum(hbkg[:,:].T*np.abs(controlvect.flx_bkg[ind,:,:]))*tracer.err
# Background contribution from fluxes outside domain
hbkg = np.sum(grid_glob[:, :, 0:ngrid_glob], axis=2)
hbkg[ix1:ix2, iy1:iy2] = 0.0
# Index to state vector
ind = np.argmin(np.abs(tracer.dates[0:-1] - gtime_glob[0]))
# TODO: calculate on first iteration ,then store?
obs_bkg[obs_i] = np.sum(hbkg[:,:].T*controlvect.flx_bkg[ind,:,:])
obs_bkgerr[obs_i] = np.sum(hbkg[:,:].T*np.abs(controlvect.flx_bkg[ind,:,:]))*tracer.err
else:
obs_bkg[:] = 0
obs_bkgerr[:] = 0
# Transport for boxes in regions
hbox = np.zeros((nbox*ngrid), np.float64)
hbox = np.zeros((ngrid,nbox), np.float64)
reg = np.unique(tracer.regions)
for r in reg:
if r>0:
wh = np.where(tracer.regions == r)
hbox[:,r-1] = np.sum( grid_nest[wh[1],wh[0],:ngrid], axis=0)
for n in range(ngrid):
for jy in range(fp_header_nest.numy):
for ix in range(fp_header_nest.numx):
if tracer.regions[jy, ix] > 0:
hbox[n*nbox + tracer.regions[jy, ix] - 1] += \
grid_nest[ix, jy, n]
hbox = hbox.flatten()
# TODO: account for possible difference nested grid/inversion domain
hnest = grid_nest
......@@ -263,7 +291,7 @@ def obsoper(self, inputs, mode,
ind = np.argmin(np.abs(tracer.dates[0:-1] - itim))
obs_ghg[obs_i] += np.sum(hnest.T[i,:,:] * controlvect.flxall[ind,:,:])
# Get background contribution
if hasattr(obsvect, 'background'):
cini = np.zeros(obsvect.background.ncini)
for i, name in enumerate(obsvect.background.cini_names):
......@@ -286,6 +314,9 @@ def obsoper(self, inputs, mode,
obs_cinipos[obs_i] = np.dot(cini[:],
controlvect.x[npvar+ni*ncini:npvar+(ni+1)*ncini])
else:
obs_cinipos[:]=0
if getattr(tracer, 'offsets', False):
......@@ -329,12 +360,12 @@ def obsoper(self, inputs, mode,
obsvect.datastore['sim'] = obs_sim
obsvect.datastore['obs_bkgerr'] = obs_bkgerr
obsvect.datastore['obs_err'] = obs_err
obsvect.datastore['obs_model'] = obs_model
if dump_debug:
obsvect.datastore['obs_ghg'] = obs_ghg
obsvect.datastore['obs_bkg'] = obs_bkg
obsvect.datastore['obs_model'] = obs_model
obsvect.datastore['obs_sim'] = obs_sim
obsvect.datastore['obs_check'] = obs_check
......
......@@ -5,6 +5,8 @@ import os
import shutil
import numpy as np
from datetime import datetime, timedelta
from netCDF4 import Dataset
from pycif.utils.datastores.dump import dump_datastore, read_datastore
from pycif.plugins.obsvects.standard.utils.tstep import tstep
from pycif.plugins.obsvects.standard.utils.crop_monitor import crop_monitor
......@@ -14,7 +16,75 @@ from pycif.utils import path
from pycif.utils.check.errclass import PluginError
def init_background(obsvect, **kwargs):
    """Dispatch background initialization to the plugin-specific routine.

    Args:
        obsvect: the observation vector plugin; its background plugin name
            selects the implementation

    Returns:
        The initialized obsvect, or None for an unrecognized plugin name.
    """
    plugin_name = obsvect.background.plugin.name
    if plugin_name == "TM5-4DVAR":
        return init_background_rodenbeck(obsvect, **kwargs)
    if plugin_name == "FLEXPART":
        return init_background_flexinv(obsvect, **kwargs)
    # Unknown plugin: fall through (implicit None), as before
def init_background_rodenbeck(obsvect, **kwargs):
    """Initialize background mixing ratios from a Rodenbeck-style
    (TM5-4DVAR) baseline file.

    Looks up, for each observation, the pre-computed baseline value
    matched by station id and timestamp, and stores it in a single
    background column 'obs_cini_0' of the observation datastore.

    Args:
        obsvect: the observation vector plugin; must provide 'background'
            (dir_initconc, file_initconc, varname_conc, optimize_cini),
            'datei', 'datef' and 'datastore'

    Returns:
        obsvect with 'obs_cini_0' filled (NaN where no time match found)

    Raises:
        ValueError: if optimization of this background is requested, if
            the period spans several baseline files, or if a station is
            missing from the baseline file.
    """
    background = obsvect.background

    # Fail fast: this background cannot be optimized, so reject the
    # configuration before doing any file I/O
    if background.optimize_cini:
        verbose('ERROR: The Rödenbeck background can not be optimized ')
        raise ValueError("The Rodenbeck background can not be optimized")

    varname = background.varname_conc
    datei = obsvect.datei
    path = os.path.join(background.dir_initconc, background.file_initconc)
    file_baseline = datei.strftime(path)

    # TODO: manage multiple background files at once
    if obsvect.datef.strftime(path) != file_baseline:
        verbose('ERROR: The time period spans multiple background files')
        raise ValueError("The time period spans multiple background files")

    obs_cini = np.full(len(obsvect.datastore), np.nan)

    # Loop over all observations and calculate background contribution
    with Dataset(file_baseline) as bkg:
        # Station ids are stored as byte arrays; join to lowercase strings
        ids = np.array([''.join(c.decode().lower() for c in name)
                        for name in bkg['identification'][:]])

        # Hoisted out of the observation loop (loop invariant): the
        # timestamps of all baseline records. Rows of 'time1' are
        # (year, month, day, hour, minute).
        ts = np.array([pd.Timestamp(t[0], t[1], t[2], t[3], t[4])
                       for t in bkg['time1'][:]])

        # Cache of station name -> column index in the baseline file
        stations = {}
        for obs_i, row in enumerate(obsvect.datastore.itertuples()):
            station = row.station
            try:
                station_index = stations[station]
            except KeyError:
                matches = np.where([station in name for name in ids])[0]
                if len(matches) == 0:
                    # Bug fix: previously fell through and crashed with an
                    # opaque IndexError; raise a clear error instead
                    verbose('ERROR: Station %s not found in the background file %s'
                            % (station, file_baseline))
                    raise ValueError(
                        "Station %s not found in the background file %s"
                        % (station, file_baseline))
                elif len(matches) > 1:
                    verbose('WARNING: More than one station %s found in the background file %s'
                            % (station, file_baseline))
                    verbose('\t ' + ' '.join(ids[matches]))
                    verbose('Picking the first one ' + ids[matches[0]])
                station_index = matches[0]
                stations[station] = station_index

            # Find the time index in the baseline file; leave NaN if the
            # observation timestamp has no baseline record
            try:
                time_index = np.where(ts == row.Index)[0][0]
                obs_cini[obs_i] = bkg[varname][time_index, station_index]
            except IndexError:
                verbose('WARNING: time not found ' + str(row.Index))

    # Single, non-optimized background component
    obsvect.background.cini_names = ['obs_cini_0']
    obsvect.background.ncini = 1
    obsvect.datastore['obs_cini_0'] = obs_cini
    return obsvect
def init_background_flexinv(obsvect, **kwargs):
"""Initializes the background
Reads and regrids concentration fields
Reads sensitivity to initial conditions fields
......@@ -39,7 +109,6 @@ def init_background(obsvect, **kwargs):
conc_ini = background.read_conc(model, background.dir_initconc,
background.file_initconc, **kwargs)
n_x = model.domain.nlon_glob
n_y = model.domain.nlat_glob
n_z = len(model.domain.heights)
......@@ -49,23 +118,29 @@ def init_background(obsvect, **kwargs):
background.updated_controlvect = False
# ncini: number of initial mixing ratio variables to optimize
background.ncini = len(background.cini_lat)
if background.optimize_cini:
background.ncini = len(background.cini_lat)
# Set time dimension for optimizing initial mixing ratio scalars
background.ntcini = max(1, int((datef - datei).days + 1) //
int(background.cini_res))
# Time stamp for initial mixing ratio scalars
# background.cinitime = np.array([datei + timedelta(days=x) for x in range(background.ntcini)])
background.cinitime = np.array([datei + timedelta(days=x*int(background.cini_res)) for x in range(background.ntcini)])
else:
background.ncini = 1
background.ntcini = 1
background.cinitime = np.array([datei])
background.cini_names = ["obs_cini_{}".format(i) for i in range(background.ncini)]
# 'cini' is dimension number of observations x zones to optimize
# Stored in monitor as obs_cini_1, obs_cini_2 etc.
# Currently we are strictly using longitudinal bands, that are sorted
# South to North in the cini array
cini = np.zeros((len(obsvect.datastore), background.ncini), order='C')
# Set time dimension for optimizing initial mixing ratio scalars
background.ntcini = max(1, int((datef - datei).days + 1) //
int(background.cini_res))
# Time stamp for initial mixing ratio scalars
# background.cinitime = np.array([datei + timedelta(days=x) for x in range(background.ntcini)])
background.cinitime = np.array([datei + timedelta(days=x*int(background.cini_res)) for x in range(background.ntcini)])
# Read FLEXPART header (to access trajdays)
fp_header = None
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment