# Source code for deephaven_enterprise.database

#  Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending

import jpy
from deephaven import DHError
from deephaven.table import Table, PartitionedTable
from deephaven.jcompat import wrap_j_object
from typing import Any, List


def do_with_dh_error(func, error_message):
    """Invoke ``func`` and return its result, converting any failure to a DHError.

    :param func: a zero-argument callable to invoke
    :param error_message: the message attached to the DHError raised on failure
    :return: whatever ``func`` returns
    :raises DHError: if ``func`` raises any Exception; the original exception
        is passed to the DHError and chained as its cause
    """
    try:
        return func()
    except Exception as cause:
        raise DHError(cause, error_message) from cause


class Database:
    """Python wrapper around a Java ``io.deephaven.enterprise.database.Database``.

    The wrapped Java object must also be castable to
    ``io.deephaven.enterprise.database.WritableDatabase``; read operations go
    through ``j_db`` and write operations go through ``j_wdb``. Write operations
    wrap any failure in a :class:`DHError` via ``do_with_dh_error``.
    """

    _j_db_type = jpy.get_type("io.deephaven.enterprise.database.Database")
    _j_wdb_type = jpy.get_type(
        "io.deephaven.enterprise.database.WritableDatabase")
    _j_options_type = jpy.get_type(
        "io.deephaven.enterprise.database.TableOptions")
    _j_liveness_default_type = jpy.get_type(
        "io.deephaven.shadow.enterprise.com.illumon.iris.db.tables.live.TableRetrievalLivenessDefault")

    def __init__(self, j_db: jpy.JType):
        """
        Wrap a Java database object.

        :param j_db: the Java object to wrap
        :raises DHError: if ``j_db`` cannot be cast to the Database type or to
            the WritableDatabase type
        """
        self.j_db = jpy.cast(j_db, self._j_db_type)
        if self.j_db is None:
            # DHError takes the causing exception first (see do_with_dh_error),
            # so the text has to be passed as the message keyword.
            raise DHError(
                message="j_db type is not io.deephaven.enterprise.database.Database")

        self.j_wdb = jpy.cast(j_db, self._j_wdb_type)
        if self.j_wdb is None:
            raise DHError(
                message="j_db type is not io.deephaven.enterprise.database.WritableDatabase")

    def _j_collection_to_list(self, jcollection) -> List[Any]:
        """Converts a java Collection to a python list.

        :param jcollection: the Java collection; a falsy value yields an empty list
        :return: a list holding each element passed through ``wrap_j_object``
        """
        result = []
        if not jcollection:
            return result

        it = jcollection.iterator()
        while it.hasNext():
            result.append(wrap_j_object(it.next()))
        return result

    def live_table(self, namespace: str, table_name: str, is_refreshing: bool = None, is_blink: bool = False,
                   internal_partition_column: str = None) -> Table:
        """
        Fetch a live Table for the specified namespace and table name.

        :param namespace: the namespace
        :param table_name: the table
        :param is_refreshing: True if the returned table should be refreshing; when None, the value reported by
            the Java TableRetrievalLivenessDefault type is used
        :param is_blink: True if the returned table should be a blink table (defaults to False)
        :param internal_partition_column: Set the name of the internal partition column (defaults to None,
            not included)
        :return: the live table at the specified namespace and table name
        """
        if is_refreshing is None:
            is_refreshing = self._j_liveness_default_type.livenessDefault()

        if is_refreshing:
            options_builder = self._j_options_type.newLiveBuilder()
        else:
            options_builder = self._j_options_type.newStaticBuilder()

        # The blink flag is applied to whichever builder was selected above.
        options_builder.isBlink(is_blink)
        if internal_partition_column is not None:
            options_builder.internalPartitionColumn(internal_partition_column)

        return Table(j_table=self.j_db.liveTable(namespace, table_name, options_builder.build()))

    def live_partitioned_table(self, namespace: str, table_name: str) -> PartitionedTable:
        """
        Retrieve the specified live table as a partitioned table from the Database.

        :param namespace: the namespace
        :param table_name: the table
        :return: a new live partitioned table at the specified namespace and table name
        """
        return PartitionedTable(j_partitioned_table=self.j_db.livePartitionedTable(namespace, table_name))

    def historical_table(self, namespace: str, table_name: str, internal_partition_column: str = None) -> Table:
        """
        Fetch a historical Table for the specified namespace and table name.

        :param namespace: the namespace
        :param table_name: the table
        :param internal_partition_column: Set the name of the internal partition column (defaults to None,
            not included)
        :return: the historical table at the specified namespace and table name
        """
        options_builder = self._j_options_type.newStaticBuilder()
        if internal_partition_column is not None:
            options_builder.internalPartitionColumn(internal_partition_column)

        return Table(j_table=self.j_db.historicalTable(namespace, table_name, options_builder.build()))

    def historical_partitioned_table(self, namespace: str, table_name: str) -> PartitionedTable:
        """
        Retrieve the specified historical table as a partitioned table from the Database.

        :param namespace: the namespace
        :param table_name: the table
        :return: a new historical partitioned table at the specified namespace and table name
        """
        return PartitionedTable(j_partitioned_table=self.j_db.historicalPartitionedTable(namespace, table_name))

    def namespaces(self) -> List[Any]:
        """
        Return the list of namespaces.

        :return: the list of namespaces
        """
        return self._j_collection_to_list(self.j_db.getNamespaces())

    def table_names(self, namespace: str) -> List[Any]:
        """
        Return the list of tables within a namespace.

        :param namespace: the namespace
        :return: the list of tables within namespace
        """
        return self._j_collection_to_list(self.j_db.getTableNames(namespace))

    def catalog_table(self) -> Table:
        """
        Return a table of the available tables.

        The result table contains a column for Namespace, TableName, and NamespaceSet.

        :return: a table of table names
        """
        return Table(j_table=self.j_db.getCatalogTable())

    def table_definition(self, namespace: str, table_name: str) -> Table:
        """
        Fetch a Table containing the column definitions for the specified namespace and table name.

        :param namespace: the namespace
        :param table_name: the table
        :return: the definition table for the specified namespace and table name
        """
        return Table(j_table=self.j_db.getTableDefinitionTable(namespace, table_name))

    def add_unpartitioned_table(self, namespace: str, table_name: str, table: Table):
        """
        Adds an unpartitioned user table.

        Writes an unpartitioned user table to disk. If the namespace does not exist, then it is created. The schema
        must not already exist. If an error occurs during writing, the state of the table is undefined.

        :param namespace: table namespace
        :param table_name: table name
        :param table: table that the definition and data will be based on
        :raises DHError: if the underlying Java call fails
        """
        return do_with_dh_error(
            lambda: self.j_wdb.addUnpartitionedTable(namespace, table_name, table.j_table),
            f"Failed to add unpartitioned table '{namespace}.{table_name}'")

    def delete_unpartitioned_table(self, namespace: str, table_name: str):
        """
        Deletes an unpartitioned user table and the schema.

        If the schema does not exist, then data is not deleted. If there is no data but the schema exists, then the
        schema is deleted. The namespace is not removed even if this is the last table in the namespace. If an error
        occurs during deleting, the state of the table is undefined.

        :param namespace: table namespace
        :param table_name: table name
        :return: True if the data was deleted, False if there was no data or preexisting schema
        :raises DHError: if the underlying Java call fails
        """
        return do_with_dh_error(
            lambda: self.j_wdb.deleteUnpartitionedTable(namespace, table_name),
            f"Failed to delete unpartitioned table '{namespace}.{table_name}'")

    def add_partitioned_table_schema(self, namespace: str, table_name: str, partition_column_name: str,
                                     prototype: Table):
        """
        Adds a schema for a partitioned user table.

        The schema is derived from the prototype table definition and the partition column name. The prototype
        definition must not include a partitioning column. If the namespace does not exist, then it is created.
        If the schema already exists and it is identical (this is a stricter check than compatibility; all columns
        must be present in the same order with the same properties), then the method returns False. If the schema
        already exists and is not identical, then an error is thrown.

        :param namespace: table namespace
        :param table_name: table name
        :param partition_column_name: name of the partitioning column
        :param prototype: table with definition to derive schema from
        :return: True if the partitioned table definition was added, False if it already existed
        :raises DHError: if the underlying Java call fails
        """
        return do_with_dh_error(
            lambda: self.j_wdb.addPartitionedTableSchema(namespace, table_name, partition_column_name,
                                                         prototype._definition),
            f"Failed to add partitioned table schema for '{namespace}.{table_name}'")

    def update_partitioned_table_schema(self, namespace: str, table_name: str, prototype: Table):
        """
        Updates a preexisting User table schema.

        If the schema does not exist an error is thrown. Not all schema modifications are permitted. The
        partitioning column may not be changed. Existing columns may not have their type changed. Columns may be
        added or deleted.

        Note that no data is modified by this operation. Removed columns remain on persistent storage, and added
        columns are treated as null on read. Although each modification in isolation is verified for safety, a
        sequence of modifications to the schema may be unsafe. For example, deleting a column and adding it back
        with a new type results in unreadable data.

        :param namespace: table namespace
        :param table_name: table name
        :param prototype: table with definition to derive schema from
        :return: True if the partitioned table definition was updated, False if there was already an identical
            definition
        :raises DHError: if the underlying Java call fails
        """
        return do_with_dh_error(
            lambda: self.j_wdb.updatePartitionedTableSchema(namespace, table_name, prototype._definition),
            f"Failed to update partitioned table schema for '{namespace}.{table_name}'")

    def add_table_partition(self, namespace: str, table_name: str, partition_column_value: str, table: Table):
        """
        Adds a single column partition of data to a partitioned user table.

        The data table must have a mutually compatible definition with the current schema. The data table must not
        have a column with the same name as the partitioning column. If an error occurs during writing, the state
        of the table is undefined.

        :param namespace: table namespace
        :param table_name: table name
        :param partition_column_value: value for the partitioning column, e.g. "2015-09-25" for "Date"
        :param table: table to write data from
        :raises DHError: if the underlying Java call fails
        """
        # The message is an f-string so that a non-str partition value cannot
        # raise a bare TypeError before the Java call is even attempted.
        return do_with_dh_error(
            lambda: self.j_wdb.addTablePartition(namespace, table_name, partition_column_value, table.j_table),
            f"Failed to add table partition {partition_column_value} to '{namespace}.{table_name}'")

    def delete_table_partition(self, namespace: str, table_name: str, partition_column_value: str):
        """
        Deletes a single column partition of data from a partitioned user table.

        If an error is thrown while deleting, the state of the table is undefined.

        :param namespace: table namespace
        :param table_name: table name
        :param partition_column_value: value for the partitioning column, e.g. "2015-09-25" for "Date"
        :return: True if the partition was deleted, False if there was no partition or preexisting schema
        :raises DHError: if the underlying Java call fails
        """
        return do_with_dh_error(
            lambda: self.j_wdb.deleteTablePartition(namespace, table_name, partition_column_value),
            f"Failed to delete table partition {partition_column_value} from '{namespace}.{table_name}'")

    def append_live_table(self, namespace: str, table_name: str, partition_column_value: str, table: Table):
        """
        Appends all rows from a given table to a live user table partition.

        The data table must have a mutually compatible definition with the current schema. The data table must not
        have a column with the same name as the partitioning column.

        This method is asynchronous. After returning, the data may not be immediately available. It is possible for
        the write to fail after this method has returned. When multiple workers append to a partition, ordering is
        imposed outside the worker by other system components. If an error is thrown during appending, the state of
        the table is undefined.

        :param namespace: table namespace
        :param table_name: table name
        :param partition_column_value: value for the partitioning column, e.g. "2015-09-25" for "Date"
        :param table: table to append rows from
        :raises DHError: if the underlying Java call fails
        """
        return do_with_dh_error(
            lambda: self.j_wdb.appendLiveTable(namespace, table_name, partition_column_value, table.j_table),
            f"Failed to append to partition {partition_column_value} of live table '{namespace}.{table_name}'")

    def append_live_table_incremental(self, namespace: str, table_name: str, partition_column_value: str,
                                      table: Table):
        """
        Appends all rows from a given table to a live user table partition.

        When rows are added to the table, they are additionally appended to the table. The input table updates can
        only have additions and shifts. No modifications or removals are permitted.

        The data table must have a mutually compatible definition with the current schema. The data table must not
        have a column with the same name as the partitioning column.

        This method is asynchronous, after returning the data may not be immediately available. It is possible for
        the write to fail after this method has returned. When multiple workers append to a partition, ordering is
        imposed outside the worker by other system components.

        A reference must be maintained to the returned object to ensure expected functionality; calling close() on
        it will stop incremental appends, and clean up related resources. If an error is thrown during appending,
        the state of the table is undefined.

        :param namespace: table namespace
        :param table_name: table name
        :param partition_column_value: value for the partitioning column, e.g. "2015-09-25" for "Date"
        :param table: table to append updates from
        :return: the object used to ensure and stop expected functionality
        :raises DHError: if the underlying Java call fails
        """
        return do_with_dh_error(
            lambda: self.j_wdb.appendLiveTableIncremental(namespace, table_name, partition_column_value,
                                                          table.j_table),
            f"Failed to append incrementally to partition {partition_column_value} of live table "
            f"'{namespace}.{table_name}'")

    def delete_live_table_partition(self, namespace: str, table_name: str, partition_column_value: str):
        """
        Delete a partition from a live user table.

        If an error is thrown during deleting, the state of the table is undefined.

        :param namespace: table namespace
        :param table_name: table name
        :param partition_column_value: value for the partitioning column, e.g. "2015-09-25" for "Date"
        :return: True if the partition was deleted, False if there was no partition or preexisting schema
        :raises DHError: if the underlying Java call fails
        """
        return do_with_dh_error(
            lambda: self.j_wdb.deleteLiveTablePartition(namespace, table_name, partition_column_value),
            f"Failed to delete partition {partition_column_value} from live table '{namespace}.{table_name}'")

    def delete_partitioned_table(self, namespace: str, table_name: str):
        """
        Delete all partitions, whether direct or live, and the schema, from a partitioned user table.

        All partitions from the table are deleted sequentially. If a partition cannot be deleted, then the
        operation fails but some data may have already been removed. After all partitions are deleted, then the
        schema is deleted.

        If the schema does not exist, then data is not deleted. If there is no data, but the schema exists, then
        the schema is deleted. The namespace is not removed, even if this is the last table in the namespace. If an
        error is thrown during deleting, the state of the table is undefined.

        :param namespace: table namespace
        :param table_name: table name
        :return: True if data was deleted, False if there was no data or preexisting schema
        :raises DHError: if the underlying Java call fails
        """
        return do_with_dh_error(
            lambda: self.j_wdb.deletePartitionedTable(namespace, table_name),
            f"Failed to delete partitioned table '{namespace}.{table_name}'")
# Module-level Database handle; it is None until something assigns it.
# NOTE(review): presumably populated by the hosting worker/session at startup - confirm.
db: Database = None