Source code for satpy.tests.test_yaml_reader

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2015-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy.  If not, see <http://www.gnu.org/licenses/>.
"""Testing the yaml_reader module."""

import os
import random
import unittest
from datetime import datetime
from tempfile import mkdtemp
from unittest.mock import MagicMock, patch

import satpy.readers.yaml_reader as yr
from satpy.readers.file_handlers import BaseFileHandler
from satpy.dataset import DataQuery
from satpy.tests.utils import make_dataid
import xarray as xr
import numpy as np


class FakeFH(BaseFileHandler):
    """Stand-in file handler with fixed times and mocked data access."""

    def __init__(self, start_time, end_time):
        """Store the given times and swap the data-access methods for mocks.

        Args:
            start_time: Value exposed through the ``start_time`` property.
            end_time: Value exposed through the ``end_time`` property.
        """
        super().__init__("", {}, {})
        self._start_time, self._end_time = start_time, end_time
        self.get_bounding_box = MagicMock()
        self.combine_info = MagicMock()
        # Whatever the mocked ``get_dataset`` returns reports x/y dimensions.
        self.get_dataset = MagicMock()
        self.get_dataset.return_value.dims = ['x', 'y']

    @property
    def start_time(self):
        """Give back the start time passed to the constructor."""
        return self._start_time

    @property
    def end_time(self):
        """Give back the end time passed to the constructor."""
        return self._end_time
class TestUtils(unittest.TestCase):
    """Test the utility functions."""

    @staticmethod
    def _sentinel3_paths():
        """Build the fake Sentinel-3 paths shared by the filename tests.

        Returns:
            tuple: ``(product_dir, product_name, pattern)``. ``product_dir``
            is the full path of the ``.SEN3`` product directory,
            ``product_name`` is its last component and ``pattern`` is the
            ``geo_coordinates.nc`` file pattern joined with the native
            path separator.
        """
        # just a fake path for testing that doesn't have to exist
        product_name = ('S3A_OL_1_EFR____20161020T081224_20161020T081524_'
                        '20161020T102406_0179_010_078_2340_SVL_O_NR_002.SEN3')
        product_dir = os.path.join(os.path.expanduser('~'), 'data',
                                   'satellite', 'Sentinel-3', product_name)
        pattern = ('{mission_id:3s}_OL_{processing_level:1s}_{datatype_id:_<6s'
                   '}_{start_time:%Y%m%dT%H%M%S}_{end_time:%Y%m%dT%H%M%S}_{cre'
                   'ation_time:%Y%m%dT%H%M%S}_{duration:4d}_{cycle:3d}_{relati'
                   've_orbit:3d}_{frame:4d}_{centre:3s}_{mode:1s}_{timeliness:'
                   '2s}_{collection:3s}.SEN3/geo_coordinates.nc')
        # the pattern is written with '/', convert it to the native separator
        pattern = os.path.join(*pattern.split('/'))
        return product_dir, product_name, pattern

    def test_get_filebase(self):
        """Check the get_filebase function."""
        product_dir, product_name, pattern = self._sentinel3_paths()
        filename = os.path.join(product_dir, 'Oa05_radiance.nc')
        expected = os.path.join(product_name, 'Oa05_radiance.nc')
        self.assertEqual(yr._get_filebase(filename, pattern), expected)

    def test_match_filenames(self):
        """Check that matching filenames works."""
        product_dir, _, pattern = self._sentinel3_paths()
        filenames = [os.path.join(product_dir, 'Oa05_radiance.nc'),
                     os.path.join(product_dir, 'geo_coordinates.nc')]
        expected = os.path.join(product_dir, 'geo_coordinates.nc')
        self.assertEqual(yr._match_filenames(filenames, pattern), {expected})

    def test_match_filenames_windows_forward_slash(self):
        """Check that matching filenames works on Windows with forward slashes.

        This is common from Qt5 which internally uses forward slashes everywhere.

        """
        product_dir, _, pattern = self._sentinel3_paths()
        # only the filenames use forward slashes; the pattern keeps os.sep
        filenames = [os.path.join(product_dir, 'Oa05_radiance.nc').replace(os.sep, '/'),
                     os.path.join(product_dir, 'geo_coordinates.nc').replace(os.sep, '/')]
        expected = os.path.join(product_dir, 'geo_coordinates.nc').replace(os.sep, '/')
        self.assertEqual(yr._match_filenames(filenames, pattern), {expected})

    def test_listify_string(self):
        """Check listify_string."""
        self.assertEqual(yr.listify_string(None), [])
        self.assertEqual(yr.listify_string('some string'), ['some string'])
        self.assertEqual(yr.listify_string(['some', 'string']),
                         ['some', 'string'])
class DummyReader(BaseFileHandler):
    """Stub file handler with hard-coded start/end times and empty metadata."""

    def __init__(self, filename, filename_info, filetype_info):
        """Forward the arguments to the base handler and set fixed times."""
        super().__init__(filename, filename_info, filetype_info)
        self.metadata = {}
        self._start_time = datetime(2000, 1, 1, 12, 1)
        self._end_time = datetime(2000, 1, 1, 12, 2)

    @property
    def start_time(self):
        """Give back the fixed start time."""
        return self._start_time

    @property
    def end_time(self):
        """Give back the fixed end time."""
        return self._end_time
