Source code for epygram.geometries.VGeometry

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info
"""
Contains the class for Vertical geometry of fields.
"""

from __future__ import print_function, absolute_import, unicode_literals, division

import numpy
import sys

from footprints import proxy as fpx

from epygram import profiles, epygramError, config
from epygram.util import (RecursiveObject, write_formatted, separation_line,
                          ifNone_emptydict)
from epygram.geometries import VGeometry


[docs]class VGeometry(RecursiveObject): """ Handles the vertical geometry for fields. Here, the grid defines the vertical position of each level between a bottom and a top positions. The position of points w/r to the vertical grid (mass or flux points), is interpreted as:\n - mass: points are located on same levels as the grid points. - flux: points are located on half-levels, hence are N+1. levels is a list with one item for each level represented in data. Each item can be:\n - a scalar (constant value for all the data point), - an array with the horizontal geographic shape (level constant in time but varying on the horizontal), - an array with the first dimension corresponding to the validity lengthy and other dimensions to represent the horizotal. It is not allowed to have a level varying in time and constant on the geographic domain. """ def __init__(self, typeoffirstfixedsurface, levels, typeofsecondfixedsurface=None, toplevel=None, bottomlevel=None, grid=None, position_on_grid='__unknown__'): """ :param str structure: must be 'V' :param int typeoffirstfixedsurface: Type of horizontal level, as of GRIB2 norm (inspired from GRIB_API). :param list levels: Effective levels on which data is available. :param dict grid: Handles description of the vertical grid. :param str position_on_grid: Position of points w/r to the vertical grid. (among ['mass', 'flux', '__unknown__']) """ self.add_attr_int('typeoffirstfixedsurface') self.add_attr_int('typeofsecondfixedsurface') self.add_attr_list('levels') self.add_attr_dict('grid') self.add_attr_inlist('position_on_grid', ['mass', 'flux', '__unknown__']) self.typeoffirstfixedsurface = typeoffirstfixedsurface if typeofsecondfixedsurface is not None: self.typeofsecondfixedsurface = typeofsecondfixedsurface self.levels = levels self.grid = ifNone_emptydict(grid) self.position_on_grid = position_on_grid
[docs] def what(self, out=sys.stdout, levels=True): """ Writes in file a summary of the geometry. :param out: the output open file-like object :param levels: if True, writes the levels of the geometry """ out.write("#########################\n") out.write("### VERTICAL GEOMETRY ###\n") out.write("#########################\n") write_formatted(out, "Number of vertical levels", len(self.levels)) write_formatted(out, "Type of first fixed surface", self.typeoffirstfixedsurface) if levels: if len(self.levels) == 1: write_formatted(out, "Level", self.levels[0]) else: write_formatted(out, "Levels", self.levels) if self.grid is not None and self.grid != {}: if self.typeoffirstfixedsurface in (118, 119): if self.typeoffirstfixedsurface == 119: name = "Hybrid-pressure" elif self.typeoffirstfixedsurface == 118: name = "Hybrid-height" out.write(name + " coord. A coefficients\n") Ai = [level[1]['Ai'] for level in self.grid['gridlevels']] for c in range(len(Ai)): write_formatted(out, str(c + 1), Ai[c], align='>') out.write(separation_line) out.write(name + " coord. B coefficients\n") Bi = [level[1]['Bi'] for level in self.grid['gridlevels']] for c in range(len(Bi)): write_formatted(out, str(c + 1), Bi[c], align='>') else: if len(self.grid['gridlevels']) == 1: write_formatted(out, "Grid", self.grid['gridlevels'][0]) else: write_formatted(out, "Grid", self.grid['gridlevels']) out.write(separation_line) out.write("\n")
[docs] def to_vgrid(self, **kwargs): """ Convert the VGeometry object to a :mod:`vgrid` one. Other kwargs passed to HybridPressureVGrid() constructor. """ from pyvgrid import HybridPressureVGrid if self.typeoffirstfixedsurface == 119: vg = HybridPressureVGrid(self, **kwargs) else: raise NotImplementedError('to_vgrid(): self.typeoffirstfixedsurface==' + str(self.typeoffirstfixedsurface)) return vg
######################## # CONVERSION FUNCTIONS # ########################
[docs]def hybridP2pressure(hybridP_geometry, Psurf, vertical_mean, gridposition=None): """ Converts a 'hybrid_pressure' VGeometry to a 'pressure' (in hPa) VGeometry. :param VGeometry hybridP_geometry: the initial vertical coordinate :param float Psurf: the surface pressure in Pa, needed for integration of Ai and Bi. :param str vertical_mean: defines the kind of averaging done on the vertical to compute half-levels from full-levels, or inverse: 'geometric' or 'arithmetic'. :param str gridposition: (= 'mass' or 'flux') is the target grid position. By default the data position in the origin geometry is taken. :rtype: VGeometry """ assert isinstance(hybridP_geometry, VGeometry), "*hybridP_geometry* must be of type VGeometry." if hybridP_geometry.typeoffirstfixedsurface != 119: raise epygramError("*hybridP_geometry.typeoffirstfixedsurface* must be 119.") if hybridP_geometry.grid['ABgrid_position'] != 'flux': raise NotImplementedError("A and B must define flux levels.") if gridposition is None: gridposition = hybridP_geometry.position_on_grid # compute pressures A = [level[1]['Ai'] for level in hybridP_geometry.grid['gridlevels']][1:] B = [level[1]['Bi'] for level in hybridP_geometry.grid['gridlevels']][1:] if gridposition == 'mass': levels = profiles.hybridP2masspressure(A, B, Psurf, vertical_mean) elif gridposition == 'flux': levels = profiles.hybridP2fluxpressure(A, B, Psurf) else: raise epygramError("gridposition != 'mass' or 'flux'.") levels = [l / 100. for l in levels.squeeze()] kwargs_vcoord = {'typeoffirstfixedsurface': 100, 'position_on_grid': hybridP_geometry.position_on_grid, # 'grid': {'gridlevels':levels}, 'levels': levels } return VGeometry(**kwargs_vcoord)
[docs]def hybridH2pressure(hybridH_geometry, P, position): """ Converts a hybrid_height coordinate grid into pressure (in hPa). :param P: the vertical profile of pressure to use :param position: the position of P values on the grid ('mass' or 'flux') :rtype: VGeometry """ assert isinstance(hybridH_geometry, VGeometry), "*hybridH_geometry* must be of type VGeometry." if hybridH_geometry.typeoffirstfixedsurface != 118: raise epygramError("*hybridH_geometry.typeoffirstfixedsurface* must be 118.") if position == hybridH_geometry.position_on_grid: levels = P elif position == 'flux' and hybridH_geometry.position_on_grid == 'mass': raise NotImplementedError('this set of positions is not implemented 1.') elif position == 'mass' and hybridH_geometry.position_on_grid == 'flux': Pnew = numpy.zeros(len(P)) for k in range(len(P)): if k == 0: Pnew[k] = P[k] else: Pnew[k] = 0.5 * (P[k - 1] + P[k]) levels = Pnew else: raise NotImplementedError("grid positions can only be 'mass' or 'flux'.") levels = levels[numpy.array(hybridH_geometry.levels) - 1] levels = [l / 100 for l in levels.squeeze()] kwargs_vcoord = {'typeoffirstfixedsurface': 100, 'position_on_grid': hybridH_geometry.position_on_grid, # 'grid':{'gridlevels':levels}, 'levels': levels } return VGeometry(**kwargs_vcoord)
[docs]def hybridP2altitude(hybridP_geometry, R, T, Psurf, vertical_mean, Pdep=None, Phi_surf=None): """ Converts a hybrid_pressure coordinate grid into altitude of mass levels. :param VGeometry hybridP_geometry: the initial vertical coordinate :param list,numpy.ndarray R: the profile of specific gas constant (J/kg/K). :param list,numpy.ndarray T: the profile of temperature (K). :param float Psurf: the surface pressure, needed for integration of Ai and Bi. :param str vertical_mean: defines the kind of averaging done on the vertical to compute half-levels from full-levels, or inverse: 'geometric' or 'arithmetic'. :param list,numpy.ndarray Pdep: the optional profile of NH pressure departures. :param float Phi_surf: the optional surface geopotential. If given, the final coordinate is altitude above sea level, else height above ground surface. :rtype: VGeometry """ assert isinstance(hybridP_geometry, VGeometry), "*hybridP_geometry* must be of type VGeometry." if hybridP_geometry.typeoffirstfixedsurface != 119: raise epygramError("*hybridP_geometry.typeoffirstfixedsurface* must be 119.") A = [level[1]['Ai'] for level in hybridP_geometry.grid['gridlevels']][1:] B = [level[1]['Bi'] for level in hybridP_geometry.grid['gridlevels']][1:] if hybridP_geometry.grid['ABgrid_position'] == 'flux': # compute alt alt = profiles.hybridP2altitude(A, B, R, T, Psurf, vertical_mean, Pdep=Pdep, Phi_surf=Phi_surf, Ptop=config.default_Ptop) # and update info if Phi_surf is None: coordinate = 103 else: coordinate = 102 levels = alt elif hybridP_geometry.grid['ABgrid_position'] == 'mass': raise NotImplementedError("hybrid-pressure grid at mass-levels.") levels = levels[numpy.array(hybridP_geometry.levels) - 1] kwargs_vcoord = {'typeoffirstfixedsurface': coordinate, 'position_on_grid': hybridP_geometry.position_on_grid, # 'grid':{'gridlevels':list(levels)}, 'levels': list(levels) } return VGeometry(**kwargs_vcoord)
[docs]def hybridH2altitude(hybridH_geometry, Zsurf, gridposition=None, conv2height=False): """ Converts a hybrid_height coordinate grid into altitude. :param Zsurf: the surface height, needed for integration of Ai and Bi. :param gridposition: if given ('mass' or 'flux'), the target grid is computed accordingly. By default the data position in the origin geometry is taken. :param conv2height: if True, conversion into height is performed instead of altitude. :rtype: VGeometry """ assert isinstance(hybridH_geometry, VGeometry), "*hybridH_geometry* must be of type VGeometry." if hybridH_geometry.typeoffirstfixedsurface != 118: raise epygramError("*hybridH_geometry.coordinate* must be 118.") if hybridH_geometry.grid['ABgrid_position'] != 'flux': raise NotImplementedError('A and B must define flux levels') if gridposition is None: gridposition = hybridH_geometry.position_on_grid # compute altitudes A = [level[1]['Ai'] for level in hybridH_geometry.grid['gridlevels']] B = [level[1]['Bi'] for level in hybridH_geometry.grid['gridlevels']] # profiles.hybridH2*height wait for Ai and Bi describing the flux level of extra level under the ground A = [-A[1]] + A B = [2 - B[1]] + B if gridposition == 'mass': levels = profiles.hybridH2massheight(A, B, Zsurf, conv2height=conv2height) elif gridposition == 'flux': levels = profiles.hybridH2fluxheight(A, B, Zsurf, conv2height=conv2height) else: raise epygramError("gridposition != 'mass' or 'flux'.") levels = levels[numpy.array(hybridH_geometry.levels)] kwargs_vcoord = {'typeoffirstfixedsurface': 103 if conv2height else 102, 'position_on_grid': hybridH_geometry.position_on_grid, # 'grid':{'gridlevels':list(levels)}, 'levels': list(levels) } return VGeometry(**kwargs_vcoord)
def height2altitude(height_geometry, Zsurf): """ Converts a height coordinate grid into altitude. :param Zsurf: the surface height. :rtype: VGeometry """ assert isinstance(height_geometry, VGeometry), "*height_geometry* must be of type VGeometry." if height_geometry.typeoffirstfixedsurface != 103: raise epygramError("*height_geometry.coordinate* must be 103.") levels = list(height_geometry.levels) for k in range(len(levels)): levels[k] = levels[k] + Zsurf kwargs_vcoord = {'typeoffirstfixedsurface': 102, 'position_on_grid': height_geometry.position_on_grid, 'levels': levels } return VGeometry(**kwargs_vcoord) def altitude2height(altitude_geometry, Zsurf): """ Converts an altitude coordinate grid into height. :param Zsurf: the surface height. :rtype: VGeometry """ assert isinstance(altitude_geometry, VGeometry), "*altitude_geometry* must be of type VGeometry." if altitude_geometry.typeoffirstfixedsurface != 102: raise epygramError("*altitude_geometry.coordinate* must be 102.") levels = list(altitude_geometry.levels) for k in range(len(levels)): levels[k] = levels[k] - Zsurf kwargs_vcoord = {'typeoffirstfixedsurface': 103, 'position_on_grid': altitude_geometry.position_on_grid, 'levels': levels } return VGeometry(**kwargs_vcoord)
[docs]def pressure2altitude(pressure_geometry, R, T, vertical_mean, Pdep=0., Phi_surf=0.): """ Converts a pressure coordinate grid (on mass or flux levels) to altitude on mass levels). :param R: the profile of specific gas constant (J/kg/K). :param T: the profile of temperature (K). :param Pdep: the optional profile of NH pressure departures. :param Phi_surf: the optional surface geopotential. If given, the final coordinate is altitude above sea level, else height above ground surface. :param vertical_mean: defines the kind of averaging done on the vertical to compute half-levels from full-levels, or inverse: 'geometric' or 'arithmetic'. """ raise NotImplementedError("Must be modified") # La fonction n'a pas été revue depuis la modification sur les géométries verticales. # Je n'ai pas prévu le cas d'une grille définie sur des niveaux pression avec des niveaux # qui peuvent être différents (au moins à cause des mass/flux). # Pour intégrer la possibilité, il faudrait peut-être mettre une grille dans tous les cas # et que levels ne soit, à chaque fois, qu'une liste d'entiers indiquant les niveaux dans la grille. if pressure_geometry.grid['gridposition'] == 'flux': # compute alt levels = profiles.pressure2altitude(R, T, vertical_mean, pi_tilde=pressure_geometry.grid['levels'] * 100, Pdep=Pdep, Phi_surf=Phi_surf) elif pressure_geometry.grid['gridposition'] == 'mass': # compute alt levels = profiles.pressure2altitude(R, T, vertical_mean, pi=pressure_geometry.grid['levels'] * 100, Pdep=Pdep, Phi_surf=Phi_surf) # and update info if Phi_surf is None: coordinate = 103 else: coordinate = 102 geom = pressure_geometry.deppcopy() grid = pressure_geometry.grid.copy() grid.update({'gridposition':pressure_geometry.gridposition, 'levels':list(levels)}) return geom
[docs]def hybridP_coord_and_surfpressure_to_3D_pressure_field( hybridP_geometry, Psurf, vertical_mean, gridposition=None): """ From a hybridP VGeometry and a surface pressure (in Pa) H2D field, compute a 3D field containing the pressure (in hPa) at each hybridP level for each gridpoint. :param VGeometry hybridP_geometry: the hybridP VGeometry :param H2DField Psurf: the surface pressure H2DField in Pa, needed for integration of Ai and Bi. :param str vertical_mean: defines the kind of averaging done on the vertical to compute half-levels from full-levels, or inverse: 'geometric' or 'arithmetic'. :param str gridposition: (= 'mass' or 'flux') is the target grid position. By default the data position in the origin geometry is taken. :rtype: D3Field """ from epygram.fields import H2DField assert isinstance(Psurf, H2DField) if Psurf.spectral: Psurf.sp2gp() pressures = hybridP2pressure(hybridP_geometry, Psurf.data, vertical_mean, gridposition=gridposition).levels vgeom = hybridP_geometry.deepcopy() vgeom.levels = list(range(1, len(pressures) + 1)) geom = Psurf.geometry.deepcopy() geom.vcoordinate = vgeom d3pressure = fpx.field(fid={'computed':'pressure'}, structure='3D', geometry=geom, units='hPa') if isinstance(pressures[0], numpy.ma.masked_array): pressures = numpy.ma.array(pressures) else: pressures = numpy.array(pressures) d3pressure.setdata(pressures) return d3pressure
[docs]def hybridP_coord_to_3D_altitude_field( hybridP_geometry, Psurf, vertical_mean, t3D, q3D, ql3D=None, qi3D=None, qr3D=None, qs3D=None, qg3D=None, Pdep3D=None, Phi_surf=None): """ From a hybridP VGeometry, a surface pressure (in Pa) H2D field, and temperature and specific humidity 3D fields, compute a 3D field containing the altitude (in m) at each hybridP level for each gridpoint. Hydrometeors 3D fields can be provided for more accurate R computation. :param VGeometry hybridP_geometry: the hybridP VGeometry :param H2DField Psurf: the surface pressure H2DField in Pa, needed for integration of Ai and Bi. :param str vertical_mean: defines the kind of averaging done on the vertical to compute half-levels from full-levels, or inverse: 'geometric' or 'arithmetic'. :param D3Field t3D: temperature D3Field :param D3Field q3D: specific humidity D3Field :param D3Field ql3D: liquid water content D3Field :param D3Field qi3D: ice water content D3Field :param D3Field qr3D: rain water content D3Field :param D3Field qs3D: snow water content D3Field :param D3Field qg3D: graupel water content D3Field :param D3Field Pdep3D: non-hydrostatic pressure departure D3Field :param H2DField Phi_surf: surface geopotential H2DField (for altitude vs. height) :rtype: D3Field """ from epygram.fields import D3Field, H2DField from bronx.meteo.conversion import q2R assert isinstance(t3D, D3Field) assert isinstance(q3D, D3Field) if Phi_surf is not None: assert isinstance(Phi_surf, H2DField) # preparations if t3D.spectral: t3D.sp2gp() if Phi_surf is not None: if Phi_surf.spectral: Phi_surf.sp2gp() Phi_surf = Phi_surf.data hydrometeors = {} for qk in ('ql', 'qi', 'qr', 'qs', 'qg'): if locals()[qk + '3D'] is not None: hydrometeors[qk] = locals()[qk + '3D'].data if isinstance(Pdep3D, D3Field): Pdep3D = Pdep3D.data # computations p3D = hybridP_coord_and_surfpressure_to_3D_pressure_field( hybridP_geometry, Psurf, vertical_mean, gridposition='mass') p3D.operation('*', 100) R3D = q2R(q3D.data, **hydrometeors) alt3D = profiles.pressure2altitude(R3D, t3D.data, vertical_mean='geometric', pi=p3D.data, Phi_surf=Phi_surf, Pdep=Pdep3D) # arrange output field p3D.setdata(alt3D) alt3D = p3D if Phi_surf is None: alt3D.fid['computed'] = 'height' else: alt3D.fid['computed'] = 'altitude' alt3D.units = 'm' return alt3D