# Source code for dagster_gcp.dataproc.ops

from dagster import Bool, Field, Int, op
from dagster._seven import json

from .configs import define_dataproc_submit_job_config
from .resources import TWENTY_MINUTES

# Config schema shared by every Dataproc op declared in this module.
DATAPROC_CONFIG_SCHEMA = {
    # Upper bound, in seconds, handed to `wait_for_job` as `wait_timeout`.
    "job_timeout_in_seconds": Field(
        Int,
        default_value=TWENTY_MINUTES,
        is_required=False,
        description="""Optional. Maximum time in seconds to wait for the job being
                    completed. Default is set to 1200 seconds (20 minutes).
                    """,
    ),
    # Job definition that is passed unchanged to `submit_job`.
    "job_config": define_dataproc_submit_job_config(),
    # True: run inside the resource's `cluster_context_manager()`.
    # False: submit directly through the `dataproc` resource.
    "job_scoped_cluster": Field(
        Bool,
        default_value=True,
        is_required=False,
        description="whether to create a cluster or use an existing cluster",
    ),
}


def _submit_and_wait(context, target, job_config, job_timeout):
    """Submit ``job_config`` through ``target`` and wait on the resulting job.

    Args:
        context: Op execution context; only ``context.log`` is used here.
        target: Object exposing ``submit_job(job_config)`` and
            ``wait_for_job(job_id, wait_timeout=...)`` -- either the cluster
            yielded by ``cluster_context_manager()`` or the ``dataproc``
            resource itself.
        job_config: Job configuration forwarded unchanged to ``submit_job``.
        job_timeout: Value passed to ``wait_for_job`` as ``wait_timeout``.
    """
    result = target.submit_job(job_config)

    # The submit response is expected to carry the ID under reference.jobId.
    job_id = result["reference"]["jobId"]
    context.log.info("Submitted job ID {}".format(job_id))
    target.wait_for_job(job_id, wait_timeout=job_timeout)


def _dataproc_compute(context):
    """Submit the configured Dataproc job and wait for it.

    Reads ``job_config``, ``job_timeout_in_seconds`` and ``job_scoped_cluster``
    from the op config. When ``job_scoped_cluster`` is truthy the job runs
    inside ``context.resources.dataproc.cluster_context_manager()``; otherwise
    it is submitted directly through the ``dataproc`` resource.

    Args:
        context: Op execution context providing ``op_config``, ``log`` and a
            ``dataproc`` resource.

    Returns:
        None.
    """
    # This function only runs under `@op`, so read the config through
    # `op_config` rather than the legacy `solid_config` accessor.
    op_config = context.op_config
    job_config = op_config["job_config"]
    job_timeout = op_config["job_timeout_in_seconds"]

    context.log.info(
        "submitting job with config: %s and timeout of: %d seconds"
        % (json.dumps(job_config), job_timeout)
    )

    dataproc = context.resources.dataproc
    if op_config["job_scoped_cluster"]:
        # Cluster context manager, creates and then deletes cluster
        with dataproc.cluster_context_manager() as cluster:
            _submit_and_wait(context, cluster, job_config, job_timeout)
    else:
        # Submit to an existing cluster
        _submit_and_wait(context, dataproc, job_config, job_timeout)


# Op registered under the older "solid" name; it shares its config schema and
# compute function with `dataproc_op` (presumably kept for backward
# compatibility -- confirm before removing).
@op(
    config_schema=DATAPROC_CONFIG_SCHEMA,
    required_resource_keys={"dataproc"},
)
def dataproc_solid(context):
    # Delegate everything to the shared implementation.
    return _dataproc_compute(context)


# Op that submits the configured Dataproc job and waits for it, using the
# `dataproc` resource and the module-level DATAPROC_CONFIG_SCHEMA.
@op(required_resource_keys={"dataproc"}, config_schema=DATAPROC_CONFIG_SCHEMA)
def dataproc_op(context):
    # Delegate everything to the shared implementation.
    return _dataproc_compute(context)