In our previous post, we wrote about how we use the Python package pydantic to validate input data. We serve our machine learning models with APIs written using the FastAPI library, which uses pydantic to validate and parse the input data received through the API. FastAPI was our introduction to pydantic. We started out with some pydantic models for validating input data to the API. We saw that sharing the models across several applications would lead to less boilerplate code, as well as more explicit handling of our data requirements.
The data models we share across our ML applications serve two purposes:
- Sharing metadata, i.e. tidy models declaring what data a specific machine learning model requires, including the logic for naming standards and formats for exporting data. This is also used as the foundation for a mapping of how to select data from the training database.
- Validating that data requirements are fulfilled. When training our models, we have certain requirements for the training data, for example that there is no missing data. The data models allow us to validate these requirements across other applications as well, such as in the API.
In this post, we will show how we share metadata and data requirements using pydantic. For an introduction to pydantic syntax and concepts, take a look at our previous post, or the excellent pydantic documentation.
A quick overview of applications
Before we go into detail, we’ll start with a quick overview of how data flows and what applications are at play.

Our forecast models are served by APIs, the ML API application in the figure. The models specify the data they require to make a prediction at a GET endpoint. The ML / stream integration application requests the feature metadata specification from the ML API, fetches the corresponding data from the streaming data platform and posts the observations to the ML API’s prediction endpoint. The response is the prediction, which the ML / stream integration application publishes to the streaming data platform for users to consume.
The data from the streaming platform is persisted to a database. To ensure we have valid training data, data cleaning applications read the raw data and validate that it meets the requirements. These applications also try to fill small holes of missing data according to domain specific rules, and warn if there are large holes that need to be manually processed. Data validation is run on a regular schedule, as data is continuously persisted from the streaming platform to the database.
When training the models, validated data from the data cleaning applications is used.
When reading data, the applications use the metadata models to select the correct data. The ML API specifies what data should be fetched from the streaming platform using the metadata model. The data cleaning applications use the metadata model to select the correct data from the database. When training models, the data is fetched from the database according to the metadata model.
When operating on data, the applications use the data requirements models to ensure the validity of the input data. Before making a prediction, data is validated in the ML API. The prognosis response is also validated before it is returned to the ML / stream integration application. The data cleaning applications also validate data according to the data requirements models before writing to the database for training.

Sharing metadata with pydantic
Below is an example of our BaseFeatureMetaData class, which we use for metadata on the base features in our models, i.e., the features as they come from our sources. This metadata model specifies all necessary details on the features a model requires:
- The name of the feature.
- The source we require for the feature, as several features are found in multiple sources. An example is weather forecasts, which are available from different vendors, but it could also be different estimates for measurements or unobservable values.
- Which locations we require data from.
- history_period and future_period, which together specify the time series the model requires to provide its prediction.
- The time resolution required for this feature, as many features are available in different time aggregations.
```python
from datetime import timedelta
from typing import List

from pydantic import BaseModel

from our_internal_utils import timedelta_isoformat_


class BaseFeatureMetaData(BaseModel):
    name: str
    source: str
    locations: List[str]
    history_period: timedelta
    future_period: timedelta
    resolution: timedelta

    class Config:
        json_encoders = {timedelta: timedelta_isoformat_}

    def __str__(self):
        res_str = timedelta_isoformat_(self.resolution)
        return f"{self.name}_{self.source}_{res_str}"
```
We can now create a metadata specification for temperature data, from the meteorological institute, or met as we call them, from two weather stations, SN18700 (Blindern) and SN27500 (Lillehammer), for 48 hours of weather observations and 72 hours of forecast values, using the hourly resolution data:
```python
temperature_metadata = BaseFeatureMetaData(
    name="temperature",
    source="met",
    locations=["SN18700", "SN27500"],
    history_period=timedelta(hours=48),
    future_period=timedelta(hours=72),
    resolution=timedelta(hours=1),
)
```
This metadata specification is used:
- By the ML / stream integration application that fetches data to models for live prediction. We provide the metadata in the API which serves our ML models, specifying all details on the features a model requires (a sketch of such an endpoint is shown after this list). The application uses the metadata to collect the required time series from our streaming platform.
- When we train our models, to fetch the correct training data from the database for each model. The metadata specification of each model, which is exposed in the API, is also what we use to map a base feature to which data to read from the database.
- When validating training data in the data cleaning applications. Each base feature has its own data cleaning application configured with the metadata model, which is used to map the base feature to the correct data in the database, just as for training.
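As a rough illustration of the first point, the metadata model can be returned directly from a FastAPI route. The route path and application setup below are our own hypothetical sketch, not the actual API:

```python
from fastapi import FastAPI

# Assumes BaseFeatureMetaData and temperature_metadata from the snippets
# above are importable from a shared package.
app = FastAPI()


@app.get("/metadata/temperature", response_model=BaseFeatureMetaData)
def temperature_feature_metadata() -> BaseFeatureMetaData:
    # The ML / stream integration application could call an endpoint like this
    # to learn which time series, locations and resolution to fetch.
    return temperature_metadata
```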
In order to provide the metadata model in json format from the API, we use pydantic’s Config class, which lets us customize the behaviour of the model. The json_encoders entry in our Config specifies that all timedelta fields should be converted using our utility function timedelta_isoformat_, to provide valid json. Pydantic provides a .json() method for exporting models to json. temperature_metadata.json() returns:
'{ "name":"temperature", "source":"met", "locations":[ "SN18700", "SN27500" ], "history_period":"PT1H", "future_period":"PT1H", "resolution":"PT1H" }'
This follows the timedelta representation standard we have agreed on for the interface between the ML API and the ML / stream integration application. We also override the __str__ representation of the class to provide our internal naming standard for features. str(temperature_metadata) returns:
'temperature_met_PT1H'
We use this naming standard in column names for the feature data in internal applications and for naming each data cleaning application, as each base feature has its own cleaning application.
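Both the json export and the __str__ naming rely on the timedelta_isoformat_ utility. That helper is internal to us, so the exact formatting rules may differ, but a minimal sketch of a similar formatter could look like this:

```python
from datetime import timedelta


def timedelta_isoformat_(td: timedelta) -> str:
    """Format a timedelta as an ISO 8601 duration string, e.g. PT1H or PT48H.

    Simplified sketch: negative durations and sub-second parts are not handled.
    """
    total_seconds = int(td.total_seconds())
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    result = "PT"
    if hours:
        result += f"{hours}H"
    if minutes:
        result += f"{minutes}M"
    if seconds or result == "PT":
        result += f"{seconds}S"
    return result
```

With this sketch, timedelta(hours=1) formats to "PT1H" and timedelta(hours=48) to "PT48H", matching the json example above.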
Validating data requirements across applications
Our BaseFeature class explicitly states our data requirements through validators: that time steps must be evenly spaced, that we don’t accept any NaNs in the time series, and other basic requirements. This model is used:
- in our ML API, to validate input data before running predictions. This use case was our first introduction to pydantic data models, and is described in FastAPI’s excellent documentation.
- in our data cleaning application, to validate that new potential training data, persisted from the streaming platform, meets the requirements we have in training.
- When reading data from the database to train our models. Since we use the BaseFeature model here as well, the validation can be done before training too, or we can use the construct() method for already validated sources, which creates models without validation (see the sketch after this list).
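As a small illustration of that last point, with made-up values: instantiating a BaseFeature normally runs every validator, while pydantic’s construct() skips validation entirely for sources we already trust.

```python
# Validated path: all validators run when the model is instantiated
feature = BaseFeature(
    columns=["temperature_met_PT1H"],
    timestamps=[0, 3600, 7200],
    data=[[21.3], [21.1], [20.8]],
)

# Already-validated source: construct() builds the model without running
# any validation, so it should only be used on trusted data
trusted_feature = BaseFeature.construct(
    columns=["temperature_met_PT1H"],
    timestamps=[0, 3600, 7200],
    data=[[21.3], [21.1], [20.8]],
)
```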
This use of pydantic data models is analogous to the input validation example shared in our previous post. Using the data model in the API serving the models, in the models themselves and in the data cleaning applications ensures that we collect all our data requirements in one place. This way we avoid multiple .dropna() lines spread throughout our code, which can lead to different treatment of data during training and prediction and makes the code difficult to refactor. Using the data requirements in the data cleaning applications also ensures that there is validated data available for training at any time.
Below is a shortened example with a few of the validators of a parent model, DataFrameTimeSeries, which the data requirement models inherit from. In the DataFrameTimeSeries model, missing values are allowed. Our BaseFeature model inherits from DataFrameTimeSeries and inherits all its validators, but we override the data field, because missing data is not allowed for our BaseFeature input. When an instance of a pydantic BaseModel object is created, the validator decorated functions will be run, as well as validation that the type hints are adhered to. An exception will be raised if any of the validations fails.
```python
from typing import List, Optional

from pydantic import BaseModel, validator


class DataFrameTimeSeries(BaseModel):
    columns: List[str]
    timestamps: List[int]
    # Some time series are allowed to have missing values
    data: List[List[Optional[float]]]

    @validator("columns")
    def at_least_one_column(cls, columns: List[str]) -> List[str]:
        if len(columns) < 1:
            raise ValueError("No column names specified")
        return columns

    @validator("timestamps", "data")
    def len_at_least(cls, value: List) -> List:
        minlen = 1
        if len(value) < minlen:
            raise ValueError(f"length is less than {minlen}")
        return value

    @validator("timestamps")
    def index_has_proper_spacing(cls, value: List[int]) -> List[int]:
        diffs = [x[0] - x[1] for x in zip(value[1:], value)]
        if len(diffs) < 1:
            return value
        if any(diff <= 0 for diff in diffs):
            raise ValueError("Index is not monotonically increasing")
        if any(diff != diffs[0] for diff in diffs):
            raise ValueError("Index has unequal time steps")
        return value


class BaseFeature(DataFrameTimeSeries):
    # BaseFeatures inherit validators from DataFrameTimeSeries,
    # but are not allowed to have missing values, so the data
    # field does not have Optional floats
    data: List[List[float]]
```
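To make the behaviour concrete, here is a small illustration with made-up values (timestamps as epoch seconds is our assumption here). The second instantiation fails because BaseFeature does not accept missing values:

```python
from pydantic import ValidationError

# Passes validation: one column, evenly spaced timestamps, no missing values
BaseFeature(
    columns=["temperature_met_PT1H"],
    timestamps=[0, 3600, 7200],
    data=[[1.5], [1.7], [1.6]],
)

# Fails validation: None is not allowed in BaseFeature.data
try:
    BaseFeature(
        columns=["temperature_met_PT1H"],
        timestamps=[0, 3600, 7200],
        data=[[1.5], [None], [1.6]],
    )
except ValidationError as exc:
    print(exc)  # pydantic reports that None is not an allowed value for data
```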
In our API, the validation is done immediately when a new request is made, as the pydantic validation is enforced through type hints in the API routes.
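A minimal sketch of what such a route could look like (the path, and returning the parsed shape instead of an actual prediction, are our own simplifications):

```python
from fastapi import FastAPI

# Assumes BaseFeature from the snippet above is importable from a shared package
app = FastAPI()


@app.post("/predict")
def predict(feature: BaseFeature) -> dict:
    # By the time this function runs, FastAPI has already parsed the request
    # body into a BaseFeature and run all validators; invalid payloads are
    # rejected with a 422 response. The real endpoint would call the model here.
    return {"columns": feature.columns, "n_timestamps": len(feature.timestamps)}
```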
Our data cleaning applications run validation once per day, validating all BaseFeatures in the training database. There is one application running per BaseFeature. New data from the streaming platform is stored in the database continuously. The cleaning application reads the new data since the last cleaning and, if there are small holes, attempts to fill them according to domain specific rules. The number of timesteps that can be filled automatically, as well as the interpolation method, is set per base feature. The joint dataset of original and processed data is then converted to BaseFeatures through a utility function. As this creates a BaseFeature instance, validation is performed immediately. A view containing only validated data is updated to include the newly added data if it meets the requirements. This view is the source for training data.
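A hedged sketch of that flow, assuming the raw data sits in a pandas DataFrame with a DatetimeIndex; the configuration values and the conversion helper are hypothetical stand-ins for our internal code:

```python
import pandas as pd

# Hypothetical per-feature configuration
MAX_GAP_STEPS = 2            # longest run of missing values filled automatically
INTERPOLATION_METHOD = "linear"


def fill_small_gaps(raw: pd.DataFrame) -> pd.DataFrame:
    # interpolate() fills at most MAX_GAP_STEPS consecutive missing values;
    # this is a simplification of our domain specific rules, where larger
    # holes are flagged for manual processing instead.
    return raw.interpolate(method=INTERPOLATION_METHOD, limit=MAX_GAP_STEPS)


def to_base_feature(frame: pd.DataFrame) -> "BaseFeature":
    # Hypothetical conversion utility; assumes BaseFeature is importable.
    # Remaining NaNs are mapped to None, so any holes that could not be
    # filled make the BaseFeature constructor raise a ValidationError,
    # keeping invalid data out of the training view.
    data = [
        [None if pd.isna(value) else float(value) for value in row]
        for row in frame.to_numpy().tolist()
    ]
    return BaseFeature(
        columns=list(frame.columns),
        timestamps=[int(ts.timestamp()) for ts in frame.index],
        data=data,
    )
```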
We could create additional validation functions, for example creating subclasses of BaseFeature with domain specific validation, such as checking whether temperature values are within normal ranges.
Going further
As of now, our use of data models ensures that the data meets the minimum requirements for our specific use case. What this doesn’t provide is a broader evaluation of data quality: are values drifting, how often is data missing from this source, and so on. For this kind of overview, we are working on a pipeline for monitoring input data closer to the source and providing overviews for other data consumers. Our initial investigations of combining pydantic models with Great Expectations, a data testing, documentation and profiling framework, are promising.