Source code for tools.change_forcing
# -*- coding: utf-8 -*-
"""
Created on 3 Aug. 2017
@author: lafaysse
"""
import os
import numpy as np
import datetime
import netCDF4
# For compatibility python 2 / python 3
from snowtools.utils.sun import sun
from snowtools.utils.prosimu import prosimu
from snowtools.utils.infomassifs import infomassifs
# Take care : exceptions have to been imported with snowtools prefix to be recognized by vortex
from snowtools.utils.FileException import (
FileNameException, DirNameException, VarWriteException,
GeometryException, MassifException, TimeListException)
from snowtools.utils.dates import TypeException
from snowtools.utils.resources import print_used_memory
from snowtools.utils.S2M_standard_file import StandardSAFRAN, StandardCROCUS
[docs]
class forcinput_tomerge:
"""
This class represents a group of forcing files which must be merged
in a single one through the ``Number_of_points`` dimension.
:param forcin: List of input files names
:type forcin: list
:param forcout: Name of merged output file
:type forcout: str
"""
printmemory = False
formatout = "NETCDF4_CLASSIC"
def __init__(self, forcin, forcout, *args, **kwargs):
"""Generic method to open and merge multiple forcing files"""
if type(forcin) is list:
forcin = list(map(str, forcin))
else:
raise TypeException(type(forcin), list)
init_forcing_file = []
for ficin in forcin:
if os.path.isfile(ficin):
init_forcing_file.append(prosimu(ficin))
else:
raise FileNameException(ficin)
self.checktime(init_forcing_file, forcin)
dirout = os.path.dirname(forcout)
if not (dirout == '' or os.path.isdir(dirout)):
raise DirNameException(dirout)
new_forcing_file = StandardSAFRAN(forcout, "w", format=self.formatout)
self.merge(init_forcing_file, new_forcing_file, args)
for openedfic in init_forcing_file:
openedfic.close()
new_forcing_file.GlobalAttributes(**kwargs)
new_forcing_file.add_standard_names()
new_forcing_file.close()
[docs]
def checktime(self, init_forcing_file, forcin):
"""Check time consistency of forcing files to merge.
:param init_forcing_file: Input files to merge object
:type init_forcing_file: :class:`utils.prosimu.prosimu`
:param forcin: file names
:type forcin: list
"""
dimtime = []
for unitfile in init_forcing_file:
dimtime.append(unitfile.getlendim("time"))
if len(set(dimtime)) > 1:
raise TimeListException(forcin, dimtime)
[docs]
def merge(self, init_forcing_file, new_forcing_file, *args):
"""Merge forcing files.
:param init_forcing_file: Input files to merge object
:type init_forcing_file: :class:`utils.prosimu.prosimu`
:param new_forcing_file: Output file object
:type new_forcing_file: :class:`utils.prosimu.prosimu`
"""
spatial_dim_name = "Number_of_points"
index_deb = 0
len_dim = 0
spatial_index = []
for unitfile in init_forcing_file:
dim_file = unitfile.getlendim(spatial_dim_name)
index_end = index_deb + dim_file - 1
spatial_index.append((index_deb, index_end))
len_dim += dim_file
index_deb = index_end + 1
new_forcing_file.createDimension("time", None)
new_forcing_file.createDimension(spatial_dim_name, len_dim)
dictdim = init_forcing_file[0].listdim()
del dictdim["time"]
del dictdim[spatial_dim_name]
for dimname, dim in dictdim.items():
print("Create dimension " + dimname + " " + str(len(dim)))
new_forcing_file.createDimension(dimname, len(dim))
time = init_forcing_file[0].readtime()
listvar = init_forcing_file[0].listvar()
savevar = {}
for varname in listvar:
print(varname)
if self.printmemory:
print_used_memory()
vartype, rank, array_dim, varFillvalue, var_attrs = init_forcing_file[0].infovar(varname)
if varname == "DIR_SWdown":
direct = new_forcing_file.createVariable(varname, vartype, array_dim, fill_value=varFillvalue)
var = direct
elif varname == "SCA_SWdown":
diffus = new_forcing_file.createVariable(varname, vartype, array_dim, fill_value=varFillvalue)
var = diffus
else:
var = new_forcing_file.createVariable(varname, vartype, array_dim, fill_value=varFillvalue)
for attname in var_attrs:
if not attname == '_FillValue':
setattr(var, attname, init_forcing_file[0].getattr(varname, attname))
if rank >= 1 and spatial_dim_name in array_dim:
for i, unitfile in enumerate(init_forcing_file):
var_array = unitfile.read(varname, keepfillvalue=True)
if varname in ["LAT", "LON", "aspect", "slope", "DIR_SWdown",
"SCA_SWdown", "massif_number", "station"]:
savevar[varname + str(i)] = var_array
if varname not in ["DIR_SWdown", "SCA_SWdown"]:
if rank == 1:
var[spatial_index[i][0]:spatial_index[i][1] + 1] = var_array[:]
elif rank == 2:
var[:, spatial_index[i][0]:spatial_index[i][1] + 1] = var_array[:, :]
else:
var[:] = unitfile.read(varname, keepfillvalue=True)[:]
for i, unitfile in enumerate(init_forcing_file):
direct_array, diffus_array = self.compute_solar_radiations(time, savevar, i)
direct[:, spatial_index[i][0]:spatial_index[i][1] + 1] = direct_array[:, :]
diffus[:, spatial_index[i][0]:spatial_index[i][1] + 1] = diffus_array[:, :]
def compute_solar_radiations(self, time, savevar, i):
return savevar["DIR_SWdown" + str(i)], savevar["SCA_SWdown" + str(i)]
[docs]
class forcinput_applymask(forcinput_tomerge):
"""
This class represents a group of forcing files which must be merged
in a single one through the ``Number_of_points`` dimension.
and for which incoming shortwave radiation must be corrected from shadows.
Or a single forcing file for which incoming shortwave radiation must be corrected from shadows.
"""
def compute_solar_radiations(self, time, savevar, i):
if "station" + str(i) in list(savevar.keys()):
INFO = infomassifs()
list_list_azim = []
list_list_mask = []
print("ACCOUNT FOR SURROUNDING MASKS FOR THE FOLLOWING STATIONS:")
for poste in map(self.stringstation, list(savevar["station" + str(i)])):
azim, mask = INFO.maskposte(poste)
list_list_azim.append(azim)
list_list_mask.append(mask)
if not azim == [0, 360]:
print(poste, INFO.nameposte(poste))
else:
list_list_azim = None
list_list_mask = None
return sun().slope_aspect_correction(savevar["DIR_SWdown" + str(i)], savevar["SCA_SWdown" + str(i)],
time, savevar["LAT" + str(i)], savevar["LON" + str(i)],
savevar["aspect" + str(i)], savevar["slope" + str(i)],
list_list_azim=list_list_azim, list_list_mask=list_list_mask,
lnosof_surfex=True)
def stringstation(self, station):
station = str(station)
if len(station) == 7:
station = "0" + station
return station
[docs]
class forcinput_tomodify:
"""This class represents a forcing file for which modifications are needed before being used as SURFEX input
Instanciation opens the initial forcing file to read and create the modified forcing file.
The :meth:`change_forcing.forcinput_tomodify.modify` must be defined in child classes.
:param forcin: Input file name
:type forcin: str
:param forcout: Output file name
:type forcout: str
"""
printmemory = False
formatout = "NETCDF4_CLASSIC"
def __init__(self, forcin, forcout, *args, **kwargs):
if type(forcin) is int:
forcin = str(forcin)
if not os.path.isfile(forcin):
raise FileNameException(forcin)
dirout = os.path.dirname(forcout)
if not (dirout == '' or os.path.isdir(dirout)):
raise DirNameException(dirout)
self.filename = forcin
if forcin == forcout:
init_forcing_file = prosimu(forcin, openmode='r+')
self.modify(init_forcing_file, init_forcing_file, args)
else:
init_forcing_file = prosimu(forcin)
print("INFO INPUT FORCING FILE FORMAT: " + init_forcing_file.format())
new_forcing_file = self.StandardFILE(forcout, "w", format=self.formatout)
self.modify(init_forcing_file, new_forcing_file, args)
init_forcing_file.close()
if forcin != forcout:
new_forcing_file.GlobalAttributes(**kwargs)
new_forcing_file.add_standard_names()
new_forcing_file.close()
def StandardFILE(self, *args, **kwargs):
return StandardSAFRAN(*args, **kwargs)
[docs]
def modify(self, init_forcing_file, new_forcing_file, *args):
"""The key method to be overriden
:param init_forcing_file: Input file object
:type init_forcing_file: :class:`utils.prosimu.prosimu`
:param new_forcing_file: Output file object
:type new_forcing_file: :class:`utils.prosimu.prosimu`
"""
pass
[docs]
def add_massif_variables(self, init_forcing_file, new_forcing_file, savevar={}):
"""Add massif-scale diagnostics (isotherm 0 and rain snow limit elevation)
:param init_forcing_file: Input file object
:type init_forcing_file: :class:`utils.prosimu.prosimu`
:param new_forcing_file: Output file object
:type new_forcing_file: :class:`utils.prosimu.prosimu`
:param savevar: Dictionnary of variables in cache
:type savevar: dict
"""
# if new_forcing_file is prosimu instance, take the dataset class instead
if type(new_forcing_file) not in [netCDF4.Dataset, StandardSAFRAN]:
new_forcing_file = new_forcing_file.dataset
self.create_massif_dimension(init_forcing_file, new_forcing_file, savevar)
if "isoZeroAltitude" not in list(new_forcing_file.variables.keys()):
self.add_iso_zero(init_forcing_file, new_forcing_file, savevar)
if "rainSnowLimit" not in list(new_forcing_file.variables.keys()):
self.add_snow_rain_limit(init_forcing_file, new_forcing_file, savevar)
[docs]
def create_massif_dimension(self, init_forcing_file, new_forcing_file, savevar):
"""create massif dimension from existing massifs in the forcing file
:param init_forcing_file: Input file object
:type init_forcing_file: :class:`utils.prosimu.prosimu`
:param new_forcing_file: Output file object
:type new_forcing_file: :class:`utils.prosimu.prosimu`
:param savevar: Dictionnary of variables in cache
:type savevar: dict
"""
massif_dim_name = "massif"
if "massif_number" in list(savevar.keys()):
self.massif_number = list(map(int, savevar["massif_number"]))
else:
self.massif_number = list(map(int, init_forcing_file.read("massif_number")))
self.list_massifs = np.unique(self.massif_number)
self.nmassifs = len(self.list_massifs)
if massif_dim_name not in new_forcing_file.dimensions.copy():
new_forcing_file.createDimension(massif_dim_name, self.nmassifs)
var = new_forcing_file.createVariable(massif_dim_name, 'i4', ["massif"], fill_value=0)
var[:] = self.list_massifs[:]
if "slope" in list(savevar.keys()):
self.slope = savevar["slope"]
else:
self.slope = init_forcing_file.read("slope")
if "ZS" in list(savevar.keys()):
self.zs = savevar["ZS"]
else:
self.zs = init_forcing_file.read("ZS")
[docs]
def add_snow_rain_limit(self, init_forcing_file, new_forcing_file, savevar):
"""Adds a massif-scale snow-rain limit elevation diagnostic
:param init_forcing_file: Input file object
:type init_forcing_file: :class:`utils.prosimu.prosimu`
:param new_forcing_file: Output file object
:type new_forcing_file: :class:`utils.prosimu.prosimu`
:param savevar: Dictionnary of variables in cache
:type savevar: dict
"""
if "Snowf" in list(savevar.keys()):
snowf = savevar["Snowf"]
else:
snowf = init_forcing_file.read("Snowf")
if "Snowf" in list(savevar.keys()):
rainf = savevar["Rainf"]
else:
rainf = init_forcing_file.read("Rainf")
ntime = snowf.shape[0]
lpn = np.empty((ntime, self.nmassifs))
for m, massif in enumerate(self.list_massifs):
indflat = (self.slope == 0.) & (self.massif_number == massif)
altitude = self.zs[indflat]
if len(altitude) < 2:
return
rain_full = rainf[:, indflat]
snow_full = snowf[:, indflat]
preciptot = rain_full + snow_full
phase = np.where(snow_full > 0, -1, np.where(rain_full > 0, 1, 0))
lowestlevelindex = np.argmin(phase, axis=1)
lpn_temp = altitude[lowestlevelindex] - 150
lpn_temp = np.where(snow_full[:, 0] > 0., -3, lpn_temp)
lpn_temp = np.where(rain_full[:, -1] > 0., -2, lpn_temp)
lpn_temp = np.where(np.all(preciptot < 2.E-4, axis=1) & np.any(preciptot < 1.E-8, axis=1), -1, lpn_temp)
lpn[:, m] = lpn_temp
var = new_forcing_file.createVariable("rainSnowLimit", 'float', ["time", "massif"], fill_value=-9999.)
var.long_name = ('Rain-snow transition altitude (resolution 300 m). -1 if no precipitation;'
'-2 if above the top of the massif; -3 if below the bottom of the massif.')
var.units = 'm'
var[:] = lpn
[docs]
def add_iso_zero(self, init_forcing_file, new_forcing_file, savevar):
"""Adds a massif-scale 0 degree isotherm diagnostic
"""
zero = 273.16
if "Tair" in list(savevar.keys()):
tair = savevar["Tair"]
else:
tair = init_forcing_file.read("Tair")
ntime = tair.shape[0]
isozero = np.empty((ntime, self.nmassifs))
for m, massif in enumerate(self.list_massifs):
indflat = (self.slope == 0.) & (self.massif_number == massif)
altitude = self.zs[indflat]
if len(altitude) < 2:
return
zmax = np.max(altitude)
tair_full = tair[:, indflat]
# Temperature of the level just above
# Last column will not be used but is necessary to conform shapes.
tair_full_up = np.concatenate((tair_full[:, 1:], np.zeros_like(tair_full[:, 0:1]) + 999), axis=1)
# Store both temperatures in the same variable
x = (tair_full * 100.).astype('int') + tair_full_up / 1000.
# Elevations with positive temperatures
# Decimal part represents the temperature of the correponding level and the temperature above
altipos = np.where(tair_full > zero, altitude + x / 100000., -999.)
# Identify the maximum level of positive temperature
z = np.max(altipos, axis=1)
# Separate elevation and group of temperatures
y, zlow = np.modf(z)
zup = zlow + 300
# Separate and reconstruct both temperatures
ttempup, ttemplow = np.modf(y*100000)
tlow = np.where(ttemplow > 0., ttemplow / 100., -999.) # Temperature of the level just below freezing level
tup = np.where(ttempup > 0., ttempup * 1000., -998.) # Temperature of the level just above freezing level
# Note that in cases of missing values, tup and tlow must be different to avoid a division by 0 just after
# Freezing level is computed with a linear interpolation.
isozero_temp = ((tlow - zero) * zup + (zero - tup) * zlow) / (tlow - tup)
isozero_temp = np.where(zlow == -999., -3, isozero_temp) # Freezing level below the lowest level
isozero_temp = np.where(zlow == zmax, -2, isozero_temp) # Freezing level above the lowest level
isozero[:, m] = isozero_temp
var = new_forcing_file.createVariable("isoZeroAltitude", 'float', ["time", "massif"], fill_value=-9999.)
var.long_name = ('Freezing level altitude obtained by interpolation from SAFRAN standard levels.'
'-2 if above the top of the massif ; -3 if below the bottom of the massif.')
var.units = 'm'
var[:] = isozero
def addfirstdimension(self, array, length):
return np.tile(array, (length, 1))
[docs]
class forcinput_addmeteomassif(forcinput_tomodify):
"""
This class represents a forcing file for which massif-scale meteorological diagnostics must be added.
"""
[docs]
def modify(self, init_forcing_file, new_forcing_file, *args):
"""Add massif-scale meteorological diagnostics
:param init_forcing_file: Input file object
:type init_forcing_file: :class:`utils.prosimu.prosimu`
:param new_forcing_file: Output file object
:type new_forcing_file: :class:`utils.prosimu.prosimu`
"""
self.add_massif_variables(init_forcing_file, new_forcing_file)
[docs]
class forcinput_select(forcinput_tomodify):
"""This class represents a forcing file for which the geometry has to been modified
before being used as SURFEX input including selection of massifs, elevation levels, slopes or aspects,
duplication of slopes.
"""
# This class was first implemented by G. Lecourt for spatial reduction of a forcing file
# M Lafaysse generalized the method to both FORCING and PRO files (June 2016)
# M Lafaysse added a treatement to increase the number of slopes (July 2016)
# M Lafaysse added a treatment to create coordinates for direct compatibilty with the new SAFRAN output (Aug 2017)
def massifvarname(self):
return 'massif_number'
[docs]
def modify(self, init_forcing_file, new_forcing_file, *args):
"""Selection of massifs, elevation levels, slopes or aspects, duplication of slopes.
:param init_forcing_file: Input file object
:type init_forcing_file: :class:`utils.prosimu.prosimu`
:param new_forcing_file: Output file object
:type new_forcing_file: :class:`utils.prosimu.prosimu`
:param args: list of massif numbers, minimum elevation, maximum elevation, list of slopes, list of aspects
:type args: list, int, int, list, list
"""
print("Modify forcing file towards the prescribed geometry:")
(list_massif_number, min_alt, max_alt, liste_pentes, list_exp) = args[:][0]
list_exp_degres = list_exp[:] # Necessary to not modify this value in the module call for next iteration
# print list_exp_degres
liste_pentes_int = list(map(int, liste_pentes))
listvar = init_forcing_file.listvar()
init_alt = init_forcing_file.read("ZS", keepfillvalue=True)
b_points_alt = (init_alt >= min_alt) * (init_alt <= max_alt)
if self.massifvarname() in listvar:
init_massif_nb_sop = init_forcing_file.read(self.massifvarname(), keepfillvalue=True)
b_points_massif = np.in1d(init_massif_nb_sop, list_massif_number)
if np.sum(b_points_massif) == 0:
raise MassifException(list_massif_number, list(set(init_massif_nb_sop)))
else:
b_points_massif = b_points_alt[:]
if min_alt > np.max(init_alt) or max_alt < np.min(init_alt):
raise GeometryException(np.min(init_alt), np.max(init_alt))
init_slopes = init_forcing_file.read("slope", keepfillvalue=True)
init_exp = init_forcing_file.read("aspect", keepfillvalue=True)
# list_pentes is the user-defined target
if "0" in liste_pentes:
nb_slope_angles_notflat = len(liste_pentes) - 1 # Number of target slopes excluding flat
nb_aspect_angles_notflat = len(list_exp_degres) - 1 # Number of target aspects excluding flat
nb_slopes_bylevel = 1 + (nb_slope_angles_notflat *
nb_aspect_angles_notflat) # Number of slopes for a given elevation level
else:
nb_slope_angles_notflat = len(liste_pentes) # Number of target slopes excluding flat
nb_aspect_angles_notflat = len(list_exp_degres) # Number of target aspects excluding flat
nb_slopes_bylevel = (nb_slope_angles_notflat *
nb_aspect_angles_notflat) # Number of slopes for a given elevation level
# print nb_slopes_bylevel, nb_slope_angles_notflat, nb_aspect_angles_notflat
# Extend aspects mode only if input file have only -1 aspects
extendaspects = nb_slopes_bylevel > 1 and np.all(init_exp == -1)
# Extend slopes if input file already have several aspects but we want more slope values
extendslopes = not extendaspects and (len(liste_pentes) > len(np.unique(init_slopes)))
if extendaspects:
print("Extend aspects of the input forcing file")
if extendslopes:
print("Extend slopes of the input forcing file")
if extendaspects:
# Indexes of points to extract: only flat values if create new aspects
b_points_slope = np.in1d(init_slopes, [0])
b_points_aspect = np.in1d(init_exp, [-1])
else:
# Indexes of points to extract: can be a subset of available slopes or the whole available slopes
b_points_slope = np.in1d(init_slopes, liste_pentes_int)
b_points_aspect = np.in1d(init_exp, list_exp_degres)
# Identify points to extract
index_points = np.where(b_points_massif * b_points_alt * b_points_slope * b_points_aspect)[0]
if extendslopes:
# Points to duplicate correspond to all indexes but -1 aspect
points_to_duplicate = np.invert(np.in1d(init_exp[index_points], [-1]))
if extendaspects or extendslopes:
# In these cases, we remove flat cases of output slopes list because it is dealt in a specific way
if "0" in liste_pentes:
liste_pentes_int.remove(0)
if -1 in list_exp_degres:
list_exp_degres.remove(-1)
init_forcing_file_dimensions = init_forcing_file.listdim()
massif_dim_name = "massif"
nbpoints_dim_name = "Number_of_points"
loc_dim_name = "location"
# Il faut créer la dimension time en premier (obligatoire au format NETCDF4_CLASSIC)
new_forcing_file.createDimension("time", None)
if massif_dim_name in init_forcing_file_dimensions and massif_dim_name in init_forcing_file.listvar():
init_massif = init_forcing_file.read(massif_dim_name, keepfillvalue=True)
index_massif = np.where(np.in1d(init_massif, list_massif_number))[0]
len_dim = len(index_massif)
if len_dim == 0:
raise MassifException(list_massif_number, init_massif)
new_forcing_file.createDimension(massif_dim_name, len_dim)
del init_forcing_file_dimensions[massif_dim_name]
if nbpoints_dim_name in init_forcing_file_dimensions:
spatial_dim_name = nbpoints_dim_name
elif loc_dim_name in init_forcing_file_dimensions:
spatial_dim_name = loc_dim_name
else:
spatial_dim_name = "missing"
if spatial_dim_name in init_forcing_file_dimensions:
# print (extendaspects)
# print ("NB slopes by level")
# print (nb_slopes_bylevel)
# print (len(index_points))
if extendaspects:
len_dim = len(index_points) * nb_slopes_bylevel
elif extendslopes:
nslopes_to_create = len(liste_pentes) - 2
len_dim = len(index_points) + np.sum(points_to_duplicate) * nslopes_to_create
indflat = np.arange(0, len_dim, nb_slope_angles_notflat * len(list_exp_degres) + 1)
indnoflat = np.delete(np.arange(0, len_dim, 1), indflat)
else:
len_dim = len(index_points)
print("create dimension :" + spatial_dim_name + " " + str(len_dim))
len_dim_spatial = len_dim
new_forcing_file.createDimension(spatial_dim_name, len_dim)
del init_forcing_file_dimensions[spatial_dim_name]
for dimname, dim in init_forcing_file_dimensions.items():
print("Create dimension " + dimname + " " + str(len(dim)))
if not dimname == "time":
new_forcing_file.createDimension(dimname, len(dim))
savevar = {}
for varname in listvar:
print(varname)
if self.printmemory:
print_used_memory()
print(datetime.datetime.today())
vartype, rank, array_dim, varFillvalue, var_attrs = init_forcing_file.infovar(varname)
if len(array_dim) > 0:
index_dim_massif = np.where(array_dim == massif_dim_name)[0]
index_dim_nbpoints = np.where(array_dim == spatial_dim_name)[0]
var_array = init_forcing_file.read(varname, keepfillvalue=True, removetile=False)
else:
index_dim_massif = []
index_dim_nbpoints = []
var_array = init_forcing_file.read(varname, keepfillvalue=True, removetile=False).getValue()
if len(index_dim_massif) == 1:
var_array = np.take(var_array, index_massif, index_dim_massif[0])
if len(index_dim_nbpoints) == 1:
var_array = np.take(var_array, index_points, index_dim_nbpoints[0])
if extendaspects or extendslopes:
if varname == "aspect":
expo_1level_notflat = np.tile(list_exp_degres, nb_slope_angles_notflat)
if "0" in liste_pentes:
expo_1level = np.append(-1, expo_1level_notflat)
else:
expo_1level = expo_1level_notflat
if extendaspects:
var_array = np.tile(expo_1level, len(index_points))
elif extendslopes:
var_array = np.tile(expo_1level, len(indflat))
elif varname == "slope":
slope_1level_notflat = np.repeat(liste_pentes_int, nb_aspect_angles_notflat)
if "0" in liste_pentes:
slope_1level = np.append(0, slope_1level_notflat)
else:
slope_1level = slope_1level_notflat
if extendaspects:
var_array = np.tile(slope_1level, len(index_points))
elif extendslopes:
var_array = np.tile(slope_1level, len(indflat))
if varname not in ["aspect", "slope"]:
if rank >= 2:
if extendaspects:
var_array = np.repeat(var_array, nb_slopes_bylevel, axis=1)
elif extendslopes:
newvar_array = np.empty((var_array.shape[0], len_dim_spatial), vartype)
newvar_array[:, indflat] = var_array[:, ~points_to_duplicate]
# WE CAN NOT USE NP.REPEAT IN THAT CASE BECAUSE WE WANT TO DUPICATE SEQUENCES OF 8 ASPECTS
# WITH THE ASPECT VARYING FASTER THAN THE SLOPE ANGLE.
# USE NP.SPLIT TO SEPARATE EACH MASSIF-ELEVATION (GROUPS OF 8 ASPECTS),
# THEN USE NP.TILE TO DUPICATE THE SLOPES,
# FINALLY NP.HSTACK CONCATENATE THE SEQUENCES ALONG THE LAST DIMENSION
newvar_array[:, indnoflat] = np.hstack(
np.tile(np.split(var_array[:, points_to_duplicate], len(indflat), axis=1),
1 + nslopes_to_create))
del var_array
var_array = newvar_array
elif rank >= 1:
if extendaspects:
var_array = np.repeat(var_array, nb_slopes_bylevel)
elif extendslopes:
newvar_array = np.empty(len_dim_spatial)
newvar_array[indflat] = var_array[~points_to_duplicate]
# THIS IS EQUIVALENT TO THE SEQUENCE ABOVE FOR RANK 2 VARIABLES
newvar_array[indnoflat] = np.tile(
np.array(np.split(var_array[points_to_duplicate], len(indflat))),
1 + nslopes_to_create).flatten()
# print indflat
# print indnoflat
del var_array
var_array = newvar_array
# print "BEFORE CREATE", datetime.datetime.today()
var = new_forcing_file.createVariable(varname, vartype, array_dim, fill_value=varFillvalue)
# print "AFTER CREATE", datetime.datetime.today()
for attname in var_attrs:
if not attname == '_FillValue':
setattr(var, attname, init_forcing_file.getattr(varname, attname))
try:
if not (varname in ["DIR_SWdown", "SCA_SWdown"] and (extendaspects or extendslopes)):
# print "BEFORE WRITE", datetime.datetime.today()
# do not write direct solar radiations if aspects and slopes were extended
# because we need to recompute the values
if rank == 0:
var[:] = var_array
elif rank == 1:
var[:] = var_array
elif rank == 2:
var[:, :] = var_array
elif rank == 3:
var[:, :, :] = var_array
elif rank == 4:
var[:, :, :, :] = var_array
elif rank == 5:
var[:, :, :, :, :] = var_array
# print ("AFTER WRITE", datetime.datetime.today())
except Exception:
print(var_array)
raise VarWriteException(varname, var_array.shape, var.shape)
# Some variables need to be saved for solar computations
if varname in ["time"]:
savevar[varname] = init_forcing_file.readtime()
if varname in ["LAT", "LON", "ZS", "aspect", "slope", "DIR_SWdown",
"SCA_SWdown", self.massifvarname(), "Tair", "Rainf", "Snowf"]:
savevar[varname] = var_array
if varname == self.massifvarname():
save_array_dim = array_dim
if 'snow_layer' in init_forcing_file_dimensions:
return
if "LAT" not in init_forcing_file.listvar():
lat, lon = self.addCoord(new_forcing_file, savevar[self.massifvarname()], save_array_dim, varFillvalue)
else:
lat = savevar["LAT"]
lon = savevar["LON"]
# Compute new solar radiations according to the new values of slope and aspect
if extendaspects or extendslopes:
# print ("BEFORE RADIATION COMPUTATIONS", datetime.datetime.today())
direct, diffus = sun().slope_aspect_correction(savevar["DIR_SWdown"], savevar["SCA_SWdown"],
savevar["time"], lat, lon, savevar["aspect"],
savevar["slope"])
# print ("AFTER RADIATION COMPUTATIONS", datetime.datetime.today())
new_forcing_file.variables["DIR_SWdown"][:] = direct
new_forcing_file.variables["SCA_SWdown"][:] = diffus
del savevar["DIR_SWdown"]
del savevar["SCA_SWdown"]
del savevar["time"]
del savevar["aspect"]
print("AFTER WRITE RADIATIONS", datetime.datetime.today())
self.add_massif_variables(init_forcing_file, new_forcing_file, savevar=savevar)
print("AFTER MASSIF VARIABLES", datetime.datetime.today())
del savevar
[docs]
def addCoord(self, forcing, massifnumber, dimension, varFillValue):
"""Routine to add coordinates in the forcing file for the SAFRAN massifs
"""
INFOmassifs = infomassifs()
dicLonLat = INFOmassifs.getAllMassifLatLon()
lat = np.empty(massifnumber.shape, np.float)
lon = np.empty(massifnumber.shape, np.float)
for point in range(0, len(massifnumber)):
lonlat = dicLonLat[massifnumber[point]]
lat[point] = lonlat[1]
lon[point] = lonlat[0]
var = forcing.createVariable("LAT", np.float, dimension, fill_value=varFillValue)
setattr(var, 'long_name', 'latitude')
setattr(var, 'units', 'degrees_north')
var[:] = lat
var = forcing.createVariable("LON", np.float, dimension, fill_value=varFillValue)
setattr(var, 'long_name', 'longitude')
setattr(var, 'units', 'degrees_east')
var[:] = lon
return lat, lon
[docs]
class proselect(forcinput_select):
"""
This class is designed to extract a selection of massifs, elevation levels, slopes or aspects from a PRO file.
"""
def massifvarname(self):
return 'massif_num'
def StandardFILE(self, *args, **kwargs):
return StandardCROCUS(*args, **kwargs)
[docs]
class forcinput_ESMSnowMIP(forcinput_tomodify):
"""
This class prepares FORCING files from the ESMSnowMIP offical netcdf dataset
"""
formatout = "NETCDF3_CLASSIC"
def upscale_tab_time(self, var, theshape):
# array slope can be considered at the same shape than the other arrays (simplify matricial traitment)
bigvar = np.ma.zeros(theshape)
if len(theshape) == 2:
for i in range(0, theshape[1]):
bigvar[:, i] = var[:]
elif len(theshape) == 3:
for ilat in range(0, theshape[1]):
for ilon in range(0, theshape[2]):
bigvar[:, ilat, ilon] = var[:]
else:
print("error on indices in upscale_tab_time")
return bigvar
[docs]
def modify(self, init_forcing_file, new_forcing_file, *args):
""" Prepare ESM-SnowMIP forcings.
:param init_forcing_file: Input file object from ESM-SnowMIP database
:type init_forcing_file: :class:`utils.prosimu.prosimu`
:param new_forcing_file: Output file object (SURFEX format)
:type new_forcing_file: :class:`utils.prosimu.prosimu`
"""
site = os.path.basename(self.filename)[11:14]
source = os.path.basename(self.filename)[4:10]
if site == "cdp":
const = {"LAT": 45.3, "LON": 5.77, "ZS": 1325, "slope": 0, "aspect": -1,
"ZREF": 37, "UREF": 38, "CO2air": 0.00062, "Wind_DIR": 0, "SCA_SWdown": 0}
if source == "insitu":
timeshift = 0
elif source == "gswp3c":
timeshift = -1
if site == "oas":
const = {"LAT": 53.63, "LON": -106.2, "ZS": 600, "slope": 0, "aspect": -1,
"ZREF": 37, "UREF": 38, "CO2air": 0.00062, "Wind_DIR": 0, "SCA_SWdown": 0}
if source == "insitu":
timeshift = 6
elif source == "gswp3c":
timeshift = -1
elif site == "obs":
const = {"LAT": 53.99, "LON": -105.12, "ZS": 629, "slope": 0,
"aspect": -1, "ZREF": 25, "UREF": 26, "CO2air": 0.00062, "Wind_DIR": 0, "SCA_SWdown": 0}
if source == "insitu":
timeshift = 6
elif source == "gswp3c":
timeshift = -1
elif site == "ojp":
const = {"LAT": 53.92, "LON": -104.69, "ZS": 579, "slope": 0,
"aspect": -1, "ZREF": 28, "UREF": 29, "CO2air": 0.00062, "Wind_DIR": 0, "SCA_SWdown": 0}
if source == "insitu":
timeshift = 6
elif source == "gswp3c":
timeshift = -1
elif site == "rme":
const = {"LAT": 43.19, "LON": -116.78, "ZS": 2060, "slope": 0,
"aspect": -1, "ZREF": 3, "UREF": 3, "CO2air": 0.00062, "Wind_DIR": 0, "SCA_SWdown": 0}
if source == "insitu":
timeshift = 7
elif source == "gswp3c":
timeshift = -1
elif site == "sap":
const = {"LAT": 43.08, "LON": 141.34, "ZS": 15, "slope": 0,
"aspect": -1, "ZREF": 1.5, "UREF": 1.5, "CO2air": 0.00062, "Wind_DIR": 0, "SCA_SWdown": 0}
if source == "insitu":
timeshift = -9
elif source == "gswp3c":
timeshift = -1
elif site == "snb":
const = {"LAT": 37.91, "LON": -107.73, "ZS": 3714, "slope": 0,
"aspect": -1, "ZREF": 3.8, "UREF": 4, "CO2air": 0.00062, "Wind_DIR": 0, "SCA_SWdown": 0}
if source == "insitu":
timeshift = 7
elif source == "gswp3c":
timeshift = -1
elif site == "sod":
const = {"LAT": 67.37, "LON": 26.63, "ZS": 179, "slope": 0,
"aspect": -1, "ZREF": 2, "UREF": 2, "CO2air": 0.00062, "Wind_DIR": 0, "SCA_SWdown": 0}
if source == "insitu":
timeshift = 0
elif source == "gswp3c":
timeshift = -1
elif site == "swa":
const = {"LAT": 37.91, "LON": -107.71, "ZS": 3371, "slope": 0,
"aspect": -1, "ZREF": 3.4, "UREF": 3.8, "CO2air": 0.00062, "Wind_DIR": 0, "SCA_SWdown": 0}
if source == "insitu":
timeshift = 7
elif source == "gswp3c":
timeshift = -1
elif site == "wfj":
const = {"LAT": 46.82962, "LON": 9.80934, "ZS": 2540, "slope": 0,
"aspect": -1, "ZREF": 4.5, "UREF": 5.5, "CO2air": 0.00062, "Wind_DIR": 0, "SCA_SWdown": 0}
if source == "insitu":
timeshift = 0
elif source == "gswp3c":
timeshift = -1
if source == "gswp3c":
timeshift = -1
const["ZREF"] = 2.
const["UREF"] = 10.
const["PSurf"] = 101325 * ((288 - 0.0065 * const["ZS"]) / 288)**5.255
new_forcing_file.createDimension("time", None)
new_forcing_file.createDimension("Number_of_points", 1)
for varname, ncvar in init_forcing_file.dataset.variables.iteritems():
var_array = ncvar[:]
if varname == "time":
var = new_forcing_file.createVariable(varname, 'd', ("time"), fill_value=-9999999.)
time_array = var_array[:] + timeshift
var[:] = time_array
unit_time = getattr(ncvar, "units")
elif varname == "SWdown":
vardir = new_forcing_file.createVariable("DIR_SWdown", 'd', ("time", "Number_of_points"),
fill_value=-9999999.)
setattr(vardir, "long_name", "Direct downward shortwave radiation")
vardif = new_forcing_file.createVariable("SCA_SWdown", 'd', ("time", "Number_of_points"),
fill_value=-9999999.)
setattr(vardif, "long_name", "Diffuse downward shortwave radiation")
vardir[:, 0], vardif[:, 0] = sun().directdiffus(var_array, netCDF4.num2date(time_array, unit_time),
const["LAT"], const["LON"], const["slope"],
const["aspect"], site)
else:
var = new_forcing_file.createVariable(varname, 'd', ("time", "Number_of_points"), fill_value=-9999999.)
var[:, 0] = var_array[:]
if varname != "SWdown":
for attname in ncvar.ncattrs():
setattr(var, attname, getattr(ncvar, attname))
frc = new_forcing_file.createVariable('FRC_TIME_STP', 'd', fill_value=-999999)
frc[:] = 3600.
for varname in ["LAT", "LON", "ZS", "slope", "aspect", "ZREF", "UREF"]:
var = new_forcing_file.createVariable(varname, 'd', ("Number_of_points"), fill_value=-9999999.)
var[:] = const[varname]
for varname in ["PSurf", "CO2air", "Wind_DIR"]:
var = new_forcing_file.createVariable(varname, 'd', ("time", "Number_of_points"), fill_value=-9999999.)
var[:, 0] = const[varname]
return False
[docs]
class forcinput_extract(forcinput_tomodify):
"""This class allows to extract from an original forcing file all the variables corresponding
to a pre-defined list of points.
Implemented by C. Carmagnola in November 2018 (PROSNOW project).
"""
[docs]
def modify(self, init_forcing_file, new_forcing_file, *args):
""" Extract a pre-defined list of points in a forcing file.
:param init_forcing_file: Input file object
:type init_forcing_file: :class:`utils.prosimu.prosimu`
:param new_forcing_file: Output file object
:type new_forcing_file: :class:`utils.prosimu.prosimu`
:param args: .txt file name containing the list of points to be extracted
:type args: str
"""
# Read data from file
if not os.path.isfile(args[0][0]):
raise FileNameException(args[0][0])
mdat = open(args[0][0])
list_massif_number, list_sru, list_alt, list_asp, list_slo = np.loadtxt(mdat, delimiter=' ',
usecols=(2, 3, 1, 6, 7), unpack=True)
# print "Points to be extracted:"
# print "massif = " + str(list_massif_number)
# print "elevation = " + str(list_alt)
# print "aspect = " + str(list_asp)
# print "slope = " + str(list_slo)
# print "---------------"
# Variables
listvar = init_forcing_file.listvar()
listvar.append(u'stations')
# Massif / Elevation / Aspect / Slope
init_massif_nb_sop = init_forcing_file.read("massif_number", keepfillvalue=True)
init_alt = init_forcing_file.read("ZS", keepfillvalue=True)
init_exp = init_forcing_file.read("aspect", keepfillvalue=True)
init_slopes = init_forcing_file.read("slope", keepfillvalue=True)
list_asp_degres = list_asp[:]
list_slo_int = list(map(int, list_slo))
# Indices
index_points = np.zeros(len(list_massif_number), 'int')
for i, j in ((a, b) for a in range(len(init_massif_nb_sop)) for b in range(len(list_massif_number))):
if init_massif_nb_sop[i] == list_massif_number[j] and init_alt[i] == list_alt[j] and \
init_slopes[i] == list_slo_int[j] and init_exp[i] == list_asp_degres[j]:
index_points[j] = i
# print "Indices:"
# print index_points
# print "---------------"
# Create dimension
init_forcing_file_dimensions = init_forcing_file.listdim()
massif_dim_name = "massif"
nbpoints_dim_name = "Number_of_points"
loc_dim_name = "location"
new_forcing_file.createDimension("time", None)
if massif_dim_name in init_forcing_file_dimensions:
init_massif = init_forcing_file.read("massif", keepfillvalue=True)
index_massif = np.where(np.in1d(init_massif, list_massif_number))[0]
len_dim = len(index_massif)
new_forcing_file.createDimension(massif_dim_name, len_dim)
del init_forcing_file_dimensions[massif_dim_name]
if nbpoints_dim_name in init_forcing_file_dimensions:
spatial_dim_name = nbpoints_dim_name
elif loc_dim_name in init_forcing_file_dimensions:
spatial_dim_name = loc_dim_name
else:
spatial_dim_name = "missing"
if spatial_dim_name in init_forcing_file_dimensions:
len_dim = len(index_points)
new_forcing_file.createDimension(spatial_dim_name, len_dim)
del init_forcing_file_dimensions[spatial_dim_name]
# print (spatial_dim_name + ": ")
# print str(len_dim)
# Fill new file
for varname in listvar:
# Variable containing the sru numbers
if varname == 'stations':
vartype = 'float32'
rank = 1
array_dim = [u'Number_of_points']
varFillvalue = -1e+07
var_attrs = [u'_FillValue', u'long_name', u'units']
var = new_forcing_file.createVariable(varname, vartype, array_dim, fill_value=varFillvalue)
var[:] = list_sru
# All other variables
else:
vartype, rank, array_dim, varFillvalue, var_attrs = init_forcing_file.infovar(varname)
if len(array_dim) > 0:
index_dim_massif = np.where(array_dim == massif_dim_name)[0]
index_dim_nbpoints = np.where(array_dim == spatial_dim_name)[0]
var_array = init_forcing_file.read(varname, keepfillvalue=True)
else:
index_dim_massif = []
index_dim_nbpoints = []
var_array = init_forcing_file.read(varname, keepfillvalue=True).getValue()
if len(index_dim_massif) == 1:
var_array = np.take(var_array, index_massif, index_dim_massif[0])
if len(index_dim_nbpoints) == 1:
var_array = np.take(var_array, index_points, index_dim_nbpoints[0])
var = new_forcing_file.createVariable(varname, vartype, array_dim, fill_value=varFillvalue)
for attname in var_attrs:
if not attname == '_FillValue':
setattr(var, attname, init_forcing_file.getattr(varname, attname))
try:
if rank == 0:
var[:] = var_array
elif rank == 1:
var[:] = var_array
elif rank == 2:
var[:, :] = var_array
elif rank == 3:
var[:, :, :] = var_array
elif rank == 4:
var[:, :, :, :] = var_array
elif rank == 5:
var[:, :, :, :, :] = var_array
except Exception:
print(var_array)
raise VarWriteException(varname, var_array.shape, var.shape)
[docs]
class forcinput_changedates(forcinput_tomodify):
"""This class allows to change the dates of a forcing file from the climatology
Implemented by C. Carmagnola in November 2018 (PROSNOW project).
"""
[docs]
def modify(self, init_forcing_file, new_forcing_file, *args):
""" Change the dates of a forcing file for climatological forecast.
:param init_forcing_file: Input file object
:type init_forcing_file: :class:`utils.prosimu.prosimu`
:param new_forcing_file: Output file object
:type new_forcing_file: :class:`utils.prosimu.prosimu`
:param args: New initial date
:type args: :class:`bronx.stddtypes.date.Date`
"""
# Open file
file_name = netCDF4.Dataset(init_forcing_file.path, "a")
nc_time = file_name.variables["time"]
nc_unit = file_name.variables["time"].units
# Compute new time
date_time_old = netCDF4.num2date(nc_time[:], units=nc_unit)
delta = args[0][0] - date_time_old[0]
date_time_new = date_time_old + delta
# Insert new time in file
nc_time[:] = netCDF4.date2num(date_time_new, units=nc_unit)
# Close file
file_name.close()