Source code for epygram.geometries.GaussGeometry

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) Météo France (2014-)
# This software is governed by the CeCILL-C license under French law.
# http://www.cecill.info


"""
Contains the classes for 3D geometries of fields.
"""

from __future__ import print_function, absolute_import, unicode_literals, division

import numpy
import sys
import re

import footprints
from footprints import proxy as fpx
from bronx.syntax.arrays import stretch_array

from epygram import epygramError, config
from epygram.util import (degrees_nearest_mod, Angle,
                          positive_longitudes, longitudes_between_minus180_180,
                          write_formatted,
                          nearlyEqual,
                          as_numpy_array, moveaxis)

from .VGeometry import VGeometry
from .AbstractGeometry import Geometry
from . import _need_pyproj_geod

epylog = footprints.loggers.getLogger(__name__)
_re_nearest_sq = re.compile(r'(?P<n>\d+)\*(?P<m>\d+)')



[docs]class GaussGeometry(Geometry): """ Handles the geometry for a Global Gauss grid 3-Dimensions Field. """ _ghost_attributes = Geometry._ghost_attributes + ['_buffered_gauss_grid'] def __init__(self, name, grid, dimensions, vcoordinate, position_on_horizontal_grid='__unknown__', geoid=None): """ :param name: Name of geometrical type of representation of points on the Globe. Name must be among ['rotated_reduced_gauss', 'reduced_gauss', 'regular_gauss' :param grid: Handles description of the horizontal grid. :param dimensions: Handles grid dimensions. :param vcoordinate: Handles vertical geometry parameters. :param position_on_horizontal_grid: Position of points w/r to the horizontal. among: ['upper-right', 'upper-left', 'lower-left', 'lower-right', 'center-left', 'center-right', 'lower-center', 'upper-center', 'center', '__unknown__'] :param geoid: To specify geoid shape. """ self.add_attr_inlist('name', ['rotated_reduced_gauss', 'reduced_gauss', 'regular_gauss']) self.name = name super(GaussGeometry, self).__init__(grid, dimensions, vcoordinate, position_on_horizontal_grid, geoid) @property def isglobal(self): """ :return: True if geometry is global """ return True def _consistency_check(self): """Check that the geometry is consistent.""" grid_keys = ['dilatation_coef', 'latitudes'] if self.name == 'rotated_reduced_gauss': grid_keys.extend(['pole_lon', 'pole_lat']) if set(self.grid.keys()) != set(grid_keys): raise epygramError("grid attribute must consist in keys: " + str(grid_keys)) assert isinstance(self.grid['latitudes'][0], Angle), \ "'latitudes' attribute of grid must be a list of Angle objects." if self.name == 'rotated_reduced_gauss': assert isinstance(self.grid['pole_lon'], Angle) assert isinstance(self.grid['pole_lat'], Angle) dimensions_keys = ['max_lon_number', 'lat_number', 'lon_number_by_lat'] if set(self.dimensions.keys()) != set(dimensions_keys): raise epygramError("dimensions attribute must consist in keys: " + str(dimensions_keys)) if self.name == 'rotated_reduced_gauss' \ and nearlyEqual(self.grid['pole_lon'].get('degrees'), 0.) \ and nearlyEqual(self.grid['pole_lat'].get('degrees'), 90.): self._attributes['name'] = 'reduced_gauss' self.grid.pop('pole_lon') self.grid.pop('pole_lat') def suggested_GRIB2_sample(self, spectral=False): if self.structure == 'H2D': prefix = {'rotated_reduced_gauss':'reduced_rotated_gg', 'reduced_gauss':'reduced_gg', 'regular_gauss':'regular_gg'}[self.name] return self._GRIB2_sample('sh' if spectral else prefix) else: raise NotImplementedError()
[docs] def ij2ll(self, i, j, position=None): """ Return the (lon, lat) coordinates of point *(i,j)*, in degrees. :param i: X index of point in the 2D matrix of gridpoints :param j: Y index of point in the 2D matrix of gridpoints :param position: lat lon position to return with respect to the model cell. Defaults to self.position_on_horizontal_grid. """ if self._getoffset(position) != (0., 0.): raise NotImplementedError("horizontal staggered grids for reduced\ gauss grid are not implemented.") if isinstance(i, list) or isinstance(i, tuple) or\ isinstance(i, numpy.ndarray): latitudes = as_numpy_array([self.grid['latitudes'][n].get('degrees') for n in range(len(self.grid['latitudes']))]) dimensions = as_numpy_array(self.dimensions['lon_number_by_lat']) lat = latitudes[j] lon = (numpy.pi * 2 * as_numpy_array(i)) / dimensions[j] else: lat = self.grid['latitudes'][j].get('degrees') lon = (numpy.pi * 2. * i) / self.dimensions['lon_number_by_lat'][j] lon = numpy.degrees(lon) (lon, lat) = self._rotate_stretch(lon, lat, reverse=True) return (lon, lat)
[docs] def ll2ij(self, lon, lat, position=None): """ Return the (*i, j*) coordinates in the 2D matrix of gridpoints, of the gridpoint nearest to (*lon*, *lat*). :param lon: longitude of point in degrees :param lat: latitude of point in degrees :param position: lat lon position to return with respect to the model cell. Defaults to self.position_on_horizontal_grid. """ lon, lat = as_numpy_array(lon), as_numpy_array(lat) ij = moveaxis(self.nearest_points(lon, lat, {'n': '1'}, position), 0, -1).squeeze() return (ij[0], ij[1])
def _allocate_colocation_grid(self, compressed=False, as_float=False): """ Creates the array for lonlat grid. Just a trick to avoid recomputing the array for several fields that share their geometry. :param compressed: if True, return 1D arrays, else 2D masked arrays. :param as_float: if True, return arrays with dtype float64, else int64. """ Jmax = self.dimensions['lat_number'] Imax = self.dimensions['lon_number_by_lat'] igrid = [] jgrid = [] for j in range(Jmax): for i in range(Imax[j]): igrid.append(i) jgrid.append(j) if as_float: igrid = numpy.array(igrid, dtype=numpy.float64) jgrid = numpy.array(jgrid, dtype=numpy.float64) else: igrid = numpy.array(igrid) jgrid = numpy.array(jgrid) if not compressed: igrid = self.reshape_data(igrid) jgrid = self.reshape_data(jgrid) return (igrid, jgrid) def _clear_buffered_gauss_grid(self): """Deletes the buffered lonlat grid if any.""" if hasattr(self, '_buffered_gauss_grid'): del self._buffered_gauss_grid @property def gridpoints_number(self, **_): """Returns the number of gridpoints of the grid.""" return sum(self.dimensions['lon_number_by_lat'])
[docs] def get_lonlat_grid(self, position=None, d4=False, nb_validities=0, force_longitudes=None, **_): """ Returns a tuple of two tables containing one the longitude of each point, the other the latitude, with 2D shape. Shape of 2D data in Gauss grids: \n - grid[0, 0:Nj] is first (Northern) band of latitude, masked after Nj = number of longitudes for latitude j \n - grid[-1, 0:Nj] is last (Southern) band of latitude (idem). :param position: position of lonlat grid with respect to the model cell. Defaults to self.position_on_horizontal_grid. :param d4: - if True, returned values are shaped in a 4 dimensions array - if False, shape of returned values is determined with respect to geometry. d4=True requires nb_validities > 0 :param nb_validities: number of validities represented in data values :param force_longitudes: if 'positive', the longitudes will be forced positive if ']-180,180]', the longitudes will be in the ]-180, 180] interval """ # !!! **_ enables the method to receive arguments specific to # other geometries but useless here ! Do not remove. if hasattr(self, '_buffered_gauss_grid') and \ self._buffered_gauss_grid.get('filled'): lons = self._buffered_gauss_grid['lons'] lats = self._buffered_gauss_grid['lats'] else: (igrid, jgrid) = self._allocate_colocation_grid(compressed=True, as_float=False) (lons, lats) = self.ij2ll(igrid, jgrid, position) lons = self.reshape_data(lons) lats = self.reshape_data(lats) if config.FA_buffered_gauss_grid: if not hasattr(self, '_buffered_gauss_grid'): self._buffered_gauss_grid = {'lons':lons, 'lats':lats} else: # trick: the arrays remain pointers to where they were # created, so that they can be shared by several geometry # objects or fields ! self._buffered_gauss_grid['lons'].data[...] = lons[...].data self._buffered_gauss_grid['lons'].mask[...] = lons[...].mask self._buffered_gauss_grid['lats'].data[...] = lats[...].data self._buffered_gauss_grid['lats'].mask[...] = lons[...].mask self._buffered_gauss_grid['filled'] = True if d4: lons, lats = self._reshape_lonlat_4d(lons, lats, nb_validities) elif not d4 and nb_validities != 0: raise ValueError("*nb_validities* must be 0 when d4==False") if force_longitudes == 'positive': lons = positive_longitudes(lons) elif force_longitudes == ']-180,180]': lons = longitudes_between_minus180_180(lons) return (lons, lats)
[docs] def get_datashape(self, force_dimZ=None, dimT=None, d4=False, **_): """ Returns the data shape according to the geometry. :param force_dimZ: if supplied, force the Z dimension instead of that of the vertical geometry :param dimT: if supplied, is the time dimension to be added to the data shape :param d4: - if True, shape is 4D (need to specify *dimT*) - if False, shape is 3D if dimZ > 1 else 2D """ dimY = self.dimensions['lat_number'] dimX = self.dimensions['max_lon_number'] if force_dimZ is not None: dimZ = force_dimZ else: dimZ = len(self.vcoordinate.levels) if d4: assert dimT is not None, \ "*dimT* must be supplied with *d4*=True" shape = [dimT, dimZ, dimY, dimX] else: shape = [] if self.datashape['k'] or dimZ > 1: shape.append(dimZ) shape.append(dimY) shape.append(dimX) return tuple(shape)
[docs] def reshape_data(self, data, first_dimension=None, d4=False): """ Returns a 2D data (horizontal dimensions) reshaped from 1D, according to geometry. :param data: the 1D data (or 3D with a T and Z dimensions, or 2D with either a T/Z dimension, to be specified), of dimension concording with geometry. In case data is 3D, T must be first dimension and Z the second. :param first_dimension: in case data is 2D, specify what is the first dimension of data among ('T', 'Z') :param d4: - if True, returned values are shaped in a 4 dimensions array - if False, shape of returned values is determined with respect to geometry """ assert 1 <= len(data.shape) <= 3 shp_in = data.shape nb_levels = 1 nb_validities = 1 if len(shp_in) == 2: assert first_dimension in ('T', 'Z'), \ "*first_dimension* must be among ('T', 'Z') if *data*.shape == 2" if first_dimension == 'T': nb_validities = shp_in[0] elif first_dimension == 'Z': nb_levels = shp_in[0] elif len(shp_in) == 3: nb_validities = shp_in[0] nb_levels = shp_in[1] assert nb_levels in (1, len(self.vcoordinate.levels)), \ "vertical dimension of data must be 1 or self.vcoordinate.levels=" + \ str(self.vcoordinate.levels) shp4D = self.get_datashape(dimT=nb_validities, force_dimZ=nb_levels, d4=True) data4D = numpy.ma.masked_all(shp4D) ind_end = 0 for j in range(self.dimensions['lat_number']): ind_begin = ind_end ind_end = ind_begin + self.dimensions['lon_number_by_lat'][j] if len(shp_in) == 1: buff = data[slice(ind_begin, ind_end)] data4D[0, 0, j, slice(0, self.dimensions['lon_number_by_lat'][j])] = buff elif len(shp_in) == 2: buff = data[:, slice(ind_begin, ind_end)] if nb_levels > 1: data4D[0, :, j, slice(0, self.dimensions['lon_number_by_lat'][j])] = buff else: data4D[:, 0, j, slice(0, self.dimensions['lon_number_by_lat'][j])] = buff elif len(shp_in) == 3: buff = data[:, :, slice(ind_begin, ind_end)] data4D[:, :, j, slice(0, self.dimensions['lon_number_by_lat'][j])] = buff if ind_end != data.shape[-1]: raise epygramError("data have a wrong length") if d4 or len(shp_in) == 3: data_out = data4D else: if len(shp_in) == 1: data_out = data4D[0, 0, :, :] elif len(shp_in) == 2: if first_dimension == 'T': data_out = data4D[:, 0, :, :] elif first_dimension == 'Z': data_out = data4D[0, :, :, :] return data_out
[docs] def fill_maskedvalues(self, data, fill_value=None): """ Returns a copy of *data* with 'real' masked values (i.e. not those linked to reduced Gauss) filled with *fill_value*. *data* must be already 4D for simplicity reasons. """ assert isinstance(data, numpy.ma.masked_array) assert len(data.shape) == 4 data_filled = numpy.ma.masked_array(data.filled(fill_value)) mask = numpy.zeros(data_filled.data.shape, dtype=numpy.bool_) for j in range(self.dimensions['lat_number']): i0 = self.dimensions['lon_number_by_lat'][j] mask[:, :, j, i0:] = True data_filled.mask = mask return data_filled
[docs] def horizontally_flattened(self, data): """ Returns a copy of *data* with horizontal dimensions flattened and compressed (cf. numpy.ma.masked_array.compressed). *data* must be 4D for simplicity reasons. """ assert len(data.shape) == 4 assert isinstance(data, numpy.ma.masked_array) data3D = numpy.empty(tuple(list(data.shape[:2]) + [self.gridpoints_number])) for t in range(data.shape[0]): for k in range(data.shape[1]): data3D[t, k, :] = data[t, k, :, :].compressed() return data3D
[docs] def resolution_ll(self, lon, lat): """ Returns the average meridian resolution (worst directional resolution) at point position. :param lon: longitude of the point in degrees :param lat: latitude of the point in degrees """ return self.resolution_j(self.ll2ij(lon, lat)[1])
[docs] def meridian_resolution_j(self, j): """ Returns the average meridian resolution at longitude circle number *j*. """ jint = numpy.rint(j).astype('int') jm1 = jint - 1 jp1 = jint + 1 if jm1 == -1: dist = self.distance(self.ij2ll(0, jint), self.ij2ll(0, jp1)) elif jp1 == self.dimensions['lat_number']: dist = self.distance(self.ij2ll(0, jint), self.ij2ll(0, jm1)) else: dist = (self.distance(self.ij2ll(0, jint), self.ij2ll(0, jp1)) + self.distance(self.ij2ll(0, jint), self.ij2ll(0, jm1))) / 2. return dist
[docs] def zonal_resolution_j(self, j): """ Returns the average zonal resolution at longitude circle number j. """ jint = numpy.rint(j).astype('int') return self.distance(self.ij2ll(0, jint), self.ij2ll(1, jint))
@_need_pyproj_geod def resolution_field_from_stretching(self): """ Returns a field which values are the local resolution computed as the nominal resolution stretched locally by the map factor. """ assert self._pyproj_geod.sphere, "Method is not available with a non-spheroid geoid." zonal_equatorial_resolution = 2. * numpy.pi * self._pyproj_geod.a / self.dimensions['max_lon_number'] mf = self.map_factor_field() mf.fid['geometry'] = 'resolution_from_stretching' mf.setdata(zonal_equatorial_resolution / mf.data) return mf
[docs] def resolution_j(self, j): """ Returns the average meridian resolution at longitude circle number j. """ return self.meridian_resolution_j(j)
[docs] def resolution_field(self, direction='meridian'): """ Returns a field whose values are the local resolution in m. :param direction: among ('zonal', 'meridian'), direction in which the resolution is computed. """ assert direction in ('zonal', 'meridian') resolutions = [getattr(self, direction + '_resolution_j')(j) for j in range(self.dimensions['lat_number'])] resol_2d = (numpy.ma.ones(self.get_lonlat_grid()[0].data.shape).transpose() * numpy.array(resolutions)).transpose() resol_2d.mask = self.get_lonlat_grid()[0].mask f = fpx.field(structure='H2D', geometry=self, fid={'geometry':direction + ' resolution'}, units='m') f.setdata(resol_2d) return f
[docs] def distance_to_nearest_neighbour_ll(self, lon, lat): """ Returns the local resolution at the nearest point of lon/lat. It's the distance between this point and its closest neighbour. :param lon: longitude of the point in degrees :param lat: latitude of the point in degrees """ return self.distance_to_nearest_neighbour_ij(*self.ll2ij(lon, lat))
[docs] def distance_to_nearest_neighbour_ij(self, i, j): """ Returns the distance to the nearest point of (i,j) point :param i: X index of point in the 2D matrix of gridpoints :param j: Y index of point in the 2D matrix of gridpoints """ # FIXME: not sure this is exactly computed (iint, jint) = (numpy.rint(i).astype('int'), numpy.rint(j).astype('int')) points_list = [] for oi in [-1, 0, 1]: for oj in [-1, 0, 1]: oj = 0 if (oi, oj) != (0, 0): pi = iint + oi pj = jint + oj if pj < self.dimensions['lat_number'] and pj >= 0: pi = pi % self.dimensions['lon_number_by_lat'][pj] points_list.append((pi, pj)) return numpy.array([self.distance(self.ij2ll(iint, jint), self.ij2ll(*p)) for p in points_list]).min()
[docs] def point_is_inside_domain_ll(self, lon, *_, **__): """ Returns True if the point(s) of lon/lat coordinates is(are) inside the field. This is always the case in Gauss grids, no real meaning. :param lon: longitude of the point in degrees :param lat: latitude of the point in degrees :param margin: considers the point inside if at least 'margin' points far from the border. The -0.1 default is a safety for precision errors. :param position: position of the grid with respect to the model cell. Defaults to self.position_on_horizontal_grid. """ return numpy.ones_like(lon, dtype=numpy.bool)
[docs] def point_is_inside_domain_ij(self, i, j, margin=-0.1): """ Returns True if the point(s) of lon/lat coordinates is(are) inside the field. :param i: X index of point in the 2D matrix of gridpoints :param j: Y index of point in the 2D matrix of gridpoints :param margin: DEPRECATED """ i = as_numpy_array(i) if isinstance(i, (list, tuple)) else i j = as_numpy_array(j) if isinstance(j, (list, tuple)) else j dimensions = as_numpy_array(self.dimensions['lon_number_by_lat']) # Firstly we test the validity of j. # In case j is invalid result will be False # but we need a valid value for j to test the i validity j2 = numpy.maximum(0, numpy.minimum(j, self.dimensions['lat_number'] - 1)) inside = numpy.logical_and(numpy.logical_and(j >= 0, j < self.dimensions['lat_number']), numpy.logical_and(i >= 0, i < dimensions[j2])) return inside
def _rotate_stretch(self, lon, lat, reverse=False): """ Internal method used to transform true lon/lat into rotated and stretched lon/lat. :param lon: longitude in degrees :param lat: latitude in degrees :param reverse: if True, do the reverse transform. Computation adapted from arpifs/transform/trareca.F90 and tracare.F90. """ if self.name == 'rotated_reduced_gauss': KTYP = 2 else: KTYP = 1 PFACDI = self.grid['dilatation_coef'] ZDBLC = 2.0 * PFACDI ZC2P1 = PFACDI * PFACDI + 1. ZC2M1 = PFACDI * PFACDI - 1. ZCRAP = -ZC2M1 / ZC2P1 ZEPS = config.epsilon if isinstance(lon, list) or isinstance(lon, tuple): lon = numpy.array(lon) if isinstance(lat, list) or isinstance(lat, tuple): lat = numpy.array(lat) if not reverse: if 'rotated' in self.name: lon0 = self.grid['pole_lon'].get('degrees') else: lon0 = 0.0 lon = degrees_nearest_mod(lon, lon0) PSLAR = numpy.sin(numpy.radians(lat)) PSLOR = numpy.sin(numpy.radians(lon)) PCLOR = numpy.cos(numpy.radians(lon)) if KTYP == 2: # Complete ARPEGE (PCLAP, PSLAP) = self.grid['pole_lat'].get('cos_sin') (PCLOP, PSLOP) = self.grid['pole_lon'].get('cos_sin') ZCLAR = numpy.sqrt(1. - PSLAR * PSLAR) ZA = PSLAP * PSLAR + PCLAP * ZCLAR * \ (PCLOP * PCLOR + PSLOP * PSLOR) PSLAC = (ZCRAP + ZA) / (1. + ZA * ZCRAP) ZB = 1. / numpy.sqrt(numpy.maximum(ZEPS, 1. - ZA * ZA)) PCLOC = (PCLAP * PSLAR - PSLAP * ZCLAR * (PCLOP * PCLOR + PSLOP * PSLOR)) * ZB PSLOC = ZCLAR * (PSLOP * PCLOR - PCLOP * PSLOR) * ZB PSLAC = numpy.maximum(-1., numpy.minimum(1., PSLAC)) PCLOC = numpy.maximum(-1., numpy.minimum(1., PCLOC)) PSLOC = numpy.maximum(-1., numpy.minimum(1., PSLOC)) if isinstance(PSLOC, numpy.ndarray): for k in range(0, len(PSLOC)): if PCLOC[k] * PCLOC[k] + PSLOC[k] * PSLOC[k] < 0.5: PSLOC[k] = 1. PCLOC[k] = 0. else: if PCLOC * PCLOC + PSLOC * PSLOC < 0.5: PSLOC = 1. PCLOC = 0. else: # Schmidt PSLOC = PSLOR PCLOC = PCLOR PSLAC = (ZC2P1 * PSLAR - ZC2M1) / (ZC2P1 - ZC2M1 * PSLAR) PSLAC = numpy.maximum(-1., numpy.minimum(1., PSLAC)) lat = numpy.arcsin(PSLAC) lon = numpy.arctan2(PSLOC, PCLOC) % (numpy.pi * 2) elif reverse: PSLAC = numpy.sin(numpy.radians(lat)) PCLOC = numpy.cos(numpy.radians(lon)) PSLOC = numpy.sin(numpy.radians(lon)) if KTYP == 2: # Complete ARPEGE (PCLAP, PSLAP) = self.grid['pole_lat'].get('cos_sin') (PCLOP, PSLOP) = self.grid['pole_lon'].get('cos_sin') ZCLAC = numpy.sqrt(1. - PSLAC * PSLAC) ZA = 1. / (ZC2P1 + ZC2M1 * PSLAC) ZB = ZC2P1 * PSLAC + ZC2M1 ZC = ZDBLC * PCLAP * ZCLAC * PCLOC + ZB * PSLAP PSLAR = ZC * ZA ZD = ZA / numpy.sqrt(numpy.maximum(ZEPS, 1. - PSLAR * PSLAR)) ZE = ZB * PCLAP * PCLOP - ZDBLC * ZCLAC * \ (PSLAP * PCLOC * PCLOP - PSLOP * PSLOC) ZF = ZB * PCLAP * PSLOP - ZDBLC * ZCLAC * \ (PSLAP * PCLOC * PSLOP + PCLOP * PSLOC) PCLOR = ZE * ZD PSLOR = ZF * ZD PSLAR = numpy.maximum(-1., numpy.minimum(1., PSLAR)) PCLOR = numpy.maximum(-1., numpy.minimum(1., PCLOR)) PSLOR = numpy.maximum(-1., numpy.minimum(1., PSLOR)) else: # Schmidt PSLOR = PSLOC PCLOR = PCLOC PSLAR = (ZC2P1 * PSLAC + ZC2M1) / (ZC2P1 + ZC2M1 * PSLAC) PSLAR = numpy.maximum(-1., numpy.minimum(1., PSLAR)) lon = numpy.arctan2(PSLOR, PCLOR) lat = numpy.arcsin(PSLAR) lon = numpy.degrees(lon) lat = numpy.degrees(lat) return (lon, lat)
[docs] def map_factor(self, lon, lat): """ Returns the map factor at the given longitude/latitude(s) :param lon: longitude in degrees :param lat: latitude in degrees """ pc = self.grid['dilatation_coef'] (_, plab) = self._rotate_stretch(lon, lat) zlat1 = numpy.radians(plab) # From rotated/streched sphere to rotated zinterm = 1. / pc * numpy.cos(zlat1) / (1. + numpy.sin(zlat1)) zlat2 = 2. * numpy.arctan((1. - zinterm) / (1. + zinterm)) zm = numpy.cos(zlat1) / numpy.cos(zlat2) return zm
[docs] def map_factor_field(self, position=None): """ Returns a new field whose data is the map factor over the field. :param position: grid position with respect to the model cell. Defaults to self.position_on_horizontal_grid. """ kwargs_vcoord = {'typeoffirstfixedsurface': 255, 'position_on_grid': '__unknown__', 'levels': [0]} vcoordinate = VGeometry(**kwargs_vcoord) geometry = self.deepcopy() geometry.vcoordinate=vcoordinate f = fpx.field(structure='H2D', geometry=geometry, fid={'geometry':'Map Factor'}, units='-') (lons, lats) = self.get_lonlat_grid(position=position) data = self.map_factor(stretch_array(lons), stretch_array(lats)) data = self.reshape_data(data) f.setdata(data) return f
[docs] def reproject_wind_on_lonlat(self, u, v, lon, lat, map_factor_correction=True, reverse=False): """ Reprojects a wind vector (u, v) on rotated/stretched sphere onto real sphere, i.e. with components on true zonal/meridian axes. :param u: the u == zonal-on-the-grid component of wind :param v: the v == meridian-on-the-grid component of wind :param lon: longitudes of points in degrees :param lat: latitudes of points in degrees :param map_factor_correction: applies a correction of magnitude due to map factor. :param reverse: if True, apply the reverse reprojection. lon/lat are coordinates on real sphere. """ pc = self.grid['dilatation_coef'] if 'rotated' in self.name: plac = self.grid['pole_lat'].get('degrees') else: plac = 90. (plob, plab) = self._rotate_stretch(lon, lat) # the below formulas seem to be working with # lon_on_rotated_sphere begining below pole_of_rotation, # hence a 180° rotation. plob += 180. pust = u pvst = v # Adapted from J.M.Piriou's pastrv.F90 zlon1 = numpy.radians(plob) zlat1 = numpy.radians(plab) zlatp = numpy.radians(plac) # From rotated/streched sphere to rotated zinterm = 1. / pc * numpy.cos(zlat1) / (1. + numpy.sin(zlat1)) zlat2 = 2. * numpy.arctan((1. - zinterm) / (1. + zinterm)) zlon2 = zlon1 # Map factor if map_factor_correction: zm = numpy.cos(zlat1) / numpy.cos(zlat2) # zm = self.map_factor(lon, lat) # but redundant computations else: if self.grid['dilatation_coef'] != 1.: epylog.warning('check carefully *map_factor_correction* w.r.t. dilatation_coef') zm = numpy.ones(zlat1.shape) # From rotated sphere to real sphere # Compute latitude on real sphere zsla3 = -numpy.cos(zlat2) * numpy.cos(zlon2) * numpy.cos(zlatp) + numpy.sin(zlat2) * numpy.sin(zlatp) if (zsla3 > 1.).any() or (zsla3 < -1.).any(): epylog.warning('reproject_wind_on_lonlat: zsla3=' + str(zsla3)) zsla3 = min(1., max(-1., zsla3)) zlat3 = numpy.arcsin(zsla3) # Real North components zca = (numpy.cos(zlat2) * numpy.sin(zlatp) + numpy.sin(zlat2) * numpy.cos(zlatp) * numpy.cos(zlon2)) / \ numpy.cos(zlat3) zsa = numpy.cos(zlatp) * numpy.sin(zlon2) / numpy.cos(zlat3) # Wind transformation if reverse: zm = 1. / zm zsa = -zsa pusr = zm * (zca * pust - zsa * pvst) pvsr = zm * (zsa * pust + zca * pvst) return (pusr, pvsr)
[docs] def nearest_points(self, lon, lat, request, position=None, external_distance=None, squeeze=True): """ Returns a list of the (i, j) position of the points needed to perform an interpolation. :param lon: longitude of point in degrees. :param lat: latitude of point in degrees. :param request: criteria for selecting the points, among: * {'n':'1'} - the nearest point * {'n':'2*2'} - the 2*2 square points around the position * {'n':'4*4'} - the 4*4 square points around the position * {'n':'N*N'} - the N*N square points around the position: N must be even * {'radius':xxxx, 'shape':'square'} - the points which are xxxx metres around the position in X or Y direction * {'radius':xxxx, 'shape':'circle'} - the points within xxxx metres around the position. (default shape == circle) :param position: position in the model cell of the lat lon position. Defaults to self.position_on_horizontal_grid. :param external_distance: can be a dict containing the target point value and an external field on the same grid as self, to which the distance is computed within the 4 horizontally nearest points; e.g. {'target_value':4810, 'external_field':a_3DField_with_same_geometry}. If so, the nearest point is selected with distance = |target_value - external_field.data| :param squeeze: True to suppress useless dimensions :rtype: general output form is [list, list, ..., list] with as many list items as the length of lon/lat. Each list item is of the form [tuple, tuple, ..., tuple] with as many tuples as the request implies. A tuple represents one of the nearest points associated with one value taken from lon/lat. Each tuple as the form (i, j). Dimensions with a length of one are removed except if squeeze is False. If squeeze is True and if request implies only one nearest point, the list item of the general output form is replaced by the tuple item; if length of lon/lat is one, the output is directly the list item of the general output form. Hence, if length of lon/lat is one and the request implies only one point, the output is a tuple. In case of a simple square request, output is actually an array. Otherwise, the output is as described (it cannot be an array because the number of nearest points can vary with the entry point). """ if self._getoffset(position) != (0., 0.): raise NotImplementedError("horizontal staggered grids for " + "reduced gauss grid are not implemented.") lon, lat = as_numpy_array(lon).flatten(), as_numpy_array(lat).flatten() # ## internal functions def nearest_lats(latrs, num): """ Internal method used to find the nearest latitude circles. *latrs* is the rotated and streched latitude. *num* is: - 2 to find the two surrounding latitude cicles, - 3 to find the nearest latitude circles plus the one above and the one under. - 4 to find the two surrounding latitude circles plus the preceding one and the following one. - and so on... returns an array of shape (len(latrs), num) """ if not numpy.all(numpy.array(latitudes[1:]) <= numpy.array(latitudes[:-1])): raise ValueError('latitudes must be in descending order') latrs = as_numpy_array(latrs) nearest = numpy.ndarray((len(latrs), ), dtype=int) distmin = numpy.ndarray((len(latrs), ), dtype=float) # Slicing is needed to prevent memory error batch_size = 100 # on a particular example, increasing this value implies a longuer execution time for imin in range(0, len(latrs), batch_size): imax = min(imin + batch_size, len(latrs)) dist = latrs[imin:imax, numpy.newaxis] - as_numpy_array(latitudes)[numpy.newaxis, :] nearest[imin:imax] = numpy.argmin(numpy.abs(dist), axis=1) distmin[imin:imax] = dist[numpy.arange(dist.shape[0]), nearest[imin:imax]] result = numpy.zeros((len(latrs), num), dtype=int) if num == 2: result[:, 1] = - numpy.copysign(1, distmin).astype('int') elif num == 3: result[:, 0] = -1 result[:, 2] = 1 elif num % 2: # odd for k in range(1, num // 2 + 1): result[:, num // 2 - k] = - k result[:, num // 2 + k] = k elif not num % 2: # even for k in range(1, num // 2): result[:, num // 2 - k - 1] = - k result[:, num // 2 + k - 1] = k result[:, -1] = num // 2 result = result + nearest[:, numpy.newaxis] return result def nearest_lons(lonrs, latnum, num): """ Internal method used to find the nearest points on a latitude circle. Args:\n - *lonrs* is the rotated and streched longitude. - *num* is: - 1 to find the nearest point, - 2 to find the two surrounding points, - 4 to find the two surrounding points plus the preceding one and the following one. - and so on - *latnum*: if -1 (resp. -2), we search for the opposite longitude on the first (resp. second) latitude circle. The same is true for the other pole. latnum must be of shape (len(lonrs), x) where x is the number of latitude to search for each longitude result shape is (len(lonrs), x, num, 2) """ lonrs = as_numpy_array(lonrs) latnum = as_numpy_array(latnum) assert len(lonrs.shape) == 1 and lonrs.shape == latnum.shape[:1] result = numpy.ndarray(tuple(list(latnum.shape) + [num, 2]), dtype=int) lonrs = numpy.radians(lonrs) lonnummax = numpy.ndarray(latnum.shape, dtype=int) j = numpy.ndarray(latnum.shape, dtype=int) i = numpy.ndarray(latnum.shape, dtype=float) lonrs2d = numpy.repeat(lonrs[:, numpy.newaxis], latnum.shape[1], axis=1) # Near the pole, have to look-up on the first latitudes # circles, symmetrically with regards to the pole mask1 = latnum < 0 j[mask1] = abs(latnum[mask1]) - 1 lonnummax[mask1] = as_numpy_array(self.dimensions['lon_number_by_lat'])[j[mask1]] i[mask1] = ((lonrs2d[mask1] - numpy.pi) % (numpy.pi * 2)) * \ lonnummax[mask1] / (numpy.pi * 2) mask2 = latnum >= len(latitudes) j[mask2] = len(latitudes) - (latnum[mask2] - len(latitudes)) - 1 # TOBECHECKED: next circle past the pole lonnummax[mask2] = as_numpy_array(self.dimensions['lon_number_by_lat'])[j[mask2]] i[mask2] = ((lonrs2d[mask2] - numpy.pi) % (numpy.pi * 2)) * \ lonnummax[mask2] / (numpy.pi * 2) mask = numpy.logical_not(numpy.logical_or(mask1, mask2)) j[mask] = latnum[mask] lonnummax[mask] = as_numpy_array(self.dimensions['lon_number_by_lat'])[latnum[mask]] i[mask] = lonrs2d[mask] * lonnummax[mask] / (numpy.pi * 2) if num == 1: result[:, :, 0, 0] = numpy.rint(i).astype('int') % lonnummax result[:, :, 0, 1] = j elif num == 2: result[:, :, 0, 0] = numpy.floor(i).astype('int') % lonnummax result[:, :, 0, 1] = j result[:, :, 1, 0] = (numpy.floor(i).astype('int') + 1) % lonnummax result[:, :, 1, 1] = j elif num % 2: # odd raise NotImplementedError("but is it necessary ?") elif not num % 2: # even ii = numpy.floor(i).astype('int') for k in range(1, num // 2): result[:, :, num // 2 - k - 1, 0] = (ii - k) % lonnummax result[:, :, num // 2 - k - 1, 1] = j result[:, :, num // 2 - 1, 0] = ii % lonnummax result[:, :, num // 2 - 1, 1] = j for k in range(1, num // 2 + 1): result[:, :, num // 2 + k - 1, 0] = (ii + num // 2 + 1 - k) % lonnummax # reverse order? why? result[:, :, num // 2 + k - 1, 1] = j return result def nearest(lon, lat, lonrs, latrs): """ Internal method used to find the nearest point. lon/lat are the true coordinate, lonrs/latrs are rotated and streched coordinates. Returns an array of points """ lon, lat = as_numpy_array(lon).flatten(), as_numpy_array(lat).flatten() lonrs, latrs = as_numpy_array(lonrs).flatten(), as_numpy_array(latrs).flatten() assert len(lon) == len(lat) == len(lonrs) == len(latrs) all_nearest_lats = nearest_lats(latrs, 3) all_nearest_lons = nearest_lons(lonrs, all_nearest_lats, 1) lon2d = numpy.repeat(lon[:, numpy.newaxis], 3, axis=1) lat2d = numpy.repeat(lat[:, numpy.newaxis], 3, axis=1) ll = self.ij2ll(*[all_nearest_lons[:, :, 0, 0].flatten(), all_nearest_lons[:, :, 0, 1].flatten()]) all_dist = self.distance((lon2d.flatten(), lat2d.flatten()), ll) all_dist = all_dist.reshape((len(lonrs), 3)) result = all_nearest_lons[numpy.arange(all_dist.shape[0]), numpy.argmin(all_dist, axis=1), 0, :] return result def nearests(lonrs, latrs, n): """ Internal methods used to find the n*n points surrunding the point lonrs/latrs, lonrs/latrs are rotated and stretched coordinates. lonrs and latrs can be arrays """ lonrs = as_numpy_array(lonrs) latrs = as_numpy_array(latrs) assert lonrs.shape == latrs.shape and len(lonrs.shape) == 1, "only scalar or 1d arrays with same length" all_nearest_lats = nearest_lats(latrs, n) all_nearest_lons = nearest_lons(lonrs, all_nearest_lats, n) result = all_nearest_lons.reshape((len(lonrs), n**2, 2)) return result # ## actual algorithm # initializations if isinstance(lon, list) or isinstance(lon, tuple): lon = numpy.array(lon) if isinstance(lat, list) or isinstance(lat, tuple): lat = numpy.array(lat) latitudes = [self.grid['latitudes'][n].get('degrees') for n in range(0, self.dimensions['lat_number'])] # compute rotated/stretched lon/lat (lonrs, latrs) = self._rotate_stretch(lon, lat) # 1.A: nearest point is needed only nsquare_match = _re_nearest_sq.match(request.get('n', '')) if nsquare_match: assert nsquare_match.group('n') == nsquare_match.group('m'), \ "anisotropic request {'n':'N*M'} is not supported." if external_distance is not None: assert request == {'n':'1'} def _increments(n): def _rng(n): return numpy.arange(-n // 2 + 1, n // 2 + 1) if self.name == 'academic' and self.dimensions['X'] == 1: i_incr = _rng(1) else: i_incr = _rng(n) if self.name == 'academic' and self.dimensions['Y'] == 1: j_incr = _rng(1) else: j_incr = _rng(n) return (i_incr, j_incr) if request == {'n':'1'} and not external_distance: result = nearest(lon, lat, lonrs, latrs) # 2.: several points are needed else: # 2.1: how many ? if external_distance: n = 1 elif request.get('radius'): if isinstance(lat, numpy.ndarray) and len(lat) > 1: raise NotImplementedError("request:{'radius':..., 'shape':'radius'} with several points.") resolution = self.resolution_ll(lon, lat) n = max(numpy.around(float(request['radius']) / float(resolution)).astype('int') * 2, 1) elif nsquare_match: n = int(nsquare_match.group('n')) else: raise epygramError("unrecognized **request**: " + str(request)) # 2.2: get points result = nearests(lonrs, latrs, n) # 2.3: only select the nearest with regards to external_distance if external_distance: result = list(result) # We transform into list to be able to modify the length for ipt, points in enumerate(result): mindistance = None for p in points: dist = abs(external_distance['external_field'].getvalue_ij(*p, one=True) - external_distance['target_value']) if mindistance is None or dist < mindistance: result[ipt] = [p] mindistance = dist result = as_numpy_array(result) # all item must now have the same length if request.get('radius'): result = list(result) # We transform into list to be able to modify the length for ipt, points in enumerate(result): if request.get('shape', 'circle') == 'circle': #for i,j in points: # print(i,j,self.distance((lon[ipt], lat[ipt]), self.ij2ll(i, j))) result[ipt] = [(i, j) for (i, j) in points if self.distance((lon[ipt], lat[ipt]), self.ij2ll(i, j)) <= request['radius']] elif request.get('shape') == 'square': result[ipt] = [(i, j) for (i, j) in points if all(abs(numpy.array(self.ll2xy(lon[ipt], lat[ipt])) - numpy.array(self.ij2xy(i, j))) <= request['radius'])] assert len(result[ipt]) > 0, "no points found: radius may be too small." if squeeze and len(result) == 1: result = result[0] else: if squeeze: result = result.squeeze() return result
def _what_grid_dimensions(self, out=sys.stdout, spectral_geometry=None): """ Writes in file a summary of the grid & dimensions of the field. :param out: the output open file-like object :param spectral_geometry: an optional dict containing the spectral truncature {'max':}. """ grid = self.grid dimensions = self.dimensions gridmap = {'reduced_gauss':'Reduced Gauss', 'rotated_reduced_gauss':'Rotated Reduced Gauss', 'regular_gauss':'Regular Gauss'} write_formatted(out, "Grid", gridmap[self.name]) if self.name == 'rotated_reduced_gauss': write_formatted(out, "Pole Longitude", grid['pole_lon'].get('degrees')) write_formatted(out, "Pole Latitude", grid['pole_lat'].get('degrees')) write_formatted(out, "Dilatation coefficient", grid['dilatation_coef']) write_formatted(out, "Number of latitudes", dimensions['lat_number']) write_formatted(out, "Maximum number of longitudes on a parallel", dimensions['max_lon_number']) if spectral_geometry is not None: write_formatted(out, "Truncation", spectral_geometry['max'])