Source code for epygram.extra.griberies.definitions.fa

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
FA Grib definitions
"""

import os
import re

from .util import GribDef
from .. import parse_GRIBstr_todict
from ..paths import get_definition_paths
from ...util import init_before
from epygram import config


[docs] class FaGribDef(GribDef): """ Handle FA-related GRIB definition files. To add user files: a) use env var ECCODES_DEFINITION_PATH or b) move it under {config.userlocaldir}/{FaGribDef.dirname} """ _re_dirname = re.compile(r'gribapi\.def\.[\d_]+') _official_rootdir = os.path.join(config.installdir, 'data') # TODO: use data _non_GRIB_keys = ('LSTCUM', 'FMULTM', 'FMULTE', 'ZLMULT') def __init__(self, actual_init=True, concepts=['faFieldName', 'faModelName', 'faLevelName']): super(FaGribDef, self).__init__(actual_init, concepts) def _find_gribdefs(self, root): dirs = [] if os.path.exists(root) and os.path.isdir(root): for d in sorted(os.listdir(root)): if self._re_dirname.match(d): d = os.path.join(root, d) if os.path.isdir(d): dirs.append(d) return dirs def _actual_init(self): """Read definition files.""" # first, those specified by env var defpaths = get_definition_paths() # then those embarked with epygram defpaths = self._find_gribdefs(self._official_rootdir) + defpaths # complete with local user definitions, if existing defpaths = self._find_gribdefs(config.userlocaldir) + defpaths # read gribdef files for d in defpaths[::-1]: for grib_edition in ('grib1', 'grib2'): for concept in self._concepts: self._read_localConcept(concept, d, grib_edition) self._initialized = True def _read_localConcept(self, concept, directory, grib_edition=GribDef._default_grib_edition): pathname = os.path.join(directory, grib_edition, 'localConcepts', 'lfpw', concept + '.def') if os.path.exists(pathname): self.read(pathname, grib_edition) @init_before def FA2GRIB(self, fieldname, grib_edition=GribDef._default_grib_edition, include_comments=False, fatal=False, filter_non_GRIB_keys=True): """ Convert FA field name to GRIB fid, including vertical level identification. :param grib_edition: among ('grib1', 'grib2'), the version of GRIB fid :param include_comments: if a comment is present if grib def, bring it in fid :param fatal: if True and fieldname is not retrieved, raise a ValueError; else, return a default 255 fid :param filter_non_GRIB_keys: filter out the non-GRIB keys that may be present in grib def of field """ _re_altitude = re.compile(r'(?P<ltype>[A-W]+)(?P<level>\d+)(?P<param>.+)') if fieldname in self.tables[grib_edition]['faFieldName']: fid = self._get_def(fieldname, 'faFieldName', grib_edition, include_comments) elif fieldname.replace(' ', '_') in self.tables[grib_edition]['faFieldName']: # FIXME: ? fid = self._get_def(fieldname.replace(' ', '_'), 'faFieldName', grib_edition, include_comments) else: rematch = _re_altitude.match(fieldname) if rematch: fid = self._get_def(rematch.group('ltype'), 'faLevelName', grib_edition, include_comments, fatal=False) fid.update(self._get_def(rematch.group('param'), 'faFieldName', grib_edition, include_comments, fatal=False)) level = int(rematch.group('level')) if level == 0: # formatting issue level = 100000 fid['scaleFactorOfFirstFixedSurface'] = 0 fid['scaledValueOfFirstFixedSurface'] = level if rematch.group('ltype') == 'P': level /= 100 fid['level'] = level else: if fatal: raise ValueError("field not found: {}".format(fieldname)) else: fid = self._get_def('default', 'faFieldName', grib_edition, include_comments) fid.setdefault('typeOfFirstFixedSurface', 255) if filter_non_GRIB_keys: fid = self._filter_non_GRIB_keys(fid) # productDefinitionTemplateNumber to distinguish between # CLSTEMPERATURE and CLSMINI.TEMPERAT / CLSMAXI.TEMPERAT (for instance) if 'productDefinitionTemplateNumber' not in fid: fid['productDefinitionTemplateNumber'] = 0 return fid @init_before def GRIB2FA(self, gribfid, grib_edition=GribDef._default_grib_edition): """ Look for a unique matching field in tables. ! WARNING ! the unicity might not be ensured depending on the version of grib definitions files. :param grib_edition: among ('grib1', 'grib2'), the version of GRIB fid """ matching_fields = self.lookup_GRIB(gribfid, grib_edition) if len(matching_fields) == 1: return matching_fields.keys[0] elif len(matching_fields) == 0: raise ValueError("No field matching *fid* was found in tables.") elif len(matching_fields) > 1: raise ValueError("Several fields matching were found. Use method *lookup_GRIB* to refine.") @init_before def lookup_FA(self, partial_fieldname, grib_edition=GribDef._default_grib_edition, include_comments=False, filter_non_GRIB_keys=True): """ Look for all the fields which FA name contain **partial_fieldname**. :param grib_edition: among ('grib1', 'grib2'), the version of GRIB fid :param include_comments: if a comment is present if grib def, bring it in fid :param filter_non_GRIB_keys: filter out the non-GRIB keys that may be present in grib def of field """ fields = {} try: # first try to get it as an altitude one gribfid = self.FA2GRIB(partial_fieldname, grib_edition=grib_edition, include_comments=include_comments, fatal=True, filter_non_GRIB_keys=filter_non_GRIB_keys) except ValueError: pass # field was not found as such; might be partial => finally else: fields[partial_fieldname] = gribfid finally: for f, gribfid in self.tables[grib_edition]['faFieldName'].items(): if partial_fieldname in f: fields[f] = gribfid for k, fid in fields.items(): if not include_comments: fid.pop('#comment', None) if filter_non_GRIB_keys: fields[k] = self._filter_non_GRIB_keys(fid) return fields @init_before def lookup_GRIB(self, partial_fid, grib_edition=GribDef._default_grib_edition, include_comments=False, filter_non_GRIB_keys=True): """ Look for all the fields which GRIB fid contain **partial_fid**. :param grib_edition: among ('grib1', 'grib2'), the version of GRIB fid :param include_comments: if a comment is present if grib def, bring it in fid :param filter_non_GRIB_keys: filter out the non-GRIB keys that may be present in grib def of field """ return self._lookup_from_kv(partial_fid, 'faFieldName', grib_edition=grib_edition, include_comments=include_comments, filter_non_GRIB_keys=filter_non_GRIB_keys) def __contains__(self, fid): try: if isinstance(fid, str): fid = parse_GRIBstr_todict(fid) except SyntaxError: # fid is a FA fieldname try: self.FA2GRIB(fid, fatal=True) except ValueError: ok = False else: ok = True else: # fid is a GRIB fid ok = len(self.lookup_GRIB(fid)) > 0 return ok
fagribdef = FaGribDef(actual_init=False)