from __future__ import annotations
import json
from typing import Union, Optional, Iterable, Iterator
import threading
import time
import concurrent.futures
import grpc
import deephaven_enterprise.client.util
import deephaven_enterprise.proto.auth_pb2
import deephaven_enterprise.proto.controller_service_pb2_grpc
import deephaven_enterprise.proto.controller_common_pb2
import deephaven_enterprise.proto.controller_pb2
import deephaven_enterprise.proto.persistent_query_pb2
class RefreshThread(threading.Thread):
def __init__(self,
controller_client: "ControllerClient",
period_millis: int = 10000):
"""
This thread periodically pings the controller (every 10 seconds by default) to ensure that our cookie is valid and that we stay authenticated.
:param controller_client: the controller client
:param period_millis: how often to refresh, in milliseconds
"""
threading.Thread.__init__(self)
self.daemon = True
self.controller_client = controller_client
self.period_millis = period_millis
def run(self):
while self.controller_client.cookie is not None:
time.sleep(self.period_millis / 1000)
self.controller_client.ping()
class ResponseThread(threading.Thread):
def __init__(self,
controller_client: "ControllerClient",
iterator: Iterator[deephaven_enterprise.proto.controller_pb2.SubscribeResponse]):
"""
This thread processes responses from the controller client's subscription and calls the client's process method.
:param controller_client: the controller client
:param iterator: the result of the subscription
"""
threading.Thread.__init__(self)
self.daemon = True
self.controller_client = controller_client
self.iterator = iterator
def run(self):
for x in self.iterator:
if self.controller_client.cookie is None:
return
self.controller_client._process_one_event(x)
class ControllerClient:
"""
The ControllerClient connects to the Deephaven PersistentQueryController process.
You may subscribe to the state of Persistent Queries as well as create and modify them.
This class operates on the deephaven_enterprise.proto.persistent_query_pb2 structures.
"""
rpc_timeout_secs: Optional[int] = None
cookie = None
query_map: dict[int, deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryInfoMessage] = {}
effective_user: Optional[str] = None
server_config: Optional[deephaven_enterprise.proto.controller_pb2.ControllerConfigurationMessage] = None
refresh_thread: Optional[RefreshThread] = None
response_thread: Optional[ResponseThread] = None
channel: Optional[grpc.Channel] = None
opened_channel: bool
def __init__(self, host: str = None, port: int = None, rpc_timeout_seconds: int = 120, channel: grpc.Channel = None):
"""
Connect to the persistent query controller.
:param host: the host to connect to, requires port
:param port: the port to connect to, requires host
:param rpc_timeout_seconds: the timeout period in seconds for RPCs. Defaults to 120 if not provided.
:param channel: an already configured gRPC channel (exclusive with host and port)
"""
self.rpc_timeout_secs = rpc_timeout_seconds
(self.channel, self.me) = deephaven_enterprise.client.util.get_grpc_channel(
channel, host, port)
self.opened_channel = channel is None
self.clientID = deephaven_enterprise.client.util.make_client_id(
self.me)
self.stub = deephaven_enterprise.proto.controller_service_pb2_grpc.ControllerApiStub(
self.channel)
pingreq = deephaven_enterprise.proto.controller_common_pb2.PingRequest()
self.stub.ping(pingreq, timeout=self.rpc_timeout_secs,
wait_for_ready=True)
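# Illustrative usage sketch (kept as comments; the endpoint values are placeholders): the client
# can either open its own channel from host/port, which close() will shut down again, or wrap an
# already configured grpc.Channel.
#
#   controller = ControllerClient(host="controller-host", port=8000)
#   # or, reusing an existing channel:
#   # controller = ControllerClient(channel=my_channel)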
def authenticate(self, token: deephaven_enterprise.proto.auth_pb2.Token) -> None:
"""
Authenticate to the controller using a token obtained from the AuthClient get_token method.
:param token: the token to use for authentication, must have a service of "PersistentQueryController"
"""
auth_request: deephaven_enterprise.proto.controller_pb2.AuthenticationRequest = deephaven_enterprise.proto.controller_pb2.AuthenticationRequest()
auth_request.token.CopyFrom(token)
auth_request.clientId.CopyFrom(self.clientID)
auth_request.getConfiguration = True
auth_response: deephaven_enterprise.proto.controller_pb2.AuthenticationResponse = self.stub.authenticate(
auth_request, timeout=self.rpc_timeout_secs, wait_for_ready=True)
if not auth_response.authenticated:
raise Exception("Could not authenticate to controller.")
self.cookie = auth_response.cookie
self.refresh_thread = RefreshThread(self)
self.refresh_thread.start()
self.effective_user = token.user_context.effectiveUser
if auth_response.HasField("config"):
self.server_config = auth_response.config
else:
config_request = deephaven_enterprise.proto.controller_pb2.GetConfigurationRequest()
config_request.cookie = self.cookie
config_response = self.stub.getConfiguration(config_request,
timeout=self.rpc_timeout_secs, wait_for_ready=True)
self.server_config = config_response.config
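# Illustrative authentication sketch (kept as comments). It assumes an already constructed
# AuthClient instance, auth_client, whose get_token method (referenced in the docstring above)
# issues a token for the "PersistentQueryController" service.
#
#   token = auth_client.get_token("PersistentQueryController")
#   controller.authenticate(token)
#   print(controller.server_config.dbServers)   # the configuration is populated after authenticate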
def close(self) -> None:
"""
Invalidate the client's cookie so that no further operations take place with this client.
"""
if self.cookie is not None:
ivcr = deephaven_enterprise.proto.auth_pb2.InvalidateCookieRequest()
ivcr.cookie = self.cookie
self.cookie = None
self.stub.invalidateCookie(
ivcr, timeout=self.rpc_timeout_secs, wait_for_ready=True)
if self.opened_channel:
self.channel.close()
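# Illustrative sketch (kept as comments): pair authenticate() with close() so the cookie is
# invalidated, the refresh loop stops, and a self-opened channel is shut down even on error.
#
#   controller = ControllerClient(host="controller-host", port=8000)   # placeholder endpoint
#   try:
#       controller.authenticate(token)
#       ...  # subscribe, add_query, etc.
#   finally:
#       controller.close()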
def _do_subscription(self):
if self.cookie is None:
raise Exception("Should authenticate first.")
subscription_request = deephaven_enterprise.proto.controller_pb2.SubscribeRequest()
subscription_request.cookie = self.cookie
subscription = self.stub.subscribe(
subscription_request, wait_for_ready=True, timeout=None)
first_value = next(subscription)
return (subscription, first_value)
def subscribe(self) -> None:
"""
Subscribe to persistent query state, and wait for the initial query state snapshot to be populated.
A successful call to authenticate should have happened before this call.
After the subscription is complete, you may call the map method to retrieve the complete map or the get method
to fetch a specific query by serial number.
"""
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
result_future: concurrent.futures.Future = executor.submit(
self._do_subscription)
(subscription, first) = result_future.result(
timeout=self.rpc_timeout_secs)
last: bool = self._process_one_event(first)
while not last:
last = self._process_one_event(next(subscription))
# Process response into our map
self.response_thread = ResponseThread(self, subscription)
self.response_thread.start()
executor.shutdown()
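# Illustrative sketch (kept as comments): subscribe() blocks until the initial snapshot has been
# processed, after which map() returns a copy keyed by serial number. The info.config.name and
# info.state.status paths are assumptions about the PersistentQueryInfoMessage layout.
#
#   controller.subscribe()
#   for serial, info in controller.map().items():
#       print(serial, info.config.name, info.state.status)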
def map(self) -> dict[int, deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryInfoMessage]:
"""
Retrieve a copy of the current persistent query state.
A successful call to subscribe should have happened before this call.
:return: a map from serial number to persistent query info
"""
if self.response_thread is None:
raise Exception("Should subscribe first.")
return self.query_map.copy()
def get(self, serial: int,
timeout_seconds: float = 0) -> deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryInfoMessage:
"""
Get a query from the map. If the query does not exist, throws a KeyError.
A successful call to subscribe should have happened before this call.
The timeout_seconds parameter can be specified to wait for the query to exist before failing with KeyError.
This is useful when you have just created the query and know its serial, but the controller has not yet published
the state to you.
:param serial: the serial number of the query to fetch
:param timeout_seconds: how long to wait for the query to appear before raising KeyError. Defaults to 0 (no waiting).
:return: the PersistentQueryInfoMessage associated with the serial number
"""
if self.response_thread is None:
raise Exception("Should subscribe first.")
timeout = time.time_ns() + int(timeout_seconds * 1_000_000_000)
tries = 0
last_error: Optional[KeyError] = None
while tries == 0 or time.time_ns() < timeout:
if tries > 0:
time.sleep(0.01)
tries += 1
try:
return self.query_map[serial]
except KeyError as ke:
last_error = ke
raise last_error
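# Illustrative sketch (kept as comments): passing timeout_seconds is useful right after
# add_query(), when the controller may not yet have published the new query's state to this
# subscriber.
#
#   serial = controller.add_query(config)
#   info = controller.get(serial, timeout_seconds=5.0)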
@staticmethod
def is_running(status: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryStatusEnum) -> bool:
"""
Is the status from the query info running?
If not running and not terminal, then the query is in the initialization process.
"""
return status == deephaven_enterprise.proto.persistent_query_pb2.PQS_RUNNING
@staticmethod
def is_terminal(status: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryStatusEnum) -> bool:
"""
Is the status from the query info terminal?
If not running and not terminal, then the query is in the initialization process.
"""
return status == deephaven_enterprise.proto.persistent_query_pb2.PQS_ERROR or \
status == deephaven_enterprise.proto.persistent_query_pb2.PQS_DISCONNECTED or \
status == deephaven_enterprise.proto.persistent_query_pb2.PQS_STOPPED or \
status == deephaven_enterprise.proto.persistent_query_pb2.PQS_FAILED or \
status == deephaven_enterprise.proto.persistent_query_pb2.PQS_COMPLETED
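# Illustrative sketch (kept as comments): a simple wait loop built on get(), is_running() and
# is_terminal(); the .state.status path is an assumption about the message layout.
#
#   deadline = time.time() + 60
#   while time.time() < deadline:
#       status = controller.get(serial).state.status
#       if ControllerClient.is_running(status) or ControllerClient.is_terminal(status):
#           break
#       time.sleep(0.5)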
def add_query(self,
query_config: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage) -> int:
"""
Add a persistent query.
A successful call to authenticate should have happened before this call.
:param query_config: the configuration of the query to add.
:return: the serial number of the created query
"""
if self.cookie is None:
raise Exception("Should authenticate first.")
aqreq = deephaven_enterprise.proto.controller_pb2.AddQueryRequest()
aqreq.cookie = self.cookie
aqreq.config.CopyFrom(query_config)
aqresp: deephaven_enterprise.proto.controller_pb2.AddQueryResponse = self.stub.addQuery(
aqreq, timeout=self.rpc_timeout_secs, wait_for_ready=True)
return aqresp.querySerial
def delete_query(self, serial: int) -> None:
"""
Delete a query.
A successful call to authenticate should have happened before this call.
:param serial: the serial number to delete
"""
if self.cookie is None:
raise Exception("Should authenticate first.")
remove_request: deephaven_enterprise.proto.controller_pb2.RemoveQueryRequest = \
deephaven_enterprise.proto.controller_pb2.RemoveQueryRequest()
remove_request.cookie = self.cookie
remove_request.serial = serial
self.stub.removeQuery(remove_request,
timeout=self.rpc_timeout_secs, wait_for_ready=True)
def restart_query(self, serials: Union[Iterable[int], int]) -> None:
"""
Restart one or more queries.
A successful call to authenticate should have happened before this call.
:param serials: a query serial number, or an iterable of serial numbers
"""
if self.cookie is None:
raise Exception("Should authenticate first.")
restartreq = deephaven_enterprise.proto.controller_pb2.RestartQueryRequest()
restartreq.cookie = self.cookie
if isinstance(serials, int):
restartreq.serials.append(serials)
else:
for serial in serials:
restartreq.serials.append(serial)
self.stub.restartQuery(restartreq,
timeout=self.rpc_timeout_secs, wait_for_ready=True)
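# Illustrative lifecycle sketch (kept as comments): add_query, restart_query, and delete_query
# all require a prior authenticate(); serials come from add_query() or from the subscription map.
#
#   serial = controller.add_query(config)
#   controller.restart_query(serial)                 # a single serial
#   controller.restart_query([serial_a, serial_b])   # or any iterable of serials
#   controller.delete_query(serial)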
def make_temporary_config(self, name: str, heap_size_gb: float,
server: str = None,
extra_jvm_args: list[str] = None,
extra_environment_vars: list[str] = None,
engine: str = "DeephavenCommunity",
auto_delete_timeout: Optional[int] = 600,
admin_groups: list[str] = None,
viewer_groups: list[str] = None) -> deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage:
"""
Create a configuration suitable for use as a temporary InteractiveConsole query. The worker uses the default
DeephavenCommunity engine. This kind of query enables a client to use the controller to create workers for
general use, in the same way Enterprise clients would have used the dispatcher. For options that are not
represented in the arguments, the returned PersistentQueryConfigMessage can be modified before adding it to
the controller.
:param name: the name of the persistent query
:param heap_size_gb: the heap size of the worker
:param server: the server to connect to. Defaults to None, which means the first available server
:param extra_jvm_args: extra JVM arguments for starting the worker. Defaults to None.
:param extra_environment_vars: extra Environment variables for the worker. Defaults to None.
:param engine: which engine (worker kind) to use for the backend worker. Defaults to "DeephavenCommunity"
:param auto_delete_timeout: after how many seconds of inactivity the query is automatically deleted.
Defaults to 600 seconds (ten minutes). If None, auto-delete is disabled. If zero, the query is deleted immediately
after a client connection is lost.
:param admin_groups: list of groups that may administer the query. Defaults to None, which means only the
current user may administer the query.
:param viewer_groups: list of groups that may view the query. Defaults to None, which means only the current
user may view the query.
:return: a configuration suitable for passing to add_query.
"""
if extra_jvm_args is None:
extra_jvm_args = []
if extra_environment_vars is None:
extra_environment_vars = []
if self.effective_user is None:
raise Exception(
"Expect to be authenticated when creating temporary query configuration")
config: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage = \
deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage()
# Java Long.MIN_VALUE allows the controller to assign a serial number
config.serial = -2 ** 63
config.name = name
config.version = 1
config.owner = self.effective_user
config.enabled = True
config.heapSizeGb = heap_size_gb
config.bufferPoolToHeapRatio = 0.25
config.detailedGCLoggingEnabled = True
if extra_jvm_args is not None:
config.extraJvmArguments.extend(extra_jvm_args)
if extra_environment_vars is not None:
config.extraEnvironmentVariables.extend(extra_environment_vars)
# config.classPathAdditions
if server is not None:
config.serverName = server
else:
config.serverName = self.server_config.dbServers[0].name
if admin_groups is not None:
config.adminGroups.extend(admin_groups)
if viewer_groups is not None:
config.viewerGroups.extend(viewer_groups)
config.restartUsers = deephaven_enterprise.proto.persistent_query_pb2.RU_ADMIN
config.scriptCode = ""
config.scriptLanguage = "Python"
config.configurationType = "InteractiveConsole"
scheduling = []
scheduling.append(
"SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerTemporary")
scheduling.append(
"TemporaryQueueName=InteractiveConsoleTemporaryQueue")
scheduling.append("TemporaryExpirationTimeMillis=172800000")
scheduling.append("StartTime=00:00:00")
scheduling.append("StopTime=23:59:59")
scheduling.append("TimeZone=America/New_York")
scheduling.append("SchedulingDisabled=false")
scheduling.append("Overnight=false")
scheduling.append("RepeatEnabled=false")
scheduling.append("SkipIfUnsuccessful=true")
scheduling.append("StopTimeDisabled=true")
scheduling.append("RestartErrorCount=0")
scheduling.append("RestartErrorDelay=0")
scheduling.append("RestartWhenRunning=false")
if auto_delete_timeout is not None:
scheduling.append("TemporaryAutoDelete=true")
tsf = {
"TerminationDelay": auto_delete_timeout * 1000,
}
config.typeSpecificFieldsJson = self.__encode_type_specific_fields(
tsf)
for param in scheduling:
config.scheduling.append(param)
# How long to wait for the query to initialize, without an initialization script the default of 60 seconds
# should be safe on most installations.
initialize_timeout_seconds: float = 60.0
config.timeoutNanos = int(initialize_timeout_seconds * 1_000_000_000)
config.jvmProfile = "Default"
# config.kubernetesControl = None
config.workerKind = engine
return config
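# Illustrative sketch (kept as comments): creating and starting a temporary InteractiveConsole
# worker; the name, heap size, and group are placeholders, and the returned message may be
# adjusted before it is added.
#
#   config = controller.make_temporary_config("example-console", heap_size_gb=4.0,
#                                              viewer_groups=["example-group"])
#   config.scriptLanguage = "Groovy"   # e.g. override a field not exposed as an argument
#   serial = controller.add_query(config)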
@staticmethod
def __encode_type_specific_fields(tsf: dict) -> str:
"""
Encodes type specific fields from a Python dictionary into the JSON object that the controller expects.
:param tsf: a Python dictionary with type specific fields
:return: a JSON encoded string suitable for the controller
"""
if tsf is None or len(tsf) == 0:
return ""
encoded = {}
for (k, v) in tsf.items():
if v is None:
continue
# bool must be tested before int, because bool is a subclass of int in Python
if isinstance(v, bool):
encoded[k] = {"type": "boolean",
"value": "true" if v else "false"}
elif isinstance(v, int):
encoded[k] = {"type": "long", "value": str(v)}
elif isinstance(v, str):
encoded[k] = {"type": "string", "value": v}
elif isinstance(v, float):
encoded[k] = {"type": "double", "value": v}
return json.dumps(encoded)
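# For reference, the encoding above turns {"TerminationDelay": 600000} into the JSON string
# '{"TerminationDelay": {"type": "long", "value": "600000"}}'; booleans become
# {"type": "boolean", "value": "true"/"false"} and strings {"type": "string", "value": ...}.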
def ping(self):
"""
Ping the controller and refresh our cookie.
:return: True if the ping was sent, False if we had no cookie
"""
ping_request = deephaven_enterprise.proto.controller_common_pb2.PingRequest()
cookie = self.cookie
if cookie is None:
return False
ping_request.cookie = cookie
self.stub.ping(
ping_request, timeout=self.rpc_timeout_secs, wait_for_ready=True)
return True
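# Illustrative sketch (kept as comments): ping() is normally driven by the RefreshThread started
# in authenticate(), but it can also be called directly; a False return means the client no
# longer holds a cookie (closed or never authenticated).
#
#   if not controller.ping():
#       raise RuntimeError("controller client is not authenticated")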
def _process_one_event(self, event: deephaven_enterprise.proto.controller_pb2.SubscribeResponse):
"""
Process a controller event.
:param event: the event to process
:return: True if this is the end of the initial subscription batch
"""
batch_end: bool = event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_BATCH_END
if event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_PUT or batch_end:
serial = event.queryInfo.config.serial
self.query_map[serial] = event.queryInfo
elif event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_REMOVE:
serial = event.querySerial
del self.query_map[serial]
elif event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_CONFIG_UPDATE:
self.server_config = event.config
else:
raise Exception("Unexpected event: " + str(event))
return batch_end