Source code for deephaven_enterprise.remote_table
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
""" This module provides a simple interface for creating subscriptions to remote tables in Deephaven via the Barrage
protocol.
Specifically, use the methods in this module to create reliable connections to remote Barrage tables. The tables
gracefully handle disconnection and reconnect when the upstream is available.
To establish a subscription begin with in_local_cluster() and specify the name of the Persistent Query and the table name
to subscribe to. You can then call subscribe() or snapshot() on the result to fetch a live subscription or a static
snapshot.
If you do not provide the prototype table definition, the query must be available and running to do the subscription
or snapshot.
For example:
from deephaven_enterprise import remote_table
ticking_remote_table = remote_table.in_local_cluster(query_name="TestQuery", table_name="TestTable") \
.subscribe(included_columns=["Col1", "Col3", "Col5"])
"""
from __future__ import annotations
from typing import Any, Optional
import jpy
from deephaven import DHError
from deephaven.table import Table
# Java class handles looked up through jpy when this module is imported.
# JTableDefinition has no leading underscore and is annotated as Any; it is the
# name used in the type hints of in_local_cluster() and
# RemoteTableBuilder.table_definition().
JTableDefinition: Any = jpy.get_type("io.deephaven.engine.table.TableDefinition")
# Java builder that the Python RemoteTableBuilder / UnauthenticatedRemoteTableBuilder
# wrappers in this module delegate to (forLocalCluster / forRemoteCluster).
_JRemoteTableBuilder = jpy.get_type("io.deephaven.enterprise.remote.RemoteTableBuilder")
# Java options type; RemoteTableBuilder.subscribe() obtains a builder from it to
# assemble the subscription options.
_JSubscriptionOptions = jpy.get_type(
"io.deephaven.enterprise.remote.SubscriptionOptions"
)
[docs]
def in_local_cluster(
    query_name: str,
    table_name: str,
    table_definition: Optional[JTableDefinition] = None,
) -> RemoteTableBuilder:
    """Creates a remote table builder that targets the current (local) cluster.

    The builder is pre-configured with the query name and table name. When no
    table definition is supplied, the query must be available for connections.

    Args:
        query_name (str): the name of the Persistent Query to fetch the table from
        table_name (str): the table name
        table_definition (JTableDefinition): the expected table definition, defaults to None

    Returns:
        RemoteTableBuilder
    """
    builder = RemoteTableBuilder(j_builder=_JRemoteTableBuilder.forLocalCluster())
    # Each setter configures the underlying Java builder and returns the same
    # wrapper, so the calls do not need to be chained.
    builder.query_name(query_name)
    builder.table_name(table_name)

    if table_definition is None:
        return builder

    builder.table_definition(table_definition)
    return builder
[docs]
def for_remote_cluster(cluster_url: str) -> UnauthenticatedRemoteTableBuilder:
    """Creates an unauthenticated remote table builder for the cluster at the given URL.

    The returned builder cannot subscribe or snapshot by itself. Call
    :meth:`UnauthenticatedRemoteTableBuilder.password` or
    :meth:`UnauthenticatedRemoteTableBuilder.private_key` on it to authenticate and
    obtain an authenticated :class:`RemoteTableBuilder`, which can then subscribe to
    a remote table or fetch a snapshot of one.

    Args:
        cluster_url (str): the URL of the remote cluster

    Returns:
        UnauthenticatedRemoteTableBuilder
    """
    unauthenticated_builder = UnauthenticatedRemoteTableBuilder(cluster_url)
    return unauthenticated_builder
