# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending"""This module is used to define Iceberg endpoints and to discover and deploy Deephaven schemas for Iceberg tables."""fromtypingimportOptional,Dict,Anyimportjpyfromdeephaven.tableimportTableDefinitionfromdeephaven.jcompatimportunwrap,j_hashmap_JIcebergTools=jpy.get_type("io.deephaven.enterprise.iceberg.IcebergTools")_JDiscoveryResult=jpy.get_type("io.deephaven.enterprise.iceberg.discovery.DiscoveryResult")_JDiscoveryConfig=jpy.get_type("io.deephaven.enterprise.iceberg.discovery.DiscoveryConfig")_JIcebergEndpoint=jpy.get_type("io.deephaven.enterprise.iceberg.IcebergEndpoint")_JSimpleUserContext=jpy.get_type("io.deephaven.enterprise.auth.SimpleUserContext")
[docs]classIcebergEndpoint:""" An Iceberg Endpoint defines both the location of the Iceberg Catalog and data warehouse. It also provides information required to create the catalog and retrieve data from the store as a Deephaven table. Optionally users may provide a set of properties, secrets, and hadoop settings to be passed down to the Iceberg APIs. Secrets passed to IcebergEndpoint instances are simply named references, never actual secret values. For example, you may provide S3 credentials as a secret defined as IcebergEndpoint("rest", "http://catalog/", "s3://warehouse", data_instructions, \ secrets = ["s3.access.key" : "s3.dev_access_key" ]) Deephaven will locate the secret within the set of SecretsProviders to discover the actual value. """def__init__(self,j_endpoint:_JIcebergEndpoint):""" Create a new IcebergEndpoint. """self._j_endpoint=j_endpoint
[docs]defdeploy(self,overwrite_existing:bool=False)->None:""" Deploy this endpoint to Deephaven. This will fail unless the endpoint was created with a name. """self._j_endpoint.deploy(overwrite_existing)
defto_json(self):returnself._j_endpoint.toJson()
[docs]classDiscoveryResult:""" Contains the result of an Iceberg discovery (:func:`~deephaven_enterprise.discover`) """def__init__(self,j_result:_JDiscoveryResult):self.j_object=j_result
[docs]defdeploy_named(self)->None:""" Deploy the discovered Iceberg schema to Deephaven linking the endpoint by its name. """self.j_object.deployWithEndpointReference()
[docs]defdeploy_embedded(self)->None:""" Deploy the discovered Iceberg schema to Deephaven, embedding the endpoint within the schema. """self.j_object.deployWithEmbeddedEndpoint()
[docs]defdiscover(table_id:str,endpoint:IcebergEndpoint,snapshot_id:Optional[str]=None,namespace:Optional[str]=None,table_name:Optional[str]=None,reference_definition:Optional[TableDefinition]=None)->DiscoveryResult:""" Discover an Iceberg table. :param table_id: the Iceberg table identifier :param endpoint: the endpoint to use for discovery :param snapshot_id: (optional) the Iceberg snapshot ID to use for discovery :param namespace: (optional) A user specified namespace. If not set, the namespace is derived from the table_id :param table_name: (optional) A user specified table name. If not set, the table name is derived from the table_id :param reference_definition: (optional) A user specified reference TableDefinition. The discovery process guarantees that the result is compatible with this definition. :return: a DiscoveryResult that can be used to deploy Deephaven schemas. """config=_JDiscoveryConfig.builder() \
.tableIdentifier(table_id) \
.endpoint(endpoint._j_endpoint)ifsnapshot_idisnotNone:config.snapshotId(snapshot_id)ifnamespaceisnotNone:config.namespace(namespace)iftable_nameisnotNone:config.tableName(table_name)ifreference_definitionisnotNone:config.referenceDefinition(reference_definition)returnDiscoveryResult(_JIcebergTools.discover(config.build()))
[docs]defmake_endpoint(catalog_type:str,catalog_uri:str,warehouse_uri:str,data_instructions:Any,endpoint_name:Optional[str]=None,properties:Optional[Dict[str,str]]=None,secrets:Optional[Dict[str,str]]=None,hadoop_opt:Optional[Dict[str,str]]=None)->IcebergEndpoint:""" Create a new IcebergEndpoint. If the endpoint_name is set, the resulting endpoint can be deployed to Deephaven for reuse with the deploy() method. Any provided properties and secrets will be merged into a single collection of properties to be passed on to the Iceberg APIs :param catalog_type: the type of catalog. Possible values include "rest", "glue", "hive", "nessie", "hadoop", and "jdbc" :param catalog_uri: the URI of the catalog :param warehouse_uri: the URI of the data :param data_instructions: the Data Instructions for Deephaven to use when fetching table data. :param endpoint_name: (optional) a name for this endpoint. This must be set in order to deploy this endpoint :param properties: (optional) a map of properties to be passed to the Iceberg API :param secrets: a map of secrets to be resolved and passed to the Iceberg API :param hadoop_opt: a map of hadoop specific options to be passed to the Iceberg API """builder=_JIcebergTools.newEndpoint() \
.catalogType(catalog_type) \
.catalogUri(catalog_uri) \
.warehouseUri(warehouse_uri) \
.dataInstructions(unwrap(data_instructions))ifpropertiesisnotNone:builder.putProperties(j_hashmap(properties))ifsecretsisnotNone:builder.putSecrets(j_hashmap(secrets))ifhadoop_optisnotNone:builder.putHadoopOptions(j_hashmap(hadoop_opt))returnIcebergEndpoint(builder.build()ifendpoint_nameisNoneelsebuilder.build(endpoint_name))
[docs]defget_named_endpoint(endpoint_name:str)->IcebergEndpoint:""" Get an IcebergEndpoint from Deephaven by name. :param endpoint_name: the name of the endpoint :return: the IcebergEndpoint """returnIcebergEndpoint(_JIcebergTools.getEndpointByName(endpoint_name))