Is bid filtering effective against network congestion?

Earlier this year, I wrote an introduction to the bid filtering problem, and explained how my team at Statnett are trying to solve it. The system we’ve built at Statnett combines data from various sources in its attempt to make the right call. But how well is it doing its job? Or, more precisely, what is the effect on network congestion of applying our bid filtering system in its current form?

Kyoto. Photo: Belle Co

Without calling it a definitive answer, a paper I wrote for the CIGRE Symposium contains research results that provide new insight. The symposium was in Kyoto, but a diverse list of reasons (including a strict midwife) forced me to leave the cherry blossom to my imagination and test my charming Japanese phrases from a meeting room in Trondheim.

A quick recap

European countries are moving toward a new, more integrated way of balancing their power systems. In a country with highly distributed electricity generation, we want to automatically identify power reserves that should not be used in a given situation due to their location in the grid. If you would like to learn the details about the approach, you are likely to enjoy reading the paper. Here is the micro-version:

To identify bids in problematic locations, we need a detailed network model; we try to predict the future situation in the power grid, and then we apply a nodal market model which gives us the optimal plan of balancing activations for a specific situation. But since we don’t really know how much is going to flow into or out of the country, we optimize many times with different assumptions on cross-border flows. Each of the exchange scenarios tells its own story about which bids should – and shouldn’t – be activated. The scenarios don’t always agree, but in the aggregate they let us form a consensus result, determining which bids will be made unavailable for selection in the balancing market.

An unfair competition

Today, human operators at Statnett select power reserves for activation when necessary to balance the system, always mindful of their locations in the grid and potential bottlenecks. Their decisions on which balancing bids to activate – and not activate – often build on years of operational experience and an abundance of real-time data.

Before discussing whether our machine can beat the human operators, it’s important to keep in mind that the bid filtering system will take part in a different context: the new balancing market, where everyday balancing will take place without the involvement of human operators. This will change the rules of the balancing game completely. While human operators constantly make a flow of integrated last-minute decisions, the new automatic processes are distinct in their separation of concerns and must often act much earlier to respect strict timelines.

Setting up simulations

The quantitative results in our paper come from simulating one day in the Norwegian power grid, using our detailed, custom-built Python model together with recorded data. The balancing actions – and the way they are selected – differ between the simulations.

The first simulation is Historical operation. Here, we simply replay the historical balancing decisions of the human operators.

The second simulation is Bid filtering. Here, we replace the historical human decisions with balancing actions selected by a zonal market mechanism that doesn’t see the internal network constraints or respect the laws of physics. The balancing decisions will often be different from the human ones in order to save some money. But before the market selects any bids, some of them are removed from the list by our bid filtering machine in order to prevent network congestion. We try not to cheat: the bid filtering uses only data and forecasts available 30 minutes before the balancing actions take effect.

The third simulation is No filtering. Here we try to establish the impact on congestion of moving from today’s manual, but flexible operation to zonal, market-based balancing. This simulation is a parallel run of the market-based selection, but without pre-filtering any bids, and it provides a second, possibly more relevant benchmark.

Example from 09:30 on August 25, 2021. Red cells are balancing bids made unavailable in the bid filtering simulation. As a result, the market-based balancing will not select exactly the same bids in the Bid filtering scenario (black dots) and the No filtering scenario (white dots).

Power flow analyses

The interesting part of the simulation is when we inject the balancing decisions into the historical system state and calculate all power flows in the network. Comparing these flows to the operational limits reveals which balancing approaches are doing a better job at avoiding overloads in the network.

Example from 09:30 on August 25, 2021 showing reliability limits. Reliability limits in Norway restrict the flow on a combination of transmission lines, so-called Power Transfer Corridors (PTCs). These 13 PTC constraints are violated in one or more of the simulations.

The overloads are similar between the simulations, but they are not identical. To better understand the big picture, we created a congestion index that summarizes the resulting overload situation in a single value. The number doesn’t have any physical interpretation, but gives a relative indication of how severe the overload situation is.
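
The exact index is defined in the paper; purely as an illustration of the idea, a sum of squared relative overloads over the monitored PTC constraints could look like the sketch below (the function name and the squared weighting are illustrative choices, not the definition used in the paper).

def congestion_index(flows_mw, limits_mw):
    """Illustrative congestion index: sum of squared relative overloads.

    flows_mw and limits_mw are assumed to hold one value per monitored
    PTC constraint. Constraints within their limit contribute zero.
    """
    index = 0.0
    for flow, limit in zip(flows_mw, limits_mw):
        overload = max(abs(flow) - limit, 0.0)
        index += (overload / limit) ** 2
    return index

# Example: one corridor overloaded by 10 %, one within its limit
# congestion_index([1100, 500], [1000, 800]) -> 0.01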

Congestion index for reliability limits in the Norwegian system from August 25, 2021

When we run the simulation for 24 historical hours, we see that with market-based balancing, there would be overloads throughout the day. When we apply bid filtering and remove the bids expected to be problematic, overloads are reduced in 9 of the 24 hours, and we’re able to avoid the most serious problems in the afternoon.

No matter the balancing mechanism, the congestion index virtually never touches zero. Even the human operators with all their extra information and experience run into many of the same congestion problems. This shows that balancing activations play a role in the amount of congestion, but they are just one part of the story, along with several other factors.

With that in mind, if you’re going to let a zonal market mechanism decide your balancing decisions, it seems that bid filtering can have a clear, positive effect in reducing network overloads.

What do you think? Do you read the results differently? Don’t be afraid to get in touch, my team and I are always happy to discuss.

ありがとうございました (thank you very much)

Being a trainee on the forecasting team, including some secret tips

I’ve been a trainee at Statnett’s Data Science unit over the past months and learned a lot. In this post I will give you a quick look at what I have been working on, and some helpful advice for how to survive and thrive as a data science trainee.

I was put on a team together with colleagues from both Statnett and Svenska kraftnät, producing short-term time series forecasts. These forecasts automate several system operation tasks which are handled manually today, and are essential in the transition to a new Nordic balancing model. Specifically, my team is responsible for predicting power consumption and power imbalance (i.e. the mismatch between production and consumption). Every five minutes, we need to provide predictions for the next two hours, with high demands on robustness and data quality. This is not a one-man job, but here are some of the areas where I put my effort.

Building apps and developing models

Our goal is to forecast the development of the system imbalance and power consumption in the near future. Statnett provides time series data describing power consumption, power production, power imbalance, weather forecasts and other relevant information. Using these data, we build mathematical models with machine learning algorithms, which are ultimately used to generate the predictions.

A comparison of our forecasted imbalance (orange) and the recorded, actual imbalance (blue).

Currently, a linear regression with a ridge penalty has proven to be the superior choice at Statnett, but we always aim to discover new models to keep improving our forecasts.
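
As a minimal sketch of what such a model looks like with scikit-learn (synthetic data and hyperparameters purely for illustration, not our production setup):

import numpy as np
from sklearn.linear_model import Ridge
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for lagged imbalance, consumption and weather features
rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 8))
y = X @ rng.normal(size=8) + rng.normal(scale=0.1, size=1000)

model = make_pipeline(StandardScaler(), Ridge(alpha=1.0))
model.fit(X[:800], y[:800])
print(model.score(X[800:], y[800:]))  # out-of-sample R^2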

As a fun way of developing new models, the team recently set up a classic clash. The Norwegians on the team competed against the Swedes in improving the current model. For a week, communication across the border dropped to a bare minimum as we worked with high intensity to win the competition. Most importantly, we learned a lot about how to improve performance. And almost as important, the Norwegians won…

Live monitoring of apps and data streams

When a prediction model is fit for purpose, we deploy it in our production environment. At Statnett, we orchestrate our application containers with OpenShift, a PaaS built on top of Kubernetes with stricter security policies. We queue our data with Apache Kafka, not only to maintain data flow between components, but also to deliver our end product. Other than that, we also rely on PostgreSQL, Kotlin and GitLab in our everyday development.

To make the transition into more automatic operation of the power grid as smooth as possible, we need to make sure that our services are running robustly. In an effort to keep the required uptime, we monitor our services, including the full machine learning pipeline, with Grafana dashboards.

This dashboard provides a convenient overview of the system end-to-end. The dashboard shows the flow between different components, and also the status of streams and apps, indicated by colors. This makes it easy to identify and deal with issues when they occur.

Happy team, happy life

I’ve discovered how to cope with the daily struggles of working as a programmer, especially when working from home. The most important thing is that you end up laughing (or at least smiling) a couple of times each day while working. If you find that hard to achieve, feel free to apply these well-proven steps:

Sync each other’s Spotify playlists

and regularly dance in your office. I think this is how silent disco was invented.

Speak Swedish to your Swedish colleagues

Say things like tjenare and grabben to Swedish people in meetings. They will laugh, I don’t really know why.

Put sunglasses on while programming

Ask about other people’s day

take 5 minutes just to talk, while performing some paint art

and last, but not least,

Do not push to the main production branch as a trainee

Please be careful. If you do this, and something goes wrong, some of the more senior employees may not fix it right away, and might even enjoy seeing you stress to fix the issue. It has not happened to me, but if it did, it would be difficult to recover from.

Automatic data quality validations with Great Expectations: An Introduction to DQVT

Hi, I’m Patrick, a Senior Data Engineer at Statnett. I’m happy to present some of our work that has proven useful recently: automatic validation of data quality.

We have created the Data Quality Validation Tool (DQVT), which helps us define the content of our datasets by testing it against a set of expectations on a regular basis. It is built on top of some cool open-source packages: Great Expectations, streamlit, FastAPI and D-Tale.

In this post, I will explain what DQVT actually does, and why we built it the way we did. But first, let me just mention why Statnett takes data quality so seriously.

Monitor your data assets

History has shown us that cascading blackouts of the power grid can result from a single failure, often caused by extreme weather conditions or a defective component. Statnett and other transmission system operators (TSOs) learn continuously from these failures, adapt to them and prepare against them in case these physical assets fail again. This is probably true in your job as well. Everyone experiences failures, but not everyone is prepared.

Data quality is important in the same way. Not very long ago, data could be mere logs, archived in case you might need to dig into it once in a while. Today, complex automated flows of information are crucial in our decision processes. Just like defective physical assets, we need to understand that, at some point, unexpected data may break data pipelines, possibly with huge consequences. Statnett operates critical infrastructure for an entire country, and in this context, high-quality data isn’t just gold, it is necessary.

Always know what to expect from your data

The motto of Great Expectations hints at a basic, but beautiful principle. You prepare against data surprises by testing and documenting your data. And when data surprises do arise, you want to get notified quickly, and trigger a plan B, such as switching to an alternative data pipeline.

By analyzing your data, you can often figure out what kind of values (formats, ranges, rules etc.) you are supposed to get in the usual conditions, and how this might change over time. This data knowledge allows you to test periodically that you always get what you expected.
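
As a small illustration, the classic pandas-style API of Great Expectations lets you state such expectations directly on a DataFrame (the column name and value range below are made up, and the exact API and return types vary between Great Expectations versions):

import great_expectations as ge
import pandas as pd

df = pd.DataFrame({"temperature": [2.1, 3.4, None, 5.0]})
dataset = ge.from_pandas(df)

# Each expectation is evaluated immediately and doubles as documentation
result = dataset.expect_column_values_to_not_be_null("temperature")
print(result.success)  # False, because of the missing value

result = dataset.expect_column_values_to_be_between("temperature", min_value=-60, max_value=60)
print(result.success)  # True: the non-null values are within range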

So, a great principle, and a great package. How did we make this work at Statnett?

Understanding what DQVT is

Like many organisations, Statnett uses lots of different data sources, some well known (Oracle/PostgreSQL databases, Kafka Streams, …) and others more domain-specific (IBM Big SQL instance, e-terra data platform, …). Needless to say, a consequence of this diversity is the abundance of data quality horror stories.

In order to understand our issues with data and improve the quality of our datasets, we wanted a dedicated tool able to

  1. profile and document the content of datasets stored in different data sources
  2. check the data periodically
  3. identify mismatches between the data and what we expect from it, and
  4. help us include data quality checks in our data pipelines

So we built the Data Quality Validation Tool (DQVT).

It is not a data catalog. Rather, it aims at documenting what the content of a dataset is expected to look like. DQVT helps us define tests on the data, called expectations, which are turned into documentation (thanks to Great Expectations). DQVT validates these expectations on a regular basis and reports any mismatch between the data and its documentation. Finally, DQVT computes scores on data quality metrics defined through our internal data standard.

By filling these roles, DQVT takes us towards better data quality, and consequently also more reliable and more performant software systems.

The story of DQVT

Faced with several high-profile digitalization projects, Statnett recently ramped up its data quality initiatives. At the time, Python Bytes presented Great Expectations on episode #115 (we highly recommend this podcast if you are a Pythonista🐍).

We tested Great Expectations and became fans pretty quickly, for several reasons:

  • the simplicity of use: a command line interface providing guidance, supporting various types of SQL databases, Pandas and Spark.
  • a beautiful concept in line with development best practices (documentation-as-code). In the words of Great Expectations, tests are docs and docs are tests.
  • extremely detailed user documentation
  • and an active and inclusive open source community (Slack, Discuss)

We were interested to see if this tool could help us monitor data quality on our own infrastructure at Statnett, which includes two particularly important platforms. We use the GitLab devops platform to host our code and provide continuous integration and deployment pipelines, and we use OpenShift as our on-premises Platform-as-a-Service to run and orchestrate our Docker containers, managed by Kubernetes.

The time came to build a proof-of-concept, and we started lean: a limited amount of time and resources to reduce technology risks. The main goals and scope revolved around a handful of features and requirements:

The goal of our first demo was to document the content of our datasets: not what the columns and fields of a table are (that is the job of a data catalog), but what was expected from the values in these fields. We were also keen for this documentation to be human-readable and kept automatically up to date. Finally, we wanted to be notified when data expectations were not met, indicating either problems in the data or that our expectations needed adjustment.

At the time, we weren’t sure how we would deploy validations on a schedule, or whether Great Expectations would be able to fetch data from our Big Data Lake (an IBM Big SQL), which is a high performance massively parallel processing (MPP) SQL engine for Hadoop. Failing in any of these integrations would have ended the experiment.

Despite having to do a small hack to connect to our Big Data Lake, we were able to have our data quality validations run periodically on OpenShift in less than a month! 🎉

What’s next?

At the end of the Python Bytes episode, host Brian Okken wonders how data engineers might include the Great Expectations tool in their data pipelines. I will be back soon to show you how to do just that! I’m creating a tutorial that details the individual steps and technologies we use in DQVT, but the structure of DQVT is quite simple, so you would likely be able to reproduce it on your own infrastructure.

And if you have some experience of your own or are just curious to learn more, you’re more than welcome to leave a comment!

Using data to handle intra-zonal constraints in the upcoming balancing market

Together with almost half of the Data science group at Statnett, I spend my time building automatic systems for congestion management. This job is fascinating and challenging at the same time, and I would love to share some of what our cross-functional team has done so far. But before diving in, let me first provide some context.

A good day at Statnett’s control centre. Photo: Trond Isaksen

The balancing act

Like other European transmission system operators (TSOs), Statnett is keeping the frequency stable at 50 Hz by making sure generation always matches consumption. The show is run by the human operators in Statnett’s control centre. They monitor the system continuously and instruct flexible power producers or consumers to increase or decrease their power levels when necessary.

These balancing adjustments are handled through a balancing energy market. Flexible producers and consumers offer their reserve capacity as balancing energy bids (in price-volume pairs). The operators select and activate as many of them as needed to balance the system. To minimize balancing costs, they try to follow the price order, utilizing the least expensive resources first.
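
In its simplest form, this price-ordered selection resembles the toy merit-order sketch below (ignoring minimum quantities, network constraints and everything else that makes the real job hard):

def select_bids(bids, volume_needed_mw):
    """Pick balancing bids in price order until the needed volume is covered.

    bids: list of (price_eur_per_mwh, volume_mw) pairs for one direction (up or down).
    """
    selected, remaining = [], volume_needed_mw
    for price, volume in sorted(bids, key=lambda bid: bid[0]):
        if remaining <= 0:
            break
        activated = min(volume, remaining)
        selected.append((price, activated))
        remaining -= activated
    return selected

# select_bids([(45.0, 10), (38.5, 25), (52.0, 40)], volume_needed_mw=30)
# -> [(38.5, 25), (45.0, 5)]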

While busy balancing the system, control centre operators also need to keep an eye on the network flows. If too much power is injected in one location, the network will be congested, meaning there are overloads that could compromise reliable operation of the grid. When an operator realizes a specific bid will cause congestion, she will mark it as unavailable and move on to use more expensive bids to balance the system.

In the Norwegian system, congestion does not only occur in a few well-known locations. Due to highly distributed generation and a relatively weak grid, there are hundreds of network constraints that could cause problems, and the Norwegian operators often need to be both careful and creative when selecting bids for activation.

A filtering problem

The Nordic TSOs are transitioning to a new balancing model in the upcoming year. A massive change is that balancing bids will no longer be selected by humans, but by an auction-like algorithm, just as in many other electricity markets. This algorithm (unfortunately, but understandably) uses a highly aggregated zonal structure, meaning that it will consider capacity restrictions between the 12 Nordic bidding zones, but not within them.

Consequently, the market algorithm will disregard all of the more obscure (but still important) intra-zonal constraints. This will – of course – lead to market results that simply do not fit inside the transmission grid, and there is neither time nor opportunity after the market clearing to modify the outcome in any substantial way.

My colleagues and I took on the task of creating a bid filtering system. This means predicting which bids would cause congestion, and marking them as unavailable to prevent them from being selected by the market algorithm.

Filtering the correct bids is challenging. Network congestion depends on the situation in the grid, which is anything but constant due to variations in generation, consumption, grid topology and exchange with other countries. The unavailability decision must be made something like 15-25 minutes before the bid is to be used, and there is plenty of room for surprises during that period.

How it works

Although I enjoy discussing the details, I will give only a short summary of how the system works. To decide the availability of each bid in the balancing market, we have created a Python universe of custom-built libraries and microservices that follow these steps:

  1. Assemble a detailed model of the Norwegian power system in its current state. Here, we take the grid model from Statnett’s equipment database and combine it with fresh data from Statnett’s SCADA system.
  2. Adjust the model to reflect the expected state.
    Since we are looking up to 30 minutes ahead, we offset the effect of current balancing actions, and apply generation schedules and forecasts to update all injections in the model.
  3. Prepare to be surprised.
    To make more robust decisions in the face of high uncertainty in exchange volumes, we apply a hundred or more scenarios representing different exchange patterns on the borders.
  4. Find the best balancing actions for each scenario of the future.
    Interpreting the results of an optimal power flow calculation provides lots of insight into which bids should be activated (and which should not) in each exchange scenario.
  5. Agree on a decision.
    In the final step, the solution from each scenario is used to form a consensus decision on which bids to make unavailable for the balancing market algorithm.
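
The actual business rules behind the final step are described in the paper; the flavour of the consensus can be sketched as a simple voting scheme across scenarios (the threshold and the rule itself are hypothetical simplifications):

def unavailable_bids(rejections_per_scenario, all_bids, threshold=0.5):
    """Mark a bid unavailable if it is rejected in more than `threshold`
    of the exchange scenarios (a hypothetical consensus rule)."""
    n_scenarios = len(rejections_per_scenario)
    counts = {bid: 0 for bid in all_bids}
    for rejected in rejections_per_scenario:
        for bid in rejected:
            counts[bid] += 1
    return {bid for bid, count in counts.items() if count / n_scenarios > threshold}

# Three scenarios, bids "A" to "C":
# unavailable_bids([{"A"}, {"A", "B"}, set()], ["A", "B", "C"]) -> {"A"}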

An example result and how to read it

My friend and mentor Gerard Doorman recently submitted a paper for the 2022 CIGRE session in Paris, explaining the bid filtering system in more detail. I will share one important figure here to illustrate the final step of the bid filtering method. The figure shows the simulated result of running the bid filtering system at 8 AM on August 23, 2021.

Simulated bid filtering results from August 23, 2021. Bids are sorted horizontally, according to price, and grouped by their bidding zone (NO1 through 5) and direction (up/down).

Before you cringe from information overload, let me assist you by explaining that the abundance of green cells in the horizontal bar on top shows that the vast majority of balancing bids were decided to be available.

There are also yellow cells, showing bids that likely need to be activated to keep the system operating within its security limits, no matter what happens.

The red cells are bids that have been made unavailable to prevent network congestion. To understand why, we need to look at the underlying results in the lower panel. Here, each row presents the outcome of one scenario, and purple cells show the bids that were rejected, i.e. not activated in the optimal solution, although being less expensive than other ones that were activated for balancing in the same scenario (in pink).

The different scenarios often do not tell the same story: a bid that is rejected in one scenario can be perfectly fine in the next, depending on the situation in the grid and which other bids are activated. Because of this ambiguity, business rules are necessary to create a reasonable aggregate result, and the final outcome will generally be imperfect.

So, does this filtering system have any positive impact on network congestion at Statnett? I will leave the answer for later, but if you’re curious to learn more, don’t hesitate to leave a comment.

How we share data requirements between ML applications

We use pydantic models to share data requirements and metadata between ML applications. Here’s how.

In our previous post, we wrote about how we use the Python package pydantic to validate input data. We serve our machine learning models with APIs written using the FastAPI library, which uses pydantic to validate and parse the input data received through the API. FastAPI was our introduction to pydantic. We started out with some pydantic models for validating input data to the API. We saw that sharing the models across several applications would lead to less boilerplate code, as well as more explicit handling of our data requirements.

The data models we share across our ML applications serve two purposes:

  • Sharing metadata, i.e. tidy models declaring what data a specific machine learning model requires, including the logic for naming standards and formats for exporting data. This is also used as the foundation for a mapping of how to select data from the training database.
  • Validating that data requirements are fulfilled. When training our models, we have certain requirements to the training data, for example that there is no missing data. The data models allow us to validate these requirements across other applications as well, such as in the API.

In this post, we will show how we share metadata and data requirements using pydantic. For an introduction to pydantic syntax and concepts, take a look at our previous post, or the excellent pydantic documentation.

A quick overview of applications

Before we go into detail, we’ll start with a quick overview of how data flows and what applications are at play.

Outline of architecture with the relevant components

Our forecast models are served by APIs, the ML API application in the figure. Each model specifies the data it requires to make a prediction at a GET endpoint. The ML / stream integration application requests the feature metadata specification from the ML API, fetches the data from the streaming platform and posts the observations to the ML API’s prediction endpoint. The response is the prediction, which the ML / stream integration application publishes to the streaming data platform for users to consume.

The data from the streaming platform is persisted to a database. To ensure we have valid training data, data cleaning applications read the raw data and validate that it meets the requirements. These applications also try to fill small holes of missing data according to domain-specific rules, and warn if there are large holes that need to be processed manually. Data validation is run on a regular schedule, as data is continuously persisted from the streaming platform to the database.

When training the models, validated data from the data cleaning applications is used.

When reading data, the applications use the metadata models to select the correct data. The ML API specifies what data should be fetched from the streaming platform using the metadata model. The data cleaning applications use the metadata model to select the correct data from the database. When training models, the data is fetched from the database according to the metadata model.

When operating on data, the applications use the data requirements models to ensure the validity of the input data. Before making a prediction, data is validated in the ML API. The prognosis response is also validated before it is returned to the ML / stream integration application. The data cleaning applications also validate data according to the data requirements models before writing to the database for training.

The metadata models are used for reading the correct data, shown with a yellow star. The data requirements model is used for validating that data meets requirements, shown with a blue triangle.

Sharing metadata with pydantic

Below is an example of our BaseFeatureMetaData class, which we use for metadata on the base features in our models, i.e., the features as they come from our sources. This metadata model specifies all necessary details on the features a model requires:

  • The name of the feature.
  • The source we require for the feature, as several features are found in multiple sources. An example is weather forecasts, which are available from different vendors, but it could also be different estimates for measurements or unobservable values.
  • Which locations we require data from.
  • history_period and future_period together specify the time series the model requires to provide its prediction.
  • The time resolution required for this feature, as many features are available in different time aggregations.
from datetime import timedelta
from typing import List

from pydantic import BaseModel

from our_internal_utils import timedelta_isoformat_


class BaseFeatureMetaData(BaseModel):
    name: str
    source: str
    locations: List[str]
    history_period: timedelta
    future_period: timedelta
    resolution: timedelta

    class Config:
        json_encoders = {timedelta: timedelta_isoformat_}

    def __str__(self):
        res_str = timedelta_isoformat_(self.resolution)
        return f"{self.name}_{self.source}_{res_str}"

We can now create a metadata specification for temperature data, from the meteorological institute, or met as we call them, from two weather stations, SN18700 (Blindern) and SN27500 (Lillehammer), for 48 hours of weather observations and 72 hours of forecast values, using the hourly resolution data:

temperature_metadata = BaseFeatureMetaData(
    name="temperature",
    source="met",
    locations=["SN18700", "SN27500"],
    history_period=timedelta(hours=48),
    future_period=timedelta(hours=72),
    resolution=timedelta(hours=1),
)

This metadata specification is used

  • By the ML / stream integration application that fetches data to models for live prediction. We provide the metadata in the API which serves our ML models, specifying all details on the features a model requires. The application uses the metadata to collect the required time series from our streaming platform.
  • When we train our models, to fetch the correct training data from the database for each model. The metadata specification of each model, which is exposed in the API, is also what we use to map a base feature to which data to read from the database.
  • When validating training data in the data cleaning applications. Each base feature has its own data cleaning application configured with the metadata model, used to map a base feature to which data to read from the database, as for training.

In order to provide the metadata model in json format from the API, we use pydantic’s Config class, which lets us customize the behaviour of the model. The json_encoders entry in our Config specifies that all timedelta fields should be converted using our utility function timedelta_isoformat_, to provide valid json. Pydantic provides a .json() method for exporting models to json. temperature_metadata.json() returns:

'{
   "name":"temperature",
   "source":"met",
   "locations":[
      "SN18700",
      "SN27500"
   ],
   "history_period":"PT48H",
   "future_period":"PT72H",
   "resolution":"PT1H"
}'

following the timedelta representation standard we have agreed on in the interface between the ML API and the ML / stream integration application. We also overwrite the __str__ representation of the class to provide our internal naming standard for features. str(temperature_metadata) returns:

 'temperature_met_PT1H'

We use this naming standard in column names for the feature data in internal applications, and for naming each data cleaning application, as each base feature has its own cleaning application.

Validating data requirements across applications

Our BaseFeature class explicitly states our data requirements through validators: that time steps must be evenly spaced, that we don’t accept any NaNs in the timeseries and other basic requirements. This model is used

  • in our ML API, to validate input data before running predictions. This use case was our first introduction to pydantic data models, and is described in FastAPI’s excellent documentation.
  • in our data cleaning application, to validate that new potential training data, persisted from the streaming platform, meets the requirements we have in training.
  • Since we use the BaseFeature model when reading data from the database to train our models, the validation can be done before training as well, or we can use the construct() method for already validated sources, which creates models without validation.
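
For the last case, skipping validation for data we already trust is a one-liner with pydantic (the field values below are made up; construct() is pydantic’s standard way of building a model instance without running validators):

# BaseFeature is the pydantic model shown further down in this post
feature = BaseFeature.construct(
    columns=["temperature_met_PT1H"],
    timestamps=[0, 3600, 7200],
    data=[[2.1], [2.4], [2.7]],
)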

This use of pydantic data models is analogous to the input validation example shared in our previous post. Using the data model in both the API serving the models, the models themselves and the data cleaning applications ensures that we collect all our data requirements in one place. This way we avoid multiple .dropna() lines spread throughout our code, which can lead to different treatment of data during training and prediction, and makes it difficult to refactor code. Using the data requirements in the data cleaning applications also ensures that there is validated data available for training at any time.

Below is a shortened example with a few of the validators of a parent model, DataFrameTimeSeries, which the data requirement models inherit from.

In the DataFrameTimeSeries model, missing values are allowed. Our BaseFeature model inherits from DataFrameTimeSeries and inherits all validators, but we override the data field, because missing data is not allowed for our BaseFeature input. When an instance of a pydantic BaseModel object is created, the validator decorated functions will be run, as well as validation that the type hints are adhered to. An exception will be raised if any of the validations fails.

from typing import List, Optional, Dict

from pydantic import BaseModel, validator


class DataFrameTimeSeries(BaseModel):
    columns: List[str]
    timestamps: List[int]
    data: List[List[Optional[float]]]
    # Some time series are allowed to have missing values

    @validator("columns")
    def at_least_one_column(cls, columns: List[str]) -> List[str]:
        if len(columns) < 1:
            raise ValueError("No column names specified")
        return columns

    @validator("timestamps", "data")
    def len_at_least(cls, value: List) -> List:
        minlen = 1
        if len(value) < minlen:
            raise ValueError(f"length is less than {minlen}")
        return value

    @validator("timestamps")
    def index_has_proper_spacing(cls, value: List[int]) -> List[int]:
        diffs = [x[0] - x[1] for x in zip(value[1:], value)]
        if len(diffs) < 1:
            return value
        if any(diff <= 0 for diff in diffs):
            raise ValueError("Index is not monotonically increasing")
        if any(diff != diffs[0] for diff in diffs):
            raise ValueError("Index has unequal time steps")
        return value


class BaseFeature(DataFrameTimeSeries):
    data: List[List[float]]
    # BaseFeatures inherit validators from DataFrameTimeSeries, 
    # but are not allowed to have missing values, so the data 
    # field does not have Optional floats

In our API, the validation is done immediately when a new request is made, as the pydantic validation is enforced through type hints in the API routes.

Our data cleaning applications run validation once per day, validating all BaseFeatures in the training database. There is one application running per BaseFeature. New data from the streaming platform is stored in the database continuously. The cleaning application reads the new data since the last cleaning and, if there are small holes, attempts to fill them according to domain-specific rules. The number of timesteps that can be filled automatically, as well as the interpolation method, is set per base feature. The joint dataset of original and processed data is then converted to BaseFeatures through a utility function. As this creates a BaseFeature instance, validation is performed immediately. A view containing only validated data is updated to include the newly added data if it meets the requirements. This view is the source for training data.
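
A condensed, hypothetical sketch of that flow is shown below (the import path, helper name, DatetimeIndex assumption and plain pandas gap filling are illustrative; the real applications apply per-feature domain rules):

import pandas as pd

from our_shared_models import BaseFeature  # hypothetical import path; the model is defined earlier in this post


def clean_new_data(df: pd.DataFrame, max_gap_steps: int, method: str = "linear") -> BaseFeature:
    """Fill small gaps, then validate by constructing a BaseFeature.

    df is assumed to have a DatetimeIndex and one column per location.
    Any remaining NaNs would fail the full BaseFeature validation.
    """
    filled = df.interpolate(method=method, limit=max_gap_steps, limit_area="inside")
    return BaseFeature(
        columns=list(filled.columns),
        timestamps=[int(ts.timestamp()) for ts in filled.index],
        data=filled.to_numpy().tolist(),
    )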

We could create additional validation functions, for example creating subclasses of BaseFeature with domain specific validation, such as checking whether temperature values are within normal ranges.

Going further

As of now, our use of data models ensures that the data meets minimum requirements for our specific use case. What this doesn’t provide is a broader evaluation of data quality: are values drifting, how often is data missing from this source, etc. For this kind of overview, we are working on a pipeline for monitoring input data closer to the source and providing overviews for other data consumers. Our initial investigations of combining pydantic models with Great Expectations, a data testing, documentation and profiling framework, are promising.

How we validate input data using pydantic

We use the Python package pydantic for fast and easy validation of input data. Here’s how.

We discovered the Python package pydantic through FastAPI, which we use for serving machine learning models. Pydantic is a Python package for data parsing and validation, based on type hints. We use pydantic because it is fast, does a lot of the dirty work for us, provides clear error messages and makes it easy to write readable code.

Two of our main use cases for pydantic are:

  1. Validation of settings and input data.
    We often read settings from a configuration file, which we use as inputs to our functions. We often end up doing quite a bit of input validation to ensure the settings are valid for further processing. To avoid starting our functions with a long set of validations and assertions, we use pydantic to validate the input.
  2. Sharing data requirements between machine learning applications.
    Our ML models have certain requirements to the data we use for training and prediction, for example that no data is missing. These requirements are used when we train our models, when we run online predictions and when we validate newly added training data. We use pydantic to specify the requirements we have and ensure that the same requirements are used everywhere, avoiding duplication of error-prone code across different applications.

This post will focus on the first use case, validation of settings and input data. A later post will cover the second use case.

Validation of settings and input data

In some cases, we read settings from a configuration file, such as a toml file, to be parsed as nested dictionaries. We use the settings as inputs to different functions. We often end up doing quite a bit of input validation to ensure the settings parsed from file are valid for further processing. A concrete example is settings for machine learning models, where we use toml files for defining model parameters, features and training details for the models.

This is quite similar to how FastAPI uses pydantic for input validation: the input to the API call is json, which in Python translates to a dictionary, and input validation is done using pydantic.

In this post we will go through input validation for a function interpolating a time series to a higher frequency. If we want to do interpolation, we set the interpolation factor, i.e., the factor of upsampling, the interpolation method, and an option to interpolate on the integral. Our interpolation function is just a wrapper around pandas interpolation methods, including the validation of input and some data wrangling. The input validation code started out looking a bit like this:

from typing import Dict


def validate_input_settings(params_in: Dict) -> Dict:
    params_validated = {}
    for key, value in params_in.items():
        if key == "interpolation_factor":
            if not int(value) == value:
                raise ValueError(f"{key} has a non-int value")
            if not int(value) >= 2:
                raise ValueError(f"{key}: {value} should be >= 2")
            value = int(value)
        elif key == "interpolation_method":
            allowed_set = {
                "repeat", "distribute", "linear", "cubic", "akima"
            }
            if value not in allowed_set:
                raise ValueError(f"{key} should be one of {allowed_set}, got {value}")
        elif key == "interpolate_on_integral":
            if not isinstance(value, bool):
                raise ValueError(f"{key} should be bool, got {value}")
        else:
            raise ValueError(f"{key} not a recognized key")
        params_validated[key] = value
    return params_validated

This is heavily nested, which in itself makes it hard to read, and perhaps you find that the validation rules aren’t crystal clear at first glance. We use SonarQube for static code quality analysis, and this piece of code results in a code smell, complaining that the code is too complex. In fact, this already has a cognitive complexity of 18 as SonarQube counts, above the default threshold of 15. Cognitive complexity is a measure of how difficult it is to read code, and increments for each break in linear flow, such as an if statement or a for loop. Nested breaks of the flow are incremented again.

Let’s summarize what we check for in validate_input_settings:

  • interpolation_factor is an integer
  • interpolation_factor is greater than or equal to 2
  • interpolation_method is in a set of allowed values
  • interpolate_on_integral is boolean
  • The keys in our settings dictionary are among the three mentioned above

In addition to the code above, we have a few more checks:

  • if an interpolation_factor is given, but no interpolation_method, use the default method linear
  • if an interpolation_factor is given, but not interpolate_on_integral, set the default option False
  • check for the invalid combination interpolate_on_integral = False and interpolation_method = "distribute"

After adding another three if statements inside the for loop, we end up at a cognitive complexity of 24.

Pydantic to the rescue

We might consider using a pydantic model for the input validation.

Minimal start

We can start out with the simplest form of a pydantic model, with field types:

from pydantic import BaseModel


class InterpolationSetting(BaseModel):
    interpolation_factor: int
    interpolation_method: str
    interpolate_on_integral: bool

Pydantic models are simply classes inheriting from the BaseModel class. We can create an instance of the new class as:

InterpolationSetting(
    interpolation_factor=2, 
    interpolation_method="linear", 
    interpolate_on_integral=True
)

This automatically does two of the checks we had implemented:

  • interpolation_factor is an int
  • interpolate_on_integral is a bool

In the original script, the fields are in fact optional, i.e., it is possible to provide no interpolation settings, in which case we do not do interpolation. We will set the fields to optional later, and then implement the additional necessary checks.

We can verify the checks we have enforced now by supplying non-valid input:

from pydantic import ValidationError


try:
    InterpolationSetting(
        interpolation_factor="text",
        interpolation_method="linear",
        interpolate_on_integral=True,
    )
except ValidationError as e:
    print(e)

which outputs:

    1 validation error for InterpolationSetting
    interpolation_factor
      value is not a valid integer (type=type_error.integer)

Pydantic raises a ValidationError when the validation of the model fails, stating which field, i.e. attribute, raised the error and why. In this case interpolation_factor raised a type error because the value "text" is not a valid integer. The validation is performed on instantiation of an InterpolationSetting object.

Validation of single fields and combinations of fields

Our original code also had some additional requirements:

  • interpolation_factor should be greater than or equal to two.
  • interpolation_method must be chosen from a set of valid methods.
  • We do not allow the combination of interpolate_on_integral=False and interpolation_method="distribute"

The first restriction can be implemented using pydantic types. Pydantic provides many different types; we will use a constrained type for this requirement, namely conint, a constrained integer type providing automatic restrictions such as lower limits.

The remaining two restrictions can be implemented as validators. We decorate our validation functions with the validator decorator. The input argument to the validator decorator is the name of the attribute(s) to perform the validation for.

All validators are run automatically when we instantiate an object of the InterpolationSetting class, as for the type checking.

Our validation functions are class methods, and the first argument is the class, not an instance of the class. The second argument is the value to validate, and can be named as we wish. We implement two validators, method_is_valid and valid_combination_of_method_and_on_integral:

from typing import Dict

from pydantic import BaseModel, conint, validator, root_validator


class InterpolationSetting(BaseModel):
    interpolation_factor: conint(gt=1)
    interpolation_method: str
    interpolate_on_integral: bool

    @validator("interpolation_method")
    def method_is_valid(cls, method: str) -> str:
        allowed_set = {"repeat", "distribute", "linear", "cubic", "akima"}
        if method not in allowed_set:
            raise ValueError(f"must be in {allowed_set}, got '{method}'")
        return method

    @root_validator()
    def valid_combination_of_method_and_on_integral(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        method = values.get("interpolation_method")
        if on_integral is False and method == "distribute":
            raise ValueError(
                f"Invalid combination of interpolation_method "
                f"{method} and interpolate_on_integral {on_integral}"
            )
        return values

There are a few things to note here:

  • Validators should return a validated value. The validators are run sequentially, and populate the fields of the data model if they are valid.
  • Validators should only raise ValueError, TypeError or AssertionError. Pydantic will catch these errors to populate the ValidationError and raise one exception regardless of the number of errors found in validation. You can read more about error handling in the docs.
  • When we validate a field against another, we can use the root_validator, which runs validation on the entire model. Root validators are a little different: they have access to the values argument, which is a dictionary containing all fields that have already been validated. When the root validator runs, the interpolation_method may have failed to validate, in which case it will not be added to the values dictionary. Here, we handle that by using values.get("interpolation_method"), which returns None if the key is not in values. The docs contain more information on root validators and field ordering, which is important to consider when we are using the values dictionary.

Again, we can verify by choosing input parameters to trigger the errors:

from pydantic import ValidationError


try:
    InterpolationSetting(
        interpolation_factor=1,
        interpolation_method="distribute",
        interpolate_on_integral=False,
    )
except ValidationError as e:
    print(e)

which outputs:

    2 validation errors for InterpolationSetting
    interpolation_factor
      ensure this value is greater than 1 (type=value_error.number.not_gt; limit_value=1)
    __root__
      Invalid combination of interpolation_method distribute and interpolate_on_integral False (type=value_error)

As we see, pydantic raises a single ValidationError regardless of the number of ValueErrors raised in our model.

Implementing dynamic defaults

We also had some default values if certain parameters were not given:

  • If an interpolation_factor is given, set the default value linear for interpolation_method if none is given.
  • If an interpolation_factor is given, set the default value False for interpolate_on_integral if none is given.

In this case, we have dynamic defaults dependent on other fields.

This can also be achieved with root validators, by returning a conditional value. As this means validating one field against another, we must take care to ensure our code runs whether or not the two fields have passed validation and been added to the values dictionary. We will now also use Optional types, because we will handle the cases where not all values are provided. We add the new validators set_method_given_interpolation_factor and set_on_integral_given_interpolation_factor:

from typing import Dict, Optional

from pydantic import BaseModel, conint, validator, root_validator


class InterpolationSetting(BaseModel):
    interpolation_factor: Optional[conint(gt=1)]
    interpolation_method: Optional[str]
    interpolate_on_integral: Optional[bool]

    @validator("interpolation_method")
    def method_is_valid(cls, method: Optional[str]) -> Optional[str]:
        allowed_set = {"repeat", "distribute", "linear", "cubic", "akima"}
        if method is not None and method not in allowed_set:
            raise ValueError(f"must be in {allowed_set}, got '{method}'")
        return method

    @root_validator()
    def valid_combination_of_method_and_on_integral(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        method = values.get("interpolation_method")
        if on_integral is False and method == "distribute":
            raise ValueError(
                f"Invalid combination of interpolation_method "
                f"{method} and interpolate_on_integral {on_integral}"
            )
        return values

    @root_validator()
    def set_method_given_interpolation_factor(cls, values: Dict) -> Dict:
        factor = values.get("interpolation_factor")
        method = values.get("interpolation_method")
        if method is None and factor is not None:
            values["interpolation_method"] = "linear"
        return values

    @root_validator()
    def set_on_integral_given_interpolation_factor(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        factor = values.get("interpolation_factor")
        if on_integral is None and factor is not None:
            values["interpolate_on_integral"] = False
        return values

We can verify that the default values are set only when interpolation_factor
is provided, running InterpolationSetting(interpolation_factor=3) returns:

InterpolationSetting(interpolation_factor=3, interpolation_method='linear', interpolate_on_integral=False)

whereas supplying no input parameters, InterpolationSetting(), returns a data model with all parameters set to None:

InterpolationSetting(interpolation_factor=None, interpolation_method=None, interpolate_on_integral=None)

Note: If we have static defaults, we can simply set them for the fields:

class InterpolationSetting(BaseModel):
    interpolation_factor: Optional[int] = 42

Final safeguard against typos

Finally, we had one more check in our previous script: that no unknown keys were provided. If we provide unknown keys to our data model now, nothing really happens, for example InterpolationSetting(hello="world") outputs:

InterpolationSetting(interpolation_factor=None, interpolation_method=None, interpolate_on_integral=None)

Often, an unknown field name is the result of a typo in the toml file. Therefore we want to raise an error to alert the user. We do this using the model Config, which controls the behaviour of the model. The extra attribute of the config determines what we do with extra fields. The default is ignore, which we can see in the example above, where the field is ignored and not added to the model (unlike the allow option, which would add it). We can use the forbid option to raise an exception when extra fields are supplied.

from typing import Dict, Optional

from pydantic import BaseModel, conint, validator, root_validator


class InterpolationSetting(BaseModel):
    interpolation_factor: Optional[conint(gt=1)]
    interpolation_method: Optional[str]
    interpolate_on_integral: Optional[bool]

    class Config:
        extra = "forbid"

    @validator("interpolation_method")
    def method_is_valid(cls, method: Optional[str]) -> Optional[str]:
        allowed_set = {"repeat", "distribute", "linear", "cubic", "akima"}
        if method is not None and method not in allowed_set:
            raise ValueError(f"must be in {allowed_set}, got '{method}'")
        return method

    @root_validator()
    def valid_combination_of_method_and_on_integral(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        method = values.get("interpolation_method")
        if on_integral is False and method == "distribute":
            raise ValueError(
                f"Invalid combination of interpolation_method "
                f"{method} and interpolate_on_integral {on_integral}"
            )
        return values

    @root_validator()
    def set_method_given_interpolation_factor(cls, values: Dict) -> Dict:
        factor = values.get("interpolation_factor")
        method = values.get("interpolation_method")
        if method is None and factor is not None:
            values["interpolation_method"] = "linear"
        return values

    @root_validator()
    def set_on_integral_given_interpolation_factor(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        factor = values.get("interpolation_factor")
        if on_integral is None and factor is not None:
            values["interpolate_on_integral"] = False
        return values

If we try again with an unknown key, we now get a ValidationError:

from pydantic import ValidationError


try:
    InterpolationSetting(hello=True)
except ValidationError as e:
    print(e)

This raises a validation error for the unknown field:

1 validation error for InterpolationSetting
hello
  extra fields not permitted (type=value_error.extra)

Adapting our existing code is easy

Now we have implemented all our checks, and can go on to adapt our existing code to use the new data model. In our original implementation, we would do something like

params_in = toml.load(path_to_settings_file)
params_validated = validate_input_settings(params_in)
interpolate_result(params_validated)

We can replace the call to validate_input_settings with instantiation of the pydantic model: params_validated = InterpolationSetting(**params_in). Each pydantic data model has a .dict() method that returns the parameters as a dictionary, so we can use it in the input argument to interpolate_result directly: interpolate_result(params_validated.dict()). Another option is to refactor interpolate_result to use the attributes of the InterpolationSetting object, such as params_validated.interpolation_method instead of the values of a dictionary.
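
Putting it together, the adapted version could look like this (same hypothetical path_to_settings_file and interpolate_result as in the snippet above):

import toml

params_in = toml.load(path_to_settings_file)          # nested dict parsed from the toml file
params_validated = InterpolationSetting(**params_in)  # validation runs on instantiation
interpolate_result(params_validated.dict())           # or pass the attributes directly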

Conclusion

In the end, we can replace one 43-line method (for the full functionality) with a cognitive complexity of 24 with one 40-line class containing six methods, each with a cognitive complexity below 4. The pydantic data models will not necessarily be shorter than the custom validation code they replace, and since there are a few quirks and concepts to pay attention to, they are not necessarily easier to read on the first try.

However, since we already use the library for validation in our APIs, we are getting familiar with it, and the models become easier to read and understand over time.

Some of the benefits of using pydantic for this are:

  • Type checking (and in fact also some type conversion), which we previously did ourselves, is now done automatically for us, saving us the work of repeating lines of error-prone code in many different functions.
  • Each validator has a name which, if we put a little thought into it, makes it very clear what we are trying to achieve. In our previous example, the purpose of each nested condition had to be deduced from the many if clauses and error messages. This should be a lot clearer now, especially if we use pydantic across different projects.
  • If speed is important, pydantic’s benchmarks show that it is fast compared to similar libraries.

Hopefully this will help you determine whether or not you should consider using pydantic models in your projects. In a later post, we will show how we use pydantic data models to share metadata between machine learning applications, and to share data requirements between the applications.
