Module geodata_harvester.validate_settings

How to:

import validate_settings validate_settings.validate(fname_settings)

Expand source code
# Validate YAML settings file

"""
# How to: 
import validate_settings
validate_settings.validate(fname_settings)

"""

import yaml
import datetime
from geodata_harvester import utils

# logger setup
from geodata_harvester import write_logs
import logging

# import AgReFed DataHarvester dictionaries
from geodata_harvester.getdata_slga import get_slgadict
from geodata_harvester.getdata_silo import get_silodict
from geodata_harvester.getdata_dea import get_deadict
from geodata_harvester.getdata_radiometric import get_radiometricdict
from geodata_harvester.getdata_landscape import get_landscapedict
from geodata_harvester.getdata_dem import get_demdict
from numpy import dtype


# Set date constraints
currentDateTime = datetime.datetime.now()
current_year = currentDateTime.date().year
min_year = 1970


def check_schema(settings):
    """
    Validate Schema
    """
    conf_schema = {
        "infile": str,
        "outpath": str,
        "colname_lat": str,
        "colname_lng": str,
        "target_bbox": [list, tuple, str, type(None)],
        "date_min": str,
        "date_max": str,
        "target_res": [float, int],
        "temp_intervals": int,
        "temp_buffer": int,
        "target_sources": dict,
    }
    for conf in list(settings.keys()):
        if conf not in list(conf_schema.keys()):
            print(f'"{conf}" key not in settings schema')
            return False
        if type(conf_schema[conf]) == str:
            if not type(settings[conf]) == conf_schema[conf]:
                print(f'type {dtype(settings[conf])} not valid for "{conf}"')
                print(f"Need to be {conf_schema[conf]}")
                return False
        elif type(conf_schema[conf]) == list:
            if not type(settings[conf]) in conf_schema[conf]:
                print(f'Dtype {type(settings[conf])} not valid for "{conf}"')
                print(f"Need to be one of: {conf_schema[conf]}")
                return False
    return True


def check_schema2(settings):
    """
    Validate Schema with package schema

    Requirements: schema
    """
    from schema import Schema, Or, Optional

    conf_schema = Schema(
        {
            "infile": str,
            "outpath": str,
            "colname_lat": str,
            "colname_lng": str,
            "target_bbox": Or(list, tuple, "", None),
            "date_min": str,
            "date_max": str,
            "target_intervals": int,
            "target_buffers": int,
            Optional("temp_res"): int,
            "target_sources": {
                Optional("DEA"): list,
                Optional("DEM"): list,
                Optional("Landscape"): list,
                Optional("Radiometric"): list,
                Optional("SILO"): dict,
                Optional("SLGA"): dict,
                Optional("GEE"): dict,
            },
        }
    )
    try:
        conf_schema.validate(settings)
        return True
    except SchemaError:
        return False


def check_target_dates(dates):
    """
    Validate date range
    """
    for date in dates:
        if date < min_year:
            print(f"{date} in target_dates must be at least {min_year}")
            return False
        if date > current_year:
            print(f"{date} in target_dates can not be in the future")
            return False
    return True


def check_target_size(bbox, target_res, nmax_pixels=1e8):
    """
    Validate bounding box and check number of raster pixels

    INPUT
    -----
    bbox: list, target bounding box
    target_res: float or int, target resolution
    nmax_pixels: maximum number of raster pixels for target image (nmax = nx * ny)
    """
    if (bbox != None) & (bbox != ""):
        # Check if bbox is correct order: [left, bottom, right, top]
        assert len(bbox) == 4, "Length of Bounding box must be 4"
        assert bbox[2] > bbox[0], "Bounding box[0] must be smaller than box[2]"
        assert bbox[3] > bbox[1], "Bounding box[1] must be smaller than box[3]"
        # Estimate number of raster pixel
        nx = round(3600 * (bbox[2] - bbox[0]) / target_res)
        ny = round(3600 * (bbox[3] - bbox[1]) / target_res)
        npix = nx * ny
        if npix > nmax_pixels:
            print(
                f"Number of pixels  of requested image ({npix}) is larger than maximum number of pixels ({nmax_pixels})."
            )
            print(
                "Reduce size of bounding box or set target resolution to larger value."
            )
            return False
    return True


