Source code for utils.S2M_standard_file

# -*- coding: utf-8 -*-

"""
Created on 20 août 2019

@author: lafaysse
"""

import datetime
import os
import sys

import netCDF4
import configparser
import numpy as np

from snowtools.utils.FileException import VarNameException, UnknownGridTypeException, FileNameException,\
    TimeUnitsException
from snowtools.utils.infomassifs import infomassifs
from snowtools.utils.git import get_summary_git
from snowtools.DATA import SNOWTOOLS_DIR, SNOWTOOLS_CEN


class _StandardNC(netCDF4.Dataset):
    """
    abstract class for S2M netcdf file based on ACDD conventions
    """

    def read_constant_attributes(self, section):
        config = configparser.RawConfigParser()
        rootdir = SNOWTOOLS_DIR
        confile = os.path.join(rootdir, 'conf', 'S2M_Standard_Metadata.ini')
        if os.path.isfile(confile):
            config.read(confile)
            myconfig = dict(config.items(section))
            # Add conf variables into the global attribute
            for key, value in list(myconfig.items()):
                if type(value) is str:
                    # Remove multiple spaces and line breaks
                    value = ' '.join(value.split())
                setattr(self, key, value)
        else:
            raise FileNameException(confile)

    def GlobalAttributes(self, **additionnal_attributes):

        time = self.readtime()
        reanalysis = (time[-1] - time[0]) > datetime.timedelta(days=360)  # Bad condition --> should be changed

        if reanalysis:
            self.read_constant_attributes('GlobalAttributesReanalysis')
            self.date_created = datetime.datetime.today().replace(second=0, microsecond=0).isoformat()
        else:
            self.read_constant_attributes('GlobalAttributesOper')
            # For reproductibility tests, the hour is fixed in the operational chain
            # In future versions it would be nice to use the operational rundate.
            self.date_created = datetime.datetime.today().replace(hour=12, minute=0, second=0,
                                                                  microsecond=0).isoformat()

        contributors = ['Matthieu Vernay', 'Matthieu Lafaysse', 'Mathieu Fructus', 'Léo Viallon-Galinier',
                        'Sabine Radanovics']
        roles = ['Matthieu Vernay collected the input data and develops the SAFRAN reanalysis',
                 'Matthieu Lafaysse leads the team and developments around the SURFEX-ISBA/Crocus simulations',
                 'Mathieu Fructus contributes to the SURFEX-ISBA/Crocus development',
                 'Léo Viallon-Galinier supervises the snowtools development',
                 'Sabine Radanovics contributes to snowtools development']

        try:
            login_who_run = os.getlogin()
            contributors.append(login_who_run)
            roles.append(login_who_run + ' ran this simulation')
        except OSError:
            # Note relevant on supercomputers, where by the way os.getlogin raises an error
            pass

        self.contributor_name = ' ; '.join(contributors)
        self.contributor_role = ' ; '.join(roles)

        self.get_coord()

        self.time_coverage_start = time[0].isoformat()
        self.time_coverage_end = time[-1].isoformat()
        self.time_coverage_duration = str(time[-1] - time[0])
        if len(time) > 1:
            self.time_coverage_resolution = str(time[1] - time[0])

        # With vortex snowtools_commit is extracted by S2MTaskMixin because git is not available on computing nodes
        # With snowtools on PC the commit is extracted here.
        if 'snowtools_commit' not in additionnal_attributes:
            self.snowtools_commit = get_summary_git(SNOWTOOLS_CEN)

        self.python_version = sys.version
        self.python_binary = os.path.realpath(sys.executable)

        # Add attributes provided in the dictionnary to the netcdf file (can also be empty)
        for attribute in additionnal_attributes:
            setattr(self, attribute, additionnal_attributes[attribute])

    def standard_names(self):
        return dict(ZS = 'surface_altitude',
                    time       = 'time'
                    )

    def readtime(self):
        # Vérification du nom de la variable
        if "time" not in list(self.variables.keys()):
            raise VarNameException("time", self.filepath())

        time = self.variables["time"]

        if not hasattr(time, 'units'):
            raise TimeUnitsException(self.filepath())

        if netCDF4.__version__ >= '1.4.2':
            return np.array(netCDF4.num2date(time[:], time.units, only_use_cftime_datetimes=False))
        elif netCDF4.__version__ >= '1.4.0':
            return np.array(netCDF4.num2date(time[:], time.units, only_use_cftime_datetimes=False,
                                             only_use_python_datetimes=True))
        else:
            return np.array(netCDF4.num2date(time[:], time.units))

    def get_coord(self):

        latname = self.getlatname
        lonname = self.getlonname
        altiname = "ZS"
        xname, yname = self.getcoordname

        if (set([latname, lonname]).issubset(set(self.variables.keys()))):
            lat, lon = self.variables[latname], self.variables[lonname]
        else:
            if (set([xname, yname]).issubset(set(self.variables.keys()))):
                lat, lon = self.xy2latlon(self.variables[xname], self.variables[yname])
            else:
                try:
                    self.addCoord()
                    lat, lon = self.variables[latname], self.variables[lonname]
                except Exception:
                    raise VarNameException(latname, self.filepath())

        if altiname in self.variables.keys():
            alti = self.variables[altiname]
        else:
            alti = np.nan

        self.geospatial_lat_min = np.min(lat)
        self.geospatial_lat_max = np.max(lat)
        self.geospatial_lon_min = np.min(lon)
        self.geospatial_lon_max = np.max(lon)
        self.geospatial_lat_units = "degree_north"
        self.geospatial_lon_units = "degree_east"
        self.geospatial_vertical_min = np.min(alti)
        self.geospatial_vertical_max = np.max(alti)
        self.geospatial_vertical_units = 'm'
        self.geospatial_vertical_positive = 'up'

    def xy2latlon(self, x, y):
        from pyproj import Transformer
        from bronx.datagrip.namelist import NamelistParser
        n = NamelistParser()
        N = n.parse("OPTIONS.nam")
        gridtype = N['NAM_PGD_GRID'].CGRID
        if gridtype == "IGN":
            projtype = N['NAM_IGN'].CLAMBERT
            if projtype == 'L93':
                epsg = 'epsg:2154'
            else:
                raise UnknownGridTypeException(gridtype, projtype)
        elif gridtype == "CONF PROJ":
            if N['NAM_CONF_PROJ'].XRPK == 0:
                epsg = 'epsg:23032'
            else:
                projtype = 'No Mercator: XRPK != 0 '
                raise UnknownGridTypeException(gridtype, projtype)
        else:
            raise UnknownGridTypeException(gridtype, "")

        transformer = Transformer.from_crs(epsg, 'epsg:4326')

        XX, YY = np.meshgrid(np.array(x), np.array(y))
        lon, lat = transformer.transform(XX, YY)

        return lat, lon

    def addCoord(self):
        '''Routine to add coordinates in the forcing file for the SAFRAN massifs'''
        INFOmassifs = infomassifs()
        dicLonLat = INFOmassifs.getAllMassifLatLon()
        if netCDF4.__version__ >= '1.4.2':
            massifnumber = self.variables['massif_number'][:]
        else:
            massifnumber = self.variables['massif_number']

        lat = np.empty(massifnumber.shape, np.float)
        lon = np.empty(massifnumber.shape, np.float)

        dimension = self.variables['ZS'].dimensions
        varFillValue = -9999999.

        for point in range(0, len(massifnumber)):
            lonlat = dicLonLat[massifnumber[point]]
            lat[point] = lonlat[1]
            lon[point] = lonlat[0]

        var = self.createVariable("LAT", np.float, dimension, fill_value=varFillValue)
        setattr(var, 'long_name', 'latitude')
        setattr(var, 'units', 'degrees_north')
        var[:] = lat
        var = self.createVariable("LON", np.float, dimension, fill_value=varFillValue)
        setattr(var, 'long_name', 'longitude')
        setattr(var, 'units', 'degrees_east')
        var[:] = lon

        return lat, lon

    def special_long_names(self):

        massifname = self.getmassifname
        longnames = dict()
        longnames[massifname] = 'SAFRAN massif number. Metadata are provided in the associated shapefile.'

        return longnames

    def add_standard_names(self):

        dicstd = self.standard_names()
        diclong = self.special_long_names()
        for varname in self.variables.keys():
            if varname in dicstd.keys():
                self.variables[varname].standard_name = dicstd[varname]
            if varname in diclong.keys():
                self.variables[varname].long_name = diclong[varname]


