Module geodata_harvester.temporal
Utility functions for temporal processing.
–Main function list–
combine_rasters_temporal: Concatenates files by time and returns an xarray object. multiband_raster_to_xarray: Converts a stack of multiband rasters with different dates to an xarray object. temporal_crop: Cuts an xarray object by start and end times. aggregate_temporal: Makes a data aggregation (mean, median, sum, etc) through time on an xarray, by specified function and time period.
–Helper function list–
get_date_after_last_underscore: Extract the date from the file name after the last underscore. get_mask_array: Return mask of the data, e.g. for cloud-cover.
Expand source code
"""
Utility functions for temporal processing.
--Main function list--
combine_rasters_temporal: Concatenates files by time and returns an xarray object.
multiband_raster_to_xarray: Converts a stack of multiband rasters with different dates to an xarray object.
temporal_crop: Cuts an xarray object by start and end times.
aggregate_temporal: Makes a data aggregation (mean, median, sum, etc) through time on an xarray, by specified function and time period.
--Helper function list--
get_date_after_last_underscore: Extract the date from the file name after the last underscore.
get_mask_array: Return mask of the data, e.g. for cloud-cover.
"""
import numpy as np
import pandas as pd
import rioxarray
import xarray as xr
import datetime
def combine_rasters_temporal(
    file_list, channel_name="band", attribute_name="long_name"
):
    """
    Combine one or more raster files into a single time-indexed xarray object.

    Every file is opened with ``rioxarray.open_rasterio`` and the results are
    concatenated along ``channel_name``. The labels stored in the
    ``attribute_name`` attribute of each file are parsed with
    ``pandas.to_datetime`` and become the values of the concatenated
    coordinate, which is finally renamed to "time". Files must therefore be
    given in temporal order and hold the same kind of data.

    Example:
        file_list = ['../data/mvp_daily_rain_silo/daily_rain_2017_cropped.tif',
                     '../data/mvp_daily_rain_silo/daily_rain_2018_cropped.tif']
        xdr = combine_rasters_temporal(
            file_list, channel_name='band', attribute_name='long_name')

    Parameters
    ----------
    file_list : str or list of str
        A single file name, or file names in date order to concatenate.
    channel_name : str
        Name of the coordinate to concatenate along (band, time, etc).
        Check options with rioxarray.open_rasterio('filename').coords
    attribute_name : str
        Name of the raster attribute holding the time/date label(s).
        Check with rioxarray.open_rasterio('filename').attrs

    Returns
    -------
    xdr : xarray object of x,y,time, with ``attribute_name`` removed from
        its attributes.

    Raises
    ------
    ValueError
        If ``file_list`` is empty, or if a raster has no ``channel_name``
        coordinate or no ``attribute_name`` attribute.
    """
    if isinstance(file_list, str):
        file_list = [file_list]
    file_list = list(file_list)
    if not file_list:
        raise ValueError("file_list must contain at least one raster file name")

    array_list = []
    labels = []
    for fname in file_list:
        xds = rioxarray.open_rasterio(fname)
        if channel_name not in xds.coords:
            raise ValueError(
                "%s not a channel in the raster %s Options are %s"
                % (channel_name, fname, list(xds.coords))
            )
        if attribute_name not in xds.attrs:
            raise ValueError(
                "%s not an attribute in the raster %s Options are %s"
                % (attribute_name, fname, list(xds.attrs))
            )
        array_list.append(xds)
        # The attribute is either one label (e.g. a single-band file) or a
        # sequence with one label per channel; flatten both into one list.
        value = xds.attrs[attribute_name]
        if isinstance(value, (tuple, list)):
            labels.extend(value)
        else:
            labels.append(value)

    xdr = xr.concat(array_list, channel_name)
    # Replace the channel index by the parsed dates, then expose it as "time".
    xdr = xdr.assign_coords({channel_name: np.array(pd.to_datetime(labels))})
    xdr = xdr.rename({channel_name: "time"})
    # The labels now live in the time coordinate; drop the stale attribute.
    xdr.attrs.pop(attribute_name, None)
    return xdr
