Source code for deephaven_enterprise.controller_import
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module provides a way for workers on a cluster to import scripts from the Persistent Query Controller. The controller provides
a centralized script repository that can also be configured to automatically pull from git. This mechanism allows users
to modularize the query scripts and distribute them to workers in a cluster without having to manually copy them.
"""
from __future__ import annotations
import sys
from importlib.abc import Loader
from importlib.machinery import ModuleSpec
from typing import Any, Optional
from deephaven.jcompat import j_collection_to_list
from deephaven_enterprise import dh_globals
[docs]
def exec_script(path: str, globals: Optional[dict[str, Any]] = None) -> None:
"""Executes a controller script
Args:
path (str): relative path to the script on the controller
globals (dict[str, Any]): the globals to use for execution of the script, if you want to have the variables
from your script session available use "globals()"
"""
src = dh_globals.j_controller_client_factory.getUnsubscribed().getScriptBody(
path, True, dh_globals.script_loader_state
)
if src is None:
raise FileNotFoundError("Controller script not found: " + path)
exec(compile(src, path, "exec"), globals)
class _ImportedControllerScript(Loader):
"""A loader for controller scripts that are imported."""
def __init__(self, path, source=None):
self.path = path
self.source = source
def create_module(self, spec):
return None
def exec_module(self, module):
if self.source is not None:
exec(compile(self.source, "<string>", "exec"), module.__dict__)
class _ControllerImporter:
root: str
def __init__(self, root: str):
if root is None or root == "":
raise RuntimeError("root for controller imports must not be empty!")
if "." in root:
raise RuntimeError("root for controller imports must not contain a period!")
self.root = root
self.j_controller_client = (
dh_globals.j_controller_client_factory.getUnsubscribed()
)
self.loader_state = dh_globals.script_loader_state
def find_spec(self, name, path, target=None) -> Optional[ModuleSpec]:
if name != self.root and not name.startswith(self.root + "."):
# this is not a controller import; return None so that other finder/loaders can try
return None
relevant_path = name[len(self.root) + 1 :]
paths = j_collection_to_list(
self.j_controller_client.getScriptPaths(self.loader_state, True)
)
to_directory = relevant_path.replace(".", "/")
if to_directory + ".py" in paths:
# this is the right thing
src_name = to_directory + ".py"
elif to_directory + "/__init__.py" in paths:
src_name = to_directory + "/__init__.py"
elif relevant_path == "":
src_name = None
else:
# we should check that some path here would be a child of our path
prefix_exists: bool = False
for x in paths:
if x.startswith(to_directory + "/"):
prefix_exists = True
break
if not prefix_exists:
return None
src_name = None
if src_name is None:
src = None
else:
src = self.j_controller_client.getScriptBody(
src_name, True, self.loader_state
)
module_spec = ModuleSpec(
name, _ImportedControllerScript(path, src), is_package=True
)
module_spec.submodule_search_locations = name.split(".")
return module_spec