#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module defines the Database class, which provides an API for the discovery and management of tables in the
Deephaven Enterprise data store.
"""
from typing import Any, Callable, Optional
import jpy
from deephaven import DHError
from deephaven.jcompat import AutoCloseable, j_collection_to_list
from deephaven.table import PartitionedTable, Table
from deephaven.table_factory import InputTable
from deephaven_enterprise.input_tables import InputTableSpec
# Java class handles, resolved once at import time via jpy.
# _JDatabase is annotated as Any so static type checkers do not flag attribute
# access on the dynamically resolved Java class object.
_JDatabase: Any = jpy.get_type("io.deephaven.enterprise.database.Database")
_JWritableDatabase = jpy.get_type("io.deephaven.enterprise.database.WritableDatabase")
_JTableOptions = jpy.get_type("io.deephaven.enterprise.database.TableOptions")
# Shadowed enterprise class providing the JVM-wide default for live table retrieval.
_JTableRetrievalLivenessDefault = jpy.get_type(
    "io.deephaven.shadow.enterprise.com.illumon.iris.db.tables.live.TableRetrievalLivenessDefault"
)
def _do_with_dh_error(func: Callable[[], Any], error_message: str) -> Any:
    """Invoke the zero-argument callable ``func`` and return its result.

    Any exception raised by ``func`` is wrapped in a :class:`DHError` carrying
    ``error_message``, with the original exception chained as the cause.

    Args:
        func (Callable[[], Any]): the operation to invoke
        error_message (str): message for the wrapping DHError on failure

    Returns:
        whatever ``func`` returns

    Raises:
        DHError: if ``func`` raises any exception
    """
    try:
        return func()
    except Exception as e:
        raise DHError(e, error_message) from e
class Database:
    """The Database class defines an API for the discovery and management of tables in the Deephaven Enterprise data
    store."""

    # The underlying Java Database instance; all read operations delegate to it.
    j_database: _JDatabase

    def __init__(self, j_db: jpy.JType):
        """Wrap a Java Database object.

        Args:
            j_db (jpy.JType): a Java object that must implement both
                io.deephaven.enterprise.database.Database and
                io.deephaven.enterprise.database.WritableDatabase

        Raises:
            DHError: if j_db does not implement either required interface
        """
        self.j_database = jpy.cast(j_db, _JDatabase)
        if self.j_database is None:
            raise DHError(
                message="j_db type is not io.deephaven.enterprise.database.Database"
            )
        # Write operations are delegated to the WritableDatabase view of the same object.
        self.j_writable_database = jpy.cast(j_db, _JWritableDatabase)
        if self.j_writable_database is None:
            raise DHError(
                message="j_db type is not io.deephaven.enterprise.database.WritableDatabase"
            )

    def live_table(
        self,
        namespace: str,
        table_name: str,
        is_refreshing: Optional[bool] = None,
        is_blink: bool = False,
        internal_partition_column: Optional[str] = None,
    ) -> Table:
        """Retrieves a live Table for the specified namespace and table name.

        Args:
            namespace (str): the namespace
            table_name (str): the table name
            is_refreshing (bool): True if the returned table should be refreshing, defaults to None, meaning to use the
                value of JVM property RunAndDoneSetupQuery.liveTables which defaults to False
            is_blink (bool): True if the returned table should be a blink table, defaults to False
            internal_partition_column (str): Set the name of the internal partition column, defaults to None, meaning
                no internal partition column is included

        Returns:
            a Table object
        """
        if is_refreshing is None:
            # Fall back to the JVM-wide liveness default when the caller did not choose.
            is_refreshing = _JTableRetrievalLivenessDefault.livenessDefault()
        if is_refreshing:
            options_builder = _JTableOptions.newLiveBuilder()
        else:
            options_builder = _JTableOptions.newStaticBuilder()
        options_builder.isBlink(is_blink)
        if internal_partition_column is not None:
            options_builder.internalPartitionColumn(internal_partition_column)
        return Table(
            j_table=self.j_database.liveTable(
                namespace, table_name, options_builder.build()
            )
        )

    def live_partitioned_table(
        self, namespace: str, table_name: str
    ) -> PartitionedTable:
        """Retrieves the specified live table as a partitioned table from the Database. The result's key columns will
        be derived from the table's partitioning columns as well as any internal partitions.

        Args:
            namespace (str): the namespace
            table_name (str): the table name

        Returns:
            a PartitionedTable object
        """
        return PartitionedTable(
            j_partitioned_table=self.j_database.livePartitionedTable(
                namespace, table_name
            )
        )

    def historical_table(
        self,
        namespace: str,
        table_name: str,
        internal_partition_column: Optional[str] = None,
    ) -> Table:
        """Retrieves a historical Table for the specified namespace and table name.

        Args:
            namespace (str): the namespace
            table_name (str): the table name
            internal_partition_column (str): Set the name of the internal partition column, defaults to None, meaning
                no internal partition column is included

        Returns:
            a Table object
        """
        # Historical tables are always static; only the partition column is configurable.
        options_builder = _JTableOptions.newStaticBuilder()
        if internal_partition_column is not None:
            options_builder.internalPartitionColumn(internal_partition_column)
        return Table(
            j_table=self.j_database.historicalTable(
                namespace, table_name, options_builder.build()
            )
        )

    def historical_partitioned_table(
        self, namespace: str, table_name: str
    ) -> PartitionedTable:
        """Retrieves the specified historical table as a partitioned table from the Database. The result's key columns
        will be derived from the table's partitioning columns as well as any internal partitions.

        Args:
            namespace (str): the namespace
            table_name (str): the table name

        Returns:
            a PartitionedTable object
        """
        return PartitionedTable(
            j_partitioned_table=self.j_database.historicalPartitionedTable(
                namespace, table_name
            )
        )

    def namespaces(self) -> list[str]:
        """Returns the list of namespaces in this database.

        Returns:
            the list of namespaces
        """
        return j_collection_to_list(self.j_database.getNamespaces())

    def table_names(self, namespace: str) -> list[str]:
        """Returns the list of tables within a namespace.

        Args:
            namespace (str): the namespace

        Returns:
            the list of table names within namespace
        """
        return j_collection_to_list(self.j_database.getTableNames(namespace))

    def catalog_table(self) -> Table:
        """Returns a table of the available tables. The result table contains 3 string type columns: Namespace,
        TableName, and NamespaceSet which is either System or User.

        Returns:
            a table
        """
        return Table(j_table=self.j_database.getCatalogTable())

    def has_table(self, namespace: str, table_name: str) -> bool:
        """Identifies if the table is defined in the Database and visible to the user.

        Args:
            namespace (str): the namespace
            table_name (str): the table name

        Returns:
            True if the schema exists and is visible to the user, else False
        """
        return self.j_database.hasTable(namespace, table_name)

    def table_definition(self, namespace: str, table_name: str) -> Table:
        """Returns a Table containing the column definitions for the table with the specified namespace and table name.

        Args:
            namespace (str): the namespace
            table_name (str): the table name

        Returns:
            a Table object
        """
        return Table(
            j_table=self.j_database.getTableDefinitionTable(namespace, table_name)
        )

    def add_unpartitioned_table(
        self, namespace: str, table_name: str, table: Table
    ) -> None:
        """Adds an unpartitioned, directly manipulated user table in the database.

        Writes an unpartitioned user table to disk. If the namespace does not exist, then it is created. Also write out
        the definition of the table as the schema. The schema must not already exist.

        Note, if an error occurs during writing, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            table (Table): table that the definition and data will be based on

        Raises:
            DHError
        """
        _do_with_dh_error(
            lambda: self.j_writable_database.addUnpartitionedTable(
                namespace, table_name, table.j_table
            ),
            "Failed to add unpartitioned table '" + namespace + "." + table_name + "'",
        )

    def delete_unpartitioned_table(self, namespace: str, table_name: str) -> bool:
        """Deletes an unpartitioned, directly manipulated user table and its associated schema.

        If the schema does not exist, then data is not deleted. If there is no data but the schema exists,
        then the schema is deleted.

        The namespace is not removed even if this is the last table in the namespace.

        Note, if an error occurs during deleting, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name

        Returns:
            True if the data was deleted, False if there was no data or preexisting schema

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.deleteUnpartitionedTable(
                namespace, table_name
            ),
            "Failed to delete unpartitioned table '"
            + namespace
            + "."
            + table_name
            + "'",
        )

    def add_partitioned_table_schema(
        self,
        namespace: str,
        table_name: str,
        partition_column_name: str,
        prototype: Table,
    ) -> bool:
        """Adds a schema for a partitioned, directly manipulated user table.

        The schema for the specified table is derived from the table definition of the prototype table and
        the partition column name. The prototype table's definition must not include a partitioning column.
        If the namespace does not exist, then it is created.

        If the schema already exists, and it is identical (this is a stricter check than compatibility; all columns
        must be present in the same order with the same properties), then the method returns False. If the schema
        already exists and is not identical, then an error is thrown.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_name (str): name of the partitioning column
            prototype (Table): table with definition to derive schema from

        Returns:
            True if the partitioned table schema was added, False if it already existed

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.addPartitionedTableSchema(
                namespace,
                table_name,
                partition_column_name,
                prototype._definition.j_table_definition,
            ),
            "Failed to add partitioned table schema for '"
            + namespace
            + "."
            + table_name
            + "'",
        )

    def update_partitioned_table_schema(
        self, namespace: str, table_name: str, prototype: Table
    ) -> bool:
        """Updates the schema of a preexisting partitioned, directly manipulated user table.

        If the schema does not exist an error is thrown. Not all schema modifications are permitted. The partitioning
        column may not be changed. Existing columns may not have their type changed. Columns may be added or deleted.

        Note that no data is modified by this operation. Removed columns remain on persistent storage, and added columns
        are treated as null on read.

        Note also that although each modification in isolation is verified for safety, a sequence of modifications to
        the schema may be unsafe. For example, deleting a column and adding it back with a new type results in
        unreadable data.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            prototype (Table): table with definition to derive schema from

        Returns:
            True if the partitioned table definition was updated, False if there was already an identical definition

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.updatePartitionedTableSchema(
                namespace, table_name, prototype._definition.j_table_definition
            ),
            "Failed to update partitioned table schema for '"
            + namespace
            + "."
            + table_name
            + "'",
        )

    def add_table_partition(
        self, namespace: str, table_name: str, partition_column_value: str, table: Table
    ) -> None:
        """Adds a single partition data of the provided partition column value to a partitioned, directly manipulated
        user table.

        The partition mustn't already exist. The data table must have a mutually compatible definition with the current
        schema. The data table must not have a column with the same name as the partitioning column.

        Note, if an error occurs during writing, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_value (str): value for the partitioning column, e.g. "2015-09-25" for "Date"
            table (Table): table to write data from

        Raises:
            DHError
        """
        _do_with_dh_error(
            lambda: self.j_writable_database.addTablePartition(
                namespace, table_name, partition_column_value, table.j_table
            ),
            "Failed to add table partition "
            + partition_column_value
            + " to '"
            + namespace
            + "."
            + table_name
            + "'",
        )

    def delete_table_partition(
        self, namespace: str, table_name: str, partition_column_value: str
    ) -> bool:
        """Deletes the partition of the provided partition column value from a partitioned, directly manipulated user
        table.

        Note, if an error is thrown while deleting, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_value (str): value for the partitioning column, e.g. "2015-09-25" for "Date"

        Returns:
            True if the partition was deleted, False if there was no partition or preexisting schema

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.deleteTablePartition(
                namespace, table_name, partition_column_value
            ),
            "Failed to delete table partition "
            + partition_column_value
            + " from '"
            + namespace
            + "."
            + table_name
            + "'",
        )

    def append_live_table(
        self, namespace: str, table_name: str, partition_column_value: str, table: Table
    ) -> None:
        """Appends all rows from a given table to a live (centrally managed) user table partition.

        The data table must have a mutually compatible definition with the current schema. The data table must not
        have a column with the same name as the partitioning column.

        This method is asynchronous. After returning, the data may not be immediately available. It is possible for
        the write to fail after this method has returned. When multiple workers append to a partition, ordering is
        imposed outside the worker by other system components.

        Note, if an error is thrown during appending, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_value (str): value for the partitioning column, e.g. "2015-09-25" for "Date"
            table (Table): table to append rows from

        Raises:
            DHError
        """
        _do_with_dh_error(
            lambda: self.j_writable_database.appendLiveTable(
                namespace, table_name, partition_column_value, table.j_table
            ),
            "Failed to append to partition "
            + partition_column_value
            + " of live table '"
            + namespace
            + "."
            + table_name
            + "'",
        )

    def append_live_table_incremental(
        self, namespace: str, table_name: str, partition_column_value: str, table: Table
    ) -> AutoCloseable:
        """Appends all rows from the given source table to a live (centrally managed) user table partition. When rows
        are added to the source table, they are additionally appended to the partition.

        The source table updates can only have additions and shifts. No modifications or removals are permitted.
        The data table must have a mutually compatible definition with the current schema. The data table must not
        have a column with the same name as the partitioning column.

        This method is asynchronous, after returning the data may not be immediately available. It is possible for
        the write to fail after this method has returned. When multiple workers append to a partition, ordering is
        imposed outside the worker by other system components.

        A reference must be maintained to the returned AutoClosable object to ensure expected functionality; calling
        close() on it will stop incremental appends, and clean up related resources.

        Note, if an error is thrown during appending, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_value (str): value for the partitioning column, e.g. "2015-09-25" for "Date"
            table (Table): table to append updates from

        Returns:
            the object used to ensure and stop expected functionality

        Raises:
            DHError
        """
        # Wrap the Java handle so callers can use Python context-manager/close semantics.
        return AutoCloseable(
            _do_with_dh_error(
                lambda: self.j_writable_database.appendLiveTableIncremental(
                    namespace, table_name, partition_column_value, table.j_table
                ),
                "Failed to append incrementally to partition "
                + partition_column_value
                + " of live table '"
                + namespace
                + "."
                + table_name
                + "'",
            )
        )

    def delete_live_table_partition(
        self, namespace: str, table_name: str, partition_column_value: str
    ) -> bool:
        """Deletes a partition from a live (centrally managed) user table.

        Note, if an error is thrown during deleting, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name
            partition_column_value (str): value for the partitioning column, e.g. "2015-09-25" for "Date"

        Returns:
            True if the partition was deleted, False if there was no partition or preexisting schema

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.deleteLiveTablePartition(
                namespace, table_name, partition_column_value
            ),
            "Failed to delete partition "
            + partition_column_value
            + " from live table '"
            + namespace
            + "."
            + table_name
            + "'",
        )

    def delete_partitioned_table(self, namespace: str, table_name: str) -> bool:
        """Deletes all partitions, whether direct manipulated or live (centrally managed), and the schema, from a
        partitioned user table.

        All partitions from the table are deleted sequentially. If a partition cannot be deleted, then the operation
        fails but some data may have already been removed. After all partitions are deleted, then the schema is
        deleted. If the schema does not exist, then data is not deleted. If there is no data, but the schema exists,
        then the schema is deleted.

        The namespace is not removed, even if this is the last table in the namespace.

        Note, if an error is thrown during deleting, the state of the table is undefined.

        Args:
            namespace (str): table namespace
            table_name (str): table name

        Returns:
            True if data was deleted, False if there was no data or preexisting schema

        Raises:
            DHError
        """
        return _do_with_dh_error(
            lambda: self.j_writable_database.deletePartitionedTable(
                namespace, table_name
            ),
            "Failed to delete partitioned table '" + namespace + "." + table_name + "'",
        )
# Module-level Database handle; None until assigned.
# NOTE(review): presumably populated by the enterprise session/worker bootstrap
# elsewhere in the package — confirm before relying on it at import time.
db: Optional[Database] = None