Prefect

Build, deploy and observe data workflows

Introduction

Prefect is a workflow orchestration tool that makes it straightforward to create, schedule, and monitor complex data pipelines. Workflows are defined as plain Python code, while Prefect provides error handling, retry mechanisms, and a user-friendly dashboard for monitoring.
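As a minimal sketch of the core ideas (the function names below are illustrative and not part of the example that follows), a task is a decorated Python function and a flow is a decorated function that calls tasks:

from prefect import flow, task


@task(retries=2, retry_delay_seconds=5)
def extract_number() -> int:
    # A trivial unit of work; Prefect retries it up to two times on failure.
    return 42


@flow
def hello_flow():
    print(extract_number())


hello_flow()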

Workflow for soccer data

As an example, let’s assume that we would like to create a data workflow that downloads, stores, and updates historical and fixtures soccer data from Football-Data.co.uk. The CSV file of each of the main leagues is served from a URL of the following form:

base_url = 'https://www.football-data.co.uk/mmz4281/{season}/{league_id}.csv'
base_url
'https://www.football-data.co.uk/mmz4281/{season}/{league_id}.csv'

where season encodes the season (e.g. 1819 for 2018-19) and league_id identifies the league. Let’s select a few of those seasons and leagues:

SEASONS = ['1819', '1920', '2021', '2122', '2223', '2324']
LEAGUES_MAPPING = {
    'E0': 'English',
    'SC0': 'Scottish',
    'D1': 'German',
    'I1': 'Italian',
    'SP1': 'Spanish',
    'F1': 'French',
    'N1': 'Dutch',
}
URLS_MAPPING = {
    f'https://www.football-data.co.uk/mmz4281/{season}/{league_id}.csv': (
        league,
        '-'.join([season[0:2], season[2:]]),
    )
    for season in SEASONS
    for league_id, league in LEAGUES_MAPPING.items()
}
FIXTURES_URL = 'https://www.football-data.co.uk/fixtures.csv'
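For instance, formatting the template for the 2023-24 English Premier League gives:

base_url.format(season='2324', league_id='E0')
'https://www.football-data.co.uk/mmz4281/2324/E0.csv'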

Our workflow will include the following tasks:

  • Check if a local SQLite database exists. If not, then create it.
  • Check if the database is updated with the latest historical data. If no historical data exist, download all the data and store them in the database; if the historical data are present but stale, download only the latest season’s data and update the database.
  • Download the latest fixtures data and store them to the database.

Tasks

The above tasks represent discrete units of work, and each will receive the task decorator. Since there are multiple files to fetch, we will also use an asynchronous httpx client to download the data concurrently.
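As an aside, the task decorator accepts options such as retries and retry_delay_seconds, which suit flaky network downloads. A hedged sketch (this helper is illustrative and not used in the workflow below):

import httpx
from prefect import task


@task(name='Download CSV', retries=3, retry_delay_seconds=10)
async def download_csv(url: str) -> bytes:
    # Retry the download up to three times, waiting ten seconds between attempts.
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.content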

The function create_db implements the first task. Connecting with the mode=rw query parameter and uri=True makes SQLite raise sqlite3.OperationalError when the file does not exist instead of silently creating it, which is how we detect a missing database:

import sqlite3
from prefect import task
from prefect.logging import get_run_logger
from pathlib import Path
from tempfile import mkdtemp

TEMP_DIR = Path(mkdtemp())


@task(name='Create database', description='Create the database to store the data')
def create_db():
    logger = get_run_logger()
    db_path = TEMP_DIR / 'soccer_data.db'
    try:
        con = sqlite3.connect(f'file:{db_path}?mode=rw', uri=True)
        logger.info('Database exists.')
    except sqlite3.OperationalError:
        con = sqlite3.connect(db_path)
        logger.info('Database created.')
    finally:
        con.close()

The function update_historical_data implements the second task:

import httpx
import asyncio
import pandas as pd
from io import StringIO


async def request_csv_data(client: httpx.AsyncClient, url: str, **kwargs):
    # The extra keyword arguments (league, season) carry context but are unused here.
    return await client.get(url=url)


async def download_csvs_data(urls_mapping: dict[str, tuple[str, str]]):
    async with httpx.AsyncClient(limits=httpx.Limits(max_connections=30)) as client:
        requests = [
            request_csv_data(client, url, league=league, season=season)
            for url, (league, season) in urls_mapping.items()
        ]
        responses = await asyncio.gather(*requests)
    csvs_data = [
        StringIO(str(response.content, encoding='windows-1254'))
        for response in responses
    ]
    return csvs_data


@task(
    name='Update historical data',
    description='Fetch latest data to update historical data',
)
async def update_historical_data(urls_mapping):
    logger = get_run_logger()
    db_path = TEMP_DIR / 'soccer_data.db'
    # sqlite3's context manager commits the transaction on exit but does not
    # close the connection, so con remains usable after the with block.
    with sqlite3.connect(db_path) as con:
        try:
            data = pd.read_sql('SELECT * FROM historical', con)
            logger.info(f'Table with historical data exists. Shape: {data.shape}')
        except pd.errors.DatabaseError:
            logger.info('Table with historical data does not exist.')
            csvs_data = await download_csvs_data(urls_mapping)
            data = pd.concat(
                [
                    pd.read_csv(csv_data, encoding='windows-1254')
                    for csv_data in csvs_data
                ],
                ignore_index=True,
            )
            data.to_sql('historical', con=con, index=False)
            logger.info(f'Table with historical data was created. Shape: {data.shape}')
            return None
    # The table exists, so fetch only the latest season's files and merge them in.
    urls_mapping = {
        url: (league, season)
        for url, (league, season) in urls_mapping.items()
        if season == '23-24'
    }
    latest_csvs_data = await download_csvs_data(urls_mapping)
    latest_data = pd.concat(
        [
            pd.read_csv(csv_data, encoding='windows-1254')
            for csv_data in latest_csvs_data
        ],
        ignore_index=True,
    )
    # Append the fresh rows and drop duplicate matches, keeping each fixture once.
    data = pd.concat([data, latest_data], ignore_index=True).drop_duplicates(
        subset=['Div', 'Date', 'HomeTeam', 'AwayTeam', 'Time'], ignore_index=True
    )
    data.to_sql('historical', con=con, index=False, if_exists='replace')
    logger.info(f'Table with historical data was updated. Shape: {data.shape}')

The function update_fixtures_data implements the third task:

@task(name='Update fixtures data', description='Fetch latest fixtures data')
async def update_fixtures_data():
    logger = get_run_logger()
    db_path = TEMP_DIR / 'soccer_data.db'
    # pandas fetches the CSV synchronously; acceptable here for a single small file.
    data = pd.read_csv(FIXTURES_URL)
    with sqlite3.connect(db_path) as con:
        data.to_sql('fixtures', con=con, index=False, if_exists='replace')
        logger.info(f'Fixtures data were updated. Shape: {data.shape}')

Flow

The full data workflow is a function that receives the flow decorator and calls the tasks in order.

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner


@flow(
    name='Download asynchronously the data and update the database',
    validate_parameters=True,
    task_runner=ConcurrentTaskRunner(),
    log_prints=True,
)
async def update_db(urls_mapping: dict[str, tuple[str, str]]):
    create_db()
    await update_historical_data(urls_mapping)
    await update_fixtures_data()
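Since update_db is an async flow it has to be awaited. In a notebook we can await it directly, as in the next section; in a standalone script, a sketch would wrap it with asyncio.run:

import asyncio

if __name__ == '__main__':
    asyncio.run(update_db(URLS_MAPPING))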

Results

We can run the above flow:

await update_db(URLS_MAPPING)
15:45:48.090 | INFO    | prefect.engine - View at https://app.prefect.cloud/account/b258155a-2005-491c-96d1-e0ffa5f1d8f1/workspace/89cadadb-5990-4d0d-9536-3c50d73b387a/runs/flow-run/64bcfbcc-1b44-4dcf-a5e9-26f1a73ef985
15:45:48.503 | INFO    | Flow run 'invisible-horse' - Beginning flow run 'invisible-horse' for flow 'Download asynchronously the data and update the database'
15:45:48.506 | INFO    | Flow run 'invisible-horse' - View at https://app.prefect.cloud/account/b258155a-2005-491c-96d1-e0ffa5f1d8f1/workspace/89cadadb-5990-4d0d-9536-3c50d73b387a/runs/flow-run/64bcfbcc-1b44-4dcf-a5e9-26f1a73ef985
15:45:48.541 | INFO    | Task run 'Create database-6d6' - Database created.
15:45:48.543 | INFO    | Task run 'Create database-6d6' - Finished in state Completed()
15:45:48.547 | INFO    | Task run 'Update historical data-e94' - Table with historical data does not exist.
15:45:50.541 | INFO    | Task run 'Update historical data-e94' - Table with historical data was created. Shape: (13862, 124)
15:45:50.542 | INFO    | Task run 'Update historical data-e94' - Finished in state Completed()
15:45:50.869 | INFO    | Task run 'Update fixtures data-727' - Fixtures data were updated. Shape: (178, 102)
15:45:50.872 | INFO    | Task run 'Update fixtures data-727' - Finished in state Completed()
15:45:51.098 | INFO    | Flow run 'invisible-horse' - Finished in state Completed()

Let’s read the data from the database:

from shutil import rmtree

db_path = TEMP_DIR / 'soccer_data.db'
with sqlite3.connect(db_path) as con:
    historical_data = pd.read_sql('SELECT * FROM historical', con)
    fixtures_data = pd.read_sql('SELECT * FROM fixtures', con)
# Close the connection explicitly so the file handle is released before deletion.
con.close()
rmtree(TEMP_DIR)

The historical data:

historical_data
Div Date HomeTeam AwayTeam FTHG FTAG FTR HTHG HTAG HTR ... AvgC<2.5 AHCh B365CAHH B365CAHA PCAHH PCAHA MaxCAHH MaxCAHA AvgCAHH AvgCAHA
0 E0 10/08/2018 Man United Leicester 2 1 H 1 0 H ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
1 E0 11/08/2018 Bournemouth Cardiff 2 0 H 1 0 H ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
2 E0 11/08/2018 Fulham Crystal Palace 0 2 A 0 1 A ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
3 E0 11/08/2018 Huddersfield Chelsea 0 3 A 0 2 A ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
4 E0 11/08/2018 Newcastle Tottenham 1 2 A 1 2 A ... NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
13857 N1 19/05/2024 PSV Eindhoven Waalwijk 3 1 H 1 1 D ... 5.69 -2.50 1.73 2.08 1.77 2.09 1.98 2.17 1.85 1.99
13858 N1 19/05/2024 Sparta Rotterdam Heerenveen 2 1 H 0 0 D ... 2.87 -0.75 1.85 2.05 1.86 2.03 1.90 2.12 1.83 2.00
13859 N1 19/05/2024 Vitesse Ajax 2 2 D 1 1 D ... 3.43 1.00 1.84 2.06 1.84 2.06 1.88 2.11 1.82 2.02
13860 N1 19/05/2024 Volendam Go Ahead Eagles 1 2 A 1 1 D ... 3.52 1.25 1.78 2.03 1.83 2.07 1.85 2.12 1.81 2.02
13861 N1 19/05/2024 Zwolle Twente 1 2 A 0 0 D ... 3.30 1.50 1.97 1.93 1.97 1.92 2.14 1.93 2.01 1.83

13862 rows × 124 columns

The fixtures data:

fixtures_data
Div Date Time HomeTeam AwayTeam Referee B365H B365D B365A BWH ... B365CAHH B365CAHA PCAHH PCAHA MaxCAHH MaxCAHA AvgCAHH AvgCAHA BFECAHH BFECAHA
0 B1 28/02/2025 19:45 Charleroi Genk None 2.55 3.4 2.63 None ... None None None None None None None None None None
1 B1 01/03/2025 15:00 Cercle Brugge Antwerp None 2.38 3.4 2.75 None ... None None None None None None None None None None
2 B1 01/03/2025 15:00 Gent Club Brugge None 4.10 4.0 1.73 None ... None None None None None None None None None None
3 B1 01/03/2025 17:15 St Truiden Kortrijk None 1.75 3.6 4.33 None ... None None None None None None None None None None
4 B1 01/03/2025 19:45 St. Gilloise Dender None 1.33 5.0 8.00 None ... None None None None None None None None None None
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
173 T1 02/03/2025 17:30 Fenerbahce Antalyaspor None 1.27 6.5 8.50 None ... None None None None None None None None None None
174 T1 03/03/2025 13:00 Gaziantep Eyupspor None 2.30 3.4 3.00 None ... None None None None None None None None None None
175 T1 03/03/2025 13:00 Konyaspor Trabzonspor None 2.63 3.3 2.63 None ... None None None None None None None None None None
176 T1 03/03/2025 17:30 Buyuksehyr Sivasspor None 1.67 4.0 4.75 None ... None None None None None None None None None None
177 T1 03/03/2025 17:30 Goztep Samsunspor None 2.00 3.4 3.75 None ... None None None None None None None None None None

178 rows × 102 columns

Final thoughts

You can spin up a local Prefect server UI with the prefect server start shell command and explore the flow run we executed above. The run metadata are stored in the Prefect database, which by default is a local SQLite database.

Prefect also supports deployments, i.e. packaging workflow code, settings, and infrastructure configuration so that the data workflow can be managed via the Prefect API and run remotely by a Prefect agent.
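As a hedged sketch (assuming a recent Prefect version where Flow.serve is available; the deployment name and schedule below are illustrative), a flow can be served directly from Python:

if __name__ == '__main__':
    update_db.serve(
        name='update-soccer-db',
        cron='0 6 * * *',  # run daily at 06:00
        parameters={'urls_mapping': URLS_MAPPING},
    )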

You can read more at the official Prefect documentation.