Source code for deephaven_enterprise.remote_table

#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
""" This module provides a simple interface for creating subscriptions to remote tables in Deephaven via the Barrage
protocol.

Specifically, use the methods in this module to create reliable connections to remote Barrage tables. The tables
gracefully handle disconnection and reconnect when the upstream is available.

To establish a subscription begin with in_local_cluster() and specify the name of the Persistent Query and the table name
to subscribe to.  You can then call subscribe() or snapshot() on the result to fetch a live subscription or a static
snapshot.

If you do not provide the prototype table definition,  the query must be available and running to do the subscription
or snapshot.

For example:

import deephaven_enterprise.remote_table
ticking_remote_table = remote_table.in_local_cluster(query_name="TestQuery", table_name="TestTable") \
                          .subscribe(included_columns=["Col1", "Col3", "Col5"])
"""

from __future__ import annotations

from typing import Any, Optional

import jpy
from deephaven import DHError
from deephaven.table import Table

JTableDefinition: Any = jpy.get_type("io.deephaven.engine.table.TableDefinition")
_JRemoteTableBuilder = jpy.get_type("io.deephaven.enterprise.remote.RemoteTableBuilder")
_JSubscriptionOptions = jpy.get_type(
    "io.deephaven.enterprise.remote.SubscriptionOptions"
)


[docs] def in_local_cluster( query_name: str, table_name: str, table_definition: Optional[JTableDefinition] = None, ) -> RemoteTableBuilder: """Creates a new remote table builder for the current cluster. If no table definition is provided, the query must be available for connections. Args: query_name (str): the name of the Persistent Query to fetch the table from table_name (str): the table name table_definition (JTableDefinition): the expected table definition, defaults to None Returns: RemoteTableBuilder """ local_builder = ( RemoteTableBuilder(j_builder=_JRemoteTableBuilder.forLocalCluster()) .query_name(query_name) .table_name(table_name) ) if table_definition is not None: local_builder.table_definition(table_definition) return local_builder
[docs] def for_remote_cluster(cluster_url: str) -> UnauthenticatedRemoteTableBuilder: """Creates a new unauthenticated remote table builder for a remote cluster identified by the provided URL. The builder is unauthenticated and requires a call to :meth:`UnauthenticatedRemoteTableBuilder.password` or :meth:`UnauthenticatedRemoteTableBuilder.private_key` to authenticate and get an authenticated :class:`RemoteTableBuilder` in order to subscribe to a remote table or fetch a snapshot of a remote table. Args: cluster_url (str): the URL of the remote cluster Returns: UnauthenticatedRemoteTableBuilder """ return UnauthenticatedRemoteTableBuilder(cluster_url)
[docs] class RemoteTableBuilder: """RemoteTableBuilder is used to create a subscription to a remote table or fetch a snapshot fo a remote table.""" def __init__(self, j_builder): self.j_builder = j_builder
[docs] def table_name(self, table_name: str): """Sets the table name to fetch from the query. Args: table_name (str): the table name Returns: self """ self.j_builder.tableName(table_name) return self
[docs] def query_name(self, query_name: str): """Sets the name of the query to fetch the table from. Args: query_name (str): the query name Returns: self """ self.j_builder.queryName(query_name) return self
[docs] def table_definition(self, table_definition: JTableDefinition): """Sets the prototype table definition of the table to fetch. If this is not specified, the definition will be discovered by connecting to the query, which requires it to be available. Args: table_definition (JTableDefinition): the prototype table definition Returns: self """ self.j_builder.tableDefinition(table_definition) return self
[docs] def subscribe( self, clear_on_disconnect: Optional[bool] = None, retry_window_millis: Optional[int] = None, max_retries_within_window: Optional[int] = None, included_columns: Optional[list[str]] = None, filters: Optional[list[str]] = None, ) -> Table: """Subscribes to the remote table. Args: clear_on_disconnect (bool): if the result table should clear its rows on a disconnection. Defaults to True retry_window_millis (int): the window size in milliseconds to attempt to reconnect, defaults to None, which means 60 seconds max_retries_within_window (int): the maximum allowable number of unsuccessful retries in the retry window, defaults to None, which means 5 retries. included_columns (list[str]): the set of columns to include in the subscription, defaults to None, which includes all columns filters (list[str]): a list of raw string filters to apply before subscription, defaults to None Returns: Table: the subscribed table """ options = _JSubscriptionOptions.builder() if clear_on_disconnect is not None: options.clearOnDisconnect(clear_on_disconnect) if retry_window_millis is not None: options.retryWindowMillis(retry_window_millis) if max_retries_within_window is not None: options.maxRetriesWithinWindow(max_retries_within_window) if included_columns is not None: options.addIncludedColumns(included_columns) if filters is not None: options.addFilters(filters) return Table(self.j_builder.subscribe(options.build()))
[docs] def snapshot(self, included_columns: Optional[list[str]] = None) -> Table: """Creates a static snapshot of the remote table for the specified columns, or all if none are specified Args: included_columns (list[str]): the columns to include, defaults to None, which includes all columns Returns: Table: the snapshot of the table """ return Table(self.j_builder.snapshot(included_columns))
[docs] class UnauthenticatedRemoteTableBuilder: """UnauthenticatedRemoteTableBuilder is used to authenticate with a remote cluster before subscribing to a table.""" def __init__(self, cluster_url): self._j_builder = _JRemoteTableBuilder.forRemoteCluster(cluster_url)
[docs] def password(self, user_name: str, password: str) -> RemoteTableBuilder: """Authenticates with the cluster using a username and password Args: user_name (str): the username password (str): the password Returns: RemoteTableBuilder Raises: DHError """ try: return RemoteTableBuilder(self._j_builder.password(user_name, password)) except Exception as e: raise DHError( e, "Failed to authenticate with the provided username and password" ) from e
[docs] def private_key(self, private_key_file: str) -> RemoteTableBuilder: """Authenticates to the cluster using a private key file. Args: private_key_file (str): the private key file Returns: RemoteTableBuilder Raises: DHError """ try: return RemoteTableBuilder(self._j_builder.privateKey(private_key_file)) except Exception as e: raise DHError( e, "Failed to authenticate with the provided private key file" ) from e