Source code for dagster_gcp.gcs.resources

from dagster import Field, Noneable, StringSource, resource
from dagster._utils.merger import merge_dicts
from google.cloud import storage

from .file_manager import GCSFileManager

GCS_CLIENT_CONFIG = {
    "project": Field(Noneable(StringSource), is_required=False, description="Project name")
}


[docs]@resource( GCS_CLIENT_CONFIG, description="This resource provides a GCS client", ) def gcs_resource(init_context): return _gcs_client_from_config(init_context.resource_config)
[docs]@resource( merge_dicts( GCS_CLIENT_CONFIG, { "gcs_bucket": Field(StringSource), "gcs_prefix": Field(StringSource, is_required=False, default_value="dagster"), }, ) ) def gcs_file_manager(context): """FileManager that provides abstract access to GCS. Implements the :py:class:`~dagster._core.storage.file_manager.FileManager` API. """ gcs_client = _gcs_client_from_config(context.resource_config) return GCSFileManager( client=gcs_client, gcs_bucket=context.resource_config["gcs_bucket"], gcs_base_key=context.resource_config["gcs_prefix"], )
def _gcs_client_from_config(config): """ Args: config: A configuration containing the fields in GCS_CLIENT_CONFIG. Returns: A GCS client. """ project = config.get("project", None) return storage.client.Client(project=project)