#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module defines functions for logging tables to Intraday storage."""
from typing import Any, Optional, Union
import jpy
from deephaven import DHError
from deephaven.jcompat import AutoCloseable
from deephaven.table import Table
import deephaven_enterprise.database
# Java entry point that performs the actual intraday logging.
_JSystemTableLogger = jpy.get_type("io.deephaven.enterprise.database.SystemTableLogger")
# java.time.ZoneId handle, used to resolve the optional ``zone`` string arguments below.
_JZoneId = jpy.get_type("java.time.ZoneId")
# Public handle to the Java ObjectCodec interface; instances of it are accepted by codec().
JObjectCodec: Any = jpy.get_type("io.deephaven.util.codec.ObjectCodec")
class Codec:
    """A simple wrapper class around a Java io.deephaven.util.codec.ObjectCodec instance."""

    def __init__(self, j_codec):
        """Wraps the given Java codec object.

        Args:
            j_codec: a Java io.deephaven.util.codec.ObjectCodec instance
        """
        # Held as-is; consumers (log_table/log_table_incremental) pass it straight to Java.
        self.j_codec = j_codec
def codec(codec: Union[str, JObjectCodec]) -> Codec:
    """Creates a Codec instance for use with log_table and log_table_incremental.

    Args:
        codec (Union[str, JObjectCodec]): the codec as either a string representing the fully qualified java class,
            or an instance of a java io.deephaven.util.codec.ObjectCodec

    Returns:
        Codec: a wrapper around the resolved Java codec instance

    Raises:
        DHError: if the named codec class cannot be resolved
    """
    # Already a Java ObjectCodec instance: wrap it directly.
    if isinstance(codec, jpy.JType) and JObjectCodec.jclass.isInstance(codec):
        return Codec(codec)
    # Otherwise treat the argument as a fully qualified Java class name and
    # instantiate it with its no-arg constructor.
    j_class = jpy.get_type(codec)
    if j_class is None:
        raise DHError(message=f"Unable to instantiate codec: {codec}")
    return Codec(j_class())
def byte_array_codec() -> Codec:
    """Creates a new byte array codec.

    Returns:
        Codec: a new byte array codec
    """
    return codec("io.deephaven.enterprise.codec.ByteArrayCodec")
def char_array_codec() -> Codec:
    """Creates a new char array codec.

    Returns:
        Codec: a new char array codec
    """
    return codec("io.deephaven.enterprise.codec.CharArrayCodec")
def short_array_codec() -> Codec:
    """Creates a new short array codec.

    Returns:
        Codec: a new short array codec
    """
    return codec("io.deephaven.enterprise.codec.ShortArrayCodec")
def int_array_codec() -> Codec:
    """Creates a new int array codec.

    Returns:
        Codec: a new int array codec
    """
    return codec("io.deephaven.enterprise.codec.IntArrayCodec")
def long_array_codec() -> Codec:
    """Creates a new long array codec.

    Returns:
        Codec: a new long array codec
    """
    return codec("io.deephaven.enterprise.codec.LongArrayCodec")
def float_array_codec() -> Codec:
    """Creates a new float array codec.

    Returns:
        Codec: a new float array codec
    """
    return codec("io.deephaven.enterprise.codec.FloatArrayCodec")
def double_array_codec() -> Codec:
    """Creates a new double array codec.

    Returns:
        Codec: a new double array codec
    """
    return codec("io.deephaven.enterprise.codec.DoubleArrayCodec")
def string_array_codec() -> Codec:
    """Creates a new string array codec.

    Returns:
        Codec: a new string array codec
    """
    return codec("io.deephaven.enterprise.codec.StringArrayCodec")