class TestFileFileYAMLReaderMultiplePatterns(unittest.TestCase):
    """Test units from FileYAMLReader with multiple readers."""

    def setUp(self):
        """Build a reader from a fake config with two file patterns."""
        file_types = {
            'ftype1': {'name': 'ft1',
                       'file_patterns': ['a{something:3s}.bla',
                                         'a0{something:2s}.bla'],
                       'file_reader': DummyReader},
        }
        datasets = {
            'ch1': {'name': 'ch01',
                    'wavelength': [0.5, 0.6, 0.7],
                    'calibration': 'reflectance',
                    'file_type': 'ftype1',
                    'coordinates': ['lons', 'lats']},
            'ch2': {'name': 'ch02',
                    'wavelength': [0.7, 0.75, 0.8],
                    'calibration': 'counts',
                    'file_type': 'ftype1',
                    'coordinates': ['lons', 'lats']},
            'lons': {'name': 'lons',
                     'file_type': 'ftype2'},
            'lats': {'name': 'lats',
                     'file_type': 'ftype2'},
        }
        self.config = {
            'reader': {'name': 'fake', 'sensors': ['canon']},
            'file_types': file_types,
            'datasets': datasets,
        }
        time_window = {'start_time': datetime(2000, 1, 1),
                       'end_time': datetime(2000, 1, 2)}
        self.reader = yr.FileYAMLReader(self.config,
                                        filter_parameters=time_window)

    def test_select_from_pathnames(self):
        """Check select_files_from_pathnames."""
        candidates = ['a001.bla', 'a002.bla', 'abcd.bla', 'k001.bla', 'a003.bli']

        selected = self.reader.select_files_from_pathnames(candidates)

        for wanted in ['a001.bla', 'a002.bla', 'abcd.bla']:
            self.assertIn(wanted, selected)
        self.assertEqual(len(selected), 3)

    def test_fn_items_for_ft(self):
        """Check filename_items_for_filetype."""
        candidates = ['a001.bla', 'a002.bla', 'abcd.bla', 'k001.bla', 'a003.bli']
        ft_info = self.config['file_types']['ftype1']

        # the iterator yields pairs, so it can be fed to dict() directly
        matched = dict(self.reader.filename_items_for_filetype(candidates, ft_info))

        self.assertEqual(len(matched), 3)

    def test_create_filehandlers(self):
        """Check create_filehandlers."""
        # 'a001.bla' and 'a002.bla' are listed twice on purpose
        candidates = ['a001.bla', 'a002.bla', 'a001.bla', 'a002.bla',
                      'abcd.bla', 'k001.bla', 'a003.bli']

        self.reader.create_filehandlers(candidates)

        created = self.reader.file_handlers['ftype1']
        self.assertEqual(len(created), 3)
