Source code for deephaven_enterprise.client.controller

from __future__ import annotations

import json
from typing import Union, Optional, Iterable, Iterator, List, Dict

import logging
import threading
import time
import concurrent.futures

import grpc

import deephaven_enterprise.client.util
import deephaven_enterprise.proto.auth_pb2
import deephaven_enterprise.proto.controller_service_pb2_grpc
import deephaven_enterprise.proto.controller_common_pb2
import deephaven_enterprise.proto.controller_pb2
import deephaven_enterprise.proto.persistent_query_pb2

from deephaven_enterprise.client.auth import AuthClient

class RefreshThread(threading.Thread):
    """
    Daemon thread that periodically pings the controller so that the client's cookie stays valid and the
    client stays authenticated.

    The loop ends when ``stop`` is set to True or when the controller client no longer has a cookie.
    """

    def __init__(self,
                 controller_client: "ControllerClient",
                 period_millis: int = 10000):
        """
        :param controller_client: the controller client
        :param period_millis: how often to refresh, in milliseconds (defaults to 10 seconds)
        """
        threading.Thread.__init__(self)
        self.stop = False
        self.daemon = True
        self.controller_client = controller_client
        self.period_millis = period_millis

    def run(self):
        """
        Sleep for one period, then ping the controller; repeat until stopped or until the cookie is cleared.

        Retryable gRPC errors are logged and the loop continues (re-authenticating first when the error is an
        authentication failure). Any other gRPC error is re-raised, which ends the thread.
        """
        client = self.controller_client
        while not self.stop and client.cookie is not None:
            time.sleep(self.period_millis / 1000)
            # The stop flag may have been set while we slept (for example because a new authentication
            # replaced this thread); do not send another ping in that case.
            if self.stop:
                break
            try:
                client.ping()
            except grpc.RpcError as e:
                if client._should_retry_auth(e):
                    client.log.info("Retryable Authentication Error in ping: %s", e)
                    client._reauthenticate(client.rpc_timeout_secs)
                    continue
                if client._should_retry(e):
                    client.log.info("Retryable gRPC Error in ping: %s", e)
                    continue
                raise


class ResponseThread(threading.Thread):
    """
    Daemon thread that drains the controller subscription stream and hands every response to the
    controller client's event processing method. When the stream ends while the client still holds a
    cookie, the thread asks the client to subscribe again.
    """

    def __init__(self,
                 controller_client: "ControllerClient",
                 iterator: Iterator[deephaven_enterprise.proto.controller_pb2.SubscribeResponse]):
        """
        :param controller_client: the controller client
        :param iterator: the result of the subscription
        """
        super().__init__(daemon=True)
        self.controller_client = controller_client
        self.iterator = iterator

    def run(self):
        """
        Process subscription responses until the stream ends, then re-subscribe unless the client is closed.
        """
        client = self.controller_client
        try:
            for response in self.iterator:
                # A missing cookie means the client was closed; stop consuming events.
                if client.cookie is None:
                    return
                client._process_one_event(response)
            client.log.info("Controller subscription completed.")
        except grpc.RpcError as rpc_error:
            if client.cookie is None:
                # Errors after logout are expected and not worth reporting.
                return
            retryable = client._should_retry(rpc_error) or client._should_retry_auth(rpc_error)
            if not retryable:
                raise rpc_error
            client.log.info("Retryable Error in controller subscription handling: %s" % str(rpc_error))

        # The stream is finished; establish a new subscription unless the client has been closed in the
        # meantime (in which case there is no cookie anymore).
        if client.cookie is None:
            return
        client.subscribe_in_flight = True
        client.response_thread = None
        client.subscribe()


