# Source code for deephaven_enterprise.system_table_logger

#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module defines functions for logging tables to Intraday storage."""

from typing import Any, Optional, Union

import jpy
from deephaven import DHError
from deephaven.jcompat import AutoCloseable
from deephaven.table import Table

import deephaven_enterprise.database

_JSystemTableLogger = jpy.get_type("io.deephaven.enterprise.database.SystemTableLogger")
_JZoneId = jpy.get_type("java.time.ZoneId")
JObjectCodec: Any = jpy.get_type("io.deephaven.util.codec.ObjectCodec")


class Codec:
    """A simple wrapper class around a java io.deephaven.util.codec.ObjectCodec instance."""

    def __init__(self, j_codec):
        # The wrapped Java ObjectCodec; passed through to the Java logger options.
        self.j_codec = j_codec
def codec(codec: Union[str, JObjectCodec]) -> Codec:
    """Creates a Codec instance for use with log_table and log_table_incremental.

    Args:
        codec (Union[str, JObjectCodec]): the codec as either a string representing the
            fully qualified java class, or an instance of a java
            io.deephaven.util.codec.ObjectCodec

    Returns:
        Codec

    Raises:
        DHError
    """
    # An existing Java ObjectCodec instance is wrapped as-is.
    if isinstance(codec, jpy.JType) and JObjectCodec.jclass.isInstance(codec):
        return Codec(codec)
    # Otherwise interpret the argument as a fully qualified Java class name,
    # resolve it, and construct a new instance via its no-arg constructor.
    java_type = jpy.get_type(codec)
    if java_type is None:
        raise DHError(message=f"Unable to instantiate codec: {codec}")
    return Codec(java_type())
def byte_array_codec() -> Codec:
    """Creates a new byte array codec.

    Returns:
        a new byte array codec
    """
    return codec("io.deephaven.enterprise.codec.ByteArrayCodec")
def char_array_codec() -> Codec:
    """Creates a new char array codec.

    Returns:
        a new char array codec
    """
    return codec("io.deephaven.enterprise.codec.CharArrayCodec")
def short_array_codec() -> Codec:
    """Creates a new short array codec.

    Returns:
        a new short array codec
    """
    return codec("io.deephaven.enterprise.codec.ShortArrayCodec")
def int_array_codec() -> Codec:
    """Creates a new int array codec.

    Returns:
        a new int array codec
    """
    return codec("io.deephaven.enterprise.codec.IntArrayCodec")
def long_array_codec() -> Codec:
    """Creates a new long array codec.

    Returns:
        a new long array codec
    """
    return codec("io.deephaven.enterprise.codec.LongArrayCodec")
def float_array_codec() -> Codec:
    """Creates a new float array codec.

    Returns:
        a new float array codec
    """
    return codec("io.deephaven.enterprise.codec.FloatArrayCodec")
def double_array_codec() -> Codec:
    """Creates a new double array codec.

    Returns:
        a new double array codec
    """
    return codec("io.deephaven.enterprise.codec.DoubleArrayCodec")
def string_array_codec() -> Codec:
    """Creates a new string array codec.

    Returns:
        a new string array codec
    """
    return codec("io.deephaven.enterprise.codec.StringArrayCodec")
