Source code for epygram.resources.MultiValiditiesResource

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains the class that handle a MultiValiditiesResource.
"""

from __future__ import print_function, absolute_import, unicode_literals, division

import copy
import numpy

from footprints import FPList, FPDict, proxy as fpx

from epygram import epygramError
from epygram.util import fmtfid
from epygram.base import Resource, FieldSet, FieldValidityList
from epygram.geometries import VGeometry
from . import open_and_close_resource


[docs]class MultiValiditiesResource(Resource): """Class implementing a MultiValiditiesResource.""" _collector = ('resource_modificator', 'epyresource') _footprint = dict( attr=dict( resources=dict( type=FPList, info="List of resources to join."), name=dict( values=set(['MultiValidities'])) ) ) def __init__(self, *args, **kwargs): super(Resource, self).__init__(*args, **kwargs) if len(self.resources) == 0: raise epygramError("There must be at least one resource in *resources*") for r in self.resources: if r.openmode != self.openmode: raise epygramError("All low level resources must be opened using the same mode as this high-level resource.") if len(set([r.format for r in self.resources])) != 1: raise epygramError("All low level resources must have the same format.") self.format = "MultiValidities" self.lowLevelFormat = self.resources[0].format self.isopen = True # resource always appear to be open
[docs] def close(self): """Closes all resources.""" for r in self.resources: try: r.close() except Exception: pass
[docs] def find_fields_in_resource(self, *args, **kwargs): """Call to find_fields_in_resource""" tmp = [] for r in self.resources: with open_and_close_resource(r): tmp.extend(r.find_fields_in_resource(*args, **kwargs)) result = [] for res in tmp: if res not in result: result.append(res) return result
[docs] def listfields(self, *args, **kwargs): """Call to listfields.""" complete = 'complete' in kwargs and kwargs['complete'] tmp = [] for r in self.resources: with open_and_close_resource(r): fidlist = r.listfields(*args, **kwargs) if complete: for fid in fidlist: fid[self.format] = fid[fmtfid(r.format, fid)] tmp.extend(fidlist) result = [] for res in tmp: if res not in result: result.append(res) return result
[docs] def sortfields(self, *args, **kwargs): """Call to sortfields""" tmp = {} for r in self.resources: with open_and_close_resource(r): for k, v in r.sortfields(*args, **kwargs).items(): tmp[k] = tmp.get(k, []) + v result = {} for k, v in tmp.items(): result[k] = list(set(v)) return result
[docs] def readfield(self, *args, **kwargs): """ Reads the field in the different resources and join the validities. """ fieldset = FieldSet() for r in self.resources: with open_and_close_resource(r): fieldset.append(r.readfield(*args, **kwargs)) return self._join_validities(fieldset, **kwargs)
[docs] def writefield(self, *args, **kwargs): """Write fields.""" raise NotImplementedError("writefield is not impelemented")
# To implement writefield we need to check that each validity is affected to only one resource
[docs] def extractprofile(self, *args, **kwargs): """ Extracts the profiles in the different resources and join the validities. """ fieldset = FieldSet() for r in self.resources: with open_and_close_resource(r): fieldset.append(r.extractprofile(*args, **kwargs)) return self._join_validities(fieldset)
[docs] def extractsection(self, *args, **kwargs): """ Extracts the sections in the different resources and join the validities. """ fieldset = FieldSet() for r in self.resources: with open_and_close_resource(r): fieldset.append(r.extractsection(*args, **kwargs)) return self._join_validities(fieldset)
@property def spectral_geometry(self): """ Returns the spectral_geometry """ ref = self.resources[0].spectral_geometry if numpy.all([r.spectral_geometry == ref for r in self.resources]): return ref else: raise epygramError("All spectral_geometry are not identical") def _join_validities(self, fieldset, **kwargs): """ Join the different fields """ # Validities validities = {} for i in range(len(fieldset)): field = fieldset[i] if len(field.validity) != 1: raise NotImplementedError("Join of several multi validities field is not (yet?) implemented") validities[field.validity.get()] = i if len(validities) == 1 and len(fieldset) != 1: validities = {} for i in range(len(fieldset)): field = fieldset[i] if len(field.validity) != 1: raise NotImplementedError("Join of several multi validities field is not (yet?) implemented") validities[field.validity.getbasis()] = i if len(validities) != len(fieldset): raise epygramError("Several resources have returned the same validity") sortedValidities = sorted(validities.keys()) # Geometries geometry = fieldset[0].geometry.deepcopy() geometry.vcoordinate = VGeometry(typeoffirstfixedsurface=255, levels=[255]) vcoordinate = fieldset[0].geometry.vcoordinate.deepcopy() vcoordinate.levels = [] levels = fieldset[0].geometry.vcoordinate.levels joinLevels = False spectral = fieldset[0].spectral for field in fieldset[1:]: if spectral != field.spectral: raise epygramError("All fields must be gridpoint or spectral") geometry_field = field.geometry.deepcopy() geometry_field.vcoordinate = VGeometry(typeoffirstfixedsurface=255, levels=[255]) vcoordinate_field = field.geometry.vcoordinate.deepcopy() vcoordinate_field.levels = [] levels_field = field.geometry.vcoordinate.levels if geometry != geometry_field: raise epygramError("All resources must return fields with the same geometry.") if vcoordinate != vcoordinate_field: raise epygramError("All resources must return fields with the same vertical geometry.") if numpy.any(numpy.array(levels) != numpy.array(levels_field)): if len(vcoordinate.grid) > 0: raise epygramError("All resources must return fields with the same vertical geometry.") else: if len(levels) != len(levels_field): raise epygramError("All resources must return fields with the same vertical geometry length.") joinLevels = True geometry.vcoordinate = fieldset[0].geometry.vcoordinate.deepcopy() if joinLevels: if spectral: raise epygramError("Not sure how to merge vertical levels when spectral") concat = numpy.concatenate newLevels = [] for i in range(len(fieldset)): mylevel = fieldset[validities[sortedValidities[i]]].geometry.get_levels(d4=True, nb_validities=len(field.validity)) if isinstance(mylevel, numpy.ma.masked_array): concat = numpy.ma.concatenate newLevels.append(mylevel) geometry.vcoordinate.levels = list(concat(newLevels, axis=0).swapaxes(0, 1).squeeze()) # Other metadata kwargs_field = copy.deepcopy(fieldset[0].footprint_as_dict()) kwargs_field['validity'] = FieldValidityList() kwargs_field['geometry'] = geometry for k, v in kwargs_field.items(): if isinstance(v, FPDict): kwargs_field[k] = dict(v) field = fpx.field(**kwargs_field) sameProcesstype = True for f in fieldset[1:]: kwargs_field = copy.deepcopy(f.footprint_as_dict()) kwargs_field['validity'] = FieldValidityList() kwargs_field['geometry'] = geometry sameProcesstype = sameProcesstype and kwargs_field['processtype'] == fieldset[0].processtype kwargs_field['processtype'] = fieldset[0].processtype # we exclude processtype from the comparison kwargs_field['fid'] = field.fid # change FPDict to dict for k, v in kwargs_field.items(): if isinstance(v, FPDict): kwargs_field[k] = dict(v) myf = fpx.field(**kwargs_field) if field != myf: raise epygramError("All fields must be of the same kind") # Returned field fieldvaliditylist = FieldValidityList() fieldvaliditylist.pop() getdata = kwargs.get('getdata', True) for i in range(len(fieldset)): field = fieldset[validities[sortedValidities[i]]] fieldvaliditylist.extend(field.validity) if getdata: if i == 0: data = field.getdata(d4=True) else: if isinstance(data, numpy.ma.masked_array): concat = numpy.ma.concatenate else: concat = numpy.concatenate data = concat([data, field.getdata(d4=True)], axis=0) if not sameProcesstype: kwargs_field['processtype'] = None kwargs_field['validity'] = fieldvaliditylist kwargs_field['fid'][self.format] = kwargs_field['fid'][fmtfid(self.lowLevelFormat, kwargs_field['fid'])] field = fpx.field(**kwargs_field) if getdata: field.setdata(data) return field