def log_table(
    namespace: str,
    table_name: str,
    table: Table,
    columnPartition: str,
    internalPartition: Optional[str] = None,
    applicationVersion: Optional[int] = None,
    zone: Optional[str] = None,
    useLas: bool = True,
    logDir: Optional[str] = None,
    codecs: Optional[dict[str, Codec]] = None,
) -> None:
    """Writes ``table`` to Intraday storage.

    The table is logged to Intraday storage and can be retrieved with db.live_table. Historical tables should be
    written using a merge process.

    Args:
        namespace (str): the namespace of the table
        table_name (str): the name of the table
        table (Table): the table to log
        columnPartition (str): the column partition to log to, if None then uses the current date
        internalPartition (Optional[str]): the internal partition, defaults to None, meaning an internal partition
            is generated
        applicationVersion (Optional[int]): the application version, if None, defaults to zero
        zone (Optional[str]): the time zone ID (as interpreted by java.time.ZoneId.of), defaults to None
        useLas (bool): whether to use the log aggregator service, defaults to True
        logDir (Optional[str]): the directory for writing binary log files; if not None, useLas must be False.
            Defaults to None
        codecs (Optional[dict[str, Codec]]): an optional map of column name to Codec for encoding the values,
            defaults to None

    Raises:
        DHError: if deephaven_enterprise.database has not been initialized

    Returns:
        None
    """
    if deephaven_enterprise.database.db is None:
        raise DHError("deephaven_enterprise.database has not been initialized")
    j_db = deephaven_enterprise.database.db.j_database
    # Translate the keyword arguments into the Java options builder; only
    # explicitly-provided options are set so the Java defaults apply otherwise.
    opts = _JSystemTableLogger.newOptionsBuilder()
    if columnPartition is None:
        opts.currentDateColumnPartition(True)
    else:
        opts.fixedColumnPartition(columnPartition)
    if internalPartition is not None:
        opts.internalPartition(internalPartition)
    if applicationVersion is not None:
        opts.applicationVersion(applicationVersion)
    if zone is not None:
        opts.zoneId(_JZoneId.of(zone))
    opts.useLas(useLas)
    if logDir is not None:
        opts.logDirectory(logDir)
    if codecs is not None:
        # Unwrap the Python Codec wrappers back to their Java ObjectCodec instances.
        for key, value in codecs.items():
            opts.putColumnCodecs(key, value.j_codec)
    _JSystemTableLogger.logTable(
        j_db, namespace, table_name, table.j_table, opts.build()
    )
def log_table_incremental(
    namespace: str,
    table_name: str,
    table: Table,
    columnPartition: str,
    internalPartition: Optional[str] = None,
    applicationVersion: Optional[int] = None,
    zone: Optional[str] = None,
    useLas: bool = True,
    logDir: Optional[str] = None,
    codecs: Optional[dict[str, Codec]] = None,
) -> AutoCloseable:
    """Writes ``table`` to Intraday storage incrementally.

    The table is logged to Intraday storage and can be retrieved with db.live_table. Historical tables should be
    written using a merge process.

    No rows should be removed or modified in ``table``. Modifications are an error. If the table is not a blink
    table, then removals are an error.

    Args:
        namespace (str): the namespace of the table
        table_name (str): the name of the table
        table (Table): the table to log
        columnPartition (str): the column partition to log to, if None then uses the current date
        internalPartition (Optional[str]): the internal partition, defaults to None, meaning an internal partition
            is generated
        applicationVersion (Optional[int]): the application version, if None, defaults to zero
        zone (Optional[str]): the time zone ID (as interpreted by java.time.ZoneId.of), defaults to None
        useLas (bool): whether to use the log aggregator service, defaults to True
        logDir (Optional[str]): the directory for writing binary log files; if not None, useLas must be False.
            Defaults to None
        codecs (Optional[dict[str, Codec]]): an optional map of column name to Codec for encoding the values,
            defaults to None

    Raises:
        DHError: if deephaven_enterprise.database has not been initialized

    Returns:
        a context manager that can be used in a with statement, or alternatively you can call close() when complete.
        Users should hold this return value to ensure liveness for writing.
    """
    # TODO: parameter naming is not Python convention; fixing would be a breaking change — sooner better than later?
    # TODO: not sure about the System -> Intraday change
    if deephaven_enterprise.database.db is None:
        raise DHError("deephaven_enterprise.database has not been initialized")
    j_db = deephaven_enterprise.database.db.j_database
    # Translate the keyword arguments into the Java options builder; only
    # explicitly-provided options are set so the Java defaults apply otherwise.
    opts = _JSystemTableLogger.newOptionsBuilder()
    if columnPartition is None:
        opts.currentDateColumnPartition(True)
    else:
        opts.fixedColumnPartition(columnPartition)
    if internalPartition is not None:
        opts.internalPartition(internalPartition)
    if applicationVersion is not None:
        opts.applicationVersion(applicationVersion)
    if zone is not None:
        opts.zoneId(_JZoneId.of(zone))
    opts.useLas(useLas)
    if logDir is not None:
        opts.logDirectory(logDir)
    if codecs is not None:
        # Unwrap the Python Codec wrappers back to their Java ObjectCodec instances.
        for key, value in codecs.items():
            opts.putColumnCodecs(key, value.j_codec)
    # The Java side returns a closeable resource that keeps the incremental
    # logger alive; wrap it so it works as a Python context manager.
    closeable = _JSystemTableLogger.logTableIncremental(
        j_db, namespace, table_name, table.j_table, opts.build()
    )
    return AutoCloseable(closeable)