class TestFileFileYAMLReader(unittest.TestCase):
    """Test units from FileYAMLReader."""

    def setUp(self):
        """Prepare a reader instance with a fake config."""
        patterns = ['a{something:3s}.bla']
        res_dict = {'reader': {'name': 'fake',
                               'sensors': ['canon']},
                    'file_types': {'ftype1': {'name': 'ft1',
                                              'file_reader': BaseFileHandler,
                                              'file_patterns': patterns}},
                    'datasets': {'ch1': {'name': 'ch01',
                                         'wavelength': [0.5, 0.6, 0.7],
                                         'calibration': 'reflectance',
                                         'file_type': 'ftype1',
                                         'coordinates': ['lons', 'lats']},
                                 'ch2': {'name': 'ch02',
                                         'wavelength': [0.7, 0.75, 0.8],
                                         'calibration': 'counts',
                                         'file_type': 'ftype1',
                                         'coordinates': ['lons', 'lats']},
                                 'lons': {'name': 'lons',
                                          'file_type': 'ftype2'},
                                 'lats': {'name': 'lats',
                                          'file_type': 'ftype2'}}}

        self.config = res_dict
        self.reader = yr.FileYAMLReader(res_dict,
                                        filter_parameters={
                                            'start_time': datetime(2000, 1, 1),
                                            'end_time': datetime(2000, 1, 2),
                                        })

    def test_deprecated_passing_config_files(self):
        """Test that we get an exception when config files are passed to init."""
        self.assertRaises(ValueError, yr.FileYAMLReader, '/path/to/some/file.yaml')

    def test_all_data_ids(self):
        """Check that all datasets ids are returned."""
        for dataid in self.reader.all_dataset_ids:
            # config keys look like 'ch1' while the dataset names are 'ch01'
            name = dataid['name'].replace('0', '')
            ds_config = self.config['datasets'][name]
            assert ds_config['name'] == dataid['name']
            if 'wavelength' in ds_config:
                assert ds_config['wavelength'] == list(dataid['wavelength'])[:3]
            if 'calibration' in ds_config:
                assert ds_config['calibration'] == dataid['calibration']

    def test_all_dataset_names(self):
        """Get all dataset names."""
        self.assertSetEqual(self.reader.all_dataset_names,
                            {'ch01', 'ch02', 'lons', 'lats'})

    def test_available_dataset_ids(self):
        """Get ids of the available datasets."""
        loadables = self.reader.select_files_from_pathnames(['a001.bla'])
        self.reader.create_filehandlers(loadables)
        self.assertSetEqual(set(self.reader.available_dataset_ids),
                            {make_dataid(name='ch02',
                                         wavelength=(0.7, 0.75, 0.8),
                                         calibration='counts',
                                         modifiers=()),
                             make_dataid(name='ch01',
                                         wavelength=(0.5, 0.6, 0.7),
                                         calibration='reflectance',
                                         modifiers=())})

    def test_available_dataset_names(self):
        """Get names of the available datasets."""
        loadables = self.reader.select_files_from_pathnames(['a001.bla'])
        self.reader.create_filehandlers(loadables)
        self.assertSetEqual(set(self.reader.available_dataset_names),
                            {"ch01", "ch02"})

    def test_filter_fh_by_time(self):
        """Check filtering filehandlers by time."""
        fh0 = FakeFH(datetime(1999, 12, 30), datetime(1999, 12, 31))
        fh1 = FakeFH(datetime(1999, 12, 31, 10, 0),
                     datetime(2000, 1, 1, 12, 30))
        fh2 = FakeFH(datetime(2000, 1, 1, 10, 0),
                     datetime(2000, 1, 1, 12, 30))
        fh3 = FakeFH(datetime(2000, 1, 1, 12, 30),
                     datetime(2000, 1, 2, 12, 30))
        fh4 = FakeFH(datetime(2000, 1, 2, 12, 30),
                     datetime(2000, 1, 3, 12, 30))
        fh5 = FakeFH(datetime(1999, 12, 31, 10, 0),
                     datetime(2000, 1, 3, 12, 30))
        handlers = [fh0, fh1, fh2, fh3, fh4, fh5]

        for idx, fh in enumerate(handlers):
            res = self.reader.time_matches(fh.start_time, fh.end_time)
            # fh0 ends before and fh4 starts after the filter window set in
            # setUp (2000-01-01 to 2000-01-02); every other handler overlaps it
            self.assertEqual(res, idx not in [0, 4])

        for idx, fh in enumerate(handlers):
            res = self.reader.time_matches(fh.start_time, None)
            # without an end time, only handlers whose start time lies inside
            # the filter window are expected to match
            self.assertEqual(res, idx not in [0, 1, 4, 5])

    @patch('satpy.readers.yaml_reader.get_area_def')
    @patch('satpy.readers.yaml_reader.AreaDefBoundary')
    @patch('satpy.readers.yaml_reader.Boundary')
    def test_file_covers_area(self, bnd, adb, gad):
        """Test that area coverage is checked properly."""
        file_handler = FakeFH(datetime(1999, 12, 31, 10, 0),
                              datetime(2000, 1, 3, 12, 30))

        # a truthy intersection is expected to count as "covers the area"
        self.reader.filter_parameters['area'] = True
        bnd.return_value.contour_poly.intersection.return_value = True
        adb.return_value.contour_poly.intersection.return_value = True
        res = self.reader.check_file_covers_area(file_handler, True)
        self.assertTrue(res)

        # a falsy intersection is expected to reject the file
        bnd.return_value.contour_poly.intersection.return_value = False
        adb.return_value.contour_poly.intersection.return_value = False
        res = self.reader.check_file_covers_area(file_handler, True)
        self.assertFalse(res)

        # a handler that cannot provide a bounding box is expected to be kept
        file_handler.get_bounding_box.side_effect = NotImplementedError()
        self.reader.filter_parameters['area'] = True
        res = self.reader.check_file_covers_area(file_handler, True)
        self.assertTrue(res)

    def test_start_end_time(self):
        """Check start and end time behaviours."""
        # with no file handlers both properties are expected to raise
        self.reader.file_handlers = {}
        with self.assertRaises(RuntimeError):
            _ = self.reader.start_time
        with self.assertRaises(RuntimeError):
            _ = self.reader.end_time

        fh0 = FakeFH(datetime(1999, 12, 30, 0, 0),
                     datetime(1999, 12, 31, 0, 0))
        fh1 = FakeFH(datetime(1999, 12, 31, 10, 0),
                     datetime(2000, 1, 1, 12, 30))
        fh2 = FakeFH(datetime(2000, 1, 1, 10, 0),
                     datetime(2000, 1, 1, 12, 30))
        fh3 = FakeFH(datetime(2000, 1, 1, 12, 30),
                     datetime(2000, 1, 2, 12, 30))
        fh4 = FakeFH(datetime(2000, 1, 2, 12, 30),
                     datetime(2000, 1, 3, 12, 30))
        fh5 = FakeFH(datetime(1999, 12, 31, 10, 0),
                     datetime(2000, 1, 3, 12, 30))

        self.reader.file_handlers = {
            '0': [fh1, fh2, fh3, fh4, fh5],
            '1': [fh0, fh1, fh2, fh3, fh4, fh5],
            '2': [fh2, fh3],
        }

        # earliest start (fh0) and latest end (fh4/fh5) over all file types
        self.assertEqual(self.reader.start_time, datetime(1999, 12, 30, 0, 0))
        self.assertEqual(self.reader.end_time, datetime(2000, 1, 3, 12, 30))

    def test_select_from_pathnames(self):
        """Check select_files_from_pathnames."""
        filelist = ['a001.bla', 'a002.bla', 'abcd.bla', 'k001.bla', 'a003.bli']

        res = self.reader.select_files_from_pathnames(filelist)
        for expected in ['a001.bla', 'a002.bla', 'abcd.bla']:
            self.assertIn(expected, res)

        self.assertEqual(0, len(self.reader.select_files_from_pathnames([])))

    def test_select_from_directory(self):
        """Check select_files_from_directory."""
        import shutil

        filelist = ['a001.bla', 'a002.bla', 'abcd.bla', 'k001.bla', 'a003.bli']
        dpath = mkdtemp()
        # Safety net: the manual removal below is skipped when an assertion
        # fails, which would otherwise leave the temporary directory behind.
        self.addCleanup(shutil.rmtree, dpath, ignore_errors=True)
        for fname in filelist:
            with open(os.path.join(dpath, fname), 'w'):
                pass

        res = self.reader.select_files_from_directory(dpath)
        for expected in ['a001.bla', 'a002.bla', 'abcd.bla']:
            self.assertIn(os.path.join(dpath, expected), res)

        # an emptied directory must not yield any file
        for fname in filelist:
            os.remove(os.path.join(dpath, fname))
        self.assertEqual(0,
                         len(self.reader.select_files_from_directory(dpath)))
        os.rmdir(dpath)

        from fsspec.implementations.local import LocalFileSystem

        class Silly(LocalFileSystem):
            """Filesystem whose glob always reports the same two files."""

            def glob(self, pattern):
                return ["/grocery/apricot.nc", "/grocery/aubergine.nc"]

        # the directory no longer exists; only the custom filesystem is asked
        res = self.reader.select_files_from_directory(dpath, fs=Silly())
        self.assertEqual(
            res,
            {"/grocery/apricot.nc", "/grocery/aubergine.nc"})

    def test_supports_sensor(self):
        """Check supports_sensor."""
        self.assertTrue(self.reader.supports_sensor('canon'))
        self.assertFalse(self.reader.supports_sensor('nikon'))

    @patch('satpy.readers.yaml_reader.StackedAreaDefinition')
    def test_load_area_def(self, sad):
        """Test loading the area def for the reader."""
        dataid = MagicMock()
        items = random.randrange(2, 10)
        file_handlers = [MagicMock() for _ in range(items)]

        final_area = self.reader._load_area_def(dataid, file_handlers)

        self.assertEqual(final_area, sad.return_value.squeeze.return_value)
        # one positional argument per file handler is expected
        args, _kwargs = sad.call_args
        self.assertEqual(len(args), items)

    def test_preferred_filetype(self):
        """Test finding the preferred filetype."""
        self.reader.file_handlers = {'a': 'a', 'b': 'b', 'c': 'c'}
        self.assertEqual(self.reader._preferred_filetype(['c', 'a']), 'c')
        self.assertEqual(self.reader._preferred_filetype(['a', 'c']), 'a')
        self.assertEqual(self.reader._preferred_filetype(['d', 'e']), None)

    def test_get_coordinates_for_dataset_key(self):
        """Test getting coordinates for a key."""
        ds_q = DataQuery(name='ch01', wavelength=(0.5, 0.6, 0.7, 'µm'),
                         calibration='reflectance', modifiers=())
        res = self.reader._get_coordinates_for_dataset_key(ds_q)
        self.assertListEqual(res,
                             [make_dataid(name='lons'),
                              make_dataid(name='lats')])

    def test_get_coordinates_for_dataset_key_without(self):
        """Test getting coordinates for a key without coordinates."""
        ds_id = make_dataid(name='lons',
                            modifiers=())
        res = self.reader._get_coordinates_for_dataset_key(ds_id)
        self.assertListEqual(res, [])

    def test_get_coordinates_for_dataset_keys(self):
        """Test getting coordinates for keys."""
        ds_id1 = make_dataid(name='ch01', wavelength=(0.5, 0.6, 0.7),
                             calibration='reflectance', modifiers=())
        ds_id2 = make_dataid(name='ch02', wavelength=(0.7, 0.75, 0.8),
                             calibration='counts', modifiers=())
        lons = make_dataid(name='lons', modifiers=())
        lats = make_dataid(name='lats', modifiers=())

        res = self.reader._get_coordinates_for_dataset_keys([ds_id1, ds_id2,
                                                             lons])
        expected = {ds_id1: [lons, lats], ds_id2: [lons, lats], lons: []}

        self.assertDictEqual(res, expected)

    def test_get_file_handlers(self):
        """Test getting filehandler to load a dataset."""
        ds_id1 = make_dataid(name='ch01', wavelength=(0.5, 0.6, 0.7),
                             calibration='reflectance', modifiers=())
        self.reader.file_handlers = {'ftype1': 'bla'}

        self.assertEqual(self.reader._get_file_handlers(ds_id1), 'bla')

        # 'lons' is configured for 'ftype2', which has no handlers here
        lons = make_dataid(name='lons', modifiers=())
        self.assertEqual(self.reader._get_file_handlers(lons), None)

    @patch('satpy.readers.yaml_reader.xr')
    def test_load_entire_dataset(self, xarray):
        """Check loading an entire dataset."""
        file_handlers = [FakeFH(None, None), FakeFH(None, None),
                         FakeFH(None, None), FakeFH(None, None)]

        proj = self.reader._load_dataset(None, {}, file_handlers)

        self.assertIs(proj, xarray.concat.return_value)
