#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module is used to define Iceberg endpoints and to discover and deploy Deephaven schemas for Iceberg tables."""
from typing import Any, Optional
import jpy
from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.jcompat import j_hashmap, unwrap
from deephaven.table import TableDefinition
# Java classes from the Deephaven enterprise Iceberg integration, resolved through jpy at import time.
# Entry point used below for endpoint creation (newEndpoint), lookup (getEndpointByName) and discovery (discover).
_JIcebergTools: Any = jpy.get_type("io.deephaven.enterprise.iceberg.IcebergTools")
# Java type returned by IcebergTools.discover; wrapped by the DiscoveryResult class in this module.
_JDiscoveryResult: Any = jpy.get_type(
    "io.deephaven.enterprise.iceberg.discovery.DiscoveryResult"
)
# Java type whose builder() assembles the configuration passed to IcebergTools.discover.
_JDiscoveryConfig: Any = jpy.get_type(
    "io.deephaven.enterprise.iceberg.discovery.DiscoveryConfig"
)
# Java type wrapped by the IcebergEndpoint class in this module.
_JIcebergEndpoint: Any = jpy.get_type("io.deephaven.enterprise.iceberg.IcebergEndpoint")
# NOTE(review): not referenced anywhere in the visible part of this module; confirm it is needed before removing.
_JSimpleUserContext: Any = jpy.get_type("io.deephaven.enterprise.auth.SimpleUserContext")
class IcebergEndpoint(JObjectWrapper):
    """An Iceberg Endpoint defines both the location of the Iceberg Catalog and the data warehouse. It also provides
    the information required to create the catalog and retrieve data from the store as a Deephaven table. Optionally,
    users may provide a set of properties, secrets, and Hadoop settings to be passed down to the Iceberg APIs.

    Secrets passed to IcebergEndpoint instances are simply named references, never actual secret values. For example,
    you may provide S3 credentials as a secret defined as::

        make_endpoint("rest", "http://catalog/", "s3://warehouse", data_instructions,
                      secrets={"s3.access.key": "s3.dev_access_key"})

    Deephaven will locate the secret within the set of SecretsProviders to discover the actual value.

    Note that it should not be instantiated directly, but rather through the :func:`make_endpoint` function
    (or :func:`get_named_endpoint` to load an endpoint by name).
    """

    # The Java type wrapped by this class.
    j_object_type = _JIcebergEndpoint

    @property
    def j_object(self) -> jpy.JType:
        """The wrapped Java IcebergEndpoint object."""
        return self._j_endpoint

    def __init__(self, j_endpoint: _JIcebergEndpoint):
        """Wraps an existing Java IcebergEndpoint.

        Args:
            j_endpoint: the Java IcebergEndpoint object to wrap
        """
        self._j_endpoint = j_endpoint

    def deploy(self, overwrite_existing: bool = False) -> None:
        """Deploys this endpoint to Deephaven. This will fail unless the endpoint was created with a name.

        Args:
            overwrite_existing (bool): whether to overwrite an existing endpoint with the same name,
                defaults to False

        Raises:
            DHError
        """
        try:
            self._j_endpoint.deploy(overwrite_existing)
        except Exception as e:
            raise DHError(e, "failed to deploy endpoint") from e

    def to_json(self) -> str:
        """Returns a JSON representation of the endpoint."""
        return self._j_endpoint.toJson()
class DiscoveryResult(JObjectWrapper):
    """DiscoveryResult represents the result of an Iceberg discovery. It can be used to deploy the discovered schema
    to Deephaven. It is returned by the :func:`.discover` function.
    """

    # The Java type wrapped by this class. This was previously (incorrectly) the DiscoveryConfig type, although the
    # wrapped object is the DiscoveryResult returned by IcebergTools.discover.
    j_object_type = _JDiscoveryResult

    @property
    def j_object(self) -> jpy.JType:
        """The wrapped Java DiscoveryResult object."""
        return self.j_discovery_result

    def __init__(self, j_result: _JDiscoveryResult):
        """Wraps an existing Java DiscoveryResult.

        Args:
            j_result: the Java DiscoveryResult object to wrap
        """
        self.j_discovery_result = j_result

    def deploy_named(self) -> None:
        """Deploys the discovered Iceberg schema to Deephaven, linking the endpoint by its name.

        Raises:
            DHError
        """
        # NOTE(review): presumably requires the endpoint to have been created with a name (and deployed) — confirm.
        try:
            self.j_discovery_result.deployWithEndpointReference()
        except Exception as e:
            raise DHError(e, "failed to deploy the discovered Iceberg schema") from e

    def deploy_embedded(self) -> None:
        """Deploys the discovered Iceberg schema to Deephaven, embedding the endpoint within the schema.

        Raises:
            DHError
        """
        try:
            self.j_discovery_result.deployWithEmbeddedEndpoint()
        except Exception as e:
            raise DHError(e, "failed to deploy the discovered Iceberg schema") from e
