Source code for deephaven_enterprise.system_table_logger

import jpy

from deephaven.jcompat import AutoCloseable
from deephaven.table import Table
import deephaven_enterprise.database
from typing import Dict, Union
from deephaven import DHError

j_stl = jpy.get_type("io.deephaven.enterprise.database.SystemTableLogger")
j_zoneid = jpy.get_type("java.time.ZoneId")
j_objectcodec = jpy.get_type("io.deephaven.util.codec.ObjectCodec")

[docs] class Codec: """ A Simple wrapper class around io.deephaven.util.codec.ObjectCodec """ def __init__(self, j_codec): self.j_codec = j_codec
[docs] def codec(codec: Union[str, j_objectcodec]) -> Codec: """ Create a Codec instance for use with log_table and log_table_incremental :param codec: the codec as either a string representing the fully qualified java class, or an instance of a java io.deephaven.util.codec.ObjectCodec :return: a new codec """ if isinstance(codec, jpy.JType) and j_objectcodec.jclass.isInstance(codec): return Codec(codec) jclass = jpy.get_type(codec) if jclass is None: raise DHError("Unable to instantiate codec " + codec) return Codec(jclass())
[docs] def byte_array_codec(): """ Create a new byte array codec :return: a new byte array codec """ return codec("io.deephaven.enterprise.codec.ByteArrayCodec")
[docs] def char_array_codec(): """ Create a new char array codec :return: a new char array codec """ return codec("io.deephaven.enterprise.codec.CharArrayCodec")
[docs] def short_array_codec(): """ Create a new short array codec :return: a new short array codec """ return codec("io.deephaven.enterprise.codec.ShortArrayCodec")
[docs] def int_array_codec(): """ Create a new int array codec :return: a new int array codec """ return codec("io.deephaven.enterprise.codec.IntArrayCodec")
[docs] def long_array_codec(): """ Create a new long array codec :return: a new long array codec """ return codec("io.deephaven.enterprise.codec.LongArrayCodec")
[docs] def float_array_codec(): """ Create a new float array codec :return: a new float array codec """ return codec("io.deephaven.enterprise.codec.FloatArrayCodec")
[docs] def double_array_codec(): """ Create a new double array codec :return: a new double array codec """ return codec("io.deephaven.enterprise.codec.DoubleArrayCodec")
[docs] def string_array_codec(): """ Create a new string array codec :return: a new string array codec """ return codec("io.deephaven.enterprise.codec.StringArrayCodec")
[docs] def log_table(namespace: str, table_name: str, table: Table, columnPartition: str, internalPartition: str = None, applicationVersion : int=None, zone: str = None, useLas: bool = True, logDir: str = None, codecs: Dict[str, Codec] = None): """ Write tableToLog to System storage. The table is logged to Intraday storage and can be retrieved with db.live_table. Historical tables should be written using a merge process. :param namespace: the namespace of the table :param table_name: the name of the table :param table: the table to log :param columnPartition: the column partition to log to, if None then uses the current date :param internalPartition: the internal partition, if None an internal partition is generated :param applicationVersion: the application version, if None defaults to zero :param zone: the time zone ID (as interpreted by java.time.ZoneId.of) :param useLas: use the log aggregator service (defaults to true) :param logDir: the directory for writing binary log files (useLas must be false) :param codecs: an optional map of column name to Codec for encoding the values """ j_db = deephaven_enterprise.database.db.j_db opts = j_stl.newOptionsBuilder() if columnPartition is None: opts.currentDateColumnPartition(True) else: opts.fixedColumnPartition(columnPartition) if internalPartition is not None: opts.internalPartition(internalPartition) if applicationVersion is not None: opts.applicationVersion(applicationVersion) if zone is not None: opts.zoneId(j_zoneid.of(zone)) opts.useLas(useLas) if logDir is not None: opts.logDirectory(logDir) if codecs is not None: for key, value in codecs.items(): opts.putColumnCodecs(key, value.j_codec) j_stl.logTable(j_db, namespace, table_name, table.j_table, opts.build())
[docs] def log_table_incremental(namespace: str, table_name: str, table: Table, columnPartition: str, internalPartition: str = None, applicationVersion: int = None, zone: str = None, useLas: bool = True, logDir: str = None, codecs: Dict[str, Codec] = None): """ Write tableToLog to System storage. The table is logged to Intraday storage and can be retrieved with db.live_table. Historical tables should be written using a merge process. No rows should be removed or modified in tableToLog. Modifications are an error. If the table is not a <i>blink</i> table, then removals are an error. :param namespace: the namespace of the table :param table_name: the name of the table :param table: the table to log :param columnPartition: the column partition to log to, if None then uses the current date :param internalPartition: the internal partition, if None an internal partition is generated :param applicationVersion: the application version, if None defaults to zero :param zone: the time zone ID (as interpreted by java.time.ZoneId.of) :param useLas: use the log aggregator service (defaults to true) :param logDir: the directory for writing binary log files (useLas must be false) :param codecs: an optional map of column name to Codec for encoding the values :returns: a context manager that can be used in a with statement, or alternatively you can call close() when complete. Users should hold this return value to ensure liveness for writing. """ j_db = deephaven_enterprise.database.db.j_db opts = j_stl.newOptionsBuilder() if columnPartition is None: opts.currentDateColumnPartition(True) else: opts.fixedColumnPartition(columnPartition) if internalPartition is not None: opts.internalPartition(internalPartition) if applicationVersion is not None: opts.applicationVersion(applicationVersion) if zone is not None: opts.zoneId(j_zoneid.of(zone)) opts.useLas(useLas) if logDir is not None: opts.logDirectory(logDir) if codecs is not None: for key, value in codecs.items(): opts.putColumnCodecs(key, value.j_codec) closeable = j_stl.logTableIncremental(j_db, namespace, table_name, table.j_table, opts.build()) return AutoCloseable(closeable)