class ControllerClient:
    """
    The ControllerClient connects to the Deephaven PersistentQueryController process.

    You may subscribe to the state of Persistent Queries as well as create and modify them.

    This class operates on the deephaven_enterprise.proto.persistent_query_pb2 structures.
    """
    rpc_timeout_secs: int
    query_map: Dict[int, deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryInfoMessage]
    effective_user: str
    server_config: deephaven_enterprise.proto.controller_pb2.ControllerConfigurationMessage
    refresh_thread: RefreshThread
    response_thread: ResponseThread
    channel: grpc.Channel
    opened_channel: bool
    auth_client: AuthClient
    log: logging.Logger

    def __init__(self, host: Optional[str] = None, port: Optional[int] = None, rpc_timeout_seconds: int = 120,
                 channel: Optional[grpc.Channel] = None):
        """
        Connect to the persistent query controller.

        :param host: the host to connect to, requires port
        :param port: the port to connect to, requires host
        :param rpc_timeout_seconds: the timeout period in seconds for RPCs. Defaults to 120 if not provided.
        :param channel: an already configured gRPC channel (exclusive with host and port)
        """
        self.cookie = None
        self.rpc_timeout_secs = rpc_timeout_seconds
        self.query_map = {}
        self.effective_user = None
        self.server_config = None
        self.refresh_thread = None
        self.response_thread = None
        self.subscribe_in_flight = False
        self.auth_client = None
        self.log = logging.getLogger("deephaven_enterprise.controller")

        (self.channel, self.me) = deephaven_enterprise.client.util.get_grpc_channel(
            channel, host, port)
        # Only a channel that was not supplied by the caller is closed again in close().
        self.opened_channel = channel is None

        self.clientID = deephaven_enterprise.client.util.make_client_id(
            self.me)

        self.stub = deephaven_enterprise.proto.controller_service_pb2_grpc.ControllerApiStub(
            self.channel)

        # Issue an initial ping (without a cookie) as part of construction.
        pingreq = deephaven_enterprise.proto.controller_common_pb2.PingRequest()
        self.stub.ping(pingreq, timeout=self.rpc_timeout_secs, wait_for_ready=True)

    def set_auth_client(self, auth_client: AuthClient):
        """
        Authenticate using the given authentication client, which is stored on this controller client.

        If a controller operation fails with a gRPC unauthenticated exception, then we attempt to use the
        authentication client to create a new controller session. If the authentication client cannot produce
        tokens, then we cannot proceed.

        :param auth_client: the authentication client used to obtain controller tokens
        """
        self.auth_client = auth_client
        self._reauthenticate(self.rpc_timeout_secs)

    def _should_retry_auth(self, e: grpc.RpcError):
        """
        Is this gRPC Error something that should be retried after attempting to reauthenticate to the
        controller?

        :param e: the gRPC error to examine
        :return: True if an auth client is available and the status code is UNAUTHENTICATED
        """
        code = e.code()
        return self.auth_client is not None and code.name == "UNAUTHENTICATED"

    def _should_retry(self, e: grpc.RpcError):
        """
        Is this a transient error that we should retry?

        :param e: the gRPC error to examine
        :return: True for UNAVAILABLE errors and for INTERNAL errors whose details mention RST_STREAM
        """
        code = e.code()
        if code.name == "UNAVAILABLE":
            return True
        if code.name == "INTERNAL":
            # details() may be None; treat that as "no details" instead of failing with a TypeError.
            details = e.details() or ""
            return "RST_STREAM" in details
        return False

    def _reauthenticate(self, timeout: float):
        """
        Request a new token from our authentication client, and then use it to authenticate to the controller.

        :param timeout: the total number of seconds available for fetching the token and authenticating
        :raises Exception: if no authentication client has been set
        """
        if self.auth_client is None:
            raise Exception("Cannot reauthenticate without an auth_client.")
        deadline = time.time() + timeout
        token = self.auth_client.get_token("PersistentQueryController", timeout)
        # Whatever time the token request consumed is no longer available for the authenticate call.
        timeout = deadline - time.time()
        self.authenticate(token, timeout)

    def authenticate(self, token: deephaven_enterprise.proto.auth_pb2.Token, timeout: Optional[float] = None) -> None:
        """
        Authenticate to the controller using a token obtained from the AuthClient get_token method.

        On success the cookie is stored, the refresh thread is (re)started, and the server configuration is
        recorded.

        :param token: the token to use for authentication, must have a service of "PersistentQueryController"
        :param timeout: the timeout in seconds for the authenticate RPC. Defaults to None, which means the
          client's RPC timeout is used.
        :raises Exception: if the controller does not accept the token
        """
        auth_request = deephaven_enterprise.proto.controller_pb2.AuthenticationRequest()
        auth_request.token.CopyFrom(token)
        auth_request.clientId.CopyFrom(self.clientID)
        auth_request.getConfiguration = True

        use_timeout: float = timeout if timeout is not None else self.rpc_timeout_secs

        auth_response: deephaven_enterprise.proto.controller_pb2.AuthenticationResponse = self.stub.authenticate(
            auth_request, timeout=use_timeout, wait_for_ready=True)
        if not auth_response.authenticated:
            raise Exception("Could not authenticate to controller.")
        self.cookie = auth_response.cookie

        # Replace any previous refresh thread; the old one exits once it notices its stop flag.
        if self.refresh_thread is not None:
            self.refresh_thread.stop = True
        self.refresh_thread = RefreshThread(self)
        self.refresh_thread.start()

        self.effective_user = token.user_context.effectiveUser

        if auth_response.config:
            self.server_config = auth_response.config
        else:
            # The response carried no configuration, so request it explicitly.
            config_request = deephaven_enterprise.proto.controller_pb2.GetConfigurationRequest()
            config_request.cookie = self.cookie
            config_response = self.stub.getConfiguration(config_request, timeout=self.rpc_timeout_secs,
                                                         wait_for_ready=True)
            self.server_config = config_response.config

    def close(self) -> None:
        """
        Invalidate the client's cookie so that further operations do not take place with this client, and
        close the channel if this client opened it.

        The channel is closed even when invalidating the cookie fails.
        """
        try:
            if self.cookie is not None:
                ivcr = deephaven_enterprise.proto.auth_pb2.InvalidateCookieRequest()
                ivcr.cookie = self.cookie
                # Clear the cookie first so that the background threads stop working with it.
                self.cookie = None
                self.stub.invalidateCookie(
                    ivcr, timeout=self.rpc_timeout_secs, wait_for_ready=True)
        finally:
            if self.opened_channel:
                self.channel.close()

    def _do_subscription(self):
        """
        Start the subscription RPC and wait for its first response, retrying on retryable errors.

        :return: a tuple of (subscription iterator, first response)
        :raises Exception: if the client has no cookie
        """
        if self.cookie is None:
            raise Exception("Should authenticate first.")
        subscription_request = deephaven_enterprise.proto.controller_pb2.SubscribeRequest()

        # We have no timeout, because the subscription must last "forever", we use a future executor to
        # handle the actual timeout of the gRPC call
        def _grpc_call():
            subscription = self.stub.subscribe(subscription_request, wait_for_ready=True, timeout=None)
            first_value = next(subscription)
            return (subscription, first_value)

        return self._req_with_authentication_retry_no_timeout(subscription_request, _grpc_call)

    def subscribe(self) -> None:
        """
        Subscribe to persistent query state, and wait for the initial query state snapshot to be populated.

        A successful call to authenticate should have happened before this call.

        After the subscription is complete, you may call the map method to retrieve the complete map or the
        get method to fetch a specific query by serial number.

        If establishing the subscription fails or times out, the exception is propagated and the client is
        no longer marked as having a subscription in flight.
        """
        self.subscribe_in_flight = True
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            result_future: concurrent.futures.Future = executor.submit(
                self._do_subscription)
            (subscription, first) = result_future.result(timeout=self.rpc_timeout_secs)

            # Process the initial snapshot into our map
            last: bool = self._process_one_event(first)
            while not last:
                last = self._process_one_event(next(subscription))

            self.response_thread = ResponseThread(self, subscription)
            self.response_thread.start()
        finally:
            self.subscribe_in_flight = False
            # Do not wait here: after a timeout the worker may still be blocked inside the gRPC call.
            executor.shutdown(wait=False)

    def map(self) -> Dict[int, deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryInfoMessage]:
        """
        Retrieve a copy of the current persistent query state.

        A successful call to subscribe should have happened before this call.

        :return: a map from serial number to persistent query info
        """
        deadline_ns = time.time_ns() + int(self.rpc_timeout_secs * 1_000_000_000)
        self.maybe_wait_for_subscription(deadline_ns)
        return self.query_map.copy()

    def maybe_wait_for_subscription(self, deadline_ns):
        """
        Ensure that a subscription exists, waiting for an in-flight subscription if necessary.

        :param deadline_ns: the time.time_ns() value after which we stop waiting for an in-flight subscription
        :raises Exception: if there is no subscription and none was established before the deadline
        """
        if self.response_thread is None:
            if self.subscribe_in_flight:
                while time.time_ns() < deadline_ns:
                    time.sleep(0.1)
                    if self.response_thread is not None:
                        return
            raise Exception("Should subscribe first.")

    def get(self, serial: int, timeout_seconds: float = 0) -> deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryInfoMessage:
        """
        Get a query from the map. If the query does not exist, throws a KeyError.

        A successful call to subscribe should have happened before this call.

        The timeout_seconds parameter can be specified to wait for the query to exist before failing with
        KeyError. This is useful when you have just created the query, know its serial, but the controller
        has not yet published the state to you.

        :param serial: the serial number of the query to fetch
        :param timeout_seconds: how many seconds to wait for the query to appear. Defaults to 0, which means
          a single lookup.
        :return: the PersistentQueryInfoMessage associated with the serial number
        :raises KeyError: if the query is not in the map before the timeout expires
        """
        deadline_ns = time.time_ns() + int(timeout_seconds * 1_000_000_000)
        self.maybe_wait_for_subscription(deadline_ns)

        tries = 0
        last_error: Optional[KeyError] = None
        # Always look once, then keep polling until the deadline passes.
        while tries == 0 or time.time_ns() < deadline_ns:
            if tries > 0:
                time.sleep(0.01)
            tries += 1
            try:
                return self.query_map[serial]
            except KeyError as ke:
                last_error = ke
        raise last_error

    @staticmethod
    def is_running(status: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryStatusEnum) -> bool:
        """
        Is the status from the query info running? If not running and not terminal, then the query is in
        the initialization process.

        :param status: the status to examine
        :return: True if the status is PQS_RUNNING
        """
        return status == deephaven_enterprise.proto.persistent_query_pb2.PQS_RUNNING

    @staticmethod
    def is_terminal(status: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryStatusEnum) -> bool:
        """
        Is the status from the query info terminal? If not running and not terminal, then the query is in
        the initialization process.

        :param status: the status to examine
        :return: True if the status is one of PQS_ERROR, PQS_DISCONNECTED, PQS_STOPPED, PQS_FAILED or
          PQS_COMPLETED
        """
        return status == deephaven_enterprise.proto.persistent_query_pb2.PQS_ERROR or \
            status == deephaven_enterprise.proto.persistent_query_pb2.PQS_DISCONNECTED or \
            status == deephaven_enterprise.proto.persistent_query_pb2.PQS_STOPPED or \
            status == deephaven_enterprise.proto.persistent_query_pb2.PQS_FAILED or \
            status == deephaven_enterprise.proto.persistent_query_pb2.PQS_COMPLETED

    def _req_with_authentication_retry(self, request, timeout, method):
        """
        Perform the requested method with retries.

        :param request: the request that we'll be sending in method, the cookie for authentication is set on
          each try
        :param timeout: how long in seconds we have to complete the call
        :param method: the method to execute, must take a single parameter which is the number of seconds
          for the timeout
        :return: whatever method returns
        """
        deadline = time.time() + timeout
        while True:
            request.cookie = self.cookie
            try:
                return method(timeout)
            except grpc.RpcError as e:
                # Only retry while there is time left; each retry receives the remaining time.
                timeout = deadline - time.time()
                if timeout > 0:
                    if self._should_retry_auth(e):
                        self.log.info("Retryable gRPC Authentication Error: %s", e)
                        self._reauthenticate(timeout)
                        continue
                    if self._should_retry(e):
                        self.log.info("Retryable gRPC Error: %s", e)
                        continue
                raise

    def _req_with_authentication_retry_no_timeout(self, request, method):
        """
        Perform the requested method with retries, no timeout is provided to the method; the authentication
        call uses our standard timeout. This is intended for use when we have an external thing managing
        timeouts (like for streaming RPCs).

        :param request: the request that we'll be sending in method, the cookie for authentication is set on
          each try
        :param method: the method to execute
        :return: whatever method returns
        """
        while True:
            request.cookie = self.cookie
            try:
                return method()
            except grpc.RpcError as e:
                if self._should_retry_auth(e):
                    self.log.info("Retryable gRPC Authentication Error: %s", e)
                    self._reauthenticate(self.rpc_timeout_secs)
                    continue
                if self._should_retry(e):
                    self.log.info("Retryable gRPC Error: %s", e)
                    continue
                raise

    def add_query(self, query_config: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage) -> int:
        """
        Add a persistent query. A successful call to authenticate should have happened before this call.

        :param query_config: the configuration of the query to add.
        :return: the serial number of the created query
        :raises Exception: if the client has no cookie
        """
        if self.cookie is None:
            raise Exception("Should authenticate first.")
        aqreq = deephaven_enterprise.proto.controller_pb2.AddQueryRequest()
        aqreq.config.CopyFrom(query_config)
        # Use the timeout handed to the callback so that retries honor the remaining time.
        aqresp: deephaven_enterprise.proto.controller_pb2.AddQueryResponse = self._req_with_authentication_retry(
            aqreq,
            timeout=self.rpc_timeout_secs,
            method=lambda timeout: self.stub.addQuery(aqreq, timeout=timeout, wait_for_ready=True))
        return aqresp.querySerial

    def delete_query(self, serial: int) -> None:
        """
        Delete a query. A successful call to authenticate should have happened before this call.

        :param serial: the serial number to delete
        :raises Exception: if the client has no cookie
        """
        if self.cookie is None:
            raise Exception("Should authenticate first.")
        remove_request: deephaven_enterprise.proto.controller_pb2.RemoveQueryRequest = \
            deephaven_enterprise.proto.controller_pb2.RemoveQueryRequest()
        remove_request.serial = serial
        self._req_with_authentication_retry(
            remove_request,
            self.rpc_timeout_secs,
            method=lambda timeout: self.stub.removeQuery(remove_request, timeout, wait_for_ready=True))

    def restart_query(self, serials: Union[Iterable[int], int]) -> None:
        """
        Restart one or more queries. A successful call to authenticate should have happened before this call.

        :param serials: a query serial number, or an iterable of serial numbers
        :raises Exception: if the client has no cookie
        """
        if self.cookie is None:
            raise Exception("Should authenticate first.")
        restartreq = deephaven_enterprise.proto.controller_pb2.RestartQueryRequest()
        if isinstance(serials, int):
            restartreq.serials.append(serials)
        else:
            for serial in serials:
                restartreq.serials.append(serial)
        self._req_with_authentication_retry(
            restartreq,
            self.rpc_timeout_secs,
            lambda timeout: self.stub.restartQuery(restartreq, timeout, wait_for_ready=True))

    def make_temporary_config(self, name: str, heap_size_gb: float,
                              server: Optional[str] = None,
                              extra_jvm_args: Optional[List[str]] = None,
                              extra_environment_vars: Optional[List[str]] = None,
                              engine: str = "DeephavenCommunity",
                              auto_delete_timeout: Optional[int] = 600,
                              admin_groups: Optional[List[str]] = None,
                              viewer_groups: Optional[List[str]] = None) -> deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage:
        """
        Create a configuration suitable for use as a temporary InteractiveConsole query. By default the worker
        uses the DeephavenCommunity engine. This kind of query enables a client to use the controller to
        create workers for general use, in the same way Enterprise clients would have used the dispatcher.

        For options that are not represented in the arguments, the returned PersistentQueryConfigMessage can
        be modified before adding it to the controller.

        :param name: the name of the persistent query
        :param heap_size_gb: the heap size of the worker
        :param server: the server to connect to. Defaults to None, which means the first server listed in the
          controller's configuration
        :param extra_jvm_args: extra JVM arguments for starting the worker. Defaults to None.
        :param extra_environment_vars: extra Environment variables for the worker. Defaults to None.
        :param engine: which engine (worker kind) to use for the backend worker. Defaults to
          "DeephavenCommunity"
        :param auto_delete_timeout: after how many seconds should the query be automatically deleted after
          inactivity. Defaults to ten minutes. If None, auto-delete is disabled. If zero, the query is
          deleted immediately after a client connection is lost
        :param admin_groups: list of groups that may administer the query. Defaults to None, which means only
          the current user may administer the query.
        :param viewer_groups: list of groups that may view the query. Defaults to None, which means only the
          current user may view the query.
        :return: a configuration suitable for passing to add_query.
        :raises Exception: if the client has not authenticated yet
        """
        if self.effective_user is None:
            raise Exception(
                "Expect to be authenticated when creating temporary query configuration")

        config: deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage = \
            deephaven_enterprise.proto.persistent_query_pb2.PersistentQueryConfigMessage()

        # Java Long.MIN_VALUE allows the controller to assign a serial number
        config.serial = -2 ** 63
        config.name = name
        config.version = 1
        config.owner = self.effective_user
        config.enabled = True
        config.heapSizeGb = heap_size_gb
        config.bufferPoolToHeapRatio = 0.25
        config.detailedGCLoggingEnabled = True
        if extra_jvm_args is not None:
            config.extraJvmArguments.extend(extra_jvm_args)
        if extra_environment_vars is not None:
            config.extraEnvironmentVariables.extend(extra_environment_vars)
        if server is not None:
            config.serverName = server
        else:
            # No server requested: fall back to the first server in the controller's configuration.
            config.serverName = self.server_config.dbServers[0].name
        if admin_groups is not None:
            config.adminGroups.extend(admin_groups)
        if viewer_groups is not None:
            config.viewerGroups.extend(viewer_groups)
        config.restartUsers = deephaven_enterprise.proto.persistent_query_pb2.RU_ADMIN

        config.scriptCode = ""
        config.scriptLanguage = "Python"
        config.configurationType = "InteractiveConsole"

        scheduling = [
            "SchedulerType=com.illumon.iris.controller.IrisQuerySchedulerTemporary",
            "TemporaryQueueName=InteractiveConsoleTemporaryQueue",
            "TemporaryExpirationTimeMillis=172800000",
            "StartTime=00:00:00",
            "StopTime=23:59:59",
            "TimeZone=America/New_York",
            "SchedulingDisabled=false",
            "Overnight=false",
            "RepeatEnabled=false",
            "SkipIfUnsuccessful=true",
            "StopTimeDisabled=true",
            "RestartErrorCount=0",
            "RestartErrorDelay=0",
            "RestartWhenRunning=false",
        ]
        if auto_delete_timeout is not None:
            scheduling.append("TemporaryAutoDelete=true")
            # The timeout is given in seconds, the value stored in the type specific fields is multiplied
            # by 1000.
            tsf = {
                "TerminationDelay": auto_delete_timeout * 1000,
            }
            config.typeSpecificFieldsJson = self.__encode_type_specific_fields(
                tsf)

        config.scheduling.extend(scheduling)

        # How long to wait for the query to initialize, without an initialization script the default of 60
        # seconds should be safe on most installations.
        initialize_timeout_seconds: float = 60.0
        config.timeoutNanos = int(initialize_timeout_seconds * 1_000_000_000)

        config.jvmProfile = "Default"
        config.workerKind = engine

        return config

    @staticmethod
    def __encode_type_specific_fields(tsf: Dict) -> str:
        """
        Encodes type specific fields from a Python dictionary into the JSON object that the controller
        expects.

        Entries whose value is None, or whose value is not a bool, int, str or float, are left out.

        :param tsf: a Python dictionary with type specific fields
        :return: a JSON encoded string suitable for the controller, or the empty string if tsf is None or
          empty
        """
        if tsf is None or len(tsf) == 0:
            return ""

        encoded = {}
        for (k, v) in tsf.items():
            if v is None:
                continue
            # bool must be tested before int: bool is a subclass of int, so testing int first would encode
            # booleans as "long" values.
            if isinstance(v, bool):
                encoded[k] = {"type": "boolean", "value": "true" if v else "false"}
            elif isinstance(v, int):
                encoded[k] = {"type": "long", "value": str(v)}
            elif isinstance(v, str):
                encoded[k] = {"type": "string", "value": v}
            elif isinstance(v, float):
                # NOTE(review): unlike the other types the double value is not converted to a string;
                # kept as in the original, confirm against what the controller expects.
                encoded[k] = {"type": "double", "value": v}

        return json.dumps(encoded)

    def ping(self):
        """
        Ping the controller and refresh our cookie.

        :return: True if the ping was sent, False if we had no cookie
        """
        ping_request = deephaven_enterprise.proto.controller_common_pb2.PingRequest()
        # Read the cookie once, so that a concurrent close() cannot clear it between the check and its use.
        cookie = self.cookie
        if cookie is None:
            return False
        ping_request.cookie = cookie
        self.stub.ping(
            ping_request, timeout=self.rpc_timeout_secs, wait_for_ready=True)
        return True

    def _process_one_event(self, event: deephaven_enterprise.proto.controller_pb2.SubscribeResponse):
        """
        Process a controller event.

        :param event: the event to process
        :return: True if this is the end of the initial subscription batch
        :raises Exception: if the event removes a query that is not in the map, or has an unexpected type
        """
        batch_end: bool = event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_BATCH_END
        if event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_PUT or batch_end:
            # A batch end event is stored in the map just like a put.
            serial = event.queryInfo.config.serial
            self.query_map[serial] = event.queryInfo
        elif event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_REMOVE:
            serial = event.querySerial
            if serial in self.query_map:
                del self.query_map[serial]
            else:
                raise Exception("Unknown query: " + str(event))
        elif event.event == deephaven_enterprise.proto.controller_pb2.SubscriptionEvent.SE_CONFIG_UPDATE:
            self.server_config = event.config
        else:
            raise Exception("Unexpected event: " + str(event))
        return batch_end