class TestFileYAMLReaderLoading(unittest.TestCase):
    """Tests for FileYAMLReader.load."""

    def setUp(self):
        """Create a fake reader whose only file handler serves in-memory arrays."""
        file_type = {'name': 'ft1',
                     'file_reader': BaseFileHandler,
                     'file_patterns': ['a{something:3s}.bla']}
        channel = {'name': 'ch01',
                   'wavelength': [0.5, 0.6, 0.7],
                   'calibration': 'reflectance',
                   'file_type': 'ftype1'}
        self.config = {'reader': {'name': 'fake', 'sensors': ['canon']},
                       'file_types': {'ftype1': file_type},
                       'datasets': {'ch1': channel}}
        time_window = {'start_time': datetime(2000, 1, 1),
                       'end_time': datetime(2000, 1, 2)}
        self.reader = yr.FileYAMLReader(self.config,
                                        filter_parameters=time_window)

        self.lons = xr.DataArray(np.ones((2, 2)) * 2,
                                 dims=['y', 'x'],
                                 attrs={'standard_name': 'longitude',
                                        'name': 'longitude'})
        self.lats = xr.DataArray(np.ones((2, 2)) * 2,
                                 dims=['y', 'x'],
                                 attrs={'standard_name': 'latitude',
                                        'name': 'latitude'})
        # each test assigns the array to be served before calling ``load``
        self.data = None

        def _assign_array(dsid, *_args, **_kwargs):
            # coordinate requests get the matching array, anything else
            # gets the data array of the current test
            by_name = {'longitude': self.lons, 'latitude': self.lats}
            return by_name.get(dsid['name'], self.data)

        handler = FakeFH(None, None)
        handler.get_dataset.side_effect = _assign_array
        self.reader.file_handlers = {'ftype1': [handler]}

    def test_load_dataset_with_builtin_coords(self):
        """Test loading a dataset with builtin coordinates."""
        builtin_coords = {'longitude': self.lons, 'latitude': self.lats}
        self.data = xr.DataArray(np.ones((2, 2)),
                                 coords=builtin_coords,
                                 dims=['y', 'x'])
        self._check_area_for_ch01()

    def test_load_dataset_with_builtin_coords_in_wrong_order(self):
        """Test loading a dataset with builtin coordinates in the wrong order."""
        # latitude listed before longitude
        builtin_coords = {'latitude': self.lats, 'longitude': self.lons}
        self.data = xr.DataArray(np.ones((2, 2)),
                                 coords=builtin_coords,
                                 dims=['y', 'x'])
        self._check_area_for_ch01()

    def _check_area_for_ch01(self):
        """Load ``ch01`` and verify its area and reader attributes."""
        loaded = self.reader.load(['ch01'])
        assert 'area' in loaded['ch01'].attrs
        np.testing.assert_array_equal(loaded['ch01'].attrs['area'].lons, self.lons)
        np.testing.assert_array_equal(loaded['ch01'].attrs['area'].lats, self.lats)
        assert loaded['ch01'].attrs.get("reader") == "fake"
