Source code for deephaven_enterprise.data_ingestion.dis

#
# Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
#
"""This module provides a way to consume a Kafka topic and persistently write the data to a Deephaven Data Import
Server, making the table available to workers throughout your Deephaven Enterprise cluster. It also includes utilities
for accessing DIS by name and creating PartitionedTables from last-by views.
"""

from functools import cache
from typing import Optional

import jpy
from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.table import PartitionedTable

from deephaven_enterprise.data_ingestion.kafka_options import KafkaTableWriterOptions

_JKafkaTableWriter = jpy.get_type(
    "io.deephaven.enterprise.kafkawriter.KafkaTableWriter"
)
_JKafkaTableWriterOptions = jpy.get_type(
    "io.deephaven.enterprise.kafkawriter.KafkaTableWriter$Options"
)
_JDataImportServer = jpy.get_type(
    "io.deephaven.shadow.enterprise.com.illumon.iris.db.tables.dataimport.logtailer.DataImportServer"
)
_JLastByPartitionedTableFactory = jpy.get_type(
    "io.deephaven.enterprise.lastbystate.LastByPartitionedTableFactory"
)


[docs] class DataImportServer(JObjectWrapper): """A class that represents a Data Import Server (DIS) in Deephaven. An instance of this class is needed to ingest data from external sources, such as Kafka, into Deephaven. This class provides convenience methods to create PartitionedTables from the last-by views. Note, users should not create this class directly. Instead, use the :meth:`.get_dis_by_name` function to get a Data Import Server (DIS) by name, and optionally, its private storage path. """ j_object_type = _JDataImportServer @property def j_object(self) -> jpy.JType: return self.j_dis def __init__(self, j_dis: jpy.JType): self.j_dis = j_dis
[docs] def consume(self, opts: KafkaTableWriterOptions): """Consumes a Kafka topic and persistently writes the data to this Deephaven Data Import Server (DIS), making the table available to workers throughout the Deephaven Enterprise cluster. Args: opts (KafkaTableWriterOptions): the configuration for ingesting data from Kafka Raises: DHError """ try: opts.j_object.dataImportServer(self.j_dis) _JKafkaTableWriter.consumeToDis(opts.j_object) except Exception as e: raise DHError(e, "Failed to consume to DIS") from e
[docs] def partitioned_table_from_last_by_view( self, namespace: str, table_name: str, partition_value: str, last_by_view: Optional[str] = None, ) -> PartitionedTable: """Creates a PartitionedTable from a last-by view created by an ingester on this Data Import Server (DIS). Args: namespace (str): The namespace of the table. table_name (str): The name of the table. partition_value (str): The column partition value, must not be None. last_by_view (str): The last-by view to use, defaults to None, which means to use the anonymous last-by view that is created on the specified table. Returns: PartitionedTable Raises: DHError """ try: j_factory = _JLastByPartitionedTableFactory.forDataImportServer(self.j_dis) return PartitionedTable( j_partitioned_table=j_factory.createPartitionedTable( namespace, table_name, partition_value, last_by_view ) ) except Exception as e: raise DHError( "Failed to create PartitionedTable from last-by views." ) from e
# This function is used to get a Deephaven Import Server (DIS) by name. # It is cached to ensure the same DIS is not created multiple times.
[docs] @cache def get_dis_by_name( dis_name: str, dis_storage_path: Optional[str] = None ) -> DataImportServer: """Returns a Deephaven Import Server (DIS) by name and optionally its storage path. Args: dis_name (str): The name of the Deephaven Import Server (DIS) to get dis_storage_path (str): The path to the Deephaven Import Server (DIS) storage, the DIS must have 'private' storage defined in the routing configuration. When specified the path must be on local disk. default is None. Returns: DataImportServer: The DataImportServer object. Raises: DHError """ try: if dis_storage_path is not None: return DataImportServer( j_dis=_JKafkaTableWriter.getDisByNameWithStorage( dis_name, dis_storage_path ) ) return DataImportServer(j_dis=_JKafkaTableWriter.getDisByName(dis_name)) except Exception as e: raise DHError(f"Failed to get DIS by name: {dis_name}") from e