How we share data requirements between ML applications

We use pydantic models to share data requirements and metadata between ML applications. Here’s how.

In our previous post, we wrote about how we use the Python package pydantic to validate input data. We serve our machine learning models with APIs written using the FastAPI library, which uses pydantic to validate and parse the input data received through the API. FastAPI was our introduction to pydantic. We started out with some pydantic models for validating input data to the API. We saw that sharing the models across several applications would lead to less boilerplate code, as well as more explicit handling of our data requirements.

The data models we share across our ML applications serve two purposes:

  • Sharing metadata, i.e. tidy models declaring what data a specific machine learning model requires, including the logic for naming standards and formats for exporting data. This is also used as the foundation for a mapping of how to select data from the training database.
  • Validating that data requirements are fulfilled. When training our models, we have certain requirements for the training data, for example that there is no missing data. The data models allow us to validate these requirements in other applications as well, such as in the API.

In this post, we will show how we share metadata and data requirements using pydantic. For an introduction to pydantic syntax and concepts, take a look at our previous post, or the excellent pydantic documentation.

A quick overview of applications

Before we go into detail, we’ll start with a quick overview of how data flows and what applications are at play.

Outline of architecture with the relevant components

Our forecast models are served by APIs, shown as the ML API application in the figure. Each model specifies the data it requires to make a prediction at a GET endpoint. The ML / stream integration application requests the feature metadata specification from the ML API, fetches the data from the streaming data platform and posts the observations to the ML API’s prediction endpoint. The response is the prediction, which the ML / stream integration application publishes to the streaming data platform for users to consume.

The data from the streaming platform is persisted to a database. To ensure we have valid training data, data cleaning applications read the raw data and validate that it meets the requirements. These applications also try to fill small holes of missing data according to domain-specific rules, and warn if there are large holes that need to be processed manually. Data validation runs on a regular schedule, as data is continuously persisted from the streaming platform to the database.

When training the models, validated data from the data cleaning applications is used.

When reading data, the applications use the metadata models to select the correct data. The ML API specifies what data should be fetched from the streaming platform using the metadata model. The data cleaning applications use the metadata model to select the correct data from the database. When training models, the data is fetched from the database according to the metadata model.

When operating on data, the applications use the data requirements models to ensure the validity of the input data. Before making a prediction, data is validated in the ML API. The prediction response is also validated before it is returned to the ML / stream integration application. The data cleaning applications also validate data according to the data requirements models before writing to the database for training.

The metadata models are used for reading the correct data, shown with a yellow star. The data requirements model is used for validating that data meets requirements, shown with a blue triangle.

Sharing metadata with pydantic

Below is an example of our BaseFeatureMetaData class, which we use for metadata on the base features in our models, i.e., the features as they come from our sources. This metadata model specifies all necessary details on the features a model requires:

  • The name of the feature.
  • The source we require for the feature, as several features are found in multiple sources. An example is weather forecasts, which are available from different vendors, but it could also be different estimates for measurements or unobservable values.
  • Which locations we require data from.
  • history_period and future_period together specify the time series the model requires to provide its prediction.
  • The time resolution required for this feature, as many features are available in different time aggregations.
from datetime import timedelta
from typing import List

from pydantic import BaseModel

from our_internal_utils import timedelta_isoformat_


class BaseFeatureMetaData(BaseModel):
    name: str
    source: str
    locations: List[str]
    history_period: timedelta
    future_period: timedelta
    resolution: timedelta

    class Config:
        json_encoders = {timedelta: timedelta_isoformat_}

    def __str__(self):
        res_str = timedelta_isoformat_(self.resolution)
        return f"{self.name}_{self.source}_{res_str}"

We can now create a metadata specification for temperature data, from the meteorological institute, or met as we call them, from two weather stations, SN18700 (Blindern) and SN27500 (Lillehammer), for 48 hours of weather observations and 72 hours of forecast values, using the hourly resolution data:

temperature_metadata = BaseFeatureMetaData(
    name="temperature",
    source="met",
    locations=["SN18700", "SN27500"],
    history_period=timedelta(hours=48),
    future_period=timedelta(hours=72),
    resolution=timedelta(hours=1),
)

This metadata specification is used

  • By the ML / stream integration application that fetches data for the models for live prediction. We provide the metadata in the API which serves our ML models, specifying all details on the features a model requires. The application uses the metadata to collect the required time series from our streaming platform.
  • When we train our models, to fetch the correct training data from the database for each model. The metadata specification of each model, which is exposed in the API, is also what we use to map a base feature to which data to read from the database.
  • When validating training data in the data cleaning applications. Each base feature has its own data cleaning application configured with the metadata model, used to map a base feature to which data to read from the database, as for training.
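
To illustrate how history_period, future_period and resolution together describe the time series a model needs, here is a minimal sketch that turns them into a list of required timestamps. The function name required_timestamps, the example prediction time and the choice of a half-open window around the prediction time are assumptions made for illustration, not our actual implementation.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def required_timestamps(
    prediction_time: datetime,
    history_period: timedelta,
    future_period: timedelta,
    resolution: timedelta,
) -> List[datetime]:
    """Timestamps in [prediction_time - history_period, prediction_time + future_period)."""
    if resolution <= timedelta(0):
        raise ValueError("resolution must be positive")
    start = prediction_time - history_period
    # timedelta // timedelta gives the number of whole steps in the window
    n_steps = (history_period + future_period) // resolution
    return [start + i * resolution for i in range(n_steps)]


# Hypothetical prediction time, with the periods from the temperature example
prediction_time = datetime(2021, 1, 1, 12, tzinfo=timezone.utc)
timestamps = required_timestamps(
    prediction_time,
    history_period=timedelta(hours=48),
    future_period=timedelta(hours=72),
    resolution=timedelta(hours=1),
)
```

With 48 hours of history, 72 hours of forecast and hourly resolution, this yields 120 hourly timestamps, which an application could use to select rows from the stream or the database.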

In order to provide the metadata model in JSON format from the API, we use pydantic’s Config class, which lets us customize the behaviour of the model. The json_encoders setting in our Config specifies that all timedelta fields should be converted using our utility function timedelta_isoformat_, to provide valid JSON. Pydantic provides a .json() method for exporting models to JSON. temperature_metadata.json() returns (line breaks added here for readability):

'{
   "name":"temperature",
   "source":"met",
   "locations":[
      "SN18700",
      "SN27500"
   ],
   "history_period":"PT48H",
   "future_period":"PT72H",
   "resolution":"PT1H"
}'

following the timedelta representation standard we have agreed on in the interface between the ML API and the ML / stream integration application. We also override the __str__ representation of the class to provide our internal naming standard for features. str(temperature_metadata) returns:

 'temperature_met_PT1H'

We use this naming standard for the column names of the feature data in internal applications and for naming each data cleaning application, as each base feature has its own cleaning application.
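
Our timedelta_isoformat_ utility is internal and not shown in this post. A minimal sketch of a formatter in the same spirit could look like the following. The name timedelta_isoformat_sketch, and the choice to express durations only in hours, minutes and seconds with no day component, are assumptions made for illustration.

```python
from datetime import timedelta


def timedelta_isoformat_sketch(delta: timedelta) -> str:
    """Format a non-negative, whole-second timedelta as an ISO 8601 duration.

    Assumption: only hours, minutes and seconds are used (no day component),
    so 48 hours becomes "PT48H" rather than "P2D".
    """
    if delta < timedelta(0) or delta.microseconds:
        raise ValueError("only non-negative, whole-second durations are supported")
    hours, remainder = divmod(int(delta.total_seconds()), 3600)
    minutes, seconds = divmod(remainder, 60)
    units = ((hours, "H"), (minutes, "M"), (seconds, "S"))
    parts = [f"{amount}{unit}" for amount, unit in units if amount]
    # A zero duration still needs at least one component to be valid
    return "PT" + ("".join(parts) or "0S")


# Building a feature name in the same pattern as __str__ above
resolution = timedelta_isoformat_sketch(timedelta(hours=1))
column_name = f"temperature_met_{resolution}"
```

Keeping the formatter in one shared utility means the JSON export and the feature names cannot drift apart.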

Validating data requirements across applications

Our BaseFeature class explicitly states our data requirements through validators: that time steps must be evenly spaced, that we don’t accept any NaNs in the time series, and other basic requirements. This model is used

  • in our ML API, to validate input data before running predictions. This use case was our first introduction to pydantic data models, and is described in FastAPI’s excellent documentation.
  • in our data cleaning applications, to validate that new potential training data, persisted from the streaming platform, meets the requirements we have in training.
  • when reading data from the database to train our models, so that validation can be done before training as well. For already validated sources, we can instead use the construct() method, which creates models without validation.

This use of pydantic data models is analogous to the input validation example shared in our previous post. Using the data model in the API serving the models, in the models themselves and in the data cleaning applications ensures that we collect all our data requirements in one place. This way we avoid multiple .dropna() lines spread throughout our code, which can lead to different treatment of data during training and prediction and make the code difficult to refactor. Using the data requirements in the data cleaning applications also ensures that there is validated data available for training at any time.

Below is a shortened example with a few of the validators of a parent model, DataFrameTimeSeries, which the data requirement models inherit from.

In the DataFrameTimeSeries model, missing values are allowed. Our BaseFeature model inherits from DataFrameTimeSeries and inherits all validators, but we override the data field, because missing data is not allowed for our BaseFeature input. When an instance of a pydantic BaseModel object is created, the validator-decorated functions are run, along with validation that the type hints are adhered to. An exception is raised if any of the validations fail.

from typing import List, Optional

from pydantic import BaseModel, validator


class DataFrameTimeSeries(BaseModel):
    columns: List[str]
    timestamps: List[int]
    data: List[List[Optional[float]]]
    # Some time series are allowed to have missing values

    @validator("columns")
    def at_least_one_column(cls, columns: List[str]) -> List[str]:
        if len(columns) < 1:
            raise ValueError("No column names specified")
        return columns

    @validator("timestamps", "data")
    def len_at_least(cls, value: List) -> List:
        minlen = 1
        if len(value) < minlen:
            raise ValueError(f"length is less than {minlen}")
        return value

    @validator("timestamps")
    def index_has_proper_spacing(cls, value: List[int]) -> List[int]:
        # Differences between consecutive timestamps
        diffs = [later - earlier for earlier, later in zip(value, value[1:])]
        if len(diffs) < 1:
            return value
        if any(diff <= 0 for diff in diffs):
            raise ValueError("Index is not monotonically increasing")
        if any(diff != diffs[0] for diff in diffs):
            raise ValueError("Index has unequal time steps")
        return value


class BaseFeature(DataFrameTimeSeries):
    data: List[List[float]]
    # BaseFeatures inherit validators from DataFrameTimeSeries, 
    # but are not allowed to have missing values, so the data 
    # field does not have Optional floats
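
To see the difference between the two models in practice, the sketch below passes the same payload, containing one missing value, to both. The class definitions are repeated without validators to keep the example self-contained, and the payload values and the one-row-per-timestamp layout of data are made up for illustration. It also shows construct(), which skips validation entirely. The example follows the pydantic v1 API used in this post; in pydantic v2, construct() is named model_construct().

```python
from typing import List, Optional

from pydantic import BaseModel, ValidationError


class DataFrameTimeSeries(BaseModel):
    columns: List[str]
    timestamps: List[int]
    data: List[List[Optional[float]]]


class BaseFeature(DataFrameTimeSeries):
    data: List[List[float]]  # no Optional: missing values are rejected


# Hypothetical payload: one row per timestamp, with the second value missing
payload = {
    "columns": ["temperature_met_PT1H"],
    "timestamps": [0, 3600, 7200],
    "data": [[1.5], [None], [2.0]],
}

series = DataFrameTimeSeries(**payload)  # accepted: missing values are allowed

try:
    BaseFeature(**payload)
    rejected = False
except ValidationError as error:
    rejected = True
    print(error)  # the message points at the offending element of data

# construct() skips validation, so it should only be used for trusted data
unchecked = BaseFeature.construct(**payload)
```

Because construct() performs no checks, the missing value survives in unchecked, which is why we would only use it for sources that have already been validated.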

In our API, the validation is done immediately when a new request is made, as the pydantic validation is enforced through type hints in the API routes.

Our data cleaning applications run validation once per day, validating all BaseFeatures in the training database. There is one application running per BaseFeature. New data from the streaming platform is stored in the database continuously. The cleaning application reads the new data since the last cleaning and, if there are small holes, attempts to fill them according to domain-specific rules. The number of time steps that can be filled automatically, as well as the interpolation method, is set per base feature. The joint dataset of original and processed data is then converted to BaseFeatures through a utility function. As this creates a BaseFeature instance, validation is performed immediately. A view containing only validated data is updated to include the newly added data if it meets the requirements. This view is the source for training data.
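
A minimal sketch of the gap filling step, assuming linear interpolation and a hypothetical max_gap parameter for the number of consecutive missing time steps that may be filled. The actual interpolation methods and limits are configured per base feature and are not shown in this post.

```python
from typing import List, Optional


def fill_small_gaps(values: List[Optional[float]], max_gap: int) -> List[Optional[float]]:
    """Linearly interpolate runs of at most max_gap missing values.

    Gaps at the start or end of the series have no neighbour on one side
    and are left untouched, as are gaps longer than max_gap.
    """
    filled = list(values)
    n = len(filled)
    i = 0
    while i < n:
        if filled[i] is not None:
            i += 1
            continue
        start = i
        while i < n and filled[i] is None:
            i += 1
        end = i  # index of the first value after the gap
        gap = end - start
        if gap <= max_gap and start > 0 and end < n:
            left, right = filled[start - 1], filled[end]
            step = (right - left) / (gap + 1)
            for k in range(gap):
                filled[start + k] = left + step * (k + 1)
    return filled


small_hole = fill_small_gaps([0.0, None, None, 3.0], max_gap=2)
large_hole = fill_small_gaps([1.0, None, None, None, 5.0], max_gap=2)
```

Here small_hole is filled completely, while large_hole keeps its missing values and would fail BaseFeature validation, signalling that manual processing is needed.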

We could create additional validation functions, for example by creating subclasses of BaseFeature with domain-specific validation, such as checking whether temperature values are within normal ranges.
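
Such a subclass could look roughly like the sketch below. TemperatureFeature and the plausible range of -60 to 60 degrees Celsius are hypothetical, and BaseFeature is redefined without its inherited validators to keep the example self-contained. It uses the same pydantic v1 style validator decorator as the rest of this post.

```python
from typing import List

from pydantic import BaseModel, ValidationError, validator


class BaseFeature(BaseModel):
    columns: List[str]
    timestamps: List[int]
    data: List[List[float]]


class TemperatureFeature(BaseFeature):
    @validator("data")
    def within_plausible_range(cls, data: List[List[float]]) -> List[List[float]]:
        low, high = -60.0, 60.0  # hypothetical limits in degrees Celsius
        for row in data:
            for value in row:
                if not low <= value <= high:
                    raise ValueError(f"temperature {value} outside [{low}, {high}]")
        return data


valid = TemperatureFeature(
    columns=["temperature_met_PT1H"], timestamps=[0, 3600], data=[[3.2], [2.9]]
)

try:
    TemperatureFeature(
        columns=["temperature_met_PT1H"], timestamps=[0, 3600], data=[[3.2], [999.0]]
    )
    out_of_range_rejected = False
except ValidationError:
    out_of_range_rejected = True
```

Since the subclass inherits from BaseFeature, it keeps all the shared requirements and only adds the domain check on top.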

Going further

As of now, our use of data models ensures that the data meets the minimum requirements for our specific use case. What this doesn’t provide is a broader evaluation of data quality: are values drifting, how often is data missing from this source, and so on. For this kind of overview, we are working on a pipeline for monitoring input data closer to the source and providing overviews for other data consumers. Our initial investigations of combining pydantic models with Great Expectations, a data testing, documentation and profiling framework, are promising.