def multiband_raster_to_xarray(file_list, date_list = None, mask_bandname = None):
    """
    Convert a stack of multiband rasters with different dates to an xarray object.

    Each file is opened with ``rioxarray.open_rasterio``, tagged with its date
    as a new leading "time" dimension, and all files are concatenated along
    that dimension.

    Parameters
    ----------
    file_list : str or list of str
        A single file name, or file names in date order to concatenate.
    date_list : str or list, optional
        Dates (anything ``pandas.to_datetime`` accepts) matching ``file_list``
        one-to-one. If None, the dates are extracted from the file names with
        ``get_date_after_last_underscore``, which assumes the date is given at
        the end of the file name after an underscore.
    mask_bandname : optional
        Currently not used by this function.

    Returns
    -------
    stacked_data : xarray object with an added "time" dimension.

    Raises
    ------
    ValueError
        If ``file_list`` and ``date_list`` differ in length.
    """
    if isinstance(file_list, str):
        file_list = [file_list]
    if date_list is None:
        date_list = get_date_after_last_underscore(file_list)
    elif isinstance(date_list, str):
        date_list = [date_list]

    # An explicit check (not ``assert``) so it still runs under ``python -O``;
    # otherwise ``zip`` would silently drop the unmatched entries.
    if len(file_list) != len(date_list):
        raise ValueError("File list and date list must have the same length.")

    data_arrays = []
    for fname, date in zip(file_list, date_list):
        xds = rioxarray.open_rasterio(fname)
        # Attach the date as a scalar coordinate, then promote it to a dimension
        xds = xds.assign_coords({"time": pd.to_datetime(date)})
        xds = xds.expand_dims("time")
        data_arrays.append(xds)

    return xr.concat(data_arrays, dim="time")
def temporal_crop(xdr, start_time, end_time):
    """
    Select the part of an xarray object lying between two times.

    Parameters
    ----------
    xdr : xarray object of x,y,time
    start_time : string time in 'yyyy-mm-dd' format.
    end_time : string time in 'yyyy-mm-dd' format.

    Returns
    -------
    xdr_crop : the result of selecting ``time=slice(start_time, end_time)``
        on ``xdr``.
    """
    window = slice(start_time, end_time)
    return xdr.sel(time=window)
