Jobs are the main unit of execution and monitoring in Dagster. The core of a job is a graph of ops connected via data dependencies.
Ops are linked together by defining the dependencies between their inputs and outputs. An important difference between Dagster and other workflow systems is that, in Dagster, op dependencies are expressed as data dependencies, not just execution dependencies.
This difference enables Dagster to support richer modeling of dependencies. Instead of merely ensuring that the order of execution is correct, dependencies in Dagster provide a variety of compile-time and run-time checks.
Using jobs, you can:

- Materialize a selection of software-defined assets
- Execute a graph of ops that isn't tied to assets
- Attach schedules and sensors to kick off runs
- Customize behavior per environment with resources and configuration
| Name | Description |
|---|---|
| @job | The decorator used to create a job. |
| JobDefinition | A job definition. Jobs are the main unit of execution and monitoring in Dagster. Typically constructed using the @job decorator. |
Jobs can be created in several ways:

- From software-defined assets, using define_asset_job
- Directly from ops, using the @job decorator
- From a graph of ops, using GraphDefinition.to_job
Asset jobs can materialize a fixed set of assets each time they run. Additionally, multiple jobs in the same repository can target overlapping sets of assets:
```python
from dagster import asset, define_asset_job, repository


@asset
def asset1():
    return [1, 2, 3]


@asset
def asset2(asset1):
    return asset1 + [4]


all_assets_job = define_asset_job(name="all_assets_job")

asset1_job = define_asset_job(name="asset1_job", selection="asset1")


@repository
def repo():
    return [asset1, asset2, all_assets_job, asset1_job]
```
Unlike jobs created using the @job decorator, where you explicitly define the dependencies when you create the job, the topology of an asset-based job is based on the assets and their dependencies.
The simplest way to create an op-based job is to use the @job decorator.
Within the decorated function body, you can use function calls to indicate the dependency structure between the ops/graphs. This allows you to explicitly define dependencies between ops when you define the job.
In this example, the add_one op depends on the return_five op's output. Because this data dependency exists, the add_one op executes after return_five runs successfully and emits the required output.
```python
from dagster import job, op


@op
def return_five():
    return 5


@op
def add_one(arg):
    return arg + 1


@job
def do_stuff():
    add_one(return_five())
```
When defining a job, you can provide resources, configuration, hooks, tags, and an executor (follow the links for explanations of how to use each of these).
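For instance, here is a minimal sketch of supplying resources, tags, and an executor when defining a job. The warehouse resource name and its ping_server method are illustrative placeholders, and the mock resource simply accepts any method call:

```python
from dagster import ResourceDefinition, job, multiprocess_executor, op


@op(required_resource_keys={"warehouse"})
def load_data(context):
    # "warehouse" and ping_server are hypothetical stand-ins for a real service.
    context.resources.warehouse.ping_server()


@job(
    resource_defs={"warehouse": ResourceDefinition.mock_resource()},
    tags={"team": "analytics"},  # arbitrary metadata attached to every run
    executor_def=multiprocess_executor,  # run each op in its own process
)
def do_stuff_with_extras():
    load_data()
```

Hooks and a config value can be attached the same way, via the hooks and config parameters.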
Like regular jobs, jobs that target assets can be placed on schedules and sensors.
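As a sketch, an asset job can be attached to a schedule like any other job; the asset, job name, and cron string below are illustrative:

```python
from dagster import ScheduleDefinition, asset, define_asset_job, repository


@asset
def nightly_metrics():
    return [1, 2, 3]


nightly_job = define_asset_job(name="nightly_job", selection="nightly_metrics")

# Materialize the selected asset every day at 6:00 AM.
nightly_schedule = ScheduleDefinition(job=nightly_job, cron_schedule="0 6 * * *")


@repository
def scheduled_repo():
    return [nightly_metrics, nightly_job, nightly_schedule]
```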
Creating jobs from a graph can be useful when you want to define inter-op dependencies before binding them to resources, configuration, executors, and other environment-specific features. This approach to job creation allows you to customize graphs for each environment by plugging in configuration and services specific to that environment.
You can model this by building multiple jobs that use the same underlying graph of ops. The graph represents the logical core of data transformation, and the configuration and resources on each job customize the behavior of that job for its environment.
To do this, you first define a graph with the @graph decorator.
```python
from dagster import graph, op


@op(required_resource_keys={"server"})
def interact_with_server(context):
    context.resources.server.ping_server()


@graph
def do_stuff():
    interact_with_server()
```
Then you build jobs from it using the GraphDefinition.to_job method:
```python
from dagster import ResourceDefinition

prod_server = ResourceDefinition.mock_resource()
local_server = ResourceDefinition.mock_resource()

prod_job = do_stuff.to_job(resource_defs={"server": prod_server}, name="do_stuff_prod")

local_job = do_stuff.to_job(
    resource_defs={"server": local_server}, name="do_stuff_local"
)
```
to_job accepts the same arguments as the @job decorator: you can provide resources, configuration, hooks, tags, and an executor.
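Continuing the example above, a sketch of passing tags alongside resources to to_job might look like this (the tag key and value are illustrative):

```python
# Reuses the do_stuff graph and prod_server resource defined above.
prod_job_tagged = do_stuff.to_job(
    name="do_stuff_prod_tagged",
    resource_defs={"server": prod_server},
    tags={"env": "prod"},
)
```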
Ops, software-defined assets, and resources often accept configuration that determines how they behave. By default, you supply configuration for these ops and resources at the time you launch the job.
When constructing a job, you can customize how that configuration will be satisfied by passing a value to the config parameter of the GraphDefinition.to_job method or the @job decorator. The options are discussed below:
You can supply a config dictionary. The supplied dictionary will be used to configure the job whenever the job is launched. It will show up in the Dagit Launchpad and can be overridden.
```python
from dagster import job, op


@op(config_schema={"config_param": str})
def do_something(context):
    context.log.info("config_param: " + context.op_config["config_param"])


default_config = {"ops": {"do_something": {"config": {"config_param": "stuff"}}}}


@job(config=default_config)
def do_it_all_with_default_config():
    do_something()


if __name__ == "__main__":
    # Will log "config_param: stuff"
    do_it_all_with_default_config.execute_in_process()
```
For op-based jobs, you can supply a PartitionedConfig to create a partitioned job. This defines a discrete set of partitions along with a function for generating config for a partition. Job runs can be configured by selecting a partition.
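For illustration, here is a minimal sketch of a daily-partitioned job; the start date, op, and config keys are placeholders:

```python
from dagster import daily_partitioned_config, job, op


@op(config_schema={"date": str})
def process_data_for_date(context):
    context.log.info("processing " + context.op_config["date"])


# Defines one partition per day and maps each partition to run config.
@daily_partitioned_config(start_date="2022-01-01")
def my_partitioned_config(start, _end):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }


@job(config=my_partitioned_config)
def partitioned_do_stuff():
    process_data_for_date()
```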
Refer to the Partitions documentation for more info and examples.
You can supply a ConfigMapping. This allows you to expose a narrower config interface to your job. Instead of needing to configure every op and resource individually when launching the job, you can supply a smaller number of values to the outer config, and the ConfigMapping can translate it into config for all the job's ops and resources.
```python
from dagster import config_mapping, job, op


@op(config_schema={"config_param": str})
def do_something(context):
    context.log.info("config_param: " + context.op_config["config_param"])


@config_mapping(config_schema={"simplified_param": str})
def simplified_config(val):
    return {
        "ops": {"do_something": {"config": {"config_param": val["simplified_param"]}}}
    }


@job(config=simplified_config)
def do_it_all_with_simplified_config():
    do_something()


if __name__ == "__main__":
    # Will log "config_param: stuff"
    do_it_all_with_simplified_config.execute_in_process(
        run_config={"simplified_param": "stuff"}
    )
```
You make jobs available to Dagit, the GraphQL API, and the command line by including them in repositories. If you include schedules or sensors in a repository, the repository will automatically include the jobs that those schedules or sensors target.
```python
from dagster import job, repository


@job
def do_it_all():
    ...


@repository
def my_repo():
    return [do_it_all]
```
Dagster has built-in support for testing your data applications, including separating business logic from environments and setting explicit expectations on uncontrollable inputs. Refer to the Testing guide for more info and examples.
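As a quick sketch of that idea, a job's environment-specific resource can be swapped for a mock and the job executed in process inside a test (the op, job, and test names below are illustrative):

```python
from dagster import ResourceDefinition, job, op


@op(required_resource_keys={"server"})
def interact_with_server(context):
    context.resources.server.ping_server()


# Bind a mock in place of the real server so the test has no external dependencies.
@job(resource_defs={"server": ResourceDefinition.mock_resource()})
def do_stuff_mocked():
    interact_with_server()


def test_do_stuff_mocked():
    # execute_in_process runs the job synchronously in the current process.
    result = do_stuff_mocked.execute_in_process()
    assert result.success
```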
You can run a job in a variety of ways:

- In Dagit's Launchpad
- Through the Dagster CLI
- Via the GraphQL API
- In Python, using JobDefinition.execute_in_process
Refer to the Job execution guide for more info and examples.
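For example, here is a minimal sketch of launching a job from Python with run config supplied at launch time; the op, job, and config values are illustrative:

```python
from dagster import job, op


@op(config_schema={"greeting": str})
def greet(context):
    context.log.info(context.op_config["greeting"])


@job
def greeting_job():
    greet()


if __name__ == "__main__":
    # Supply run config at launch time and execute in the current process.
    greeting_job.execute_in_process(
        run_config={"ops": {"greet": {"config": {"greeting": "hello"}}}}
    )
```

The same job can typically also be launched from Dagit's Launchpad or with the Dagster CLI; see the Job execution guide for the details of each approach.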
For more examples of jobs, check out our Hacker News and New York Times examples.