# Source code for deephaven_enterprise.system_table_logger
import jpy
from deephaven.jcompat import AutoCloseable
from deephaven.table import Table
import deephaven_enterprise.database
from typing import Dict, Union
from deephaven import DHError
j_stl = jpy.get_type("io.deephaven.enterprise.database.SystemTableLogger")
j_zoneid = jpy.get_type("java.time.ZoneId")
j_objectcodec = jpy.get_type("io.deephaven.util.codec.ObjectCodec")
class Codec:
    """
    A simple wrapper around a Java ``io.deephaven.util.codec.ObjectCodec`` instance.
    """

    def __init__(self, j_codec):
        # Hold the underlying Java codec object; log_table and
        # log_table_incremental read this attribute directly when
        # building their logging options.
        self.j_codec = j_codec
def codec(codec: Union[str, j_objectcodec]) -> Codec:
    """
    Create a Codec instance for use with log_table and log_table_incremental.

    :param codec: the codec as either a string representing the fully qualified
        java class, or an instance of a java io.deephaven.util.codec.ObjectCodec
    :return: a new Codec wrapping the given (or newly constructed) java codec
    :raises DHError: if the named java class cannot be resolved
    """
    # If we were handed a live java ObjectCodec instance, wrap it as-is.
    if isinstance(codec, jpy.JType) and j_objectcodec.jclass.isInstance(codec):
        return Codec(codec)
    # Otherwise treat the argument as a fully qualified class name and
    # construct a new instance via the class's no-arg constructor.
    j_type = jpy.get_type(codec)
    if j_type is None:
        raise DHError("Unable to instantiate codec " + codec)
    return Codec(j_type())
def byte_array_codec():
    """
    Create a new byte array codec.

    :return: a Codec wrapping the Deephaven ByteArrayCodec
    """
    return codec("io.deephaven.enterprise.codec.ByteArrayCodec")
def char_array_codec():
    """
    Create a new char array codec.

    :return: a Codec wrapping the Deephaven CharArrayCodec
    """
    return codec("io.deephaven.enterprise.codec.CharArrayCodec")
def short_array_codec():
    """
    Create a new short array codec.

    :return: a Codec wrapping the Deephaven ShortArrayCodec
    """
    return codec("io.deephaven.enterprise.codec.ShortArrayCodec")
def int_array_codec():
    """
    Create a new int array codec.

    :return: a Codec wrapping the Deephaven IntArrayCodec
    """
    return codec("io.deephaven.enterprise.codec.IntArrayCodec")
def long_array_codec():
    """
    Create a new long array codec.

    :return: a Codec wrapping the Deephaven LongArrayCodec
    """
    return codec("io.deephaven.enterprise.codec.LongArrayCodec")
def float_array_codec():
    """
    Create a new float array codec.

    :return: a Codec wrapping the Deephaven FloatArrayCodec
    """
    return codec("io.deephaven.enterprise.codec.FloatArrayCodec")
def double_array_codec():
    """
    Create a new double array codec.

    :return: a Codec wrapping the Deephaven DoubleArrayCodec
    """
    return codec("io.deephaven.enterprise.codec.DoubleArrayCodec")
def string_array_codec():
    """
    Create a new string array codec.

    :return: a Codec wrapping the Deephaven StringArrayCodec
    """
    return codec("io.deephaven.enterprise.codec.StringArrayCodec")
def log_table(namespace: str, table_name: str, table: Table, columnPartition: str, internalPartition: str = None,
              applicationVersion: int = None, zone: str = None, useLas: bool = True, logDir: str = None,
              codecs: Dict[str, Codec] = None):
    """
    Write ``table`` to System storage.

    The table is logged to Intraday storage and can be retrieved with
    ``db.live_table``. Historical tables should be written using a merge
    process.

    :param namespace: the namespace of the table
    :param table_name: the name of the table
    :param table: the table to log
    :param columnPartition: the column partition to log to; if None the current date is used
    :param internalPartition: the internal partition; if None an internal partition is generated
    :param applicationVersion: the application version; if None defaults to zero
    :param zone: the time zone ID (as interpreted by java.time.ZoneId.of)
    :param useLas: use the log aggregator service (defaults to True)
    :param logDir: the directory for writing binary log files (useLas must be False)
    :param codecs: an optional map of column name to Codec for encoding the values
    """
    j_db = deephaven_enterprise.database.db.j_db
    builder = j_stl.newOptionsBuilder()
    # Partitioning: either pin to an explicit column partition or let the
    # logger track the current date.
    if columnPartition is None:
        builder.currentDateColumnPartition(True)
    else:
        builder.fixedColumnPartition(columnPartition)
    # The remaining options are only set when explicitly provided, so the
    # Java builder's own defaults apply otherwise.
    if internalPartition is not None:
        builder.internalPartition(internalPartition)
    if applicationVersion is not None:
        builder.applicationVersion(applicationVersion)
    if zone is not None:
        builder.zoneId(j_zoneid.of(zone))
    builder.useLas(useLas)
    if logDir is not None:
        builder.logDirectory(logDir)
    if codecs is not None:
        for column_name, column_codec in codecs.items():
            builder.putColumnCodecs(column_name, column_codec.j_codec)
    j_stl.logTable(j_db, namespace, table_name, table.j_table, builder.build())
def log_table_incremental(namespace: str, table_name: str, table: Table, columnPartition: str, internalPartition: str = None,
                          applicationVersion: int = None, zone: str = None, useLas: bool = True, logDir: str = None,
                          codecs: Dict[str, Codec] = None):
    """
    Write ``table`` to System storage incrementally.

    The table is logged to Intraday storage and can be retrieved with
    ``db.live_table``. Historical tables should be written using a merge
    process.

    No rows should be removed or modified in the logged table. Modifications
    are an error. If the table is not a *blink* table, then removals are an
    error.

    :param namespace: the namespace of the table
    :param table_name: the name of the table
    :param table: the table to log
    :param columnPartition: the column partition to log to; if None the current date is used
    :param internalPartition: the internal partition; if None an internal partition is generated
    :param applicationVersion: the application version; if None defaults to zero
    :param zone: the time zone ID (as interpreted by java.time.ZoneId.of)
    :param useLas: use the log aggregator service (defaults to True)
    :param logDir: the directory for writing binary log files (useLas must be False)
    :param codecs: an optional map of column name to Codec for encoding the values
    :returns: a context manager that can be used in a with statement, or
        alternatively you can call close() when complete. Users should hold
        this return value to ensure liveness for writing.
    """
    j_db = deephaven_enterprise.database.db.j_db
    builder = j_stl.newOptionsBuilder()
    # Partitioning: either pin to an explicit column partition or let the
    # logger track the current date.
    if columnPartition is None:
        builder.currentDateColumnPartition(True)
    else:
        builder.fixedColumnPartition(columnPartition)
    # The remaining options are only set when explicitly provided, so the
    # Java builder's own defaults apply otherwise.
    if internalPartition is not None:
        builder.internalPartition(internalPartition)
    if applicationVersion is not None:
        builder.applicationVersion(applicationVersion)
    if zone is not None:
        builder.zoneId(j_zoneid.of(zone))
    builder.useLas(useLas)
    if logDir is not None:
        builder.logDirectory(logDir)
    if codecs is not None:
        for column_name, column_codec in codecs.items():
            builder.putColumnCodecs(column_name, column_codec.j_codec)
    # Wrap the Java closeable so callers can use it in a with statement.
    j_closeable = j_stl.logTableIncremental(j_db, namespace, table_name, table.j_table, builder.build())
    return AutoCloseable(j_closeable)