How we share data requirements between ML applications

We use pydantic models to share data requirements and metadata between ML applications. Here’s how.

In our previous post, we wrote about how we use the Python package pydantic to validate input data. We serve our machine learning models with APIs written using the FastAPI library, which uses pydantic to validate and parse the input data received through the API. FastAPI was our introduction to pydantic. We started out with some pydantic models for validating input data to the API. We saw that sharing the models across several applications would lead to less boilerplate code, as well as more explicit handling of our data requirements.

The data models we share across our ML applications serve two purposes:

  • Sharing metadata, i.e. tidy models declaring what data a specific machine learning model requires, including the logic for naming standards and formats for exporting data. This is also used as the foundation for a mapping of how to select data from the training database.
  • Validating that data requirements are fulfilled. When training our models, we have certain requirements for the training data, for example that there is no missing data. The data models allow us to validate these requirements in other applications as well, such as in the API.

In this post, we will show how we share metadata and data requirements using pydantic. For an introduction to pydantic syntax and concepts, take a look at our previous post, or the excellent pydantic documentation.

A quick overview of applications

Before we go into detail, we’ll start with a quick overview of how data flows and what applications are at play.

Outline of architecture with the relevant components

Our forecast models are served by APIs, the ML API application in the figure. The models specify the data they require to make a prediction at a GET endpoint. The ML / stream integration application requests the feature metadata specification from the ML API, fetches the data from the streaming platform and posts the observations to the ML API’s prediction endpoint. The response is the prediction, which the ML / stream integration application publishes to the streaming data platform for users to consume.

The data from the streaming platform is persisted to a database. To ensure we have valid training data, data cleaning applications read the raw data and validate that it meets the requirements. These applications also try to fill small holes of missing data according to domain-specific rules, and warn if there are large holes that need to be processed manually. Data validation is run on a regular schedule, as data is continuously persisted from the streaming platform to the database.

When training the models, validated data from the data cleaning applications is used.

When reading data, the applications use the metadata models to select the correct data. The ML API specifies what data should be fetched from the streaming platform using the metadata model. The data cleaning applications use the metadata model to select the correct data from the database. When training models, the data is fetched from the database according to the metadata model.

When operating on data, the applications use the data requirements models to ensure the validity of the input data. Before making a prediction, data is validated in the ML API. The prognosis response is also validated before it is returned to the ML / stream integration application. The data cleaning applications also validate data according to the data requirements models before writing to the database for training.

The metadata models are used for reading the correct data, shown with a yellow star. The data requirements model is used for validating that data meets requirements, shown with a blue triangle.

Sharing metadata with pydantic

Below is an example of our BaseFeatureMetaData class, which we use for metadata on the base features in our models, i.e., the features as they come from our sources. This metadata model specifies all necessary details on the features a model requires:

  • The name of the feature.
  • The source we require for the feature, as several features are found in multiple sources. An example is weather forecasts, which are available from different vendors, but it could also be different estimates for measurements or unobservable values.
  • Which locations we require data from.
  • history_period and future_period, which together specify the time series the model requires to provide its prediction.
  • The time resolution required for this feature, as many features are available in different time aggregations.

from datetime import timedelta
from typing import List

from pydantic import BaseModel

from our_internal_utils import timedelta_isoformat_


class BaseFeatureMetaData(BaseModel):
    name: str
    source: str
    locations: List[str]
    history_period: timedelta
    future_period: timedelta
    resolution: timedelta

    class Config:
        json_encoders = {timedelta: timedelta_isoformat_}

    def __str__(self):
        res_str = timedelta_isoformat_(self.resolution)
        return f"{self.name}_{self.source}_{res_str}"

We can now create a metadata specification for temperature data, from the meteorological institute, or met as we call them, from two weather stations, SN18700 (Blindern) and SN27500 (Lillehammer), for 48 hours of weather observations and 72 hours of forecast values, using the hourly resolution data:

temperature_metadata = BaseFeatureMetaData(
    name="temperature",
    source="met",
    locations=["SN18700", "SN27500"],
    history_period=timedelta(hours=48),
    future_period=timedelta(hours=72),
    resolution=timedelta(hours=1),
)

This metadata specification is used:

  • By the ML / stream integration application that fetches data to models for live prediction. We provide the metadata in the API which serves our ML models, specifying all details on the features a model requires. The application uses the metadata to collect the required time series from our streaming platform.
  • When we train our models, to fetch the correct training data from the database for each model. The metadata specification of each model, which is exposed in the API, is also what we use to map a base feature to which data to read from the database.
  • When validating training data in the data cleaning applications. Each base feature has its own data cleaning application configured with the metadata model, used to map a base feature to which data to read from the database, as for training.
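
As an illustration of how a consumer of the metadata might turn history_period, future_period and resolution into a concrete set of time steps to fetch, here is a minimal sketch using only the standard library. The function name required_timestamps and the choice of an inclusive start and exclusive end are assumptions made for this example, not a description of our actual integration code.

```python
from datetime import datetime, timedelta
from typing import List


def required_timestamps(
    prediction_time: datetime,
    history_period: timedelta,
    future_period: timedelta,
    resolution: timedelta,
) -> List[datetime]:
    """Return the time steps spanned by the history and future periods.

    Assumption: the window starts at prediction_time - history_period
    (inclusive) and ends at prediction_time + future_period (exclusive).
    """
    if resolution <= timedelta(0):
        raise ValueError("resolution must be positive")
    start = prediction_time - history_period
    end = prediction_time + future_period
    # Dividing two timedeltas with // gives the whole number of steps
    n_steps = (end - start) // resolution
    return [start + i * resolution for i in range(n_steps)]


timestamps = required_timestamps(
    prediction_time=datetime(2021, 1, 3, 0, 0),
    history_period=timedelta(hours=48),
    future_period=timedelta(hours=72),
    resolution=timedelta(hours=1),
)
print(len(timestamps), timestamps[0], timestamps[-1])
```

With the temperature specification above, this gives one time step per hour over a five day window.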

In order to provide the metadata model in json format from the API, we use pydantic’s Config class, which we can use to customize the behaviour of the model. The json_encoders setting in our Config specifies that all timedelta fields should be converted using our utility function timedelta_isoformat_, to provide valid json. Pydantic provides a .json() method for exporting models to json. temperature_metadata.json() returns:

'{
   "name":"temperature",
   "source":"met",
   "locations":[
      "SN18700",
      "SN27500"
   ],
   "history_period":"PT48H",
   "future_period":"PT72H",
   "resolution":"PT1H"
}'

following the timedelta representation standard we have agreed on in the interface between the ML API and the ML / stream integration application. We also override the __str__ representation of the class to provide our internal naming standard for features. str(temperature_metadata) returns:

 'temperature_met_PT1H'

We use this naming standard for the column names of the feature data in internal applications and for naming each data cleaning application, as each base feature has its own cleaning application.
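
The helper timedelta_isoformat_ is internal and not shown in this post. As a rough idea of what such a helper could look like, here is a hypothetical sketch that formats a timedelta as an ISO 8601 duration using hours, minutes and seconds only. The name timedelta_isoformat_sketch and the decision to never emit a day component are assumptions for this example.

```python
from datetime import timedelta


def timedelta_isoformat_sketch(td: timedelta) -> str:
    """Format a non-negative timedelta as an ISO 8601 duration.

    Hypothetical stand-in for the internal helper. Days are expressed
    as hours, and sub-second precision is dropped.
    """
    total_seconds = int(td.total_seconds())
    if total_seconds < 0:
        raise ValueError("negative durations are not supported in this sketch")
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    parts = []
    if hours:
        parts.append(f"{hours}H")
    if minutes:
        parts.append(f"{minutes}M")
    if seconds or not parts:
        # An empty duration is written as zero seconds
        parts.append(f"{seconds}S")
    return "PT" + "".join(parts)


print(timedelta_isoformat_sketch(timedelta(hours=1)))     # PT1H
print(timedelta_isoformat_sketch(timedelta(minutes=90)))  # PT1H30M
```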

Validating data requirements across applications

Our BaseFeature class explicitly states our data requirements through validators: that time steps must be evenly spaced, that we don’t accept any NaNs in the time series, and other basic requirements. This model is used:

  • In our ML API, to validate input data before running predictions. This use case was our first introduction to pydantic data models, and is described in FastAPI’s excellent documentation.
  • In our data cleaning applications, to validate that new potential training data, persisted from the streaming platform, meets the requirements we have in training.
  • When training our models. Since we use the BaseFeature model when reading data from the database, the validation can be done before training as well. For already validated sources, we can instead use the construct() method, which creates models without validation.

This use of pydantic data models is analogous to the input validation example shared in our previous post. Using the data model in the API serving the models, in the models themselves and in the data cleaning applications ensures that we collect all our data requirements in one place. This way we avoid multiple .dropna() lines spread throughout our code, which can lead to different treatment of data during training and prediction, and makes it difficult to refactor code. Using the data requirements in the data cleaning applications also ensures that there is validated data available for training at any time.
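
To make the difference between normal instantiation and construct() concrete, here is a small sketch written against the pydantic 1.x API used in this post. The Feature class is a simplified, hypothetical stand-in for BaseFeature, and the input is deliberately invalid.

```python
from typing import List

from pydantic import BaseModel, ValidationError


class Feature(BaseModel):
    # Hypothetical, simplified stand-in for BaseFeature
    timestamps: List[int]
    data: List[List[float]]


# The second row contains a missing value, which the model does not allow
raw = {"timestamps": [0, 3600], "data": [[1.5], [None]]}

try:
    Feature(**raw)
    validation_failed = False
except ValidationError:
    validation_failed = True

# construct() trusts the caller and performs no validation or parsing
unchecked = Feature.construct(**raw)
print(validation_failed, unchecked.data)
```

Because construct() accepts anything, it only makes sense for data that has already passed validation elsewhere, such as the validated view described later in this post.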

Below is a shortened example with a few of the validators of a parent model, DataFrameTimeSeries, which the data requirement models inherit from.

In the DataFrameTimeSeries model, missing values are allowed. Our BaseFeature model inherits from DataFrameTimeSeries and inherits all validators, but we override the data field, because missing data is not allowed for our BaseFeature input. When an instance of a pydantic BaseModel object is created, the validator decorated functions will be run, as well as validation that the type hints are adhered to. An exception will be raised if any of the validations fails.

from typing import List, Optional, Dict

from pydantic import BaseModel, validator


class DataFrameTimeSeries(BaseModel):
    columns: List[str]
    timestamps: List[int]
    data: List[List[Optional[float]]]
    # Some time series are allowed to have missing values

    @validator("columns")
    def at_least_one_column(cls, columns: List[str]) -> List[str]:
        if len(columns) < 1:
            raise ValueError("No column names specified")
        return columns

    @validator("timestamps", "data")
    def len_at_least(cls, value: List) -> List:
        minlen = 1
        if len(value) < minlen:
            raise ValueError(f"length is less than {minlen}")
        return value

    @validator("timestamps")
    def index_has_proper_spacing(cls, value: List[int]) -> List[int]:
        diffs = [x[0] - x[1] for x in zip(value[1:], value)]
        if len(diffs) < 1:
            return value
        if any(diff <= 0 for diff in diffs):
            raise ValueError("Index is not monotonically increasing")
        if any(diff != diffs[0] for diff in diffs):
            raise ValueError("Index has unequal time steps")
        return value


class BaseFeature(DataFrameTimeSeries):
    data: List[List[float]]
    # BaseFeatures inherit validators from DataFrameTimeSeries, 
    # but are not allowed to have missing values, so the data 
    # field does not have Optional floats

In our API, the validation is done immediately when a new request is made, as the pydantic validation is enforced through type hints in the API routes.
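
Outside of an API route, the same validation runs whenever a model is instantiated. The sketch below uses reduced, hypothetical stand-ins for DataFrameTimeSeries and BaseFeature (named TimeSeries and Feature here) to show that the subclass inherits the spacing validator while rejecting missing values. The helper is_valid is also an assumption made for the example.

```python
from typing import List, Optional, Type

from pydantic import BaseModel, ValidationError, validator


class TimeSeries(BaseModel):
    # Simplified stand-in for DataFrameTimeSeries: missing values allowed
    timestamps: List[int]
    data: List[List[Optional[float]]]

    @validator("timestamps")
    def evenly_spaced(cls, value: List[int]) -> List[int]:
        diffs = [later - earlier for earlier, later in zip(value, value[1:])]
        if any(diff != diffs[0] for diff in diffs):
            raise ValueError("Index has unequal time steps")
        return value


class Feature(TimeSeries):
    # Simplified stand-in for BaseFeature: missing values not allowed
    data: List[List[float]]


def is_valid(model_class: Type[BaseModel], **fields) -> bool:
    """Return True if the fields pass validation for the given model."""
    try:
        model_class(**fields)
        return True
    except ValidationError:
        return False


with_hole = {"timestamps": [0, 3600, 7200], "data": [[1.0], [None], [3.0]]}
uneven = {"timestamps": [0, 3600, 9000], "data": [[1.0], [2.0], [3.0]]}

print(is_valid(TimeSeries, **with_hole))  # True: holes are accepted here
print(is_valid(Feature, **with_hole))     # False: None is not a float
print(is_valid(Feature, **uneven))        # False: inherited validator fails
```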

Our data cleaning applications run validation once per day, validating all BaseFeatures in the training database. There is one application running per BaseFeature. New data from the streaming platform is stored in the database continuously. The cleaning application reads the new data since the last cleaning and, if there are small holes, attempts to fill them according to domain-specific rules. The number of time steps that can be filled automatically, as well as the interpolation method, is set per base feature. The joint dataset of original and processed data is then converted to BaseFeatures through a utility function. As this creates a BaseFeature instance, validation is performed immediately. A view containing only validated data is updated to include the newly added data if it meets the requirements. This view is the source for training data.
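
The hole-filling step can be pictured with the following sketch. It is not our cleaning code: the function fill_small_gaps, the use of linear interpolation and the rule that holes at the edges are never filled are all assumptions chosen to illustrate a per-feature limit on the number of time steps that may be filled.

```python
from typing import List, Optional


def fill_small_gaps(
    values: List[Optional[float]], max_gap: int
) -> List[Optional[float]]:
    """Linearly interpolate runs of at most max_gap missing values.

    Longer runs, and runs at the start or end of the series, stay None.
    """
    filled = list(values)
    i = 0
    while i < len(filled):
        if filled[i] is not None:
            i += 1
            continue
        start = i  # first missing index in this run
        while i < len(filled) and filled[i] is None:
            i += 1
        gap = i - start
        has_neighbours = start > 0 and i < len(filled)
        if gap <= max_gap and has_neighbours:
            left, right = filled[start - 1], filled[i]
            step = (right - left) / (gap + 1)
            for k in range(gap):
                filled[start + k] = left + step * (k + 1)
    return filled


series = [1.0, None, 3.0, None, None, None, 7.0]
print(fill_small_gaps(series, max_gap=1))
```

With max_gap=1 only the single missing value is filled. The run of three stays missing, so a model that forbids missing values would still reject the series and the hole would need manual processing.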

We could create additional validation functions, for example creating subclasses of BaseFeature with domain specific validation, such as checking whether temperature values are within normal ranges.
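
A sketch of what such a subclass might look like is shown below, again using the pydantic 1.x validator decorator. The Feature class is a simplified stand-in for BaseFeature, and the limits of -60 and 60 degrees Celsius are assumed values picked for illustration, not limits we use.

```python
from typing import List

from pydantic import BaseModel, ValidationError, validator


class Feature(BaseModel):
    # Simplified stand-in for BaseFeature
    columns: List[str]
    timestamps: List[int]
    data: List[List[float]]


class TemperatureFeature(Feature):
    @validator("data")
    def within_plausible_range(cls, data: List[List[float]]) -> List[List[float]]:
        # Assumed limits in degrees Celsius, chosen for illustration only
        low, high = -60.0, 60.0
        for row in data:
            for value in row:
                if not low <= value <= high:
                    raise ValueError(f"temperature {value} outside [{low}, {high}]")
        return data


fields = {"columns": ["SN18700"], "timestamps": [0, 3600]}
ok = TemperatureFeature(data=[[3.5], [4.0]], **fields)

try:
    TemperatureFeature(data=[[3.5], [400.0]], **fields)
    rejected = False
except ValidationError:
    rejected = True

print(ok.data, rejected)
```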

Going further

As of now, our use of data models ensures that the data meets the minimum requirements for our specific use case. What this doesn’t provide is a broader evaluation of data quality: are values drifting, how often is data missing from this source, etc. For this kind of overview, we are working on a pipeline for monitoring input data closer to the source and providing overviews for other data consumers. Our initial investigations of combining pydantic models with Great Expectations, a data testing, documentation and profiling framework, are promising.

How we validate input data using pydantic

We use the Python package pydantic for fast and easy validation of input data. Here’s how.

We discovered the Python package pydantic through FastAPI, which we use for serving machine learning models. Pydantic is a Python package for data parsing and validation, based on type hints. We use pydantic because it is fast, does a lot of the dirty work for us, provides clear error messages and makes it easy to write readable code.

Two of our main use cases for pydantic are:

  1. Validation of settings and input data.
    We often read settings from a configuration file, which we use as inputs to our functions. We often end up doing quite a bit of input validation to ensure the settings are valid for further processing. To avoid starting our functions with a long set of validations and assertions, we use pydantic to validate the input.
  2. Sharing data requirements between machine learning applications.
    Our ML models have certain requirements for the data we use for training and prediction, for example that no data is missing. These requirements are used when we train our models, when we run online predictions and when we validate newly added training data. We use pydantic to specify the requirements we have and ensure that the same requirements are used everywhere, avoiding duplication of error-prone code across different applications.

This post will focus on the first use case, validation of settings and input data. A later post will cover the second use case.

Validation of settings and input data

In some cases, we read settings from a configuration file, such as a toml file, to be parsed as nested dictionaries. We use the settings as inputs to different functions. We often end up doing quite a bit of input validation to ensure the settings parsed from file are valid for further processing. A concrete example is settings for machine learning models, where we use toml files for defining model parameters, features and training details for the models.

This is quite similar to how FastAPI uses pydantic for input validation: the input to the API call is json, which in Python translates to a dictionary, and input validation is done using pydantic.

In this post we will go through input validation for a function interpolating a time series to a higher frequency. If we want to do interpolation, we set the interpolation factor, i.e., the factor of upsampling, the interpolation method, and an option to interpolate on the integral. Our interpolation function is just a wrapper around pandas interpolation methods, including the validation of input and some data wrangling. The input validation code started out looking a bit like this:

from typing import Dict


def validate_input_settings(params_in: Dict) -> Dict:
    params_validated = {}
    for key, value in params_in.items():
        if key == "interpolation_factor":
            if not int(value) == value:
                raise ValueError(f"{key} has a non-int value")
            if not int(value) >= 2:
                raise ValueError(f"{key}: {value} should be >= 2")
            value = int(value)
        elif key == "interpolation_method":
            allowed_set = {
                "repeat", "distribute", "linear", "cubic", "akima"
            }
            if value not in allowed_set:
                raise ValueError(f"{key} should be one of {allowed_set}, got {value}")
        elif key == "interpolate_on_integral":
            if not isinstance(value, bool):
                raise ValueError(f"{key} should be bool, got {value}")
        else:
            raise ValueError(f"{key} not a recognized key")
        params_validated[key] = value
    return params_validated

This is heavily nested, which in itself makes it hard to read, and perhaps you find that the validation rules aren’t crystal clear at first glance. We use SonarQube for static code quality analysis, and this piece of code results in a code smell, complaining that the code is too complex. In fact, this already has a cognitive complexity of 18 as SonarQube counts, above the default threshold of 15. Cognitive complexity is a measure of how difficult it is to read code, and increments for each break in linear flow, such as an if statement or a for loop. Nested breaks of the flow are incremented again.

Let’s summarize what we check for in validate_input_settings:

  • interpolation_factor is an integer
  • interpolation_factor is greater than or equal to 2
  • interpolation_method is in a set of allowed values
  • interpolate_on_integral is boolean
  • The keys in our settings dictionary are among the three mentioned above

In addition to the code above, we have a few more checks:

  • if an interpolation_factor is given, but no interpolation_method, use the default method linear
  • if an interpolation_factor is given, but not interpolate_on_integral, set the default option False
  • check for the invalid combination interpolate_on_integral = False and interpolation_method = "distribute"

At the end of another three if statements inside the for loop, we end up at a cognitive complexity of 24.

Pydantic to the rescue

We might consider using a pydantic model for the input validation.

Minimal start

We can start out with the simplest form of a pydantic model, with field types:

from pydantic import BaseModel


class InterpolationSetting(BaseModel):
    interpolation_factor: int
    interpolation_method: str
    interpolate_on_integral: bool

Pydantic models are simply classes inheriting from the BaseModel class. We can create an instance of the new class as:

InterpolationSetting(
    interpolation_factor=2, 
    interpolation_method="linear", 
    interpolate_on_integral=True
)

This automatically does two of the checks we had implemented:

  • interpolation_factor is an int
  • interpolate_on_integral is a bool

In the original script, the fields are in fact optional, i.e., it is possible to provide no interpolation settings, in which case we do not do interpolation. We will set the fields to optional later, and then implement the additional necessary checks.

We can verify the checks we have enforced now by supplying non-valid input:

from pydantic import ValidationError


try:
    InterpolationSetting(
        interpolation_factor="text",
        interpolation_method="linear",
        interpolate_on_integral=True,
    )
except ValidationError as e:
    print(e)

which outputs:

    1 validation error for InterpolationSetting
    interpolation_factor
      value is not a valid integer (type=type_error.integer)

Pydantic raises a ValidationError when the validation of the model fails, stating which field, i.e. attribute, raised the error and why. In this case interpolation_factor raised a type error because the value "text" is not a valid integer. The validation is performed on instantiation of an InterpolationSetting object.
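
Pydantic does not only check types, it also tries to parse the input into the declared type. The short sketch below shows this with the default, non-strict behaviour: a numeric string and the string "true" are accepted and converted. The input values are made up for the example.

```python
from pydantic import BaseModel


class InterpolationSetting(BaseModel):
    interpolation_factor: int
    interpolation_method: str
    interpolate_on_integral: bool


setting = InterpolationSetting(
    interpolation_factor="2",        # numeric string, parsed to int
    interpolation_method="linear",
    interpolate_on_integral="true",  # string understood as a boolean
)
print(repr(setting.interpolation_factor), repr(setting.interpolate_on_integral))
```

This is convenient when settings come from text sources, but it is worth keeping in mind that the value stored on the model may differ in type from the value that was passed in.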

Validation of single fields and combinations of fields

Our original code also had some additional requirements:

  • interpolation_factor should be greater than or equal to two.
  • interpolation_method must be chosen from a set of valid methods.
  • We do not allow the combination of interpolate_on_integral=False and interpolation_method="distribute"

The first restriction can be implemented using pydantic types. Pydantic provides many different types. For this requirement we will use a constrained type, namely conint, a constrained integer type providing automatic restrictions such as lower limits.

The remaining two restrictions can be implemented as validators. We decorate our validation functions with the validator decorator. The input argument to the validator decorator is the name of the attribute(s) to perform the validation for.

All validators are run automatically when we instantiate an object of the InterpolationSetting class, as for the type checking.

Our validation functions are class methods, and the first argument is the class, not an instance of the class. The second argument is the value to validate, and can be named as we wish. We implement two validators, method_is_valid and valid_combination_of_method_and_on_integral:

from typing import Dict

from pydantic import BaseModel, conint, validator, root_validator


class InterpolationSetting(BaseModel):
    interpolation_factor: conint(gt=1)
    interpolation_method: str
    interpolate_on_integral: bool

    @validator("interpolation_method")
    def method_is_valid(cls, method: str) -> str:
        allowed_set = {"repeat", "distribute", "linear", "cubic", "akima"}
        if method not in allowed_set:
            raise ValueError(f"must be in {allowed_set}, got '{method}'")
        return method

    @root_validator()
    def valid_combination_of_method_and_on_integral(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        method = values.get("interpolation_method")
        if on_integral is False and method == "distribute":
            raise ValueError(
                f"Invalid combination of interpolation_method "
                f"{method} and interpolate_on_integral {on_integral}"
            )
        return values

There are a few things to note here:

  • Validators should return a validated value. The validators are run sequentially, and populate the fields of the data model if they are valid.
  • Validators should only raise ValueError, TypeError or AssertionError. Pydantic will catch these errors to populate the ValidationError and raise one exception regardless of the number of errors found in validation. You can read more about error handling in the docs.
  • When we validate a field against another, we can use the root_validator, which runs validation on the entire model. Root validators are a little different: they have access to the values argument, which is a dictionary containing all fields that have already been validated. When the root validator runs, the interpolation_method may have failed to validate, in which case it will not be added to the values dictionary. Here, we handle that by using values.get("interpolation_method"), which returns None if the key is not in values. The docs contain more information on root validators and field ordering, which is important to consider when we are using the values dictionary.

Again, we can verify by choosing input parameters to trigger the errors:

from pydantic import ValidationError


try:
    InterpolationSetting(
        interpolation_factor=1,
        interpolation_method="distribute",
        interpolate_on_integral=False,
    )
except ValidationError as e:
    print(e)

which outputs:

    2 validation errors for InterpolationSetting
    interpolation_factor
      ensure this value is greater than 1 (type=value_error.number.not_gt; limit_value=1)
    __root__
      Invalid combination of interpolation_method distribute and interpolate_on_integral False (type=value_error)

As we see, pydantic raises a single ValidationError regardless of the number of ValueErrors raised in our model.

Implementing dynamic defaults

We also had some default values if certain parameters were not given:

  • If an interpolation_factor is given, set the default value linear for interpolation_method if none is given.
  • If an interpolation_factor is given, set the default value False for interpolate_on_integral if none is given.

In this case, we have dynamic defaults dependent on other fields.

This can also be achieved with root validators, by returning a conditional value. As this means validating one field against another, we must take care to ensure our code runs whether or not the two fields have passed validation and been added to the values dictionary. We will now also use Optional types, because we will handle the cases where not all values are provided. We add the new validators set_method_given_interpolation_factor and set_on_integral_given_interpolation_factor:

from typing import Dict, Optional

from pydantic import BaseModel, conint, validator, root_validator


class InterpolationSetting(BaseModel):
    interpolation_factor: Optional[conint(gt=1)]
    interpolation_method: Optional[str]
    interpolate_on_integral: Optional[bool]

    @validator("interpolation_method")
    def method_is_valid(cls, method: Optional[str]) -> Optional[str]:
        allowed_set = {"repeat", "distribute", "linear", "cubic", "akima"}
        if method is not None and method not in allowed_set:
            raise ValueError(f"must be in {allowed_set}, got '{method}'")
        return method

    @root_validator()
    def valid_combination_of_method_and_on_integral(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        method = values.get("interpolation_method")
        if on_integral is False and method == "distribute":
            raise ValueError(
                f"Invalid combination of interpolation_method "
                f"{method} and interpolate_on_integral {on_integral}"
            )
        return values

    @root_validator()
    def set_method_given_interpolation_factor(cls, values: Dict) -> Dict:
        factor = values.get("interpolation_factor")
        method = values.get("interpolation_method")
        if method is None and factor is not None:
            values["interpolation_method"] = "linear"
        return values

    @root_validator()
    def set_on_integral_given_interpolation_factor(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        factor = values.get("interpolation_factor")
        if on_integral is None and factor is not None:
            values["interpolate_on_integral"] = False
        return values

We can verify that the default values are set only when interpolation_factor is provided. Running InterpolationSetting(interpolation_factor=3) returns:

InterpolationSetting(interpolation_factor=3, interpolation_method='linear', interpolate_on_integral=False)

whereas supplying no input parameters, InterpolationSetting(), returns a data model with all parameters set to None:

InterpolationSetting(interpolation_factor=None, interpolation_method=None, interpolate_on_integral=None)

Note: If we have static defaults, we can simply set them for the fields:

class InterpolationSetting(BaseModel):
    interpolation_factor: Optional[int] = 42

Final safeguard against typos

Finally, we had one more check in our previous script: that no unknown keys were provided. If we provide unknown keys to our data model now, nothing really happens. For example, InterpolationSetting(hello="world") outputs:

InterpolationSetting(interpolation_factor=None, interpolation_method=None, interpolate_on_integral=None)

Often, an unknown field name is the result of a typo in the toml file. Therefore we want to raise an error to alert the user. We do this using the model config, which controls the behaviour of the model. The extra attribute of the config determines what we do with extra fields. The default is ignore, which we can see in the example above: the field is ignored and not added to the model, as it would be with the allow option. We can use the forbid option to raise an exception when extra fields are supplied.

from typing import Dict, Optional

from pydantic import BaseModel, conint, validator, root_validator


class InterpolationSetting(BaseModel):
    interpolation_factor: Optional[conint(gt=1)]
    interpolation_method: Optional[str]
    interpolate_on_integral: Optional[bool]

    class Config:
        extra = "forbid"

    @validator("interpolation_method")
    def method_is_valid(cls, method: Optional[str]) -> Optional[str]:
        allowed_set = {"repeat", "distribute", "linear", "cubic", "akima"}
        if method is not None and method not in allowed_set:
            raise ValueError(f"must be in {allowed_set}, got '{method}'")
        return method

    @root_validator()
    def valid_combination_of_method_and_on_integral(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        method = values.get("interpolation_method")
        if on_integral is False and method == "distribute":
            raise ValueError(
                f"Invalid combination of interpolation_method "
                f"{method} and interpolate_on_integral {on_integral}"
            )
        return values

    @root_validator()
    def set_method_given_interpolation_factor(cls, values: Dict) -> Dict:
        factor = values.get("interpolation_factor")
        method = values.get("interpolation_method")
        if method is None and factor is not None:
            values["interpolation_method"] = "linear"
        return values

    @root_validator()
    def set_on_integral_given_interpolation_factor(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        factor = values.get("interpolation_factor")
        if on_integral is None and factor is not None:
            values["interpolate_on_integral"] = False
        return values

If we try again with an unknown key, we now get a ValidationError:

from pydantic import ValidationError


try:
    InterpolationSetting(hello=True)
except ValidationError as e:
    print(e)

This raises a validation error for the unknown field:

1 validation error for InterpolationSetting
hello
  extra fields not permitted (type=value_error.extra)
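
For comparison, the two other options for extra can be sketched as follows. The class names IgnoringSetting and AllowingSetting are made up for this example, and the models are reduced to a single field.

```python
from typing import Optional

from pydantic import BaseModel


class IgnoringSetting(BaseModel):
    interpolation_factor: Optional[int] = None

    class Config:
        extra = "ignore"  # the default: unknown fields are silently dropped


class AllowingSetting(BaseModel):
    interpolation_factor: Optional[int] = None

    class Config:
        extra = "allow"  # unknown fields are kept on the model


print(IgnoringSetting(hello="world").dict())
print(AllowingSetting(hello="world").dict())
```

Neither of these would have alerted us to a misspelled key in the toml file, which is why we chose forbid.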

Adapting our existing code is easy

Now we have implemented all our checks, and can go on to adapt our existing code to use the new data model. In our original implementation, we would do something like

params_in = toml.load(path_to_settings_file)
params_validated = validate_input_settings(params_in)
interpolate_result(params_validated)

We can replace the call to validate_input_settings with instantiation of the pydantic model, unpacking the dictionary into keyword arguments: params_validated = InterpolationSetting(**params_in). Each pydantic data model has a .dict() method that returns the parameters as a dictionary, so we can use it in the input argument to interpolate_result directly: interpolate_result(params_validated.dict()). Another option is to refactor interpolate_result to use the attributes of the InterpolationSetting objects, such as params_validated.interpolation_method, instead of the values of a dictionary.
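
Put together, the adapted flow might look like the sketch below. To keep it self-contained, the model is reduced to its fields, the settings dictionary is written inline instead of being loaded with toml.load, and interpolate_result is a hypothetical stub standing in for the real function.

```python
from typing import Dict, Optional

from pydantic import BaseModel


class InterpolationSetting(BaseModel):
    # Reduced version of the model above, without validators
    interpolation_factor: Optional[int] = None
    interpolation_method: Optional[str] = None
    interpolate_on_integral: Optional[bool] = None


def interpolate_result(params: Dict) -> str:
    # Hypothetical stand-in for the real interpolation function
    return f"interpolating with {params['interpolation_method']}"


# Stand-in for the dictionary returned by toml.load(path_to_settings_file)
params_in = {"interpolation_factor": 2, "interpolation_method": "cubic"}

# The dictionary is unpacked into keyword arguments
params_validated = InterpolationSetting(**params_in)

# Option 1: pass a plain dictionary to the existing function
result = interpolate_result(params_validated.dict())
print(result)

# Option 2: use attributes on the validated object directly
print(params_validated.interpolation_method)
```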

Conclusion

In the end, we can replace one 43-line function (for the full functionality) with a cognitive complexity of 24 with one 40-line class containing six methods, each with a cognitive complexity of less than 4. The pydantic data models will not necessarily be shorter than the custom validation code they replace, and since there are a few quirks and concepts to pay attention to, they are not necessarily easier to read at the first try.

However, as we use the library for validation in our APIs, we are getting familiar with it, and the models become easier to understand.

Some of the benefits of using pydantic for this are:

  • Type checking (and in fact also some type conversion), which we previously did ourselves, is now done automatically for us, saving us the work of repeating lines of error-prone code in many different functions.
  • Each validator has a name which, if we put a little thought into it, makes it very clear what we are trying to achieve. In our previous example, the purpose of each nested condition had to be deduced from the many if clauses and error messages. This should be a lot clearer now, especially if we use pydantic across different projects.
  • If speed is important, pydantic’s benchmarks show that it is fast compared to similar libraries.

Hopefully this will help you determine whether or not you should consider using pydantic models in your projects. In a later post, we will show how we use pydantic data models to share metadata between machine learning applications, and to share data requirements between the applications.