Source code for deephaven.barrage

#
# Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
#
""" This module defines the BarrageSession wrapper class and provides a factory function to create an instance of it
 for accessing resources on remote Deephaven servers."""

from __future__ import annotations

import jpy

from deephaven import DHError
from deephaven.table import Table

# Java classes used by this module, looked up once through jpy when the module is imported.
# The leading "_J" marks each name as a module-private handle to a Java type.
_JURI = jpy.get_type("java.net.URI")
_JTimeUnit = jpy.get_type("java.util.concurrent.TimeUnit")
_JClientConfig = jpy.get_type("io.deephaven.client.impl.ClientConfig")
_JSSLConfig = jpy.get_type("io.deephaven.ssl.config.SSLConfig")
_JSessionImplConfig = jpy.get_type("io.deephaven.client.impl.SessionImplConfig")
_JSessionConfig = jpy.get_type("io.deephaven.client.impl.SessionConfig")
_JTrustCustom = jpy.get_type("io.deephaven.ssl.config.TrustCustom")
_JDeephavenTarget = jpy.get_type("io.deephaven.uri.DeephavenTarget")
_JBarrageSession = jpy.get_type("io.deephaven.client.impl.BarrageSession")
_JTableSpec = jpy.get_type("io.deephaven.qst.table.TableSpec")
_JBarrageTableResolver = jpy.get_type("io.deephaven.server.uri.BarrageTableResolver")
_JChannelHelper = jpy.get_type("io.deephaven.client.impl.ChannelHelper")
_JDeephavenChannelImpl = jpy.get_type("io.deephaven.proto.DeephavenChannelImpl")
_JSessionImpl = jpy.get_type("io.deephaven.client.impl.SessionImpl")
_JExecutors = jpy.get_type("java.util.concurrent.Executors")
_JRootAllocator = jpy.get_type("org.apache.arrow.memory.RootAllocator")
_JDeephavenApiServer = jpy.get_type("io.deephaven.server.runner.DeephavenApiServer")


class BarrageSession:
    """A Deephaven Barrage session to a remote server.

    The session can be used as a context manager; leaving the ``with`` block calls close().
    """

    def __init__(self, j_barrage_session: jpy.JType, j_managed_channel: jpy.JType = None):
        """Initializes a Barrage session.

        When BarrageSession is created via the barrage_session() factory function, j_managed_channel is always
        provided, and when BarrageSession.close() is called, it will shut down the channel as well as close the
        j_barrage_session.

        When BarrageSession is initialized directly and j_managed_channel is None, when BarrageSession.close() is
        called, it will only close the j_barrage_session, it is the responsibility of the user to shut down the
        associated channel if needed.

        Args:
            j_barrage_session (jpy.JType): the underlying Java Barrage session object
            j_managed_channel (jpy.JType): the channel to shut down on close(), default is None
        """
        self.j_barrage_session = j_barrage_session
        self.j_session = j_barrage_session.session()
        self.j_managed_channel = j_managed_channel

    def __enter__(self) -> BarrageSession:
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self.close()

    def close(self) -> None:
        """Closes the barrage session.

        If the BarrageSession is initialized with a managed channel, the channel will be shut down as well, even
        when closing the session itself fails.

        Raises:
            DHError
        """
        try:
            try:
                self.j_barrage_session.close()
            finally:
                # The channel must be released even if closing the session raised, otherwise it would leak.
                if self.j_managed_channel is not None:
                    self.j_managed_channel.shutdownNow()
                    self.j_managed_channel.awaitTermination(10, _JTimeUnit.SECONDS)
        except Exception as e:
            raise DHError(e, "failed to close the barrage session.") from e

    def subscribe(self, ticket: bytes) -> Table:
        """Subscribes to a published remote table with the given ticket.

        Note, if the remote table is closed or its owner session is closed, the ticket becomes invalid. If the same
        ticket is subscribed to multiple times, multiple subscriptions will be created.

        Args:
            ticket (bytes): the bytes of the ticket

        Returns:
            a Table

        Raises:
            DHError
        """
        try:
            j_barrage_subscription = self.j_barrage_session.subscribe(_JTableSpec.ticket(ticket),
                                                                      _JBarrageTableResolver.SUB_OPTIONS)
            return Table(j_barrage_subscription.entireTable().get())
        except Exception as e:
            raise DHError(e, "failed to subscribe to the remote table with the provided ticket.") from e

    def snapshot(self, ticket: bytes) -> Table:
        """Returns a snapshot of a published remote table with the given ticket.

        Note, if the remote table is closed or its owner session is closed, the ticket becomes invalid. If the same
        ticket is snapshot multiple times, multiple snapshots will be created.

        Args:
            ticket (bytes): the bytes of the ticket

        Returns:
            a Table

        Raises:
            DHError
        """
        try:
            j_barrage_snapshot = self.j_barrage_session.snapshot(_JTableSpec.ticket(ticket),
                                                                 _JBarrageTableResolver.SNAP_OPTIONS)
            return Table(j_barrage_snapshot.entireTable().get())
        except Exception as e:
            raise DHError(e, "failed to take a snapshot of the remote table with the provided ticket.") from e
