Partitioned Config

class dagster.PartitionedConfig(partitions_def, run_config_for_partition_fn, decorated_fn=None, tags_for_partition_fn=None)[source]

Defines a way of configuring a job where the job can be run on one of a discrete set of partitions, and each partition corresponds to run configuration for the job.

Setting PartitionedConfig as the config for a job allows you to launch backfills for that job and view the run history across partitions.

get_run_config_for_partition_key(partition_key)[source]

Generates the run config corresponding to a partition key.

Parameters

partition_key (str) – the key for a partition that should be used to generate a run config.

dagster.static_partitioned_config(partition_keys, tags_for_partition_fn=None)[source]

Creates a static partitioned config for a job.

The provided partition_keys returns a static list of strings identifying the set of partitions, given an optional datetime argument (representing the current time). The list of partitions is static, so while the run config returned by the decorated function may change over time, the list of valid partition keys does not.

This has performance advantages over dynamic_partitioned_config in terms of loading different partition views in Dagit.

The decorated function takes in a partition key and returns a valid run config for a particular target job.

Parameters

partition_keys (List[str]) – A list of valid partition keys, which serve as the range of values that can be provided to the decorated run config function.

Returns

PartitionedConfig

dagster.dynamic_partitioned_config(partition_fn, tags_for_partition_fn=None)[source]

Creates a dynamic partitioned config for a job.

The provided partition_fn returns a list of strings identifying the set of partitions, given an optional datetime argument (representing the current time). The list of partitions returned may change over time.

The decorated function takes in a partition key and returns a valid run config for a particular target job.

Parameters

partition_fn (Callable[[datetime.datetime], Sequence[str]]) – A function that generates a list of valid partition keys, which serve as the range of values that can be provided to the decorated run config function.

Returns

PartitionedConfig

dagster.hourly_partitioned_config(start_date, minute_offset=0, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]

Defines run config over a set of hourly partitions.

The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset is provided, the start and end times of each partition will be minute_offset past the hour.

Parameters
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

@hourly_partitioned_config(start_date=datetime(2022, 03, 12))
# creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ...

@hourly_partitioned_config(start_date=datetime(2022, 03, 12), minute_offset=15)
# creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ...
dagster.daily_partitioned_config(start_date, minute_offset=0, hour_offset=0, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]

Defines run config over a set of daily partitions.

The decorated function should accept a start datetime and end datetime, which represent the bounds of the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.

Parameters
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

@daily_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ...

@daily_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=16)
# creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ...
dagster.weekly_partitioned_config(start_date, minute_offset=0, hour_offset=0, day_offset=0, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]

Defines run config over a set of weekly partitions.

The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day of the week corresponding to day_offset (0 indexed with Sunday as the start of the week). If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.

Parameters
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will Sunday at midnight following start_date. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.

  • day_offset (int) – Day of the week to “split” the partition. Defaults to 0 (Sunday).

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

@weekly_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ...

@weekly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6)
# creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ...
dagster.monthly_partitioned_config(start_date, minute_offset=0, hour_offset=0, day_offset=1, timezone=None, fmt=None, end_offset=0, tags_for_partition_fn=None)[source]

Defines run config over a set of monthly partitions.

The decorated function should accept a start datetime and end datetime, which represent the date partition the config should delineate.

The decorated function should return a run config dictionary.

The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at midnight on the soonest first of the month after start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day_offset. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.

Parameters
  • start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will be midnight the sonnest first of the month following start_date. Can provide in either a datetime or string format.

  • minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.

  • hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.

  • day_offset (int) – Day of the month to “split” the partition. Defaults to 1.

  • timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.

  • end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.

@monthly_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ...

@monthly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5)
# creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ...

Partitions Definitions

class dagster.PartitionsDefinition(*args, **kwds)[source]
class dagster.HourlyPartitionsDefinition(start_date, minute_offset=0, timezone=None, fmt=None, end_offset=0)[source]
class dagster.DailyPartitionsDefinition(start_date, minute_offset=0, hour_offset=0, timezone=None, fmt=None, end_offset=0)[source]
class dagster.WeeklyPartitionsDefinition(start_date, minute_offset=0, hour_offset=0, day_offset=0, timezone=None, fmt=None, end_offset=0)[source]
class dagster.MonthlyPartitionsDefinition(start_date, minute_offset=0, hour_offset=0, day_offset=1, timezone=None, fmt=None, end_offset=0)[source]
class dagster.TimeWindowPartitionsDefinition(schedule_type, start, timezone, fmt, end_offset, minute_offset=0, hour_offset=0, day_offset=None)[source]
class dagster.StaticPartitionsDefinition(partition_keys)[source]

Partitioned Schedules

dagster.build_schedule_from_partitioned_job(job, description=None, name=None, minute_of_hour=None, hour_of_day=None, day_of_week=None, day_of_month=None, default_status=<DefaultScheduleStatus.STOPPED: 'STOPPED'>)[source]

Creates a schedule from a time window-partitioned job.

The schedule executes at the cadence specified by the partitioning of the given job.

Legacy Functions

The following functions are useful for working with partitions on legacy pipelines.

class dagster.Partition(value, name=None)[source]

A Partition represents a single slice of the entire set of a job’s possible work. It consists of a value, which is an object that represents that partition, and an optional name, which is used to label the partition in a human-readable way.

Parameters
  • value (Any) – The object for this partition

  • name (str) – Name for this partition

class dagster.PartitionSetDefinition(name, pipeline_name=None, partition_fn=None, solid_selection=None, mode=None, run_config_fn_for_partition=<function PartitionSetDefinition.<lambda>>, tags_fn_for_partition=<function PartitionSetDefinition.<lambda>>, partitions_def=None, job_name=None)[source]

