Georgios Douzas
June 22, 2023
Prefect is a workflow orchestration tool. It simplifies the creation, scheduling, and monitoring of complex data pipelines. Workflows are defined as Python code, while Prefect provides error handling, retry mechanisms, and a user-friendly dashboard for monitoring.
As an example, suppose we want to create a data workflow that downloads, stores, and updates historical and fixtures soccer data from Football-Data.co.uk. The URL of the historical data for each of the main leagues has the following form:
'https://www.football-data.co.uk/mmz4281/{season}/{league_id}.csv'
where season is the season of the league and league_id is the league ID. Let’s select a few of those seasons and leagues:
SEASONS = ['1819', '1920', '2021', '2122', '2223', '2324']
LEAGUES_MAPPING = {
    'E0': 'English',
    'SC0': 'Scottish',
    'D1': 'German',
    'I1': 'Italian',
    'SP1': 'Spanish',
    'F1': 'French',
    'N1': 'Dutch',
}
# Map each CSV URL to a (league, season) pair; season '1819' becomes '18-19'
URLS_MAPPING = {
    f'https://www.football-data.co.uk/mmz4281/{season}/{league_id}.csv': (
        league,
        '-'.join([season[0:2], season[2:]]),
    )
    for season in SEASONS
    for league_id, league in LEAGUES_MAPPING.items()
}
FIXTURES_URL = 'https://www.football-data.co.uk/fixtures.csv'
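To make the shape of this mapping concrete, the sketch below rebuilds it for a reduced selection and prints each key with its value. The names seasons, leagues, and urls are chosen here for illustration only; the construction itself is the same as above.

```python
# Reduced, illustrative selection; the article uses more seasons and leagues.
seasons = ['1819']
leagues = {'E0': 'English', 'D1': 'German'}

urls = {
    f'https://www.football-data.co.uk/mmz4281/{season}/{league_id}.csv': (
        league,
        '-'.join([season[0:2], season[2:]]),  # '1819' -> '18-19'
    )
    for season in seasons
    for league_id, league in leagues.items()
}

# Each URL maps to a (league, season) tuple
for url, (league, season) in urls.items():
    print(url, league, season)
```

Each key is a downloadable CSV URL and each value is the pair that the download helpers later receive as the league and season keyword arguments.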
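Our workflow will include the following tasks: creating the database if it does not already exist, downloading and updating the historical data, and downloading and updating the fixtures data.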
The above tasks represent discrete units of work, and they will receive the task decorator. We will also use an asynchronous httpx client to download the data concurrently, since we have multiple files.
The function create_db implements the first task:
import sqlite3
from prefect import task
from prefect.logging import get_run_logger
from pathlib import Path
from tempfile import mkdtemp

TEMP_DIR = Path(mkdtemp())


@task(name='Create database', description='Create the database to store the data')
def create_db():
    logger = get_run_logger()
    db_path = TEMP_DIR / 'soccer_data.db'
    try:
        # mode=rw only opens an existing file and raises if it is missing
        con = sqlite3.connect(f'file:{db_path}?mode=rw', uri=True)
        logger.info('Database exists.')
    except sqlite3.OperationalError:
        # A plain connect creates the database file
        con = sqlite3.connect(db_path)
        logger.info('Database created.')
    finally:
        con.close()
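The existence check in create_db relies on the SQLite URI option mode=rw, which opens a database file only if it is already there. The sketch below isolates that behaviour with the standard library alone; the database_exists helper and the example.db path are hypothetical names introduced for this illustration.

```python
import sqlite3
from pathlib import Path
from tempfile import mkdtemp

db_path = Path(mkdtemp()) / 'example.db'  # hypothetical throwaway path


def database_exists(path: Path) -> bool:
    """Return True if an SQLite file at path can be opened without creating it."""
    try:
        con = sqlite3.connect(f'file:{path}?mode=rw', uri=True)
    except sqlite3.OperationalError:
        return False
    con.close()
    return True


exists_before = database_exists(db_path)

# A plain connect creates the file; writing a table makes sure it is on disk
con = sqlite3.connect(db_path)
con.execute('CREATE TABLE example (id INTEGER)')
con.commit()
con.close()

exists_after = database_exists(db_path)
print(exists_before, exists_after)
```

The first check fails because nothing has been written at that path yet, and the second succeeds once the plain connection has created the file.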
The function update_historical_data implements the second task:
import httpx
import asyncio
import pandas as pd
from io import StringIO


async def request_csv_data(client: httpx.AsyncClient, url: str, **kwargs):
    # Extra keyword arguments (league, season) are accepted but not used
    return await client.get(url=url)


async def download_csvs_data(urls_mapping: dict[str, tuple[str, str]]):
    async with httpx.AsyncClient(limits=httpx.Limits(max_connections=30)) as client:
        requests = [
            request_csv_data(client, url, league=league, season=season)
            for url, (league, season) in urls_mapping.items()
        ]
        # Run all downloads concurrently; results keep the order of the requests
        responses = await asyncio.gather(*requests)
        csvs_data = [
            StringIO(str(response.content, encoding='windows-1254'))
            for response in responses
        ]
    return csvs_data
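The helper above depends on two properties: asyncio.gather runs the requests concurrently, and it returns the results in the order the requests were passed in, regardless of which one finishes first. The sketch below shows this without any network access; fake_download and download_all are hypothetical stand-ins for the httpx calls, and the delays are arbitrary.

```python
import asyncio
from io import StringIO


async def fake_download(url: str, delay: float) -> bytes:
    """Stand-in for an HTTP request: wait, then return small CSV bytes."""
    await asyncio.sleep(delay)
    return f'url\n{url}\n'.encode('windows-1254')


async def download_all(delays: dict[str, float]) -> list[StringIO]:
    # Schedule every coroutine at once; gather returns results in input order
    payloads = await asyncio.gather(
        *(fake_download(url, delay) for url, delay in delays.items())
    )
    # Decode the raw bytes into text buffers, as download_csvs_data does
    return [StringIO(str(payload, encoding='windows-1254')) for payload in payloads]


# The first URL is the slowest but still comes first in the result list
buffers = asyncio.run(download_all({'a.csv': 0.03, 'b.csv': 0.01, 'c.csv': 0.02}))
lines = [buffer.read().splitlines()[1] for buffer in buffers]
print(lines)
```

Because the order is preserved, the list of buffers can be matched back to the keys of the URL mapping by position if the league and season are needed later.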
@task(
    name='Update historical data',
    description='Fetch latest data to update historical data',
)
async def update_historical_data(urls_mapping):
    logger = get_run_logger()
    db_path = TEMP_DIR / 'soccer_data.db'
    with sqlite3.connect(db_path) as con:
        try:
            data = pd.read_sql('SELECT * FROM historical', con)
            logger.info(f'Table with historical data exists. Shape: {data.shape}')
        except pd.errors.DatabaseError:
            # First run: download every season and create the table
            logger.info('Table with historical data does not exist.')
            csvs_data = await download_csvs_data(urls_mapping)
            data = pd.concat(
                [
                    pd.read_csv(csv_data, encoding='windows-1254')
                    for csv_data in csvs_data
                ],
                ignore_index=True,
            )
            data.to_sql('historical', con=con, index=False)
            logger.info(f'Table with historical data was created. Shape: {data.shape}')
            return None
        # Later runs: download only the latest season
        urls_mapping = {
            url: (league, season)
            for url, (league, season) in urls_mapping.items()
            if season == '23-24'
        }
        latest_csvs_data = await download_csvs_data(urls_mapping)
        latest_data = pd.concat(
            [
                pd.read_csv(csv_data, encoding='windows-1254')
                for csv_data in latest_csvs_data
            ],
            ignore_index=True,
        )
        # Append the latest rows and drop matches that are already stored
        data = pd.concat([data, latest_data], ignore_index=True).drop_duplicates(
            subset=['Div', 'Date', 'HomeTeam', 'AwayTeam', 'Time'], ignore_index=True
        )
        data.to_sql('historical', con=con, index=False, if_exists='replace')
        logger.info(f'Table with historical data was updated. Shape: {data.shape}')
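The update step concatenates the stored rows with the freshly downloaded ones and then drops duplicates on a subset of columns. Since drop_duplicates keeps the first occurrence by default, a stored row wins over a downloaded copy of the same match. The sketch below mirrors that keep-first rule in plain Python rather than pandas; the dedupe helper, the team names, and the scores are made up for illustration.

```python
KEY = ('Div', 'Date', 'HomeTeam', 'AwayTeam', 'Time')


def dedupe(rows: list[dict], key: tuple[str, ...] = KEY) -> list[dict]:
    """Keep the first row seen for each combination of the key columns."""
    seen = set()
    unique = []
    for row in rows:
        marker = tuple(row.get(column) for column in key)
        if marker not in seen:
            seen.add(marker)
            unique.append(row)
    return unique


# Made-up rows: one match is already stored, the download repeats it
stored = [
    {'Div': 'E0', 'Date': '01/01/2024', 'HomeTeam': 'Team A',
     'AwayTeam': 'Team B', 'Time': '15:00', 'FTHG': 1},
]
latest = [
    {'Div': 'E0', 'Date': '01/01/2024', 'HomeTeam': 'Team A',
     'AwayTeam': 'Team B', 'Time': '15:00', 'FTHG': 1},
    {'Div': 'E0', 'Date': '02/01/2024', 'HomeTeam': 'Team C',
     'AwayTeam': 'Team D', 'Time': '17:30', 'FTHG': 0},
]

combined = dedupe(stored + latest)
print(len(combined))
```

Only the key columns decide whether two rows are the same match, so a row whose other columns differ from the stored copy would still be treated as a duplicate and discarded.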
The function update_fixtures_data implements the third task:
@task(name='Update fixtures data', description='Fetch latest fixtures data')
async def update_fixtures_data():
    logger = get_run_logger()
    db_path = TEMP_DIR / 'soccer_data.db'
    data = pd.read_csv(FIXTURES_URL)
    # Fixtures change every week, so the table is replaced on each run
    with sqlite3.connect(db_path) as con:
        data.to_sql('fixtures', con=con, index=False, if_exists='replace')
    logger.info(f'Fixtures data were updated. Shape: {data.shape}')
The full data workflow will receive the flow decorator.
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner


@flow(
    name='Download asynchronously the data and update the database',
    validate_parameters=True,
    task_runner=ConcurrentTaskRunner(),
    log_prints=True,
)
async def update_db(urls_mapping: dict[str, tuple[str, str]]):
    # The tasks run in order: the database must exist before it is updated
    create_db()
    await update_historical_data(urls_mapping)
    await update_fixtures_data()
We can run the above flow, for example by awaiting update_db(URLS_MAPPING) in a notebook:
11:51:57.184 | INFO | prefect.engine - Created flow run 'peridot-pronghorn' for flow 'Download asynchronously the data and update the database'
11:51:57.211 | INFO | Flow run 'peridot-pronghorn' - Created task run 'Create database-0' for task 'Create database'
11:51:57.212 | INFO | Flow run 'peridot-pronghorn' - Executing 'Create database-0' immediately...
11:51:57.230 | INFO | Task run 'Create database-0' - Database created.
11:51:57.237 | INFO | Task run 'Create database-0' - Finished in state Completed()
11:51:57.246 | INFO | Flow run 'peridot-pronghorn' - Created task run 'Update historical data-0' for task 'Update historical data'
11:51:57.246 | INFO | Flow run 'peridot-pronghorn' - Executing 'Update historical data-0' immediately...
11:51:57.262 | INFO | Task run 'Update historical data-0' - Table with historical data does not exist.
11:51:59.736 | INFO | Task run 'Update historical data-0' - Table with historical data was created. Shape: (13862, 124)
11:51:59.746 | INFO | Task run 'Update historical data-0' - Finished in state Completed()
11:51:59.755 | INFO | Flow run 'peridot-pronghorn' - Created task run 'Update fixtures data-0' for task 'Update fixtures data'
11:51:59.755 | INFO | Flow run 'peridot-pronghorn' - Executing 'Update fixtures data-0' immediately...
11:52:00.105 | INFO | Task run 'Update fixtures data-0' - Fixtures data were updated. Shape: (191, 101)
11:52:00.117 | INFO | Task run 'Update fixtures data-0' - Finished in state Completed()
11:52:00.130 | INFO | Flow run 'peridot-pronghorn' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`')),
Completed(message=None, type=COMPLETED, result=UnpersistedResult(type='unpersisted', artifact_type='result', artifact_description='Unpersisted result of type `NoneType`'))]
Let’s read the data from the database:
The historical data:
| | Div | Date | HomeTeam | AwayTeam | FTHG | FTAG | FTR | HTHG | HTAG | HTR | ... | AvgC<2.5 | AHCh | B365CAHH | B365CAHA | PCAHH | PCAHA | MaxCAHH | MaxCAHA | AvgCAHH | AvgCAHA |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | E0 | 10/08/2018 | Man United | Leicester | 2 | 1 | H | 1 | 0 | H | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
1 | E0 | 11/08/2018 | Bournemouth | Cardiff | 2 | 0 | H | 1 | 0 | H | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
2 | E0 | 11/08/2018 | Fulham | Crystal Palace | 0 | 2 | A | 0 | 1 | A | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
3 | E0 | 11/08/2018 | Huddersfield | Chelsea | 0 | 3 | A | 0 | 2 | A | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
4 | E0 | 11/08/2018 | Newcastle | Tottenham | 1 | 2 | A | 1 | 2 | A | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
13857 | N1 | 19/05/2024 | PSV Eindhoven | Waalwijk | 3 | 1 | H | 1 | 1 | D | ... | 5.69 | -2.50 | 1.73 | 2.08 | 1.77 | 2.09 | 1.98 | 2.17 | 1.85 | 1.99 |
13858 | N1 | 19/05/2024 | Sparta Rotterdam | Heerenveen | 2 | 1 | H | 0 | 0 | D | ... | 2.87 | -0.75 | 1.85 | 2.05 | 1.86 | 2.03 | 1.90 | 2.12 | 1.83 | 2.00 |
13859 | N1 | 19/05/2024 | Vitesse | Ajax | 2 | 2 | D | 1 | 1 | D | ... | 3.43 | 1.00 | 1.84 | 2.06 | 1.84 | 2.06 | 1.88 | 2.11 | 1.82 | 2.02 |
13860 | N1 | 19/05/2024 | Volendam | Go Ahead Eagles | 1 | 2 | A | 1 | 1 | D | ... | 3.52 | 1.25 | 1.78 | 2.03 | 1.83 | 2.07 | 1.85 | 2.12 | 1.81 | 2.02 |
13861 | N1 | 19/05/2024 | Zwolle | Twente | 1 | 2 | A | 0 | 0 | D | ... | 3.30 | 1.50 | 1.97 | 1.93 | 1.97 | 1.92 | 2.14 | 1.93 | 2.01 | 1.83 |
13862 rows × 124 columns
The fixtures data:
| | Div | Date | Time | HomeTeam | AwayTeam | B365H | B365D | B365A | BWH | BWD | ... | B365CAHH | B365CAHA | PCAHH | PCAHA | MaxCAHH | MaxCAHA | AvgCAHH | AvgCAHA | BFECAHH | BFECAHA |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | B1 | 24/08/2024 | 17:15 | Westerlo | Oud-Heverlee Leuven | 1.95 | 3.5 | 3.40 | 2.00 | 3.70 | ... | None | None | None | None | None | None | None | None | None | None |
1 | B1 | 24/08/2024 | 19:45 | Antwerp | Mechelen | 1.53 | 4.1 | 5.00 | 1.57 | 4.33 | ... | None | None | None | None | None | None | None | None | None | None |
2 | B1 | 25/08/2024 | 12:30 | Dender | Club Brugge | 4.75 | 4.2 | 1.55 | 5.00 | 4.20 | ... | None | None | None | None | None | None | None | None | None | None |
3 | B1 | 25/08/2024 | 15:00 | St Truiden | St. Gilloise | 5.50 | 4.1 | 1.48 | 6.00 | 4.40 | ... | None | None | None | None | None | None | None | None | None | None |
4 | B1 | 25/08/2024 | 17:30 | Standard | Beerschot VA | 1.73 | 3.6 | 4.10 | 1.77 | 3.80 | ... | None | None | None | None | None | None | None | None | None | None |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
186 | T1 | 24/08/2024 | 17:15 | Sivasspor | Eyupspor | 2.30 | 3.3 | 3.10 | 2.30 | 3.30 | ... | None | None | None | None | None | None | None | None | None | None |
187 | T1 | 24/08/2024 | 19:45 | Bodrumspor | Konyaspor | 2.90 | 3.1 | 2.50 | 2.85 | 3.25 | ... | None | None | None | None | None | None | None | None | None | None |
188 | T1 | 25/08/2024 | 17:15 | Kasimpasa | Ad. Demirspor | 1.57 | 4.2 | 5.25 | 1.58 | 4.33 | ... | None | None | None | None | None | None | None | None | None | None |
189 | T1 | 25/08/2024 | 19:45 | Antalyaspor | Hatayspor | 2.20 | 3.3 | 3.30 | 2.15 | 3.40 | ... | None | None | None | None | None | None | None | None | None | None |
190 | T1 | 25/08/2024 | 19:45 | Rizespor | Fenerbahce | 4.75 | 3.8 | 1.70 | 4.60 | 4.00 | ... | None | None | None | None | None | None | None | None | None | None |
191 rows × 101 columns
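The code that reads the two tables back is not shown above. As a minimal sketch of the idea, assuming the table names historical and fixtures created by the tasks, the example below lists the tables of a database and counts their rows. It uses a throwaway in-memory database with made-up rows so that it runs on its own; with the real data you would connect to the soccer_data.db file instead.

```python
import sqlite3

# Throwaway in-memory database with made-up rows, standing in for soccer_data.db
con = sqlite3.connect(':memory:')
con.execute('CREATE TABLE historical ("Div" TEXT, "HomeTeam" TEXT, "AwayTeam" TEXT)')
con.execute('CREATE TABLE fixtures ("Div" TEXT, "HomeTeam" TEXT, "AwayTeam" TEXT)')
con.executemany(
    'INSERT INTO historical VALUES (?, ?, ?)',
    [('E0', 'Team A', 'Team B'), ('N1', 'Team C', 'Team D')],
)
con.execute("INSERT INTO fixtures VALUES ('B1', 'Team E', 'Team F')")

# List the tables, then count the rows stored in each one
tables = [
    name
    for (name,) in con.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    )
]
counts = {
    table: con.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()[0]
    for table in tables
}
print(counts)
con.close()
```

To get a DataFrame like the ones displayed above, the same connection can be passed to pd.read_sql with a SELECT query, as the update_historical_data task already does.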
You can spin up a local Prefect server UI with the prefect server start command in the shell and explore the characteristics of the flow run above. The flow run data are stored in the Prefect database, which by default is a local SQLite database.
Prefect also supports deployments, i.e. packaging workflow code, settings, and infrastructure configuration so that the data workflow can be managed via the Prefect API and run remotely by a Prefect agent.
You can read more at the official Prefect documentation.