[docs] class StandardSAFRANetMET(_StandardNC): def GlobalAttributes(self, **additionnal_attributes): super(StandardSAFRANetMET, self).GlobalAttributes(**additionnal_attributes) self.read_constant_attributes('StandardSAFRANetMET') @property def getlatname(self): return 'LAT' @property def getlonname(self): return 'LON' @property def getcoordname(self): return 'x', 'y' def standard_names(self): dicfather = super(StandardSAFRANetMET, self).standard_names() dicson = dict(PSurf = 'surface_air_pressure', Tair = 'air_temperature', Qair = 'specific_humidity', Wind_DIR = 'wind_from_direction', Wind = 'wind_speed', Rainf = 'rainfall_flux', Snowf = 'snowfall_flux', LWdown = 'surface_downwelling_longwave_flux_in_air', DIR_SWdown = 'surface_direct_downwelling_shortwave_flux_in_air', SCA_SWdown = 'surface_diffuse_downwelling_shortwave_flux_in_air', NEB = 'cloud_area_fraction', HUMREL = 'relative_humidity', CO2air = 'mass_concentration_of_carbon_dioxide_in_air', isoZeroAltitude = 'freezing_level_altitude', LAT = 'latitude', LON = 'longitude', ) dicfather.update(dicson) return dicfather
[docs] class StandardSAFRAN(StandardSAFRANetMET): def GlobalAttributes(self, **additionnal_attributes): super(StandardSAFRAN, self).GlobalAttributes(**additionnal_attributes) self.read_constant_attributes('StandardSAFRAN') self.title = self.title + ": meteorological variables" self.summary = self.summary + ' This file provides the SAFRAN meteorological fields' self.keywords = self.keywords + ',INCOMING SOLAR RADIATION,LONGWAVE RADIATION,SHORTWAVE RADIATION,AIR' \ ' TEMPERATURE,SURFACE TEMPERATURE,ABSOLUTE HUMIDITY,RELATIVE HUMIDITY,' \ 'WIND DIRECTION,WIND SPEED,SURFACE WINDS,RAIN,LIQUID PRECIPITATION,' \ 'HOURLY PRECIPITATION AMOUNT,SOLID PRECIPITATION' @property def getmassifname(self): return 'massif_number'
[docs] class StandardCDP(StandardSAFRANetMET): def GlobalAttributes(self): super(StandardCDP, self).GlobalAttributes() self.read_constant_attributes('StandardCDP') self.title = self.title + ": meteorological variables" self.summary = self.summary + ' This file provides the Col de Porte observed meteorological fields, completed' \ ' with SAFRAN' self.keywords = self.keywords + ',INCOMING SOLAR RADIATION,LONGWAVE RADIATION,SHORTWAVE RADIATION,AIR' \ ' TEMPERATURE,SURFACE TEMPERATURE,ABSOLUTE HUMIDITY,RELATIVE HUMIDITY,' \ 'WIND DIRECTION,WIND SPEED,SURFACE WINDS,RAIN,LIQUID PRECIPITATION,HOURLY' \ ' PRECIPITATION AMOUNT,SOLID PRECIPITATION'
[docs] class StandardPROSNOW(StandardSAFRAN): def GlobalAttributes(self): super(StandardPROSNOW, self).GlobalAttributes() self.read_constant_attributes('StandardPROSNOW') self.title = "SAFRAN: meteorological variables" self.summary = 'This file is a meteorological forcing file generated by the SAFRAN system. Is is based on the' \ ' 4 days forecasts of the 35 members french Ensemble Forecasting system PEARP with a 6-hours' \ ' temporal resolution. It provides at an hourly time step the main meteorological parameters' \ ' to force a snowpack model on all of the PROSNOW stations except the french ones.' self.platform = 'MODELS_ANALYSES.MODELS' self.comment = "This file provides data for various stations of the PROSNOW project, the mapping between the" \ " station numbers and the station names is given by the following dictionary :" \ " {1: 'ZAMG-11706', 2: 'ZAMG-11803', 3: 'ZAMG-11804', 4: 'ZAMG-14631', 5: 'ZAMG-14701'," \ " 6: 'ZAMG-14812', 7: 'ZAMG-17301', 8: 'ZAMG-17315', 9: 'dwd_gar_1995_20'," \ " 10: 'dwd_zug_1995_20', 11: 'lwd_ost_1995_20', 12: 'lwd_pla_2012_20', 13: '*CHU'," \ " 14: 'CMA1', 15: 'CMA2', 16: '*DAV', 17: 'DAV3', 18: 'DAV4', 19: 'DAV5', 20: 'ELA1'," \ " 21: 'ELA2', 22: 'PMA1', 23: 'PMA2', 24: 'ROT2', 25: 'ROT3', 26: 'TAM3', 27: '*WFJ'," \ " 28: 'WFJ1', 29: 'WFJ2', 30: 'BER1', 31: 'BER2', 32: 'BER3', 33: 'BEV1', 34: 'BEV2'," \ " 35: 'GESS2', 36: 'KES1', 37: 'KES2', 38: 'LAG1', 39: 'LAG2', 40: 'OFE1', 41: 'OFE2'," \ " 42: 'VALL2', 43: 'ZNZ1', 44: 'ZNZ2', 45: 'ZNZ3', 46: 'Bruneck', 47: 'Deutschnofen'," \ " 48: 'Merbalm', 49: 'PizlaIla', 50: 'Rittnerhorn', 51: 'SterzingFlughaf'}"
[docs] def addCoord(self): '''Routine to add coordinates in the forcing file for the SAFRAN massifs''' massifnumber = self.variables['massif_number'] lat = np.empty(massifnumber.shape, np.float) lon = np.empty(massifnumber.shape, np.float) dimension = self.variables['ZS'].dimensions varFillValue = -9999999. var = self.createVariable("LAT", np.float, dimension, fill_value=varFillValue) setattr(var, 'long_name', 'latitude') setattr(var, 'units', 'degrees_north') var[:] = lat var = self.createVariable("LON", np.float, dimension, fill_value=varFillValue) setattr(var, 'long_name', 'longitude') setattr(var, 'units', 'degrees_east') var[:] = lon return lat, lon
[docs] class StandardCROCUS(_StandardNC): def GlobalAttributes(self, **additionnal_attributes): super(StandardCROCUS, self).GlobalAttributes(**additionnal_attributes) self.read_constant_attributes('StandardCROCUS') self.title = self.title + ": snow variables" self.summary = self.summary + ' This file provides the snowpack properties of the Crocus model.' self.keywords = self.keywords + ',SNOW WATER EQUIVALENT,SNOW,ALBEDO,AVALANCHE,FREEZE/THAW,SNOW COVER,' \ 'SNOW DENSITY,SNOW DEPTH,SNOW ENERGY BALANCE,SNOW MELT,SNOW WATER EQUIVALENT,' \ 'SNOW/ICE TEMPERATURE' @property def getlatname(self): return 'latitude' @property def getlonname(self): return 'longitude' @property def getcoordname(self): return 'xx', 'yy' @property def getmassifname(self): return 'massif_num' def getsoilgrid(self): from bronx.datagrip.namelist import NamelistParser n = NamelistParser() N = n.parse("OPTIONS.nam") if 'XSOILGRID' in N['NAM_ISBA']: bottom = list(map(float, N['NAM_ISBA'].XSOILGRID)) top = [0] + bottom[:-1] self.soilgrid = (np.array(top) + np.array(bottom)) / 2. else: from snowtools.utils.prosimu import prosimu if os.path.isfile("PGD.nc"): pgd = prosimu("PGD.nc") nlayers = pgd.read("GROUND_LAYER") bottom = [] for layer in range(1, nlayers[0] + 1): bottom.append(pgd.read('SOILGRID' + str(layer))[0]) top = [0] + bottom[:-1] self.soilgrid = (np.array(top) + np.array(bottom)) / 2. pgd.close() def soil_long_names(self, varname): import re r = re.search(r'\d+', varname) # r is None for varname without number: 'WGTOT_ISBA' if r is None: return '' else: layer = int(r.group()) - 1 if not hasattr(self, 'soilgrid'): self.getsoilgrid() if hasattr(self, 'soilgrid'): return '(depth %.4f m)' % self.soilgrid[layer] else: return '' def standard_names(self): dicfather = super(StandardCROCUS, self).standard_names() dicson = dict(WSN_T_ISBA = 'surface_snow_amount', DSN_T_ISBA = 'thickness_of_snowfall_amount', TS_ISBA = 'surface_temperature', TALB_ISBA = 'surface_albedo', RN_ISBA = 'surface_net_downward_radiative_flux', H_ISBA = 'surface_upward_sensible_heat_flux', LE_ISBA = 'surface_upward_latent_heat_flux', RAINF_ISBA = 'rainfall_flux', SWD_ISBA = 'surface_downwelling_shortwave_flux_in_air', SWU_ISBA = 'surface_upwelling_shortwave_flux_in_air', LWD_ISBA = 'surface_downwelling_longwave_flux_in_air', LWU_ISBA = 'surface_upwelling_longwave_flux_in_air', RUNOFF_ISBA= 'surface_runoff_flux', DRAIN_ISBA = 'subsurface_runoff_flux', EVAP_ISBA = 'surface_water_evaporation_flux', TG1 = 'soil_temperature', TG4 = 'soil_temperature', WG1 = 'liquid_water_content_of_soil_layer', WGI1 = 'frozen_water_content_of_soil_layer', latitude = 'latitude', longitude = 'longitude', ) dicfather.update(dicson) return dicfather def add_standard_names(self): super(StandardCROCUS, self).add_standard_names() if os.path.isfile("OPTIONS.nam"): for varname in self.variables.keys(): if varname[0:2] in ['TG', 'WG']: if hasattr(self.variables[varname], 'long_name'): self.variables[varname].long_name = self.variables[varname].long_name + \ self.soil_long_names(varname)
[docs] class StandardHYDRO(_StandardNC): def get_coord(self): pass