[docs]
class RemoteTableBuilder:
    """RemoteTableBuilder is used to create a subscription to a remote table or fetch a snapshot of a remote table.

    It is a thin wrapper around a Java builder object: the configuration methods
    forward to the Java builder and return this wrapper so calls can be chained.
    """

    def __init__(self, j_builder):
        # The wrapped Java builder; every method below delegates to it.
        self.j_builder = j_builder

    def table_name(self, table_name: str):
        """Sets the table name to fetch from the query.

        Args:
            table_name (str): the table name

        Returns:
            self
        """
        java_builder = self.j_builder
        java_builder.tableName(table_name)
        return self

    def query_name(self, query_name: str):
        """Sets the name of the query to fetch the table from.

        Args:
            query_name (str): the query name

        Returns:
            self
        """
        java_builder = self.j_builder
        java_builder.queryName(query_name)
        return self

    def table_definition(self, table_definition: JTableDefinition):
        """Sets the prototype table definition of the table to fetch.

        If this is not specified, the definition will be discovered by connecting to
        the query, which requires it to be available.

        Args:
            table_definition (JTableDefinition): the prototype table definition

        Returns:
            self
        """
        java_builder = self.j_builder
        java_builder.tableDefinition(table_definition)
        return self

    def subscribe(
        self,
        clear_on_disconnect: Optional[bool] = None,
        retry_window_millis: Optional[int] = None,
        max_retries_within_window: Optional[int] = None,
        included_columns: Optional[list[str]] = None,
        filters: Optional[list[str]] = None,
    ) -> Table:
        """Subscribes to the remote table.

        Only arguments that are not None are forwarded to the subscription options;
        anything left as None is not set on the options builder.

        Args:
            clear_on_disconnect (bool): if the result table should clear its rows on a disconnection. Defaults to True
            retry_window_millis (int): the window size in milliseconds to attempt to reconnect, defaults to None, which
                means 60 seconds
            max_retries_within_window (int): the maximum allowable number of unsuccessful retries in the retry window,
                defaults to None, which means 5 retries.
            included_columns (list[str]): the set of columns to include in the subscription, defaults to None, which
                includes all columns
            filters (list[str]): a list of raw string filters to apply before subscription, defaults to None

        Returns:
            Table: the subscribed table
        """
        j_options_builder = _JSubscriptionOptions.builder()

        # (Java setter name, supplied value) pairs, applied in this order. The
        # setter is looked up only when its value was actually provided.
        requested_settings = (
            ("clearOnDisconnect", clear_on_disconnect),
            ("retryWindowMillis", retry_window_millis),
            ("maxRetriesWithinWindow", max_retries_within_window),
            ("addIncludedColumns", included_columns),
            ("addFilters", filters),
        )
        for setter_name, supplied_value in requested_settings:
            if supplied_value is None:
                continue
            getattr(j_options_builder, setter_name)(supplied_value)

        j_options = j_options_builder.build()
        j_table = self.j_builder.subscribe(j_options)
        return Table(j_table)

    def snapshot(self, included_columns: Optional[list[str]] = None) -> Table:
        """Creates a static snapshot of the remote table for the specified columns, or all if none are specified.

        Args:
            included_columns (list[str]): the columns to include, defaults to None, which includes all columns

        Returns:
            Table: the snapshot of the table
        """
        # included_columns is handed to the Java builder unchanged, None included.
        j_table = self.j_builder.snapshot(included_columns)
        return Table(j_table)

    def blink(self, is_blink: bool):
        """Sets whether the remote table must be a blink table.

        If the table is not a blink table, the subscription will fail.

        Args:
            is_blink (bool): true if the table is a blink table

        Returns:
            self
        """
        java_builder = self.j_builder
        java_builder.isBlink(is_blink)
        return self
[docs]
class UnauthenticatedRemoteTableBuilder:
    """UnauthenticatedRemoteTableBuilder is used to authenticate with a remote cluster before subscribing to a table.

    Both authentication methods return a :class:`RemoteTableBuilder` wrapping the
    result of the corresponding Java call, and re-raise any failure as a DHError.
    """

    def __init__(self, cluster_url):
        # Java-side builder for the remote cluster, created at construction time.
        self._j_builder = _JRemoteTableBuilder.forRemoteCluster(cluster_url)

    def password(self, user_name: str, password: str) -> RemoteTableBuilder:
        """Authenticates with the cluster using a username and password.

        Args:
            user_name (str): the username
            password (str): the password

        Returns:
            RemoteTableBuilder

        Raises:
            DHError
        """
        try:
            j_authenticated = self._j_builder.password(user_name, password)
            return RemoteTableBuilder(j_authenticated)
        except Exception as err:
            failure = DHError(
                err, "Failed to authenticate with the provided username and password"
            )
            raise failure from err

    def private_key(self, private_key_file: str) -> RemoteTableBuilder:
        """Authenticates to the cluster using a private key file.

        Args:
            private_key_file (str): the private key file

        Returns:
            RemoteTableBuilder

        Raises:
            DHError
        """
        try:
            j_authenticated = self._j_builder.privateKey(private_key_file)
            return RemoteTableBuilder(j_authenticated)
        except Exception as err:
            failure = DHError(
                err, "Failed to authenticate with the provided private key file"
            )
            raise failure from err