def check_target_sources(target_sources):
    """
    Validate selected data layers and options

    TBD: GEE validations
    """

    for source in list(target_sources.keys()):
        # Check DEA
        if source == "DEA":
            layers = target_sources[source]
            dict_dea = get_deadict()
            options = list(dict_dea["layernames"].keys())
            # Check that all elements in source are a subset of dea
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in layers:
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False

        # Check DEM
        if source == "DEM":
            layers = target_sources[source]
            dict_dem = get_demdict()
            options = list(dict_dem["layernames"].keys())
            # Check that all elements in source are a subset
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in layers:
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False

        # Check Landscape
        if source == "Landscape":
            layers = target_sources[source]
            dict_landscape = get_landscapedict()
            options = list(dict_landscape["layernames"].keys())
            # Check that all elements in source are a subset
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in layers:
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False

        # Check Radiometric
        if source == "Radiometric":
            layers = target_sources[source]
            dict_radiometric = get_radiometricdict()
            options = list(dict_radiometric["layernames"].keys())
            # Check that all elements in source are a subset
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in layers:
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False

        # Check SLGA
        if source == "SLGA":
            layers = target_sources[source]
            depth_options = [
                "0-5cm",
                "5-15cm",
                "15-30cm",
                "30-60cm",
                "60-100cm",
                "100-200cm",
            ]
            dict_slga = get_slgadict()
            options = list(dict_slga["layers_url"].keys())
            # Check that all elements in source are a subset
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in list(layers.keys()):
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False
            # check if valid depth selected:
            for layer in list(layers.keys()):
                depths = layers[layer]
                ok = set(depths).issubset(set(depth_options))
                if not ok:
                    for depth in depths:
                        if depth not in depth_options:
                            print(
                                f'Depth "{depth}" not supported as SLGA depth option.'
                            )
                            print("Supported depths are: ", depth_options)
                    return False

        # Check SILO
        if source == "SILO":
            layers = target_sources[source]
            silo_options = [
                "mean",
                "median",
                "sum",
                "std",
                "perc95",
                "perc5",
                "max",
                "min",
            ]
            dict_silo = get_silodict()
            options = list(dict_silo["layernames"].keys())
            # Check that all elements in source are a subset
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in list(layers.keys()):
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False
            # check if valid aggregation options selected:
            for layer in list(layers.keys()):
                agoptions = layers[layer]
                ok = set(agoptions).issubset(set(silo_options))
                if not ok:
                    for agoption in agoptions:
                        if agoption not in silo_options:
                            print(
                                f'Option "{agoption}" not supported as temporal SILO aggregation option.'
                            )
                            print("Supported options are: ", silo_options)
                    return False

        # Check GEE: TBD!
        # if source == 'GEE':
        #   ...

    return True


def validate(fname_settings, verbose=False):
    """
    Validates all settings with regard
        - schema
        - date range
        - data size and bounding box
        - data layers and options

    INPUT:
    fname_settings: str, path + filename of settings
    """
    # fname_settings = 'settings/settings_v0.3.yaml'

    # Logger setup
    if verbose:
        write_logs.setup(level="info")
    else:
        write_logs.setup()

    with open(fname_settings, "r") as f:
        settings = yaml.load(f, Loader=yaml.FullLoader)

    # Check that schema is ok
    schema_ok = check_schema(settings)
    assert schema_ok, "Invalid Schema"

    # Check requested image size
    # Note that this is an upper limit and does not ensure Webservers reaches timeout
    bbox = settings["target_bbox"]
    target_res = settings["target_res"]
    size_ok = check_target_size(bbox, target_res)
    assert size_ok, "Invalid Size"

    # Check if requested dates are in valid range
   # dates = settings["target_dates"]
   # dates_ok = check_target_dates(dates)
   # assert dates_ok, "Invalid Dates"

    # Check if data layers and options ok
    target_sources = settings["target_sources"]
    targets_ok = check_target_sources(target_sources)
    assert targets_ok, "Invalid Data layers or options"

    utils.msg_success("Configuration validated successfully!")

Functions

def check_schema(settings)

Validate Schema

