Source code for deephaven_enterprise.table_tools
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module defines a convenience function for reading and writing tables in Deephaven format."""
from typing import Any, Optional, Sequence, Union
import jpy
from deephaven.dherror import DHError
from deephaven.jcompat import to_sequence
from deephaven.table import Table, TableDefinition, TableDefinitionLike
from deephaven_enterprise.schema import Schema, SchemaLike
# Java types resolved through jpy once, at import time, and shared by the
# functions in this module.

# Static entry point used for the read/write calls below.
_JEnterpriseTableTools = jpy.get_type(
    "io.deephaven.enterprise.table.EnterpriseTableTools"
)

# Public (no leading underscore): used in the write_table() signature.
JTableDefinition: Any = jpy.get_type("io.deephaven.engine.table.TableDefinition")

# Option types for reading binary logs from explicit paths / from a directory.
_JReadBinlogOptions: Any = jpy.get_type(
    "io.deephaven.enterprise.table.ReadBinlogOptions"
)
_JReadBinlogDirectoryOptions: Any = jpy.get_type(
    "io.deephaven.enterprise.table.ReadBinlogDirectoryOptions"
)

# Used to convert Python path strings into java.nio.file.Path objects.
_JPath: Any = jpy.get_type("java.nio.file.Path")
[docs]
def read_table(path: str) -> Table:
    """Read the Deephaven format table stored at the given path.

    Args:
        path (str): the path to the table

    Returns:
        a Table object wrapping the table returned by the Java reader
    """
    j_table = _JEnterpriseTableTools.readTable(path)
    return Table(j_table=j_table)
[docs]
def write_table(
    table: Table, path: str, table_definition: Optional[JTableDefinition] = None
) -> None:
    """Write a table to the given path in Deephaven format.

    A separate table definition may be supplied so that the written table can
    carry different column attributes (grouping, for example) than the source
    table. If provided, the definition must be compatible with the source
    table. If omitted, the source table's own definition is used.

    Args:
        table (Table): the table to write
        path (str): the path to write to
        table_definition (JTableDefinition): the table definition to use,
            defaults to None
    """
    if table_definition is None:
        # No override supplied: fall back to the source table's definition.
        j_definition = table.j_table.getDefinition()
    else:
        j_definition = table_definition

    _JEnterpriseTableTools.writeTable(table.j_table, j_definition, path)
[docs]
def read_bin(
    paths: Union[str, Sequence[str]],
    schema: Optional[SchemaLike] = None,
    table_def: Optional[TableDefinitionLike] = None,
    buffer_size: Optional[int] = None,
    batch_size: Optional[int] = None,
) -> Table:
    """Load binary log file(s) into an in-memory Deephaven format table.

    All binary log files must have an identical schema.

    Args:
        paths (Union[str, Sequence[str]]): A full path (or list of full paths)
            to the binlog(s) which should be read.
        schema (Optional[SchemaLike]): Identifies the schema to use for
            listener generation. Required if the Namespace and TableName
            cannot be determined by the first file in `paths`; when omitted,
            the schema is determined from the first path.
        table_def (Optional[TableDefinitionLike]): An optional table
            definition, used to limit the desired columns.
        buffer_size (Optional[int]): The buffer size used for reading from
            binlog file(s). Defaults to None, which will use the system
            default.
        batch_size (Optional[int]): The number of rows to batch while reading
            from binlog file(s). Defaults to None, which will use the system
            default.

    Returns:
        a Table object

    Raises:
        DHError
    """
    if paths is None:
        raise DHError("Failed to read binlog(s): Must specify at least one path")

    # Accept either a single path or a sequence of paths, then convert each
    # entry to a java.nio.file.Path for the Java options builder.
    j_paths = [_JPath.of(entry) for entry in to_sequence(paths)]

    builder = _JReadBinlogOptions.builder()
    builder.addPaths(j_paths)

    if schema is not None:
        builder.schema(Schema(schema).j_schema)
    elif j_paths:
        # No explicit schema: let the Java side determine it from the first file.
        builder.schema(_JReadBinlogOptions.determineSchema(j_paths[0]))
    else:
        raise DHError("Failed to read binlog(s): Cannot determine schema")

    # Optional settings are only applied when the caller supplied them, so
    # that the builder's own defaults remain in effect otherwise.
    if table_def is not None:
        builder.tableDefinition(TableDefinition(table_def).j_table_definition)
    if buffer_size is not None:
        builder.bufferSize(buffer_size)
    if batch_size is not None:
        builder.batchSize(batch_size)

    try:
        j_table = _JEnterpriseTableTools.readBin(builder.build())
        return Table(j_table=j_table)
    except Exception as e:
        raise DHError(e, message="Failed to read binlog(s)") from e
[docs]
def read_bin_dir(
    path: str,
    glob: str,
    schema: SchemaLike,
    table_def: Optional[TableDefinitionLike] = None,
    buffer_size: Optional[int] = None,
    batch_size: Optional[int] = None,
) -> Table:
    """Load binary log file(s) from a directory into an in-memory Deephaven format table.

    All binary log files must have an identical schema.

    Args:
        path (str): A full path to the directory which should be read.
        glob (str): The globbing pattern used to discover the binary log files
            in the specified `path`.
        schema (SchemaLike): Identifies the schema to use for listener
            generation.
        table_def (Optional[TableDefinitionLike]): An optional table
            definition, used to limit the desired columns.
        buffer_size (Optional[int]): The buffer size used for reading from
            binlog file(s). Defaults to None, which will use the system
            default.
        batch_size (Optional[int]): The number of rows to batch while reading
            from binlog file(s). Defaults to None, which will use the system
            default.

    Returns:
        a Table object

    Raises:
        DHError
    """
    if path is None or glob is None:
        raise DHError("Failed to read binlog(s): Must specify path and glob")

    builder_dir = _JPath.of(path)
    builder = _JReadBinlogDirectoryOptions.builder()
    builder.dir(builder_dir)
    builder.glob(glob)

    # Unlike read_bin(), there is no fallback for determining the schema here,
    # so a missing schema is an error.
    if schema is None:
        raise DHError("Failed to read binlog(s): Cannot determine schema")
    builder.schema(Schema(schema).j_schema)

    # Optional settings are only applied when the caller supplied them, so
    # that the builder's own defaults remain in effect otherwise.
    if table_def is not None:
        builder.tableDefinition(TableDefinition(table_def).j_table_definition)
    if buffer_size is not None:
        builder.bufferSize(buffer_size)
    if batch_size is not None:
        builder.batchSize(batch_size)

    try:
        j_table = _JEnterpriseTableTools.readBin(builder.build())
        return Table(j_table=j_table)
    except Exception as e:
        raise DHError(e, message="Failed to read binlog(s)") from e