from __future__ import annotations
import base64
from json import loads
import io
import time
import urllib.request
import urllib.parse
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Union
import grpc
import pydeephaven
from pydeephaven.proto import ticket_pb2
from pydeephaven._table_ops import FetchTableOp
from pydeephaven.table import Table
from deephaven_enterprise.proto.persistent_query_pb2 import PersistentQueryConfigMessage, PersistentQueryInfoMessage
import deephaven_enterprise.client
from pydeephaven.dherror import DHError
from deephaven_enterprise.client.auth import AuthClient
from deephaven_enterprise.client.controller import ControllerClient
class SessionManager:
"""
The SessionManager authenticates to the Deephaven Enterprise server and allows you to create sessions for DnD
workers by either creating a new temporary Persistent Query or connecting to an existing Persistent Query.
"""
config: Dict[str, str]
json_connection_info: str
auth_client: AuthClient
controller_client: ControllerClient
active_sessions: List["DndSession"]
def __init__(self, url: str = None, json: str = None):
"""
Create a SessionManager for the specified JSON, which may either be a string or a URL to download.
The Deephaven server typically provides the JSON as "https://host:port/iris/connection.json". Exactly one
of url or json must be provided.
The JSON must have the following parameters: auth_host, auth_port, controller_host, controller_port. If the
truststore_url is set, then a trust store PEM file is downloaded from the given URL. If the
override_authorities parameter is set, then the authority used for the SSL connection is "authserver".
:param url: a URL to get the JSON connection information from
:param json: a JSON document containing connection information
"""
in_worker: bool = False
if url is not None:
if json is not None:
raise Exception("url and json are mutually exclusive")
# Parse the URL first so obviously malformed URLs fail early
urllib.parse.urlparse(url)
json = urllib.request.urlopen(url).read()
elif json is None:
try:
import deephaven_enterprise.client_worker_auth
# If the import succeeds, we are local to a worker, so we generate our own connection.json and then authenticate with a delegated token
json = deephaven_enterprise.client_worker_auth.get_connection_json()
in_worker = True
except ImportError:
raise Exception("Must specify url or json")
self.config = loads(json)
self.active_sessions = []
auth_port = self.config.get("auth_port")
auth_host = self.config.get("auth_host")[0]
controller_port = self.config.get("controller_port")
controller_host = self.config.get("controller_host")
truststore_url: str | None = self.config.get("truststore_url")
if truststore_url is not None and truststore_url != "":
self.truststore_pem = urllib.request.urlopen(truststore_url).read()
else:
self.truststore_pem = None
if self.truststore_pem is None:
self.auth_client = AuthClient(auth_host, auth_port)
self.controller_client = ControllerClient(
controller_host, controller_port)
else:
credentials = grpc.ssl_channel_credentials(
root_certificates=self.truststore_pem)
auth_target = auth_host + ":" + str(auth_port)
if self.config.get("override_authorities"):
authserver_override_options = [
('grpc.ssl_target_name_override', 'authserver')]
else:
authserver_override_options = None
auth_channel: grpc.Channel = grpc.secure_channel(
auth_target, credentials, authserver_override_options)
self.auth_client = AuthClient(channel=auth_channel)
controller_target = controller_host + ":" + str(controller_port)
if controller_target == auth_target:
controller_channel: grpc.Channel = auth_channel
else:
controller_channel: grpc.Channel = grpc.secure_channel(controller_target, credentials,
authserver_override_options)
self.controller_client = ControllerClient(
channel=controller_channel)
if in_worker:
deephaven_enterprise.client_worker_auth.delegate_authentication(
self.auth_client)
self.__init_controller()
def password(self, user: str, password: str, effective_user: str = None):
"""
Authenticates to the server using a username and password.
:param user: the user to authenticate
:param password: the user's password
:param effective_user: the user to operate as, defaults to the user to authenticate
"""
self.auth_client.password(user, password, effective_user)
self.__init_controller()
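# Example (hypothetical credentials): authenticate as one user while operating as another:
#
#   sm.password("svc_account", "svc_password", effective_user="analyst1")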
def saml(self):
"""
Authenticate using SAML, which must be configured on the server.
"""
if not "saml_sso_uri" in self.config:
raise Exception("SAML URI is not defined in connection.json")
self.auth_client.saml(self.config["saml_sso_uri"])
self.__init_controller()
def private_key(self, file: Union[str, io.StringIO]):
"""
Authenticate to the server using a Deephaven format private key file.
https://deephaven.io/enterprise/docs/resources/how-to/connect-from-java/#instructions-for-setting-up-private-keys
:param file: a string file name containing the private key produced by generate-iris-keys, or alternatively an
io.StringIO instance (which may be closed after it is read)
"""
self.auth_client.private_key(file)
self.__init_controller()
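# Example (hypothetical path and secret store): the key may be supplied as a file name or as an
# io.StringIO, which is convenient when the key is held in a secret store rather than on disk:
#
#   sm.private_key("/etc/deephaven/keys/priv-svc_account.base64.txt")
#   # or
#   sm.private_key(io.StringIO(secret_store.get("deephaven-private-key")))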
def close(self):
"""
Terminate this session manager's connections to the authentication server and controller.
"""
# We should close the queries in order to delete the sessions:
while len(self.active_sessions) > 0:
session = self.active_sessions.pop()
session.close()
self.auth_client.close()
self.controller_client.close()
def ping(self):
"""
Send a ping to the authentication server and controller.
:return: False if either ping was not sent, True if both pings were sent.
"""
if not self.auth_client.ping():
return False
return self.controller_client.ping()
def connect_to_new_worker(self, name, heap_size_gb: float,
server: str = None,
extra_jvm_args: List[str] = None,
extra_environment_vars: List[str] = None,
engine: str = "DeephavenCommunity",
auto_delete_timeout: Optional[int] = 600,
admin_groups: List[str] = None,
viewer_groups: List[str] = None,
timeout_seconds: float = 60,
configuration_transformer: Callable[[
PersistentQueryConfigMessage], PersistentQueryConfigMessage] = None,
session_arguments: Dict[str, Any] = None) -> "DndSession":
"""
Create a new worker (as a temporary PersistentQuery) and establish a session to it.
:param name: the name of the persistent query. If None, a name based on the current time is used.
:param heap_size_gb: the heap size of the worker
:param server: the server to connect to. Defaults to None, which means the first available server
:param extra_jvm_args: extra JVM arguments for starting the worker. Defaults to None.
:param extra_environment_vars: extra Environment variables for the worker. Defaults to None.
:param engine: which engine (worker kind) to use for the backend worker. Defaults to "DeephavenCommunity".
:param auto_delete_timeout: after how many seconds should the query be automatically deleted after inactivity.
Defaults to ten minutes (600 seconds). If None, auto-delete is disabled. If zero, the query is deleted immediately after a
client connection is lost
:param admin_groups: list of groups that may administer the query. Defaults to None, which means only the
current user may administer the query.
:param viewer_groups: list of groups that may view the query. Defaults to None, which means only the current
user may view the query.
:param timeout_seconds: how long to wait for the query to start. Defaults to 60 seconds.
:param configuration_transformer: a function that can replace (or edit) the automatically generated persistent
query configuration, enabling you to set more advanced options than the other function parameters provide. Defaults to None.
:param session_arguments: a dictionary of additional arguments to pass to the pydeephaven.Session created and wrapped by a DndSession
:return: a session connected to a new Interactive Console PQ worker
"""
now: datetime = datetime.now()
# Create the Configuration
name = "Python Console " + str(now) if name is None else name
temp_config: PersistentQueryConfigMessage
temp_config = self.controller_client.make_temporary_config(name, heap_size_gb,
server=server,
extra_jvm_args=extra_jvm_args,
extra_environment_vars=extra_environment_vars,
engine=engine,
auto_delete_timeout=auto_delete_timeout,
admin_groups=admin_groups,
viewer_groups=viewer_groups)
if configuration_transformer is not None:
temp_config = configuration_transformer(temp_config)
serial: int = self.controller_client.add_query(temp_config)
self.controller_client.restart_query(serial)
pqinfo: PersistentQueryInfoMessage = self.__wait_for_ready(
now, serial, timeout_seconds)
if pqinfo is None:
self.controller_client.delete_query(serial)
raise Exception("Persistent Query did not start within timeout of " + str(
timeout_seconds) + ", serial=" + str(serial) + ", name=" + name)
if self.controller_client.is_terminal(pqinfo.state.status):
self.controller_client.delete_query(serial)
raise Exception(
"Query is in terminal state. Exception details: " + str(pqinfo.state.exceptionDetails))
if not self.controller_client.is_running(pqinfo.state.status):
self.controller_client.delete_query(serial)
raise Exception(
"Query is not running. Status: " + str(pqinfo.state.status))
print("Connecting to new query \"%s\", ProcessInfoId=\"%s\"" %
(pqinfo.config.name, pqinfo.state.connectionDetails.processInfoId))
return self.__make_session(pqinfo, serial, auto_delete=True, session_arguments=session_arguments)
def __wait_for_ready(self, now, serial, timeout_seconds) -> Optional[PersistentQueryInfoMessage]:
last_status = None
pqinfo: Optional[PersistentQueryInfoMessage] = None
while timedelta(seconds=timeout_seconds) > (datetime.now() - now):
pqinfo = self.controller_client.get(serial, 1)
if not pqinfo.HasField("state"):
if last_status is not None:
print("No state for ", pqinfo.config.name)
# Avoid a tight polling loop while the controller has not yet published state for the query
time.sleep(0.1)
continue
status = pqinfo.state.status
if self.controller_client.is_running(status):
break
elif self.controller_client.is_terminal(status):
break
if status != last_status:
status_name = deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryStatusEnum.Name(
status)
print("Query Status: ", status_name)
last_status = status
time.sleep(0.1)
return pqinfo
def connect_to_persistent_query(self, name: str = None, serial: int = None, session_arguments: Dict[str, Any] = None) -> "DndSession":
"""
Connect to an existing persistent query by name or serial number. The query must be running.
:param name: the name of the persistent query to connect to
:param serial: the serial number of the persistent query to connect to
:param session_arguments: a dictionary of additional arguments to pass to the pydeephaven.Session created and wrapped by a DndSession
:return: a session connected to the persistent query
"""
pqinfo: Optional[PersistentQueryInfoMessage]
if serial is not None:
pqinfo = self.controller_client.get(serial)
if pqinfo is None:
raise Exception(
"Could not find query with serial: " + str(serial))
if name is not None:
if pqinfo.config.name != name:
raise Exception(
"Name (" + name + ") and serial number (" + str(serial) + ") are inconsistent: " + str(
pqinfo.config))
elif name is not None:
pqinfo = None
for search_pq in self.controller_client.map().values():
if search_pq.config.name == name:
pqinfo = search_pq
serial = search_pq.config.serial
break
if pqinfo is None:
raise Exception("Could not find query with name: " + name)
else:
raise Exception(
"You must specify either name or serial to connect_to_persistent_query.")
if self.controller_client.is_terminal(pqinfo.state.status):
raise Exception(
"Query is in terminal state. Exception details: " + str(pqinfo.state.exceptionDetails))
if not self.controller_client.is_running(pqinfo.state.status):
status_name = deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryStatusEnum.Name(
pqinfo.state.status)
raise Exception("Query is not running, state is " + status_name)
return self.__make_session(pqinfo, serial, auto_delete=False, session_arguments=session_arguments)
def __make_session(self, pqinfo, serial, auto_delete: bool, session_arguments: Dict[str, Any] = None):
url: str = pqinfo.state.connectionDetails.grpcUrl
if url is None or url == "":
if pqinfo.state.engineVersion == "":
raise Exception("Not a Community engine: " +
pqinfo.config.name + "(" + str(pqinfo.config.serial) + ")")
raise Exception("gRPC is not available for " +
pqinfo.config.name + "(" + str(pqinfo.config.serial) + ")")
parsed = urllib.parse.urlparse(url)
envoy_prefix: str = pqinfo.state.connectionDetails.envoyPrefix
extra_headers = {b'envoy-prefix': bytes(envoy_prefix, 'us-ascii')}
token: deephaven_enterprise.proto.auth_pb2.Token = self.auth_client.get_token(
"RemoteQueryProcessor")
token_base64 = base64.b64encode(
token.SerializeToString()).decode('us-ascii')
session = DndSession(session_manager=self, pqinfo=pqinfo, host=parsed.hostname, port=parsed.port, auth_token=token_base64,
extra_headers=extra_headers, delete_on_close=serial if auto_delete else None,
session_arguments=session_arguments
)
return session
def __init_controller(self):
self.controller_client.set_auth_client(self.auth_client)
self.controller_client.subscribe()
class DndSession(pydeephaven.session.Session):
"""
Wrapper around a basic Community session. Ephemeral (temporary) queries are explicitly deleted when the session is
closed.
"""
delete_on_close: Optional[int]
session_manager: SessionManager
__pqinfo: PersistentQueryInfoMessage
def __init__(self, session_manager: SessionManager,
pqinfo: PersistentQueryInfoMessage,
host: str = None, port: int = None,
auth_type: str = "io.deephaven.proto.auth.Token", auth_token: str = None,
extra_headers: Dict[bytes, bytes] = None, delete_on_close: int = None,
session_arguments: Dict[str, Any] = None):
# We initialize enough to delete the query
self.delete_on_close = delete_on_close
self.session_manager = session_manager
self.session_manager.active_sessions.append(self)
# Then call the super constructor. Even if it fails, the base class __del__ method is still invoked by the
# Python runtime, and the pydeephaven.Session __del__ method calls close() - which we override. If we called
# into the super close in that case, the uninitialized Session object would throw from its close method, so
# we track init_complete and only delegate to the super close when initialization succeeded.
try:
super().__init__(host=host, port=port, auth_type=auth_type, auth_token=auth_token, use_tls=True,
extra_headers=extra_headers, tls_root_certs=session_manager.truststore_pem,
**(session_arguments if session_arguments is not None else dict()))
except Exception as e:
self.init_complete = False
raise e
# init was complete, we can close the object when it comes time
self.init_complete = True
self.__pqinfo = pqinfo
def pqinfo(self) -> PersistentQueryInfoMessage:
"""
Retrieve the persistent query information for this query.
:return: the PersistentQueryInfoMessage describing this persistent query
"""
return self.__pqinfo
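# Illustrative use of the returned message (these fields are referenced elsewhere in this module):
#
#   info = session.pqinfo()
#   print(info.config.name, info.config.serial, info.state.status)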
def close(self):
"""
Close the session. If this session created its query as a temporary persistent query, that query is also deleted.
"""
if self.init_complete:
super().close()
if self.delete_on_close is not None:
self.session_manager.controller_client.delete_query(
self.delete_on_close)
self.delete_on_close = None
try:
self.session_manager.active_sessions.remove(self)
except ValueError:
pass
def historical_table(self, namespace: str, table_name: str):
"""
Fetches a historical table from the database on the server.
:param namespace: the namespace of the table
:param table_name: the name of the table
:return: a Table object
:raise: DHError
"""
ticket = ticket_pb2.Ticket(
ticket=f'd/hist/{namespace}/{table_name}'.encode(encoding='ascii'))
return self.__fetch_table_(ticket)
def live_table(self, namespace: str, table_name: str):
"""
Fetches a live table from the database on the server.
:param namespace: the namespace of the table
:param table_name: the name of the table
:return: a Table object
:raise: DHError
"""
ticket = ticket_pb2.Ticket(
ticket=f'd/live/{namespace}/{table_name}'.encode(encoding='ascii'))
return self.__fetch_table_(ticket)
def catalog_table(self):
"""
Fetches the catalog table from the database on the server.
:return: a Table object
:raise: DHError
"""
ticket = ticket_pb2.Ticket(
ticket='d/catalog'.encode(encoding='ascii'))
return self.__fetch_table_(ticket)
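# Illustrative table fetches (namespace and table names are hypothetical); each call returns a
# pydeephaven Table proxy backed by the corresponding server-side ticket:
#
#   hist = session.historical_table("LearnDeephaven", "StockTrades")
#   live = session.live_table("DbInternal", "ProcessEventLog")
#   catalog = session.catalog_table()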
def __fetch_table_(self, ticket):
with self._r_lock:
faketable = Table(session=self, ticket=ticket)
try:
table_op = FetchTableOp()
return self.table_service.grpc_table_op(faketable, table_op)
except Exception as e:
if isinstance(e.__cause__, grpc.RpcError):
if e.__cause__.code() == grpc.StatusCode.INVALID_ARGUMENT:
raise DHError(
f"no table by the name {ticket}") from None
raise e
finally:
# Explicitly close the table without releasing it (because it isn't ours)
faketable.ticket = None
faketable.schema = None
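# End-to-end sketch, assuming a reachable Deephaven Enterprise server; the host, credentials, worker name,
# and heap size below are placeholders, not values shipped with this module.
if __name__ == "__main__":
    manager = SessionManager(url="https://deephaven.example.com:8000/iris/connection.json")
    manager.password("example_user", "example_password")
    try:
        # Create a temporary worker, inspect its persistent query info, then close the session,
        # which also deletes the temporary persistent query.
        session = manager.connect_to_new_worker(name="ExampleWorker", heap_size_gb=4.0)
        print(session.pqinfo().config.name)
        session.close()
    finally:
        manager.close()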