Source code for deephaven_enterprise.client.session_manager

from __future__ import annotations

import base64
import io
import time
import urllib.parse
import urllib.request
from datetime import datetime, timedelta
from json import loads
from typing import Any, Callable, Dict, List, Optional, Union

import grpc
import pydeephaven
from pydeephaven._table_ops import FetchTableOp
from pydeephaven.dherror import DHError
from pydeephaven.proto import ticket_pb2
from pydeephaven.table import Table

from deephaven_enterprise.proto.persistent_query_pb2 import PersistentQueryConfigMessage, PersistentQueryInfoMessage
import deephaven_enterprise.client
from deephaven_enterprise.client.auth import AuthClient
from deephaven_enterprise.client.controller import ControllerClient


class SessionManager:
    """
    The SessionManager authenticates to the Deephaven Enterprise server and allows you to create sessions for DnD
    workers by either creating a new temporary Persistent Query or connecting to an existing Persistent Query.
    """
    config: Dict[str, str]
    json_connection_info: str
    auth_client: AuthClient
    controller_client: ControllerClient
    active_sessions: List["DndSession"]
    # PEM bytes downloaded from the connection JSON's truststore_url, or None when no truststore is configured.
    truststore_pem: Optional[bytes]

    def __init__(self, url: str = None, json: str = None):
        """
        Create a SessionManager for the specified JSON, which may either be a string or a URL to download.

        The Deephaven server typically provides the JSON as "https://host:port/iris/connection.json". Exactly one
        of url or json must be provided. If neither is provided and this code runs inside a worker (that is,
        deephaven_enterprise.client_worker_auth is importable), the worker's own connection JSON is used and the
        manager authenticates with a delegate token.

        The JSON must have the following parameters: auth_host, auth_port, controller_host, controller_port.
        If the truststore_url is set, then a trust store PEM file is downloaded from the given URL.
        If the override_authorities parameter is set, then the authority used for the SSL connection is
        "authserver".

        :param url: a URL to get the JSON connection information from
        :param json: a JSON document containing connection information
        :raises Exception: if both url and json are given, or if neither is given outside a worker
        """
        worker_auth = None
        if url is not None:
            if json is not None:
                raise Exception("url and json are mutually exclusive")
            # urlparse raises ValueError for some malformed URLs (e.g. unbalanced IPv6 brackets) before any
            # network access is attempted.
            urllib.parse.urlparse(url)
            json = self._read_url(url)
        elif json is None:
            try:
                from deephaven_enterprise import client_worker_auth as worker_auth
            except ImportError:
                raise Exception("Must specify url or json")
            # We are local to a worker, so generate our own connection.json; authentication happens below with a
            # delegate token.
            json = worker_auth.get_connection_json()

        self.config = loads(json)
        self.active_sessions = []

        auth_port = self.config.get("auth_port")
        # NOTE(review): auth_host is indexed with [0], so it is presumably a list of hosts of which only the
        # first is used — confirm against the connection.json format.
        auth_host = self.config.get("auth_host")[0]
        controller_port = self.config.get("controller_port")
        controller_host = self.config.get("controller_host")

        truststore_url: str | None = self.config.get("truststore_url")
        self.truststore_pem = self._read_url(truststore_url) if truststore_url else None

        if self.truststore_pem is None:
            self.auth_client = AuthClient(auth_host, auth_port)
            self.controller_client = ControllerClient(controller_host, controller_port)
        else:
            credentials = grpc.ssl_channel_credentials(root_certificates=self.truststore_pem)
            if self.config.get("override_authorities"):
                # Verify the server certificate against the name "authserver" instead of the host we dial.
                channel_options = [('grpc.ssl_target_name_override', 'authserver')]
            else:
                channel_options = None

            auth_target = auth_host + ":" + str(auth_port)
            auth_channel: grpc.Channel = grpc.secure_channel(auth_target, credentials, channel_options)
            self.auth_client = AuthClient(channel=auth_channel)

            controller_target = controller_host + ":" + str(controller_port)
            if controller_target == auth_target:
                # Same endpoint: share one channel rather than opening a second connection.
                controller_channel: grpc.Channel = auth_channel
            else:
                controller_channel = grpc.secure_channel(controller_target, credentials, channel_options)
            self.controller_client = ControllerClient(channel=controller_channel)

        if worker_auth is not None:
            worker_auth.delegate_authentication(self.auth_client)
            # Only the worker path is authenticated at this point; other callers must invoke password(), saml()
            # or private_key(), each of which initializes the controller afterwards.
            self.__init_controller()

    @staticmethod
    def _read_url(url: str) -> bytes:
        """Download and return the body at url, closing the response afterwards."""
        with urllib.request.urlopen(url) as response:
            return response.read()

    def password(self, user: str, password: str, effective_user: str = None):
        """
        Authenticates to the server using a username and password.

        :param user: the user to authenticate
        :param password: the user's password
        :param effective_user: the user to operate as, defaults to the user to authenticate
        """
        self.auth_client.password(user, password, effective_user)
        self.__init_controller()

    def saml(self):
        """
        Authenticate using SAML, which must be configured on the server.

        :raises Exception: if the connection JSON has no saml_sso_uri
        """
        if "saml_sso_uri" not in self.config:
            raise Exception("SAML URI is not defined in connection.json")
        self.auth_client.saml(self.config["saml_sso_uri"])
        self.__init_controller()

    def private_key(self, file: str | io.StringIO):
        """
        Authenticate to the server using a Deephaven format private key file.

        https://deephaven.io/enterprise/docs/resources/how-to/connect-from-java/#instructions-for-setting-up-private-keys

        :param file: a string file name containing the private key produced by generate-iris-keys, or
                     alternatively an io.StringIO instance (which may be closed after it is read)
        """
        self.auth_client.private_key(file)
        self.__init_controller()

    def close(self):
        """
        Terminate this session manager's connection to the authentication server and controller.

        Every active session is closed first (which deletes any temporary queries they own). The authentication
        and controller clients are closed even if closing a session raises; that exception is then propagated.
        """
        try:
            # DndSession.close() also tries to remove itself from active_sessions; popping first guarantees the
            # loop makes progress regardless.
            while self.active_sessions:
                self.active_sessions.pop().close()
        finally:
            try:
                self.auth_client.close()
            finally:
                self.controller_client.close()

    def ping(self):
        """
        Send a ping to the authentication server and controller.

        :return: False if either ping was not sent, True if both pings were sent.
        """
        if not self.auth_client.ping():
            return False
        return self.controller_client.ping()

    def connect_to_new_worker(self,
                              name,
                              heap_size_gb: float,
                              server: str = None,
                              extra_jvm_args: List[str] = None,
                              extra_environment_vars: List[str] = None,
                              engine: str = "DeephavenCommunity",
                              auto_delete_timeout: Optional[int] = 600,
                              admin_groups: List[str] = None,
                              viewer_groups: List[str] = None,
                              timeout_seconds: float = 60,
                              configuration_transformer: Callable[
                                  [PersistentQueryConfigMessage], PersistentQueryConfigMessage] = None,
                              session_arguments: Dict[str, Any] = None) -> "DndSession":
        """
        Create a new worker (as a temporary PersistentQuery) and establish a session to it.

        :param name: the name of the persistent query. If None, a name based on the current time is used
        :param heap_size_gb: the heap size of the worker
        :param server: the server to connect to. Defaults to None, which means the first available server
        :param extra_jvm_args: extra JVM arguments for starting the worker. Defaults to None.
        :param extra_environment_vars: extra Environment variables for the worker. Defaults to None.
        :param engine: which engine (worker kind) to use for the backend worker. Defaults to "DeephavenCommunity"
        :param auto_delete_timeout: after how many seconds should the query be automatically deleted after
               inactivity. Defaults to ten minutes. If None, auto-delete is disabled. If zero, the query is deleted
               immediately after a client connection is lost
        :param admin_groups: list of groups that may administer the query. Defaults to None, which means only the
               current user may administer the query.
        :param viewer_groups: list of groups that may view the query. Defaults to None, which means only the
               current user may view the query.
        :param timeout_seconds: how long to wait for the query to start. Defaults to 60 seconds.
        :param configuration_transformer: a function that can replace (or edit) the automatically generated
               persistent query configuration, enabling you to set more advanced options than the other function
               parameters provide. Defaults to None.
        :param session_arguments: a dictionary of additional arguments to pass to the pydeephaven.Session created
               and wrapped by a DndSession
        :return: a session connected to a new Interactive Console PQ worker
        :raises Exception: if the query does not start within the timeout, ends in a terminal state, or is not
                running; in each case the temporary query is deleted first
        """
        now: datetime = datetime.now()
        if name is None:
            name = "Python Console " + str(now)

        temp_config: PersistentQueryConfigMessage = self.controller_client.make_temporary_config(
            name, heap_size_gb, server=server, extra_jvm_args=extra_jvm_args,
            extra_environment_vars=extra_environment_vars, engine=engine,
            auto_delete_timeout=auto_delete_timeout, admin_groups=admin_groups, viewer_groups=viewer_groups)
        if configuration_transformer is not None:
            temp_config = configuration_transformer(temp_config)

        serial: int = self.controller_client.add_query(temp_config)
        self.controller_client.restart_query(serial)
        pqinfo: Optional[PersistentQueryInfoMessage] = self.__wait_for_ready(now, serial, timeout_seconds)

        if pqinfo is None:
            self.controller_client.delete_query(serial)
            raise Exception("Persistent Query did not start within timeout of " + str(
                timeout_seconds) + ", serial=" + str(serial) + ", name=" + name)
        if self.controller_client.is_terminal(pqinfo.state.status):
            self.controller_client.delete_query(serial)
            raise Exception(
                "Query is in terminal state. Exception details: " + str(pqinfo.state.exceptionDetails))
        if not self.controller_client.is_running(pqinfo.state.status):
            self.controller_client.delete_query(serial)
            status_name = deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryStatusEnum.Name(
                pqinfo.state.status)
            raise Exception("Query is not running. Status: " + status_name)

        print("Connecting to new query \"%s\", ProcessInfoId=\"%s\"" % (
            pqinfo.config.name, pqinfo.state.connectionDetails.processInfoId))
        return self.__make_session(pqinfo, serial, auto_delete=True, session_arguments=session_arguments)

    def __wait_for_ready(self, now, serial, timeout_seconds) -> Optional[PersistentQueryInfoMessage]:
        """
        Poll the controller every 0.1s until the query is running or terminal, or until timeout_seconds have
        elapsed since now.

        :param now: the datetime from which the timeout is measured
        :param serial: the serial number of the query to poll
        :param timeout_seconds: how long to wait, in seconds
        :return: the latest query info once it is running or in a terminal state, or None on timeout
        """
        deadline = now + timedelta(seconds=timeout_seconds)
        last_status = None
        while datetime.now() < deadline:
            pqinfo = self.controller_client.get(serial, 1)
            if pqinfo.HasField("state"):
                status = pqinfo.state.status
                if self.controller_client.is_running(status) or self.controller_client.is_terminal(status):
                    return pqinfo
                if status != last_status:
                    status_name = deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryStatusEnum.Name(
                        status)
                    print("Query Status: ", status_name)
                    last_status = status
            elif last_status is not None:
                print("No state for ", pqinfo.config.name)
            # Sleep on every path, including a missing state, so we never spin against the controller.
            time.sleep(0.1)
        return None

    def connect_to_persistent_query(self, name: str = None, serial: int = None,
                                    session_arguments: Dict[str, Any] = None) -> "DndSession":
        """
        Connect to an existing persistent query by name or serial number. The query must be running.

        :param name: the name of the persistent query to connect to
        :param serial: the serial number of the persistent query to connect to
        :param session_arguments: a dictionary of additional arguments to pass to the pydeephaven.Session created
               and wrapped by a DndSession
        :return: a session connected to the persistent query
        :raises Exception: if the query cannot be found, name and serial disagree, or the query is not running
        """
        pqinfo: Optional[PersistentQueryInfoMessage]
        if serial is not None:
            pqinfo = self.controller_client.get(serial)
            if pqinfo is None:
                raise Exception("Could not find query with serial: " + str(serial))
            if name is not None and pqinfo.config.name != name:
                raise Exception(
                    "Name (" + name + ") and serial number (" + str(serial) + ") are inconsistent: " + str(
                        pqinfo.config))
        elif name is not None:
            # First query whose name matches wins.
            pqinfo = next((pq for pq in self.controller_client.map().values() if pq.config.name == name), None)
            if pqinfo is None:
                raise Exception("Could not find query with name: " + name)
            serial = pqinfo.config.serial
        else:
            raise Exception("You must specify either name or serial to connect_to_persistent_query.")

        if self.controller_client.is_terminal(pqinfo.state.status):
            raise Exception(
                "Query is in terminal state. Exception details: " + str(pqinfo.state.exceptionDetails))
        if not self.controller_client.is_running(pqinfo.state.status):
            status_name = deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryStatusEnum.Name(
                pqinfo.state.status)
            raise Exception("Query is not running, state is " + status_name)

        return self.__make_session(pqinfo, serial, auto_delete=False, session_arguments=session_arguments)

    def __make_session(self, pqinfo, serial, auto_delete: bool, session_arguments: Dict[str, Any] = None):
        """
        Build a DndSession for a running query using its gRPC URL, envoy prefix and a fresh auth token.

        :param pqinfo: the query information of the target query
        :param serial: the query serial; passed as delete_on_close when auto_delete is True
        :param auto_delete: whether closing the session should delete the query
        :param session_arguments: extra keyword arguments for the underlying pydeephaven.Session
        :return: the new DndSession
        :raises Exception: if the query exposes no gRPC URL
        """
        url: str = pqinfo.state.connectionDetails.grpcUrl
        if not url:
            if pqinfo.state.engineVersion == "":
                raise Exception("Not a Community engine: " + pqinfo.config.name + "(" +
                                str(pqinfo.config.serial) + ")")
            raise Exception("gRPC is not available for " + pqinfo.config.name + "(" +
                            str(pqinfo.config.serial) + ")")

        parsed = urllib.parse.urlparse(url)
        envoy_prefix: str = pqinfo.state.connectionDetails.envoyPrefix
        extra_headers = {b'envoy-prefix': bytes(envoy_prefix, 'us-ascii')}

        token: deephaven_enterprise.proto.auth_pb2.Token = self.auth_client.get_token("RemoteQueryProcessor")
        token_base64 = base64.b64encode(token.SerializeToString()).decode('us-ascii')

        return DndSession(session_manager=self, pqinfo=pqinfo, host=parsed.hostname, port=parsed.port,
                          auth_token=token_base64, extra_headers=extra_headers,
                          delete_on_close=serial if auto_delete else None,
                          session_arguments=session_arguments)

    def __init_controller(self):
        """Hand the authenticated auth client to the controller client and subscribe to query updates."""
        self.controller_client.set_auth_client(self.auth_client)
        self.controller_client.subscribe()