Expand source code
def check_schema(settings):
    """
    Validate Schema
    """
    conf_schema = {
        "infile": str,
        "outpath": str,
        "colname_lat": str,
        "colname_lng": str,
        "target_bbox": [list, tuple, str, type(None)],
        "date_min": str,
        "date_max": str,
        "target_res": [float, int],
        "temp_intervals": int,
        "temp_buffer": int,
        "target_sources": dict,
    }
    for conf in list(settings.keys()):
        if conf not in list(conf_schema.keys()):
            print(f'"{conf}" key not in settings schema')
            return False
        if type(conf_schema[conf]) == str:
            if not type(settings[conf]) == conf_schema[conf]:
                print(f'type {dtype(settings[conf])} not valid for "{conf}"')
                print(f"Need to be {conf_schema[conf]}")
                return False
        elif type(conf_schema[conf]) == list:
            if not type(settings[conf]) in conf_schema[conf]:
                print(f'Dtype {type(settings[conf])} not valid for "{conf}"')
                print(f"Need to be one of: {conf_schema[conf]}")
                return False
    return True
def check_schema2(settings)

Validate Schema with package schema

Requirements: schema

Expand source code
def check_schema2(settings):
    """
    Validate Schema with package schema

    Requirements: schema
    """
    from schema import Schema, Or, Optional

    conf_schema = Schema(
        {
            "infile": str,
            "outpath": str,
            "colname_lat": str,
            "colname_lng": str,
            "target_bbox": Or(list, tuple, "", None),
            "date_min": str,
            "date_max": str,
            "target_intervals": int,
            "target_buffers": int,
            Optional("temp_res"): int,
            "target_sources": {
                Optional("DEA"): list,
                Optional("DEM"): list,
                Optional("Landscape"): list,
                Optional("Radiometric"): list,
                Optional("SILO"): dict,
                Optional("SLGA"): dict,
                Optional("GEE"): dict,
            },
        }
    )
    try:
        conf_schema.validate(settings)
        return True
    except SchemaError:
        return False
def check_target_dates(dates)

Validate date range

Expand source code
def check_target_dates(dates):
    """
    Validate date range
    """
    for date in dates:
        if date < min_year:
            print(f"{date} in target_dates must be at least {min_year}")
            return False
        if date > current_year:
            print(f"{date} in target_dates can not be in the future")
            return False
    return True
def check_target_size(bbox, target_res, nmax_pixels=100000000.0)

Validate bounding box and check number of raster pixels

Input

bbox: list, target bounding box target_res: float or int, target resolution nmax_pixels: maximum number of raster pixels for target image (nmax = nx * ny)

Expand source code
def check_target_size(bbox, target_res, nmax_pixels=1e8):
    """
    Validate bounding box and check number of raster pixels

    INPUT
    -----
    bbox: list, target bounding box
    target_res: float or int, target resolution
    nmax_pixels: maximum number of raster pixels for target image (nmax = nx * ny)
    """
    if (bbox != None) & (bbox != ""):
        # Check if bbox is correct order: [left, bottom, right, top]
        assert len(bbox) == 4, "Length of Bounding box must be 4"
        assert bbox[2] > bbox[0], "Bounding box[0] must be smaller than box[2]"
        assert bbox[3] > bbox[1], "Bounding box[1] must be smaller than box[3]"
        # Estimate number of raster pixel
        nx = round(3600 * (bbox[2] - bbox[0]) / target_res)
        ny = round(3600 * (bbox[3] - bbox[1]) / target_res)
        npix = nx * ny
        if npix > nmax_pixels:
            print(
                f"Number of pixels  of requested image ({npix}) is larger than maximum number of pixels ({nmax_pixels})."
            )
            print(
                "Reduce size of bounding box or set target resolution to larger value."
            )
            return False
    return True
def check_target_sources(target_sources)

Validate selected data layers and options

TBD: GEE validations