def barrage_session(host: str,
                    port: int = 10000,
                    auth_type: str = "Anonymous",
                    auth_token: str = "",
                    use_tls: bool = False,
                    tls_root_certs: bytes = None,
                    extra_headers: dict[str, str] = None
                    ) -> BarrageSession:
    """Returns a Deephaven gRPC session to a remote server if a cached session is available; otherwise, creates a new
    session.

    Note: client authentication is not supported yet.

    Args:
        host (str): the host name or IP address of the Deephaven server.
        port (int): the port number that the remote Deephaven server is listening on, default is 10000.
        auth_type (str): the authentication type string, can be "Anonymous", "Basic", or any custom-built
            authenticator in the server, such as "io.deephaven.authentication.psk.PskAuthenticationHandler",
            default is "Anonymous".
        auth_token (str): the authentication token string. When auth_type is "Basic", it must be "user:password";
            when auth_type is "Anonymous", it will be ignored; when auth_type is a custom-built authenticator, it
            must conform to the specific requirement of the authenticator
        use_tls (bool): if True, use a TLS connection. Defaults to False
        tls_root_certs (bytes): PEM encoded root certificates to use for TLS connection, or None to use system
            defaults. If not None implies use a TLS connection and the use_tls argument should have been passed as
            True. Defaults to None
        extra_headers (dict[str, str]): extra headers to set when configuring the gRPC channel. Defaults to None.

    Returns:
        a Deephaven Barrage session

    Raises:
        DHError
    """
    try:
        if tls_root_certs and not use_tls:
            raise DHError(message="tls_root_certs is provided but use_tls is False")

        # "dh" selects a TLS connection, "dh+plain" a plaintext one.
        scheme = "dh" if use_tls else "dh+plain"
        target_uri = f"{scheme}://{host}:{port}"

        j_client_config = _build_client_config(target_uri, tls_root_certs, extra_headers)
        auth = f"{auth_type} {auth_token}" if auth_token else auth_type

        try:
            return _get_barrage_session_via_api_server(j_client_config, auth)
        except Exception:
            # fall back to the direct way when we don't have a fully initialized server, used for testing only
            # TODO: remove when we are done with restructuring the integrations tests wiring
            #  https://github.com/deephaven/deephaven-core/issues/5401
            # Only Exception is caught so that KeyboardInterrupt/SystemExit are not swallowed by the fallback.
            return _get_barrage_session_direct(j_client_config, auth)
    except Exception as e:
        raise DHError(e, "failed to get a barrage session to the target remote Deephaven server.") from e
def _get_barrage_session_via_api_server(client_config: jpy.JType, auth: str) -> BarrageSession:
    """Creates a BarrageSession through the session factory obtained from the DeephavenApiServer instance.

    Args:
        client_config (jpy.JType): the Java client config describing the target server
        auth (str): the authentication type and value string; when empty, the factory's default session config
            is used

    Returns:
        a BarrageSession that also holds the factory's managed channel
    """
    j_barrage_session_factory_creator = _JDeephavenApiServer.getInstance().sessionFactoryCreator()
    j_barrage_session_factory = j_barrage_session_factory_creator.barrageFactory(client_config)
    j_managed_channel = j_barrage_session_factory.managedChannel()
    if auth:
        j_session_config = (_JSessionConfig.builder()
                            .authenticationTypeAndValue(auth)
                            .build())
        j_barrage_session = j_barrage_session_factory.newBarrageSession(j_session_config)
    else:
        j_barrage_session = j_barrage_session_factory.newBarrageSession()
    return BarrageSession(j_barrage_session, j_managed_channel)


def _get_barrage_session_direct(client_config: jpy.JType, auth: str) -> BarrageSession:
    """Note, this is used for testing only. This way of constructing a Barrage session is less efficient because it
    does not share any of the state or configuration that the server provides; namely, when you are doing it with
    the server context it provides a singleton executor, allocator, outbound SSL configuration, and the ability for
    the server to hook in additional channel building options.

    TODO: remove when we are done with restructuring the integrations tests wiring
     https://github.com/deephaven/deephaven-core/issues/5401.

    Args:
        client_config (jpy.JType): the Java client config describing the target server
        auth (str): the authentication type and value string

    Returns:
        a BarrageSession that also holds the channel created here
    """
    j_channel = _JChannelHelper.channel(client_config)
    j_dh_channel = _JDeephavenChannelImpl(j_channel)

    j_session_config = (_JSessionImplConfig.builder()
                        .executor(_JExecutors.newScheduledThreadPool(4))
                        .authenticationTypeAndValue(auth)
                        .channel(j_dh_channel)
                        .build())
    try:
        j_session = _JSessionImpl.create(j_session_config)
    except Exception:
        # if the connection to the host can't be established, we should clean up the resources
        j_session_config.executor().shutdownNow()
        j_channel.shutdownNow()
        j_channel.awaitTermination(10, _JTimeUnit.SECONDS)
        raise
    j_barrage_session = _JBarrageSession.create(j_session, _JRootAllocator(), j_channel)
    return BarrageSession(j_barrage_session, j_channel)


def _build_client_config(target_uri: str, tls_root_certs: bytes, extra_headers: dict[str, str] = None) -> jpy.JType:
    """Builds the Java client config for the given target.

    Args:
        target_uri (str): the target URI string, e.g. "dh://host:port" or "dh+plain://host:port"
        tls_root_certs (bytes): custom root certificates passed to the SSL trust config; when empty or None, no SSL
            config is set on the builder
        extra_headers (dict[str, str]): extra headers to put on the client config, default is None

    Returns:
        the built Java client config
    """
    j_client_config_builder = _JClientConfig.builder()
    j_client_config_builder.target(_JDeephavenTarget.of(_JURI(target_uri)))
    if extra_headers:
        for header, value in extra_headers.items():
            j_client_config_builder.putExtraHeaders(header, value)
    if tls_root_certs:
        j_ssl_config = _JSSLConfig.builder().trust(
            _JTrustCustom.ofX509(tls_root_certs, 0, len(tls_root_certs))).build()
        j_client_config_builder.ssl(j_ssl_config)
    j_client_config = j_client_config_builder.build()
    return j_client_config