# -*- coding: utf-8 -*-
"""
This module has tools to download time series climate data from gridded
climate data collections that are hosted on Google's Earth Engine. It reads
the formatted file that was prepared using the :mod:`gridwxcomp.prep_metadata`
module and uses the coordinate information there, along with the variable
names specified in the configuration .INI file, to determine which data to
download and for which geographic locations (those paired with the stations).
"""
import ee
import os
import pandas as pd
import re
import time
from gridwxcomp.util import read_config, affine_transform
from pathlib import Path
from multiprocessing.pool import ThreadPool as Pool
def _get_collection_date_range(path):
    """
    Look up the first and last image dates of an Earth Engine collection.

    Used to fill in the date range when the user did not provide dates in
    the config file. The dates are derived from each image's
    ``system:index`` property by inserting dashes after the 4th and 6th
    characters, so the index is assumed to start with ``YYYYMMDD``.

    Arguments:
        path (str): path to image collection on earth engine

    Returns:
        start_date (str): date of first image as YYYY-MM-DD
        end_date (str): date of last image as YYYY-MM-DD

    Note: Each lookup issues a blocking ``getInfo`` request to Earth Engine.
    """
    collection = ee.ImageCollection(path)

    def _edge_date(ascending):
        # Sort by system:index and keep only the image at the requested end
        edge_img = collection.limit(1, 'system:index', ascending).first()
        index_text = edge_img.getInfo()['properties']['system:index']
        # 'YYYYMMDD...' -> 'YYYY-MM-DD...'
        return f'{index_text[:4]}-{index_text[4:6]}-{index_text[6:]}'

    start_date = _edge_date(True)
    end_date = _edge_date(False)
    return start_date, end_date
def _download_point_data(param_dict):
    """
    Extract the gridded timeseries at one station and save it as a CSV.

    Builds a date-filtered Earth Engine image collection, reduces every image
    at the station point with a mean reducer (reduceRegion), retrieves the
    result with a ``getInfo`` call and writes it to
    ``param_dict['GRID_FILE_PATH']``. Called from
    :func:`download_grid_data` using a ThreadPool.

    Arguments:
        param_dict (dict): dictionary of parameters for reduceRegion call.
            Keys read here: ``GRID_FILE_PATH``, ``FORCE_DOWNLOAD``,
            ``DATASET_PATH``, ``START_DATE``, ``END_DATE``,
            ``STATION_LON_WGS84``, ``STATION_LAT_WGS84``, ``STATION_ID``.

    Returns:
        None

    Note: You must authenticate with Google Earth Engine before using
        this function.
    """
    # todo: change force_download (bool) to download_method (string)
    #   and provide option for appending latest data
    grid_file_path = param_dict['GRID_FILE_PATH']

    # An existing file is only replaced when force_download was requested
    if os.path.exists(grid_file_path) and not param_dict['FORCE_DOWNLOAD']:
        print(f'{grid_file_path} already exists,'
              f' skipping download.\n')
        return

    # Time download process
    t_begin = time.time()

    # NOTE(review): Earth Engine's filterDate is documented as excluding the
    # end date - confirm the final day is not expected in the output.
    collection = ee.ImageCollection(param_dict['DATASET_PATH']).filterDate(
        param_dict['START_DATE'], param_dict['END_DATE'])

    # Band names, projection and transform are taken from the first image
    first_img = collection.first()
    band_names = first_img.bandNames().getInfo()
    grid_projection = first_img.projection()
    grid_transform = affine_transform(first_img)

    # Create point to reduce over
    station_point = ee.Geometry.Point(
        [param_dict['STATION_LON_WGS84'], param_dict['STATION_LAT_WGS84']])

    def _reduce_point_img(img):
        # One feature per image holding the band values at the station
        band_values = img.reduceRegion(
            geometry=station_point,
            reducer=ee.Reducer.mean(),
            crs=grid_projection,
            crsTransform=grid_transform)
        image_date = img.date().format("YYYYMMdd")
        return ee.Feature(None, band_values).set(
            {"date": image_date, 'station_name': param_dict['STATION_ID']})

    def _summary_feature_col(ftr):
        # Flatten each feature into one row: date, station, then every band
        row = [ftr.get('date'), ftr.get('station_name'),
               *(ftr.get(band) for band in band_names)]
        return ftr.set({'output': row})

    reduced = ee.FeatureCollection(collection.map(_reduce_point_img))
    rows = (reduced.map(_summary_feature_col)
            .aggregate_array('output').getInfo())

    timeseries_df = pd.DataFrame(
        data=rows, columns=['date', 'station_name'] + band_names)
    timeseries_df.to_csv(grid_file_path, index=False)

    elapsed_minutes = (time.time() - t_begin) / 60
    print(f'\n{grid_file_path} downloaded in '
          f'{elapsed_minutes:.2f} minutes.')
