#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on 6 déc. 2018
@author: lafaysse
usage: python postprocess.py [-b YYYYMMDD] [-e YYYYMMDD] [-o diroutput]
#) extracts operational simulation results
#) Plots maps for the Alps, the Pyrenees the Corse, Vosges, Massif Central, Jura
#) Creates spaghetti plots for all massifs and stations
"""
import argparse
import locale
import os
import datetime
from collections import Counter, defaultdict
import numpy as np
import matplotlib
# import matplotlib.pyplot as plt
matplotlib.use('Agg')
import matplotlib.style
#import rpy2.robjects as robjects
import pandas as pd
from scipy.stats import gamma
from snowtools.utils.prosimu import prosimu
from snowtools.utils.dates import check_and_convert_date, pretty_date
from snowtools.plots.temporal.chrono import spaghettis_with_det, spaghettis
from snowtools.utils.infomassifs import infomassifs
from snowtools.utils.FileException import DirNameException
from snowtools.DATA import LUSTRE_NOSAVE_USER_DIR, SNOWTOOLS_DATA
from bronx.stdtypes.date import today
from bronx.syntax.externalcode import ExternalCodeImportChecker
echecker = ExternalCodeImportChecker('cartopy')
with echecker:
import cartopy
echecker_pyproj = ExternalCodeImportChecker('pyproj')
with echecker_pyproj as echecker_register:
import pyproj
echecker_register.update(version=pyproj.__version__)
if 'fast' in matplotlib.style.available:
matplotlib.style.use('fast')
matplotlib.rcParams['agg.path.chunksize'] = 100
matplotlib.rcParams['axes.xmargin'] = 0
matplotlib.rcParams['axes.ymargin'] = 0
# matplotlib.rcParams["figure.dpi"] = 75
[docs]def build_filename(massif, alti):
"""
Construct a filename from massif and altitude information.
:param massif: a massif number
:param alti: an altitude level
:return: filename
:rtype: str
"""
filename = str(massif)
if alti:
filename += "_{:d}".format(int(alti))
return filename
[docs]class Config:
"""
Configuration passed to S2MExtractor and to be used in vortex toolboxes.
"""
previ = True #: False for analysis, True for forecast
xpid = "oper" #: Operational chain
alternate_xpid = ["OPER@lafaysse"] #: Alternative experiment id
# alternate_xpid = ["oper"]
#: List of geometries
list_geometry = ['alp', 'pyr', 'cor', 'jur', 'mac', 'vog', 'postes']
# list_geometry = ['alp_allslopes', 'pyr_allslopes', 'cor_allslopes', 'postes'] #: List of geometries
# list_geometry = ['alp', 'pyr', 'cor', 'postes']
#: Alternative list of geometries (corresponding to alternative experiment ids)
# alternate_list_geometry = [['alp', 'pyr', 'cor', 'postes']]
alternate_list_geometry = [['alp_allslopes', 'pyr_allslopes', 'cor_allslopes', 'postes']]
# Development chain
# xpid = "OPER@lafaysse" # To be changed with IGA account when operational
# list_geometry = ['alp_allslopes', 'pyr_allslopes', 'cor_allslopes', 'postes']
#: 35 for determinstic member, 36 for sytron, 0-34 for PEARP members
list_members = list(range(0, 36))
[docs] def __init__(self, OPTIONS):
"""
#) checks and converts dates
#) creates output directories if they don't exist.
"""
OPTIONS.datebegin, OPTIONS.dateend = [check_and_convert_date(dat)
for dat in [OPTIONS.datebegin, OPTIONS.dateend]]
if OPTIONS.datebegin.hour == 0:
#: Date of model run class:`bronx.stdtypes.date.Date`
self.rundate = OPTIONS.datebegin.replace(hour=6)
else:
#: Date of model run :class:`bronx.stdtypes.date.Date`
self.rundate = OPTIONS.datebegin
#: output directory
self.diroutput = os.path.join(OPTIONS.diroutput, self.rundate.strftime("%Y%m%d%H"))
self.diroutput_maps = os.path.join(self.diroutput, 'maps') #: output directory for maps
self.diroutput_plots = os.path.join(self.diroutput, "plots") #: output directory for other plots
for required_directory in [self.diroutput, self.diroutput_maps, self.diroutput_plots]:
if not os.path.isdir(required_directory):
os.mkdir(required_directory)
self.dev = OPTIONS.dev
if OPTIONS.dev:
self.xpid = "OPER@lafaysse"
delattr(Config, 'alternate_xpid')
self.list_geometry = ['jur', 'mac', 'vog', 'cor', 'alp', 'pyr', 'postes']
self.dble = OPTIONS.dble
if OPTIONS.dble:
self.xpid = "dble"
delattr(Config, 'alternate_xpid')
self.list_geometry = ['alp', 'pyr', 'cor', 'jur', 'mac', 'vog', 'postes']
self.reforecast = OPTIONS.reforecast
if OPTIONS.reforecast:
self.xpid = "reforecast_double2021@vernaym"
delattr(Config, 'alternate_xpid')
self.list_geometry = ['jur4_allslopes_reforecast', 'mac11_allslopes_reforecast',
'vog3_allslopes_reforecast', 'alp27_allslopes',
'pyr24_allslopes', 'cor2_allslopes']
[docs]class Ensemble:
"""
Describes an ensemble of simulations
"""
[docs] def __init__(self):
"""
"""
self.ensemble = {} #: data dict with variable names as keys and np.arrays as values
self.simufiles = [] #: list of prosimu objects with simulation file
self.nech = None #: number of time steps (forecast steps). type int
self.nmembers = None #: number of members type int
self.time = None #: :ivar ~.time: time variable from the first simulation file :vartype ~.time: numpy array
self.indpoints = [] #: :ivar indpoints: list with spatial indices :vartype indpoints: list
self.npoints = None #: :ivar npoints: total number of spatial points :vartype npoints: int
@property
def spatialdim(self):
"""Name of spatial dimension"""
return "Number_of_points"
[docs] def open(self, listmembers):
"""
Opens simulation files
:param listmembers: list of ensemble members (filenames)
:type listmembers: list
:ivar inddeterministic: index of the deterministic member
:vartype inddeterministic: int
"""
print(listmembers)
for m, member in enumerate(listmembers):
p = prosimu(member)
if m == 0:
self.nech = p.getlendim("time")
ntime = p.getlendim("time")
if ntime == self.nech:
self.simufiles.append(p)
if 'mb035' in member:
self.inddeterministic = len(self.simufiles) - 1
self.nmembers = len(self.simufiles)
print(self.nmembers)
self.time = self.simufiles[0].readtime()
self.indpoints = self.select_points()
self.npoints = self.get_npoints()
[docs] def select_points(self):
"""
Get a list of spatial indices from the spatial dimension
length in the first simulation file.
:return: list of spatial points (indices)
:rtype: list
"""
indpoints = range(0, self.simufiles[0].getlendim(self.spatialdim))
return indpoints
[docs] def get_npoints(self):
"""
Get the number of spatial points.
:return: total number of spatial points
:rtype: int
"""
if isinstance(self.indpoints, tuple):
npoints = 0
for indpoints in self.indpoints:
npoints += len(indpoints)
else:
npoints = len(self.indpoints)
return npoints
[docs] def read(self, varname):
"""
Read a variable and store it in :py:attr:`~.ensemble`
:param varname: name of the variable to be read
(corresponds to the variable name of the simulation NetCDF files)
:type varname: str
"""
self.ensemble[varname] = np.empty([self.nech, self.npoints, self.nmembers])
kwargs = dict()
for m, member in enumerate(self.simufiles):
# print("read " + varname + " for member" + str(m))
import datetime
before = datetime.datetime.today()
if isinstance(self.indpoints, tuple):
sections = []
for indpoints in self.indpoints:
kwargs[self.spatialdim] = indpoints
sections.append(member.read_var(varname, **kwargs))
self.ensemble[varname][:, :, m] = np.concatenate(tuple(sections), axis=1)
else:
kwargs[self.spatialdim] = self.indpoints
self.ensemble[varname][:, :, m] = member.read_var(varname, **kwargs)
after = datetime.datetime.today()
# print(after - before)
# Verrues (à éviter)
if varname == 'NAT_LEV':
self.ensemble[varname][:, :, :] = np.where(self.ensemble[varname] == 6.,
0., self.ensemble[varname])
[docs] def read_geovar(self, varname):
"""
Read a variable from the first simulation file.
:param varname: NetCDF variable name of the variable to read
:type varname: str
:return: data read
:rtype: numpy array
"""
kwargs = dict()
if isinstance(self.indpoints, tuple):
sections = []
for indpoints in self.indpoints:
kwargs[self.spatialdim] = indpoints
sections.append(self.simufiles[0].read_var(varname, **kwargs))
return np.concatenate(tuple(sections))
else:
kwargs[self.spatialdim] = self.indpoints
return self.simufiles[0].read_var(varname, **kwargs)
[docs] def probability(self, varname, seuilinf=-999999999., seuilsup=999999999.):
"""
Calculates probability as the proportion of ensemble members with
values larger than :py:attr:`seuilinf`
and smaller than :py:attr:`seuilsup`.
:param varname: name of the variable for which to calculate the probability
:type varname: str
:param seuilinf: lower threshold
:param seuilsup: upper threshold
:return: probability field (or np.nan if the ensemble is not defined)
:rtype: numpy array
"""
if varname not in self.ensemble.keys():
self.read(varname)
condition = (self.ensemble[varname] > seuilinf) & (self.ensemble[varname] < seuilsup)
probability = np.sum(condition, axis=2) / (1. * self.nmembers)
return np.where(np.isnan(self.ensemble[varname][:, :, 0]), np.nan, probability)
# On renvoit des nan quand ensemble n'est pas défini
[docs] def emos_csg_nonorm_allstations_newsnow(self, varname, level):
"""
Applies Censored Shifted Gamma (CSG) EMOS trained for 24h new snow variable and all stations in the Alps without
normalisation (internship Benoit Gacon 2024)
to variable "varname" and returns the quantiles given in "levels" of the CSG distribution.
:param varname: variable name
:type varname: str
:param level: quantiles (range 0 to 100)
:type level: array
:return: quantiles of predictive distribution
:rtype: np.array
"""
if varname not in self.ensemble.keys():
self.read(varname)
csgmean, csgstd, csgdelta = self.get_csg_predictive_dist_nonorm_allstations(varname)
level=level/100.
quantile = self.quantiles_CSGD(csgmean, csgstd, csgdelta, level)
return quantile
[docs] def get_csg_predictive_dist_nonorm_allstations(self, varname):
"""
get the censored shifted gamma regression coefficients and climatological
paramters for each lead time and apply them to the ensemble forecast
and return the parameters mu, sigma and delta of the predictive censored shifted gamma distribution.
:param varname: variable name
:return: mu, sigma, delta of the predictive CSG distribution
"""
# filename = os.path.join(SNOWTOOLS_DATA, "emos_2000_2022_212_par_nonorm.Rdata")
# robj = robjects.r.load(filename) # pylint: disable=possibly-unused-variable
# reg_coef = np.array(robjects.r[robj[0]])
# clim_par = np.array(robjects.r[robj[1]])
reg_coef = pd.read_csv(os.path.join(SNOWTOOLS_DATA, "matEmosPars.csv"), index_col=0).values
clim_par = pd.read_csv(os.path.join(SNOWTOOLS_DATA, "matEmosParsClim.csv"), index_col=0).values
ndays_leadtime = 4
ntime = len(self.time)
list_indtime = []
delta = self.time - self.time[0].replace(hour=6)
for day_leadtime in range(1, ndays_leadtime + 1):
list_indtime.append(np.where(
(delta <= datetime.timedelta(days=day_leadtime)) &
(delta > datetime.timedelta(days=day_leadtime - 1)))[0])
# Extract regression parameters
a1, a2, a3, a4, b1 = np.empty((5, ntime, self.npoints))
a1.fill(np.nan)
a2.fill(np.nan)
a3.fill(np.nan)
a4.fill(np.nan)
b1.fill(np.nan)
muclim, sigmaclim, deltaclim = np.empty((3, ntime, self.npoints))
muclim.fill(np.nan)
sigmaclim.fill(np.nan)
deltaclim.fill(np.nan)
for leadtime in range(0, ndays_leadtime):
#cst_a1, cst_a2, cst_a3, cst_a4, cst_b1 = reg_coef[leadtime, 0, 1, 0:5]
#cst_muclim, cst_sigmaclim, cst_deltaclim = clim_par[leadtime, 0, 1, :]
cst_a1, cst_a2, cst_a3, cst_a4, cst_b1 = reg_coef[leadtime, 0:5]
cst_muclim, cst_sigmaclim, cst_deltaclim = clim_par[leadtime, :]
begin = list_indtime[leadtime][0]
end = list_indtime[leadtime][-1]
a1[slice(begin, end + 1), :] = cst_a1
a2[slice(begin, end + 1), :] = cst_a2
a3[slice(begin, end + 1), :] = cst_a3
a4[slice(begin, end + 1), :] = cst_a4
b1[slice(begin, end + 1), :] = cst_b1
muclim[slice(begin, end + 1), :] = cst_muclim
sigmaclim[slice(begin, end + 1), :] = cst_sigmaclim
deltaclim[slice(begin, end + 1), :] = cst_deltaclim
# Extract raw ensemble predictors
ensmean = self.mean(varname)
ensPOP = self.probability(varname, seuilinf=1.E-6)
# Compute CSGD parameters with regression laws
csgmean = self.csg_mean_nonorm(a1, a2, a3, a4, ensmean, ensPOP)
csgstd = self.csg_std_nonorm(b1, muclim, sigmaclim, csgmean)
csgdelta = deltaclim
return csgmean, csgstd, csgdelta
[docs] def csg_mean_nonorm(self, a1, a2, a3, a4, ensmean, ensPOP):
"""
Calculate mu of predictive censored shifted gamma distribution from regression coefficients alpha 1 to
alpha 4, ensemble mean and probability of non-zero values of the ensemble.
:param a1: regression coefficient alpha 1
:type a1: float
:param a2: regression coefficient alpha 2
:type a2: float
:param a3: regression coefficient alpha 3
:type a3: float
:param a4: regression coefficient alpha 4
:type a4: float
:param ensmean: ensemble mean
:type ensmean: float
:param ensPOP: probability of non-zero values of the ensemble [0., 1.]
:type ensPOP: float
:return: mu of predictive censored shifted gamma distribution
:rtype: float
"""
return np.log1p(np.expm1(a1) * (a2 + a3 * ensPOP + a4 * ensmean)) / a1
[docs] def csg_std_nonorm(self, b1, muclim, sigmaclim, csgmean):
"""
Calculate sigma of predictive censored shifted gamma distribution from regression coefficients
beta 1, climatological mu and sigma and csg mu parameter.
:param b1: regression coefficient beta 1
:type b1: float
:param muclim: climatological mu
:type muclim: float
:param sigmaclim: climatological sigma
:type sigmaclim: float
:param csgmean: csg mu parameter
:type csgmean: float
:return: sigma of predictive censored shifted gamma distribution
:rtype: float
"""
return b1 * sigmaclim * np.sqrt(csgmean / muclim)
[docs] def quantiles_CSGD(self, csgmean, csgstd, csgdelta, quantiles):
"""
get quantiles of the censored shifted gamma (CSG) distribution with given mu, sigma and delta.
:param csgmean: mu parameter of CSG
:type csgmean: array
:param csgstd: sigma parameter of CSG
:type csgstd: array
:param csgdelta: delta parameter of CSG
:type csgdelta: array
:param quantiles: quantiles wanted
:type quantiles: array of floats
:return: array of quantiles
:rtype: array of same length as quantiles.
"""
vectorppf = np.vectorize(gamma.ppf, signature='(),(),()->(n)', excluded='q')
tmp = vectorppf(q=quantiles, a=(csgmean / csgstd) ** 2, scale=(csgstd ** 2) / csgmean, loc=csgdelta)
# print(tmp)
# return np.where(tmp >= 0, tmp, 0)
return np.where(tmp < 0., 0., tmp)
[docs] def quantile(self, varname, level):
"""
Calculates ensemble percentiles for a given variable and given percentile levels.
:param varname: Variable name
:type varname: str
:param level: list of percentiles to calculate
:type level: list of numbers between 0 and 100
:return: array of percentiles
:rtype: numpy array
"""
if varname not in self.ensemble.keys():
self.read(varname)
quantile = np.where(np.isnan(self.ensemble[varname][:, :, 0]), np.nan,
np.percentile(self.ensemble[varname], level, axis=2))
return quantile
[docs] def mean(self, varname):
"""
Calculate ensemble mean for a given variable.
:param varname: variable name
:type varname: str
:return: ensemble mean
:rtype: numpy array
"""
if varname not in self.ensemble.keys():
self.read(varname)
return np.nanmean(self.ensemble[varname], axis=2)
[docs] def spread(self, varname):
"""
Calculate ensemble spread (standard deviation) for a given variable.
:param varname: variable name
:type varname: str
:return: ensemble spread
:rtype: numpy array
:rtype: numpy array
"""
if varname not in self.ensemble.keys():
self.read(varname)
return np.nanstd(self.ensemble[varname], axis=2)
[docs] def close(self):
"""
Close simulation files and remove data from :py:attr:`ensemble`.
"""
for member in self.simufiles:
member.close()
self.ensemble.clear()
[docs] def get_alti(self):
"""
Get altitude variable from the first simulation file.
:return: altitude variable
:rtype: numpy array
"""
if not hasattr(self, "alti"):
self.alti = self.read_geovar("ZS")
return self.alti
[docs] def get_aspect(self):
"""
Get aspect variable from the first simulation file.
:return: aspect variable
:rtype: numpy array
"""
if not hasattr(self, "aspect"):
self.aspect = self.read_geovar("aspect")
return self.aspect
[docs]class _EnsembleMassif(Ensemble):
"""
Metaclass for ensemble simulations on a massif geometry (SAFRAN like).
:ivar InfoMassifs: Information of Massifs
"""
InfoMassifs = infomassifs()
@property
def geo(self):
"""
Geometry
:return: "massifs"
"""
return "massifs"
[docs] def read(self, varname):
"""
Read data for a given variable name into the :py:attr:`ensemble` instance variable.
:param varname: variable name
:type varname: str
"""
if varname == 'naturalIndex':
nmassifs = len(self.get_massifvar())
self.ensemble[varname] = np.empty([self.nech, nmassifs, self.nmembers])
for m, member in enumerate(self.simufiles):
self.ensemble[varname][:, :, m] = member.read_var(varname)
else:
super(_EnsembleMassif, self).read(varname)
[docs] def get_massifdim(self):
"""
Read massif_num variable from the first simulation file.
:return: massif numbers
:rtype: numpy array
"""
if not hasattr(self, "massifdim"):
self.massifdim = self.read_geovar("massif_num")
return self.massifdim
[docs] def get_massifvar(self):
"""
Read "massif" variable from the first simulation file.
:return: massif numbers
:rtype: numpy array
"""
if not hasattr(self, "massifvar"):
self.massifvar = self.simufiles[0].read_var("massif")
return self.massifvar
[docs] def build_filename(self, massif, alti):
"""
Construct a filename from massif and altitude information.
:param massif: a massif number
:param alti: an altitude level
:return: filename
:rtype: str
"""
filename = str(massif)
if alti:
filename += "_" + str(int(alti))
return filename
[docs] def build_title(self, massif, alti):
"""
Construct a figure title from massif and altitude information.
:param massif: a massif number
:param alti: an altitude level
:return: a title
:rtype: unicode str
"""
title = self.InfoMassifs.getMassifName(massif) # type unicode
if alti:
title += " {:d}m".format(int(alti))
return title # matplotlib needs unicode
[docs]class EnsembleFlatMassif(_EnsembleMassif):
"""
Class for ensemble simulations on a massif geometry (SAFRAN like)
where all data points are considered
on flat terrain (zero slope and no orientation information).
"""
[docs] def select_points(self):
"""
Select spatial indices from the first simulation file
where aspect=-1 (zero slope, no orientation information).
:return: spatial indices to be read from simulation files
:rtype: numpy boolean array
"""
return self.simufiles[0].get_points(aspect=-1)
[docs]class EnsembleNorthSouthMassif(_EnsembleMassif):
"""
Class for ensemble simulations on a massif geometry (SAFRAN like)
where data points are considered at a
slope of 40 degrees and two orientations: North and South.
"""
[docs] def select_points(self):
"""
Select spatial indices from the first simulation file
where slope=40 and aspect either 0 or 180.
:return: spatial indices to be read from simulation files
:rtype: a tuple of numpy boolean array, where the first
component corresponds to the Northern orientation
and the second to the Southern one.
"""
# return np.sort(np.concatenate((self.simufiles[0].get_points(aspect = 0, slope = 40),
# self.simufiles[0].get_points(aspect = 180, slope = 40))))
# TAKE CARE : It is extremely more efficient to read regular sections of the netcdf files
return (self.simufiles[0].get_points(aspect=0, slope=40),
self.simufiles[0].get_points(aspect=180, slope=40))
[docs]class EnsembleMassifPoint(_EnsembleMassif):
"""
Class for extracting one specific point in massif geometry
"""
[docs] def __init__(self, massif_num, alti, aspect, slope):
self.massif_num = massif_num
self.alti = alti
self.aspect = aspect
self.slope = slope
super(EnsembleMassifPoint, self).__init__()
[docs] def select_points(self):
"""Select index of the corresponding point"""
return self.simufiles[0].get_points(massif_num=self.massif_num,
aspect=self.aspect, slope=self.slope,
ZS=self.alti)
[docs]class EnsembleStation(Ensemble):
"""
Class for ensemble simulations at station locations.
:ivar InfoMassifs: Information on the Massif the station is situated.
"""
InfoMassifs = infomassifs()
@property
def geo(self):
"""
Geometry information.
:return: "stations"
"""
return "stations"
[docs] def get_station(self):
"""
Read station numbers from the first simulation file.
:return: station numbers
:rtype: numpy array
"""
return self.simufiles[0].read_var("station", Number_of_points=self.indpoints)
[docs] def build_filename(self, station, alti):
"""
Construct a filename from a station number
:param station: station number
:param alti: altitude level (not used)
:return: filename
:rtype: station number as 8digit integer.
"""
return '%08d' % station
[docs] def build_title(self, station, alti):
"""
Construct a title from a station number and altitude level.
:param station: station number
:param alti: station altitude
:return: title string composed of the station name and the altitude.
:rtype: unicode str
"""
# nameposte gives unicode
# matplotlib expects unicode
return self.InfoMassifs.nameposte(station) + " %d m" % int(alti)
[docs]class EnsembleDiags(Ensemble):
"""
Probabilities and quantiles from ensembles.
"""
[docs] def __init__(self):
"""
"""
self.proba = {} #: probabilities
self.quantiles = {} #: percentiles
super(EnsembleDiags, self).__init__()
[docs] def diags(self, list_var, list_quantiles, list_seuils):
"""
Calculate quantiles and exceedance probabilities
for a list of variables, quantiles and thresholds.
The quantiles are stored in :py:attr:`~.quantiles`
and the probabilities in :py:attr:`~.proba`
:param list_var: list of variables
:type list_var: iterable
:param list_quantiles: list of percentiles to be calculated
:type list_quantiles: iterable
:param list_seuils: list of thresholds for each variable
:type list_seuils: dict of iterables with variable names as keys
"""
for var in list_var:
if var in list_seuils.keys():
for seuil in list_seuils[var]:
self.proba[(var, seuil)] = self.probability(var, seuilinf=seuil)
for var in list_var:
self.quantiles[var] = []
for quantile in list_quantiles:
print("Compute quantile " + str(quantile) + " for variable " + var)
self.quantiles[var].append(self.quantile(var, quantile))
[docs] def close(self):
"""
Close simulation files and remove data from
:py:attr:`~.plots.pearps2m.postprocess.Ensemble.ensemble`,
:py:attr:`.proba` and :py:attr:`.quantiles`
"""
super(EnsembleDiags, self).close()
self.proba.clear()
self.quantiles.clear()
[docs]@echecker_pyproj.disabled_if_unavailable(version='2.0.0')
class EnsembleOperDiags(EnsembleDiags):
"""
Class for operationally used plots.
"""
#: plot format
formatplot = 'png'
#: dict of plot attributes for each variable
attributes = dict(
PP_SD_1DY_ISBA=dict(convert_unit=1., forcemin=0., forcemax=60.,
palette='YlGnBu', seuiltext=50.,
label=u'Epaisseur de neige fraîche en 24h (cm)'),
SD_1DY_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60.,
palette='YlGnBu', seuiltext=50.,
label=u'Epaisseur de neige fraîche en 24h (cm)'),
SD_3DY_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60.,
palette='YlGnBu', seuiltext=50.,
label=u'Epaisseur de neige fraîche en 72h (cm)'),
RAMSOND_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60.,
palette='YlGnBu', seuiltext=50.,
label=u'Epaisseur mobilisable (cm)'),
NAT_LEV=dict(forcemin=-0.5, forcemax=5.5, palette='YlOrRd',
ncolors=6, label=u'Risque naturel',
ticks=[u'Très faible', u'Faible', u'Mod. A',
u'Mod. D', u'Fort', u'Très fort']),
naturalIndex=dict(forcemin=0., forcemax=8., palette='YlOrRd',
label=u'Indice de risque naturel', format='%.1f',
nolevel=True),
DSN_T_ISBA=dict(convert_unit=100., label=u'Hauteur de neige (cm)'),
WSN_T_ISBA=dict(label=u'Equivalent en eau (kg/m2)'),
SNOMLT_ISBA=dict(convert_unit=3. * 3600., forcemin=0.,
forcemax=60., palette='YlGnBu', seuiltext=50.,
label=u'Ecoulement en 3h (kg/m2/3h)'),
WET_TH_ISBA=dict(convert_unit=100., forcemin=0.,
forcemax=60., palette='YlGnBu', seuiltext=50.,
label=u'Epaisseur humide (cm)'),
REFRZTH_ISBA=dict(convert_unit=100., forcemin=0.,
forcemax=60., palette='YlGnBu', seuiltext=50.,
label=u'Epaisseur regelée (cm)'),
RAINF_ISBA=dict(convert_unit=3. * 3600., forcemin=0.,
forcemax=60., palette='YlGnBu', seuiltext=50.,
label=u'Pluie en 3h (kg/m2/3h)'),
)
#: list of quantiles
list_q = [20, 50, 80]
[docs] def __init__(self):
super(EnsembleOperDiags, self).__init__()
[docs] def alldiags(self):
"""
Calculate percentiles for all variables in :py:attr:`list_var_spag` and :py:attr:`list_var_map`.
"""
super(EnsembleOperDiags, self).diags(set(self.list_var_spag + self.list_var_map), self.list_q, {})
[docs] def pack_spaghettis(self, suptitle, diroutput = "."):
"""
Produce spaghetti plots for all variables in :py:attr:`list_var_spag`.
:param suptitle: Suptitle for all plots.
:type suptitle: unicode string
:param diroutput: directory to save the plots
"""
for var in self.list_var_spag:
if 'nolevel' not in self.attributes[var].keys():
self.attributes[var]['nolevel'] = False
list_filenames, list_titles = self.get_metadata(nolevel=self.attributes[var]['nolevel'])
if hasattr(self, 'inddeterministic'):
# print('has inddet')
s = spaghettis_with_det(self.time)
else:
# print('no inddet')
s = spaghettis(self.time)
settings = self.attributes[var].copy()
if 'label' in self.attributes[var].keys():
settings['ylabel'] = self.attributes[var]['label']
npoints = self.quantiles[var][0][0, :].shape[0]
for point in range(0, npoints):
if 'convert_unit' in self.attributes[var].keys():
allmembers = self.ensemble[var][:, point, :] \
* self.attributes[var]['convert_unit']
qmin = self.quantiles[var][0][:, point] * self.attributes[var]['convert_unit']
qmed = self.quantiles[var][1][:, point] * self.attributes[var]['convert_unit']
qmax = self.quantiles[var][2][:, point] * self.attributes[var]['convert_unit']
else:
allmembers = self.ensemble[var][:, point, :]
qmin = self.quantiles[var][0][:, point]
qmed = self.quantiles[var][1][:, point]
qmax = self.quantiles[var][2][:, point]
if hasattr(self, 'inddeterministic'):
s.draw(self.time, allmembers, qmin, qmed, qmax, deterministic=allmembers[:, self.inddeterministic],
**settings)
else:
s.draw(self.time, allmembers, qmin, qmed, qmax, **settings)
s.set_title(list_titles[point])
s.set_suptitle(suptitle)
s.addlogo()
plotname = diroutput + "/" + var + "_" + list_filenames[point] + "." + self.formatplot
s.save(plotname, formatout=self.formatplot)
print(plotname + " is available.")
s.close()
[docs] def pack_spaghettis_multipoints(self, list_pairs, suptitle, diroutput=".", **kwargs):
"""
Produce spaghetti plots with up to four points per plot for variables in :py:attr:`list_var_spag_2points`.
Each point gets a different color.
:param list_pairs: list of indices to plot together on each plot
:type list_pairs: list of list
:param suptitle: common suptitle for all plots
:type suptitle: unicode str
:param diroutput: output directory
:param kwargs: arguments to be passed to plotting routines, notably "labels"
"""
list_colors = ['blue', 'red', 'green', 'orange']
for var in self.list_var_spag_2points:
if 'nolevel' not in self.attributes[var].keys():
self.attributes[var]['nolevel'] = False
list_filenames, list_titles = self.get_metadata(nolevel = self.attributes[var]['nolevel'])
if hasattr(self, 'inddeterministic'):
s = spaghettis_with_det(self.time)
else:
s = spaghettis(self.time)
settings = self.attributes[var].copy()
if 'label' in self.attributes[var].keys():
settings['ylabel'] = self.attributes[var]['label']
for pair in list_pairs:
for p, point in enumerate(pair):
if 'convert_unit' in self.attributes[var].keys():
allmembers = self.ensemble[var][:, point, :] \
* self.attributes[var]['convert_unit']
qmin = self.quantiles[var][0][:, point] \
* self.attributes[var]['convert_unit']
qmed = self.quantiles[var][1][:, point] \
* self.attributes[var]['convert_unit']
qmax = self.quantiles[var][2][:, point] \
* self.attributes[var]['convert_unit']
else:
allmembers = self.ensemble[var][:, point, :]
qmin = self.quantiles[var][0][:, point]
qmed = self.quantiles[var][1][:, point]
qmax = self.quantiles[var][2][:, point]
settings['colorquantiles'] = list_colors[p]
settings['colormembers'] = list_colors[p]
if 'labels' in kwargs.keys():
settings['commonlabel'] = kwargs['labels'][p]
if hasattr(self, 'inddeterministic'):
s.draw(self.time, allmembers, qmin, qmed, qmax,
deterministic=allmembers[:, self.inddeterministic], **settings)
else:
s.draw(self.time, allmembers, qmin, qmed, qmax, **settings)
s.set_title(list_titles[point])
s.set_suptitle(suptitle)
s.addlogo()
plotname = diroutput + "/" + var + "_" + list_filenames[point] + "." + self.formatplot
s.save(plotname, formatout=self.formatplot)
print(plotname + " is available.")
s.close()
[docs]@echecker.disabled_if_unavailable
@echecker_pyproj.disabled_if_unavailable(version='2.0.0')
class EnsembleOperDiagsFlatMassif(EnsembleOperDiags, EnsembleFlatMassif):
"""
Class for operationally plotting maps and spaghetti plots for
ensembles with massif geometry and for the zero slope case.
"""
#: maximum height level to be treated
levelmax = 3900
#: minimum heigth level to be treated
levelmin = 0
#: list of variables to plot maps for
list_var_map = ['naturalIndex', 'SD_1DY_ISBA', 'SD_3DY_ISBA', 'SNOMLT_ISBA']
#: list of variables to do spaghetti plots for
list_var_spag = ['naturalIndex', 'DSN_T_ISBA', 'WSN_T_ISBA', 'SNOMLT_ISBA']
# list_var_spag = ['DSN_T_ISBA', 'WSN_T_ISBA', 'SNOMLT_ISBA']
[docs] def pack_maps(self, domain, suptitle, diroutput="."):
"""
Produce maps for the variables given in :py:attr:`list_var_map`
over the regions given in :py:attr:`domain`
for each available altitude level and time step.
Each massif is colored corresponding to the second percentile value defined in
:py:attr:`.list_q` and
the color map defined in :py:attr:`.attributes`.
At the center of each massif the values corresponding to
the first three
percentiles in :py:attr:`.list_q` are marked.
:param domain: list of region identifiers (e.g., "alp", "pyr", "cor")
:param suptitle: common suptitle for all plots
:type suptitle: str
:param diroutput: output directory to save the maps
"""
from snowtools.plots.maps.cartopy_massifs import Map_alpes, Map_pyrenees, Map_corse, Map_vosges, Map_jura, \
Map_central
map_generic = dict(alp=Map_alpes, pyr=Map_pyrenees, cor=Map_corse, jur=Map_jura, mac=Map_central,
vog=Map_vosges)
alti = self.get_alti()
list_alti = list(set(alti))
m = map_generic[domain[0:3]]()
for var in self.list_var_map:
m.init_massifs(**self.attributes[var])
m.addlogo()
if 'nolevel' not in self.attributes[var].keys():
self.attributes[var]['nolevel'] = False
if self.attributes[var]['nolevel']:
list_loop_alti = [0]
massif = self.get_massifvar()
indalti = np.ones_like(self.quantiles[var][0][0, :], dtype=bool)
else:
list_loop_alti = list_alti[:]
for level in list_loop_alti:
if level < self.levelmin or level > self.levelmax:
list_loop_alti.remove(level)
massif = self.get_massifdim()
for level in list_loop_alti:
if not self.attributes[var]['nolevel']:
indalti = alti == level
for t in range(0, self.nech):
qmin = self.quantiles[var][0][t, indalti]
qmed = self.quantiles[var][1][t, indalti]
qmax = self.quantiles[var][2][t, indalti]
m.draw_massifs(massif[indalti], qmed, **self.attributes[var])
m.plot_center_massif(massif[indalti], qmin, qmed, qmax, **self.attributes[var])
title = "pour le " + pretty_date(self.time[t])
if not self.attributes[var]['nolevel']:
title += " - Altitude : " + str(int(level)) + "m"
m.set_title(title)
m.set_suptitle(suptitle)
ech = self.time[t] - self.time[0] + self.time[1] - self.time[0]
ech_str = '+%02d' % (ech.days * 24 + ech.seconds / 3600)
plotname = diroutput + "/" + domain[0:3] + "_" + var + "_" + str(int(level)) + ech_str + "." + self.formatplot
m.save(plotname, formatout=self.formatplot)
print(plotname + " is available.")
m.reset_massifs(rmcbar=False)
m.reset_massifs()
[docs]@echecker.disabled_if_unavailable
@echecker_pyproj.disabled_if_unavailable(version='2.0.0')
class EnsembleOperDiagsNorthSouthMassif(EnsembleOperDiags, EnsembleNorthSouthMassif):
"""
Class for operationally plot maps and spaghetti plots
distinguishing between northern and southern orientation
at each massif.
"""
#: maximum height level to plot
levelmax = 3900
#: minimum height level to plot
levelmin = 0
#: labels for the two orientations
versants = [u'Nord 40°', u'Sud 40°']
#: variable list for standard spaghetti plots
list_var_spag = []
#: variable list for spaghetti plots combining northern and southern orientation
list_var_spag_2points = ['RAMSOND_ISBA', 'NAT_LEV', 'WET_TH_ISBA', 'REFRZTH_ISBA']
#: variable list for producing maps
list_var_map = ['RAMSOND_ISBA', 'NAT_LEV', 'WET_TH_ISBA', 'REFRZTH_ISBA']
#: ensemble data
ensemble = {}
[docs] def alldiags(self):
"""
Calculate percentiles for all variables in :py:attr:`.list_var_spag_2points` and :py:attr:`.list_var_map`.
"""
super(EnsembleOperDiagsNorthSouthMassif, self).diags(set(self.list_var_spag_2points + self.list_var_map),
self.list_q, {})
[docs] def get_pairs_ns(self):
"""
Get for each massif and altitude the pair of indices corresponding to
values for the northern and southern slopes.
:return: indices corresponding to northern and southern slopes
:rtype: list of lists
"""
alti = self.get_alti()
aspect = np.array([int(asp) for asp in self.get_aspect()])
massif = self.get_massifdim()
if not hasattr(self, 'list_pairs'):
self.list_pairs = []
for point in range(0, np.shape(alti)[0]):
if aspect[point] == 0:
indsouth = np.where((alti == alti[point]) & (aspect == 180) & (massif == massif[point]))
if len(indsouth) == 1:
self.list_pairs.append([point, indsouth[0][0]])
return self.list_pairs
[docs] def pack_spaghettis_ns(self, suptitle, diroutput="."):
"""
Do spaghetti plots with values for the northern and southern slope on the same plot.
:param suptitle: common suptitle for all plots
:param diroutput: directory to save the plots
"""
list_pairs = self.get_pairs_ns()
super(EnsembleOperDiagsNorthSouthMassif, self).pack_spaghettis_multipoints(list_pairs, suptitle, diroutput,
labels=self.versants)
[docs] def pack_maps(self, domain, suptitle, diroutput):
"""
Produce maps for the variables given in :py:attr:`.list_var_map`
over the regions given in :py:attr:`.domain`
for each available altitude level and time step.
For each massif a table with background colors and values corresponding to the
percentiles in :py:attr:`.list_q` and the color map defined in
:py:attr:`.attributes` is plotted near the center of each massif.
:param domain: list of region identifiers (e.g., "alp", "pyr", "cor")
:param suptitle: common suptitle for all plots
:type suptitle: str
:param diroutput: output directory to save the maps
"""
from snowtools.plots.maps.cartopy_massifs import Map_alpes, Map_pyrenees, Map_corse, Map_vosges, Map_jura, \
Map_central
map_generic = dict(alp=Map_alpes, pyr=Map_pyrenees, cor=Map_corse, jur=Map_jura, vog=Map_vosges,
mac=Map_central)
list_pairs = self.get_pairs_ns() # pylint: disable=possibly-unused-variable
alti = self.get_alti()
aspect = self.get_aspect()
list_alti = list(set(alti))
m = map_generic[domain[0:3]]()
for var in self.list_var_map:
m.init_massifs(**self.attributes[var])
m.empty_massifs()
m.add_north_south_info()
m.addlogo()
list_loop_alti = list_alti[:]
for level in list_loop_alti:
if level < self.levelmin or level > self.levelmax:
list_loop_alti.remove(level)
massif = self.get_massifdim()
for level in list_loop_alti:
list_indalti = []
for plotaspect in [180, 0]: # du bas vers le haut
list_indalti.append((alti == level) & (aspect == plotaspect))
for t in range(0, self.nech):
list_values = []
for indalti in list_indalti:
for q, quantile in enumerate(self.list_q): # pylint: disable=possibly-unused-variable
list_values.append(self.quantiles[var][q][t, indalti])
# print(len(massif[indalti]))
m.rectangle_massif(massif[indalti], list_values, ncol=2, **self.attributes[var])
title = "pour le " + pretty_date(self.time[t])
title += " - Altitude : " + str(int(level)) + "m"
m.set_title(title)
m.set_suptitle(suptitle)
ech = self.time[t] - self.time[0] + self.time[1] - self.time[0]
ech_str = '+%02d' % (ech.days * 24 + ech.seconds / 3600)
plotname = diroutput + "/" + domain[0:3] + "_" + var + "_" + str(int(level)) + ech_str + "." + self.formatplot
m.save(plotname, formatout=self.formatplot)
print(plotname + " is available.")
m.reset_massifs(rmcbar=False, rminfobox=False)
m.reset_massifs()
[docs]@echecker.disabled_if_unavailable
@echecker_pyproj.disabled_if_unavailable(version='2.0.0')
class EnsembleOperDiagsStations(EnsembleOperDiags, EnsembleStation):
"""
Class for operationally plotting spaghetti plots for station based simulations.
"""
#: list of variables to plot maps for
list_var_map = []
#: list of variables to plot spaghetti plots for.
list_var_spag = ['DSN_T_ISBA', 'WSN_T_ISBA', 'RAMSOND_ISBA',
'WET_TH_ISBA', 'REFRZTH_ISBA', 'SNOMLT_ISBA']
[docs]class EnsemblePostproc(_EnsembleMassif):
"""
Class for ensemble post-processing.
"""
[docs] def __init__(self, variables, inputfiles, decile=np.arange(10, 100, 10), outdir='.',
outfilename='PRO_post.nc', emosmethod=None):
"""
:param variables: list of variables to process
:param inputfiles: list of input files
:param decile: list of percentiles
:param outdir: Output directory
:param outfilename: output file name
:param emosmethod: emos method to use. The method is supposed to take a variable name and the wanted quantiles as arguments.
:type outdir: str
"""
print(inputfiles)
super(EnsemblePostproc, self).__init__()
# self.ensemble = self #: ensemble data
self.variables = variables #: list of variables
#self.ensemble.open(inputfiles)
self.open(inputfiles)
#: output filename
self.outfile = os.path.join(outdir, outfilename)
# self.outfile = os.path.join(outdir, 'PRO_post_{0}_{1}.nc'.format(datebegin.ymdh, dateend.ymdh))
#: list of percentiles
self.decile = decile
if emosmethod:
if emosmethod == 'emos':
self.emosmethod = Ensemble.emos_csg_nonorm_allstations_newsnow
elif emosmethod == 'quantiles':
self.emosmethod = Ensemble.quantile
else:
raise Exception('EMOS method ' + emosmethod + ' not available')
self.flipaxis = False
else:
self.emosmethod = Ensemble.quantile
self.flipaxis = True
# TODO: to test postprocessing with CSG EMOS inside the vortex Four_Seasons_Task (s2m_postproc algo)
# without changes in vortex, comment the two lines above and uncomment the two lines below.
# self.emosmethod = Ensemble.emos_csg_nonorm_allstations_newsnow
# self.flipaxis = False
@property
def standardvars(self):
"""variables always written to the output file"""
return ['time', 'ZS', 'aspect', 'slope', 'massif_num', 'longitude', 'latitude']
[docs] def create_outfile(self):
"""
Create output data set.
:ivar outdataset: output data set
:vartype outdataset: :py:class:`utils.prosimu.prosimu`
"""
# if not os.path.isdir(self.outfile):
# print(self.outfile)
# raise DirNameException(self.outfile)
self.outdataset = prosimu(self.outfile, ncformat='NETCDF4_CLASSIC', openmode='w')
[docs] def init_outfile(self):
"""
Copy global attributes, dimensions and standard variables from the
first simulation file to the output file.
"""
# copy global attributes all at once via dictionary
self.outdataset.dataset.setncatts(self.simufiles[0].dataset.__dict__)
# copy dimensions
for name, dimension in self.simufiles[0].dataset.dimensions.items():
self.outdataset.dataset.createDimension(
name, (len(dimension) if not dimension.isunlimited() else None))
# print(self.outdataset.listdim())
# copy standard variables
for name, variable in self.simufiles[0].dataset.variables.items():
if name in self.standardvars:
fillval = self.simufiles[0].getfillvalue(name)
x = self.outdataset.dataset.createVariable(name, variable.datatype, variable.dimensions,
fill_value=fillval)
# copy variable attributes without _FillValue since this causes an error
for att in self.simufiles[0].listattr(name):
if att != '_FillValue':
# print(self.simufiles[0].getattr(name, att))
self.outdataset.dataset[name].setncatts({att: self.simufiles[0].getattr(name, att)})
self.outdataset.dataset[name][:] = self.simufiles[0].dataset[name][:]
# print('data copied')
# print(self.outdataset.listvar())
[docs] def postprocess(self):
"""
Do postprocessing
#) create output file
#) copy global attributes and standard variables to the output file
#) calculate deciles for the data variables and put them to the output file
#) close all input and output data sets.
Calls :py:meth:`.create_outfile`, :py:meth:`.init_outfile`,
:py:meth:`.deciles` and close methods for all data sets.
"""
self.create_outfile()
self.init_outfile()
print('init done')
# self.median()
# print('median done')
self.deciles()
self.outdataset.close()
self.close()
[docs] def deciles(self):
"""
Calculates percentiles given in :py:attr:`.decile` for variables in :py:attr:`.variables` and adds them
to the output data set including the corresponding dimension, coordinate variable and attributes applying the
method in :py:attr:`.emosmethod` to obtain the percentiles.
"""
# create decile dimension
self.outdataset.dataset.createDimension('decile', len(self.decile))
# create decile variable
x = self.outdataset.dataset.createVariable('decile', 'i4', 'decile')
self.outdataset.dataset['decile'][:] = self.decile[:]
atts = {'long_name': "Percentiles of the ensemble forecast"}
self.outdataset.dataset['decile'].setncatts(atts)
for name, variable in self.simufiles[0].dataset.variables.items():
if name in self.variables:
# calculate deciles
vardecile = self.emosmethod(self, varname=name, level=self.decile)
if self.flipaxis:
# get decile axis in the right place
vardecile = np.moveaxis(vardecile, 0, -1)
fillval = self.simufiles[0].getfillvalue(name)
x = self.outdataset.dataset.createVariable(name, variable.datatype, variable.dimensions + ('decile',),
fill_value=fillval)
# copy variable attributes all at once via dictionary, but without _FillValue
attdict = self.simufiles[0].dataset[name].__dict__
attdict.pop('_FillValue', None)
attdict['emos_method'] = str(self.emosmethod).split()[1]
self.outdataset.dataset[name].setncatts(attdict)
# print(vardecile.shape)
self.outdataset.dataset[name][:] = vardecile[:]
[docs] def median(self):
"""
Calculates the median for variables in :py:attr:`.variables` and adds the median
to the output data set including the corresponding attributes.
"""
print('entered median')
for name, variable in self.simufiles[0].dataset.variables.items():
if name in self.variables:
# print(name)
median = self.quantile(name, 50)
fillval = self.simufiles[0].getfillvalue(name)
x = self.outdataset.dataset.createVariable(name, variable.datatype, variable.dimensions,
fill_value=fillval)
# copy variable attributes all at once via dictionary, but without _FillValue
attdict = self.simufiles[0].dataset[name].__dict__
attdict.pop('_FillValue', None)
self.outdataset.dataset[name].setncatts(attdict)
print(self.outdataset.dataset[name][:].size)
print(median.size)
self.outdataset.dataset[name][:] = median[:]
[docs]class EnsemblePostprocStation(EnsemblePostproc):
"""
Class for ensemble postprocessing at station locations.
"""
@property
def standardvars(self):
"""variables always written to the output file"""
return ['time', 'ZS', 'aspect', 'slope', 'station', 'longitude', 'latitude']
[docs]class EnsembleHydro(EnsemblePostproc):
"""
Class to provide a synthesis of ensemble hydrological diagnostics of S2M
"""
@property
def spatialdim(self):
return 'basin'
@property
def standardvars(self):
return ['time', 'basin']
[docs]class PPQuantiles:
"""
Class for handling files containing postprocessed (quantile) forecasts.
"""
#: maximum height level to be treated
levelmax = 3900
#: minimum heigth level to be treated
levelmin = 0
#: list of variables to plot maps for
list_var_map = ['SD_1DY_ISBA', 'SD_12H_ISBA']
#: list of variables to do spaghetti plots for
list_var_spag = ['SD_1DY_ISBA', 'SD_12H_ISBA']
attributes = dict(
SD_12H_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60.,
palette='YlGnBu', seuiltext=50.,
label=u'Epaisseur de neige fraîche en 12h (cm)'),
SD_1DY_ISBA=dict(convert_unit=100., forcemin=0., forcemax=60.,
palette='YlGnBu', seuiltext=50.,
label=u'Epaisseur de neige fraîche en 24h (cm)'))
[docs] def __init__(self, filename):
self.ps = prosimu(filename)
self.nech = self.ps.getlendim("time")
self.points = self.ps.get_points(aspect=-1)
self.massifs = self.ps.read('massif_num', selectpoint=self.points)
self.alti = self.ps.read('ZS', selectpoint=self.points)
self.formatplot = 'png'
self.deciles = self.ps.read('decile')
self.time = self.ps.readtime()
[docs] def quantilemaps(self, domain, suptitle, diroutput,
mapindex=np.array([False, True, False, False, True, False, False, True, False])):
"""
Plots maps of quantile forecasts. For example EMOS postprocessed quantiles.
:param domain: domain (region) string to determine which map class to use
:param suptitle: Plot title highest level
:param diroutput: Output directory for the plots
:param mapindex: array of bool indicating for with indices of the quantile dimension (last dimension of the
data array) maps should be drawn.
"""
from snowtools.plots.maps.cartopy_massifs import MultiMap_Alps, MultiMap_Pyr, MultiMap_Cor
map_generic = dict(alp=MultiMap_Alps, pyr=MultiMap_Pyr, cor=MultiMap_Cor)
if domain[0:3] == "pyr":
mm = map_generic[domain[0:3]](nrow=3, ncol=1, geofeatures=False, width=13, height=10)
mm.legendpos = [0.9, 0.13, 0.02, 0.6]
elif domain[0:3] == "cor":
mm = map_generic[domain[0:3]](nrow=1, ncol=3, geofeatures=False, width=15, height=7.5)
mm.legendpos = [0.9, 0.15, 0.03, 0.6]
else:
mm = map_generic[domain[0:3]](nrow=1, ncol=3, geofeatures=False, height=7.5)
titles = ["Percentile {0}".format(i) for i in self.deciles[mapindex]]
for var in self.list_var_map:
var_data = self.ps.read(var, selectpoint=self.points)
mm.init_massifs(**self.attributes[var])
mm.addlogo()
list_alti = list(set(self.alti))
list_loop_alti = list_alti[:]
for level in list_loop_alti:
if level < self.levelmin or level > self.levelmax:
list_loop_alti.remove(level)
for level in list_loop_alti:
indalti = self.alti == level
for t in range(0, self.nech):
mm.draw_massifs(self.massifs[indalti], var_data[t, indalti, :][:, mapindex], axis=1, **self.attributes[var])
mm.set_maptitle(titles)
mm.plot_center_massif(self.massifs[indalti], var_data[t, indalti, :][:, mapindex], axis=1,
**self.attributes[var])
mysuptitle = suptitle + "\n pour le " + pretty_date(self.time[t]) + " - Altitude : " + str(int(level)) + "m"
mm.set_suptitle(mysuptitle)
ech = self.time[t] - self.time[0] + self.time[1] - self.time[0]
ech_str = '+%02d' % (ech.days * 24 + ech.seconds / 3600)
plotname = diroutput + "/pp_quantiles_" + domain[0:3] + "_" + var + "_" + str(
int(level)) + ech_str + "." + self.formatplot
mm.save(plotname, formatout=self.formatplot)
print(plotname + " is available.")
mm.reset_massifs(rmcbar=False)
mm.empty_massifs()
mm.reset_massifs()
[docs] def spaghetti_plots(self, suptitle, diroutput = "."):
"""
Produce spaghetti plots for all variables in :py:attr:`list_var_spag`.
:param suptitle: Suptitle for all plots.
:type suptitle: unicode string
:param diroutput: directory to save the plots
"""
list_filenames = [_EnsembleMassif.build_filename(_EnsembleMassif(), mas, alt) for mas, alt in
zip(self.massifs, self.alti)]
list_titles = [_EnsembleMassif.build_title(_EnsembleMassif(), mas, alt) for mas, alt in
zip(self.massifs, self.alti)]
for var in self.list_var_spag:
s = spaghettis(self.time)
settings = self.attributes[var].copy()
if 'label' in self.attributes[var].keys():
settings['ylabel'] = self.attributes[var]['label']
var_data = self.ps.read(var, selectpoint=self.points)
if 'convert_unit' in self.attributes[var].keys():
var_data = var_data * self.attributes[var]['convert_unit']
npoints = var_data.shape[1]
for point in range(0, npoints):
s.draw(self.time, var_data[:, point, :], var_data[:, point, 1], var_data[:, point, 4],
var_data[:, point, 7], **settings)
s.set_title(list_titles[point])
s.set_suptitle(suptitle)
s.addlogo()
plotname = os.path.join(diroutput, "pp_quantiles_" + var + "_" + list_filenames[point] + "." + self.formatplot)
# plt.show()
s.save(plotname, formatout=self.formatplot)
print(plotname + " is available.")
s.close()
[docs]@echecker.disabled_if_unavailable
@echecker_pyproj.disabled_if_unavailable(version='2.0.0')
def pp_plots(c):
"""
downloads files with postprocessed quantiles and plots maps and spaghetti plots of them
:param c: config
"""
from snowtools.tasks.oper.get_oper_files import FutureS2MExtractor
os.chdir(c.diroutput)
S2ME = FutureS2MExtractor(c)
pp_files, pp_xpid = S2ME.get_pp_quantiles()
dict_chaine = defaultdict(str)
dict_chaine['OPER'] = ' (oper)'
dict_chaine['DBLE'] = ' (double)'
dict_chaine['MIRR'] = ' (miroir)'
dict_chaine['OPER@lafaysse'] = ' (dev)'
dict_chaine['nouveaux_guess@lafaysse'] = ' (dev)'
locale.setlocale(locale.LC_TIME, 'fr_FR.UTF-8')
list_domains = pp_files.keys()
print(list_domains)
for domain in ['alp', 'pyr', 'cor']: # list_domains: # ['alp_allslopes']: #
suptitle = u'Prévisions PEARP-S2M du ' + pretty_date(S2ME.conf.rundate)
# Identify the prevailing xpid in the obtained resources and adapt the title
count = Counter(pp_xpid[domain])
prevailing_xpid = count.most_common(1)[0][0]
suffixe_suptitle = dict_chaine[prevailing_xpid]
suptitle += suffixe_suptitle
data = PPQuantiles(pp_files[domain])
data.quantilemaps(domain, suptitle, diroutput=c.diroutput_maps)
data.spaghetti_plots(suptitle, diroutput=c.diroutput_plots)
[docs]@echecker.disabled_if_unavailable
@echecker_pyproj.disabled_if_unavailable(version='2.0.0')
def main(c):
"""
:param c: config
"""
# The following class has a vortex-dependence
# Should not import than above to avoid problems when importing the module from vortex
from snowtools.tasks.oper.get_oper_files import FutureS2MExtractor
os.chdir(c.diroutput)
# if c.dev:
# S2ME = FutureS2MExtractor(c)
# elif c.dble:
# S2ME = FutureS2MExtractor(c)
# elif c.reforecast:
# S2ME = FutureS2MExtractor(c)
# else:
S2ME = FutureS2MExtractor(c)
snow_members, snow_xpid = S2ME.get_snow()
dict_chaine = defaultdict(str)
dict_chaine['OPER'] = ' (oper)'
dict_chaine['DBLE'] = ' (double)'
dict_chaine['MIRR'] = ' (miroir)'
dict_chaine['OPER@lafaysse'] = ' (dev)'
dict_chaine['nouveaux_guess@lafaysse'] = ' (dev)'
# undefined xpid is possible because it is allowed by defaultdict
locale.setlocale(locale.LC_TIME, 'fr_FR.UTF-8')
list_domains = snow_members.keys()
print(list_domains)
for domain in list_domains: # ['alp_allslopes']: #
# S2ME.conf.rundate is a Date object --> strftime already calls decode method
suptitle = u'Prévisions PEARP-S2M du ' + pretty_date(S2ME.conf.rundate)
# Identify the prevailing xpid in the obtained resources and adapt the title
count = Counter(snow_xpid[domain])
print(count)
prevailing_xpid = count.most_common(1)[0][0]
suffixe_suptitle = dict_chaine[prevailing_xpid]
suptitle += suffixe_suptitle
if domain == 'postes':
E = EnsembleOperDiagsStations()
else:
E = EnsembleOperDiagsFlatMassif()
ENS = EnsembleOperDiagsNorthSouthMassif()
ENS.open(snow_members[domain])
print('number of member files', len(snow_members[domain]))
E.open(snow_members[domain])
print("domain " + domain + " npoints = " + str(E.npoints))
E.alldiags()
print('Diagnostics have been computed for the following variables :')
print(E.ensemble.keys())
E.pack_spaghettis(suptitle, diroutput=c.diroutput_plots)
if domain != 'postes':
E.pack_maps(domain, suptitle, diroutput=c.diroutput_maps)
ENS.alldiags()
print('Diagnostics have been computed for the following variables :')
print(ENS.ensemble.keys())
ENS.pack_maps(domain, suptitle, diroutput=c.diroutput_maps)
ENS.pack_spaghettis_ns(suptitle, diroutput=c.diroutput_plots)
ENS.close()
del ENS
print(E.list_var_spag)
print(E.list_var_map)
E.close()
del E
if __name__ == "__main__":
USAGE = "usage: python postprocess.py [-b YYYYMMDD] [-e YYYYMMDD] [-o diroutput]"
PARSER = argparse.ArgumentParser(description="Postprocess new snow heights: "
"1) extracts operational simulation results,"
"2) Plots maps for the Alps, the Pyrenees the Corse,"
"3) Creates spaghetti plots "
"for all massifs and stations")
PARSER.add_argument("-b", action="store", type=str, dest="datebegin", default=today().ymd,
help="First year of extraction")
PARSER.add_argument("-e", action="store", type=str, dest="dateend", default=today().ymd,
help="Last year of extraction")
PARSER.add_argument("-o", action="store", type=str, dest="diroutput",
default=os.path.join(LUSTRE_NOSAVE_USER_DIR, "PEARPS2M"),
help="Output directory")
PARSER.add_argument("--dev", action="store_true", dest="dev", default=False)
PARSER.add_argument("--reforecast", action="store_true", dest="reforecast", default=False)
PARSER.add_argument("--dble", action="store_true", dest="dble", default=False)
OPTIONS = PARSER.parse_args() # @UnusedVariable
c = Config(OPTIONS)
main(c)
if OPTIONS.dev:
pp_plots(c)