Source code for deephaven_enterprise.iceberg

#  Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending

"""
This module is used to define Iceberg endpoints and to discover and deploy Deephaven schemas for Iceberg tables.
"""

from typing import Optional, Dict, Any

import jpy
from deephaven.table import TableDefinition
from deephaven.jcompat import unwrap, j_hashmap

_JIcebergTools = jpy.get_type("io.deephaven.enterprise.iceberg.IcebergTools")
_JDiscoveryResult = jpy.get_type("io.deephaven.enterprise.iceberg.discovery.DiscoveryResult")
_JDiscoveryConfig = jpy.get_type("io.deephaven.enterprise.iceberg.discovery.DiscoveryConfig")
_JIcebergEndpoint = jpy.get_type("io.deephaven.enterprise.iceberg.IcebergEndpoint")
_JSimpleUserContext = jpy.get_type("io.deephaven.enterprise.auth.SimpleUserContext")

[docs] class IcebergEndpoint: """ An Iceberg Endpoint defines both the location of the Iceberg Catalog and data warehouse. It also provides information required to create the catalog and retrieve data from the store as a Deephaven table. Optionally users may provide a set of properties, secrets, and hadoop settings to be passed down to the Iceberg APIs. Secrets passed to IcebergEndpoint instances are simply named references, never actual secret values. For example, you may provide S3 credentials as a secret defined as IcebergEndpoint("rest", "http://catalog/", "s3://warehouse", data_instructions, \ secrets = ["s3.access.key" : "s3.dev_access_key" ]) Deephaven will locate the secret within the set of SecretsProviders to discover the actual value. """ def __init__(self, j_endpoint: _JIcebergEndpoint): """ Create a new IcebergEndpoint. """ self._j_endpoint = j_endpoint
[docs] def deploy(self, overwrite_existing: bool = False) -> None: """ Deploy this endpoint to Deephaven. This will fail unless the endpoint was created with a name. """ self._j_endpoint.deploy(overwrite_existing)
def to_json(self): return self._j_endpoint.toJson()
[docs] class DiscoveryResult: """ Contains the result of an Iceberg discovery (:func:`~deephaven_enterprise.discover`) """ def __init__(self, j_result: _JDiscoveryResult): self.j_object = j_result
[docs] def deploy_named(self) -> None: """ Deploy the discovered Iceberg schema to Deephaven linking the endpoint by its name. """ self.j_object.deployWithEndpointReference()
[docs] def deploy_embedded(self) -> None: """ Deploy the discovered Iceberg schema to Deephaven, embedding the endpoint within the schema. """ self.j_object.deployWithEmbeddedEndpoint()
[docs] def discover(table_id: str, endpoint: IcebergEndpoint, snapshot_id: Optional[str] = None, namespace: Optional[str] = None, table_name: Optional[str] = None, reference_definition: Optional[TableDefinition] = None) -> DiscoveryResult: """ Discover an Iceberg table. :param table_id: the Iceberg table identifier :param endpoint: the endpoint to use for discovery :param snapshot_id: (optional) the Iceberg snapshot ID to use for discovery :param namespace: (optional) A user specified namespace. If not set, the namespace is derived from the table_id :param table_name: (optional) A user specified table name. If not set, the table name is derived from the table_id :param reference_definition: (optional) A user specified reference TableDefinition. The discovery process guarantees that the result is compatible with this definition. :return: a DiscoveryResult that can be used to deploy Deephaven schemas. """ config = _JDiscoveryConfig.builder() \ .tableIdentifier(table_id) \ .endpoint(endpoint._j_endpoint) if snapshot_id is not None: config.snapshotId(snapshot_id) if namespace is not None : config.namespace(namespace) if table_name is not None: config.tableName(table_name) if reference_definition is not None: config.referenceDefinition(reference_definition) return DiscoveryResult(_JIcebergTools.discover(config.build()))
[docs] def make_endpoint(catalog_type: str, catalog_uri: str, warehouse_uri: str, data_instructions: Any, endpoint_name: Optional[str] = None, properties: Optional[Dict[str, str]] = None, secrets: Optional[Dict[str, str]] = None, hadoop_opt: Optional[Dict[str, str]] = None ) -> IcebergEndpoint: """ Create a new IcebergEndpoint. If the endpoint_name is set, the resulting endpoint can be deployed to Deephaven for reuse with the deploy() method. Any provided properties and secrets will be merged into a single collection of properties to be passed on to the Iceberg APIs :param catalog_type: the type of catalog. Possible values include "rest", "glue", "hive", "nessie", "hadoop", and "jdbc" :param catalog_uri: the URI of the catalog :param warehouse_uri: the URI of the data :param data_instructions: the Data Instructions for Deephaven to use when fetching table data. :param endpoint_name: (optional) a name for this endpoint. This must be set in order to deploy this endpoint :param properties: (optional) a map of properties to be passed to the Iceberg API :param secrets: a map of secrets to be resolved and passed to the Iceberg API :param hadoop_opt: a map of hadoop specific options to be passed to the Iceberg API """ builder = _JIcebergTools.newEndpoint() \ .catalogType(catalog_type) \ .catalogUri(catalog_uri) \ .warehouseUri(warehouse_uri) \ .dataInstructions(unwrap(data_instructions)) if properties is not None: builder.putProperties(j_hashmap(properties)) if secrets is not None: builder.putSecrets(j_hashmap(secrets)) if hadoop_opt is not None: builder.putHadoopOptions(j_hashmap(hadoop_opt)) return IcebergEndpoint(builder.build() if endpoint_name is None else builder.build(endpoint_name))
[docs] def get_named_endpoint(endpoint_name: str) -> IcebergEndpoint: """ Get an IcebergEndpoint from Deephaven by name. :param endpoint_name: the name of the endpoint :return: the IcebergEndpoint """ return IcebergEndpoint(_JIcebergTools.getEndpointByName(endpoint_name))