Our jobs wouldn't be very interesting if they were limited to single ops. Jobs connect ops into arbitrary DAGs of computation.
Why split up code into ops instead of regular Python functions? A key reason is that Dagster jobs model a dataflow graph. In data pipelines, a later step almost always comes after an earlier step because it uses data produced by that earlier step. Dagster models these dataflow dependencies with inputs and outputs.
We'll expand the job we worked with in the first section of the tutorial into two ops: one that downloads the cereal data, and one that finds the sugariest cereal.
This will allow us to re-run the code that finds the sugariest cereal without re-running the code that downloads the cereal data. If we spot a bug in our sugariness code, or if we decide we want to compute some other statistics about the cereal data, we won't need to re-download the data.
import csv

import requests

from dagster import get_dagster_logger, job, op


@op
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@op
def find_sugariest(cereals):
    # The CSV values are strings, so cast to int to sort numerically rather than lexicographically
    sorted_by_sugar = sorted(cereals, key=lambda cereal: int(cereal["sugars"]))
    get_dagster_logger().info(f'{sorted_by_sugar[-1]["name"]} is the sugariest cereal')


@job
def serial():
    find_sugariest(download_cereals())
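Dagit isn't the only way to launch a run. If you'd rather kick the job off from Python, say in a quick script or a test, a job can be executed in process. A minimal sketch (the __main__ guard is just for illustration):

if __name__ == "__main__":
    # Executes every op in the job in a single process and returns a result object
    result = serial.execute_in_process()
    print("Run succeeded:", result.success)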
You'll see that we've modified our existing download_cereals op to return an output: the list of dictionaries representing the cereals dataset. We've also defined a new op, find_sugariest, that takes a user-defined input, cereals.
We can use inputs and outputs to connect ops to each other. Here we tell Dagster that download_cereals doesn't depend on the output of any other op, and that find_sugariest depends on the output of download_cereals.

Let's visualize this job in Dagit:
dagit -f serial_job.py
Navigate to http://127.0.0.1:3000.
Ops don't need to be wired together serially. The output of one op can be consumed by any number of other ops, and the outputs of several different ops can be consumed by a single op.
import csv

import requests

from dagster import get_dagster_logger, job, op


@op
def download_cereals():
    response = requests.get("https://docs.dagster.io/assets/cereal.csv")
    lines = response.text.split("\n")
    return [row for row in csv.DictReader(lines)]


@op
def find_highest_calorie_cereal(cereals):
    # Cast to int so the sort is numeric rather than lexicographic
    sorted_cereals = sorted(cereals, key=lambda cereal: int(cereal["calories"]))
    return sorted_cereals[-1]["name"]


@op
def find_highest_protein_cereal(cereals):
    sorted_cereals = sorted(cereals, key=lambda cereal: int(cereal["protein"]))
    return sorted_cereals[-1]["name"]


@op
def display_results(most_calories, most_protein):
    logger = get_dagster_logger()
    logger.info(f"Most caloric cereal: {most_calories}")
    logger.info(f"Most protein-rich cereal: {most_protein}")


@job
def diamond():
    cereals = download_cereals()
    display_results(
        most_calories=find_highest_calorie_cereal(cereals),
        most_protein=find_highest_protein_cereal(cereals),
    )
First, we introduce the intermediate variable cereals into our job definition to represent the output of the download_cereals op. Then we make both find_highest_calorie_cereal and find_highest_protein_cereal consume this output. Their outputs are in turn both consumed by display_results.
Let's visualize this job in Dagit:
dagit -f complex_job.py
When you execute this example from Dagit, you'll see that download_cereals executes first, followed by find_highest_calorie_cereal and find_highest_protein_cereal executing in parallel, since they don't depend on each other's outputs. Finally, display_results executes last, only after find_highest_calorie_cereal and find_highest_protein_cereal have both executed, because it depends on both of their outputs.
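If you want to confirm this wiring without opening Dagit, one option is to execute the job in process and inspect individual op outputs on the result object. A minimal sketch, assuming the diamond job above is in scope (output_for_node looks up an op's default output, named "result"):

result = diamond.execute_in_process()
assert result.success
# Pull the value returned by a specific op in the run
print(result.output_for_node("find_highest_calorie_cereal"))
print(result.output_for_node("find_highest_protein_cereal"))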