def aggregate_temporal(
    xdr,
    period="yearly",
    agg=["mean"],
    outfile="temporal_agg",
    buffer=None,
    fill_nan=True,
):
    """
    Make a data aggregation (mean, median, sum, etc) through time on an xarray.

    Expects xarray coordinates to be x, y, time. Saves every aggregation for
    every time period as its own tif file named
    ``<outfile>_<aggregation>_<period label>.tif``.

    Example:
        file_list = ['../data/mvp_daily_rain_silo/daily_rain_2017_cropped.tif',
                     '../data/mvp_daily_rain_silo/daily_rain_2018_cropped.tif']
        xdr = combine_rasters_temporal(
            file_list, channel_name='band', attribute_name='long_name')
        outfname_list, agg_list = aggregate_temporal(
            xdr, period=100, agg=['mean', 'sum'], outfile='temporal_agg')

    Parameters
    ----------
    xdr : xarray object of x,y,time
    period : str or int
        'yearly' groups by ``time.year`` and 'monthly' by ``time.month``.
        A positive integer is a bin length in days: the data is split into
        ``total_days // period`` equal-width time bins (at least one).
    agg : str or list of str
        Aggregation methods to apply, any of
        ['mean', 'median', 'sum', 'perc95', 'perc5', 'max', 'min'].
        Unknown names are skipped. The default list is never modified.
    outfile : str
        Prefix of the output file names.
    buffer : int, optional
        If given, only the first ``buffer`` time steps of every group are
        kept before aggregating.
    fill_nan : bool
        If True (default), look for a no-data value in the attributes of
        ``xdr`` and replace matching cells with NaN before aggregating.

    Returns
    -------
    outfname_list : list of strings of output file names
    agg_list : list of strings with the aggregation method of each output file

    Raises
    ------
    ValueError
        If none of the requested aggregations is supported, or if ``period``
        is not 'yearly', 'monthly' or a positive integer.
    """
    # Validate the cheap arguments first so bad input fails before any data
    # is touched or any file is written.
    agg_types = ["mean", "median", "sum", "perc95", "perc5", "max", "min"]
    if isinstance(agg, str):
        agg = [agg]
    aggcheck = [a for a in agg if a in agg_types]
    if not aggcheck:
        raise ValueError("Invalid Aggregation type. Expected any of: %s" % agg_types)

    integer_period = isinstance(period, int) and not isinstance(period, bool)
    if integer_period:
        if period <= 0:
            raise ValueError(
                "Invalid temporal period. An integer period must be a positive number of days"
            )
    elif period not in ("yearly", "monthly"):
        raise ValueError(
            "Invalid temporal period. Expected any of: 'yearly', 'monthly', or an integer period"
        )

    if fill_nan:
        # Possible attribute names for the fill value: exact spelling first,
        # then a case-insensitive search over all attributes.
        nodata_names = ["_FillValue", "missing_value", "nodata", "nodatavalue"]
        nodata_key = next((name for name in nodata_names if name in xdr.attrs), None)
        if nodata_key is None:
            lowered = {name.lower() for name in nodata_names}
            nodata_key = next(
                (key for key in xdr.attrs if key.lower() in lowered), None
            )
        if nodata_key is not None:
            xdr = xdr.where(xdr != xdr.attrs[nodata_key], np.nan)

    # Pick the grouping rule and the matching label extractor for the period.
    if integer_period:
        time_start = xdr.time.values[0].astype('datetime64[s]').tolist()
        time_end = xdr.time.values[-1].astype('datetime64[s]').tolist()
        # If the period is longer than the whole time span, use a single bin.
        bins = max((time_end - time_start).days // period, 1)

        def group(data):
            return data.groupby_bins("time", bins)

        def label_of(item):
            # str(interval) looks like "(yyyy-mm-dd..., ...]"; keep the start date
            return str(item["time_bins"].values)[1:11]

    elif period == "yearly":

        def group(data):
            return data.groupby("time.year")

        def label_of(item):
            return str(item["year"].values)

    else:

        def group(data):
            return data.groupby("time.month")

        def label_of(item):
            return str(item["month"].values).zfill(2)

    xdr_groups = group(xdr)
    if buffer is not None:
        # Keep only the first `buffer` steps of each group, then regroup with
        # the same rule (the integer case reuses `bins`, not `period`).
        trimmed = xdr_groups.apply(lambda x: x.isel(time=slice(0, buffer)))
        xdr_groups = group(trimmed)

    reducers = {
        "mean": lambda g: g.mean(),
        "median": lambda g: g.median(),
        "sum": lambda g: g.sum(),
        "perc95": lambda g: g.quantile(q=0.95),
        "perc5": lambda g: g.quantile(q=0.05),
        "max": lambda g: g.max(),
        "min": lambda g: g.min(),
    }
    aggdict = {name: reducers[name](xdr_groups) for name in aggcheck}

    # Keep track of the names of all files produced
    outfname_list = []
    agg_list = []
    for a in aggcheck:
        # Save every time period of every aggregation as its own raster
        for p in aggdict[a]:
            label = label_of(p)
            fname = outfile + "_" + a + "_" + label + ".tif"
            p.rio.to_raster(fname)
            outfname_list.append(fname)
            agg_list.append(a)
            print(a, "of", label, "saved in:", fname)
    return outfname_list, agg_list
def get_date_after_last_underscore(file_list):
    """
    Extract the date from each file name after the last underscore.

    The text between the last "_" and the last "." of every name must be a
    date in YYYY-MM-DD format, e.g. "ndvi_2020-01-15.tif" -> "2020-01-15".

    Parameters
    ----------
    file_list : str or list of str
        A single file name, or file names in date order to concatenate.

    Returns
    -------
    result : list of date strings, in the same order as ``file_list``.

    Raises
    ------
    ValueError
        If the extracted part of a file name is not a YYYY-MM-DD date.
    """
    if isinstance(file_list, str):
        # A bare string would otherwise be iterated character by character
        file_list = [file_list]

    result = []
    for filename in file_list:
        # rsplit returns the whole name when it contains no underscore
        last_part = str(filename).rsplit('_', 1)[-1]
        # Remove the file format ending
        last_part = last_part.rsplit('.', 1)[0]
        try:
            datetime.datetime.strptime(last_part, '%Y-%m-%d')
        except ValueError as err:
            raise ValueError(
                f"The last part of the filename is not a date: {last_part!r} "
                f"(file {filename!r}, expected YYYY-MM-DD)"
            ) from err
        result.append(last_part)
    return result
def get_mask_array(xdr, mask_band = None, verbose = True):
    """
    Return mask of the data, e.g. for cloud-cover.

    The mask is True where the mask band is not 0, and False otherwise.
    If no mask band is provided, the first band whose name in
    ``xdr.attrs['long_name']`` contains 'mask' is used.

    Parameters
    ----------
    xdr : xarray
        xarray dataset to mask. Band names are read from
        ``xdr.attrs['long_name']``; a name at position i is selected as
        band i + 1.
    mask_band : str or int, optional
        Name or band number of the band to use as a mask.
        If not provided, a mask band will be searched for in the xarray
        attribute metadata.
    verbose : bool
        If True (default), print which band is used.

    Returns
    -------
    mask : array, bool
        None if no mask band could be found in the attributes, or if
        ``mask_band`` is neither an integer nor a string.

    Raises
    ------
    ValueError
        If ``mask_band`` is a name that is not listed in
        ``xdr.attrs['long_name']``.
    """
    if mask_band is None:
        names = xdr.attrs['long_name']
        if isinstance(names, str):
            # A single band name; avoid scanning it character by character
            names = (names,)
        # Positions of all bands whose name includes 'mask'
        candidates = [i for i, s in enumerate(names) if 'mask' in s]
        if not candidates:
            if verbose: print('No mask band found in attributes. Proceeding without masking.')
            return None
        chosen = candidates[0]
        if len(candidates) > 1:
            if verbose: print(f"Multiple mask bands found in attributes. Proceeding with mask band: {names[chosen]}")
        if verbose: print(f"Masking values with Nan where mask band {names[chosen]} is valid")
        band_number = chosen + 1
    elif isinstance(mask_band, int):
        if verbose: print(f"Masking values with Nan where mask band {mask_band} is not 0")
        band_number = mask_band
    elif isinstance(mask_band, str):
        if verbose: print(f"Masking values with Nan where mask band {mask_band} is not 0")
        names = xdr.attrs['long_name']
        if isinstance(names, str):
            names = (names,)
        matches = [i for i, s in enumerate(names) if s == mask_band]
        if not matches:
            raise ValueError(
                f"Mask band {mask_band!r} not found in long_name attribute: {list(names)}"
            )
        band_number = matches[0] + 1
    else:
        if verbose: print("Mask band must be an integer or string")
        return None

    return xdr.sel(band=band_number).values != 0
Functions
def aggregate_temporal(xdr, period='yearly', agg=['mean'], outfile='temporal_agg', buffer=None, fill_nan=True)
-
Make a data aggregation (mean, median, sum, etc) through time on an xarray. Expects xarray coordinates to be x, y, time. Saves every aggregation for every time period as its own tif file.
Example: file_list = ['../data/mvp_daily_rain_silo/daily_rain_2017_cropped.tif', '../data/mvp_daily_rain_silo/daily_rain_2018_cropped.tif']
xdr = combine_rasters_temporal(file_list, channel_name='band',attribute_name='long_name')
outfname_list, agg_list = aggregate_temporal( xdr,period=100,agg=['mean','sum'],outfile='temporal_agg')
Parameters
xdr : xarray object of x,y,time
period : string or int. Time period to perform aggregation, 'yearly', 'monthly', or number of periods to aggregate over.
agg : list of strings. Choice of aggregation methods to apply of ['mean','median','sum','perc95','perc5']
outfile : string. Prefix of output file name.
buffer : integer time period in same units as period to buffer into the future.
fill_nan : boolean. If True (Default), will automatically try to find the value for missing data from header and fills with nan before aggregating. If False, will not fill nan.
Returns
outfname_list : list of strings of output file names
agg_list : list of strings of aggregation methods
Expand source code
def aggregate_temporal(
    xdr,
    period="yearly",
    agg=["mean"],
    outfile="temporal_agg",
    buffer=None,
    fill_nan=True,
):
    """
    Make a data aggregation (mean, median, sum, etc) through time on an xarray.

    Expects xarray coordinates to be x, y, time. Saves every aggregation for
    every time period as its own tif file named
    ``<outfile>_<aggregation>_<period label>.tif``.

    Example:
        file_list = ['../data/mvp_daily_rain_silo/daily_rain_2017_cropped.tif',
                     '../data/mvp_daily_rain_silo/daily_rain_2018_cropped.tif']
        xdr = combine_rasters_temporal(
            file_list, channel_name='band', attribute_name='long_name')
        outfname_list, agg_list = aggregate_temporal(
            xdr, period=100, agg=['mean', 'sum'], outfile='temporal_agg')

    Parameters
    ----------
    xdr : xarray object of x,y,time
    period : str or int
        'yearly' groups by ``time.year`` and 'monthly' by ``time.month``.
        A positive integer is a bin length in days: the data is split into
        ``total_days // period`` equal-width time bins (at least one).
    agg : str or list of str
        Aggregation methods to apply, any of
        ['mean', 'median', 'sum', 'perc95', 'perc5', 'max', 'min'].
        Unknown names are skipped. The default list is never modified.
    outfile : str
        Prefix of the output file names.
    buffer : int, optional
        If given, only the first ``buffer`` time steps of every group are
        kept before aggregating.
    fill_nan : bool
        If True (default), look for a no-data value in the attributes of
        ``xdr`` and replace matching cells with NaN before aggregating.

    Returns
    -------
    outfname_list : list of strings of output file names
    agg_list : list of strings with the aggregation method of each output file

    Raises
    ------
    ValueError
        If none of the requested aggregations is supported, or if ``period``
        is not 'yearly', 'monthly' or a positive integer.
    """
    # Validate the cheap arguments first so bad input fails before any data
    # is touched or any file is written.
    agg_types = ["mean", "median", "sum", "perc95", "perc5", "max", "min"]
    if isinstance(agg, str):
        agg = [agg]
    aggcheck = [a for a in agg if a in agg_types]
    if not aggcheck:
        raise ValueError("Invalid Aggregation type. Expected any of: %s" % agg_types)

    integer_period = isinstance(period, int) and not isinstance(period, bool)
    if integer_period:
        if period <= 0:
            raise ValueError(
                "Invalid temporal period. An integer period must be a positive number of days"
            )
    elif period not in ("yearly", "monthly"):
        raise ValueError(
            "Invalid temporal period. Expected any of: 'yearly', 'monthly', or an integer period"
        )

    if fill_nan:
        # Possible attribute names for the fill value: exact spelling first,
        # then a case-insensitive search over all attributes.
        nodata_names = ["_FillValue", "missing_value", "nodata", "nodatavalue"]
        nodata_key = next((name for name in nodata_names if name in xdr.attrs), None)
        if nodata_key is None:
            lowered = {name.lower() for name in nodata_names}
            nodata_key = next(
                (key for key in xdr.attrs if key.lower() in lowered), None
            )
        if nodata_key is not None:
            xdr = xdr.where(xdr != xdr.attrs[nodata_key], np.nan)

    # Pick the grouping rule and the matching label extractor for the period.
    if integer_period:
        time_start = xdr.time.values[0].astype('datetime64[s]').tolist()
        time_end = xdr.time.values[-1].astype('datetime64[s]').tolist()
        # If the period is longer than the whole time span, use a single bin.
        bins = max((time_end - time_start).days // period, 1)

        def group(data):
            return data.groupby_bins("time", bins)

        def label_of(item):
            # str(interval) looks like "(yyyy-mm-dd..., ...]"; keep the start date
            return str(item["time_bins"].values)[1:11]

    elif period == "yearly":

        def group(data):
            return data.groupby("time.year")

        def label_of(item):
            return str(item["year"].values)

    else:

        def group(data):
            return data.groupby("time.month")

        def label_of(item):
            return str(item["month"].values).zfill(2)

    xdr_groups = group(xdr)
    if buffer is not None:
        # Keep only the first `buffer` steps of each group, then regroup with
        # the same rule (the integer case reuses `bins`, not `period`).
        trimmed = xdr_groups.apply(lambda x: x.isel(time=slice(0, buffer)))
        xdr_groups = group(trimmed)

    reducers = {
        "mean": lambda g: g.mean(),
        "median": lambda g: g.median(),
        "sum": lambda g: g.sum(),
        "perc95": lambda g: g.quantile(q=0.95),
        "perc5": lambda g: g.quantile(q=0.05),
        "max": lambda g: g.max(),
        "min": lambda g: g.min(),
    }
    aggdict = {name: reducers[name](xdr_groups) for name in aggcheck}

    # Keep track of the names of all files produced
    outfname_list = []
    agg_list = []
    for a in aggcheck:
        # Save every time period of every aggregation as its own raster
        for p in aggdict[a]:
            label = label_of(p)
            fname = outfile + "_" + a + "_" + label + ".tif"
            p.rio.to_raster(fname)
            outfname_list.append(fname)
            agg_list.append(a)
            print(a, "of", label, "saved in:", fname)
    return outfname_list, agg_list
def combine_rasters_temporal(file_list, channel_name='band', attribute_name='long_name')
-
Combines multiple tif files into single xarray object. Assumes additional channels contain sequential time step data. If multiple files in file_list, files must be in temporal order and same data type. Also assumes files are of the same shape (x,y,t).
Example: file_list = ['../data/mvp_daily_rain_silo/daily_rain_2017_cropped.tif', '../data/mvp_daily_rain_silo/daily_rain_2018_cropped.tif']
xdr = combine_rasters_temporal(file_list, channel_name='band',attribute_name='long_name')
Parameters
- file_list : str or list of filename strings in date order to concatenate.
- Expected to be of the form "x,y" or "x,y,z1"
channel_name : string of coordinate dimension to concatenate (band, time, etc). Check options with rioxarray.open_rasterio('filename').coords
attribute_name : string name of rioxarray attribute holding a time/date label. Check with rioxarray.open_rasterio('filename').attrs
Returns
xdr : xarray object of x,y,time, with appropriate metadata.
Expand source code
def combine_rasters_temporal(
    file_list, channel_name="band", attribute_name="long_name"
):
    """
    Combine one or more raster files into a single time-indexed xarray object.

    Every file is opened with ``rioxarray.open_rasterio`` and the results are
    concatenated along ``channel_name``. The labels stored in the
    ``attribute_name`` attribute of each file are parsed with
    ``pandas.to_datetime`` and become the values of the concatenated
    coordinate, which is finally renamed to "time". Files must therefore be
    given in temporal order and hold the same kind of data.

    Example:
        file_list = ['../data/mvp_daily_rain_silo/daily_rain_2017_cropped.tif',
                     '../data/mvp_daily_rain_silo/daily_rain_2018_cropped.tif']
        xdr = combine_rasters_temporal(
            file_list, channel_name='band', attribute_name='long_name')

    Parameters
    ----------
    file_list : str or list of str
        A single file name, or file names in date order to concatenate.
    channel_name : str
        Name of the coordinate to concatenate along (band, time, etc).
        Check options with rioxarray.open_rasterio('filename').coords
    attribute_name : str
        Name of the raster attribute holding the time/date label(s).
        Check with rioxarray.open_rasterio('filename').attrs

    Returns
    -------
    xdr : xarray object of x,y,time, with ``attribute_name`` removed from
        its attributes.

    Raises
    ------
    ValueError
        If ``file_list`` is empty, or if a raster has no ``channel_name``
        coordinate or no ``attribute_name`` attribute.
    """
    if isinstance(file_list, str):
        file_list = [file_list]
    file_list = list(file_list)
    if not file_list:
        raise ValueError("file_list must contain at least one raster file name")

    array_list = []
    labels = []
    for fname in file_list:
        xds = rioxarray.open_rasterio(fname)
        if channel_name not in xds.coords:
            raise ValueError(
                "%s not a channel in the raster %s Options are %s"
                % (channel_name, fname, list(xds.coords))
            )
        if attribute_name not in xds.attrs:
            raise ValueError(
                "%s not an attribute in the raster %s Options are %s"
                % (attribute_name, fname, list(xds.attrs))
            )
        array_list.append(xds)
        # The attribute is either one label (e.g. a single-band file) or a
        # sequence with one label per channel; flatten both into one list.
        value = xds.attrs[attribute_name]
        if isinstance(value, (tuple, list)):
            labels.extend(value)
        else:
            labels.append(value)

    xdr = xr.concat(array_list, channel_name)
    # Replace the channel index by the parsed dates, then expose it as "time".
    xdr = xdr.assign_coords({channel_name: np.array(pd.to_datetime(labels))})
    xdr = xdr.rename({channel_name: "time"})
    # The labels now live in the time coordinate; drop the stale attribute.
    xdr.attrs.pop(attribute_name, None)
    return xdr
def get_date_after_last_underscore(file_list)
-
Extract the date from the file name after the last underscore.
Parameters
file_list : list of filename strings in date order to concatenate.
Returns
result : list of dates in date order to concatenate.
Expand source code
def get_date_after_last_underscore(file_list):
    """
    Extract the date from each file name after the last underscore.

    The text between the last "_" and the last "." of every name must be a
    date in YYYY-MM-DD format, e.g. "ndvi_2020-01-15.tif" -> "2020-01-15".

    Parameters
    ----------
    file_list : str or list of str
        A single file name, or file names in date order to concatenate.

    Returns
    -------
    result : list of date strings, in the same order as ``file_list``.

    Raises
    ------
    ValueError
        If the extracted part of a file name is not a YYYY-MM-DD date.
    """
    if isinstance(file_list, str):
        # A bare string would otherwise be iterated character by character
        file_list = [file_list]

    result = []
    for filename in file_list:
        # rsplit returns the whole name when it contains no underscore
        last_part = str(filename).rsplit('_', 1)[-1]
        # Remove the file format ending
        last_part = last_part.rsplit('.', 1)[0]
        try:
            datetime.datetime.strptime(last_part, '%Y-%m-%d')
        except ValueError as err:
            raise ValueError(
                f"The last part of the filename is not a date: {last_part!r} "
                f"(file {filename!r}, expected YYYY-MM-DD)"
            ) from err
        result.append(last_part)
    return result
def get_mask_array(xdr, mask_band=None, verbose=True)
-
Return mask of the data, e.g. for cloud-cover. The mask values will be set to True if the mask band is not 0, and False otherwise. If no mask band is provided, a mask band will be searched for in the xarray attribute metadata.
Parameters
xdr : xarray
- xarray dataset to mask
mask_band : str or int, optional
- Name or index of the band to use as a mask. If not provided, a mask band will be searched for in the xarray attribute metadata.
Returns
mask : array, bool
Expand source code
def get_mask_array(xdr, mask_band = None, verbose = True):
    """
    Return mask of the data, e.g. for cloud-cover.

    The mask is True where the mask band is not 0, and False otherwise.
    If no mask band is provided, the first band whose name in
    ``xdr.attrs['long_name']`` contains 'mask' is used.

    Parameters
    ----------
    xdr : xarray
        xarray dataset to mask. Band names are read from
        ``xdr.attrs['long_name']``; a name at position i is selected as
        band i + 1.
    mask_band : str or int, optional
        Name or band number of the band to use as a mask.
        If not provided, a mask band will be searched for in the xarray
        attribute metadata.
    verbose : bool
        If True (default), print which band is used.

    Returns
    -------
    mask : array, bool
        None if no mask band could be found in the attributes, or if
        ``mask_band`` is neither an integer nor a string.

    Raises
    ------
    ValueError
        If ``mask_band`` is a name that is not listed in
        ``xdr.attrs['long_name']``.
    """
    if mask_band is None:
        names = xdr.attrs['long_name']
        if isinstance(names, str):
            # A single band name; avoid scanning it character by character
            names = (names,)
        # Positions of all bands whose name includes 'mask'
        candidates = [i for i, s in enumerate(names) if 'mask' in s]
        if not candidates:
            if verbose: print('No mask band found in attributes. Proceeding without masking.')
            return None
        chosen = candidates[0]
        if len(candidates) > 1:
            if verbose: print(f"Multiple mask bands found in attributes. Proceeding with mask band: {names[chosen]}")
        if verbose: print(f"Masking values with Nan where mask band {names[chosen]} is valid")
        band_number = chosen + 1
    elif isinstance(mask_band, int):
        if verbose: print(f"Masking values with Nan where mask band {mask_band} is not 0")
        band_number = mask_band
    elif isinstance(mask_band, str):
        if verbose: print(f"Masking values with Nan where mask band {mask_band} is not 0")
        names = xdr.attrs['long_name']
        if isinstance(names, str):
            names = (names,)
        matches = [i for i, s in enumerate(names) if s == mask_band]
        if not matches:
            raise ValueError(
                f"Mask band {mask_band!r} not found in long_name attribute: {list(names)}"
            )
        band_number = matches[0] + 1
    else:
        if verbose: print("Mask band must be an integer or string")
        return None

    return xdr.sel(band=band_number).values != 0
def multiband_raster_to_xarray(file_list, date_list=None, mask_bandname=None)
-
Converts a stack of multiband raster with different dates to an xarray object.
Parameters
- file_list : list of filename strings in date order to concatenate.
date_list : list of dates in date order to concatenate.
- If None provided, the dates will be extracted from the file names. This assumes that the date is given at the end of the file name after an underscore.
Expand source code
def multiband_raster_to_xarray(file_list, date_list = None, mask_bandname = None):
    """
    Convert a stack of multiband rasters with different dates to an xarray object.

    Each file is opened with ``rioxarray.open_rasterio``, tagged with its date
    as a new leading "time" dimension, and all files are concatenated along
    that dimension.

    Parameters
    ----------
    file_list : str or list of str
        A single file name, or file names in date order to concatenate.
    date_list : str or list, optional
        Dates (anything ``pandas.to_datetime`` accepts) matching ``file_list``
        one-to-one. If None, the dates are extracted from the file names with
        ``get_date_after_last_underscore``, which assumes the date is given at
        the end of the file name after an underscore.
    mask_bandname : optional
        Currently not used by this function.

    Returns
    -------
    stacked_data : xarray object with an added "time" dimension.

    Raises
    ------
    ValueError
        If ``file_list`` and ``date_list`` differ in length.
    """
    if isinstance(file_list, str):
        file_list = [file_list]
    if date_list is None:
        date_list = get_date_after_last_underscore(file_list)
    elif isinstance(date_list, str):
        date_list = [date_list]

    # An explicit check (not ``assert``) so it still runs under ``python -O``;
    # otherwise ``zip`` would silently drop the unmatched entries.
    if len(file_list) != len(date_list):
        raise ValueError("File list and date list must have the same length.")

    data_arrays = []
    for fname, date in zip(file_list, date_list):
        xds = rioxarray.open_rasterio(fname)
        # Attach the date as a scalar coordinate, then promote it to a dimension
        xds = xds.assign_coords({"time": pd.to_datetime(date)})
        xds = xds.expand_dims("time")
        data_arrays.append(xds)

    return xr.concat(data_arrays, dim="time")
def temporal_crop(xdr, start_time, end_time)
-
Cuts an xarray object by start and end times.
Parameters
xdr : xarray object of x,y,time
start_time : string time in 'yyyy-mm-dd' format.
end_time : string time in 'yyyy-mm-dd' format.
Returns
xdr_crop : xarray object of x,y,time, with appropriate metadata.
Expand source code
def temporal_crop(xdr, start_time, end_time):
    """
    Select the part of an xarray object lying between two times.

    Parameters
    ----------
    xdr : xarray object of x,y,time
    start_time : string time in 'yyyy-mm-dd' format.
    end_time : string time in 'yyyy-mm-dd' format.

    Returns
    -------
    xdr_crop : the result of selecting ``time=slice(start_time, end_time)``
        on ``xdr``.
    """
    window = slice(start_time, end_time)
    return xdr.sel(time=window)