Prefect

Build, deploy and observe data workflows.
Software Engineering
Open Source
Data Engineering
Author

Georgios Douzas

Published

June 22, 2023

Introduction

Prefect is a workflow orchestration tool that simplifies creating, scheduling, and monitoring complex data pipelines. Workflows are defined as Python code, while Prefect provides error handling, retry mechanisms, and a user-friendly dashboard for monitoring.

Workflow for soccer data

As an example, let’s assume that we would like to create a data workflow that downloads, stores, and updates historical and fixtures soccer data from Football-Data.co.uk. The URL of the historical data for each main league has the following form:

base_url = 'https://www.football-data.co.uk/mmz4281/{season}/{league_id}.csv'
base_url
'https://www.football-data.co.uk/mmz4281/{season}/{league_id}.csv'

where season encodes the season as four digits (for example, '2324' for the 2023-24 season) and league_id is the league ID. Let’s select a few of those seasons and leagues:

SEASONS = ['1819', '1920', '2021', '2122', '2223', '2324']
LEAGUES_MAPPING = {
    'E0': 'English',
    'SC0': 'Scottish',
    'D1': 'German',
    'I1': 'Italian',
    'SP1': 'Spanish',
    'F1': 'French',
    'N1': 'Dutch',
}
URLS_MAPPING = {
    f'https://www.football-data.co.uk/mmz4281/{season}/{league_id}.csv': (
        league,
        '-'.join([season[0:2], season[2:]]),
    )
    for season in SEASONS
    for league_id, league in LEAGUES_MAPPING.items()
}
FIXTURES_URL = 'https://www.football-data.co.uk/fixtures.csv'
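To make the structure of the mapping concrete, here is a minimal sketch that builds it for a reduced selection of two seasons and two leagues. The lowercase names `seasons`, `leagues` and `urls_mapping` are used only in this illustration; the construction itself mirrors the one above.

```python
BASE_URL = 'https://www.football-data.co.uk/mmz4281/{season}/{league_id}.csv'

# Reduced selection, used only for this illustration
seasons = ['2223', '2324']
leagues = {'E0': 'English', 'D1': 'German'}

# Each URL maps to a (league name, 'YY-YY' season label) tuple
urls_mapping = {
    BASE_URL.format(season=season, league_id=league_id): (
        league,
        '-'.join([season[:2], season[2:]]),
    )
    for season in seasons
    for league_id, league in leagues.items()
}

for url, (league, season) in urls_mapping.items():
    print(f'{league:8} {season}  {url}')
```

Every combination of season and league yields one entry, so the full selection of six seasons and seven leagues gives 42 URLs.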

Our workflow will include the following tasks:

  • Check if a local SQLite database exists. If not, then create it.
  • Check if the database is updated with the latest historical data. If the historical data do not exist, download all the data and store them in the database. If they exist but are not up to date, download only the latest data and update the database.
  • Download the latest fixtures data and store them in the database.
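The update step in the second task amounts to appending the newly downloaded rows to the stored ones and keeping a single row per match. The following plain-Python sketch illustrates that idea only. The helper `merge_rows` and the choice of key columns are assumptions made for this example, and the rows are a small hand-written sample.

```python
# Columns assumed to identify a match in this sketch
KEY_COLUMNS = ('Div', 'Date', 'HomeTeam', 'AwayTeam')


def merge_rows(stored, latest):
    """Append latest rows to stored rows, keeping the first row seen per match."""
    seen = set()
    merged = []
    for row in [*stored, *latest]:
        key = tuple(row[column] for column in KEY_COLUMNS)
        if key not in seen:
            seen.add(key)
            merged.append(row)
    return merged


stored = [
    {'Div': 'N1', 'Date': '19/05/2024', 'HomeTeam': 'Vitesse', 'AwayTeam': 'Ajax'},
]
latest = [
    # Already stored, so it is skipped
    {'Div': 'N1', 'Date': '19/05/2024', 'HomeTeam': 'Vitesse', 'AwayTeam': 'Ajax'},
    # Not stored yet, so it is appended
    {'Div': 'N1', 'Date': '19/05/2024', 'HomeTeam': 'Zwolle', 'AwayTeam': 'Twente'},
]
merged = merge_rows(stored, latest)
print(len(merged))
```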

Tasks

The above tasks represent discrete units of work, so each one is wrapped with Prefect's task decorator. Since there are multiple files, we will also use an asynchronous httpx client to download them concurrently.
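As a rough illustration of concurrent downloading, the sketch below uses only the standard library. The coroutine `fake_download` is a hypothetical stand-in that sleeps instead of making a network request, so the example shows the `asyncio.gather` pattern rather than real HTTP calls.

```python
import asyncio


async def fake_download(url: str) -> str:
    # Stand-in for an HTTP request: waits briefly, then returns dummy content
    await asyncio.sleep(0.1)
    return f'data from {url}'


async def download_all(urls: list[str]) -> list[str]:
    # All coroutines are scheduled together; results keep the order of `urls`
    return await asyncio.gather(*(fake_download(url) for url in urls))


results = asyncio.run(download_all(['E0.csv', 'D1.csv', 'I1.csv']))
print(results)
```

Because the waits overlap, the three simulated downloads take roughly as long as one, and `asyncio.gather` returns the results in the order the coroutines were passed in.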

The function create_db implements the first task:

import sqlite3
from prefect import task
from prefect.logging import get_run_logger
from pathlib import Path
from tempfile import mkdtemp

TEMP_DIR = Path(mkdtemp())


@task(name='Create database', description='Create the database to store the data')
def create_db():
    logger = get_run_logger()
    db_path = TEMP_DIR / 'soccer_data.db'
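    try:
        # mode=rw opens an existing database only and fails if the file is missing
        con = sqlite3.connect(f'file:{db_path}?mode=rw', uri=True)
        logger.info('Database exists.')
    except sqlite3.OperationalError:
        # The default mode creates the database file
        con = sqlite3.connect(db_path)
        logger.info('Database created.')
    # Close outside a finally clause, so that a failed connection attempt
    # is not masked by an unbound `con`
    con.close()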
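The existence check relies on SQLite's URI syntax: with mode=rw the connection fails if the file is missing, whereas the default mode creates it. Here is a small standalone sketch of that behavior, where the helper `database_exists` and the file name `example.db` are hypothetical names chosen for this example.

```python
import sqlite3
from pathlib import Path
from tempfile import TemporaryDirectory


def database_exists(db_path: Path) -> bool:
    """Return True if an SQLite file can be opened at db_path without creating it."""
    try:
        con = sqlite3.connect(f'file:{db_path}?mode=rw', uri=True)
    except sqlite3.OperationalError:
        return False
    con.close()
    return True


with TemporaryDirectory() as tmp_dir:
    db_path = Path(tmp_dir) / 'example.db'
    exists_before = database_exists(db_path)
    # The default mode creates the file; a table is added so it is not empty
    con = sqlite3.connect(db_path)
    con.execute('CREATE TABLE example (id INTEGER)')
    con.commit()
    con.close()
    exists_after = database_exists(db_path)

print(exists_before, exists_after)
```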

The function update_historical_data implements the second task:

import httpx
import asyncio
import pandas as pd
from io import StringIO


async def request_csv_data(client: httpx.AsyncClient, url: str, **kwargs):
    # The extra keyword arguments (league, season) are accepted but not used
    return await client.get(url=url)


async def download_csvs_data(urls_mapping: dict[str, tuple[str, str]]):
    async with httpx.AsyncClient(limits=httpx.Limits(max_connections=30)) as client:
        requests = [
            request_csv_data(client, url, league=league, season=season)
            for url, (league, season) in urls_mapping.items()
        ]
        responses = await asyncio.gather(*requests)
    csvs_data = [
        StringIO(str(response.content, encoding='windows-1254'))
        for response in responses
    ]
    return csvs_data


@task(
    name='Update historical data',
    description='Fetch latest data to update historical data',
)
async def update_historical_data(urls_mapping):
    logger = get_run_logger()
    db_path = TEMP_DIR / 'soccer_data.db'
    with sqlite3.connect(db_path) as con:
        try:
            data = pd.read_sql('SELECT * FROM historical', con)
            logger.info(f'Table with historical data exists. Shape: {data.shape}')
        except pd.errors.DatabaseError:
            logger.info('Table with historical data does not exist.')
            csvs_data = await download_csvs_data(urls_mapping)
            data = pd.concat(
                [
                    pd.read_csv(csv_data, encoding='windows-1254')
                    for csv_data in csvs_data
                ],
                ignore_index=True,
            )
            data.to_sql('historical', con=con, index=False)
            logger.info(f'Table with historical data was created. Shape: {data.shape}')
            return None
    urls_mapping = {
        url: (league, season)
        for url, (league, season) in urls_mapping.items()
        if season == '23-24'
    }
    latest_csvs_data = await download_csvs_data(urls_mapping)
    latest_data = pd.concat(
        [
            pd.read_csv(csv_data, encoding='windows-1254')
            for csv_data in latest_csvs_data
        ],
        ignore_index=True,
    )
    data = pd.concat([data, latest_data], ignore_index=True).drop_duplicates(
        subset=['Div', 'Date', 'HomeTeam', 'AwayTeam', 'Time'], ignore_index=True
    )
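    # The sqlite3 context manager only commits or rolls back; it does not close
    # the connection, so `con` is still open here and is closed explicitly.
    data.to_sql('historical', con=con, index=False, if_exists='replace')
    con.close()
    logger.info(f'Table with historical data was updated. Shape: {data.shape}')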
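One detail of the task above is easy to miss: using an sqlite3 connection in a with statement commits or rolls back the transaction, but it does not close the connection, which is why the connection can still be used after the block. The sketch below demonstrates this with an in-memory database and shows `contextlib.closing` as one possible way to close the connection automatically. The table contents are illustrative.

```python
import sqlite3
from contextlib import closing

con = sqlite3.connect(':memory:')
with con:
    # The block is committed on success and rolled back on error
    con.execute('CREATE TABLE historical (Div TEXT)')
    con.execute("INSERT INTO historical VALUES ('E0')")

# The connection is still open after the with block
rows = con.execute('SELECT * FROM historical').fetchall()
con.close()

# closing() calls con2.close() when the block exits
with closing(sqlite3.connect(':memory:')) as con2:
    con2.execute('SELECT 1')

closed = False
try:
    con2.execute('SELECT 1')
except sqlite3.ProgrammingError:
    # Operating on a closed connection raises ProgrammingError
    closed = True

print(rows, closed)
```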

The function update_fixtures_data implements the third task:

@task(name='Update fixtures data', description='Fetch latest fixtures data')
async def update_fixtures_data():
    logger = get_run_logger()
    db_path = TEMP_DIR / 'soccer_data.db'
    data = pd.read_csv(FIXTURES_URL)
    with sqlite3.connect(db_path) as con:
        data.to_sql('fixtures', con=con, index=False, if_exists='replace')
        logger.info(f'Fixtures data were updated. Shape: {data.shape}')

Flow

The full data workflow will receive the flow decorator.

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner


@flow(
    name='Download asynchronously the data and update the database',
    validate_parameters=True,
    task_runner=ConcurrentTaskRunner(),
    log_prints=True,
)
async def update_db(urls_mapping: dict[str, tuple[str, str]]):
    create_db()
    await update_historical_data(urls_mapping)
    await update_fixtures_data()

Results

We can run the above flow:

await update_db(URLS_MAPPING)
18:19:50.135 | INFO    | prefect.engine - Created flow run 'transparent-prawn' for flow 'Download asynchronously the data and update the database'
18:19:50.169 | INFO    | Flow run 'transparent-prawn' - Created task run 'Create database-0' for task 'Create database'
18:19:50.170 | INFO    | Flow run 'transparent-prawn' - Executing 'Create database-0' immediately...
18:19:50.191 | INFO    | Task run 'Create database-0' - Database created.
18:19:50.202 | INFO    | Task run 'Create database-0' - Finished in state Completed()
18:19:50.213 | INFO    | Flow run 'transparent-prawn' - Created task run 'Update historical data-0' for task 'Update historical data'
18:19:50.214 | INFO    | Flow run 'transparent-prawn' - Executing 'Update historical data-0' immediately...
18:19:50.232 | INFO    | Task run 'Update historical data-0' - Table with historical data does not exist.
18:19:52.155 | INFO    | Task run 'Update historical data-0' - Table with historical data was created. Shape: (13862, 124)
18:19:52.169 | INFO    | Task run 'Update historical data-0' - Finished in state Completed()
18:19:52.181 | INFO    | Flow run 'transparent-prawn' - Created task run 'Update fixtures data-0' for task 'Update fixtures data'
18:19:52.181 | INFO    | Flow run 'transparent-prawn' - Executing 'Update fixtures data-0' immediately...
18:19:52.522 | INFO    | Task run 'Update fixtures data-0' - Fixtures data were updated. Shape: (148, 102)
18:19:52.538 | INFO    | Task run 'Update fixtures data-0' - Finished in state Completed()
18:19:52.554 | INFO    | Flow run 'transparent-prawn' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
 Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]
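The call above uses a top-level await, which works in Jupyter notebooks and IPython but is a syntax error in a regular Python script. In a script, the usual approach is to hand the coroutine to `asyncio.run`. The sketch below shows the pattern with `update_db_stub`, a hypothetical stand-in for the flow, so that it runs without Prefect installed.

```python
import asyncio


async def update_db_stub(urls_mapping: dict[str, tuple[str, str]]) -> int:
    # Hypothetical stand-in for the `update_db` flow: returns the number of URLs
    await asyncio.sleep(0)
    return len(urls_mapping)


urls_mapping = {
    'https://www.football-data.co.uk/mmz4281/2324/E0.csv': ('English', '23-24'),
}

# asyncio.run creates an event loop, runs the coroutine and closes the loop
n_urls = asyncio.run(update_db_stub(urls_mapping))
print(n_urls)
```

Under the assumption that the flow is defined as above, the equivalent call in a script would be `asyncio.run(update_db(URLS_MAPPING))`.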

Let’s read the data from the database:

from shutil import rmtree

db_path = TEMP_DIR / 'soccer_data.db'
with sqlite3.connect(db_path) as con:
    historical_data = pd.read_sql('SELECT * FROM historical', con)
    fixtures_data = pd.read_sql('SELECT * FROM fixtures', con)
rmtree(TEMP_DIR)
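Before reading the tables, it can be useful to confirm which ones exist in the database. A minimal sketch, assuming an in-memory database with two empty tables that stand in for the real ones, queries SQLite's `sqlite_master` catalog:

```python
import sqlite3
from contextlib import closing

with closing(sqlite3.connect(':memory:')) as con:
    # Empty stand-ins for the tables created by the workflow
    con.execute('CREATE TABLE historical (Div TEXT, Date TEXT)')
    con.execute('CREATE TABLE fixtures (Div TEXT, Date TEXT)')
    query = "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    tables = [name for (name,) in con.execute(query)]

print(tables)
```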

The historical data:

historical_data
       Div  Date        HomeTeam          AwayTeam         FTHG  FTAG  FTR  HTHG  HTAG  HTR  ...  AvgC<2.5  AHCh   B365CAHH  B365CAHA  PCAHH  PCAHA  MaxCAHH  MaxCAHA  AvgCAHH  AvgCAHA
0      E0   10/08/2018  Man United        Leicester        2     1     H    1     0     H    ...  NaN       NaN    NaN       NaN       NaN    NaN    NaN      NaN      NaN      NaN
1      E0   11/08/2018  Bournemouth       Cardiff          2     0     H    1     0     H    ...  NaN       NaN    NaN       NaN       NaN    NaN    NaN      NaN      NaN      NaN
2      E0   11/08/2018  Fulham            Crystal Palace   0     2     A    0     1     A    ...  NaN       NaN    NaN       NaN       NaN    NaN    NaN      NaN      NaN      NaN
3      E0   11/08/2018  Huddersfield      Chelsea          0     3     A    0     2     A    ...  NaN       NaN    NaN       NaN       NaN    NaN    NaN      NaN      NaN      NaN
4      E0   11/08/2018  Newcastle         Tottenham        1     2     A    1     2     A    ...  NaN       NaN    NaN       NaN       NaN    NaN    NaN      NaN      NaN      NaN
...    ...  ...         ...               ...              ...   ...   ...  ...   ...   ...  ...  ...       ...    ...       ...       ...    ...    ...      ...      ...      ...
13857  N1   19/05/2024  PSV Eindhoven     Waalwijk         3     1     H    1     1     D    ...  5.69      -2.50  1.73      2.08      1.77   2.09   1.98     2.17     1.85     1.99
13858  N1   19/05/2024  Sparta Rotterdam  Heerenveen       2     1     H    0     0     D    ...  2.87      -0.75  1.85      2.05      1.86   2.03   1.90     2.12     1.83     2.00
13859  N1   19/05/2024  Vitesse           Ajax             2     2     D    1     1     D    ...  3.43      1.00   1.84      2.06      1.84   2.06   1.88     2.11     1.82     2.02
13860  N1   19/05/2024  Volendam          Go Ahead Eagles  1     2     A    1     1     D    ...  3.52      1.25   1.78      2.03      1.83   2.07   1.85     2.12     1.81     2.02
13861  N1   19/05/2024  Zwolle            Twente           1     2     A    0     0     D    ...  3.30      1.50   1.97      1.93      1.97   1.92   2.14     1.93     2.01     1.83

13862 rows × 124 columns

The fixtures data:

fixtures_data
     Div  Date        Time   HomeTeam      AwayTeam       Referee  B365H  B365D  B365A  BWH   ...  B365CAHH  B365CAHA  PCAHH  PCAHA  MaxCAHH  MaxCAHA  AvgCAHH  AvgCAHA  BFECAHH  BFECAHA
0    B1   29/11/2024  19:45  Kortrijk      Mechelen       None     3.40   3.60   2.05   3.40  ...  None      None      None   None   None     None     None     None     None     None
1    B1   30/11/2024  17:15  Club Brugge   Dender         None     1.25   7.00   8.00   1.25  ...  None      None      None   None   None     None     None     None     None     None
2    B1   30/11/2024  19:45  Charleroi     Standard       None     1.95   3.40   3.90   1.98  ...  None      None      None   None   None     None     None     None     None     None
3    B1   01/12/2024  12:30  St Truiden    Genk           None     4.50   4.00   1.73   4.33  ...  None      None      None   None   None     None     None     None     None     None
4    B1   01/12/2024  15:00  Beerschot VA  Cercle Brugge  None     3.90   3.50   1.90   3.60  ...  None      None      None   None   None     None     None     None     None     None
...  ...  ...         ...    ...           ...            ...      ...    ...    ...    ...   ...  ...       ...       ...    ...    ...      ...      ...      ...      ...      ...
143  T1   30/11/2024  16:00  Buyuksehyr    Goztep         None     2.38   3.25   3.00   2.35  ...  None      None      None   None   None     None     None     None     None     None
144  T1   01/12/2024  13:00  Rizespor      Kayserispor    None     1.83   3.60   4.20   1.82  ...  None      None      None   None   None     None     None     None     None     None
145  T1   01/12/2024  16:00  Galatasaray   Eyupspor       None     1.25   7.00   8.50   1.26  ...  None      None      None   None   None     None     None     None     None     None
146  T1   02/12/2024  17:00  Fenerbahce    Gaziantep      None     1.25   6.50   9.00   1.24  ...  None      None      None   None   None     None     None     None     None     None
147  T1   02/12/2024  17:00  Hatayspor     Besiktas       None     4.10   3.60   1.83   3.80  ...  None      None      None   None   None     None     None     None     None     None

148 rows × 102 columns

Final thoughts

You can start a local Prefect server and its UI with the prefect server start command in the shell and explore the flow run above. The run metadata are stored in the Prefect database, which by default is a local SQLite database.

Prefect also supports deployments, i.e., packaging workflow code, settings, and infrastructure configuration so that the data workflow can be managed via the Prefect API and run remotely by a Prefect agent (a worker in more recent Prefect releases).

You can read more in the official Prefect documentation.