class TestFileFileYAMLReaderMultipleFileTypes(unittest.TestCase):
    """Test units from FileYAMLReader with multiple file types."""

    def setUp(self):
        """Create a reader whose datasets may be served by several file types."""
        # The layout mimics GOES netCDF data:
        #   a) NOAA CLASS files (ftype1) include the coordinates
        #   b) EUMETSAT files (ftype2) keep the coordinates in an extra file (ftype3)
        #
        # For test completeness one channel (ch3) is only available in ftype1.
        def channel(name, wavelength, calibration, file_type):
            """Build one channel entry; every channel uses the lons/lats coordinates."""
            return {'name': name,
                    'wavelength': wavelength,
                    'calibration': calibration,
                    'file_type': file_type,
                    'coordinates': ['lons', 'lats']}

        file_types = {ftype: {'name': name, 'file_patterns': [pattern]}
                      for ftype, name, pattern in (('ftype1', 'ft1', 'a.nc'),
                                                   ('ftype2', 'ft2', 'b.nc'),
                                                   ('ftype3', 'ft3', 'geo.nc'))}
        datasets = {
            'ch1': channel('ch01', [0.5, 0.6, 0.7], 'reflectance', ['ftype1', 'ftype2']),
            'ch2': channel('ch02', [0.7, 0.75, 0.8], 'counts', ['ftype1', 'ftype2']),
            'ch3': channel('ch03', [0.8, 0.85, 0.9], 'counts', 'ftype1'),
            'lons': {'name': 'lons', 'file_type': ['ftype1', 'ftype3']},
            'lats': {'name': 'lats', 'file_type': ['ftype1', 'ftype3']},
        }

        self.config = {'reader': {'name': 'fake', 'sensors': ['canon']},
                       'file_types': file_types,
                       'datasets': datasets}
        self.reader = yr.FileYAMLReader(self.config)

    def test_update_ds_ids_from_file_handlers(self):
        """Test updating existing dataset IDs with information from the file."""
        from functools import partial

        orig_ids = self.reader.all_ids

        def available_datasets(handler, configured_datasets=None):
            """Yield configured datasets, adding the handler resolution where it matches."""
            handler_res = handler.resolution
            for is_avail, ds_info in (configured_datasets or []):
                if is_avail is not None:
                    # already decided by an earlier handler: pass it through
                    yield is_avail, ds_info

                matches = handler.file_type_matches(ds_info['file_type'])
                if matches and ds_info.get('resolution') != handler_res:
                    updated = ds_info.copy()
                    updated['resolution'] = handler_res
                    yield True, updated
                elif is_avail is None:
                    yield is_avail, ds_info

        def file_type_matches(handler, ds_ftype):
            """Return True if the handler's file type fits *ds_ftype*, else None."""
            own_type = handler.filetype_info['file_type']
            if (isinstance(ds_ftype, str) and ds_ftype == own_type) or own_type in ds_ftype:
                return True
            return None

        for resol, ftype in enumerate(('ftype1', 'ftype2'), start=1):
            # the dataset infos get modified below, so work on copies
            ids_copy = {key: val.copy() for key, val in orig_ids.items()}
            with patch.dict(self.reader.all_ids, ids_copy, clear=True), \
                    patch.dict(self.reader.available_ids, {}, clear=True):
                # A file handler exposing a resolution property
                fh = MagicMock(filetype_info={'file_type': ftype},
                               resolution=resol)
                fh.available_datasets = partial(available_datasets, fh)
                fh.file_type_matches = partial(file_type_matches, fh)
                self.reader.file_handlers = {ftype: [fh]}

                # Push the handler's resolution into the existing dataset IDs
                self.reader.update_ds_ids_from_file_handlers()

                # Every dataset served by this file type must now carry
                # the handler's resolution in its ID
                for ds_id, ds_info in self.reader.all_ids.items():
                    served_by = ds_info['file_type']
                    if not isinstance(served_by, list):
                        served_by = [served_by]
                    if ftype in served_by:
                        self.assertEqual(resol, ds_id['resolution'])
