How we share data requirements between ML applications

We use pydantic models to share data requirements and metadata between ML applications. Here’s how.

In our previous post, we wrote about how we use the Python package pydantic to validate input data. We serve our machine learning models with APIs written using the FastAPI library, which uses pydantic to validate and parse the input data received through the API. FastAPI was our introduction to pydantic. We started out with some pydantic models for validating input data to the API. We saw that sharing the models across several applications would lead to less boilerplate code, as well as more explicit handling of our data requirements.

The data models we share across our ML applications serve two purposes:

  • Sharing metadata, i.e. tidy models declaring what data a specific machine learning model requires, including the logic for naming standards and formats for exporting data. This is also used as the foundation for a mapping of how to select data from the training database.
  • Validating that data requirements are fulfilled. When training our models, we have certain requirements for the training data, for example that there is no missing data. The data models allow us to validate these requirements across other applications as well, such as in the API.

In this post, we will show how we share metadata and data requirements using pydantic. For an introduction to pydantic syntax and concepts, take a look at our previous post, or the excellent pydantic documentation.

A quick overview of applications

Before we go into detail, we’ll start with a quick overview of how data flows and what applications are at play.

Outline of architecture with the relevant components

Our forecast models are served by APIs, the ML API application in the figure. Each model specifies the data it requires to make a prediction at a GET endpoint. The ML / stream integration application requests the feature metadata specification from the ML API, fetches the data from the streaming platform and posts the observations to the ML API’s prediction endpoint. The response is the prediction, which the ML / stream integration application publishes to the streaming data platform for users to consume.

The data from the streaming platform is persisted to a database. To ensure we have valid training data, data cleaning applications read the raw data and validate that it meets the requirements. These applications also try to fill small holes of missing data according to domain specific rules, and warn if there are large holes that need manual processing. Data validation runs on a regular schedule, as data is continuously persisted from the streaming platform to the database.

When training the models, validated data from the data cleaning applications is used.

When reading data, the applications use the metadata models to select the correct data. The ML API specifies what data should be fetched from the streaming platform using the metadata model. The data cleaning applications uses the metadata model to select the correct data from the database. When training models, the data is fetched from the database according to the metadata model.

When operating on data, the applications use the data requirements models to ensure the validity of the input data. Before making a prediction, data is validated in the ML API. The prognosis response is also validated before it is returned to the ML / stream integration application. The data cleaning applications also validate data according to the data requirements models before writing to the database for training.

The metadata models are used for reading the correct data, shown with a yellow star. The data requirements model is used for validating that data meets requirements, shown with a blue triangle.

Sharing metadata with pydantic

Below is an example of our BaseFeatureMetaData class, which we use for metadata on the base features in our models, i.e., the features as they come from our sources. This metadata model specifies all necessary details on the features a model requires:

  • The name of the feature.
  • The source we require for the feature, as several features are found in multiple sources. An example is weather forecasts, which are available from different vendors, but it could also be different estimates for measurements or unobservable values.
  • Which locations we require data from.
  • history_period and future_period, which together specify the time series the model requires to provide its prediction.
  • The time resolution required for this feature, as many features are available in different time aggregations.
from datetime import timedelta
from typing import List

from pydantic import BaseModel

from our_internal_utils import timedelta_isoformat_


class BaseFeatureMetaData(BaseModel):
    name: str
    source: str
    locations: List[str]
    history_period: timedelta
    future_period: timedelta
    resolution: timedelta

    class Config:
        json_encoders = {timedelta: timedelta_isoformat_}

    def __str__(self):
        res_str = timedelta_isoformat_(self.resolution)
        return f"{self.name}_{self.source}_{res_str}"

We can now create a metadata specification for temperature data, from the meteorological institute, or met as we call them, from two weather stations, SN18700 (Blindern) and SN27500 (Lillehammer), for 48 hours of weather observations and 72 hours of forecast values, using the hourly resolution data:

temperature_metadata = BaseFeatureMetaData(
    name="temperature",
    source="met",
    locations=["SN18700", "SN27500"],
    history_period=timedelta(hours=48),
    future_period=timedelta(hours=72),
    resolution=timedelta(hours=1),
)

This metadata specification is used

  • By the ML / stream integration application that fetches data to models for live prediction. We provide the metadata in the API which serves our ML models, specifying all details on the features a model requires. The application uses the metadata to collect the required time series from our streaming platform.
  • When we train our models, to fetch the correct training data from the database for each model. The metadata specification of each model, which is exposed in the API, is also what we use to map a base feature to which data to read from the database.
  • When validating training data in the data cleaning applications. Each base feature has its own data cleaning application configured with the metadata model, used to map a base feature to which data to read from the database, as for training.
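As a rough illustration of the first point, the metadata can be exposed through a GET endpoint in the FastAPI application. The endpoint path and route name below are hypothetical; the actual API is not shown in this post.

from fastapi import FastAPI

app = FastAPI()


@app.get("/metadata", response_model=BaseFeatureMetaData)
def get_feature_metadata():
    # The ML / stream integration application calls this endpoint to learn
    # which time series to fetch from the streaming platform.
    return temperature_metadata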

In order to provide the metadata model in json format from the API, we use pydantic’s Config class, which lets us customize the behaviour of the model. The json_encoders entry in our Config specifies that all timedelta fields should be converted using our utility function timedelta_isoformat_, to provide valid json. Pydantic provides a .json() method for exporting models to json. temperature_metadata.json() returns:

'{
   "name":"temperature",
   "source":"met",
   "locations":[
      "SN18700",
      "SN27500"
   ],
   "history_period":"PT48H",
   "future_period":"PT72H",
   "resolution":"PT1H"
}'

following the timedelta representation standard we have agreed on in the interface between the ML API and the ML / stream integration application. We also overwrite the __str__ representation of the class to provide our internal naming standard for features. str(temperature_metadata) returns:

 'temperature_met_PT1H'

We use this naming standard in column names for the feature data in internal applications, and for naming each data cleaning application, as each base feature has its own cleaning application.

Validating data requirements across applications

Our BaseFeature class explicitly states our data requirements through validators: time steps must be evenly spaced, we don’t accept any NaNs in the time series, and other basic requirements. This model is used

  • in our ML API, to validate input data before running predictions. This use case was our first introduction to pydantic data models, and is described in FastAPI’s excellent documentation.
  • in our data cleaning application, to validate that new potential training data, persisted from the streaming platform, meets the requirements we have in training.
  • Since we use the BaseFeature model when reading data from the database to train our models, the validation can be done before training as well, or we can use the construct() method for already validated sources, which creates models without validation.

This use of pydantic data models is analogous to the input validation example shared in our previous post. Using the data model in the API serving the models, in the models themselves and in the data cleaning applications ensures that we collect all our data requirements in one place. This way we avoid multiple .dropna() lines spread throughout our code, which can lead to different treatment of data during training and prediction, and makes it difficult to refactor code. Using the data requirements in the data cleaning applications also ensures that there is validated data available for training at any time.

Below is a shortened example with a few of the validators of a parent model, DataFrameTimeSeries, which the data requirement models inherit from.

In the DataFrameTimeSeries model, missing values are allowed. Our BaseFeature model inherits from DataFrameTimeSeries and inherits all its validators, but we override the data field, because missing data is not allowed for our BaseFeature input. When an instance of a pydantic BaseModel is created, the validator-decorated functions are run, along with validation that the type hints are adhered to. An exception is raised if any of the validations fail.

from typing import List, Optional, Dict

from pydantic import BaseModel, validator


class DataFrameTimeSeries(BaseModel):
    columns: List[str]
    timestamps: List[int]
    data: List[List[Optional[float]]]
    # Some time series are allowed to have missing values

    @validator("columns")
    def at_least_one_column(cls, columns: List[str]) -> List[str]:
        if len(columns) < 1:
            raise ValueError("No column names specified")
        return columns

    @validator("timestamps", "data")
    def len_at_least(cls, value: List) -> List:
        minlen = 1
        if len(value) < minlen:
            raise ValueError(f"length is less than {minlen}")
        return value

    @validator("timestamps")
    def index_has_proper_spacing(cls, value: List[int]) -> List[int]:
        diffs = [x[0] - x[1] for x in zip(value[1:], value)]
        if len(diffs) < 1:
            return value
        if any(diff <= 0 for diff in diffs):
            raise ValueError("Index is not monotonically increasing")
        if any(diff != diffs[0] for diff in diffs):
            raise ValueError("Index has unequal time steps")
        return value


class BaseFeature(DataFrameTimeSeries):
    data: List[List[float]]
    # BaseFeatures inherit validators from DataFrameTimeSeries, 
    # but are not allowed to have missing values, so the data 
    # field does not have Optional floats

In our API, the validation is done immediately when a new request is made, as the pydantic validation is enforced through type hints in the API routes.
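As a minimal sketch of how this looks (the endpoint path and response handling are hypothetical; the real routes are not shown here), the route simply declares BaseFeature as the type of the request body:

from fastapi import FastAPI

app = FastAPI()


@app.post("/prediction")
def predict(input_data: BaseFeature):
    # FastAPI has already parsed and validated the request body against
    # BaseFeature at this point; invalid payloads are rejected with a
    # 422 response before this function is called.
    ...  # run the model and return the prognosis response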

Our data cleaning applications run validation once per day, validating all BaseFeatures in the training database. There is one application running per BaseFeature. New data from the streaming platform is stored in the database continuously. The cleaning application reads the new data since the last cleaning and, if there are small holes, attempts to fill them according to domain specific rules. The number of timesteps that can be filled automatically, as well as the interpolation method, is set per base feature. The joint dataset of original and processed data is then converted to BaseFeatures through a utility function. As this creates a BaseFeature instance, validation is performed immediately. A view containing only validated data is updated to include the newly added data if it meets the requirements. This view is the source for training data.
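A hypothetical sketch of such a utility function, assuming the cleaned data lives in a pandas DataFrame with a timestamp index (the real function is not shown in this post), could look like this:

import pandas as pd


def dataframe_to_base_feature(df: pd.DataFrame) -> BaseFeature:
    # Instantiating BaseFeature runs all validators, so invalid data raises
    # a ValidationError here instead of silently reaching the training view.
    return BaseFeature(
        columns=list(df.columns),
        timestamps=[int(ts.timestamp()) for ts in df.index],
        data=df.values.tolist(),
    )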

We could create additional validation functions, for example creating subclasses of BaseFeature with domain specific validation, such as checking whether temperature values are within normal ranges.
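Reusing the imports from the snippets above, a hypothetical subclass with such a range check could look like this (the limits are made up for illustration):

class TemperatureFeature(BaseFeature):
    # Domain specific validation: temperatures (in degrees Celsius) outside
    # this range are treated as errors. each_item=True runs the validator
    # on every row of the data field.
    @validator("data", each_item=True)
    def temperature_within_normal_range(cls, row: List[float]) -> List[float]:
        if any(value < -60 or value > 50 for value in row):
            raise ValueError("Temperature outside expected range [-60, 50]")
        return row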

Going further

As of now, our use of data models ensures that the data meets the minimum requirements for our specific use case. What this doesn’t provide is a broader evaluation of data quality: are values drifting, how often is data missing from this source, and so on. For this kind of overview, we are working on a pipeline for monitoring input data closer to the source and providing overviews for other data consumers. Our initial investigations of combining pydantic models with great expectations, a data testing, documentation and profiling framework, are promising.

How we validate input data using pydantic

We use the Python package pydantic for fast and easy validation of input data. Here’s how.

We discovered the Python package pydantic through FastAPI, which we use for serving machine learning models. Pydantic is a Python package for data parsing and validation, based on type hints. We use pydantic because it is fast, does a lot of the dirty work for us, provides clear error messages and makes it easy to write readable code.

Two of our main use cases for pydantic are:

  1. Validation of settings and input data.
    We often read settings from a configuration file, which we use as inputs to our functions. We often end up doing quite a bit of input validation to ensure the settings are valid for further processing. To avoid starting our functions with a long set of validations and assertions, we use pydantic to validate the input.
  2. Sharing data requirements between machine learning applications.
    Our ML models have certain requirements for the data we use for training and prediction, for example that no data is missing. These requirements are used when we train our models, when we run online predictions and when we validate newly added training data. We use pydantic to specify the requirements we have and ensure that the same requirements are used everywhere, avoiding duplication of error-prone code across different applications.

This post will focus on the first use case, validation of settings and input data. A later post will cover the second use case.

Validation of settings and input data

In some cases, we read settings from a configuration file, such as a toml file, to be parsed as nested dictionaries. We use the settings as inputs to different functions. We often end up doing quite a bit of input validation to ensure the settings parsed from file are valid for further processing. A concrete example is settings for machine learning models, where we use toml files for defining model parameters, features and training details for the models.

This is quite similar to how FastAPI uses pydantic for input validation: the input to the API call is json, which in Python translates to a dictionary, and input validation is done using pydantic.

In this post we will go through input validation for a function interpolating a time series to a higher frequency. If we want to do interpolation, we set the interpolation factor, i.e., the factor of upsampling, the interpolation method, and an option to interpolate on the integral. Our interpolation function is just a wrapper around pandas interpolation methods, including the validation of input and some data wrangling. The input validation code started out looking a bit like this:

from typing import Dict


def validate_input_settings(params_in: Dict) -> Dict:
    params_validated = {}
    for key, value in params_in.items():
        if key == "interpolation_factor":
            if not int(value) == value:
                raise ValueError(f"{key} has a non-int value")
            if not int(value) >= 2:
                raise ValueError(f"{key}: {value} should be >= 2")
            value = int(value)
        elif key == "interpolation_method":
            allowed_set = {
                "repeat", "distribute", "linear", "cubic", "akima"
            }
            if value not in allowed_set:
                raise ValueError(f"{key} should be one of {allowed_set}, got {value}")
        elif key == "interpolate_on_integral":
            if not isinstance(value, bool):
                raise ValueError(f"{key} should be bool, got {value}")
        else:
            raise ValueError(f"{key} not a recognized key")
        params_validated[key] = value
    return params_validated

This is heavily nested, which in itself makes it hard to read, and perhaps you find that the validation rules aren’t crystal clear at first glance. We use SonarQube for static code quality analysis, and this piece of code results in a code smell, complaining that the code is too complex. In fact, this already has a cognitive complexity of 18 as SonarQube counts, above the default threshold of 15. Cognitive complexity is a measure of how difficult it is to read code, and increments for each break in linear flow, such as an if statement or a for loop. Nested breaks of the flow are incremented again.

Let’s summarize what we check for in validate_input_settings:

  • interpolation_factor is an integer
  • interpolation_factor is greater than or equal to 2
  • interpolation_method is in a set of allowed values
  • interpolate_on_integral is boolean
  • The keys in our settings dictionary are among the three mentioned above

In addition to the code above, we have a few more checks:

  • if an interpolation_factor is given, but no interpolation_method, use the default method linear
  • if an interpolation_factor is given, but not interpolate_on_integral, set the default option False
  • check for the invalid combination interpolate_on_integral = False and interpolation_method = "distribute"

With another three if statements inside the for loop, we end up at a cognitive complexity of 24.

Pydantic to the rescue

We might consider using a pydantic model for the input validation.

Minimal start

We can start out with the simplest form of a pydantic model, with field types:

from pydantic import BaseModel


class InterpolationSetting(BaseModel):
    interpolation_factor: int
    interpolation_method: str
    interpolate_on_integral: bool

Pydantic models are simply classes inheriting from the BaseModel class. We can create an instance of the new class as:

InterpolationSetting(
    interpolation_factor=2, 
    interpolation_method="linear", 
    interpolate_on_integral=True
)

This automatically does two of the checks we had implemented:

  • interpolation_factor is an int
  • interpolate_on_integral is a bool

In the original script, the fields are in fact optional, i.e., it is possible to provide no interpolation settings, in which case we do not do interpolation. We will set the fields to optional later, and then implement the additional necessary checks.

We can verify the checks we have enforced now by supplying non-valid input:

from pydantic import ValidationError


try:
    InterpolationSetting(
        interpolation_factor="text",
        interpolation_method="linear",
        interpolate_on_integral=True,
    )
except ValidationError as e:
    print(e)

which outputs:

    1 validation error for InterpolationSetting
    interpolation_factor
      value is not a valid integer (type=type_error.integer)

Pydantic raises a ValidationError when the validation of the model fails, stating which field, i.e. attribute, raised the error and why. In this case interpolation_factor raised a type error because the value "text" is not a valid integer. The validation is performed on instantiation of an InterpolationSetting object.

Validation of single fields and combinations of fields

Our original code also had some additional requirements:

  • interpolation_factor should be greater than or equal to two.
  • interpolation_method must be chosen from a set of valid methods.
  • We do not allow the combination of interpolate_on_integral=False and interpolation_method="distribute"

The first restriction can be implemented using pydantic types. Pydantic provides many different types; for this requirement we will use a constrained type, namely conint, a constrained integer type that supports restrictions such as lower limits.

The remaining two restrictions can be implemented as validators. We decorate our validation functions with the validator decorator. The input argument to the validator decorator is the name of the attribute(s) to perform the validation for.

All validators are run automatically when we instantiate an object of the InterpolationSetting class, as for the type checking.

Our validation functions are class methods, and the first argument is the class, not an instance of the class. The second argument is the value to validate, and can be named as we wish. We implement two validators, method_is_valid and valid_combination_of_method_and_on_integral:

from typing import Dict

from pydantic import BaseModel, conint, validator, root_validator


class InterpolationSetting(BaseModel):
    interpolation_factor: conint(gt=1)
    interpolation_method: str
    interpolate_on_integral: bool

    @validator("interpolation_method")
    def method_is_valid(cls, method: str) -> str:
        allowed_set = {"repeat", "distribute", "linear", "cubic", "akima"}
        if method not in allowed_set:
            raise ValueError(f"must be in {allowed_set}, got '{method}'")
        return method

    @root_validator()
    def valid_combination_of_method_and_on_integral(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        method = values.get("interpolation_method")
        if on_integral is False and method == "distribute":
            raise ValueError(
                f"Invalid combination of interpolation_method "
                f"{method} and interpolate_on_integral {on_integral}"
            )
        return values

There are a few things to note here:

  • Validators should return a validated value. The validators are run sequentially, and populate the fields of the data model if they are valid.
  • Validators should only raise ValueError, TypeError or AssertionError. Pydantic will catch these errors to populate the ValidationError and raise one exception regardless of the number of errors found in validation. You can read more about error handling in the docs.
  • When we validate a field against another, we can use the root_validator, which runs validation on the entire model. Root validators are a little different: they have access to the values argument, which is a dictionary containing all fields that have already been validated. When the root validator runs, interpolation_method may have failed to validate, in which case it will not be added to the values dictionary. Here, we handle that by using values.get("interpolation_method"), which returns None if the key is not in values. The docs contain more information on root validators and field ordering, which is important to consider when using the values dictionary.

Again, we can verify by choosing input parameters to trigger the errors:

from pydantic import ValidationError


try:
    InterpolationSetting(
        interpolation_factor=1,
        interpolation_method="distribute",
        interpolate_on_integral=False,
    )
except ValidationError as e:
    print(e)

which outputs:

    2 validation errors for InterpolationSetting
    interpolation_factor
      ensure this value is greater than 1 (type=value_error.number.not_gt; limit_value=1)
    __root__
      Invalid combination of interpolation_method distribute and interpolate_on_integral False (type=value_error)

As we see, pydantic raises a single ValidationError regardless of the number of ValueErrors raised in our model.

Implementing dynamic defaults

We also had some default values if certain parameters were not given:

  • If an interpolation_factor is given, set the default value linear for interpolation_method if none is given.
  • If an interpolation_factor is given, set the default value False for interpolate_on_integral if none is given.

In this case, we have dynamic defaults dependent on other fields.

This can also be achieved with root validators, by returning a conditional value. As this means validating one field against another, we must take care to ensure our code runs whether or not the two fields have passed validation and been added to the values dictionary. We will now also use Optional types, because we will handle the cases where not all values are provided. We add the new validators set_method_given_interpolation_factor and set_on_integral_given_interpolation_factor:

from typing import Dict, Optional

from pydantic import BaseModel, conint, validator, root_validator


class InterpolationSetting(BaseModel):
    interpolation_factor: Optional[conint(gt=1)]
    interpolation_method: Optional[str]
    interpolate_on_integral: Optional[bool]

    @validator("interpolation_method")
    def method_is_valid(cls, method: Optional[str]) -> Optional[str]:
        allowed_set = {"repeat", "distribute", "linear", "cubic", "akima"}
        if method is not None and method not in allowed_set:
            raise ValueError(f"must be in {allowed_set}, got '{method}'")
        return method

    @root_validator()
    def valid_combination_of_method_and_on_integral(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        method = values.get("interpolation_method")
        if on_integral is False and method == "distribute":
            raise ValueError(
                f"Invalid combination of interpolation_method "
                f"{method} and interpolate_on_integral {on_integral}"
            )
        return values

    @root_validator()
    def set_method_given_interpolation_factor(cls, values: Dict) -> Dict:
        factor = values.get("interpolation_factor")
        method = values.get("interpolation_method")
        if method is None and factor is not None:
            values["interpolation_method"] = "linear"
        return values

    @root_validator()
    def set_on_integral_given_interpolation_factor(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        factor = values.get("interpolation_factor")
        if on_integral is None and factor is not None:
            values["interpolate_on_integral"] = False
        return values

We can verify that the default values are set only when interpolation_factor
is provided, running InterpolationSetting(interpolation_factor=3) returns:

InterpolationSetting(interpolation_factor=3, interpolation_method='linear', interpolate_on_integral=False)

whereas supplying no input parameters, InterpolationSetting(), returns a data model with all parameters set to None:

InterpolationSetting(interpolation_factor=None, interpolation_method=None, interpolate_on_integral=None)

Note: If we have static defaults, we can simply set them for the fields:

class InterpolationSetting(BaseModel):
    interpolation_factor: Optional[int] = 42

Final safeguard against typos

Finally, we had one more check in our previous script: that no unknown keys were provided. If we provide unknown keys to our data model now, nothing really happens; for example InterpolationSetting(hello="world") outputs:

InterpolationSetting(interpolation_factor=None, interpolation_method=None, interpolate_on_integral=None)

Often, an unknown field name is the result of a typo in the toml file. Therefore we want to raise an error to alert the user. We do this using the model Config class, which controls the behaviour of the model. The extra attribute of the config determines what happens with extra fields. The default is ignore, as we can see in the example above: the field is ignored and not added to the model (the allow option would add it). We can use the forbid option to raise an exception when extra fields are supplied.

from typing import Dict, Optional

from pydantic import BaseModel, conint, validator, root_validator


class InterpolationSetting(BaseModel):
    interpolation_factor: Optional[conint(gt=1)]
    interpolation_method: Optional[str]
    interpolate_on_integral: Optional[bool]

    class Config:
        extra = "forbid"

    @validator("interpolation_method")
    def method_is_valid(cls, method: Optional[str]) -> Optional[str]:
        allowed_set = {"repeat", "distribute", "linear", "cubic", "akima"}
        if method is not None and method not in allowed_set:
            raise ValueError(f"must be in {allowed_set}, got '{method}'")
        return method

    @root_validator()
    def valid_combination_of_method_and_on_integral(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        method = values.get("interpolation_method")
        if on_integral is False and method == "distribute":
            raise ValueError(
                f"Invalid combination of interpolation_method "
                f"{method} and interpolate_on_integral {on_integral}"
            )
        return values

    @root_validator()
    def set_method_given_interpolation_factor(cls, values: Dict) -> Dict:
        factor = values.get("interpolation_factor")
        method = values.get("interpolation_method")
        if method is None and factor is not None:
            values["interpolation_method"] = "linear"
        return values

    @root_validator()
    def set_on_integral_given_interpolation_factor(cls, values: Dict) -> Dict:
        on_integral = values.get("interpolate_on_integral")
        factor = values.get("interpolation_factor")
        if on_integral is None and factor is not None:
            values["interpolation_factor"] = False
        return values

If we try again with an unknown key, we now get a ValidationError:

from pydantic import ValidationError


try:
    InterpolationSetting(hello=True)
except ValidationError as e:
    print(e)

This raises a validation error for the unknown field:

1 validation error for InterpolationSetting
hello
  extra fields not permitted (type=value_error.extra)

Adapting our existing code is easy

Now we have implemented all our checks, and can go on to adapt our existing code to use the new data model. In our original implementation, we would do something like

params_in = toml.load(path_to_settings_file)
params_validated = validate_input_settings(params_in)
interpolate_result(params_validated)

We can replace the call to validate_input_settings with instantiation of the pydantic model: params_validated = InterpolationSetting(**params_in). Each pydantic data model has a .dict() method that returns the parameters as a dictionary, so we can pass it directly to interpolate_result: interpolate_result(params_validated.dict()). Another option is to refactor interpolate_result to use the attributes of the InterpolationSetting object, such as params_validated.interpolation_method, instead of the values of a dictionary.
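Sketched out, the adapted call site could look like this (assuming the settings still come from a toml file, as above; the module holding InterpolationSetting is hypothetical):

import toml

from our_settings import InterpolationSetting  # hypothetical module

params_in = toml.load(path_to_settings_file)
params_validated = InterpolationSetting(**params_in)

# Option 1: keep the dictionary-based interface of interpolate_result
interpolate_result(params_validated.dict())

# Option 2: pass the model itself and use attribute access inside
# interpolate_result, e.g. params_validated.interpolation_method
interpolate_result(params_validated)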

Conclusion

In the end, we can replace one 43 line function (for the full functionality) with a cognitive complexity of 24 with one 40 line class containing six methods, each with a cognitive complexity of less than 4. The pydantic data models will not necessarily be shorter than the custom validation code they replace, and since there are a few quirks and concepts to pay attention to, they are not necessarily easier to read on the first try.

However, as we already use the library for validation in our APIs, we are getting familiar with it, and the data models become easier to understand.

Some of the benefits of using pydantic for this are:

  • Type checking (and in fact also some type conversion), which we previously did ourselves, is now done automatically for us, saving us the work of repeating lines of error-prone code in many different functions.
  • Each validator has a name which, if we put a little thought into it, makes it very clear what we are trying to achieve. In our previous example, the purpose of each nested condition had to be deduced from the many if clauses and error messages. This should be a lot clearer now, especially if we use pydantic across different projects.
  • If speed is important, pydantic’s benchmarks show that they are fast compared to similar libraries.

Hopefully this will help you determine whether or not you should consider using pydantic models in your projects. In a later post, we will show how we use pydantic data models to share metadata between machine learning applications, and to share data requirements between the applications.

Developing and deploying machine learning models for online load forecasts

Load forecasts are important input to operating the power system, especially as renewable energy sources become a bigger part of our power mix and new electrical interconnectors link the Norwegian power system to neighbouring systems. That is why we have been developing machine learning models to make predictions for power consumption. This post describes the process for developing models as well as how we have deployed the models to provide online predictions.

Load forecasting, also referred to as consumption prognosis, combines techniques from machine learning with knowledge of the power system. It’s a task that fits like a glove for the Data Science team at Statnett, and one we have been working on over the last few months.

We started with a three week long Proof of Concept to see if we could develop good load forecasting models and evaluate the potential of improving the models. In this period, we only worked on offline predictions, and evaluated our results against past power consumption.

The results from the PoC were good, so we were able to continue the work. The next step was to find a way to go from offline predictions to making online predictions based on near real time data from our streaming platform. We also needed to do quite a bit of refactoring and testing of the code and to make it production ready.

After a few months of work, we have managed to build an API to provide online predictions from our scikit learn and Tensorflow models, and deploy our API as a container application on OpenShift.

Good forecasts are necessary to operate a green power system

Balancing power consumption and power production is one of the main responsibilities of the National Control Centre in Statnett. A good load forecast is essential to obtain this balance.

The balancing services in the Nordic countries consist of automatic Frequency Restoration Reserves (aFRR) and manual Frequency Restoration Reserves (mFRR). These are activated to contain and restore the frequency. Until the Frequency Restoration Reserves have been activated, the inertia (rotating mass) response and system damping limit the frequency drop, keeping it from falling to unacceptable levels.

The growth in renewable energy sources alters the generation mix; in periods, a large part of the generators will produce power without simultaneously providing inertia. Furthermore, imports on HVDC lines do not contribute with inertia. Therefore, situations with large imports to the Nordic system can result in an insufficient amount of inertia in the system. A power system with low inertia will be more sensitive to disturbances, i.e. have smaller margins for keeping the frequency stable.

That is why Statnett is developing a new balancing concept called Modernized ACE (Area Control Error) together with the Swedish TSO Svenska kraftnät. Online prediction of power consumption is an integral part of this concept and is the reason we are working on this problem.

Can we beat the existing forecast?

Statnett already has load forecasts. The forecasts used today are proprietary software, so they are truly black box algorithms. If a model performs poorly in a particular time period, or in a specific area, confidence in the model can be hard to restore. In addition, there is a need for predictions for longer horizons than the current software is providing.

We wanted to see if we could measure up to the current software, so we did a three week stunt, hacking away and trying to create the best possible models in the limited time frame. We focused on exploring different features and models as well as evaluating the results to compare with the current algorithm.

At the end of this stunt, we had developed two models: one simple model and one complex black box model. We had collected five years of historical data for training and testing, and used the final year for testing of the model performance.

Base features from open data sources

The current models provide predictions for the next 48 hours for each of the five bidding zones in Norway. Our strategy is to train one model for each of the zones. All the data sources we have used so far are openly available:

  • weather observations can be downloaded from the Norwegian Meteorological Institute’s Frost API. We retrieve the parameters wind speed and air temperature from the observations endpoint, for specific weather stations that we have chosen for each bidding zone.
  • weather forecasts are available from the Norwegian Meteorological Institute’s thredds data server. There are several different forecasts, available as grid data, along with further explanation of the different forecasts and the post processing of parameters. We use the post processed unperturbed member (the control run) of the MetCoOp Ensemble Prediction System (MEPS), the meps_det_pp_2_5km files. From the datasets we extract wind speed and air temperature at 2 m height for the points closest to the weather stations we have chosen for each bidding zone.
  • historical hourly consumption per bidding area is available from Nord Pool

The five bidding zones in Norway

Among the features of the models were:

  • weather observations and forecasts of temperature and wind speed from different stations in the bidding zones
  • weather parameters derived from observations and forecasts, such as windchill
  • historical consumption and derived features such as linear trend, cosine yearly trend, square of consumption and so on
  • calendar features, such as whether or not the day is a holiday, a weekday, the hour of the day and so on

Prediction horizon

As mentioned, we need to provide load forecasts for the next 48 hours, to replace the current software. We have experimented with two different strategies:

  • letting one model predict the entire load time series
  • train one model for each hour in the prediction horizon, that is one model for the next hour, one model for two hours from now and so on.

Our experience so far is that one model per time step performs better, but it of course takes more time to train and is more complex to handle. This explodes if the frequency of the time series or the horizon is increased: it may become too complex for a week-ahead load forecast, which would require 168 models for each bidding zone.

Choosing a simple model for correlated features

We did some initial experiments using simple linear regression, but quickly experienced the problem of bloated weights: The model had one feature with a weight of about one billion, and another with a weight of about minus one billion. This resulted in predictions in the right order of magnitude, but with potential to explode if one feature varied more than usual.

Ridge regression is a form of regularization of a linear regression model, which is designed to mitigate this problem. For correlated features, the unbiased estimates for least squares linear regression have a high variance. This can lead to bloated weights making the prediction unstable.

In ridge regression, we penalize large weights, thereby introducing a bias in the estimates, but hopefully reducing the variance sufficiently to produce reliable forecasts. In linear least squares we find the weights minimizing the squared error; in ridge regression, we additionally penalize the error function for having large weights. That is, we add a term proportional to the sum of squares of the weights x1 and x2 to the error. This extra penalty can be illustrated with circles in the plane: the farther you are from the origin, the bigger the penalty.
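Written out in generic notation (not taken from this post), the ridge weights minimize the squared error plus a penalty on the squared size of the weights, with a tuning parameter alpha:

\hat{w} = \underset{w}{\arg\min} \; \lVert y - Xw \rVert_2^2 + \alpha \lVert w \rVert_2^2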

In ridge regression, the idea is to minimize the error, but penalize large weight values. Larger weights correspond to larger circles in the plane.

For our Ridge regression model, we used the Ridge class from the Python library scikit learn.
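A minimal sketch of how this looks with scikit learn; the data here is random just to make the example self-contained, and the alpha value is illustrative, not the one we use:

import numpy as np
from sklearn.linear_model import Ridge

# Hypothetical feature matrix and target: rows are hours, columns are
# features such as temperature, wind speed and calendar variables.
rng = np.random.default_rng(0)
X_train, y_train = rng.normal(size=(1000, 10)), rng.normal(size=1000)

model = Ridge(alpha=1.0)  # alpha controls the penalty on large weights
model.fit(X_train, y_train)
predictions = model.predict(X_train[:48])  # e.g. predict 48 hourly values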

Choosing a neural network that doesn’t forget

Traditional neural networks are unable to remember information from earlier time steps, but recurrent neural networks (RNNs) are designed to remember earlier time steps. Unlike the simpler feed-forward networks, where information can only propagate in one direction, RNNs introduce loops to pick up information from earlier time steps. An analogy is seeing a series of images as a video, as opposed to considering each image separately. The loops of the RNN are unrolled to form the figure below, showing the structure of the repeating modules:

An RNN contains a single network layer, represented by the blue box, where information from the previous module is incorporated.

In practice, RNNs are only able to learn dependencies over a short time period. Long short-term memory networks (LSTMs) are a type of RNN that uses the same concept of loops, but their value lies in how the repeating module is structured.

One important distinction from RNNs is that the LSTM has a cell state, that flows through the module only affected by linear operations. Therefore, the LSTM network doesn’t “forget” the long term dependencies.

The repeating layer in an LSTM network, where the state propagates through the module, shown by the grey line. The blue boxes are network layers and blue circles are pointwise operations.

For our LSTM model we used the Keras LSTM layer.
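A minimal sketch of such a model in Keras, assuming the feature matrix has been reshaped to (samples, time steps, features); the layer sizes and dimensions are illustrative, not the ones we use:

from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.models import Sequential

n_timesteps, n_features = 48, 10  # hypothetical dimensions

model = Sequential([
    LSTM(32, input_shape=(n_timesteps, n_features)),
    Dense(1),  # predicted load for one time step
])
model.compile(optimizer="adam", loss="mae")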

For a thorough explanation of LSTM, check out this popular blog post by Chris Olah: Understanding LSTMs.

Results

To evaluate the models, we used mean absolute percentage error, MAPE, which is a common metric used in load forecasting. Our proof of concept models outperformed the current software when we used one year of test data, reducing the MAPE by 28% on average over all bidding zones in the forecasts.
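For reference, with actual load y_t and forecast \hat{y}_t over n hours, MAPE is defined as

\mathrm{MAPE} = \frac{100\%}{n} \sum_{t=1}^{n} \left| \frac{y_t - \hat{y}_t}{y_t} \right|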

The average MAPE over all bidding zones was reduced from 5.1 % to 4.0 % when forecasting one day ahead (labeled D-1) and from 6.2 % to 4.2 % when forecasting two days ahead (labeled D-2).

We thought this was quite good for such a short PoC, especially considering that the test data was an entire year. However, we see that for the bidding zone NO1 (Østlandet), where the power consumption is largest, we need to improve our models.

How to evaluate fairly when using weather forecasts in features

We trained the models using only weather observations, but when we evaluated performance, we used weather forecasts for time steps after the prediction time. In the online prediction situation, observations are available for the past, but predictions must be used for the future. To compare our models with the current model, we must simulate the same conditions.

We have also considered training our model using forecasts, but we only have access to about a year of forecasts, therefore we have chosen to train on observations. Another possible side effect of training on forecasts, is the possibility of learning the strengths and weaknesses of a specific forecasting model from The Norwegian Meteorological Institute. This is not desirable, as we would have to re-learn the patterns when a new model is deployed. Also, the model version (from the Meteorological Institute) is not available as of now, so we do not know when the model needs re-training.

Our most recent load forecast

By now we have made some adjustments to the model, and our forecasts for NO1 from the LSTM model and Ridge regression model are shown below. Results are stored in a PostgreSQL database and we have built a Grafana dashboard visualizing results.

The plot shows our newest load forecasts for the area NO1 (Østlandet) as of 20.12 17:00. For the hours in the past, the forecast from the hour before is shown. The shaded areas show the range between the minimum and maximum of the previous forecasts, and the bold green line is the actual consumption. The dashed line shows the forecast from 24 hours ago.

How to deploy real time ML models to OpenShift

The next step in our development process was to create a framework for delivering online predictions and deploy it. This also included refactoring the code from the PoC. We worked together with a development team to sketch out an architecture for fetching our data from different sources to delivering a prediction. The most important steps so far have been:

  • Developing and deploying the API early
  • Setting up a deployment pipeline that automatically deploys our application
  • Managing version control for the ML models

Develop and deploy the API early

“API first” development (or at least API early, as in our case) is often referenced as an addition to the twelve-factor app concepts, as it helps both client and server side define the direction of development. A process with contracts and requirements sounds like a waterfall process, but our experience from this project is that in reality, it supports an agile process as the requirements are adapted during development. Three of the benefits were:

  • API developers and API users can work in parallel. Starting with the API has let the client and server side continue work on both sides, as opposed to the client side waiting for the server side to implement the API. In reality, we altered the requirements as we developed the API and the client side implemented their use of it. This has been a great benefit, as we have uncovered problems, and we have been able to adjust the code early in the development phase.
  • Real time examples to end users. Starting early with the API allows us to demonstrate the models with real time examples to the operators who will be using the solution, before the end of model development. The operators will be able to start building experience and understanding of the forecasts, and give feedback early.
  • Brutally honest status reports. Since the results of the current models are available at any time, this provides a brutally honest status report of the quality of the models for anyone who wants to check. With the models running live on the test platform, we have some time to discover peculiarities of different models that we didn’t explore in the machine learning model development phase, or that occur in online prediction and not necessarily in the offline model development.

Separate concerns for the ML application and integration components

Our machine learning application provides an API that only performs predictions when a client sends a request with input data. Another option could be to have the application itself fetch the data and stream the results back to our streaming platform, Kafka.

We chose not to include fetching and publishing data inside our application in order to separate concerns of the different components in the architecture, and hopefully this will be robust to changes in the data sources.

The load forecasts will be extended to include Sweden as well. The specifics of how and where the models for Sweden will run are not decided, but our current architecture does not require the Swedish data input. Output is written back to Kafka topics on the same streaming platform.

The many components in our load forecast system architecture. The machine learning application only delivers predictions when a request is made.

The tasks we rely on are:

  • Fetching data from the different data sources we use. These data are published to our Kafka streaming platform.
  • Combine relevant data and make a request for a forecast when new data is available. A Java application is scheduled to run each hour, when new data is available.
  • Make the predictions when requested. This is where the ML magic happens, and this is the application we are developing.
  • Publish the prediction response for real time applications. The Java application that requests forecasts also publishes the results to Kafka. We imagine this is where the end-user applications will subscribe to data.
  • Store the prediction response for post-analysis and monitoring.

How to create an API for your ML models

Our machine learning application provides an API where users can supply the base features each model uses and receive the load forecast for the input data they provided. We created the API using the Python library Flask, a minimal web application framework.

Each algorithm runs as a separate instance of the application, on the same code base. Each of these algorithm instances has one endpoint for prediction. The endpoint takes the bidding zone as an input parameter and routes the request to the correct model.
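A minimal sketch of such an endpoint (the URL, payload format and model registry below are hypothetical; the real application is not shown in this post):

from flask import Flask, jsonify, request

app = Flask(__name__)
MODELS = {}  # e.g. {"NO1": trained_model, ...}, populated at startup


@app.route("/prediction/<bidding_zone>", methods=["POST"])
def predict(bidding_zone):
    model = MODELS[bidding_zone]        # route the request to the correct model
    base_features = request.get_json()  # base features supplied by the client
    # hypothetical model object that builds derived features and predicts
    forecast = model.predict(base_features)
    return jsonify({"forecast": list(forecast)})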

Two tips for developing APIs

  • Modularize the app and use blueprint with Flask to prevent chaos. Flask makes it easy to quickly create a small application, but the downside is that it is also easy to create a messy application. After first creating an application that worked, we refactored the code to modularize. Blueprints are templates that allow us to create re-usable components.
  • Test your API using mocking techniques. As we refactored the code and tried writing good unit tests for the API, we used the mock library to mock out objects and functions so that we can separate which functionality the different tests test and run our tests faster.

Where should derived features be calculated?

We have chosen to calculate derived features (i.e. rolling averages of temperature, lagged temperature, calendar features and so on) within the model, based on the base features supplied as input data. This could have been done outside of the models, for example by the Java application that integrates the ML and streaming platforms, or by the Kafka producers. There are two main reasons why we have chosen to keep these calculations inside our ML API:

  • The feature data is mostly model specific, and lagged data causes data duplication. This is not very useful for anyone outside of the specific model, and therefore it doesn’t make sense to create separate Kafka topics for these data or calculate them outside of the application.
  • Since we developed the API at an early stage, we had (and still have) some feature engineering to do, and we did not want to be dependent on someone else to provide all the different features that we wanted to explore before we could deploy a new model.

Class hierarchy connects general operators to specific feature definitions

The calculations of derived features are modularized in a feature operator module to ensure that the same operations are done in the same way everywhere. The link that applies these feature operators to the base features is implemented using class methods in a class hierarchy.

The class hierarchy for our forecast models

We have created a class hierarchy as illustrated in the figure above. The top class, ForecastModels, contains class methods for applying the feature operators to the input data in order to obtain the feature data frame. Which base features to use, and which feature operators to apply to them, are specified in the child classes.

The child classes at the very bottom, LstmModel and RidgeRegression, inherit from their respective Keras and scikit learn classes as well, providing the functionality for the prediction and training.

As Tensorflow models have a few specific requirements, such as normalising the features, this is implemented as a class method for the class TensorFlowModels. In addition, the LSTM models require persisting the state for the next prediction. This is handled by class methods in the LstmModel class.
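A rough sketch of this hierarchy, with hypothetical method and attribute names (the actual implementation is not shown in this post):

from sklearn.linear_model import Ridge


class ForecastModels:
    # Applies feature operators (lags, rolling means, calendar features, ...)
    # to the base features to build the feature data frame.
    base_features = []       # overridden in the child classes
    feature_operators = []   # overridden in the child classes

    @classmethod
    def build_feature_frame(cls, base_data):
        frame = base_data
        for operator in cls.feature_operators:
            frame = operator(frame)
        return frame


class TensorFlowModels(ForecastModels):
    @classmethod
    def normalise(cls, frame):
        # Tensorflow models require normalised features
        return (frame - frame.mean()) / frame.std()


class LstmModel(TensorFlowModels):
    # Would also inherit from the relevant Keras class and handle
    # persisting the LSTM state between predictions.
    pass


class RidgeRegression(ForecastModels, Ridge):
    # Inherits prediction and training from scikit learn's Ridge,
    # and feature handling from ForecastModels.
    base_features = ["temperature", "consumption"]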

Establish automatic deployment of the application

Our deployment pipeline consists of two steps, which run each time we push to our version control system:

  • run tests
  • deploy the application to the test environment if all tests pass

We use GitLab for version control, and have set up our pipeline using the integrated continuous integration/continuous deployment pipelines. GitLab also integrates nicely with the container platform we use, OpenShift, allowing for automatic deployment of our models through a POST request to the OpenShift API, which is part of our pipeline.

The two-step pipeline: run the tests, then deploy to the test environment if all tests pass.

Automatic deployment has been crucial in the collaboration with the developers who make the integrations between the data sources and our machine learning framework, as it lets us test different solutions easily and get results of improvements immediately. Of course, we also get immediate feedback when we make mistakes, and then we can hopefully fix it while we remember what the breaking change was.

How we deploy our application on OpenShift

Luckily, we had colleagues who were a few steps ahead of us and had just deployed a Flask application to OpenShift. We used the source-to-image build strategy, which builds a Docker image from source code, given a git repository. The Python source-to-image builder is even compatible with the environment management package we have used, Pipenv, and automatically starts the container running the script app.py. This allowed us to deploy our application without any adaptations to our code base other than setting an environment variable to enable Pipenv. The source-to-image strategy was perfect for the simple use case, but we are now moving towards a Docker build strategy because we need more options in the build process.

For the LSTM model, we also needed to find a way to store the state, independently of deployments of the application. As a first step, we have chosen to store the states as flat files in a persistent storage, which is stored independently of new deployments of our application.

How do we get from test to production?

For deployment from the test to the production environment, we will set up a manual step, run after verifying that everything is working as expected in the test environment. The plan as of now is to publish the image running in the test environment in OpenShift to Artifactory. Artifactory is a repository manager we use for hosting internally developed Python packages and Docker images that our OpenShift deployments can subscribe to. The applications running in the OpenShift production environment can then be deployed from the updated image.

Flask famously warns us not to use it in production because it doesn’t scale. In our setting, one Flask application will receive one request per hour per bidding zone, so the problems should not be too big. However, if need be, we plan to use the Twisted web server for our Flask application. We have used Twisted previously when deploying a Dash web application, which is also based on Flask, and thought it worked well without too much effort.

Future improvements

Choosing the appropriate evaluation metric for models

In order to create a forecast that helps the balancing operators as much as possible, we need to optimize our models using a metric that reflects their needs. For example, what is most important: predicting the level of the daily peak load, or predicting the time at which the peak occurs? Is it more important to predict the outliers correctly, or should we focus on the overall level? This must be decided in close cooperation with the operators, illustrating how different models perform in the general and in the special case.

Understanding model behavior

Understanding model behavior is important both in developing models and to build trust in a model. We are looking into the SHAP framework, which combines Shapley values and LIME to explain model predictions. The framework provides plots that we think will be very useful for both balancing operators and data scientists.
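As a first rough sketch of what this could look like (assuming a fitted model and a feature matrix X_train; the calls below use the generic shap API, not our actual setup):

import shap

# KernelExplainer works with any model that exposes a predict function.
explainer = shap.KernelExplainer(model.predict, X_train)
shap_values = explainer.shap_values(X_train[:100])

# Summary plot of which features contribute most to the predictions.
shap.summary_plot(shap_values, X_train[:100])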

Including uncertainty in predictions

There is of course uncertainty in the weather forecasts, and the Norwegian Meteorological Institute provides ensemble forecasts. These 10 forecasts are obtained by using the same model but perturbing the initial state, thereby illustrating the uncertainty in the forecast. For certain applications of the load forecast, it could be helpful to get an indication of the uncertainty, for example by using these ensemble forecasts to create different predictions and illustrating the spread.
