Source code for deephaven_enterprise.database

#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module defines the Database class, which provides an API for the discovery and management of tables in the
Deephaven Enterprise data store.
"""

from typing import Any, Callable, Optional

import jpy
from deephaven import DHError
from deephaven.jcompat import AutoCloseable, j_collection_to_list
from deephaven.table import PartitionedTable, Table
from deephaven.table_factory import InputTable

from deephaven_enterprise.input_tables import InputTableSpec

# jpy handles for the Java types this module wraps. They are looked up once, when this module is imported.
# Java view used for read/discovery operations (live/historical tables, namespaces, catalog).
_JDatabase: Any = jpy.get_type("io.deephaven.enterprise.database.Database")
# Java view used for write/management operations (user tables, partitions, input tables).
_JWritableDatabase: Any = jpy.get_type("io.deephaven.enterprise.database.WritableDatabase")
# Builder factory for the TableOptions passed to liveTable/historicalTable.
_JTableOptions: Any = jpy.get_type("io.deephaven.enterprise.database.TableOptions")
# Supplies the default for Database.live_table's is_refreshing when the caller passes None.
_JTableRetrievalLivenessDefault: Any = jpy.get_type(
    "io.deephaven.shadow.enterprise.com.illumon.iris.db.tables.live.TableRetrievalLivenessDefault"
)


def _do_with_dh_error(func: Callable[[], Any], error_message: str) -> Any:
    try:
        result = func()
    except Exception as e:
        raise DHError(e, error_message) from e

    return result


class Database:
    """The Database class defines an API for the discovery and management of tables in the Deephaven Enterprise
    data store."""

    # The wrapped Java object viewed as io.deephaven.enterprise.database.Database (read/discovery operations).
    j_database: _JDatabase
    # The same Java object viewed as io.deephaven.enterprise.database.WritableDatabase (write operations).
    j_writable_database: Any

    def __init__(self, j_db: jpy.JType):
        """Wraps a Java database object.

        Args:
            j_db (jpy.JType): the Java object to wrap; it must be castable to both
                io.deephaven.enterprise.database.Database and io.deephaven.enterprise.database.WritableDatabase

        Raises:
            DHError: if j_db cannot be cast to either required Java type
        """
        # The None checks below rely on jpy.cast returning None for an incompatible object.
        self.j_database = jpy.cast(j_db, _JDatabase)
        if self.j_database is None:
            raise DHError(
                message="j_db type is not io.deephaven.enterprise.database.Database"
            )

        self.j_writable_database = jpy.cast(j_db, _JWritableDatabase)
        if self.j_writable_database is None:
            raise DHError(
                message="j_db type is not io.deephaven.enterprise.database.WritableDatabase"
            )

    def live_table(
        self,
        namespace: str,
        table_name: str,
        is_refreshing: Optional[bool] = None,
        is_blink: bool = False,
        internal_partition_column: Optional[str] = None,
    ) -> Table:
        """Retrieves a live Table for the specified namespace and table name.

        Args:
            namespace (str): the namespace
            table_name (str): the table name
            is_refreshing (bool): True if the returned table should be refreshing, defaults to None, meaning to use
                the value of JVM property RunAndDoneSetupQuery.liveTables which defaults to False
            is_blink (bool): True if the returned table should be a blink table, defaults to False
            internal_partition_column (str): Set the name of the internal partition column, defaults to None,
                meaning no internal partition column is included

        Returns:
            a Table object
        """
        if is_refreshing is None:
            is_refreshing = _JTableRetrievalLivenessDefault.livenessDefault()

        # Refreshing and static retrievals start from different TableOptions builders.
        if is_refreshing:
            options_builder = _JTableOptions.newLiveBuilder()
        else:
            options_builder = _JTableOptions.newStaticBuilder()
        options_builder.isBlink(is_blink)
        if internal_partition_column is not None:
            options_builder.internalPartitionColumn(internal_partition_column)

        return Table(
            j_table=self.j_database.liveTable(
                namespace, table_name, options_builder.build()
            )
        )

    def live_partitioned_table(
        self, namespace: str, table_name: str
    ) -> PartitionedTable:
        """Retrieves the specified live table as a partitioned table from the Database.

        The result's key columns will be derived from the table's partitioning columns as well as any internal
        partitions.

        Args:
            namespace (str): the namespace
            table_name (str): the table name

        Returns:
            a PartitionedTable object
        """
        return PartitionedTable(
            j_partitioned_table=self.j_database.livePartitionedTable(
                namespace, table_name
            )
        )

    def historical_table(
        self,
        namespace: str,
        table_name: str,
        internal_partition_column: Optional[str] = None,
    ) -> Table:
        """Retrieves a historical Table for the specified namespace and table name.

        Args:
            namespace (str): the namespace
            table_name (str): the table name
            internal_partition_column (str): Set the name of the internal partition column, defaults to None,
                meaning no internal partition column is included

        Returns:
            a Table object
        """
        # Historical retrievals always use static options.
        options_builder = _JTableOptions.newStaticBuilder()
        if internal_partition_column is not None:
            options_builder.internalPartitionColumn(internal_partition_column)

        return Table(
            j_table=self.j_database.historicalTable(
                namespace, table_name, options_builder.build()
            )
        )

    def historical_partitioned_table(
        self, namespace: str, table_name: str
    ) -> PartitionedTable:
        """Retrieves the specified historical table as a partitioned table from the Database.

        The result's key columns will be derived from the table's partitioning columns as well as any internal
        partitions.

        Args:
            namespace (str): the namespace
            table_name (str): the table name

        Returns:
            a PartitionedTable object
        """
        return PartitionedTable(
            j_partitioned_table=self.j_database.historicalPartitionedTable(
                namespace, table_name
            )
        )

    def namespaces(self) -> list[str]:
        """Returns the list of namespaces in this database.

        Returns:
            the list of namespaces
        """
        return j_collection_to_list(self.j_database.getNamespaces())

    def table_names(self, namespace: str) -> list[str]:
        """Returns the list of tables within a namespace.

        Args:
            namespace (str): the namespace

        Returns:
            the list of table names within namespace
        """
        return j_collection_to_list(self.j_database.getTableNames(namespace))

    def catalog_table(self) -> Table:
        """Returns a table of the available tables.

        The result table contains 3 string type columns: Namespace, TableName, and NamespaceSet which is either
        System or User.

        Returns:
            a table
        """
        return Table(j_table=self.j_database.getCatalogTable())

    def has_table(self, namespace: str, table_name: str) -> bool:
        """Identifies if the table is defined in the Database and visible to the user.

        Args:
            namespace (str): the namespace
            table_name (str): the table name

        Returns:
            True if the schema exists and is visible to the user, else False
        """
        return self.j_database.hasTable(namespace, table_name)

    def table_definition(self, namespace: str, table_name: str) -> Table:
        """Returns a Table containing the column definitions for the table with the specified namespace and table
        name.

        Args:
            namespace (str): the namespace
            table_name (str): the table name

        Returns:
            a Table object
        """
        return Table(
            j_table=self.j_database.getTableDefinitionTable(namespace, table_name)
        )

    def add_unpartitioned_table(
        self, namespace: str, table_name: str, table: Table
    ) -> None:
        """Adds an unpartitioned, directly manipulated user table in the database.

        Writes an unpartitioned user table to disk. If the namespace does not exist, then it is created. Also writes
        out the definition of the table as the schema. The schema must not already exist.

        Note, if an error occurs during writing, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            table (Table): table that the definition and data will be based on

        Raises:
            DHError
        """
        _do_with_dh_error(
            lambda: self.j_writable_database.addUnpartitionedTable(
                namespace, table_name, table.j_table
            ),
            f"Failed to add unpartitioned table '{namespace}.{table_name}'",
        )

    def delete_unpartitioned_table(self, namespace: str, table_name: str) -> bool:
        """Deletes an unpartitioned, directly manipulated user table and its associated schema.

        If the schema does not exist, then data is not deleted. If there is no data but the schema exists, then the
        schema is deleted. The namespace is not removed even if this is the last table in the namespace.

        Note, if an error occurs during deleting, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name

        Returns:
            True if the data was deleted, False if there was no data or preexisting schema

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.deleteUnpartitionedTable(
                namespace, table_name
            ),
            f"Failed to delete unpartitioned table '{namespace}.{table_name}'",
        )

    def add_partitioned_table_schema(
        self,
        namespace: str,
        table_name: str,
        partition_column_name: str,
        prototype: Table,
    ) -> bool:
        """Adds a schema for a partitioned, directly manipulated user table.

        The schema for the specified table is derived from the table definition of the prototype table and the
        partition column name. The prototype table's definition must not include a partitioning column.

        If the namespace does not exist, then it is created. If the schema already exists, and it is identical (this
        is a stricter check than compatibility; all columns must be present in the same order with the same
        properties), then the method returns False. If the schema already exists and is not identical, then an error
        is thrown.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_name (str): name of the partitioning column
            prototype (Table): table with definition to derive schema from

        Returns:
            True if the partitioned table schema was added, False if it already existed

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.addPartitionedTableSchema(
                namespace,
                table_name,
                partition_column_name,
                prototype._definition.j_table_definition,
            ),
            f"Failed to add partitioned table schema for '{namespace}.{table_name}'",
        )

    def update_partitioned_table_schema(
        self, namespace: str, table_name: str, prototype: Table
    ) -> bool:
        """Updates the schema of a preexisting partitioned, directly manipulated user table.

        If the schema does not exist an error is thrown.

        Not all schema modifications are permitted. The partitioning column may not be changed. Existing columns may
        not have their type changed. Columns may be added or deleted.

        Note that no data is modified by this operation. Removed columns remain on persistent storage, and added
        columns are treated as null on read. Note also that although each modification in isolation is verified for
        safety, a sequence of modifications to the schema may be unsafe. For example, deleting a column and adding
        it back with a new type results in unreadable data.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            prototype (Table): table with definition to derive schema from

        Returns:
            True if the partitioned table definition was updated, False if there was already an identical definition

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.updatePartitionedTableSchema(
                namespace, table_name, prototype._definition.j_table_definition
            ),
            f"Failed to update partitioned table schema for '{namespace}.{table_name}'",
        )

    def add_table_partition(
        self, namespace: str, table_name: str, partition_column_value: str, table: Table
    ) -> None:
        """Adds a single partition data of the provided partition column value to a partitioned, directly
        manipulated user table.

        The partition mustn't already exist. The data table must have a mutually compatible definition with the
        current schema. The data table must not have a column with the same name as the partitioning column.

        Note, if an error occurs during writing, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_value (str): value for the partitioning column, e.g. "2015-09-25" for "Date"
            table (Table): table to write data from

        Raises:
            DHError
        """
        _do_with_dh_error(
            lambda: self.j_writable_database.addTablePartition(
                namespace, table_name, partition_column_value, table.j_table
            ),
            f"Failed to add table partition {partition_column_value} to '{namespace}.{table_name}'",
        )

    def delete_table_partition(
        self, namespace: str, table_name: str, partition_column_value: str
    ) -> bool:
        """Deletes the partition of the provided partition column value from a partitioned, directly manipulated
        user table.

        Note, if an error is thrown while deleting, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_value (str): value for the partitioning column, e.g. "2015-09-25" for "Date"

        Returns:
            True if the partition was deleted, False if there was no partition or preexisting schema

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.deleteTablePartition(
                namespace, table_name, partition_column_value
            ),
            f"Failed to delete table partition {partition_column_value} from '{namespace}.{table_name}'",
        )

    def append_live_table(
        self, namespace: str, table_name: str, partition_column_value: str, table: Table
    ) -> None:
        """Appends all rows from a given table to a live (centrally managed) user table partition.

        The data table must have a mutually compatible definition with the current schema. The data table must not
        have a column with the same name as the partitioning column.

        This method is asynchronous. After returning, the data may not be immediately available. It is possible for
        the write to fail after this method has returned. When multiple workers append to a partition, ordering is
        imposed outside the worker by other system components.

        Note, if an error is thrown during appending, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_value (str): value for the partitioning column, e.g. "2015-09-25" for "Date"
            table (Table): table to append rows from

        Raises:
            DHError
        """
        _do_with_dh_error(
            lambda: self.j_writable_database.appendLiveTable(
                namespace, table_name, partition_column_value, table.j_table
            ),
            f"Failed to append to partition {partition_column_value} of live table '{namespace}.{table_name}'",
        )

    def append_live_table_incremental(
        self, namespace: str, table_name: str, partition_column_value: str, table: Table
    ) -> AutoCloseable:
        """Appends all rows from the given source table to a live (centrally managed) user table partition.

        When rows are added to the source table, they are additionally appended to the partition. The source table
        updates can only have additions and shifts. No modifications or removals are permitted.

        The data table must have a mutually compatible definition with the current schema. The data table must not
        have a column with the same name as the partitioning column.

        This method is asynchronous; after returning, the data may not be immediately available. It is possible for
        the write to fail after this method has returned. When multiple workers append to a partition, ordering is
        imposed outside the worker by other system components.

        A reference must be maintained to the returned AutoCloseable object to ensure expected functionality;
        calling close() on it will stop incremental appends and clean up related resources.

        Note, if an error is thrown during appending, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_value (str): value for the partitioning column, e.g. "2015-09-25" for "Date"
            table (Table): table to append updates from

        Returns:
            the object used to ensure and stop expected functionality

        Raises:
            DHError
        """
        return AutoCloseable(
            _do_with_dh_error(
                lambda: self.j_writable_database.appendLiveTableIncremental(
                    namespace, table_name, partition_column_value, table.j_table
                ),
                f"Failed to append incrementally to partition {partition_column_value}"
                f" of live table '{namespace}.{table_name}'",
            )
        )

    def delete_live_table_partition(
        self, namespace: str, table_name: str, partition_column_value: str
    ) -> bool:
        """Deletes a partition from a live (centrally managed) user table.

        Note, if an error is thrown during deleting, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_value (str): value for the partitioning column, e.g. "2015-09-25" for "Date"

        Returns:
            True if the partition was deleted, False if there was no partition or preexisting schema

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.deleteLiveTablePartition(
                namespace, table_name, partition_column_value
            ),
            f"Failed to delete partition {partition_column_value} from live table '{namespace}.{table_name}'",
        )

    def delete_partitioned_table(self, namespace: str, table_name: str) -> bool:
        """Deletes all partitions, whether direct manipulated or live (centrally managed), and the schema, from a
        partitioned user table.

        All partitions from the table are deleted sequentially. If a partition cannot be deleted, then the operation
        fails but some data may have already been removed. After all partitions are deleted, then the schema is
        deleted.

        If the schema does not exist, then data is not deleted. If there is no data, but the schema exists, then the
        schema is deleted. The namespace is not removed, even if this is the last table in the namespace.

        Note, if an error is thrown during deleting, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name

        Returns:
            True if data was deleted, False if there was no data or preexisting schema

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.deletePartitionedTable(
                namespace, table_name
            ),
            f"Failed to delete partitioned table '{namespace}.{table_name}'",
        )

    def add_input_table_schema(
        self, namespace: str, table_name: str, input_table_spec: InputTableSpec
    ) -> bool:
        """Adds a new input table schema using the InputTableSpec.

        Args:
            namespace (str): the namespace of the input table
            table_name (str): the name of the input table
            input_table_spec (InputTableSpec): the input table specification

        Returns:
            True if the input table schema was added, False if the input table already exists with the same spec

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.addInputTableSchema(
                namespace, table_name, input_table_spec.j_input_table_spec
            ),
            f"Failed to add input table schema for '{namespace}.{table_name}'.",
        )

    def add_input_table_schema_from_table(
        self,
        namespace: str,
        table_name: str,
        prototype: Table,
        key_col_names: list[str],
    ) -> bool:
        """Adds a new input table schema using the Table and specified key column names.

        Args:
            namespace (str): the namespace of the input table
            table_name (str): the name of the input table
            prototype (Table): the Table to derive the input table spec from
            key_col_names (list[str]): columns that should be keyed in the input table

        Returns:
            True if the input table schema was added, False if the input table already exists with the same spec

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.addInputTableSchema(
                namespace,
                table_name,
                prototype._definition.j_table_definition,
                key_col_names,
            ),
            f"Failed to add input table schema for '{namespace}.{table_name}'.",
        )

    def update_input_table_schema(
        self, namespace: str, table_name: str, input_table_spec: InputTableSpec
    ) -> bool:
        """Updates an existing input table schema using the provided InputTableSpec.

        Retrieve and use a new InputTable via input_table after updating an input table's specification to ensure
        proper behavior. When the requested update is invalid, e.g. an input table already exists but with a
        different spec, an exception will be raised.

        Args:
            namespace (str): the namespace of the input table
            table_name (str): the name of the input table
            input_table_spec (InputTableSpec): the new specification for the input table

        Returns:
            True if the input table schema was updated, False if the input table already exists with the same spec

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.updateInputTableSchema(
                namespace, table_name, input_table_spec.j_input_table_spec
            ),
            f"Failed to update input table schema for '{namespace}.{table_name}'.",
        )

    def delete_input_table(self, namespace: str, table_name: str) -> bool:
        """Deletes the input table given the namespace and table name.

        Args:
            namespace (str): the namespace of the input table
            table_name (str): the name of the input table

        Returns:
            True if the input table was deleted, False if there was no data or preexisting schema for the input
            table

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.deleteInputTable(namespace, table_name),
            f"Failed to delete input table for '{namespace}.{table_name}'.",
        )

    def input_table(self, namespace: str, table_name: str) -> InputTable:
        """Retrieves the specified input table view.

        Args:
            namespace (str): the namespace in which the table exists
            table_name (str): the name of the table in the namespace

        Returns:
            an InputTable object

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: InputTable(
                j_table=self.j_writable_database.inputTable(namespace, table_name)
            ),
            f"Failed to get input table for '{namespace}.{table_name}'.",
        )

    def input_table_spec_for(self, namespace: str, table_name: str) -> InputTableSpec:
        """Retrieves the current InputTableSpec for the given namespace and table.

        Args:
            namespace (str): the namespace of the input table
            table_name (str): the name of the input table

        Returns:
            an InputTableSpec object

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: InputTableSpec(
                list(
                    InputTableSpec.get_column_specs_from_j_input_table_spec(
                        self.j_writable_database.inputTableSpecFor(
                            namespace, table_name
                        )
                    ).values()
                )
            ),
            f"Failed to get input table spec for '{namespace}.{table_name}'.",
        )
# Module-level Database handle, initialized to None here. Nothing visible in this module assigns it.
# NOTE(review): presumably populated by the worker at startup so scripts can use it directly — confirm.
db: Optional[Database] = None