# Source code for gridwxcomp.ee_download

# -*- coding: utf-8 -*-
"""
This module has tools to download timeseries climate data from gridded climate
data collections that are hosted on Google's Earth Engine. It reads the
formatted file that was prepared using the :mod:`gridwxcomp.prep_metadata`
module and uses the coordinate information there, along with the variable names
specified in the configuration .INI file, to know which data to download and
for which geographic locations, i.e., those paired with the station locations.
"""

import ee
import os
import pandas as pd
import re
import time
from gridwxcomp.util import read_config, affine_transform
from pathlib import Path
from multiprocessing.pool import ThreadPool as Pool


def _get_collection_date_range(path):
    """
    Look up the dates of the first and last images in an EE image collection.

    Used when the user leaves the dates out of the config file. The dates are
    read from the ``system:index`` property of the images that sort first and
    last on that property; each index is split after its 4th and 6th
    characters and rejoined with hyphens.

    Arguments:
        path (str): path to image collection on earth engine

    Returns:
        start_date (str): date of first image as YYYY-MM-DD
        end_date (str): date of last image as YYYY-MM-DD

    Note: assumes ``system:index`` is a YYYYMMDD string; other index formats
        are hyphenated at the same positions without validation.
    """
    collection = ee.ImageCollection(path)

    def _edge_index(ascending):
        # One blocking getInfo round trip per edge of the collection
        edge_img = collection.limit(1, 'system:index', ascending).first()
        return edge_img.getInfo()['properties']['system:index']

    def _hyphenate(index_text):
        # 'YYYYMMDD' -> 'YYYY-MM-DD'
        return '-'.join((index_text[:4], index_text[4:6], index_text[6:]))

    # Tuple elements evaluate left to right: first image, then last image
    return _hyphenate(_edge_index(True)), _hyphenate(_edge_index(False))


def _download_point_data(param_dict):
    """
    Extract the gridded timeseries at one station and save it as a CSV.

    Every image of the date-filtered collection is reduced with a mean
    reducer at the station point, using the projection and affine transform
    of the collection's first image. The per-image values are pulled to the
    client with a single getInfo call and written to ``GRID_FILE_PATH`` with
    the columns ``date``, ``station_name`` and one column per band.
    Called from :func:`download_grid_data` using a ThreadPool.

    Arguments:
        param_dict (dict): dictionary of parameters for reduceRegion call.
            Keys read here: ``GRID_FILE_PATH``, ``FORCE_DOWNLOAD``,
            ``DATASET_PATH``, ``START_DATE``, ``END_DATE``,
            ``STATION_LON_WGS84``, ``STATION_LAT_WGS84``, ``STATION_ID``.

    Returns:
        None

    Note: You must authenticate with Google Earth Engine before using
        this function.
    """
    # todo: change force_download (bool) to download_method (string)
    #   and provide option for appending latest data

    out_path = param_dict['GRID_FILE_PATH']

    # An existing file is kept unless the caller forces a fresh download
    if os.path.exists(out_path) and not param_dict['FORCE_DOWNLOAD']:
        print(f'{out_path} already exists,'
              f' skipping download.\n')
        return

    # Wall-clock timer for the report printed at the end
    started = time.time()

    # Collection restricted to the requested dates; the first image supplies
    # the band names and the grid definition used for every reduction
    collection = ee.ImageCollection(param_dict['DATASET_PATH']).filterDate(
        param_dict['START_DATE'], param_dict['END_DATE'])
    first_img = collection.first()
    band_names = first_img.bandNames().getInfo()
    grid_projection = first_img.projection()
    grid_transform = affine_transform(first_img)

    # Station location the images are sampled at
    station_point = ee.Geometry.Point(
        [param_dict['STATION_LON_WGS84'], param_dict['STATION_LAT_WGS84']])

    def _point_feature(image):
        # Band values at the station for one image, tagged with date and id
        band_values = image.reduceRegion(
            geometry=station_point,
            reducer=ee.Reducer.mean(),
            crs=grid_projection,
            crsTransform=grid_transform)
        date_label = image.date().format("YYYYMMdd")
        return ee.Feature(None, band_values).set(
            {"date": date_label, 'station_name': param_dict['STATION_ID']})

    def _row_feature(feature):
        # Pack the properties into one list, ordered like the CSV columns
        row = [feature.get(key)
               for key in ['date', 'station_name'] + band_names]
        return feature.set({'output': row})

    point_features = ee.FeatureCollection(collection.map(_point_feature))
    rows = point_features.map(_row_feature).aggregate_array(
        'output').getInfo()

    frame = pd.DataFrame(
        data=rows, columns=['date', 'station_name'] + band_names)
    frame.to_csv(out_path, index=False)

    elapsed_minutes = (time.time() - started) / 60
    print(f'\n{out_path} downloaded in '
          f'{elapsed_minutes:.2f} minutes.')