Defines a partition set, representing the set of slices making up an axis of a pipeline

Parameters
  • name (str) – Name for this partition set

  • pipeline_name (str) – The name of the pipeline definition

  • partition_fn (Optional[Callable[void, List[Partition]]]) – User-provided function to define the set of valid partition objects.

  • solid_selection (Optional[List[str]]) – A list of solid subselection (including single solid names) to execute with this partition. e.g. ['*some_solid+', 'other_solid']

  • mode (Optional[str]) – The mode to apply when executing this partition. (default: ‘default’)

  • run_config_fn_for_partition (Callable[[Partition], Any]) – A function that takes a Partition and returns the run configuration that parameterizes the execution for this partition.

  • tags_fn_for_partition (Callable[[Partition], Optional[dict[str, str]]]) – A function that takes a Partition and returns a list of key value pairs that will be added to the generated run for this partition.

  • partitions_def (Optional[PartitionsDefinition]) – A set of parameters used to construct the set of valid partition objects.

create_schedule_definition(schedule_name, cron_schedule, partition_selector, should_execute=None, environment_vars=None, execution_timezone=None, description=None, decorated_fn=None, job=None, default_status=<DefaultScheduleStatus.STOPPED: 'STOPPED'>)[source]

Create a ScheduleDefinition from a PartitionSetDefinition.

Parameters
  • schedule_name (str) – The name of the schedule.

  • cron_schedule (str) – A valid cron string for the schedule

  • partition_selector (Callable[ScheduleEvaluationContext, PartitionSetDefinition], Union[Partition, List[Partition]]) – Function that determines the partition to use at a given execution time. Can return either a single Partition or a list of Partitions. For time-based partition sets, will likely be either identity_partition_selector or a selector returned by create_offset_partition_selector.

  • should_execute (Optional[function]) – Function that runs at schedule execution time that determines whether a schedule should execute. Defaults to a function that always returns True.

  • environment_vars (Optional[dict]) – The environment variables to set for the schedule.

  • execution_timezone (Optional[str]) – Timezone in which the schedule should run. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.

  • description (Optional[str]) – A human-readable description of the schedule.

  • default_status (DefaultScheduleStatus) – Whether the schedule starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.

Returns

The generated PartitionScheduleDefinition for the partition

selector

Return type

PartitionScheduleDefinition

get_partitions(current_time=None)[source]

Return the set of known partitions.

Parameters

current_time (Optional[datetime]) – The evaluation time for the partition function, which is passed through to the partition_fn (if it accepts a parameter). Defaults to the current time in UTC.

dagster.date_partition_range(start, end=None, delta_range='days', fmt=None, inclusive=False, timezone=None)[source]

Utility function that returns a partition generating function to be used in creating a PartitionSet definition.

Parameters
  • start (datetime) – Datetime capturing the start of the time range.

  • end (Optional(datetime)) – Datetime capturing the end of the partition. By default, the current time is used. The range is not inclusive of the end value.

  • delta_range (Optional(str)) – string representing the time duration of each partition. Must be a valid argument to pendulum.period.range (“days”, “hours”, “months”, etc.).

  • fmt (Optional(str)) – Format string to represent each partition by its start time

  • inclusive (Optional(bool)) – By default, the partition set only contains date interval partitions for which the end time of the interval is less than current time. In other words, the partition set contains date interval partitions that are completely in the past. If inclusive is set to True, then the partition set will include all date interval partitions for which the start time of the interval is less than the current time.

  • timezone (Optional(str)) – Timezone in which the partition values should be expressed.

Returns

Callable[[], List[Partition]]

dagster.identity_partition_selector(context, partition_set_def)[source]

Utility function for supplying a partition selector when creating a schedule from a partition set made of datetime objects that assumes the schedule always executes at the partition time.

It’s important that the cron string passed into create_schedule_definition match the partition set times. For example, a schedule created from a partition set with partitions for each day at midnight would create its partition selector as follows:

partition_set = PartitionSetDefinition(
    name='hello_world_partition_set',
    pipeline_name='hello_world_pipeline',
    partition_fn= date_partition_range(
        start=datetime.datetime(2021, 1, 1),
        delta_range="days",
        timezone="US/Central",
    )
    run_config_fn_for_partition=my_run_config_fn,
)

schedule_definition = partition_set.create_schedule_definition(
    "hello_world_daily_schedule",
    "0 0 * * *",
    partition_selector=identity_partition_selector,
    execution_timezone="US/Central",
)
dagster.create_offset_partition_selector(execution_time_to_partition_fn)[source]

Utility function for supplying a partition selector when creating a schedule from a partition set made of datetime objects that assumes a fixed time offset between the partition time and the time at which the schedule executes.

It’s important to keep the cron string that’s supplied to PartitionSetDefinition.create_schedule_definition in sync with the offset that’s supplied to this function. For example, a schedule created from a partition set with partitions for each day at midnight that fills in the partition for day N at day N+1 at 10:00AM would create the partition selector as follows:

partition_set = PartitionSetDefinition(
    name='hello_world_partition_set',
    pipeline_name='hello_world_pipeline',
    partition_fn= date_partition_range(
        start=datetime.datetime(2021, 1, 1),
        delta_range="days",
        timezone="US/Central",
    )
    run_config_fn_for_partition=my_run_config_fn,
)

schedule_definition = partition_set.create_schedule_definition(
    "daily_10am_schedule",
    "0 10 * * *",
    partition_selector=create_offset_partition_selector(lambda d: d.subtract(hours=10, days=1))
    execution_timezone="US/Central",
)
Parameters

execution_time_to_partition_fn (Callable[[datetime.datetime], datetime.datetime]) – A function that maps the execution time of the schedule to the partition time.