# Source code for plots.pearps2m.postprocess

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Created on 6 déc. 2018

@author: lafaysse

usage: python postprocess.py [-b YYYYMMDD] [-e YYYYMMDD] [-o diroutput]

    #) extracts operational simulation results
    #) Plots maps for the Alps, the Pyrenees the Corse, Vosges, Massif Central, Jura
    #) Creates spaghetti plots for all massifs and stations
"""

import argparse
import locale
import os
import datetime
from collections import Counter, defaultdict

import numpy as np
import matplotlib
# import matplotlib.pyplot as plt
matplotlib.use('Agg')
import matplotlib.style
#import rpy2.robjects as robjects
import pandas as pd
from scipy.stats import gamma

from snowtools.utils.prosimu import prosimu
from snowtools.utils.dates import check_and_convert_date, pretty_date
from snowtools.plots.temporal.chrono import spaghettis_with_det, spaghettis
from snowtools.utils.infomassifs import infomassifs
from snowtools.utils.FileException import DirNameException
from snowtools.DATA import LUSTRE_NOSAVE_USER_DIR, SNOWTOOLS_DATA

from bronx.stdtypes.date import today
from bronx.syntax.externalcode import ExternalCodeImportChecker

# Optional dependencies: register availability of cartopy and pyproj so the
# map-plotting classes below can be disabled cleanly when they are missing.
echecker = ExternalCodeImportChecker('cartopy')
with echecker:
    import cartopy
echecker_pyproj = ExternalCodeImportChecker('pyproj')
with echecker_pyproj as echecker_register:
    import pyproj
    # Record the version: the plotting classes require pyproj >= 2.0.0.
    echecker_register.update(version=pyproj.__version__)

# Global matplotlib tuning for batch production of many figures with Agg.
if 'fast' in matplotlib.style.available:
    matplotlib.style.use('fast')
matplotlib.rcParams['agg.path.chunksize'] = 100  # split long paths while rendering
matplotlib.rcParams['axes.xmargin'] = 0
matplotlib.rcParams['axes.ymargin'] = 0
# matplotlib.rcParams["figure.dpi"] = 75


def build_filename(massif, alti):
    """
    Construct a filename from massif and altitude information.

    :param massif: a massif number
    :param alti: an altitude level (omitted from the name when falsy)
    :return: filename
    :rtype: str
    """
    if alti:
        return "{}_{:d}".format(massif, int(alti))
    return str(massif)
class Config:
    """
    Configuration passed to S2MExtractor and to be used in vortex toolboxes.
    """
    previ = True  #: False for analysis, True for forecast
    xpid = "oper"  #: Operational chain experiment id
    alternate_xpid = ["OPER@lafaysse"]  #: Alternative experiment id
    # alternate_xpid = ["oper"]
    #: List of geometries
    list_geometry = ['alp', 'pyr', 'cor', 'jur', 'mac', 'vog', 'postes']
    # list_geometry = ['alp_allslopes', 'pyr_allslopes', 'cor_allslopes', 'postes']  #: List of geometries
    # list_geometry = ['alp', 'pyr', 'cor', 'postes']  #: Alternative list of geometries (corresponding to alternative experiment ids)
    # alternate_list_geometry = [['alp', 'pyr', 'cor', 'postes']]
    alternate_list_geometry = [['alp_allslopes', 'pyr_allslopes', 'cor_allslopes', 'postes']]
    # Development chain
    # xpid = "OPER@lafaysse"  # To be changed with IGA account when operational
    # list_geometry = ['alp_allslopes', 'pyr_allslopes', 'cor_allslopes', 'postes']
    #: 35 for determinstic member, 36 for sytron, 0-34 for PEARP members
    list_members = list(range(0, 36))

    def __init__(self, OPTIONS):
        """
        #) checks and converts dates
        #) creates output directories if they don't exist.
        """
        OPTIONS.datebegin, OPTIONS.dateend = [check_and_convert_date(dat)
                                              for dat in [OPTIONS.datebegin, OPTIONS.dateend]]
        if OPTIONS.datebegin.hour == 0:
            # A date without an hour means the 6h analysis/forecast run.
            #: Date of model run :class:`bronx.stdtypes.date.Date`
            self.rundate = OPTIONS.datebegin.replace(hour=6)
        else:
            #: Date of model run :class:`bronx.stdtypes.date.Date`
            self.rundate = OPTIONS.datebegin
        #: output directory
        self.diroutput = os.path.join(OPTIONS.diroutput, self.rundate.strftime("%Y%m%d%H"))
        self.diroutput_maps = os.path.join(self.diroutput, 'maps')  #: output directory for maps
        self.diroutput_plots = os.path.join(self.diroutput, "plots")  #: output directory for other plots
        for required_directory in [self.diroutput, self.diroutput_maps, self.diroutput_plots]:
            if not os.path.isdir(required_directory):
                os.mkdir(required_directory)
        self.dev = OPTIONS.dev
        if OPTIONS.dev:
            # Development chain: no fallback experiment id.
            # NOTE(review): delattr mutates the *class*, not the instance — this
            # makes Config single-use per process; confirm intended.
            self.xpid = "OPER@lafaysse"
            delattr(Config, 'alternate_xpid')
            self.list_geometry = ['jur', 'mac', 'vog', 'cor', 'alp', 'pyr', 'postes']
        self.dble = OPTIONS.dble
        if OPTIONS.dble:
            # Operational double chain.
            self.xpid = "dble"
            delattr(Config, 'alternate_xpid')
            self.list_geometry = ['alp', 'pyr', 'cor', 'jur', 'mac', 'vog', 'postes']
        self.reforecast = OPTIONS.reforecast
        if OPTIONS.reforecast:
            # Reforecast experiment: dedicated geometries.
            self.xpid = "reforecast_double2021@vernaym"
            delattr(Config, 'alternate_xpid')
            self.list_geometry = ['jur4_allslopes_reforecast', 'mac11_allslopes_reforecast',
                                  'vog3_allslopes_reforecast', 'alp27_allslopes', 'pyr24_allslopes',
                                  'cor2_allslopes']
class Ensemble:
    """
    Describes an ensemble of simulations.
    """

    def __init__(self):
        """ """
        self.ensemble = {}  #: data dict with variable names as keys and np.arrays as values
        self.simufiles = []  #: list of prosimu objects with simulation files
        self.nech = None  #: number of time steps (forecast steps), int
        self.nmembers = None  #: number of members, int
        self.time = None  #: time variable from the first simulation file (numpy array)
        self.indpoints = []  #: spatial indices (list, or tuple of index arrays)
        self.npoints = None  #: total number of spatial points, int
    @property
    def spatialdim(self):
        """Name of the spatial dimension in the simulation NetCDF files."""
        return "Number_of_points"
[docs] def open(self, listmembers): """ Opens simulation files :param listmembers: list of ensemble members (filenames) :type listmembers: list :ivar inddeterministic: index of the deterministic member :vartype inddeterministic: int """ print(listmembers) for m, member in enumerate(listmembers): p = prosimu(member) if m == 0: self.nech = p.getlendim("time") ntime = p.getlendim("time") if ntime == self.nech: self.simufiles.append(p) if 'mb035' in member: self.inddeterministic = len(self.simufiles) - 1 self.nmembers = len(self.simufiles) print(self.nmembers) self.time = self.simufiles[0].readtime() self.indpoints = self.select_points() self.npoints = self.get_npoints()
[docs] def select_points(self): """ Get a list of spatial indices from the spatial dimension length in the first simulation file. :return: list of spatial points (indices) :rtype: list """ indpoints = range(0, self.simufiles[0].getlendim(self.spatialdim)) return indpoints
[docs] def get_npoints(self): """ Get the number of spatial points. :return: total number of spatial points :rtype: int """ if isinstance(self.indpoints, tuple): npoints = 0 for indpoints in self.indpoints: npoints += len(indpoints) else: npoints = len(self.indpoints) return npoints
[docs] def read(self, varname): """ Read a variable and store it in :py:attr:`~.ensemble` :param varname: name of the variable to be read (corresponds to the variable name of the simulation NetCDF files) :type varname: str """ self.ensemble[varname] = np.empty([self.nech, self.npoints, self.nmembers]) kwargs = dict() for m, member in enumerate(self.simufiles): # print("read " + varname + " for member" + str(m)) import datetime before = datetime.datetime.today() if isinstance(self.indpoints, tuple): sections = [] for indpoints in self.indpoints: kwargs[self.spatialdim] = indpoints sections.append(member.read_var(varname, **kwargs)) self.ensemble[varname][:, :, m] = np.concatenate(tuple(sections), axis=1) else: kwargs[self.spatialdim] = self.indpoints self.ensemble[varname][:, :, m] = member.read_var(varname, **kwargs) after = datetime.datetime.today() # print(after - before) # Verrues (à éviter) if varname == 'NAT_LEV': self.ensemble[varname][:, :, :] = np.where(self.ensemble[varname] == 6., 0., self.ensemble[varname])
[docs] def read_geovar(self, varname): """ Read a variable from the first simulation file. :param varname: NetCDF variable name of the variable to read :type varname: str :return: data read :rtype: numpy array """ kwargs = dict() if isinstance(self.indpoints, tuple): sections = [] for indpoints in self.indpoints: kwargs[self.spatialdim] = indpoints sections.append(self.simufiles[0].read_var(varname, **kwargs)) return np.concatenate(tuple(sections)) else: kwargs[self.spatialdim] = self.indpoints return self.simufiles[0].read_var(varname, **kwargs)
[docs] def probability(self, varname, seuilinf=-999999999., seuilsup=999999999.): """ Calculates probability as the proportion of ensemble members with values larger than :py:attr:`seuilinf` and smaller than :py:attr:`seuilsup`. :param varname: name of the variable for which to calculate the probability :type varname: str :param seuilinf: lower threshold :param seuilsup: upper threshold :return: probability field (or np.nan if the ensemble is not defined) :rtype: numpy array """ if varname not in self.ensemble.keys(): self.read(varname) condition = (self.ensemble[varname] > seuilinf) & (self.ensemble[varname] < seuilsup) probability = np.sum(condition, axis=2) / (1. * self.nmembers) return np.where(np.isnan(self.ensemble[varname][:, :, 0]), np.nan, probability)
# Return nan where the ensemble is not defined
[docs] def emos_csg_nonorm_allstations_newsnow(self, varname, level): """ Applies Censored Shifted Gamma (CSG) EMOS trained for 24h new snow variable and all stations in the Alps without normalisation (internship Benoit Gacon 2024) to variable "varname" and returns the quantiles given in "levels" of the CSG distribution. :param varname: variable name :type varname: str :param level: quantiles (range 0 to 100) :type level: array :return: quantiles of predictive distribution :rtype: np.array """ if varname not in self.ensemble.keys(): self.read(varname) csgmean, csgstd, csgdelta = self.get_csg_predictive_dist_nonorm_allstations(varname) level=level/100. quantile = self.quantiles_CSGD(csgmean, csgstd, csgdelta, level) return quantile
    def get_csg_predictive_dist_nonorm_allstations(self, varname):
        """
        Get the censored shifted gamma (CSG) regression coefficients and
        climatological parameters for each lead time, apply them to the ensemble
        forecast and return the parameters mu, sigma and delta of the predictive
        censored shifted gamma distribution.

        :param varname: variable name
        :return: mu, sigma, delta of the predictive CSG distribution
        """
        # Former implementation read the trained parameters from an Rdata file via rpy2:
        # filename = os.path.join(SNOWTOOLS_DATA, "emos_2000_2022_212_par_nonorm.Rdata")
        # robj = robjects.r.load(filename)  # pylint: disable=possibly-unused-variable
        # reg_coef = np.array(robjects.r[robj[0]])
        # clim_par = np.array(robjects.r[robj[1]])
        reg_coef = pd.read_csv(os.path.join(SNOWTOOLS_DATA, "matEmosPars.csv"), index_col=0).values
        clim_par = pd.read_csv(os.path.join(SNOWTOOLS_DATA, "matEmosParsClim.csv"), index_col=0).values
        ndays_leadtime = 4  # number of 24h lead-time windows with trained coefficients
        ntime = len(self.time)
        list_indtime = []
        # Group forecast steps into 24h lead-time windows relative to the 6h run time.
        delta = self.time - self.time[0].replace(hour=6)
        for day_leadtime in range(1, ndays_leadtime + 1):
            list_indtime.append(np.where(
                (delta <= datetime.timedelta(days=day_leadtime)) &
                (delta > datetime.timedelta(days=day_leadtime - 1)))[0])
        # Regression parameters, constant within each lead-time window.
        a1, a2, a3, a4, b1 = np.empty((5, ntime, self.npoints))
        a1.fill(np.nan)
        a2.fill(np.nan)
        a3.fill(np.nan)
        a4.fill(np.nan)
        b1.fill(np.nan)
        # Climatological CSG parameters, also constant within each window.
        muclim, sigmaclim, deltaclim = np.empty((3, ntime, self.npoints))
        muclim.fill(np.nan)
        sigmaclim.fill(np.nan)
        deltaclim.fill(np.nan)
        for leadtime in range(0, ndays_leadtime):
            # cst_a1, cst_a2, cst_a3, cst_a4, cst_b1 = reg_coef[leadtime, 0, 1, 0:5]
            # cst_muclim, cst_sigmaclim, cst_deltaclim = clim_par[leadtime, 0, 1, :]
            cst_a1, cst_a2, cst_a3, cst_a4, cst_b1 = reg_coef[leadtime, 0:5]
            cst_muclim, cst_sigmaclim, cst_deltaclim = clim_par[leadtime, :]
            # Broadcast the per-window constants over the matching time steps.
            begin = list_indtime[leadtime][0]
            end = list_indtime[leadtime][-1]
            a1[slice(begin, end + 1), :] = cst_a1
            a2[slice(begin, end + 1), :] = cst_a2
            a3[slice(begin, end + 1), :] = cst_a3
            a4[slice(begin, end + 1), :] = cst_a4
            b1[slice(begin, end + 1), :] = cst_b1
            muclim[slice(begin, end + 1), :] = cst_muclim
            sigmaclim[slice(begin, end + 1), :] = cst_sigmaclim
            deltaclim[slice(begin, end + 1), :] = cst_deltaclim
        # Extract raw ensemble predictors: mean and probability of occurrence.
        ensmean = self.mean(varname)
        ensPOP = self.probability(varname, seuilinf=1.E-6)
        # Compute CSGD parameters with regression laws.
        csgmean = self.csg_mean_nonorm(a1, a2, a3, a4, ensmean, ensPOP)
        csgstd = self.csg_std_nonorm(b1, muclim, sigmaclim, csgmean)
        csgdelta = deltaclim
        return csgmean, csgstd, csgdelta
[docs] def csg_mean_nonorm(self, a1, a2, a3, a4, ensmean, ensPOP): """ Calculate mu of predictive censored shifted gamma distribution from regression coefficients alpha 1 to alpha 4, ensemble mean and probability of non-zero values of the ensemble. :param a1: regression coefficient alpha 1 :type a1: float :param a2: regression coefficient alpha 2 :type a2: float :param a3: regression coefficient alpha 3 :type a3: float :param a4: regression coefficient alpha 4 :type a4: float :param ensmean: ensemble mean :type ensmean: float :param ensPOP: probability of non-zero values of the ensemble [0., 1.] :type ensPOP: float :return: mu of predictive censored shifted gamma distribution :rtype: float """ return np.log1p(np.expm1(a1) * (a2 + a3 * ensPOP + a4 * ensmean)) / a1
[docs] def csg_std_nonorm(self, b1, muclim, sigmaclim, csgmean): """ Calculate sigma of predictive censored shifted gamma distribution from regression coefficients beta 1, climatological mu and sigma and csg mu parameter. :param b1: regression coefficient beta 1 :type b1: float :param muclim: climatological mu :type muclim: float :param sigmaclim: climatological sigma :type sigmaclim: float :param csgmean: csg mu parameter :type csgmean: float :return: sigma of predictive censored shifted gamma distribution :rtype: float """ return b1 * sigmaclim * np.sqrt(csgmean / muclim)
    def quantiles_CSGD(self, csgmean, csgstd, csgdelta, quantiles):
        """
        Get quantiles of the censored shifted gamma (CSG) distribution with
        given mu, sigma and delta.

        :param csgmean: mu parameter of CSG
        :type csgmean: array
        :param csgstd: sigma parameter of CSG
        :type csgstd: array
        :param csgdelta: delta parameter of CSG
        :type csgdelta: array
        :param quantiles: quantiles wanted (probabilities in [0, 1])
        :type quantiles: array of floats
        :return: array of quantiles
        :rtype: array of same length as quantiles.
        """
        # (mu, sigma) are converted to the gamma shape/scale parametrisation;
        # 'q' is excluded from broadcasting so every quantile is produced at
        # once through the trailing output axis of the signature.
        vectorppf = np.vectorize(gamma.ppf, signature='(),(),()->(n)', excluded='q')
        tmp = vectorppf(q=quantiles, a=(csgmean / csgstd) ** 2, scale=(csgstd ** 2) / csgmean,
                        loc=csgdelta)
        # print(tmp)
        # return np.where(tmp >= 0, tmp, 0)
        # Left-censoring at zero: negative shifted values mean "no new snow".
        return np.where(tmp < 0., 0., tmp)
[docs] def quantile(self, varname, level): """ Calculates ensemble percentiles for a given variable and given percentile levels. :param varname: Variable name :type varname: str :param level: list of percentiles to calculate :type level: list of numbers between 0 and 100 :return: array of percentiles :rtype: numpy array """ if varname not in self.ensemble.keys(): self.read(varname) quantile = np.where(np.isnan(self.ensemble[varname][:, :, 0]), np.nan, np.percentile(self.ensemble[varname], level, axis=2)) return quantile
[docs] def mean(self, varname): """ Calculate ensemble mean for a given variable. :param varname: variable name :type varname: str :return: ensemble mean :rtype: numpy array """ if varname not in self.ensemble.keys(): self.read(varname) return np.nanmean(self.ensemble[varname], axis=2)
[docs] def spread(self, varname): """ Calculate ensemble spread (standard deviation) for a given variable. :param varname: variable name :type varname: str :return: ensemble spread :rtype: numpy array :rtype: numpy array """ if varname not in self.ensemble.keys(): self.read(varname) return np.nanstd(self.ensemble[varname], axis=2)
[docs] def close(self): """ Close simulation files and remove data from :py:attr:`ensemble`. """ for member in self.simufiles: member.close() self.ensemble.clear()
[docs] def get_metadata(self): """ Get a tuple with spatial indices. :return: indpoints, indpoints """ indpoints = self.select_points() return indpoints, indpoints
[docs] def get_alti(self): """ Get altitude variable from the first simulation file. :return: altitude variable :rtype: numpy array """ if not hasattr(self, "alti"): self.alti = self.read_geovar("ZS") return self.alti
[docs] def get_aspect(self): """ Get aspect variable from the first simulation file. :return: aspect variable :rtype: numpy array """ if not hasattr(self, "aspect"): self.aspect = self.read_geovar("aspect") return self.aspect
class _EnsembleMassif(Ensemble):
    """
    Base class for ensemble simulations on a massif geometry (SAFRAN like).

    :ivar InfoMassifs: Information on massifs (names, metadata)
    """
    InfoMassifs = infomassifs()

    @property
    def geo(self):
        """
        Geometry identifier.

        :return: "massifs"
        """
        return "massifs"
[docs] def read(self, varname): """ Read data for a given variable name into the :py:attr:`ensemble` instance variable. :param varname: variable name :type varname: str """ if varname == 'naturalIndex': nmassifs = len(self.get_massifvar()) self.ensemble[varname] = np.empty([self.nech, nmassifs, self.nmembers]) for m, member in enumerate(self.simufiles): self.ensemble[varname][:, :, m] = member.read_var(varname) else: super(_EnsembleMassif, self).read(varname)
[docs] def get_massifdim(self): """ Read massif_num variable from the first simulation file. :return: massif numbers :rtype: numpy array """ if not hasattr(self, "massifdim"): self.massifdim = self.read_geovar("massif_num") return self.massifdim
[docs] def get_massifvar(self): """ Read "massif" variable from the first simulation file. :return: massif numbers :rtype: numpy array """ if not hasattr(self, "massifvar"): self.massifvar = self.simufiles[0].read_var("massif") return self.massifvar
[docs] def get_metadata(self, nolevel=False): """ Construct filenames and plot titles from massif and altitude variables in the first simulation file. :param nolevel: if True the altitude is not included in the filenames and titles. :return: a list of filenames and a list of titles :rtype: two lists. """ if nolevel: massif = self.get_massifvar() alti = [None] * len(massif) else: alti = self.get_alti() massif = self.get_massifdim() return [self.build_filename(mas, alt) for mas, alt in zip(massif, alti)], \ [self.build_title(mas, alt) for mas, alt in zip(massif, alti)]
[docs] def build_filename(self, massif, alti): """ Construct a filename from massif and altitude information. :param massif: a massif number :param alti: an altitude level :return: filename :rtype: str """ filename = str(massif) if alti: filename += "_" + str(int(alti)) return filename
[docs] def build_title(self, massif, alti): """ Construct a figure title from massif and altitude information. :param massif: a massif number :param alti: an altitude level :return: a title :rtype: unicode str """ title = self.InfoMassifs.getMassifName(massif) # type unicode if alti: title += " {:d}m".format(int(alti)) return title # matplotlib needs unicode
class EnsembleFlatMassif(_EnsembleMassif):
    """
    Ensemble simulations on a massif geometry (SAFRAN like) restricted to flat
    terrain points (zero slope, no orientation information).
    """

    def select_points(self):
        """
        Select spatial indices from the first simulation file where aspect=-1
        (zero slope, no orientation information).

        :return: spatial indices to be read from simulation files
        :rtype: numpy boolean array
        """
        # aspect == -1 encodes flat terrain in the SAFRAN geometry.
        return self.simufiles[0].get_points(aspect=-1)
class EnsembleNorthSouthMassif(_EnsembleMassif):
    """
    Ensemble simulations on a massif geometry (SAFRAN like) restricted to
    points at a 40 degree slope with two orientations: North and South.
    """

    def select_points(self):
        """
        Select spatial indices from the first simulation file where slope=40
        and aspect is either 0 or 180.

        :return: spatial indices to be read from simulation files
        :rtype: a tuple of numpy boolean arrays, first component for the
            Northern orientation, second for the Southern one.
        """
        # TAKE CARE: it is extremely more efficient to read regular sections of
        # the NetCDF files, hence two separate index arrays instead of one
        # merged, sorted selection.
        north = self.simufiles[0].get_points(aspect=0, slope=40)
        south = self.simufiles[0].get_points(aspect=180, slope=40)
        return (north, south)
class EnsembleMassifPoint(_EnsembleMassif):
    """
    Class for extracting one specific point in massif geometry.
    """

    def __init__(self, massif_num, alti, aspect, slope):
        # Store the target point characteristics before calling the parent
        # constructor (select_points is only invoked later, from open()).
        self.massif_num = massif_num
        self.alti = alti  # also serves as the get_alti() cache value
        self.aspect = aspect
        self.slope = slope
        super(EnsembleMassifPoint, self).__init__()

    def select_points(self):
        """Select index of the corresponding point"""
        return self.simufiles[0].get_points(massif_num=self.massif_num, aspect=self.aspect,
                                            slope=self.slope, ZS=self.alti)
class EnsembleStation(Ensemble):
    """
    Class for ensemble simulations at station locations.

    :ivar InfoMassifs: Information on the massif each station is situated in
        (station names, metadata).
    """
    InfoMassifs = infomassifs()

    @property
    def geo(self):
        """
        Geometry identifier.

        :return: "stations"
        """
        return "stations"
    def get_station(self):
        """
        Read station numbers from the first simulation file, restricted to the
        selected spatial indices.

        :return: station numbers
        :rtype: numpy array
        """
        return self.simufiles[0].read_var("station", Number_of_points=self.indpoints)
[docs] def get_metadata(self, **kwargs): """ Construct filenames and plot titles from altitude and station information :param kwargs: :return: a list of filenames and a list of plot titles """ alti = self.simufiles[0].read_var("ZS", Number_of_points=self.indpoints) station = self.get_station() # print('alti type: ', type(alti), 'station type: ', type(station)) return [self.build_filename(stat, alt) for stat, alt in zip(station, alti)], \ [self.build_title(stat, alt) for stat, alt in zip(station, alti)]
    def build_filename(self, station, alti):
        """
        Construct a filename from a station number.

        :param station: station number
        :param alti: altitude level (not used)
        :return: filename
        :rtype: station number as 8-digit zero-padded integer string.
        """
        # %-formatting also accepts numpy numeric scalars.
        return '%08d' % station
    def build_title(self, station, alti):
        """
        Construct a title from a station number and altitude level.

        :param station: station number
        :param alti: station altitude
        :return: title string composed of the station name and the altitude.
        :rtype: unicode str
        """
        # nameposte gives unicode, which is what matplotlib expects.
        return self.InfoMassifs.nameposte(station) + " %d m" % int(alti)
class EnsembleDiags(Ensemble):
    """
    Probabilities and quantiles from ensembles.
    """

    def __init__(self):
        """ """
        self.proba = {}  #: probabilities, keyed by (variable, threshold)
        self.quantiles = {}  #: lists of percentile fields, keyed by variable
        super(EnsembleDiags, self).__init__()
[docs] def diags(self, list_var, list_quantiles, list_seuils): """ Calculate quantiles and exceedance probabilities for a list of variables, quantiles and thresholds. The quantiles are stored in :py:attr:`~.quantiles` and the probabilities in :py:attr:`~.proba` :param list_var: list of variables :type list_var: iterable :param list_quantiles: list of percentiles to be calculated :type list_quantiles: iterable :param list_seuils: list of thresholds for each variable :type list_seuils: dict of iterables with variable names as keys """ for var in list_var: if var in list_seuils.keys(): for seuil in list_seuils[var]: self.proba[(var, seuil)] = self.probability(var, seuilinf=seuil) for var in list_var: self.quantiles[var] = [] for quantile in list_quantiles: print("Compute quantile " + str(quantile) + " for variable " + var) self.quantiles[var].append(self.quantile(var, quantile))
    def close(self):
        """
        Close simulation files and remove data from
        :py:attr:`~.plots.pearps2m.postprocess.Ensemble.ensemble`,
        :py:attr:`.proba` and :py:attr:`.quantiles`.
        """
        super(EnsembleDiags, self).close()
        self.proba.clear()
        self.quantiles.clear()
@echecker_pyproj.disabled_if_unavailable(version='2.0.0')
class EnsembleOperDiags(EnsembleDiags):
    """
    Class for operationally used plots.
    """
    #: plot format
    formatplot = 'png'
    #: dict of plot attributes for each variable (conversion factor, colour
    #: scale bounds, palette, French axis/colourbar labels, ...)
    attributes = dict(
        PP_SD_1DY_ISBA=dict(convert_unit=1., forcemin=0., forcemax=60., palette='YlGnBu', seuiltext=50.,
                            label=u'Epaisseur de neige fraîche en 24h (cm)'),
        SD_1DY_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60., palette='YlGnBu', seuiltext=50.,
                         label=u'Epaisseur de neige fraîche en 24h (cm)'),
        SD_3DY_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60., palette='YlGnBu', seuiltext=50.,
                         label=u'Epaisseur de neige fraîche en 72h (cm)'),
        RAMSOND_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60., palette='YlGnBu', seuiltext=50.,
                          label=u'Epaisseur mobilisable (cm)'),
        NAT_LEV=dict(forcemin=-0.5, forcemax=5.5, palette='YlOrRd', ncolors=6, label=u'Risque naturel',
                     ticks=[u'Très faible', u'Faible', u'Mod. A', u'Mod. D', u'Fort', u'Très fort']),
        naturalIndex=dict(forcemin=0., forcemax=8., palette='YlOrRd', label=u'Indice de risque naturel',
                          format='%.1f', nolevel=True),
        DSN_T_ISBA=dict(convert_unit=100., label=u'Hauteur de neige (cm)'),
        WSN_T_ISBA=dict(label=u'Equivalent en eau (kg/m2)'),
        SNOMLT_ISBA=dict(convert_unit=3. * 3600., forcemin=0., forcemax=60., palette='YlGnBu',
                         seuiltext=50., label=u'Ecoulement en 3h (kg/m2/3h)'),
        WET_TH_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60., palette='YlGnBu', seuiltext=50.,
                         label=u'Epaisseur humide (cm)'),
        REFRZTH_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60., palette='YlGnBu', seuiltext=50.,
                          label=u'Epaisseur regelée (cm)'),
        RAINF_ISBA=dict(convert_unit=3. * 3600., forcemin=0., forcemax=60., palette='YlGnBu',
                        seuiltext=50., label=u'Pluie en 3h (kg/m2/3h)'),
    )
    #: list of quantiles (percent) used by all diagnostics
    list_q = [20, 50, 80]

    def __init__(self):
        super(EnsembleOperDiags, self).__init__()
[docs] def alldiags(self): """ Calculate percentiles for all variables in :py:attr:`list_var_spag` and :py:attr:`list_var_map`. """ super(EnsembleOperDiags, self).diags(set(self.list_var_spag + self.list_var_map), self.list_q, {})
    def pack_spaghettis(self, suptitle, diroutput = "."):
        """
        Produce spaghetti plots for all variables in :py:attr:`list_var_spag`.

        :param suptitle: Suptitle for all plots.
        :type suptitle: unicode string
        :param diroutput: directory to save the plots
        """
        for var in self.list_var_spag:
            if 'nolevel' not in self.attributes[var].keys():
                self.attributes[var]['nolevel'] = False
            list_filenames, list_titles = self.get_metadata(nolevel=self.attributes[var]['nolevel'])
            # A deterministic member gets its own highlighted curve.
            if hasattr(self, 'inddeterministic'):
                s = spaghettis_with_det(self.time)
            else:
                s = spaghettis(self.time)
            settings = self.attributes[var].copy()
            if 'label' in self.attributes[var].keys():
                settings['ylabel'] = self.attributes[var]['label']
            npoints = self.quantiles[var][0][0, :].shape[0]
            for point in range(0, npoints):
                # Apply the unit conversion (if any) to members and quantiles.
                if 'convert_unit' in self.attributes[var].keys():
                    allmembers = self.ensemble[var][:, point, :] \
                        * self.attributes[var]['convert_unit']
                    qmin = self.quantiles[var][0][:, point] * self.attributes[var]['convert_unit']
                    qmed = self.quantiles[var][1][:, point] * self.attributes[var]['convert_unit']
                    qmax = self.quantiles[var][2][:, point] * self.attributes[var]['convert_unit']
                else:
                    allmembers = self.ensemble[var][:, point, :]
                    qmin = self.quantiles[var][0][:, point]
                    qmed = self.quantiles[var][1][:, point]
                    qmax = self.quantiles[var][2][:, point]
                if hasattr(self, 'inddeterministic'):
                    s.draw(self.time, allmembers, qmin, qmed, qmax,
                           deterministic=allmembers[:, self.inddeterministic], **settings)
                else:
                    s.draw(self.time, allmembers, qmin, qmed, qmax, **settings)
                s.set_title(list_titles[point])
                s.set_suptitle(suptitle)
                s.addlogo()
                plotname = diroutput + "/" + var + "_" + list_filenames[point] + "." + self.formatplot
                s.save(plotname, formatout=self.formatplot)
                print(plotname + " is available.")
            s.close()
    def pack_spaghettis_multipoints(self, list_pairs, suptitle, diroutput=".", **kwargs):
        """
        Produce spaghetti plots with up to four points per plot for variables
        in :py:attr:`list_var_spag_2points`. Each point gets a different color.

        :param list_pairs: list of indices to plot together on each plot
        :type list_pairs: list of list
        :param suptitle: common suptitle for all plots
        :type suptitle: unicode str
        :param diroutput: output directory
        :param kwargs: arguments to be passed to plotting routines, notably "labels"
        """
        # One color per point of a pair/group (at most four points supported).
        list_colors = ['blue', 'red', 'green', 'orange']
        for var in self.list_var_spag_2points:
            if 'nolevel' not in self.attributes[var].keys():
                self.attributes[var]['nolevel'] = False
            list_filenames, list_titles = self.get_metadata(nolevel = self.attributes[var]['nolevel'])
            if hasattr(self, 'inddeterministic'):
                s = spaghettis_with_det(self.time)
            else:
                s = spaghettis(self.time)
            settings = self.attributes[var].copy()
            if 'label' in self.attributes[var].keys():
                settings['ylabel'] = self.attributes[var]['label']
            for pair in list_pairs:
                for p, point in enumerate(pair):
                    # Apply the unit conversion (if any) to members and quantiles.
                    if 'convert_unit' in self.attributes[var].keys():
                        allmembers = self.ensemble[var][:, point, :] \
                            * self.attributes[var]['convert_unit']
                        qmin = self.quantiles[var][0][:, point] \
                            * self.attributes[var]['convert_unit']
                        qmed = self.quantiles[var][1][:, point] \
                            * self.attributes[var]['convert_unit']
                        qmax = self.quantiles[var][2][:, point] \
                            * self.attributes[var]['convert_unit']
                    else:
                        allmembers = self.ensemble[var][:, point, :]
                        qmin = self.quantiles[var][0][:, point]
                        qmed = self.quantiles[var][1][:, point]
                        qmax = self.quantiles[var][2][:, point]
                    settings['colorquantiles'] = list_colors[p]
                    settings['colormembers'] = list_colors[p]
                    if 'labels' in kwargs.keys():
                        settings['commonlabel'] = kwargs['labels'][p]
                    if hasattr(self, 'inddeterministic'):
                        s.draw(self.time, allmembers, qmin, qmed, qmax,
                               deterministic=allmembers[:, self.inddeterministic], **settings)
                    else:
                        s.draw(self.time, allmembers, qmin, qmed, qmax, **settings)
                # NB: the filename/title use the last point of the group.
                s.set_title(list_titles[point])
                s.set_suptitle(suptitle)
                s.addlogo()
                plotname = diroutput + "/" + var + "_" + list_filenames[point] + "." + self.formatplot
                s.save(plotname, formatout=self.formatplot)
                print(plotname + " is available.")
            s.close()
@echecker.disabled_if_unavailable
@echecker_pyproj.disabled_if_unavailable(version='2.0.0')
class EnsembleOperDiagsFlatMassif(EnsembleOperDiags, EnsembleFlatMassif):
    """
    Class for operationally plotting maps and spaghetti plots for ensembles
    with massif geometry and for the zero slope case.
    """
    #: maximum height level to be treated
    levelmax = 3900
    #: minimum heigth level to be treated
    levelmin = 0
    #: list of variables to plot maps for
    list_var_map = ['naturalIndex', 'SD_1DY_ISBA', 'SD_3DY_ISBA', 'SNOMLT_ISBA']
    #: list of variables to do spaghetti plots for
    list_var_spag = ['naturalIndex', 'DSN_T_ISBA', 'WSN_T_ISBA', 'SNOMLT_ISBA']
    # list_var_spag = ['DSN_T_ISBA', 'WSN_T_ISBA', 'SNOMLT_ISBA']
[docs] def pack_maps(self, domain, suptitle, diroutput="."): """ Produce maps for the variables given in :py:attr:`list_var_map` over the regions given in :py:attr:`domain` for each available altitude level and time step. Each massif is colored corresponding to the second percentile value defined in :py:attr:`.list_q` and the color map defined in :py:attr:`.attributes`. At the center of each massif the values corresponding to the first three percentiles in :py:attr:`.list_q` are marked. :param domain: list of region identifiers (e.g., "alp", "pyr", "cor") :param suptitle: common suptitle for all plots :type suptitle: str :param diroutput: output directory to save the maps """ from snowtools.plots.maps.cartopy_massifs import Map_alpes, Map_pyrenees, Map_corse, Map_vosges, Map_jura, \ Map_central map_generic = dict(alp=Map_alpes, pyr=Map_pyrenees, cor=Map_corse, jur=Map_jura, mac=Map_central, vog=Map_vosges) alti = self.get_alti() list_alti = list(set(alti)) m = map_generic[domain[0:3]]() for var in self.list_var_map: m.init_massifs(**self.attributes[var]) m.addlogo() if 'nolevel' not in self.attributes[var].keys(): self.attributes[var]['nolevel'] = False if self.attributes[var]['nolevel']: list_loop_alti = [0] massif = self.get_massifvar() indalti = np.ones_like(self.quantiles[var][0][0, :], dtype=bool) else: list_loop_alti = list_alti[:] for level in list_loop_alti: if level < self.levelmin or level > self.levelmax: list_loop_alti.remove(level) massif = self.get_massifdim() for level in list_loop_alti: if not self.attributes[var]['nolevel']: indalti = alti == level for t in range(0, self.nech): qmin = self.quantiles[var][0][t, indalti] qmed = self.quantiles[var][1][t, indalti] qmax = self.quantiles[var][2][t, indalti] m.draw_massifs(massif[indalti], qmed, **self.attributes[var]) m.plot_center_massif(massif[indalti], qmin, qmed, qmax, **self.attributes[var]) title = "pour le " + pretty_date(self.time[t]) if not self.attributes[var]['nolevel']: title += " - Altitude 
: " + str(int(level)) + "m" m.set_title(title) m.set_suptitle(suptitle) ech = self.time[t] - self.time[0] + self.time[1] - self.time[0] ech_str = '+%02d' % (ech.days * 24 + ech.seconds / 3600) plotname = diroutput + "/" + domain[0:3] + "_" + var + "_" + str(int(level)) + ech_str + "." + self.formatplot m.save(plotname, formatout=self.formatplot) print(plotname + " is available.") m.reset_massifs(rmcbar=False) m.reset_massifs()
@echecker.disabled_if_unavailable
@echecker_pyproj.disabled_if_unavailable(version='2.0.0')
class EnsembleOperDiagsNorthSouthMassif(EnsembleOperDiags, EnsembleNorthSouthMassif):
    """
    Class for operationally plotting maps and spaghetti plots distinguishing between
    northern and southern orientation at each massif.
    """
    #: maximum height level to plot
    levelmax = 3900
    #: minimum height level to plot
    levelmin = 0
    #: labels for the two orientations
    versants = [u'Nord 40°', u'Sud 40°']
    #: variable list for standard spaghetti plots
    list_var_spag = []
    #: variable list for spaghetti plots combining northern and southern orientation
    list_var_spag_2points = ['RAMSOND_ISBA', 'NAT_LEV', 'WET_TH_ISBA', 'REFRZTH_ISBA']
    #: variable list for producing maps
    list_var_map = ['RAMSOND_ISBA', 'NAT_LEV', 'WET_TH_ISBA', 'REFRZTH_ISBA']
    #: ensemble data
    ensemble = {}

    def alldiags(self):
        """
        Calculate percentiles for all variables in :py:attr:`.list_var_spag_2points`
        and :py:attr:`.list_var_map`.
        """
        super(EnsembleOperDiagsNorthSouthMassif, self).diags(set(self.list_var_spag_2points + self.list_var_map),
                                                             self.list_q, {})

    def get_pairs_ns(self):
        """
        Get for each massif and altitude the pair of indices corresponding to values
        for the northern and southern slopes. The result is cached in
        ``self.list_pairs`` on the first call.

        :return: pairs of point indices ``[north, south]``
        :rtype: list of lists
        """
        alti = self.get_alti()
        aspect = np.array([int(asp) for asp in self.get_aspect()])
        massif = self.get_massifdim()
        if not hasattr(self, 'list_pairs'):
            self.list_pairs = []
            for point in range(0, np.shape(alti)[0]):
                if aspect[point] == 0:
                    indsouth = np.where((alti == alti[point]) & (aspect == 180) & (massif == massif[point]))
                    # BUGFIX: np.where returns a tuple whose length equals the number
                    # of dimensions (always 1 here), so the previous test
                    # ``len(indsouth) == 1`` was always true and ``indsouth[0][0]``
                    # raised an IndexError when no southern counterpart exists.
                    # Test the number of matching points instead; as before, the
                    # first match is kept.
                    if len(indsouth[0]) >= 1:
                        self.list_pairs.append([point, indsouth[0][0]])
        return self.list_pairs

    def pack_spaghettis_ns(self, suptitle, diroutput="."):
        """
        Do spaghetti plots with values for the northern and southern slope on the
        same plot.

        :param suptitle: common suptitle for all plots
        :param diroutput: directory to save the plots
        """
        list_pairs = self.get_pairs_ns()
        super(EnsembleOperDiagsNorthSouthMassif, self).pack_spaghettis_multipoints(list_pairs, suptitle, diroutput,
                                                                                  labels=self.versants)

    def pack_maps(self, domain, suptitle, diroutput):
        """
        Produce maps for the variables given in :py:attr:`.list_var_map` over the
        region given in ``domain`` for each available altitude level and time step.

        For each massif a table with background colors and values corresponding to
        the percentiles in :py:attr:`.list_q` and the color map defined in
        :py:attr:`.attributes` is plotted near the center of each massif.

        :param domain: region identifier (e.g., "alp", "pyr", "cor"); only the
                       first three characters are used to select the map class
        :param suptitle: common suptitle for all plots
        :type suptitle: str
        :param diroutput: output directory to save the maps
        """
        from snowtools.plots.maps.cartopy_massifs import Map_alpes, Map_pyrenees, Map_corse, Map_vosges, Map_jura, \
            Map_central
        map_generic = dict(alp=Map_alpes, pyr=Map_pyrenees, cor=Map_corse, jur=Map_jura, vog=Map_vosges,
                           mac=Map_central)
        # called mainly for its side effect of caching the north/south pairs
        list_pairs = self.get_pairs_ns()  # pylint: disable=possibly-unused-variable
        alti = self.get_alti()
        aspect = self.get_aspect()
        list_alti = list(set(alti))
        m = map_generic[domain[0:3]]()
        for var in self.list_var_map:
            m.init_massifs(**self.attributes[var])
            m.empty_massifs()
            m.add_north_south_info()
            m.addlogo()
            # BUGFIX: filter instead of removing from the list while iterating over
            # it, which could skip out-of-range levels.
            list_loop_alti = [level for level in list_alti
                              if self.levelmin <= level <= self.levelmax]
            massif = self.get_massifdim()
            for level in list_loop_alti:
                list_indalti = []
                for plotaspect in [180, 0]:  # from bottom to top of the info table
                    list_indalti.append((alti == level) & (aspect == plotaspect))
                for t in range(0, self.nech):
                    list_values = []
                    for indalti in list_indalti:
                        for q, quantile in enumerate(self.list_q):  # pylint: disable=possibly-unused-variable
                            list_values.append(self.quantiles[var][q][t, indalti])
                    m.rectangle_massif(massif[indalti], list_values, ncol=2, **self.attributes[var])
                    title = "pour le " + pretty_date(self.time[t])
                    title += " - Altitude : " + str(int(level)) + "m"
                    m.set_title(title)
                    m.set_suptitle(suptitle)
                    ech = self.time[t] - self.time[0] + self.time[1] - self.time[0]
                    ech_str = '+%02d' % (ech.days * 24 + ech.seconds / 3600)
                    plotname = diroutput + "/" + domain[0:3] + "_" + var + "_" + str(int(level)) \
                        + ech_str + "." + self.formatplot
                    m.save(plotname, formatout=self.formatplot)
                    print(plotname + " is available.")
            m.reset_massifs(rmcbar=False, rminfobox=False)
        m.reset_massifs()
[docs]@echecker.disabled_if_unavailable @echecker_pyproj.disabled_if_unavailable(version='2.0.0') class EnsembleOperDiagsStations(EnsembleOperDiags, EnsembleStation): """ Class for operationally plotting spaghetti plots for station based simulations. """ #: list of variables to plot maps for list_var_map = [] #: list of variables to plot spaghetti plots for. list_var_spag = ['DSN_T_ISBA', 'WSN_T_ISBA', 'RAMSOND_ISBA', 'WET_TH_ISBA', 'REFRZTH_ISBA', 'SNOMLT_ISBA']
class EnsemblePostproc(_EnsembleMassif):
    """
    Class for ensemble post-processing: compute percentiles of the ensemble and
    write them to a new netCDF file.
    """

    def __init__(self, variables, inputfiles, decile=None, outdir='.', outfilename='PRO_post.nc',
                 emosmethod=None):
        """
        :param variables: list of variables to process
        :param inputfiles: list of input files
        :param decile: list of percentiles (default: 10, 20, ..., 90)
        :param outdir: output directory
        :param outfilename: output file name
        :param emosmethod: emos method to use ('emos' or 'quantiles'). The method is
            supposed to take a variable name and the wanted quantiles as arguments.
        :type outdir: str
        :raises ValueError: if ``emosmethod`` is given but unknown
        """
        print(inputfiles)
        super(EnsemblePostproc, self).__init__()
        #: list of variables
        self.variables = variables
        self.open(inputfiles)
        #: output filename
        self.outfile = os.path.join(outdir, outfilename)
        # BUGFIX: avoid a mutable (numpy array) default argument; the default
        # percentiles are 10, 20, ..., 90 as before.
        #: list of percentiles
        self.decile = np.arange(10, 100, 10) if decile is None else decile
        if emosmethod:
            if emosmethod == 'emos':
                self.emosmethod = Ensemble.emos_csg_nonorm_allstations_newsnow
            elif emosmethod == 'quantiles':
                self.emosmethod = Ensemble.quantile
            else:
                # ValueError instead of bare Exception (still caught by callers
                # catching Exception)
                raise ValueError('EMOS method ' + emosmethod + ' not available')
            self.flipaxis = False
        else:
            # Ensemble.quantile returns the percentile axis first; it has to be
            # moved to the last position before writing (see deciles()).
            self.emosmethod = Ensemble.quantile
            self.flipaxis = True

    # TODO: to test postprocessing with CSG EMOS inside the vortex Four_Seasons_Task
    # (s2m_postproc algo) without changes in vortex, pass emosmethod='emos'.

    @property
    def standardvars(self):
        """variables always written to the output file"""
        return ['time', 'ZS', 'aspect', 'slope', 'massif_num', 'longitude', 'latitude']

    def create_outfile(self):
        """
        Create the output data set.

        :ivar outdataset: output data set
        :vartype outdataset: :py:class:`utils.prosimu.prosimu`
        """
        self.outdataset = prosimu(self.outfile, ncformat='NETCDF4_CLASSIC', openmode='w')

    def init_outfile(self):
        """
        Copy global attributes, dimensions and standard variables from the first
        simulation file to the output file.
        """
        # copy global attributes all at once via dictionary
        self.outdataset.dataset.setncatts(self.simufiles[0].dataset.__dict__)
        # copy dimensions (unlimited dimensions stay unlimited)
        for name, dimension in self.simufiles[0].dataset.dimensions.items():
            self.outdataset.dataset.createDimension(
                name, (len(dimension) if not dimension.isunlimited() else None))
        # copy standard variables
        for name, variable in self.simufiles[0].dataset.variables.items():
            if name in self.standardvars:
                fillval = self.simufiles[0].getfillvalue(name)
                self.outdataset.dataset.createVariable(name, variable.datatype, variable.dimensions,
                                                       fill_value=fillval)
                # copy variable attributes without _FillValue since setting it on an
                # existing variable causes an error
                for att in self.simufiles[0].listattr(name):
                    if att != '_FillValue':
                        self.outdataset.dataset[name].setncatts({att: self.simufiles[0].getattr(name, att)})
                self.outdataset.dataset[name][:] = self.simufiles[0].dataset[name][:]

    def postprocess(self):
        """
        Do postprocessing.

        #) create output file
        #) copy global attributes and standard variables to the output file
        #) calculate deciles for the data variables and put them to the output file
        #) close all input and output data sets.

        Calls :py:meth:`.create_outfile`, :py:meth:`.init_outfile`,
        :py:meth:`.deciles` and close methods for all data sets.
        """
        self.create_outfile()
        self.init_outfile()
        print('init done')
        self.deciles()
        self.outdataset.close()
        self.close()

    def deciles(self):
        """
        Calculate the percentiles given in :py:attr:`.decile` for the variables in
        :py:attr:`.variables` applying the method in :py:attr:`.emosmethod`, and add
        them to the output data set including the corresponding dimension,
        coordinate variable and attributes.
        """
        # create decile dimension and coordinate variable
        self.outdataset.dataset.createDimension('decile', len(self.decile))
        self.outdataset.dataset.createVariable('decile', 'i4', 'decile')
        self.outdataset.dataset['decile'][:] = self.decile[:]
        atts = {'long_name': "Percentiles of the ensemble forecast"}
        self.outdataset.dataset['decile'].setncatts(atts)
        for name, variable in self.simufiles[0].dataset.variables.items():
            if name in self.variables:
                # calculate deciles
                vardecile = self.emosmethod(self, varname=name, level=self.decile)
                if self.flipaxis:
                    # get the decile axis in the right (last) place
                    vardecile = np.moveaxis(vardecile, 0, -1)
                fillval = self.simufiles[0].getfillvalue(name)
                self.outdataset.dataset.createVariable(name, variable.datatype,
                                                       variable.dimensions + ('decile',), fill_value=fillval)
                # copy variable attributes all at once via dictionary, but without
                # _FillValue
                attdict = self.simufiles[0].dataset[name].__dict__
                attdict.pop('_FillValue', None)
                # record which postprocessing method produced the values
                attdict['emos_method'] = str(self.emosmethod).split()[1]
                self.outdataset.dataset[name].setncatts(attdict)
                self.outdataset.dataset[name][:] = vardecile[:]

    def median(self):
        """
        Calculate the median for the variables in :py:attr:`.variables` and add it
        to the output data set including the corresponding attributes.
        """
        print('entered median')
        for name, variable in self.simufiles[0].dataset.variables.items():
            if name in self.variables:
                median = self.quantile(name, 50)
                fillval = self.simufiles[0].getfillvalue(name)
                self.outdataset.dataset.createVariable(name, variable.datatype, variable.dimensions,
                                                       fill_value=fillval)
                # copy variable attributes all at once via dictionary, but without
                # _FillValue
                attdict = self.simufiles[0].dataset[name].__dict__
                attdict.pop('_FillValue', None)
                self.outdataset.dataset[name].setncatts(attdict)
                print(self.outdataset.dataset[name][:].size)
                print(median.size)
                self.outdataset.dataset[name][:] = median[:]
[docs]class EnsemblePostprocStation(EnsemblePostproc): """ Class for ensemble postprocessing at station locations. """ @property def standardvars(self): """variables always written to the output file""" return ['time', 'ZS', 'aspect', 'slope', 'station', 'longitude', 'latitude']
[docs]class EnsembleHydro(EnsemblePostproc): """ Class to provide a synthesis of ensemble hydrological diagnostics of S2M """ @property def spatialdim(self): return 'basin' @property def standardvars(self): return ['time', 'basin']
class PPQuantiles:
    """
    Class for handling files containing postprocessed (quantile) forecasts.
    """
    #: maximum height level to be treated
    levelmax = 3900
    #: minimum height level to be treated
    levelmin = 0
    #: list of variables to plot maps for
    list_var_map = ['SD_1DY_ISBA', 'SD_12H_ISBA']
    #: list of variables to do spaghetti plots for
    list_var_spag = ['SD_1DY_ISBA', 'SD_12H_ISBA']
    #: plot settings per variable
    attributes = dict(
        SD_12H_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60., palette='YlGnBu', seuiltext=50.,
                         label=u'Epaisseur de neige fraîche en 12h (cm)'),
        SD_1DY_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60., palette='YlGnBu', seuiltext=50.,
                         label=u'Epaisseur de neige fraîche en 24h (cm)'))

    def __init__(self, filename):
        """
        :param filename: path of the netCDF file containing quantile forecasts
        """
        self.ps = prosimu(filename)
        self.nech = self.ps.getlendim("time")
        # only flat (aspect == -1) points are treated
        self.points = self.ps.get_points(aspect=-1)
        self.massifs = self.ps.read('massif_num', selectpoint=self.points)
        self.alti = self.ps.read('ZS', selectpoint=self.points)
        self.formatplot = 'png'
        self.deciles = self.ps.read('decile')
        self.time = self.ps.readtime()

    def quantilemaps(self, domain, suptitle, diroutput, mapindex=None):
        """
        Plot maps of quantile forecasts. For example EMOS postprocessed quantiles.

        :param domain: domain (region) string to determine which map class to use
        :param suptitle: plot title highest level
        :param diroutput: output directory for the plots
        :param mapindex: array of bool indicating for which indices of the quantile
            dimension (last dimension of the data array) maps should be drawn.
            Defaults to indices 1, 4 and 7 (percentiles 20, 50 and 80 for the
            standard decile set).
        """
        # BUGFIX: the default used to be a mutable numpy array default argument;
        # use a None sentinel instead (same effective default).
        if mapindex is None:
            mapindex = np.array([False, True, False, False, True, False, False, True, False])
        from snowtools.plots.maps.cartopy_massifs import MultiMap_Alps, MultiMap_Pyr, MultiMap_Cor
        map_generic = dict(alp=MultiMap_Alps, pyr=MultiMap_Pyr, cor=MultiMap_Cor)
        # per-domain layout tuning of the multi-panel figure
        if domain[0:3] == "pyr":
            mm = map_generic[domain[0:3]](nrow=3, ncol=1, geofeatures=False, width=13, height=10)
            mm.legendpos = [0.9, 0.13, 0.02, 0.6]
        elif domain[0:3] == "cor":
            mm = map_generic[domain[0:3]](nrow=1, ncol=3, geofeatures=False, width=15, height=7.5)
            mm.legendpos = [0.9, 0.15, 0.03, 0.6]
        else:
            mm = map_generic[domain[0:3]](nrow=1, ncol=3, geofeatures=False, height=7.5)
        titles = ["Percentile {0}".format(i) for i in self.deciles[mapindex]]
        for var in self.list_var_map:
            var_data = self.ps.read(var, selectpoint=self.points)
            mm.init_massifs(**self.attributes[var])
            mm.addlogo()
            # BUGFIX: filter instead of removing from a list while iterating over
            # it, which could skip out-of-range levels.
            list_loop_alti = [level for level in set(self.alti)
                              if self.levelmin <= level <= self.levelmax]
            for level in list_loop_alti:
                indalti = self.alti == level
                for t in range(0, self.nech):
                    mm.draw_massifs(self.massifs[indalti], var_data[t, indalti, :][:, mapindex], axis=1,
                                    **self.attributes[var])
                    mm.set_maptitle(titles)
                    mm.plot_center_massif(self.massifs[indalti], var_data[t, indalti, :][:, mapindex], axis=1,
                                          **self.attributes[var])
                    mysuptitle = suptitle + "\n pour le " + pretty_date(self.time[t]) + " - Altitude : " \
                        + str(int(level)) + "m"
                    mm.set_suptitle(mysuptitle)
                    ech = self.time[t] - self.time[0] + self.time[1] - self.time[0]
                    ech_str = '+%02d' % (ech.days * 24 + ech.seconds / 3600)
                    plotname = diroutput + "/pp_quantiles_" + domain[0:3] + "_" + var + "_" + str(
                        int(level)) + ech_str + "." + self.formatplot
                    mm.save(plotname, formatout=self.formatplot)
                    print(plotname + " is available.")
            mm.reset_massifs(rmcbar=False)
            mm.empty_massifs()
        mm.reset_massifs()

    def spaghetti_plots(self, suptitle, diroutput="."):
        """
        Produce spaghetti plots for all variables in :py:attr:`list_var_spag`.

        :param suptitle: suptitle for all plots
        :type suptitle: unicode string
        :param diroutput: directory to save the plots
        """
        list_filenames = [_EnsembleMassif.build_filename(_EnsembleMassif(), mas, alt)
                          for mas, alt in zip(self.massifs, self.alti)]
        list_titles = [_EnsembleMassif.build_title(_EnsembleMassif(), mas, alt)
                       for mas, alt in zip(self.massifs, self.alti)]
        for var in self.list_var_spag:
            s = spaghettis(self.time)
            settings = self.attributes[var].copy()
            if 'label' in self.attributes[var].keys():
                settings['ylabel'] = self.attributes[var]['label']
            var_data = self.ps.read(var, selectpoint=self.points)
            if 'convert_unit' in self.attributes[var].keys():
                var_data = var_data * self.attributes[var]['convert_unit']
            npoints = var_data.shape[1]
            for point in range(0, npoints):
                # quantile indices 1, 4 and 7 (percentiles 20, 50 and 80 for the
                # standard decile set) are highlighted
                s.draw(self.time, var_data[:, point, :], var_data[:, point, 1], var_data[:, point, 4],
                       var_data[:, point, 7], **settings)
                s.set_title(list_titles[point])
                s.set_suptitle(suptitle)
                s.addlogo()
                plotname = os.path.join(diroutput,
                                        "pp_quantiles_" + var + "_" + list_filenames[point] + "." + self.formatplot)
                s.save(plotname, formatout=self.formatplot)
                print(plotname + " is available.")
            s.close()
[docs]@echecker.disabled_if_unavailable @echecker_pyproj.disabled_if_unavailable(version='2.0.0') def pp_plots(c): """ downloads files with postprocessed quantiles and plots maps and spaghetti plots of them :param c: config """ from snowtools.tasks.oper.get_oper_files import FutureS2MExtractor os.chdir(c.diroutput) S2ME = FutureS2MExtractor(c) pp_files, pp_xpid = S2ME.get_pp_quantiles() dict_chaine = defaultdict(str) dict_chaine['OPER'] = ' (oper)' dict_chaine['DBLE'] = ' (double)' dict_chaine['MIRR'] = ' (miroir)' dict_chaine['OPER@lafaysse'] = ' (dev)' dict_chaine['nouveaux_guess@lafaysse'] = ' (dev)' locale.setlocale(locale.LC_TIME, 'fr_FR.UTF-8') list_domains = pp_files.keys() print(list_domains) for domain in ['alp', 'pyr', 'cor']: # list_domains: # ['alp_allslopes']: # suptitle = u'Prévisions PEARP-S2M du ' + pretty_date(S2ME.conf.rundate) # Identify the prevailing xpid in the obtained resources and adapt the title count = Counter(pp_xpid[domain]) prevailing_xpid = count.most_common(1)[0][0] suffixe_suptitle = dict_chaine[prevailing_xpid] suptitle += suffixe_suptitle data = PPQuantiles(pp_files[domain]) data.quantilemaps(domain, suptitle, diroutput=c.diroutput_maps) data.spaghetti_plots(suptitle, diroutput=c.diroutput_plots)
@echecker.disabled_if_unavailable
@echecker_pyproj.disabled_if_unavailable(version='2.0.0')
def main(c):
    """
    Extract the ensemble snow simulations and produce all operational maps and
    spaghetti plots for every available domain.

    :param c: config
    """
    # The following class has a vortex dependence; it must not be imported at
    # module level to avoid problems when importing this module from vortex.
    from snowtools.tasks.oper.get_oper_files import FutureS2MExtractor
    os.chdir(c.diroutput)
    S2ME = FutureS2MExtractor(c)
    snow_members, snow_xpid = S2ME.get_snow()
    # Suffix appended to the suptitle depending on the chain; an undefined xpid is
    # possible because it is allowed by defaultdict (empty suffix).
    dict_chaine = defaultdict(str)
    dict_chaine['OPER'] = ' (oper)'
    dict_chaine['DBLE'] = ' (double)'
    dict_chaine['MIRR'] = ' (miroir)'
    dict_chaine['OPER@lafaysse'] = ' (dev)'
    dict_chaine['nouveaux_guess@lafaysse'] = ' (dev)'
    # French month/day names in the plot titles
    locale.setlocale(locale.LC_TIME, 'fr_FR.UTF-8')
    list_domains = snow_members.keys()
    print(list_domains)
    for domain in list_domains:
        # S2ME.conf.rundate is a Date object --> strftime already calls decode method
        suptitle = u'Prévisions PEARP-S2M du ' + pretty_date(S2ME.conf.rundate)
        # Identify the prevailing xpid in the obtained resources and adapt the title
        count = Counter(snow_xpid[domain])
        print(count)
        prevailing_xpid = count.most_common(1)[0][0]
        suffixe_suptitle = dict_chaine[prevailing_xpid]
        suptitle += suffixe_suptitle
        # Stations get only spaghetti plots; massif geometries additionally get
        # flat-massif and north/south maps.
        if domain == 'postes':
            E = EnsembleOperDiagsStations()
        else:
            E = EnsembleOperDiagsFlatMassif()
            ENS = EnsembleOperDiagsNorthSouthMassif()
            ENS.open(snow_members[domain])
        print('number of member files', len(snow_members[domain]))
        E.open(snow_members[domain])
        print("domain " + domain + " npoints = " + str(E.npoints))
        E.alldiags()
        print('Diagnostics have been computed for the following variables :')
        print(E.ensemble.keys())
        E.pack_spaghettis(suptitle, diroutput=c.diroutput_plots)
        if domain != 'postes':
            E.pack_maps(domain, suptitle, diroutput=c.diroutput_maps)
            ENS.alldiags()
            print('Diagnostics have been computed for the following variables :')
            print(ENS.ensemble.keys())
            ENS.pack_maps(domain, suptitle, diroutput=c.diroutput_maps)
            ENS.pack_spaghettis_ns(suptitle, diroutput=c.diroutput_plots)
            # release netCDF handles and the in-memory ensemble before the next domain
            ENS.close()
            del ENS
        print(E.list_var_spag)
        print(E.list_var_map)
        E.close()
        del E
if __name__ == "__main__": USAGE = "usage: python postprocess.py [-b YYYYMMDD] [-e YYYYMMDD] [-o diroutput]" PARSER = argparse.ArgumentParser(description="Postprocess new snow heights: " "1) extracts operational simulation results," "2) Plots maps for the Alps, the Pyrenees the Corse," "3) Creates spaghetti plots " "for all massifs and stations") PARSER.add_argument("-b", action="store", type=str, dest="datebegin", default=today().ymd, help="First year of extraction") PARSER.add_argument("-e", action="store", type=str, dest="dateend", default=today().ymd, help="Last year of extraction") PARSER.add_argument("-o", action="store", type=str, dest="diroutput", default=os.path.join(LUSTRE_NOSAVE_USER_DIR, "PEARPS2M"), help="Output directory") PARSER.add_argument("--dev", action="store_true", dest="dev", default=False) PARSER.add_argument("--reforecast", action="store_true", dest="reforecast", default=False) PARSER.add_argument("--dble", action="store_true", dest="dble", default=False) OPTIONS = PARSER.parse_args() # @UnusedVariable c = Config(OPTIONS) main(c) if OPTIONS.dev: pp_plots(c)