Source code for deephaven_enterprise.client.controller

from __future__ import annotations

import json
from typing import Union, Optional, Iterable, Iterator

import threading
import time
import concurrent.futures

import grpc

import deephaven_enterprise.client.util
import deephaven_enterprise.proto.auth_pb2
import deephaven_enterprise.proto.controller_service_pb2_grpc
import deephaven_enterprise.proto.controller_common_pb2
import deephaven_enterprise.proto.controller_pb2
import deephaven_enterprise.proto.persistent_query_pb2


class RefreshThread(threading.Thread):
    def __init__(self,
                 controller_client: "ControllerClient",
                 period_millis: int = 10000):
        """
        This thread pings the controller every 10 seconds to ensure that our cookie is valid and that we stay authenticated.
        :param controller_client:the controller client
        :param period_millis:how often to refresh
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.controller_client = controller_client
        self.period_millis = period_millis

    def run(self):
        while self.controller_client.cookie is not None:
            time.sleep(self.period_millis / 1000)
            self.controller_client.ping()


class ResponseThread(threading.Thread):
    def __init__(self,
                 controller_client: "ControllerClient",
                 iterator: Iterator[deephaven_enterprise.proto.controller_pb2.SubscribeResponse]):
        """
        This thread processes response from the controller client's subscription method and calls the client's process method.

        :param controller_client: the controller client
        :param iterator: the result of the subscription
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.controller_client = controller_client
        self.iterator = iterator

    def run(self):
        for x in self.iterator:
            if self.controller_client.cookie is None:
                return
            self.controller_client._process_one_event(x)