def discover(
    table_id: str,
    endpoint: IcebergEndpoint,
    snapshot_id: Optional[str] = None,
    namespace: Optional[str] = None,
    table_name: Optional[str] = None,
    reference_definition: Optional[TableDefinition] = None,
) -> DiscoveryResult:
    """Discovers an Iceberg table.

    Args:
        table_id (str): the Iceberg table identifier
        endpoint (IcebergEndpoint): the endpoint to use for discovery
        snapshot_id (Optional[str]): the Iceberg snapshot ID to use for discovery, defaults to None
        namespace (Optional[str]): a user-specified namespace. If not set, the namespace is derived from the
            table_id, defaults to None
        table_name (Optional[str]): a user-specified table name. If not set, the table name is derived from the
            table_id, defaults to None
        reference_definition (Optional[TableDefinition]): a user-specified reference TableDefinition. The discovery
            process guarantees that the result is compatible with this definition, defaults to None

    Returns:
        a DiscoveryResult that can be used to deploy Deephaven schemas

    Raises:
        DHError
    """
    try:
        builder = (
            _JDiscoveryConfig.builder()
            .tableIdentifier(table_id)
            .endpoint(endpoint.j_object)
        )
        # Optional settings are only applied when supplied, so the Java side keeps its own defaults otherwise.
        if snapshot_id is not None:
            builder.snapshotId(snapshot_id)
        if namespace is not None:
            builder.namespace(namespace)
        if table_name is not None:
            builder.tableName(table_name)
        if reference_definition is not None:
            # The Java builder needs the Java TableDefinition, not the Python wrapper; unwrap() extracts the wrapped
            # Java object (the same helper make_endpoint uses for data_instructions).
            builder.referenceDefinition(unwrap(reference_definition))
        return DiscoveryResult(_JIcebergTools.discover(builder.build()))
    except Exception as e:
        raise DHError(e, "failed to discover the Iceberg table") from e
def make_endpoint(
    catalog_type: str,
    catalog_uri: str,
    warehouse_uri: str,
    data_instructions: Any,
    endpoint_name: Optional[str] = None,
    properties: Optional[dict[str, str]] = None,
    secrets: Optional[dict[str, str]] = None,
    hadoop_opt: Optional[dict[str, str]] = None,
) -> IcebergEndpoint:
    """Creates a new IcebergEndpoint. If endpoint_name is set, the resulting endpoint can be deployed to Deephaven
    for reuse with the :meth:`IcebergEndpoint.deploy` method.

    Any provided properties and secrets will be merged into a single collection of properties to be passed on to
    the Iceberg APIs.

    Args:
        catalog_type (str): the type of catalog. Possible values include "rest", "glue", "hive", "nessie", "hadoop",
            and "jdbc"
        catalog_uri (str): the URI of the catalog
        warehouse_uri (str): the URI of the data warehouse
        data_instructions (Any): the data instructions for Deephaven to use when fetching table data,
            e.g. a :class:`deephaven.experimental.s3.S3Instructions` object
        endpoint_name (Optional[str]): a name for this endpoint. This must be set in order to deploy this endpoint,
            defaults to None
        properties (Optional[dict[str, str]]): a map of properties to be passed to the Iceberg API, defaults to None
        secrets (Optional[dict[str, str]]): a map of secrets to be resolved and passed to the Iceberg API,
            defaults to None
        hadoop_opt (Optional[dict[str, str]]): a map of Hadoop-specific options to be passed to the Iceberg API,
            defaults to None

    Returns:
        an IcebergEndpoint object

    Raises:
        DHError
    """
    try:
        endpoint_builder = _JIcebergTools.newEndpoint()
        endpoint_builder = endpoint_builder.catalogType(catalog_type)
        endpoint_builder = endpoint_builder.catalogUri(catalog_uri)
        endpoint_builder = endpoint_builder.warehouseUri(warehouse_uri)
        endpoint_builder = endpoint_builder.dataInstructions(unwrap(data_instructions))

        # Each optional map is converted to a Java HashMap and handed to its builder method, in this fixed order;
        # maps that were not supplied are skipped entirely.
        for setter_name, py_map in (
            ("putProperties", properties),
            ("putSecrets", secrets),
            ("putHadoopOptions", hadoop_opt),
        ):
            if py_map is not None:
                getattr(endpoint_builder, setter_name)(j_hashmap(py_map))

        if endpoint_name is not None:
            j_endpoint = endpoint_builder.build(endpoint_name)
        else:
            j_endpoint = endpoint_builder.build()
        return IcebergEndpoint(j_endpoint)
    except Exception as err:
        raise DHError(err, "failed to create endpoint") from err
def get_named_endpoint(endpoint_name: str) -> IcebergEndpoint:
    """Looks up an IcebergEndpoint in Deephaven by its name.

    Args:
        endpoint_name (str): the name of the endpoint

    Returns:
        an IcebergEndpoint object wrapping the endpoint found under that name

    Raises:
        DHError
    """
    try:
        j_endpoint = _JIcebergTools.getEndpointByName(endpoint_name)
        return IcebergEndpoint(j_endpoint)
    except Exception as err:
        raise DHError(err, f"failed to get endpoint by name {endpoint_name}") from err