Source code for tools.perturb_forcing

# -*- coding: utf-8 -*-

"""
Created on 3 Aug. 2022

@author: M Lafaysse, from B. Cluzet methods
"""

import numpy as np
import os
import csv

from snowtools.tools.change_forcing import forcinput_tomodify
from snowtools.DATA import SNOWTOOLS_DATA


[docs] def gener_random_serie_autocorr(nt, sigma, tau, dt, semiDistrib=False, fsys=0): """ Generates a serie perturbation_vector of random temporally correlated values, centered on fsys of std sigma and temporal correlation tau. :param nt: time dimension length :type nt: int :param sigma: standard deviation of the random perturbation :type sigma: float :param tau: temporal correlation length of the random perturbation (hours) :type tau: int :param dt: time step (hours) :type dt: int :param fsys: value of the systematic perturbation :type fsys: float :param semiDistrib: flag if var in SAFRAN geometry or semi-distributed geometry :type semiDistrib: bool :return perturbation_vector: vector of the same size as var of perturbation """ xx = np.zeros(nt, float) # random part of the perturbation v2 = sigma * sigma # variance phi = np.exp(- dt / tau) sig2 = v2 * (1 - phi * phi) sig = np.sqrt(sig2) np.random.seed(None) epsilon = np.random.normal(0., sig, nt) for ii in range(nt): xx[ii] = phi * xx[ii - 1] + epsilon[ii] # phi ensures the temporal correlation, epsilon the random part # add the random (perturbation_vector) and systematic (fsys) parts of the perturbation perturbation_vector = fsys + xx if semiDistrib: perturbation_vector = np.reshape(perturbation_vector, (nt, 1)) else: perturbation_vector = np.reshape(perturbation_vector, (nt, 1, 1)) return perturbation_vector
[docs] def convertPrecipPhase(Tair, precip, Tmin=272.65, Tmax=274.05): """ Convert precipitation to solid or liquid according to the disturbed temperature. :param Tair: air temperature data (K) :param precip: precip data :param Tmin: minimum temperature for liquid precip (K) :param Tmax: maximum temperature for solid precip (K) :return: list [Rain, Snow] """ frac_liquid = (Tair - Tmin) / (Tmax - Tmin) # mix of liquid and solid precip if Tair within [Tmin,Tmax] frac_liquid[Tair < Tmin] = 0. # solid precip if Tair < Tmin frac_liquid[Tair > Tmax] = 1. # liquid precip if Tair > Tmax # Write Rainf, Snowf Rainfout = frac_liquid * precip Snowfout = (1. - frac_liquid) * precip return Rainfout, Snowfout
[docs] def addNoiseMultiplicative(var, sigma, tau, dt, fsys=1, semiDistrib=False): """ Apply multiplicative perturbation (sigma=std of random perturbation, tau=temporal correlation, fsys=systematic :param var: data to be pertubed :type var: numpy.array :param sigma: standard deviation of the random perturbation :type sigma: float :param tau: temporal correlation length of the random perturbation (hours) :type tau: float :param dt: time step (hour) :type dt: int :param fsys: value of the systematic perturbation :type fsys: float :param semiDistrib: flag if var in SAFRAN geometry or semi-distributed geometry :type semiDistrib: bool :return perturbed variable, same type and shape as var """ nt = np.shape(var)[0] # time length # generates random perturbation factor vector perturbation_vector = gener_random_serie_autocorr(nt, sigma, tau, dt, semiDistrib=semiDistrib, fsys=fsys) # perturbation cannot be negative !! this will introduce a positive bias in the forcings, # especially if fsys<1 or sigma>>mean(var) perturbation_vector[perturbation_vector < 0] = 0 varout = var[:] * perturbation_vector # multiplicative perturbation of the variable return varout
[docs] def addNoiseAdditive(var, sigma, tau, dt, fsys = 0, semiDistrib=False): """ Apply additive perturbation (sigma=std of random perturbation, tau=temporal correlation, fsys=systematic perturbation) to a time serie. :param var: data to be pertubed :type var: numpy.array :param sigma: standard deviation of the random perturbation :type sigma: float :param tau: temporal correlation length of the random perturbation (hours) :type tau: float :param dt: time step (hour) :type dt: int :param fsys: value of the systematic perturbation :type fsys: float :param semiDistrib: flag if var in SAFRAN geometry or semi-distributed geometry :type semiDistrib: bool :return perturbed variable, same type and shape as var """ nt = np.shape(var)[0] # time length # generates random perturbation factor vector perturbation_vector = gener_random_serie_autocorr(nt, sigma, tau, dt, semiDistrib=semiDistrib, fsys=fsys) # print('ppp', perturbation_vector.shape) varout = var[:] + perturbation_vector # additive perturbation of the variable return varout
[docs] def addNoise2Impur(var, varName, sigma, tau, dt, semiDistrib=False, brutal=False): """ Add noise with temporal correlation. :param var: data to be pertubed :type var: numpy.array :param varName: name of the variable :type varName: str :param sigma: standard deviation of the random perturbation :type sigma: float :param tau: temporal correlation length of the random perturbation (hours) :type tau: float :param dt: time step (hour) :type dt: int :param semiDistrib: flag if var in SAFRAN geometry or semi-distributed geometry :type semiDistrib: bool :param brutal: multiply all timesteps by a random factor following a lognormal law :type brutal: bool """ if not brutal: varout = addNoiseMultiplicative(var, sigma, tau, dt, fsys=1, semiDistrib = semiDistrib) else: # brutal : multiply all timesteps by a random factor following a lognormal law nt = np.shape(var)[0] # time length if '1' in varName: # case for BC # /100 x1 (too much BC in MOCAGE) logstdFact = 1 meanFact = -2 else: # Case for Dust # /10*10 (no clue about Dust) -> (1,0), logstdFact = 1 meanFact = 0 fact = np.random.normal(meanFact, logstdFact, 1) # BC 24/10/19 toggle on the following line for baseline experiment. # fact = meanFact YY = 10**fact * np.ones((nt, ), float) if semiDistrib: YY = np.reshape(YY, (np.shape(var)[0], 1)) else: YY = np.reshape(YY, (np.shape(var)[0], 1, 1)) varout = var[:] * YY return varout
[docs] class forcinput_perturb(forcinput_tomodify): """ Stochastically perturbed meterological forcing file for assimilation purpose """ seuilmin = dict(Tair = 200., LWdown = 100., DIR_SWdown = 0., SCA_LWdown = 0., ) seuilmax = dict(Tair = 330., LWdown = 450., DIR_SWdown = 1500., SCA_LWdown = 800., )
[docs] def modify(self, init_forcing_file, new_forcing_file, *args): """ Add stochastic perturbations to a meterological forcing file :param init_forcing_file: Input file object :type init_forcing_file: :class:`utils.prosimu.prosimu` :param new_forcing_file: Output file object :type new_forcing_file: :class:`utils.prosimu.prosimu` """ np.random.seed() po = os.path.join(SNOWTOOLS_DATA, 'perturbations_param.txt') brutal = True # read parameters: sigma and tau for each disturbed variable from a csv file param = {} with open(po, mode='r') as csv_file: csv_reader = csv.DictReader(csv_file) print('Param value') for row in csv_reader: # fsys is an optional argument. if not prescribed, set to 0 for additive perts, and 1 for multiplicative if 'fsys' in row.keys(): fsysparam = float(row['fsys']) else: if row['varName'] in ['Tair', 'Lwdown']: fsysparam = 0 else: fsysparam = 1 param[row['varName']] = [float(row['std']), float(row['tau']), fsysparam] print(str(row['varName']) + ' | std : ' + str(param[row['varName']][0]) + ' | tau : ' + str( param[row['varName']][1]) + ' | bias factor : ' + str(param[row['varName']][2])) time = init_forcing_file.readtime() delta = time[1] - time[0] dt = delta.seconds / 3600 new_forcing_file.createDimension("time", None) dictdim = init_forcing_file.listdim() del dictdim["time"] semiDistrib = init_forcing_file.pointsdim is not None for dimname, dim in dictdim.items(): print("Create dimension " + dimname + " " + str(len(dim))) new_forcing_file.createDimension(dimname, len(dim)) listvar = init_forcing_file.listvar() for varname in listvar: vartype, rank, array_dim, varFillvalue, var_attrs = init_forcing_file.infovar(varname) newvar = new_forcing_file.createVariable(varname, vartype, array_dim, fill_value=varFillvalue) # Determine whether the variable should be disturbed or not if varname in ['Rainf', 'Snowf'] and 'Precip' in param.keys(): # Specific case for precip modifyvar = param['Precip'][0] != 0 elif varname in param.keys(): modifyvar = param[varname][0] != 0 else: modifyvar = False # Read initial variable var = init_forcing_file.read(varname) if modifyvar: print("Disturb " + varname) # Variable-dependent disturbance before writing new variable if varname in ['Tair', 'LWdown']: tempvar = addNoiseAdditive(var, param[varname][0], param[varname][1], dt, fsys=param[varname][2], semiDistrib=semiDistrib) elif varname in ['DIR_SWdown', 'Wind']: tempvar = addNoiseMultiplicative(var, param[varname][0], param[varname][1], dt, fsys=param[varname][2], semiDistrib=semiDistrib) elif varname in ['Rainf']: Precip = init_forcing_file.read('Snowf') + init_forcing_file.read('Rainf') Precip = addNoiseMultiplicative(Precip, param['Precip'][0], param['Precip'][1], dt, fsys=param['Precip'][2], semiDistrib=semiDistrib) Tair = init_forcing_file.read('Tair') tempvar[:], snowf = \ convertPrecipPhase(Tair, Precip, Tmin=272.65, Tmax=274.05) elif varname in ['Snowf']: tempvar = snowf elif varname in ['IMPWET1', 'IMPWET2', 'IMPDRY1', 'IMPDRY2']: tempvar = addNoise2Impur(var, varname, param[varname][0], param[varname][1], dt, semiDistrib=semiDistrib, brutal=brutal) else: # Perturbation prescribed in the parameter file but unknown method for this variable raise Exception('Undefined perturbation method for variable ' + varname) # Apply thresholds to avoid unrealistic values if varname in self.seuilmax.keys(): print ('Apply max threshold for ' + varname) tempvar = np.where(tempvar > self.seuilmax[varname], self.seuilmax[varname], tempvar) if varname in self.seuilmin.keys(): print ('Apply min threshold for ' + varname) tempvar = np.where(tempvar < self.seuilmin[varname], self.seuilmin[varname], tempvar) newvar[:] = tempvar # Specify that this is a disturbed variable setattr(newvar, 'disturbed', 1) else: print("Copy " + varname) newvar[:] = var[:] # Copy attributes from input to output file for attname in var_attrs: if not attname == '_FillValue': setattr(newvar, attname, init_forcing_file.getattr(varname, attname))
if __name__ == "__main__": import sys f = forcinput_perturb(sys.argv[1], sys.argv[2])