def download_grid_data(metadata_path, config_path,
                       local_folder=None, force_download=False):
    """
    Takes in the metadata file generated by :func:`gridwxcomp.prep_metadata`
    and downloads the corresponding point data for all stations within. This
    function requires the dataset be accessible in the user's Google Earth
    Engine account, and the image collection name and path should be specified
    in the configuration .INI file (i.e., in the ``config_path`` file).

    The metadata file will be updated for the path the gridded data files
    are downloaded to: a ``GRID_FILE_PATH`` column holding the absolute path
    of each station's CSV is written back to ``metadata_path``.

    Arguments:
        metadata_path (str): path to the metadata file generated by
            :func:`gridwxcomp.prep_metadata`
        config_path (str): path to config file containing catalog info
        local_folder (str): folder to download point data to; the files are
            placed in a subfolder named after the dataset. When omitted, that
            subfolder is created in the current working directory.
        force_download (bool): will re-download all data even
            if local file already exists

    Returns:
        None

    Note: You must authenticate with Google Earth Engine before using
        this function.
    """
    config = read_config(config_path)  # Read config

    # Pull relevant metadata from dictionary
    collection_info = config['collection_info']
    dataset = collection_info['name']
    gridded_dataset_path = collection_info['path']
    gridded_dataset_date_start = collection_info['start_date']
    gridded_dataset_date_end = collection_info['end_date']

    # Fill in start/end dates if either are missing. The collection is only
    # queried when a date is actually blank, since the lookup costs blocking
    # requests to Earth Engine.
    # NOTE(review): filterDate treats its end date as exclusive, so an
    # auto-filled end date presumably leaves out the collection's last
    # image - confirm whether that is intended.
    if gridded_dataset_date_start == '' or gridded_dataset_date_end == '':
        collection_start_date, collection_end_date = (
            _get_collection_date_range(gridded_dataset_path))
        if gridded_dataset_date_start == '':
            gridded_dataset_date_start = collection_start_date
        if gridded_dataset_date_end == '':
            gridded_dataset_date_end = collection_end_date

    # Open gridwxcomp station metadata file
    metadata_df = pd.read_csv(metadata_path)

    # Every station file lives in the same folder, so create it only once
    if local_folder:
        output_dir = Path(f'{local_folder}/{dataset}')
    else:
        output_dir = Path(f'{dataset}')
    output_dir.mkdir(parents=True, exist_ok=True)

    def _station_file_path(station_id):
        # str() keeps this working when pandas parsed the IDs as numbers;
        # spaces become underscores, other non-word characters are dropped
        formatted_station_id = re.sub(
            r'\W+', '', str(station_id).replace(' ', '_')).lower()
        file_name = f'{dataset}_{formatted_station_id}.csv'
        # Stored as plain text so the column holds strings, not Path objects
        return str((output_dir / file_name).absolute())

    metadata_df['GRID_FILE_PATH'] = [
        _station_file_path(station_id)
        for station_id in metadata_df['STATION_ID']]

    # restructure metadata to make iterating simpler
    iterable_df = metadata_df[
        ['STATION_ID', 'STATION_LAT_WGS84', 'STATION_LON_WGS84',
         'GRID_FILE_PATH']].copy(deep=True)
    iterable_df['START_DATE'] = gridded_dataset_date_start
    iterable_df['END_DATE'] = gridded_dataset_date_end
    iterable_df['DATASET_PATH'] = gridded_dataset_path
    iterable_df['FORCE_DOWNLOAD'] = force_download
    iterable_list = iterable_df.to_dict('records')

    # Use half of the available cores, but never fewer than one worker:
    # os.cpu_count() can return None, and halving a single core gives 0,
    # which the pool rejects.
    thread_count = max(1, (os.cpu_count() or 1) // 2)

    # map() blocks until every station is done; the with-block then releases
    # the pool even if one of the downloads raised
    with Pool(thread_count) as pool:
        pool.map(_download_point_data, iterable_list)

    metadata_df.to_csv(metadata_path, index=False)
    print(
        'All points have been requested and the metadata has been updated.')
if __name__ == '__main__':
    # The command line interface was removed; running the module as a script
    # only prints a pointer to the documentation.
    removal_notice = (
        '\n--------------------------------------------------------'
        ' Functionality for running this library from the terminal'
        ' was removed. Please refer to the documentation on how to'
        ' make calls to these functions. \n\n'
    )
    print(removal_notice)