class TestGEOFlippableFileYAMLReader(unittest.TestCase):
    """Test GEOFlippableFileYAMLReader."""

    @staticmethod
    def _check_coords(res, y, x, time):
        """Compare the ``y``, ``x`` and ``time`` coordinates of *res* with the expected arrays."""
        np.testing.assert_equal(res.coords['y'], y)
        np.testing.assert_equal(res.coords['x'], x)
        np.testing.assert_equal(res.coords['time'], time)

    @patch.object(yr.FileYAMLReader, "__init__", lambda x: None)
    @patch.object(yr.FileYAMLReader, "_load_dataset_with_area")
    def test_load_dataset_with_area_for_single_areas(self, ldwa):
        """Test _load_dataset_with_area() for single area definitions."""
        from pyresample.geometry import AreaDefinition

        reader = yr.GEOFlippableFileYAMLReader()
        dsid = MagicMock()
        coords = MagicMock()

        # an upright dummy dataset on a geos area
        original_area_extent = (-1500, -1000, 1500, 1000)
        original_array = np.arange(6).reshape((2, 3))
        geos_proj = {'proj': 'geos', 'h': 35785831, 'type': 'crs'}
        area_def = AreaDefinition('test', 'test', 'test', geos_proj, 3, 2, original_area_extent)
        dummy_ds_xr = xr.DataArray(original_array,
                                   coords={'y': np.arange(2),
                                           'x': np.arange(3),
                                           'time': ("y", np.arange(2))},
                                   attrs={'area': area_def},
                                   dims=('y', 'x'))

        # the mocked parent _load_dataset_with_area hands out the dummy dataset
        ldwa.return_value = dummy_ds_xr

        def check_untouched(result):
            """Assert that *result* still equals the upright dummy dataset."""
            np.testing.assert_equal(result.values, original_array)
            np.testing.assert_equal(result.attrs['area'].area_extent, original_area_extent)
            self._check_coords(result, np.arange(2), np.arange(3), np.arange(2))

        # without an orientation argument nothing should change
        check_untouched(reader._load_dataset_with_area(dsid, coords))

        # an unknown orientation is rejected
        with self.assertRaises(ValueError):
            _ = reader._load_dataset_with_area(dsid, coords, 'wronginput')

        # native orientation: nothing should change
        check_untouched(reader._load_dataset_with_area(dsid, coords, 'native'))

        # upright orientation: nothing should change, the area is already upright
        check_untouched(reader._load_dataset_with_area(dsid, coords, 'NE'))

        # a left-right mirrored image must be flipped back
        dummy_ds_xr.attrs['area'] = area_def.copy(area_extent=(1500, -1000, -1500, 1000))
        ldwa.return_value = dummy_ds_xr.copy()
        res = reader._load_dataset_with_area(dsid, coords, 'NE')
        np.testing.assert_equal(res.values, np.fliplr(original_array))
        np.testing.assert_equal(res.attrs['area'].area_extent, original_area_extent)
        self._check_coords(res, np.arange(2), np.flip(np.arange(3)), np.arange(2))

        # an upside-down image must be flipped back
        dummy_ds_xr.attrs['area'] = area_def.copy(area_extent=(-1500, 1000, 1500, -1000))
        ldwa.return_value = dummy_ds_xr.copy()
        res = reader._load_dataset_with_area(dsid, coords, 'NE')
        np.testing.assert_equal(res.values, np.flipud(original_array))
        np.testing.assert_equal(res.attrs['area'].area_extent, original_area_extent)
        self._check_coords(res, np.flip(np.arange(2)), np.arange(3), np.flip(np.arange(2)))

        # a projection other than geos must be left alone
        lcc_proj = {'proj': 'lcc', 'lat_1': 25.0, 'type': 'crs'}
        area_def = AreaDefinition('test', 'test', 'test', lcc_proj, 3, 2, original_area_extent)
        dummy_ds_xr = xr.DataArray(original_array,
                                   dims=('y', 'x'),
                                   attrs={'area': area_def})
        ldwa.return_value = dummy_ds_xr
        res = reader._load_dataset_with_area(dsid, coords, 'NE')
        np.testing.assert_equal(res.values, original_array)
        np.testing.assert_equal(res.attrs['area'].area_extent, original_area_extent)

    @patch.object(yr.FileYAMLReader, "__init__", lambda x: None)
    @patch.object(yr.FileYAMLReader, "_load_dataset_with_area")
    def test_load_dataset_with_area_for_stacked_areas(self, ldwa):
        """Test _load_dataset_with_area() for stacked area definitions."""
        from pyresample.geometry import AreaDefinition, StackedAreaDefinition

        reader = yr.GEOFlippableFileYAMLReader()
        dsid = MagicMock()
        coords = MagicMock()

        # an upright dummy dataset on two stacked geos areas
        original_area_extents = [(-1500, -1000, 1500, 1000), (3000, 5000, 7000, 8000)]
        original_array = np.arange(12).reshape((4, 3))
        geos_proj = {'proj': 'geos', 'h': 35785831, 'type': 'crs'}
        area_def0 = AreaDefinition('test', 'test', 'test', geos_proj, 3, 2, original_area_extents[0])
        area_def1 = area_def0.copy(area_extent=original_area_extents[1])

        dummy_ds_xr = xr.DataArray(original_array,
                                   dims=('y', 'x'),
                                   coords={'y': np.arange(4),
                                           'x': np.arange(3),
                                           'time': ("y", np.arange(4))},
                                   attrs={'area': StackedAreaDefinition(area_def0, area_def1)})

        # a left-right mirrored image must be flipped back
        dummy_ds_xr.attrs['area'].defs[0] = area_def0.copy(area_extent=(1500, -1000, -1500, 1000))
        dummy_ds_xr.attrs['area'].defs[1] = area_def1.copy(area_extent=(7000, 5000, 3000, 8000))
        ldwa.return_value = dummy_ds_xr.copy()
        res = reader._load_dataset_with_area(dsid, coords, 'NE')
        np.testing.assert_equal(res.values, np.fliplr(original_array))
        np.testing.assert_equal(res.attrs['area'].defs[0].area_extent, original_area_extents[0])
        np.testing.assert_equal(res.attrs['area'].defs[1].area_extent, original_area_extents[1])
        self._check_coords(res, np.arange(4), np.flip(np.arange(3)), np.arange(4))

        # an upside-down image must be flipped back
        dummy_ds_xr.attrs['area'].defs[0] = area_def0.copy(area_extent=(-1500, 1000, 1500, -1000))
        dummy_ds_xr.attrs['area'].defs[1] = area_def1.copy(area_extent=(3000, 8000, 7000, 5000))
        ldwa.return_value = dummy_ds_xr.copy()
        res = reader._load_dataset_with_area(dsid, coords, 'NE')
        np.testing.assert_equal(res.values, np.flipud(original_array))
        # note that the order of the stacked areadefs is flipped here, as expected
        np.testing.assert_equal(res.attrs['area'].defs[1].area_extent, original_area_extents[0])
        np.testing.assert_equal(res.attrs['area'].defs[0].area_extent, original_area_extents[1])
        self._check_coords(res, np.flip(np.arange(4)), np.arange(3), np.flip(np.arange(4)))