[docs]class ControllerClient: """ The ControllerClient connects to the Deephaven PersistentQueryController process. You may subscribe to the state of Persistent Queries as well as create and modify them. This class operates on the deephaven_enterprise.proto.persistent_query_pb2 structures. """ rpc_timeout_secs: int = None cookie = None query_map: dict[int, deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryInfoMessage] = {} effective_user: str = None server_config: deephaven_enterprise.proto.controller_pb2.ControllerConfigurationMessage = None refresh_thread: RefreshThread = None response_thread: ResponseThread = None channel: grpc.Channel = None opened_channel: bool def __init__(self, host: str = None, port: int = None, rpc_timeout_seconds: int = 120, channel: grpc.Channel = None): """ Connect to the persistent query controller. :param host: the host to connect to, requires port :param port: the port to connect to, requires host :param rpc_timeout_seconds: the timeout period in seconds for RPCs. Defaults to 120 if not provided. :param channel: an already configured gRPC channel (exclusive with host and port) """ self.rpc_timeout_secs = rpc_timeout_seconds (self.channel, self.me) = deephaven_enterprise.client.util.get_grpc_channel( channel, host, port) self.opened_channel = channel is None self.clientID = deephaven_enterprise.client.util.make_client_id( self.me) self.stub = deephaven_enterprise.proto.controller_service_pb2_grpc.ControllerApiStub( self.channel) pingreq = deephaven_enterprise.proto.controller_common_pb2.PingRequest() self.stub.ping(pingreq, timeout=self.rpc_timeout_secs, wait_for_ready=True)
[docs] def authenticate(self, token: deephaven_enterprise.proto.auth_pb2.Token) -> None: """ Authenticate to the controller using a token obtained from the AuthClient get_token method. :param token: the token to use for authentication, must have a service of "PersistentQueryController" """ auth_request: deephaven_enterprise.proto.controller_pb2.AuthenticationRequest = deephaven_enterprise.proto.controller_pb2.AuthenticationRequest() auth_request.token.CopyFrom(token) auth_request.clientId.CopyFrom(self.clientID) auth_request.getConfiguration = True auth_response: deephaven_enterprise.proto.controller_pb2.AuthenticationResponse = self.stub.authenticate( auth_request, timeout=self.rpc_timeout_secs, wait_for_ready=True) if not auth_response.authenticated: raise Exception("Could not authenticate to controller.") self.cookie = auth_response.cookie self.refresh_thread = RefreshThread(self) self.refresh_thread.start() self.effective_user = token.user_context.effectiveUser if auth_response.config: self.server_config = auth_response.config else: config_request = deephaven_enterprise.proto.controller_pb2.GetConfigurationRequest() config_request.cookie = self.cookie config_response = self.stub.getConfiguration(config_request, timeout=self.rpc_timeout_secs, wait_for_ready=True) self.server_config = config_response.config
[docs] def close(self) -> None: """ Invalidate the clients cookie so that further operations do not take place with this client. """ if self.cookie is not None: ivcr = deephaven_enterprise.proto.auth_pb2.InvalidateCookieRequest() ivcr.cookie = self.cookie self.cookie = None self.stub.invalidateCookie( ivcr, timeout=self.rpc_timeout_secs, wait_for_ready=True) if self.opened_channel: self.channel.close()
def _do_subscription(self): if self.cookie is None: raise Exception("Should authenticate first.") subscription_request = deephaven_enterprise.proto.controller_pb2.SubscribeRequest() subscription_request.cookie = self.cookie subscription = self.stub.subscribe( subscription_request, wait_for_ready=True, timeout=None) first_value = next(subscription) return (subscription, first_value)
[docs] def subscribe(self) -> None: """ Subscribe to persistent query state, and wait for the initial query state snapshot to be populated. A successful call to authenticate should have happened before this call. After the subscription is complete, you may call the map method to retrieve the complete map or the get method to fetch a specific query by serial number. """ executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) result_future: concurrent.futures.Future = executor.submit( self._do_subscription) (subscription, first) = result_future.result( timeout=self.rpc_timeout_secs) last: bool = self._process_one_event(first) while not last: last = self._process_one_event(next(subscription)) # Process response into our map self.response_thread = ResponseThread(self, subscription) self.response_thread.start() executor.shutdown()
[docs] def map(self) -> dict[int, deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryInfoMessage]: """ Retrieve a copy of the current persistent query state. A successful call to subscribe should have happened before this call. :return: a map from serial number to persistent query info """ if self.response_thread is None: raise Exception("Should subscribe first.") return self.query_map.copy()
[docs] def get(self, serial: int, timeout_seconds: float = 0) -> deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryInfoMessage: """ Get a query from the map. If the query does not exist, throws a KeyError. A successful call to subscribe should have happened before this call. The timeout_seconds parameter can be specified to wait for the query to exist before failing with KeyError. This is useful when you have just created the query, know it's serial, but the controller has not yet published the state to you. :param serial: :param timeout_seconds: :return: the PersistentQueryInfoMessage associated with the serial number """ if self.response_thread is None: raise Exception("Should subscribe first.") timeout = time.time_ns() + int(timeout_seconds * 1_000_000_000) tries = 0 last_error: Optional[KeyError] = None while tries == 0 or time.time_ns() < timeout: if tries > 0: time.sleep(0.01) tries += 1 try: return self.query_map[serial] except KeyError as ke: last_error = ke raise last_error
[docs] @staticmethod def is_running(status: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryStatusEnum) -> bool: """ Is the status from the query info running? If not running and not terminal, then the query is in the initialization process. """ return status == deephaven_enterprise.proto.persistent_query_pb2.PQS_RUNNING
[docs] @staticmethod def is_terminal(status: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryStatusEnum) -> bool: """ Is the status from the query info terminal? If not running and not terminal, then the query is in the initialization process. """ return status == deephaven_enterprise.proto.persistent_query_pb2.PQS_ERROR or \ status == deephaven_enterprise.proto.persistent_query_pb2.PQS_DISCONNECTED or \ status == deephaven_enterprise.proto.persistent_query_pb2.PQS_STOPPED or \ status == deephaven_enterprise.proto.persistent_query_pb2.PQS_FAILED or \ status == deephaven_enterprise.proto.persistent_query_pb2.PQS_COMPLETED
[docs] def add_query(self, query_config: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage) -> int: """ Add a persistent query. A successful call to authenticate should have happened before this call. :param query_config: the configuration of the query to add. :return: the serial number of the created query """ if self.cookie is None: raise Exception("Should authenticate first.") aqreq = deephaven_enterprise.proto.controller_pb2.AddQueryRequest() aqreq.cookie = self.cookie aqreq.config.CopyFrom(query_config) aqresp: deephaven_enterprise.proto.controller_pb2.AddQueryResponse = self.stub.addQuery( aqreq, timeout=self.rpc_timeout_secs, wait_for_ready=True) return aqresp.querySerial
[docs] def delete_query(self, serial: int) -> None: """ Delete a query. A successful call to authenticate should have happened before this call. :param serial: the serial number to delete """ if self.cookie is None: raise Exception("Should authenticate first.") remove_request: deephaven_enterprise.proto.controller_pb2.RemoveQueryRequest = \ deephaven_enterprise.proto.controller_pb2.RemoveQueryRequest() remove_request.cookie = self.cookie remove_request.serial = serial self.stub.removeQuery(remove_request, timeout=self.rpc_timeout_secs, wait_for_ready=True)
[docs] def restart_query(self, serials: Union[Iterable[int], int]) -> None: """ Restart one or more queries. A successful call to authenticate should have happened before this call. :param serials: a query serial number, or an iterable of serial numbers """ if self.cookie is None: raise Exception("Should authenticate first.") restartreq = deephaven_enterprise.proto.controller_pb2.RestartQueryRequest() restartreq.cookie = self.cookie if isinstance(serials, int): restartreq.serials.append(serials) else: for serial in serials: restartreq.serials.append(serial) self.stub.restartQuery(restartreq, timeout=self.rpc_timeout_secs, wait_for_ready=True)
[docs] def make_temporary_config(self, name: str, heap_size_gb: float, server: str = None, extra_jvm_args: list[str] = None, extra_environment_vars: list[str] = None, engine: str = "DeephavenCommunity", auto_delete_timeout: Optional[int] = 600, admin_groups: list[str] = None, viewer_groups: list[str] = None) -> deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage: """ Create a configuration suitable for use as a temporary InteractiveConsole query. The worker uses the default DeephavenCommunity engine. This kind of query enables a client to use the controller to create workers for general use, in the same way Enterprise clients would have used the dispatcher. For options that are not represented in the arguments, the returned PersistentQueryConfigMessage can be modified before adding it to the controller. :param name: the name of the persistent query. Defaults to None, which means a name based on the current time is used :param heap_size_gb: the heap size of the worker :param server: the server to connect to. Defaults to None, which means the first available server :param extra_jvm_args: extra JVM arguments for starting the worker. Defaults to None. :param extra_environment_vars: extra Environment variables for the worker. Defaults to None. :param engine: which engine (worker kind) to use for the backend worker. Defaults to None, which means "DeephavenCommunity" :param auto_delete_timeout: after how many seconds should the query be automatically deleted after inactivity. Defaults to ten minutes. If none, auto-delete is disabled. If zero, the query is deleted immediately after a client connection is lost :param admin_groups: list of groups that may administer the query. Defaults to None, which means only the current user may administer the query. :param viewer_groups: list of groups that may view the query. Defaults to None, which means only the current user may view the query. :return: a configuration suitable for passing to add_query. """ if extra_jvm_args is None: extra_jvm_args = [] if extra_environment_vars is None: extra_environment_vars = [] if self.effective_user is None: raise Exception( "Expect to be authenticated when creating temporary query configuration") config: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage = \ deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage() # Java Long.MIN_VALUE allows the controller to assign a serial number config.serial = -2 ** 63 config.name = name config.version = 1 config.owner = self.effective_user config.enabled = True config.heapSizeGb = heap_size_gb config.bufferPoolToHeapRatio = 0.25 config.detailedGCLoggingEnabled = True if extra_jvm_args is not None: config.extraJvmArguments.extend(extra_jvm_args) if extra_environment_vars is not None: config.extraEnvironmentVariables.extend(extra_environment_vars) # config.classPathAdditions if server is not None: config.serverName = server else: config.serverName = self.server_config.dbServers[0].name if admin_groups is not None: config.adminGroups.extend(admin_groups) if viewer_groups is not None: config.viewerGroups.extend(viewer_groups) config.restartUsers = deephaven_enterprise.proto.persistent_query_pb2.RU_ADMIN config.scriptCode = "" config.scriptLanguage = "Python" config.configurationType = "InteractiveConsole" scheduling = [] scheduling.append( "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerTemporary") scheduling.append( "TemporaryQueueName=InteractiveConsoleTemporaryQueue") scheduling.append("TemporaryExpirationTimeMillis=172800000") scheduling.append("StartTime=00:00:00") scheduling.append("StopTime=23:59:59") scheduling.append("TimeZone=America/New_York") scheduling.append("SchedulingDisabled=false") scheduling.append("Overnight=false") scheduling.append("RepeatEnabled=false") scheduling.append("SkipIfUnsuccessful=true") scheduling.append("StopTimeDisabled=true") scheduling.append("RestartErrorCount=0") scheduling.append("RestartErrorDelay=0") scheduling.append("RestartWhenRunning=false") if auto_delete_timeout is not None: scheduling.append("TemporaryAutoDelete=true") tsf = { "TerminationDelay": auto_delete_timeout * 1000, } config.typeSpecificFieldsJson = self.__encode_type_specific_fields( tsf) for param in scheduling: config.scheduling.append(param) # How long to wait for the query to initialize, without an initialization script the default of 60 seconds # should be safe on most installations. initialize_timeout_seconds: float = 60.0 config.timeoutNanos = int(initialize_timeout_seconds * 1_000_000_000) config.jvmProfile = "Default" # config.kubernetesControl = None config.workerKind = engine return config
@staticmethod def __encode_type_specific_fields(tsf: dict) -> str: """ Encodes type specific fields from a Python dictionary into the JSON object that the controller expects. :param tsf: a Python dictionary with type specific fields :return: a JSON encoded string suitable for the controller """ if tsf is None or len(tsf) == 0: return "" encoded = {} for (k, v) in tsf.items(): if v is None: continue if isinstance(v, int): encoded[k] = {"type": "long", "value": str(v)} elif isinstance(v, bool): encoded[k] = {"type": "boolean", "value": "true" if v else "false"} elif isinstance(v, str): encoded[k] = {"type": "string", "value": v} elif isinstance(v, float): encoded[k] = {"type": "double", "value": v} return json.dumps(encoded)
[docs] def ping(self): """ Ping the controller and refresh our cookie. :return: True if the ping was sent, False if we had no cookie """ ping_request = deephaven_enterprise.proto.controller_common_pb2.PingRequest() cookie = self.cookie if cookie is None: return False ping_request.cookie = cookie self.stub.ping( ping_request, timeout=self.rpc_timeout_secs, wait_for_ready=True) return True
def _process_one_event(self, event: deephaven_enterprise.proto.controller_pb2.SubscribeResponse): """ Process a controller event. :param event: the event to process :return: True if this is the end of the initial subscription batch """ batch_end: bool = event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_BATCH_END if event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_PUT or batch_end: serial = event.queryInfo.config.serial self.query_map[serial] = event.queryInfo elif event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_REMOVE: serial = event.querySerial del self.query_map[serial] elif event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_CONFIG_UPDATE: self.server_config = event.config else: raise Exception("Unexpected event: " + str(event)) return batch_end