deephaven_enterprise.client.controller

This module provides a client for the Deephaven PersistentQueryController process. The PersistentQueryController process is responsible for managing the lifecycle of Persistent Queries, which are long-running queries that can be started, stopped, and restarted. The ControllerClient allows you to subscribe to the state of Persistent Queries, create and modify them, and wait for them to reach a particular state.

class ControllerClient(host=None, port=None, rpc_timeout_seconds=120, channel=None, channel_options=None, client_name=None)[source]

Bases: object

The ControllerClient connects to the Deephaven PersistentQueryController process, and allows you to subscribe to the state of Persistent Queries as well as create and modify them.

This class operates on the deephaven_enterprise.proto.persistent_query_pb2 structures.

Creates a ControllerClient to connect to the specified PersistentQueryController.

Parameters:
  • host (str) – the host to connect to; if not None, port must also be provided. Defaults to None.

  • port (int) – the port to connect to; if not None, host must also be provided. Defaults to None.

  • rpc_timeout_seconds (int) – the timeout period in seconds for RPCs. Defaults to 120 if not provided.

  • channel (grpc.Channel) – an already configured gRPC channel (mutually exclusive with host and port); this client will not close it. Defaults to None.

  • channel_options (Optional[Any]) – a list of options for channel creation (not used if the channel is provided). Defaults to None.

  • client_name (str) – a string to help identify this client in logs. Defaults to None.
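
The constraints above (host and port go together; channel is exclusive with both) can be checked before the client is constructed. The following is a minimal sketch: `controller_client_kwargs` is a hypothetical helper, not part of this module, and it only builds keyword arguments without importing deephaven_enterprise.

```python
def controller_client_kwargs(host=None, port=None, channel=None,
                             rpc_timeout_seconds=120, client_name=None):
    """Validate and build keyword arguments for ControllerClient.

    Hypothetical helper for illustration; it mirrors the documented rules only.
    """
    # host and port must be given together (or both omitted).
    if (host is None) != (port is None):
        raise ValueError("host and port must be provided together")
    # A pre-configured channel is exclusive with host and port.
    if channel is not None and host is not None:
        raise ValueError("channel is exclusive with host and port")

    kwargs = {"rpc_timeout_seconds": rpc_timeout_seconds}
    if channel is not None:
        kwargs["channel"] = channel
    elif host is not None:
        kwargs.update(host=host, port=port)
    if client_name is not None:
        kwargs["client_name"] = client_name
    return kwargs
```

The resulting dict would presumably be passed as `ControllerClient(**kwargs)`.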

add_query(query_config)[source]

Adds a Persistent Query.

A successful call to authenticate() should have happened before this call.

Parameters:

query_config (persistent_query_pb2.PersistentQueryConfigMessage) – the configuration of the Persistent Query to add

Return type:

int

Returns:

the serial number of the created Persistent Query

authenticate(token, timeout=None)[source]

Authenticates to the Persistent Query Controller using a token obtained from the auth.AuthClient.get_token() method.

Parameters:
  • token (auth_pb2.Token) – the token to use for authentication, must have a service of PersistentQueryController

  • timeout (float) – the timeout period in seconds for the authentication call. Defaults to None, meaning to use the client’s timeout.

Raises:

RuntimeError – if the authentication fails

Return type:

None

close()[source]

Invalidates the client’s cookie so that further operations do not take place with this client.

Return type:

None

delete_query(serial)[source]

Deletes a Persistent Query.

A successful call to authenticate() should have happened before this call.

Parameters:

serial (int) – the serial number of the Persistent Query to delete

Raises:

RuntimeError

Return type:

None

fetch_script(pq)[source]

Fetches the script for a Persistent Query.

Parameters:

pq (Union[str, int]) – the name or serial number of the Persistent Query to fetch the script for

Return type:

str

Returns:

the script of the Persistent Query

Raises:

RuntimeError

fetch_scripts()[source]

Fetches the scripts for all Persistent Queries that the client has access to.

Returns:

a map of serial numbers to scripts

Return type:

dict[int, str]

Raises:

RuntimeError
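
Because fetch_scripts() returns a dict of serial numbers to scripts, it can be searched directly. The sketch below assumes only that `client` is an authenticated ControllerClient (or any object with the same fetch_scripts() method); `serials_with_script_text` is a hypothetical helper name.

```python
def serials_with_script_text(client, needle):
    """Return the sorted serials of Persistent Queries whose script contains `needle`."""
    scripts = client.fetch_scripts()  # dict[int, str]; may raise RuntimeError
    return sorted(serial for serial, script in scripts.items() if needle in script)
```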

static generate_disabled_scheduler()[source]

Generates a scheduler array for a Persistent Query that has scheduling disabled.

Return type:

list[str]

Returns:

a scheduling definition array that can be passed to the controller when adding or updating a Persistent Query

get(serial, timeout_seconds=0)[source]

Gets a Persistent Query from the map. Raises a KeyError if the Persistent Query does not exist.

A successful call to subscribe() should have happened before this call.

The timeout_seconds parameter can be specified to wait for the Persistent Query to exist before failing with KeyError. This is useful when you have just created the Persistent Query and know its serial, but the controller has not yet published the state to you.

Parameters:
  • serial (int) – the serial number of the Persistent Query to retrieve

  • timeout_seconds (float) – how long to wait for the Persistent Query to be found before raising an exception

Return type:

PersistentQueryInfoMessage

Returns:

the PersistentQueryInfoMessage associated with the serial number

Raises:

KeyError – if the Persistent Query is not found within the timeout period
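
A caller that prefers a None result over an exception can wrap get() as follows. This is a sketch, not part of the module: `get_or_none` is a hypothetical name, and `client` is assumed to be a subscribed ControllerClient.

```python
def get_or_none(client, serial, timeout_seconds=5.0):
    """Return the info for `serial`, or None if it is not published within the timeout."""
    try:
        # Waits up to timeout_seconds for the controller to publish the query.
        return client.get(serial, timeout_seconds=timeout_seconds)
    except KeyError:
        return None
```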

get_serial_for_name(name, timeout_seconds=0)[source]

Retrieves the serial number for a given Persistent Query name.

Parameters:
  • name (str) – the name of the Persistent Query to retrieve

  • timeout_seconds (float) – how long to wait for the Persistent Query to be found before raising an exception

Return type:

int

Returns:

the serial number

Raises:

RuntimeError

static is_completed(status)[source]

Is the status from the Persistent Query info completed? If not running and not terminal, then the Persistent Query is in the initialization process.

Parameters:

status (persistent_query_pb2.PersistentQueryStatusEnum) – the status to check

Return type:

bool

static is_running(status)[source]

Is the status from the Persistent Query info running? If not running and not terminal, then the Persistent Query is in the initialization process.

Parameters:

status (persistent_query_pb2.PersistentQueryStatusEnum) – the status to check

Return type:

bool

static is_status_uninitialized(status)[source]

Is the status from the Persistent Query info uninitialized? This is the case before a Persistent Query is ever started.

Parameters:

status (persistent_query_pb2.PersistentQueryStatusEnum) – the status to check

Return type:

bool

static is_terminal(status)[source]

Is the status from the Persistent Query info terminal? If not running and not terminal, then the Persistent Query is in the initialization process.

Parameters:

status (persistent_query_pb2.PersistentQueryStatusEnum) – the status to check

Return type:

bool
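
The four status predicates can be combined into a single label. The sketch below is illustrative only: `describe_phase` is a hypothetical helper, and this reference does not say whether the predicates overlap (for example, whether a completed status also counts as terminal), so the checks are ordered from most to least specific to work either way.

```python
def describe_phase(client_cls, status):
    """Map a status to a coarse label using the ControllerClient static predicates.

    `client_cls` is expected to be ControllerClient (or an object exposing the
    same four static methods).
    """
    if client_cls.is_status_uninitialized(status):
        return "uninitialized"   # never started
    if client_cls.is_running(status):
        return "running"
    if client_cls.is_completed(status):
        return "completed"       # checked before is_terminal in case they overlap
    if client_cls.is_terminal(status):
        return "terminal"
    # Not running and not terminal: the query is still initializing.
    return "initializing"
```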

make_temporary_config(name, heap_size_gb, server=None, extra_jvm_args=None, extra_environment_vars=None, engine='DeephavenCommunity', auto_delete_timeout=600, admin_groups=None, viewer_groups=None)[source]

Creates a configuration suitable for use as a temporary InteractiveConsole Persistent Query. By default, the worker uses the Deephaven Community engine. This kind of Persistent Query enables a client to use the controller to create workers for general use, in the same way Enterprise clients would have used the dispatcher. For options that are not represented in the arguments, the returned PersistentQueryConfigMessage can be modified before adding it to the controller.

Parameters:
  • name (str) – the name of the Persistent Query. If None, a name based on the current time is used

  • heap_size_gb (float) – the heap size of the worker in gigabytes

  • server (str) – the server to connect to. Defaults to None, which means the first available server

  • extra_jvm_args (list[str]) – extra JVM arguments for starting the worker. Defaults to None.

  • extra_environment_vars (list[str]) – extra Environment variables for the worker. Defaults to None.

  • engine (str) – which engine (worker kind) to use for the backend worker. Defaults to “DeephavenCommunity”

  • auto_delete_timeout (Optional[int]) – the number of seconds of inactivity after which the Persistent Query is automatically deleted. Defaults to 600 (10 minutes). If None, auto-delete is disabled. If zero, the Persistent Query is deleted immediately after a client connection is lost

  • admin_groups (list[str]) – list of groups that may administer the Persistent Query. Defaults to None, which means only the current user may administer the Persistent Query.

  • viewer_groups (list[str]) – list of groups that may view the Persistent Query. Defaults to None, which means only the current user may view the Persistent Query.

Return type:

PersistentQueryConfigMessage

Returns:

a configuration suitable for passing to add_query

Raises:

RuntimeError
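
A temporary configuration is typically created and then handed to add_query. The sketch below chains the two calls and then uses get() with a timeout to wait for the controller to publish the new Persistent Query. It assumes `client` is an authenticated and subscribed ControllerClient; `add_temporary_query` and `publish_timeout_seconds` are hypothetical names.

```python
def add_temporary_query(client, name, heap_size_gb, publish_timeout_seconds=10.0):
    """Create a temporary Persistent Query and return (serial, info)."""
    config = client.make_temporary_config(name, heap_size_gb)
    # Options without a dedicated argument could be set on `config` here,
    # before it is sent to the controller.
    serial = client.add_query(config)
    # The controller may not have published the new query yet, so wait for it.
    info = client.get(serial, timeout_seconds=publish_timeout_seconds)
    return serial, info
```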

map()[source]

Retrieves a copy of the current Persistent Query state.

A successful call to subscribe() should have happened before this call.

Return type:

dict[int, PersistentQueryInfoMessage]

Returns:

a dict of Persistent Query serial number to Persistent Query info

map_and_version()[source]

Retrieves a copy of the current Persistent Query state alongside a version number for the overall map state.

A successful call to subscribe() should have happened before this call.

Note that the version number here has nothing to do with Persistent Query versions; it is a monotonically increasing version number for the current known state of all subscriptions. The version number increases as update messages arriving from the controller change the map.

Return type:

tuple[dict[int, PersistentQueryInfoMessage], int]

Returns:

a tuple of two elements: a map from serial number to Persistent Query info, and a version number

maybe_wait_for_subscription(deadline_ns)[source]

Maybe wait for the subscription to finish, but only until the deadline.

Parameters:

deadline_ns (int) – the deadline in nanoseconds

Raises:

RuntimeError

Return type:

None

modify_query(query_config, do_restart)[source]

Modifies a Persistent Query.

A successful call to authenticate() should have happened before this call.

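Parameters:
  • query_config (persistent_query_pb2.PersistentQueryConfigMessage) – the modified configuration of the Persistent Query

  • do_restart (bool) – whether to restart the Persistent Query after applying the modification

Raises: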

RuntimeError

Return type:

None

ping()[source]

Pings the controller and refreshes the cookie.

Return type:

bool

Returns:

True if the ping was sent, False if we had no cookie
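
Since ping() refreshes the cookie, a long-lived client might call it periodically. The following is a sketch of such a loop under stated assumptions: `keep_alive` is a hypothetical helper, `stop_event` is expected to behave like a `threading.Event`, and the appropriate interval depends on the controller's cookie lifetime, which this reference does not specify.

```python
def keep_alive(client, interval_seconds, stop_event):
    """Ping the controller every `interval_seconds` until stopped or the cookie is gone.

    Returns the number of pings that were sent.
    """
    pings = 0
    # Event.wait returns True once the event is set, which ends the loop.
    while not stop_event.wait(interval_seconds):
        if not client.ping():
            # ping() returns False when the client has no cookie (for example after close()).
            break
        pings += 1
    return pings
```

The loop would usually run in a background thread, with the event set when the session ends.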

restart_query(serials, timeout_seconds=None)[source]

Restarts one or more Persistent Queries.

A successful call to authenticate() should have happened before this call.

Parameters:
  • serials (Union[Iterable[int], int]) – the serial number(s) of one or more Persistent Queries to restart

  • timeout_seconds (Optional[int]) – how long to wait for the Persistent Query to restart before raising an exception. Defaults to None, meaning to use the client’s timeout.

Raises:

RuntimeError

Return type:

None

set_auth_client(auth_client)[source]

Sets the authentication client for the controller client. It is used to re-authenticate when necessary. If a controller operation fails with a gRPC unauthenticated exception, the client attempts to use the authentication client to create a new controller session. If the authentication client cannot produce tokens, the operation cannot proceed.

start_and_wait(serial, timeout_seconds=120)[source]

Starts the Persistent Query with the given serial number, then waits for it to become running (or completed). If the Persistent Query fails, raises an exception.

Parameters:
  • serial (int) – the serial number of the Persistent Query to start

  • timeout_seconds (int) – how long to wait for the Persistent Query to become running or completed, defaults to 120 seconds

Raises:

RuntimeError

Return type:

None

static status_name(status)[source]

Returns the name of the status enum from a PersistentQueryStateMessage.

Parameters:

status (persistent_query_pb2.PersistentQueryStatusEnum) – the status to get the name of

Return type:

str

stop_and_wait(serial, timeout_seconds=120)[source]

Stops the Persistent Query with the given serial number, then waits for it to stop (become terminal). If the Persistent Query does not stop in the given time, raises an exception.

Parameters:
  • serial (int) – the serial number of the Persistent Query to stop

  • timeout_seconds (int) – how long to wait for the Persistent Query to stop before raising an exception, defaults to 120 seconds

Raises:

RuntimeError

Return type:

None
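
start_and_wait and stop_and_wait pair naturally as a context manager, so that a Persistent Query is stopped even if the work done while it runs raises an exception. This is one possible sketch, not an API of this module; `running_query` is a hypothetical name and `client` is assumed to be an authenticated and subscribed ControllerClient.

```python
from contextlib import contextmanager


@contextmanager
def running_query(client, serial, timeout_seconds=120):
    """Start the Persistent Query on entry and stop it on exit."""
    # Raises if the query fails or does not become running (or completed) in time;
    # in that case the body is never entered and stop_and_wait is not called.
    client.start_and_wait(serial, timeout_seconds=timeout_seconds)
    try:
        yield serial
    finally:
        client.stop_and_wait(serial, timeout_seconds=timeout_seconds)
```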

stop_query(serials, timeout_seconds=None)[source]

Stops one or more Persistent Queries.

A successful call to authenticate() should have happened before this call.

Parameters:
  • serials (Union[Iterable[int], int]) – the serial number(s) of one or more Persistent Queries to stop

  • timeout_seconds (int) – how long to wait for the Persistent Query to stop before raising an exception, defaults to None, meaning to use the client’s timeout

Raises:

RuntimeError

Return type:

None
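
Because stop_query accepts several serial numbers at once, names can be resolved first and the Persistent Queries stopped in a single call. A minimal sketch, assuming an authenticated and subscribed `client`; `stop_queries_by_name` and `lookup_timeout_seconds` are hypothetical names.

```python
def stop_queries_by_name(client, names, lookup_timeout_seconds=0):
    """Resolve each name to a serial number and stop all of them in one call."""
    serials = [
        client.get_serial_for_name(name, timeout_seconds=lookup_timeout_seconds)
        for name in names
    ]
    if serials:
        # serials may be a single int or an iterable of ints.
        client.stop_query(serials)
    return serials
```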

subscribe()[source]

Subscribes to the Persistent Query state, and waits for the initial Persistent Query state snapshot to be populated.

A successful call to authenticate() should have happened before this call.

After the subscription is complete, you may call map() to retrieve the complete map or get() to fetch a specific Persistent Query by serial number.

Return type:

None
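
The call order implied by this reference (authenticate, then subscribe, then read the map) can be sketched as a single helper. `open_session` is a hypothetical name; `token` is assumed to come from auth.AuthClient.get_token() with a service of PersistentQueryController.

```python
def open_session(client, token, timeout=None):
    """Authenticate, subscribe, and return a snapshot of the Persistent Query map."""
    client.authenticate(token, timeout=timeout)  # raises RuntimeError on failure
    client.subscribe()                           # waits for the initial snapshot
    return client.map()                          # copy of {serial: PersistentQueryInfoMessage}
```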

wait_for_change(timeout_seconds)[source]

Waits for a change in the query map to occur. See also wait_for_change_from_version.

Parameters:

timeout_seconds (float) – how long to wait in seconds

Return type:

None

wait_for_change_from_version(map_version, timeout_seconds)[source]

Waits for a new version of the query map to occur.

Parameters:
  • map_version (int) – a version number reflecting the current known map version

  • timeout_seconds (float) – how long to wait in seconds

Return type:

bool

Returns:

True if a newer version exists, False otherwise
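
The version returned by map_and_version() is intended to be passed back to wait_for_change_from_version(), so that a change arriving between the two calls is not missed. A sketch of one polling step, with the hypothetical helper name `next_map_snapshot` and a subscribed `client` assumed:

```python
def next_map_snapshot(client, timeout_seconds):
    """Wait for the map to change; return the new (map, version) or None on timeout."""
    _, version = client.map_and_version()
    # Returns True only if a version newer than `version` exists.
    if client.wait_for_change_from_version(version, timeout_seconds):
        return client.map_and_version()
    return None
```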

wait_for_state(state_function, timeout_seconds=None)[source]

Waits for a particular state transition in the controller map to occur.

Parameters:
  • state_function (Callable[[], (bool, Any)]) – a function that returns a tuple. The first element is True if the state transition occurred and False otherwise. The second element is the current state which is then returned to the caller.

  • timeout_seconds (float) – how long to wait for the state transition to occur, defaults to None, meaning to use the client’s timeout

Return type:

tuple[bool, Any]

Returns:

a tuple. The first element is True if the state_function returned True; or False if the timeout expired. The second element is the return value of the state_function or None if the state_function returned False.
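
A state_function takes no arguments and returns a (bool, Any) tuple, so it is usually written as a closure over the client and a serial number. The sketch below waits for a Persistent Query to be running. Note the assumptions: `wait_until_running` is a hypothetical helper, and the `info.state.status` field path is an assumption about the layout of PersistentQueryInfoMessage that this reference does not document.

```python
def wait_until_running(client, serial, timeout_seconds=None):
    """Wait until the Persistent Query `serial` is running; returns (bool, status)."""

    def state_function():
        try:
            info = client.get(serial)
        except KeyError:
            # The query has not been published yet, so the transition has not occurred.
            return False, None
        status = info.state.status  # ASSUMPTION: field layout of PersistentQueryInfoMessage
        return client.is_running(status), status

    return client.wait_for_state(state_function, timeout_seconds)
```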

class SubState(*values)[source]

Bases: Enum