class TestGEOSegmentYAMLReader(unittest.TestCase):
    """Test GEOSegmentYAMLReader."""

    @staticmethod
    def _segment_area(shape):
        """Return a mocked segment area with fixed crs/extent and the given *shape*."""
        return MagicMock(crs='some_crs', area_extent=[0, 1000, 200, 500], shape=shape)

    @staticmethod
    def _segment_handler(area, segment, **extra_filetype_info):
        """Return a mocked file handler for *segment* (of two expected) serving *area*."""
        filetype_info = {'expected_segments': 2}
        filetype_info.update(extra_filetype_info)
        return MagicMock(filetype_info=filetype_info,
                         filename_info={'segment': segment},
                         get_area_def=MagicMock(return_value=area))

    @patch.object(yr.FileYAMLReader, "__init__", lambda x: None)
    @patch.object(yr.FileYAMLReader, "create_filehandlers")
    def test_get_expected_segments(self, cfh):
        """Test that expected segments can come from the filename."""
        reader = yr.GEOSegmentYAMLReader()

        fake_fh = MagicMock()
        fake_fh.filename_info = {}
        fake_fh.filetype_info = {}
        cfh.return_value = {'ft1': [fake_fh]}

        def created_handler():
            """Run create_filehandlers() and return the single 'ft1' handler."""
            return reader.create_filehandlers(['fake.nc'])['ft1'][0]

        # default (1)
        self.assertEqual(created_handler().filetype_info['expected_segments'], 1)

        # YAML defined for each file type
        fake_fh.filetype_info['expected_segments'] = 2
        self.assertEqual(created_handler().filetype_info['expected_segments'], 2)

        # defined both in the filename and the YAML metadata
        # YAML has priority
        fake_fh.filename_info = {'total_segments': 3}
        fake_fh.filetype_info = {'expected_segments': 2}
        self.assertEqual(created_handler().filetype_info['expected_segments'], 2)

        # defined in the filename
        fake_fh.filename_info = {'total_segments': 3}
        fake_fh.filetype_info = {}
        self.assertEqual(created_handler().filetype_info['expected_segments'], 3)

        # check correct FCI chunk number reading into segment
        fake_fh.filename_info = {'count_in_repeat_cycle': 5}
        self.assertEqual(created_handler().filename_info['segment'], 5)

    @patch.object(yr.FileYAMLReader, "__init__", lambda x: None)
    @patch('satpy.readers.yaml_reader._get_empty_segment_with_height')
    @patch('satpy.readers.yaml_reader.FileYAMLReader._load_dataset')
    @patch('satpy.readers.yaml_reader.xr')
    @patch('satpy.readers.yaml_reader._find_missing_segments')
    def test_load_dataset(self, mss, xr, parent_load_dataset, geswh):
        """Test _load_dataset()."""
        reader = yr.GEOSegmentYAMLReader()

        # Loading must raise when the projectable is None, and when failure is True
        for missing_segments in ([0, 0, 0, False, None], [0, 0, 0, True, 0]):
            mss.return_value = missing_segments
            with self.assertRaises(KeyError):
                reader._load_dataset(None, None, None)

        # Inputs and outputs of the mocked functions
        expected_segments = 8
        seg = MagicMock(dims=['y', 'x'])
        failure = False
        projectable = MagicMock()
        empty_segment = MagicMock()
        xr.full_like.return_value = empty_segment
        concat_slices = MagicMock()
        xr.concat.return_value = concat_slices
        dataid = MagicMock()
        ds_info = MagicMock()
        file_handlers = MagicMock()

        def all_segments():
            """Return a fresh list holding every expected segment."""
            return expected_segments * [seg, ]

        def load(slices, counter, handlers):
            """Feed *slices*/*counter* to the mocked segment finder and load the dataset."""
            mss.return_value = (counter, expected_segments, slices, failure, projectable)
            return reader._load_dataset(dataid, ds_info, handlers)

        # No missing segments
        slice_list = all_segments()
        res = load(slice_list, 9, file_handlers)
        self.assertIs(res.attrs, file_handlers[0].combine_info.return_value)
        self.assertNotIn(empty_segment, slice_list)

        # One missing segment in the middle
        slice_list[4] = None
        load(slice_list, 8, file_handlers)
        self.assertIs(slice_list[4], empty_segment)

        # The last segment is missing
        slice_list = all_segments()
        slice_list[-1] = None
        load(slice_list, 8, file_handlers)
        self.assertIs(slice_list[-1], empty_segment)

        # The last two segments are missing
        slice_list = all_segments()
        slice_list[-1] = None
        load(slice_list, 7, file_handlers)
        self.assertIs(slice_list[-1], empty_segment)
        self.assertIs(slice_list[-2], empty_segment)

        # The first segment is missing
        slice_list = all_segments()
        slice_list[0] = None
        load(slice_list, 9, file_handlers)
        self.assertIs(slice_list[0], empty_segment)

        # The first two segments are missing
        slice_list = all_segments()
        slice_list[0] = None
        slice_list[1] = None
        load(slice_list, 9, file_handlers)
        self.assertIs(slice_list[0], empty_segment)
        self.assertIs(slice_list[1], empty_segment)

        # Check that new FCI empty segment is generated if missing in the middle and at the end
        fake_fh = MagicMock()
        fake_fh.filename_info = {}
        fake_fh.filetype_info = {'file_type': 'fci_l1c_fdhsi'}
        empty_segment.shape = (140, 5568)
        slice_list[4] = None
        load(slice_list, 7, [fake_fh])
        assert 2 == geswh.call_count

        # Disable padding
        reader._load_dataset(dataid, ds_info, file_handlers, pad_data=False)
        parent_load_dataset.assert_called_once_with(dataid, ds_info, file_handlers)

    def test_get_empty_segment_with_height(self):
        """Test _get_empty_segment_with_height()."""
        geswh = yr._get_empty_segment_with_height
        dim = 'y'

        # expansion (139 -> 140) and reduction (140 -> 139) of the empty segment
        for current_height, new_height in ((139, 140), (140, 139)):
            empty_segment = xr.DataArray(np.ones((current_height, 5568)), dims=['y', 'x'])
            new_empty_segment = geswh(empty_segment, new_height, dim)
            assert new_empty_segment.shape == (new_height, 5568)

        # a segment that already has the right height is returned unmodified
        empty_segment = xr.DataArray(np.ones((140, 5568)), dims=['y', 'x'])
        new_empty_segment = geswh(empty_segment, 140, dim)
        assert new_empty_segment is empty_segment

    @patch.object(yr.FileYAMLReader, "__init__", lambda x: None)
    @patch('satpy.readers.yaml_reader._load_area_def')
    @patch('satpy.readers.yaml_reader._stack_area_defs')
    @patch('satpy.readers.yaml_reader._pad_earlier_segments_area')
    @patch('satpy.readers.yaml_reader._pad_later_segments_area')
    def test_load_area_def(self, pesa, plsa, sad, parent_load_area_def):
        """Test _load_area_def()."""
        # NOTE: patch decorators are applied bottom-up, so ``pesa`` receives
        # the ``_pad_later_segments_area`` mock and ``plsa`` the
        # ``_pad_earlier_segments_area`` one; both get the same check below.
        reader = yr.GEOSegmentYAMLReader()
        dataid = MagicMock()
        file_handlers = MagicMock()

        reader._load_area_def(dataid, file_handlers)
        for padding_or_stacking in (pesa, plsa, sad):
            padding_or_stacking.assert_called_once()
        parent_load_area_def.assert_not_called()

        # Disable padding
        reader._load_area_def(dataid, file_handlers, pad_data=False)
        parent_load_area_def.assert_called_once_with(dataid, file_handlers)

    @patch('satpy.readers.yaml_reader.AreaDefinition')
    def test_pad_later_segments_area(self, AreaDefinition):
        """Test _pad_later_segments_area()."""
        seg1_area = self._segment_area([200, 500])
        file_handlers = [self._segment_handler(seg1_area, 1)]

        res = yr._pad_later_segments_area(file_handlers, 'dataid')

        self.assertEqual(len(res), 2)
        seg2_extent = (0, 1500, 200, 1000)
        AreaDefinition.assert_called_once_with('fill', 'fill', 'fill', 'some_crs',
                                               500, 200, seg2_extent)

    @patch('satpy.readers.yaml_reader.AreaDefinition')
    def test_pad_later_segments_area_for_FCI_padding(self, AreaDefinition):
        """Test _pad_later_segments_area() in the FCI padding case."""
        seg1_area = self._segment_area([556, 11136])
        file_handlers = [self._segment_handler(seg1_area, 1, file_type='fci_l1c_fdhsi')]

        res = yr._pad_later_segments_area(file_handlers, 'dataid')

        self.assertEqual(len(res), 2)
        # the previous chunk size is 556, which is exactly double the size of the FCI chunk 2 size (278)
        # therefore, the new vertical area extent should be half of the previous size (1000-500)/2=250.
        # The new area extent lower-left row is therefore 1000+250=1250
        seg2_extent = (0, 1250, 200, 1000)
        AreaDefinition.assert_called_once_with('fill', 'fill', 'fill', 'some_crs',
                                               11136, 278, seg2_extent)

    @patch('satpy.readers.yaml_reader.AreaDefinition')
    def test_pad_earlier_segments_area(self, AreaDefinition):
        """Test _pad_earlier_segments_area()."""
        seg2_area = self._segment_area([200, 500])
        file_handlers = [self._segment_handler(seg2_area, 2)]
        area_defs = {2: seg2_area}

        res = yr._pad_earlier_segments_area(file_handlers, 'dataid', area_defs)

        self.assertEqual(len(res), 2)
        seg1_extent = (0, 500, 200, 0)
        AreaDefinition.assert_called_once_with('fill', 'fill', 'fill', 'some_crs',
                                               500, 200, seg1_extent)

    @patch('satpy.readers.yaml_reader.AreaDefinition')
    def test_pad_earlier_segments_area_for_FCI_padding(self, AreaDefinition):
        """Test _pad_earlier_segments_area() for the FCI case."""
        seg2_area = self._segment_area([278, 5568])
        file_handlers = [self._segment_handler(seg2_area, 2, file_type='fci_l1c_fdhsi')]
        area_defs = {2: seg2_area}

        res = yr._pad_earlier_segments_area(file_handlers, 'dataid', area_defs)

        self.assertEqual(len(res), 2)
        # the previous chunk size is 278, which is exactly double the size of the FCI chunk 1 size (139)
        # therefore, the new vertical area extent should be half of the previous size (1000-500)/2=250.
        # The new area extent lower-left row is therefore 500-250=250
        seg1_extent = (0, 500, 200, 250)
        AreaDefinition.assert_called_once_with('fill', 'fill', 'fill', 'some_crs',
                                               5568, 139, seg1_extent)

    def test_find_missing_segments(self):
        """Test _find_missing_segments()."""
        fms = yr._find_missing_segments
        projectable = 'projectable'
        dataid = 'dataid'

        # Dataset with only one segment
        fh_seg1 = MagicMock(filename_info={'segment': 1})
        fh_seg1.get_dataset = MagicMock(return_value=projectable)
        res = fms([fh_seg1], {'file_type': []}, dataid)
        counter, expected_segments, slice_list, failure, proj = res
        self.assertEqual(counter, 2)
        self.assertEqual(expected_segments, 1)
        self.assertIn(projectable, slice_list)
        self.assertFalse(failure)
        self.assertIs(proj, projectable)

        # Three expected segments, first and last missing
        fh_seg2 = MagicMock(filename_info={'segment': 2},
                            filetype_info={'expected_segments': 3, 'file_type': 'foo'})
        fh_seg2.get_dataset = MagicMock(return_value=projectable)
        res = fms([fh_seg2], {'file_type': ['foo']}, dataid)
        counter, expected_segments, slice_list, failure, proj = res
        self.assertEqual(counter, 3)
        self.assertEqual(expected_segments, 3)
        self.assertEqual(slice_list, [None, projectable, None])
        self.assertFalse(failure)
        self.assertIs(proj, projectable)