deephaven_enterprise.client.controller

class ControllerClient(host=None, port=None, rpc_timeout_seconds=120, channel=None)[source]

Bases: object

The ControllerClient connects to the Deephaven PersistentQueryController process.

You may subscribe to the state of Persistent Queries as well as create and modify them.

This class operates on the deephaven_enterprise.proto.persistent_query_pb2 structures.

Connect to the persistent query controller.

Parameters:
  • host (Optional[str]) – the host to connect to, requires port

  • port (Optional[int]) – the port to connect to, requires host

  • rpc_timeout_seconds (int) – the timeout period in seconds for RPCs. Defaults to 120 if not provided.

  • channel (Optional[Channel]) – an already configured gRPC channel (exclusive with host and port)
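The constructor accepts either a host and port pair or a pre-configured channel. The following is a minimal sketch of those argument rules as a standalone check. `check_connection_args` is a hypothetical helper, not part of the client, and the real constructor may validate its arguments differently. What happens when no argument is given is not described here, so the sketch only reports it.

```python
from typing import Any, Optional


def check_connection_args(host: Optional[str] = None,
                          port: Optional[int] = None,
                          channel: Optional[Any] = None) -> str:
    """Hypothetical helper mirroring the documented argument rules.

    Returns which connection style the arguments select.
    """
    if channel is not None:
        # A pre-configured channel is exclusive with host and port.
        if host is not None or port is not None:
            raise ValueError("channel is exclusive with host and port")
        return "channel"
    # host requires port, and port requires host.
    if (host is None) != (port is None):
        raise ValueError("host and port must be given together")
    return "host_port" if host is not None else "unspecified"
```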

add_query(query_config)[source]

Add a persistent query. A successful call to authenticate should have happened before this call.

Parameters:

query_config (PersistentQueryConfigMessage) – the configuration of the query to add.

Return type:

int

Returns:

the serial number of the created query

authenticate(token, timeout=None)[source]

Authenticate to the controller using a token obtained from the AuthClient get_token method.

Parameters:

token (Token) – the token to use for authentication, must have a service of “PersistentQueryController”

Return type:

None

close()[source]

Invalidate the client’s cookie so that no further operations take place with this client.

Return type:

None

delete_query(serial)[source]

Delete a query. A successful call to authenticate should have happened before this call.

Parameters:

serial (int) – the serial number of the query to delete

Return type:

None

static generate_disabled_scheduler()[source]

Generates a scheduler array for a PQ that has scheduling disabled.

get(serial, timeout_seconds=0)[source]

Get a query from the map. If the query does not exist, raises a KeyError. A successful call to subscribe should have happened before this call.

The timeout_seconds parameter can be specified to wait for the query to exist before failing with KeyError. This is useful when you have just created the query and know its serial, but the controller has not yet published the state to you.

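Parameters:
  • serial – the serial number of the query to retrieve

  • timeout_seconds – how long to wait for the query to exist before failing with KeyError. Defaults to 0.

Return type: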

PersistentQueryInfoMessage

Returns:

the PersistentQueryInfoMessage associated with the serial number
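The timeout is most useful right after add_query. The following is a minimal sketch of that pattern, assuming `client` is an authenticated and subscribed ControllerClient. `add_and_fetch` and its 30 second default are this example's own choices, not part of the API.

```python
def add_and_fetch(client, query_config, timeout_seconds=30):
    """Add a query, then wait for the controller to publish its state.

    Returns (serial, info); info is None if the state was not
    published within timeout_seconds.
    """
    serial = client.add_query(query_config)
    try:
        # get() waits up to timeout_seconds for the serial to appear.
        return serial, client.get(serial, timeout_seconds=timeout_seconds)
    except KeyError:
        # The query did not show up in the map before the timeout.
        return serial, None
```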

get_serial_for_name(name, timeout_seconds=0)[source]

Retrieves the serial number for a given name.

Parameters:
Return type:

int

static is_completed(status)[source]

Is the status from the query info Completed?

If not running and not terminal, then the query is in the initialization process.

static is_running(status)[source]

Is the status from the query info running?

If not running and not terminal, then the query is in the initialization process.

static is_status_uninitialized(status)[source]

Is the status from the query info uninitialized?

This is the case before a query is ever started.

static is_terminal(status)[source]

Is the status from the query info terminal?

If not running and not terminal, then the query is in the initialization process.
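The status predicates can be combined to classify a query. The following is a minimal sketch, assuming `client_cls` is ControllerClient or any object that exposes the same static predicates. `describe_phase`, its labels, and the order of the checks are assumptions of this example.

```python
def describe_phase(client_cls, status):
    """Classify a status using the documented static predicates."""
    # Checked first so that a never-started query is not reported
    # as initializing (ordering is this sketch's assumption).
    if client_cls.is_status_uninitialized(status):
        return "never started"
    if client_cls.is_running(status):
        return "running"
    if client_cls.is_terminal(status):
        return "terminal"
    # Not running and not terminal: the query is still initializing.
    return "initializing"
```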

make_temporary_config(name, heap_size_gb, server=None, extra_jvm_args=None, extra_environment_vars=None, engine='DeephavenCommunity', auto_delete_timeout=600, admin_groups=None, viewer_groups=None)[source]

Create a configuration suitable for use as a temporary InteractiveConsole query. The worker uses the default DeephavenCommunity engine. This kind of query enables a client to use the controller to create workers for general use, in the same way Enterprise clients would have used the dispatcher. For options that are not represented in the arguments, the returned PersistentQueryConfigMessage can be modified before adding it to the controller.

Parameters:
  • name (str) – the name of the persistent query. Defaults to None, which means a name based on the current time is used

  • heap_size_gb (float) – the heap size of the worker

  • server (Optional[str]) – the server to connect to. Defaults to None, which means the first available server

  • extra_jvm_args (Optional[List[str]]) – extra JVM arguments for starting the worker. Defaults to None.

  • extra_environment_vars (Optional[List[str]]) – extra Environment variables for the worker. Defaults to None.

  • engine (str) – which engine (worker kind) to use for the backend worker. Defaults to “DeephavenCommunity”

  • auto_delete_timeout (Optional[int]) – the number of seconds of inactivity after which the query is automatically deleted. Defaults to 600 (ten minutes). If None, auto-delete is disabled. If zero, the query is deleted immediately after a client connection is lost

  • admin_groups (Optional[List[str]]) – list of groups that may administer the query. Defaults to None, which means only the current user may administer the query.

  • viewer_groups (Optional[List[str]]) – list of groups that may view the query. Defaults to None, which means only the current user may view the query.

Return type:

PersistentQueryConfigMessage

Returns:

a configuration suitable for passing to add_query.
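The following is a minimal sketch of building a temporary configuration and handing it to add_query, assuming `client` is an authenticated ControllerClient. `add_shared_worker` is a hypothetical helper, and the argument values are illustrative only.

```python
def add_shared_worker(client, name, heap_size_gb, viewer_groups):
    """Create a temporary query that the given groups may view."""
    config = client.make_temporary_config(
        name,
        heap_size_gb,
        # Zero: delete the query as soon as the client connection is lost.
        auto_delete_timeout=0,
        viewer_groups=list(viewer_groups),
    )
    # Options not covered by the arguments could be changed on
    # `config` here, before it is added to the controller.
    return client.add_query(config)
```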

map()[source]

Retrieve a copy of the current persistent query state. A successful call to subscribe should have happened before this call.

Return type:

Dict[int, PersistentQueryInfoMessage]

Returns:

a map from serial number to persistent query info

ping()[source]

Ping the controller and refresh our cookie.

Returns:

True if the ping was sent, False if we had no cookie

restart_query(serials, timeout_seconds=None)[source]

Restart one or more queries. A successful call to authenticate should have happened before this call.

Parameters:

serials (Union[Iterable[int], int]) – a query serial number, or an iterable of serial numbers

Return type:

None
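The serials argument accepts either one serial number or an iterable of them. The following sketch shows one way such an argument can be normalized to a list. `normalize_serials` is hypothetical and is not claimed to be how the client handles the argument internally.

```python
from typing import Iterable, List, Union


def normalize_serials(serials: Union[Iterable[int], int]) -> List[int]:
    """Return the serial numbers as a list, whichever form was given."""
    if isinstance(serials, int):
        # A single serial number becomes a one-element list.
        return [serials]
    return list(serials)
```

With a real client, both `client.restart_query(42)` and `client.restart_query([42, 43])` are valid call shapes according to the parameter type above.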

set_auth_client(auth_client)[source]

Authenticate using the given authentication client, which is stored as a local variable. If a controller operation fails with a gRPC unauthenticated exception, then we attempt to use the authentication client to create a new controller session. If the authentication client cannot produce tokens, then we cannot proceed.
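The retry behavior described above can be pictured with a small standalone sketch. Everything in it is hypothetical: `Unauthenticated` stands in for the gRPC unauthenticated error, and `call_with_reauth` retries exactly once, which is an assumption about the policy rather than a statement of what the client does.

```python
class Unauthenticated(Exception):
    """Stand-in for a gRPC unauthenticated error (hypothetical)."""


def call_with_reauth(operation, reauthenticate):
    """Run operation(); on an authentication failure, re-authenticate and retry once."""
    try:
        return operation()
    except Unauthenticated:
        # Create a new session, then try the operation one more time.
        # A second failure propagates to the caller.
        reauthenticate()
        return operation()
```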

start_and_wait(serial, timeout_seconds=120)[source]

Start the given query serial number and wait for the query to become running (or Completed). If the query fails, an exception is raised.

Parameters:
  • serial (int) – the serial to start

  • timeout_seconds (int) – how long to wait, in seconds, for the query to become running.

Return type:

None

static status_name(status)[source]

Returns the name of the status enum from a PersistentQueryStateMessage.

stop_and_wait(serial, timeout_seconds=120)[source]

Stop the given query serial number; wait for the query to stop (be terminal). If the query does not stop in the given time, raise an exception.

Parameters:
  • serial (int) – the serial to stop

  • timeout_seconds (int) – how long to wait, in seconds, for the query to stop.

Return type:

None

stop_query(serials, timeout_seconds=None)[source]

Stop one or more queries. A successful call to authenticate should have happened before this call.

Parameters:

serials (Union[Iterable[int], int]) – a query serial number, or an iterable of serial numbers

Return type:

None

subscribe()[source]

Subscribe to persistent query state, and wait for the initial query state snapshot to be populated. A successful call to authenticate should have happened before this call.

After the subscription is complete, you may call the map method to retrieve the complete map or the get method to fetch a specific query by serial number.

Return type:

None
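The call order described above (authenticate, then subscribe, then read the map) can be sketched as follows. `snapshot_queries` is a hypothetical helper, and `token` is assumed to come from the AuthClient get_token method mentioned under authenticate.

```python
def snapshot_queries(client, token):
    """Authenticate, subscribe, and return the current query map."""
    client.authenticate(token)
    # subscribe() waits for the initial query state snapshot.
    client.subscribe()
    # map() returns a copy keyed by serial number.
    return dict(client.map())
```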

wait_for_change(timeout_seconds)[source]

Waits for a change in the query map to occur.

wait_for_state(state_function, timeout_seconds=None)[source]

Wait for a particular state transition in the controller map to occur.

Parameters:
  • state_function – a function that returns a tuple. The first element is True if the state transition occurred and False otherwise. The second element is the current state, which is then returned to the user.

  • timeout_seconds – how long to wait for the state transition to occur

Returns:

a tuple. The first element is True if the state_function returned True, or False if the timeout expired. The second element is the return value of the state_function, or None if the state_function returned False.
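A state function only has to return a two-element tuple. The following sketch builds one that reports whether a serial is present in the query map. It assumes the state function is called with no arguments and that calling map() from inside it is safe; neither is stated in this reference. `make_exists_check` is a hypothetical name.

```python
def make_exists_check(client, serial):
    """Build a state function that checks whether `serial` is in the map."""
    def state_function():
        info = client.map().get(serial)
        # First element: did the transition occur? Second: the current state.
        return info is not None, info
    return state_function
```

Under those assumptions, it could be passed as `client.wait_for_state(make_exists_check(client, serial), timeout_seconds=30)`.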

class RefreshThread(controller_client, period_millis=10000)[source]

Bases: Thread

This thread pings the controller every 10 seconds to ensure that our cookie is valid and that we stay authenticated.

Parameters:
  • controller_client (ControllerClient) – the controller client

  • period_millis (int) – how often to refresh, in milliseconds
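The idea of a periodic refresh can be sketched with a plain threading.Thread. `PingLoop` is a hypothetical stand-in, not the RefreshThread implementation. Stopping when ping() returns False and the `stop` method are design choices of this example.

```python
import threading


class PingLoop(threading.Thread):
    """Hypothetical stand-in that pings `client` every period_millis."""

    def __init__(self, client, period_millis=10000):
        super().__init__(daemon=True)
        self._client = client
        self._period_seconds = period_millis / 1000.0
        self._stopped = threading.Event()

    def stop(self):
        """Ask the loop to exit at its next wake-up."""
        self._stopped.set()

    def run(self):
        # Event.wait returns True once stop() has been called,
        # which ends the loop; otherwise it times out and we ping.
        while not self._stopped.wait(self._period_seconds):
            # ping() returns False when there is no cookie to refresh.
            if not self._client.ping():
                break
```

Using an Event for the delay, rather than time.sleep, lets stop() interrupt the wait immediately.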

property daemon

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

property ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

is_alive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().

join(timeout=None)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

property name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

property native_id

Native integral thread ID of this thread, or None if it has not been started.

This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.

run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

start()

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

class ResponseThread(controller_client, iterator)[source]

Bases: Thread

This thread processes responses from the controller client’s subscription method and calls the client’s process method.

Parameters:
property daemon

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

property ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

is_alive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().

join(timeout=None)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

property name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

property native_id

Native integral thread ID of this thread, or None if it has not been started.

This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.

run()[source]

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

start()

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.