def log_table(
    namespace: str,
    table_name: str,
    table: Table,
    columnPartition: str,
    internalPartition: Optional[str] = None,
    applicationVersion: Optional[int] = None,
    zone: Optional[str] = None,
    useLas: bool = True,
    logDir: Optional[str] = None,
    codecs: Optional[dict[str, Codec]] = None,
) -> None:
    """Writes tableToLog to Intraday storage.

    The table is logged to Intraday storage and can be retrieved with db.live_table.
    Historical tables should be written using a merge process.

    Args:
        namespace (str): the namespace of the table
        table_name (str): the name of the table
        table (Table): the table to log
        columnPartition (str): the column partition to log to, if None then uses the current date
        internalPartition (str): the internal partition, defaults to None, meaning an internal
            partition is generated
        applicationVersion (int): the application version, if None, defaults to zero
        zone (str): the time zone ID (as interpreted by java.time.ZoneId.of)
        useLas (bool): whether to use the log aggregator service, defaults to True
        logDir (str): the directory for writing binary log files, if not None, useLas must be
            False. Defaults to None
        codecs (dict[str, Codec]): an optional map of column name to Codec for encoding the
            values, defaults to None

    Returns:
        None
    """
    # The database must have been initialized before anything can be logged.
    if deephaven_enterprise.database.db is None:
        raise DHError("deephaven_enterprise.database has not been initialized")
    java_db = deephaven_enterprise.database.db.j_database

    # Translate the Python arguments into the Java options builder.
    builder = _JSystemTableLogger.newOptionsBuilder()
    if columnPartition is None:
        builder.currentDateColumnPartition(True)
    else:
        builder.fixedColumnPartition(columnPartition)
    if internalPartition is not None:
        builder.internalPartition(internalPartition)
    if applicationVersion is not None:
        builder.applicationVersion(applicationVersion)
    if zone is not None:
        builder.zoneId(_JZoneId.of(zone))
    builder.useLas(useLas)
    if logDir is not None:
        builder.logDirectory(logDir)
    if codecs is not None:
        for column_name, column_codec in codecs.items():
            builder.putColumnCodecs(column_name, column_codec.j_codec)

    _JSystemTableLogger.logTable(
        java_db, namespace, table_name, table.j_table, builder.build()
    )
def log_table_incremental(
    namespace: str,
    table_name: str,
    table: Table,
    columnPartition: str,
    internalPartition: Optional[str] = None,
    applicationVersion: Optional[int] = None,
    zone: Optional[str] = None,
    useLas: bool = True,
    logDir: Optional[str] = None,
    codecs: Optional[dict[str, Codec]] = None,
) -> AutoCloseable:
    # TODO: 1. parameter naming is not Python convention; would be a breaking change
    #          to fix, sooner better than later?
    #       2. Not sure about the System -> Intraday change
    """Writes tableToLog to Intraday storage.

    The table is logged to Intraday storage and can be retrieved with db.live_table.
    Historical tables should be written using a merge process.

    No rows should be removed or modified in tableToLog. Modifications are an error.
    If the table is not a blink table, then removals are an error.

    Args:
        namespace (str): the namespace of the table
        table_name (str): the name of the table
        table (Table): the table to log
        columnPartition (str): the column partition to log to, if None then uses the current date
        internalPartition (str): the internal partition, defaults to None, meaning an internal
            partition is generated
        applicationVersion (int): the application version, if None, defaults to zero
        zone (str): the time zone ID (as interpreted by java.time.ZoneId.of), defaults to None
        useLas (bool): whether to use the log aggregator service, defaults to True
        logDir (str): the directory for writing binary log files, if not None, useLas must be
            False, defaults to None
        codecs (dict[str, Codec]): an optional map of column name to Codec for encoding the
            values, defaults to None

    Returns:
        a context manager that can be used in a with statement, or alternatively you can call
        close() when complete. Users should hold this return value to ensure liveness for
        writing.
    """
    # The database must have been initialized before anything can be logged.
    if deephaven_enterprise.database.db is None:
        raise DHError("deephaven_enterprise.database has not been initialized")
    j_db = deephaven_enterprise.database.db.j_database

    # Translate the Python arguments into the Java options builder.
    opts = _JSystemTableLogger.newOptionsBuilder()
    if columnPartition is None:
        opts.currentDateColumnPartition(True)
    else:
        opts.fixedColumnPartition(columnPartition)
    if internalPartition is not None:
        opts.internalPartition(internalPartition)
    if applicationVersion is not None:
        opts.applicationVersion(applicationVersion)
    if zone is not None:
        opts.zoneId(_JZoneId.of(zone))
    opts.useLas(useLas)
    if logDir is not None:
        opts.logDirectory(logDir)
    if codecs is not None:
        for key, value in codecs.items():
            opts.putColumnCodecs(key, value.j_codec)

    # The Java side returns a Closeable that keeps the incremental logging alive;
    # wrap it so Python callers can use it as a context manager.
    closeable = _JSystemTableLogger.logTableIncremental(
        j_db, namespace, table_name, table.j_table, opts.build()
    )
    return AutoCloseable(closeable)