class DndSession(pydeephaven.session.Session):
    """
    Wrapper around a basic Community session.

    For queries that are ephemeral, they are explicitly deleted on session close.
    """
    delete_on_close: Optional[int]
    session_manager: SessionManager
    __pqinfo: PersistentQueryInfoMessage

    def __init__(self, session_manager: SessionManager, pqinfo: PersistentQueryInfoMessage, host: str = None,
                 port: int = None, auth_type: str = "io.deephaven.proto.auth.Token", auth_token: str = None,
                 extra_headers: Dict[bytes, bytes] = None, delete_on_close: int = None,
                 session_arguments: Dict[str, Any] = None):
        """
        Create the session and register it with session_manager.

        :param session_manager: the manager that owns this session; the session is appended to its
               active_sessions before connecting
        :param pqinfo: the persistent query information for the query this session connects to
        :param host: the worker's gRPC host
        :param port: the worker's gRPC port
        :param auth_type: the authentication type passed to pydeephaven.Session
        :param auth_token: the authentication token passed to pydeephaven.Session
        :param extra_headers: extra gRPC headers passed to pydeephaven.Session
        :param delete_on_close: serial of a query to delete when this session closes, or None
        :param session_arguments: additional keyword arguments for pydeephaven.Session
        """
        # Set up everything close() needs BEFORE the base constructor runs. If the base constructor raises, the
        # Python runtime still invokes pydeephaven.Session.__del__, which calls close() (overridden below);
        # init_complete=False makes close() skip the base close() on the half-built object while still deleting
        # the query and deregistering from the manager.
        self.init_complete = False
        self.delete_on_close = delete_on_close
        self.session_manager = session_manager
        self.__pqinfo = pqinfo
        self.session_manager.active_sessions.append(self)

        super().__init__(host=host, port=port, auth_type=auth_type, auth_token=auth_token, use_tls=True,
                         extra_headers=extra_headers, tls_root_certs=session_manager.truststore_pem,
                         **(session_arguments if session_arguments is not None else dict()))

        # init was complete, we can close the object when it comes time
        self.init_complete = True

    def pqinfo(self) -> PersistentQueryInfoMessage:
        """
        Retrieve the persistent query information for this query.

        :return: the PersistentQueryInfoMessage this session was created with
        """
        return self.__pqinfo

    def close(self):
        """
        Close the session, delete its temporary query (if it owns one) and deregister it from the session manager.

        The deletion and deregistration still run if closing the underlying session raises.
        """
        try:
            if self.init_complete:
                super().close()
        finally:
            try:
                if self.delete_on_close is not None:
                    self.session_manager.controller_client.delete_query(self.delete_on_close)
                    # Cleared only after a successful delete so a later close() can retry a failed one.
                    self.delete_on_close = None
            finally:
                try:
                    self.session_manager.active_sessions.remove(self)
                except ValueError:
                    # Already removed, e.g. SessionManager.close() pops sessions before closing them.
                    pass

    def historical_table(self, namespace: str, table_name: str) -> Table:
        """
        Fetches a historical table from the database on the server.

        :param namespace: the namespace of the table
        :param table_name: the name of the table
        :return: a Table object
        :raise: DHError
        """
        ticket = ticket_pb2.Ticket(ticket=f'd/hist/{namespace}/{table_name}'.encode(encoding='ascii'))
        return self.__fetch_table_(ticket)

    def live_table(self, namespace: str, table_name: str) -> Table:
        """
        Fetches a live table from the database on the server.

        :param namespace: the namespace of the table
        :param table_name: the name of the table
        :return: a Table object
        :raise: DHError
        """
        ticket = ticket_pb2.Ticket(ticket=f'd/live/{namespace}/{table_name}'.encode(encoding='ascii'))
        return self.__fetch_table_(ticket)

    def catalog_table(self) -> Table:
        """
        Fetches the catalog table from the database on the server.

        :return: a Table object
        :raise: DHError
        """
        ticket = ticket_pb2.Ticket(ticket=b'd/catalog')
        return self.__fetch_table_(ticket)

    def __fetch_table_(self, ticket):
        """
        Resolve a database ticket into a server-side table via a FetchTableOp.

        :param ticket: a ticket_pb2.Ticket whose bytes are an ASCII database path (e.g. d/live/ns/table)
        :return: the resulting Table
        :raise: DHError if the server rejects the ticket with INVALID_ARGUMENT (no such table)
        """
        with self._r_lock:
            faketable = Table(session=self, ticket=ticket)
            try:
                return self.table_service.grpc_table_op(faketable, FetchTableOp())
            except Exception as e:
                cause = e.__cause__
                if isinstance(cause, grpc.RpcError) and cause.code() == grpc.StatusCode.INVALID_ARGUMENT:
                    table_path = ticket.ticket.decode('ascii', errors='replace')
                    raise DHError(f"no table by the name {table_path}") from None
                raise
            finally:
                # Explicitly close the table without releasing it (because it isn't ours)
                faketable.ticket = None
                faketable.schema = None