Source code for deephaven_enterprise.remote_table

#  Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending

"""
Use the methods in this module to create reliable connections to remote Barrage tables.  The tables gracefully
handle disconnection and reconnect when the upstream is available.

To establish a subscription begin with in_local_cluster() and specify the name of the query and the table name
to subscribe to.  You can then call subscribe() or snapshot() on the result to fetch a live subscription or a static
snapshot.

If you do not provide the prototype table definition,  the query must be available and running to do the subscription
or snapshot.

For example:

import deephaven_enterprise.remote_table
ticking_remote_table = remote_table.in_local_cluster(query_name="TestQuery", table_name="TestTable") \
                          .subscribe(included_columns=["Col1", "Col3", "Col5"])
"""

import jpy
from typing import List
from deephaven.table import Table

_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition")
_JRemoteTableBuilder = jpy.get_type(
    "io.deephaven.enterprise.remote.RemoteTableBuilder")
_JSubscriptionOptions = jpy.get_type(
    "io.deephaven.enterprise.remote.SubscriptionOptions")


[docs] def in_local_cluster(query_name: str, table_name: str, table_definition=None): """ Create a new remote table builder for the current cluster. If no table definition is provided the query must be available for connections. :param query_name: the name of the query to fetch the table from :param table_name: the table name :param table_definition: (optional) the expected table definition. :return: a RemoteTableBuilder for the specified query and table """ local_builder = RemoteTableBuilder(j_builder=_JRemoteTableBuilder.forLocalCluster()) \ .query_name(query_name) \ .table_name(table_name) if table_definition is not None: local_builder.table_definition(table_definition) return local_builder
[docs] def for_remote_cluster(cluster_url: str): """ Create a new remote table builder for a remote cluster. If no table definition is provided the query must be available for connections. :param cluster_url: the url to the desired cluster :return: a RemoteTableBuilder for the specified query and table """ return UnauthenticatedRemoteTableBuilder(cluster_url)
class RemoteTableBuilder: def __init__(self, j_builder): self.j_builder = j_builder def table_name(self, table_name: str): """ Set the Table name to fetch from the query. :param table_name: the table name :return: this object """ self.j_builder.tableName(table_name) return self def query_name(self, query_name: str): """ Set the name of the query to fetch the table from. :param query_name: the query name :return: this object """ self.j_builder.queryName(query_name) return self def table_definition(self, table_definition): """ Set the prototype table definition of the table to fetch. If this is not specified, the definition will be discovered by connecting to the query, which requires it to be available. :param table_definition: the prototype table definition :return: this object """ self.j_builder.tableDefinition(table_definition) return self def subscribe(self, clear_on_disconnect: bool = None, retry_window_millis: int = None, max_retries_within_window: int = None, included_columns: List[str] = None, filters: List[str] = None) -> Table: """ Subscribe to the table. :param clear_on_disconnect: (optional) if the table should clear its rows on a disconnection. Defaults to True :param retry_window_millis: (optional) the window size in milliseconds to attempt to reconnect :param max_retries_within_window: (optional) the maximum allowable number of unsuccessful retries in the retry window :param included_columns: (optional) the set of columns to include in the subscription :param filters: (optional) a list of raw string filters to apply before subscription :return: the subscribed table """ options = _JSubscriptionOptions.builder() if clear_on_disconnect is not None: options.clearOnDisconnect(clear_on_disconnect) if retry_window_millis is not None: options.retryWindowMillis(retry_window_millis) if max_retries_within_window is not None: options.maxRetriesWithinWindow(max_retries_within_window) if included_columns is not None: options.addIncludedColumns(included_columns) if filters is not None: options.addFilters(filters) return Table(self.j_builder.subscribe(options.build())) def snapshot(self, included_columns: List[str] = None) -> Table: """ Create a static snapshot of the table for the specified columns, or all if none are specified :param included_columns: (optional) the columns to include :return: a snapshot of the table """ return Table(self.j_builder.snapshot(included_columns)) def blink(self, is_blink: bool): """ Mark the fetched table as a blink table. The remote table must also be a blink table or the subscription will fail. :param is_blink: true if the table is a blink table :return: this object """ self.j_builder.isBlink(is_blink) return self class UnauthenticatedRemoteTableBuilder: def __init__(self, cluster_url): self._j_builder = _JRemoteTableBuilder.forRemoteCluster(cluster_url) def password(self, user_name: str, password: str): """ Authenticate with the server using a username and password :param user_name: the username :param password: the password :return: this object on success """ return RemoteTableBuilder(self._j_builder.password(user_name, password)) def private_key(self, private_key_file: str): """ Authenticate to the server using a private key file. :param private_key_file: the private key file :return: this object on success """ return RemoteTableBuilder(self._j_builder.privateKey(private_key_file))