Source code for deephaven_enterprise.iceberg

#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module is used to define Iceberg endpoints and to discover and deploy Deephaven schemas for Iceberg tables."""

from typing import Any, Optional

import jpy
from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.jcompat import j_hashmap, unwrap
from deephaven.table import TableDefinition

_JIcebergTools = jpy.get_type("io.deephaven.enterprise.iceberg.IcebergTools")
_JDiscoveryResult: Any = jpy.get_type(
    "io.deephaven.enterprise.iceberg.discovery.DiscoveryResult"
)
_JDiscoveryConfig = jpy.get_type(
    "io.deephaven.enterprise.iceberg.discovery.DiscoveryConfig"
)
_JIcebergEndpoint: Any = jpy.get_type("io.deephaven.enterprise.iceberg.IcebergEndpoint")
_JSimpleUserContext = jpy.get_type("io.deephaven.enterprise.auth.SimpleUserContext")


[docs] class IcebergEndpoint(JObjectWrapper): """An Iceberg Endpoint defines both the location of the Iceberg Catalog and data warehouse. It also provides information required to create the catalog and retrieve data from the store as a Deephaven table. Optionally users may provide a set of properties, secrets, and hadoop settings to be passed down to the Iceberg APIs. Secrets passed to IcebergEndpoint instances are simply named references, never actual secret values. For example, you may provide S3 credentials as a secret defined as IcebergEndpoint("rest", "http://catalog/", "s3://warehouse", data_instructions, \ secrets = ["s3.access.key" : "s3.dev_access_key" ]) Deephaven will locate the secret within the set of SecretsProviders to discover the actual value. Note that it should not be instantiated directly, but rather through the :func:`make_endpoint` function. """ j_object_type = _JIcebergEndpoint @property def j_object(self) -> jpy.JType: return self._j_endpoint def __init__(self, j_endpoint: _JIcebergEndpoint): self._j_endpoint = j_endpoint
[docs] def deploy(self, overwrite_existing: bool = False) -> None: """Deploys this endpoint to Deephaven. This will fail unless the endpoint was created with a name. Args: overwrite_existing (bool): whether to overwrite an existing endpoint with the same name, defaults to False. Raises: DHError """ try: self._j_endpoint.deploy(overwrite_existing) except Exception as e: raise DHError(e, "failed to deploy endpoint") from e
[docs] def to_json(self) -> str: """Returns a JSON representation of the endpoint.""" return self._j_endpoint.toJson()
[docs] class DiscoveryResult(JObjectWrapper): """DiscoveryResult represents the result of an Iceberg discovery. It can be used to deploy the discovered schema to Deephaven. It is returned by the :func:`.discover` function. """ j_object_type = _JDiscoveryConfig @property def j_object(self) -> jpy.JType: return self.j_discovery_result def __init__(self, j_result: _JDiscoveryResult): self.j_discovery_result = j_result
[docs] def deploy_named(self) -> None: """Deploys the discovered Iceberg schema to Deephaven linking the endpoint by its name. Raises: DHError """ try: self.j_discovery_result.deployWithEndpointReference() except Exception as e: raise DHError(e, "failed to deploy the discovered Iceberg schema") from e
[docs] def deploy_embedded(self) -> None: """Deploys the discovered Iceberg schema to Deephaven, embedding the endpoint within the schema. Raises: DHError """ try: self.j_discovery_result.deployWithEmbeddedEndpoint() except Exception as e: raise DHError(e, "failed to deploy the discovered Iceberg schema") from e
[docs] def discover( table_id: str, endpoint: IcebergEndpoint, snapshot_id: Optional[str] = None, namespace: Optional[str] = None, table_name: Optional[str] = None, reference_definition: Optional[TableDefinition] = None, ) -> DiscoveryResult: """Discovers an Iceberg table. Args: table_id (str): the Iceberg table identifier endpoint (IcebergEndpoint): the endpoint to use for discovery snapshot_id (Optional[str]): the Iceberg snapshot ID to use for discovery, defaults to None namespace (Optional[str]): A user specified namespace. If not set, the namespace is derived from the table_id, defaults to None table_name (Optional[str]): A user specified table name. If not set, the table name is derived from the table_id, defaults to None reference_definition (Optional[TableDefinition]): A user specified reference TableDefinition. The discovery process guarantees that the result is compatible with this definition, defaults to None Returns: a DiscoveryResult that can be used to deploy Deephaven schemas Raises: DHError """ try: config = ( _JDiscoveryConfig.builder() .tableIdentifier(table_id) .endpoint(endpoint._j_endpoint) ) if snapshot_id is not None: config.snapshotId(snapshot_id) if namespace is not None: config.namespace(namespace) if table_name is not None: config.tableName(table_name) if reference_definition is not None: config.referenceDefinition(reference_definition) return DiscoveryResult(_JIcebergTools.discover(config.build())) except Exception as e: raise DHError(e, "failed to discover the Iceberg table") from e
[docs] def make_endpoint( catalog_type: str, catalog_uri: str, warehouse_uri: str, data_instructions: Any, endpoint_name: Optional[str] = None, properties: Optional[dict[str, str]] = None, secrets: Optional[dict[str, str]] = None, hadoop_opt: Optional[dict[str, str]] = None, ) -> IcebergEndpoint: """Creates a new IcebergEndpoint. If the endpoint_name is set, the resulting endpoint can be deployed to Deephaven for reuse with the :meth:`IcebergEndpoint.deploy` method. Any provided properties and secrets will be merged into a single collection of properties to be passed on to the Iceberg APIs Args: catalog_type (str): the type of catalog. Possible values include "rest", "glue", "hive", "nessie", "hadoop", and "jdbc" catalog_uri (str): the URI of the catalog warehouse_uri (str): the URI of the data data_instructions (Any): the data instructions for Deephaven to use when fetching table data. E.g. a :class:`deephaven.experimental.s3.S3Instructions: object endpoint_name (Optional[str]): a name for this endpoint. This must be set in order to deploy this endpoint, defaults to None properties (Optional[dict[str, str]]): a map of properties to be passed to the Iceberg API, defaults to None secrets (Optional[dict[str, str]]): a map of secrets to be resolved and passed to the Iceberg API, defaults to None hadoop_opt (Optional[dict[str, str]]): a map of hadoop specific options to be passed to the Iceberg API, defaults to None Returns: a IcebergEndpoint object Raises: DHError """ try: builder = ( _JIcebergTools.newEndpoint() .catalogType(catalog_type) .catalogUri(catalog_uri) .warehouseUri(warehouse_uri) .dataInstructions(unwrap(data_instructions)) ) if properties is not None: builder.putProperties(j_hashmap(properties)) if secrets is not None: builder.putSecrets(j_hashmap(secrets)) if hadoop_opt is not None: builder.putHadoopOptions(j_hashmap(hadoop_opt)) return IcebergEndpoint( builder.build() if endpoint_name is None else builder.build(endpoint_name) ) except Exception as e: raise DHError(e, "failed to create endpoint") from e
[docs] def get_named_endpoint(endpoint_name: str) -> IcebergEndpoint: """Gets an IcebergEndpoint from Deephaven by name. Args: endpoint_name (str): the name of the endpoint Returns: an IcebergEndpoint object Raises: DHError """ try: return IcebergEndpoint(_JIcebergTools.getEndpointByName(endpoint_name)) except Exception as e: raise DHError(e, f"failed to get endpoint by name {endpoint_name}") from e