import itertools
import os
from logging import debug, info

import numpy as np
import xarray as xr

from pycif.utils.datastores.dump import dump_datastore, read_datastore
from pycif.utils.path import init_dir

from .apply_AK import apply_ak, apply_ak_tl
from .vinterp import vertical_interp

def forward(
        transf,
        inout_datastore,
        controlvect,
        obsvect,
        mapper,
        di,
        df,
        mode,
        runsubdir,
        workdir,
        onlyinit=False,
        **kwargs
):
    """Aggregate simulations at the grid scale to total columns.

    Re-interpolate the model pressure levels to the satellite averaging-kernel
    levels, then average using the averaging-kernel formula.

    Args:
        transf: transformation object carrying the configuration
            (parameter/component lists, product, formula, pressure unit,
            nchunks, cropstrato, fill_strato, correct_pthick, chosenlev,
            molmass, model)
        inout_datastore: dict of input/output datastores keyed by
            (component, parameter) and by period start date
        controlvect: control vector (unused here, kept for interface)
        obsvect: observation vector (unused here, kept for interface)
        mapper: dict of input/output mappers keyed like inout_datastore
        di, df: period start/end dates; min(di, df) indexes the datastores
        mode: "fwd" or "tl"; in "tl" mode, increments are propagated as well
        runsubdir: run sub-directory (unused here, kept for interface)
        workdir: working directory (unused here, kept for interface)
        onlyinit: if True, do nothing (initialization-only call)
    """
    ddi = min(di, df)

    if onlyinit:
        return

    ref_parameter = transf.parameter[0]
    ref_component = transf.component[0]

    ref_datastore = inout_datastore["outputs"][(ref_component, ref_parameter)]
    ref_ds = ref_datastore[ddi]
    target_datastore = \
        inout_datastore["inputs"][("concs", ref_parameter)][ddi]

    # Put auxiliary values into target datastore
    target_datastore.loc[:, "pressure"] = \
        inout_datastore["inputs"][("pressure", ref_parameter)][ddi]["spec"]
    target_datastore.loc[:, "dp"] = \
        inout_datastore["inputs"][("dpressure", ref_parameter)][ddi]["spec"]
    if transf.product == "column":
        # Air molar mass and layer thickness are only needed to convert
        # concentrations to columns
        target_datastore.loc[:, "airm"] = \
            inout_datastore["inputs"][("airm", ref_parameter)][ddi]["spec"]
        target_datastore.loc[:, "hlay"] = \
            inout_datastore["inputs"][("hlay", ref_parameter)][ddi]["spec"]

    # One line per observation (first level of each stacked column)
    ref_indexes = ~target_datastore.duplicated(subset=["indorig"])
    y0 = target_datastore.loc[ref_indexes]

    # Exit if empty observation datastore
    if target_datastore.size == 0:
        return

    # Init directory for saving forward datastore for later use by adjoint
    dir_fwd = ddi.strftime("{}/chain/satellites".format(
        transf.model.adj_refdir))
    if not os.path.isdir(dir_fwd):
        init_dir(dir_fwd)

    # Building the extended dataframe
    iq1 = y0["station"]
    list_satIDs = iq1.unique()
    aux_cols = ["pressure", "dp"]
    if transf.product == "column":
        aux_cols += ["airm", "hlay"]
    ds_p = target_datastore.set_index("indorig")[aux_cols]

    for satID in list_satIDs:
        info("Processing satellite: {}".format(satID))
        satmask = iq1 == satID
        nobs = np.sum(satmask)

        # Stacking output datastore into levels * nobs
        native_ind_stack = (
            np.flatnonzero(ref_indexes)[satmask]
            + np.arange(transf.model.domain.nlev)[:, np.newaxis]
        )

        # If all nans in datasim, the species was not simulated
        sim = target_datastore.loc[:, "spec"].values[native_ind_stack]
        if np.all(np.isnan(sim)):
            continue

        # Grouping all data from this satellite; pressures are handled in
        # log-space for the vertical interpolation
        datasim = xr.Dataset(
            {
                "pressure": (
                    ["level", "index"],
                    np.log(ds_p["pressure"].values[native_ind_stack]),
                ),
                "dp": (
                    ["level", "index"],
                    ds_p["dp"].values[native_ind_stack],
                ),
                "sim": (["level", "index"], sim),
            },
            coords={
                "index": np.arange(nobs),
                "level": np.arange(transf.model.domain.nlev),
            },
        )

        if transf.product == "column":
            datasim = datasim.assign(
                {
                    "airm": (
                        ["level", "index"],
                        ds_p["airm"].values[native_ind_stack],
                    ),
                    "hlay": (
                        ["level", "index"],
                        ds_p["hlay"].values[native_ind_stack],
                    ),
                }
            )

        if mode == "tl":
            datasim["sim_tl"] = (
                ["level", "index"],
                target_datastore.loc[:, "incr"].values[native_ind_stack],
            )

        # Convert CHIMERE fields to the correct unit:
        # from ppb to molec.cm-2 if the satellite product is a column
        if transf.product == "column":
            keys = ["sim"] + (["sim_tl"] if mode == "tl" else [])
            for k in keys:
                datasim[k] *= datasim["hlay"] / (1e9 / datasim["airm"])

        # Check whether there is some averaging kernel available
        ref_mapper = mapper["outputs"][(ref_component, ref_parameter)]
        files_aks = sorted(set(itertools.chain(
            *ref_mapper["tracer"].input_files.values())))
        info("Fetching satellite infos from files: {}".format(files_aks))

        try:
            colsat = ["qa0", "ak", "pavg0", "date", "index", "station"]
            if transf.formula == 5:
                # Formula 5 additionally needs dry-air columns
                colsat = colsat + ["dryair"]
            coord2dump = ["index"]
            all_sat_aks = []
            for file_aks in files_aks:
                sat_aks = \
                    read_datastore(file_aks,
                                   col2dump=colsat,
                                   coord2dump=coord2dump,
                                   keep_default=False,
                                   to_pandas=False)
                # TODO: remove when "index" really is an index and not a date
                # sat_aks["index"] = np.arange(sat_aks.dims["index"])
                if len(all_sat_aks) == 0:
                    all_sat_aks = sat_aks
                else:
                    all_sat_aks = xr.concat([all_sat_aks, sat_aks], "index")

            # Selecting only lines used in simulation
            mask = all_sat_aks["date"].isin(ref_ds["date"]) \
                   & all_sat_aks.index.isin(ref_ds.index) \
                   & (all_sat_aks["station"] == satID)
            sat_aks = all_sat_aks.loc[{"index": mask}]

        except IOError as e:
            raise IOError("Could not fetch "
                          "satellite info from {}".format(file_aks)) from e

        # Satellite files store levels top-down; flip to bottom-up and
        # transpose to (level, index)
        aks = sat_aks["ak"][:, ::-1].T

        if transf.pressure == "Pa":
            pavgs = sat_aks["pavg0"][:, ::-1].T
        else:
            # Assume hPa otherwise; convert to Pa
            pavgs = 100 * sat_aks["pavg0"][:, ::-1].T

        coords0 = {"index": np.arange(nobs),
                   "level": np.arange(aks.level.size + 1)}
        coords1 = {"index": np.arange(nobs),
                   "level": np.arange(aks.level.size)}
        dims = ("level", "index")
        pavgs = xr.DataArray(pavgs, coords0, dims).bfill("level")
        # Mid-level log-pressures and pressure thicknesses of the AK layers
        pavgs_mid = xr.DataArray(
            np.log(0.5 * (pavgs[:-1].values + pavgs[1:].values)),
            coords1, dims)
        dpavgs = xr.DataArray(np.diff(-pavgs, axis=0), coords1, dims)
        qa0 = sat_aks["qa0"][:, ::-1].T
        if transf.formula == 5:
            drycols = sat_aks["dryair"][:, ::-1].T
        else:
            drycols = qa0 * 0.0

        # Interpolating simulated values to averaging kernel pressures.
        # Done by chunk to fasten the process: a single chunk overloads the
        # memory, while too many chunks do not take advantage of scipy
        # automatic parallelisation; ~50 chunks seems to be fairly efficient
        sim_ak = 0.0 * pavgs_mid
        sim_ak_tl = 0.0 * pavgs_mid

        nchunks = transf.nchunks
        chunks = np.linspace(0, nobs, num=nchunks, dtype=int)
        cropstrato = transf.cropstrato
        for k1, k2 in zip(chunks[:-1], chunks[1:]):
            # Skip chunks that are too short
            if k1 == k2:
                continue

            debug("Compute chunk for satellite {}: {}-{}".format(satID, k1, k2))

            sim_pressure = datasim["pressure"][:, k1:k2].values
            sim = datasim["sim"][:, k1:k2].values
            if mode == "tl":
                sim_tl = datasim["sim_tl"][:, k1:k2].values

            # Fetch missing values from stratosphere
            if transf.fill_strato:
                strato = inout_datastore["inputs"][("stratosphere",
                                                    ref_parameter)][ddi]
                strato_mapper = mapper["inputs"][("stratosphere",
                                                  ref_parameter)]
                sigma_a = strato_mapper["domain"].sigma_a
                sigma_b = strato_mapper["domain"].sigma_b
                psurf_strato = np.exp(datasim["pressure"][:, k1:k2].values[0])
                pstrato = np.log(
                    sigma_b * psurf_strato[:, np.newaxis] + sigma_a).T

                # Merge sim_pressure and pstrato properly, then apply
                # vertical_interp: keep only the stratospheric levels above
                # the model top
                missing_levels = np.argmax(pstrato < sim_pressure[-1],
                                           axis=0)[0]
                sim_pressure = \
                    np.concatenate([sim_pressure, pstrato[missing_levels:]],
                                   axis=0)

                # Stacking strato dataframe to column shape
                sim_strato = \
                    strato["spec"].values.reshape(
                        (len(sigma_a), -1), order="F")[:, k1:k2]

                incr_strato = 0. * sim_strato
                if "incr" in strato:
                    incr_strato = \
                        strato["incr"].values.reshape(
                            (len(sigma_a), -1), order="F")[:, k1:k2]

                # ppb to molec/cm2 if column product
                if transf.product == "column":
                    dpstrato = \
                        np.diff(np.concatenate(
                            [psurf_strato[np.newaxis, :], np.exp(pstrato)],
                            axis=0), axis=0) / 100  # hPa
                    # NOTE(review): standard gravity is 9.80665 m/s2;
                    # 9.88 kept as in the original — confirm intended value
                    G = 9.88
                    dmass = np.abs(dpstrato / G)  # kg/m2
                    column = dmass * 1e3  # g/m2
                    column /= transf.molmass * 1e4  # mol/cm2
                    column *= 6.02214076 * 10**23  # molec / cm2

                    sim_strato *= column
                    incr_strato *= column

                sim = np.concatenate([sim, sim_strato[missing_levels:]],
                                     axis=0)
                if mode == "tl":
                    sim_tl = np.concatenate([sim_tl,
                                             incr_strato[missing_levels:]],
                                             axis=0)

            # Vertical interpolation
            xlow, xhigh, alphalow, alphahigh = vertical_interp(
                sim_pressure,
                pavgs_mid[:, k1:k2].values,
                cropstrato,
            )

            # Applying coefficients
            meshout = np.array(pavgs_mid.shape[0] * [list(range(k2 - k1))])
            sim_ak[:, k1:k2] = (
                alphalow * sim[xlow, meshout] + alphahigh * sim[xhigh, meshout]
            )

            if mode == "tl":
                sim_ak_tl[:, k1:k2] = (
                    alphalow * sim_tl[xlow, meshout]
                    + alphahigh * sim_tl[xhigh, meshout]
                )

        # Correction with the pressure thickness
        target_datastore["pthick"] = np.nan
        if transf.correct_pthick:
            scale_pthick = (datasim['dp'] * datasim['sim']).sum(axis=0).values \
                      / (dpavgs * sim_ak).sum(axis=0).values \
                      * dpavgs.sum(axis=0).values / datasim['dp'].sum(axis=0).values
            sim_ak *= scale_pthick
            if 'sim_tl' in datasim:
                sim_ak_tl *= scale_pthick

            target_datastore.iloc[
                np.flatnonzero(ref_indexes)[satmask],
                target_datastore.columns.get_loc("pthick")] = scale_pthick

        # Applying aks
        nbformula = transf.formula
        chosenlevel = transf.chosenlev

        debug("product: {}".format(transf.product))
        debug("nbformula: {}".format(nbformula))
        debug("chosenlev: {}".format(chosenlevel))
        y0.loc[satmask, "spec"] = apply_ak(
            sim_ak.values, dpavgs.values, aks.values,
            nbformula, qa0.values, chosenlevel,
            drycols.values
        )

        if mode == "tl":
            y0.loc[satmask, "incr"] = apply_ak_tl(
                sim_ak_tl.values,
                dpavgs.values,
                aks.values,
                nbformula,
                qa0.values,
                chosenlevel,
                sim_ak.values,
            )

        # Saves sim_ak for later use by adjoint
        if nbformula == 3:
            file_dump = ddi.strftime(
                "{}/sim_ak_{}_%Y%m%d%H%M.nc".format(dir_fwd, satID))
            sim_ak.to_netcdf(file_dump)

    inout_datastore["outputs"][(ref_component, ref_parameter)][ddi] = y0

    # Save forward datastore for later use by adjoint
    file_monit = ddi.strftime(
        "{}/monit_%Y%m%d%H%M.nc"
            .format(dir_fwd)
    )
    dump_datastore(
        target_datastore,
        file_monit=file_monit,
        dump_default=False,
        col2dump=["pressure", "dp", "indorig", "hlay", "airm", "sim", "pthick"],
        mode="w",
    )