Expand source code
def check_target_sources(target_sources):
    """
    Validate selected data layers and options

    TBD: GEE validations
    """

    for source in list(target_sources.keys()):
        # Check DEA
        if source == "DEA":
            layers = target_sources[source]
            dict_dea = get_deadict()
            options = list(dict_dea["layernames"].keys())
            # Check that all elements in source are a subset of dea
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in layers:
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False

        # Check DEM
        if source == "DEM":
            layers = target_sources[source]
            dict_dem = get_demdict()
            options = list(dict_dem["layernames"].keys())
            # Check that all elements in source are a subset
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in layers:
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False

        # Check Landscape
        if source == "Landscape":
            layers = target_sources[source]
            dict_landscape = get_landscapedict()
            options = list(dict_landscape["layernames"].keys())
            # Check that all elements in source are a subset
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in layers:
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False

        # Check Radiometric
        if source == "Radiometric":
            layers = target_sources[source]
            dict_radiometric = get_radiometricdict()
            options = list(dict_radiometric["layernames"].keys())
            # Check that all elements in source are a subset
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in layers:
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False

        # Check SLGA
        if source == "SLGA":
            layers = target_sources[source]
            depth_options = [
                "0-5cm",
                "5-15cm",
                "15-30cm",
                "30-60cm",
                "60-100cm",
                "100-200cm",
            ]
            dict_slga = get_slgadict()
            options = list(dict_slga["layers_url"].keys())
            # Check that all elements in source are a subset
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in list(layers.keys()):
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False
            # check if valid depth selected:
            for layer in list(layers.keys()):
                depths = layers[layer]
                ok = set(depths).issubset(set(depth_options))
                if not ok:
                    for depth in depths:
                        if depth not in depth_options:
                            print(
                                f'Depth "{depth}" not supported as SLGA depth option.'
                            )
                            print("Supported depths are: ", depth_options)
                    return False

        # Check SILO
        if source == "SILO":
            layers = target_sources[source]
            silo_options = [
                "mean",
                "median",
                "sum",
                "std",
                "perc95",
                "perc5",
                "max",
                "min",
            ]
            dict_silo = get_silodict()
            options = list(dict_silo["layernames"].keys())
            # Check that all elements in source are a subset
            ok = set(layers).issubset(set(options))
            if not ok:
                for layer in list(layers.keys()):
                    if layer not in options:
                        print(
                            f'Datalayer "{layer}" not supported for {source}.')
                        print("Supported data layers are: ", options)
                return False
            # check if valid aggregation options selected:
            for layer in list(layers.keys()):
                agoptions = layers[layer]
                ok = set(agoptions).issubset(set(silo_options))
                if not ok:
                    for agoption in agoptions:
                        if agoption not in silo_options:
                            print(
                                f'Option "{agoption}" not supported as temporal SILO aggregation option.'
                            )
                            print("Supported options are: ", silo_options)
                    return False

        # Check GEE: TBD!
        # if source == 'GEE':
        #   ...

    return True
def validate(fname_settings, verbose=False)

Validates all settings with regard - schema - date range - data size and bounding box - data layers and options

INPUT: fname_settings: str, path + filename of settings

Expand source code
def validate(fname_settings, verbose=False):
    """
    Validates all settings with regard
        - schema
        - date range
        - data size and bounding box
        - data layers and options

    INPUT:
    fname_settings: str, path + filename of settings
    """
    # fname_settings = 'settings/settings_v0.3.yaml'

    # Logger setup
    if verbose:
        write_logs.setup(level="info")
    else:
        write_logs.setup()

    with open(fname_settings, "r") as f:
        settings = yaml.load(f, Loader=yaml.FullLoader)

    # Check that schema is ok
    schema_ok = check_schema(settings)
    assert schema_ok, "Invalid Schema"

    # Check requested image size
    # Note that this is an upper limit and does not ensure Webservers reaches timeout
    bbox = settings["target_bbox"]
    target_res = settings["target_res"]
    size_ok = check_target_size(bbox, target_res)
    assert size_ok, "Invalid Size"

    # Check if requested dates are in valid range
   # dates = settings["target_dates"]
   # dates_ok = check_target_dates(dates)
   # assert dates_ok, "Invalid Dates"

    # Check if data layers and options ok
    target_sources = settings["target_sources"]
    targets_ok = check_target_sources(target_sources)
    assert targets_ok, "Invalid Data layers or options"

    utils.msg_success("Configuration validated successfully!")