def download_grid_data(metadata_path, config_path,
                       local_folder=None, force_download=False):
    """
    Download gridded point timeseries for every station in a metadata file.

    Takes in the metadata file generated by :func:`gridwxcomp.prep_metadata`
    and downloads the corresponding point data for all stations within. This
    function requires the dataset be accessible in the user's Google Earth
    Engine account, and the image collection name and path should be
    specified in the configuration .INI file (i.e., in the ``config_path``
    file). The metadata file is rewritten in place with a ``GRID_FILE_PATH``
    column holding the absolute path of each station's downloaded file.

    Arguments:
        metadata_path (str): path to the metadata file generated by
            :func:`gridwxcomp.prep_metadata`
        config_path (str): path to config file containing catalog info
        local_folder (str): folder to download point data to; when omitted
            a folder named after the dataset is created in the current
            working directory
        force_download (bool): will re-download all data even if local file
            already exists

    Returns:
        None

    Note: You must authenticate with Google Earth Engine before using
        this function.
    """
    config = read_config(config_path)  # Read config

    # Pull relevant metadata from dictionary
    collection_info = config['collection_info']
    dataset = collection_info['name']
    gridded_dataset_path = collection_info['path']
    gridded_dataset_date_start = collection_info['start_date']
    gridded_dataset_date_end = collection_info['end_date']

    # Fill in start/end dates if either are missing. Earth Engine is only
    # queried when needed, since the lookup costs two blocking requests.
    start_missing = gridded_dataset_date_start in ('', None)
    end_missing = gridded_dataset_date_end in ('', None)
    if start_missing or end_missing:
        collection_start_date, collection_end_date = (
            _get_collection_date_range(gridded_dataset_path))
        if start_missing:
            gridded_dataset_date_start = collection_start_date
        if end_missing:
            gridded_dataset_date_end = collection_end_date

    # Open gridwxcomp station metadata file
    metadata_df = pd.read_csv(metadata_path)

    # All station files share one output folder, so create it once
    if local_folder:
        output_dir = Path(local_folder, dataset)
    else:
        output_dir = Path(dataset)
    output_dir.mkdir(parents=True, exist_ok=True)

    def _grid_file_path(station_id):
        # str() guards against numeric IDs that pandas parsed as numbers;
        # spaces become underscores and other non-word characters are dropped
        formatted_station_id = re.sub(
            r'\W+', '', str(station_id).replace(' ', '_')).lower()
        file_name = f'{dataset}_{formatted_station_id}.csv'
        # Stored as a plain string so the column stays CSV/str friendly
        return str((output_dir / file_name).absolute())

    metadata_df['GRID_FILE_PATH'] = [
        _grid_file_path(station_id)
        for station_id in metadata_df['STATION_ID']]

    # restructure metadata to make iterating simpler
    iterable_df = metadata_df[
        ['STATION_ID', 'STATION_LAT_WGS84',
         'STATION_LON_WGS84', 'GRID_FILE_PATH']].copy(deep=True)
    iterable_df['START_DATE'] = gridded_dataset_date_start
    iterable_df['END_DATE'] = gridded_dataset_date_end
    iterable_df['DATASET_PATH'] = gridded_dataset_path
    iterable_df['FORCE_DOWNLOAD'] = force_download
    iterable_list = iterable_df.to_dict('records')

    # Use half of the available CPUs, but never fewer than one worker:
    # os.cpu_count() may return None, and a pool of size 0 raises ValueError
    thread_count = max(1, (os.cpu_count() or 1) // 2)

    # The context manager releases the worker threads even if a download
    # raises
    with Pool(thread_count) as pool:
        pool.map(_download_point_data, iterable_list)

    metadata_df.to_csv(metadata_path, index=False)
    print(
        'All points have been requested and the metadata has been updated.')
if __name__ == '__main__':
    # Running this module as a script only tells the user that the
    # command-line interface no longer exists.
    removal_notice = (
        '\n--------------------------------------------------------'
        ' Functionality for running this library from the terminal'
        ' was removed. Please refer to the documentation on how to'
        ' make calls to these functions. \n\n'
    )
    print(removal_notice)