Op Retries#

When an exception occurs during op execution, Dagster provides tools to retry that op within the same job run.

Relevant APIs#

NameDescription
RetryRequestedAn exception that can be thrown from the body of an op to request a retry
RetryPolicyA declarative policy to attach which will have retries requested on exception
BackoffModification to delay between retries based on attempt number
JitterRandom modification to delay beween retries

Overview#

In Dagster, code is executed within an op. Sometimes this code can fail for transient reasons, and the desired behavior is to retry and run the function again.

Dagster provides both declarative RetryPolicys as well as manual RetryRequested exceptions to enable this behavior.

Using Op Retries#

Here we start off with an op that is causing us to have to retry the whole job anytime it fails.

@op
def problematic():
    fails_sometimes()

RetryPolicy#

To get this solid to retry when an exception occurs, we can attach a RetryRequested.

@op(retry_policy=RetryPolicy())
def better():
    fails_sometimes()

This improves the situation, but we may need additional configuration to control how many times to retry and/or how long to wait between each retry.

@op(
    retry_policy=RetryPolicy(
        max_retries=3,
        delay=0.2,  # 200ms
        backoff=Backoff.EXPONENTIAL,
        jitter=Jitter.PLUS_MINUS,
    )
)
def even_better():
    fails_sometimes()

In addition to being able to set the policy directly on the op definition, it can also be set on specific invocations of an op, or a @job to apply to all ops contained within.

default_policy = RetryPolicy(max_retries=1)
flakey_op_policy = RetryPolicy(max_retries=10)


@job(op_retry_policy=default_policy)
def default_and_override_job():
    problematic.with_retry_policy(flakey_op_policy)()

RetryRequested#

In certain more nuanced situations, we may need to evaluate code to determine if we want to retry or not. For this we can use a manual RetryRequested exception.

@op
def manual():
    try:
        fails_sometimes()
    except Exception as e:
        if should_retry(e):
            raise RetryRequested(max_retries=1, seconds_to_wait=1) from e
        else:
            raise

Using raise from will ensure the original exceptions information is captured by Dagster.