from typing import Optional
import jpy
from deephaven.table import Table
import deephaven_enterprise.database
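
# Java peers used by this module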
j_stl = jpy.get_type("io.deephaven.enterprise.database.SystemTableLogger")
j_zoneid = jpy.get_type("java.time.ZoneId")


def log_table(namespace: str, table_name: str, table: Table, columnPartition: Optional[str],
              internalPartition: Optional[str] = None, applicationVersion: Optional[int] = None,
              zone: Optional[str] = None, useLas: bool = True, logDir: Optional[str] = None):
"""
Write tableToLog to System storage.
The table is logged to Intraday storage and can be retrieved with db.live_table.
Historical tables should be written using a merge process.
:param namespace: the namespace of the table
:param table_name: the name of the table
:param table: the table to log
:param columnPartition: the column partition to log to, if None then uses the current date
:param internalPartition: the internal partition, if None an internal partition is generated
:param applicationVersion: the application version, if None defaults to zero
:param zone: the time zone ID (as interpreted by java.time.ZoneId.of)
:param useLas: use the log aggregator service (defaults to true)
:param logDir: the directory for writing binary log files (useLas must be false)
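
    Example (a minimal sketch; the namespace, table name, and columns below are
    hypothetical, and it assumes a Deephaven Enterprise session with a matching
    schema already deployed)::

        from deephaven import empty_table

        source = empty_table(10).update(["Sym = `AAPL`", "Price = i * 0.5"])
        log_table("ExampleNamespace", "Trades", source, columnPartition="2024-01-02")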
"""
j_db = deephaven_enterprise.database.db.j_db
opts = j_stl.newOptionsBuilder()
if columnPartition is None:
opts.currentDateColumnPartition(True)
else:
opts.fixedColumnPartition(columnPartition)
if internalPartition is not None:
opts.internalPartition(internalPartition)
if applicationVersion is not None:
opts.applicationVersion(applicationVersion)
if zone is not None:
opts.zoneId(j_zoneid.of(zone))
opts.useLas(useLas)
if logDir is not None:
opts.logDirectory(logDir)
j_stl.logTable(j_db, namespace, table_name, table.j_table, opts.build())


def log_table_incremental(namespace: str, table_name: str, table: Table, columnPartition: Optional[str],
                          internalPartition: Optional[str] = None, applicationVersion: Optional[int] = None,
                          zone: Optional[str] = None, useLas: bool = True, logDir: Optional[str] = None):
"""
Write tableToLog to System storage.
The table is logged to Intraday storage and can be retrieved with db.live_table.
Historical tables should be written using a merge process.
No rows should be removed or modified in tableToLog. Modifications are an error. If the table is not a <i>blink</i>
table, then removals are an error.
:param namespace: the namespace of the table
:param table_name: the name of the table
:param table: the table to log
:param columnPartition: the column partition to log to, if None then uses the current date
:param internalPartition: the internal partition, if None an internal partition is generated
:param applicationVersion: the application version, if None defaults to zero
:param zone: the time zone ID (as interpreted by java.time.ZoneId.of)
:param useLas: use the log aggregator service (defaults to true)
:param logDir: the directory for writing binary log files (useLas must be false)
:returns: a context manager that can be used in a with statement, or alternatively you can call close() when complete.
Users should hold this return value to ensure liveness for writing.
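
    Example (a minimal sketch; the namespace, table name, and column are hypothetical,
    and it assumes a matching schema exists and a ticking source such as
    deephaven.time_table is available)::

        from deephaven import time_table

        ticks = time_table("PT1s").update("Value = i")
        with log_table_incremental("ExampleNamespace", "Ticks", ticks,
                                   columnPartition="2024-01-02"):
            ...  # rows continue to be logged until the with block exits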
"""
j_db = deephaven_enterprise.database.db.j_db
opts = j_stl.newOptionsBuilder()
if columnPartition is None:
opts.currentDateColumnPartition(True)
else:
opts.fixedColumnPartition(columnPartition)
if internalPartition is not None:
opts.internalPartition(internalPartition)
if applicationVersion is not None:
opts.applicationVersion(applicationVersion)
if zone is not None:
opts.zoneId(j_zoneid.of(zone))
opts.useLas(useLas)
if logDir is not None:
opts.logDirectory(logDir)
closeable = j_stl.logTableIncremental(j_db, namespace, table_name, table.j_table, opts.build())

    class Cleanup:
        """Holds the Java closeable to keep the logger live until closed."""

        def __init__(self):
            self._closed = False

        def __enter__(self):
            return self

        def close(self):
            # close() may be reached from both __exit__ and __del__; only close
            # the underlying Java resource once.
            if not self._closed:
                self._closed = True
                closeable.close()

        def __exit__(self, exc_type, exc_value, traceback):
            self.close()
            return False

        def __del__(self):
            self.close()

    return Cleanup()