Source code for deephaven_enterprise.input_tables
#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module supports the creation of input tables in Deephaven by providing classes for defining columns and input
table specifications.
"""
from __future__ import annotations
from typing import Any, Optional
import jpy
from deephaven import DHError, dtypes
from deephaven.dtypes import DType, _j_name_type_map
_JInputTableSpec = jpy.get_type("io.deephaven.enterprise.inputtables.InputTableSpec")
_JColumnSpec = jpy.get_type("io.deephaven.enterprise.inputtables.ColumnSpec")
def _get_dtype_from_jclass_name(jclass_name: str) -> DType:
return _j_name_type_map[jclass_name]
def _get_enum_values_from_j_column_spec(j_column_spec: jpy.JType) -> list[Any]:
enum_values = []
j_it = j_column_spec.enumValues().iterator()
while j_it.hasNext():
enum_value = j_it.next()
if (
_get_dtype_from_jclass_name(j_column_spec.dataType().getName())
is dtypes.char
):
enum_value = chr(enum_value)
enum_values.append(enum_value)
return enum_values
[docs]
class ColumnSpec:
"""A ColumnSpec defines a column of an input table."""
# DType.j_type.jclass is insufficient b/c it's unassigned for Java primitive types.
_primitive_dtype_to_jclass_map = {
dtypes.char: jpy.get_type("java.lang.Character").TYPE,
dtypes.float32: jpy.get_type("java.lang.Float").TYPE,
dtypes.float64: jpy.get_type("java.lang.Double").TYPE,
dtypes.byte: jpy.get_type("java.lang.Byte").TYPE,
dtypes.short: jpy.get_type("java.lang.Short").TYPE,
dtypes.int32: jpy.get_type("java.lang.Integer").TYPE,
dtypes.long: jpy.get_type("java.lang.Long").TYPE,
}
def __init__(
self,
name: str,
data_type: DType,
is_key: bool,
enum_values: Optional[list[Any]] = None,
) -> None:
"""Creates a ColumnSpec with the provided name, data type, and key status.
Args:
name (str): the name of the column
data_type (DType): the data type of the column
is_key (bool): whether the column is a keyed column
enum_values (list[Any]): the enum values for the column, defaults to None
Raises:
DHError
"""
try:
j_column_spec_builder = (
_JColumnSpec.builder()
.name(name)
.dataType(
self._primitive_dtype_to_jclass_map[data_type]
if data_type in self._primitive_dtype_to_jclass_map
else data_type.j_type.jclass # type: ignore[union-attr]
)
.isKey(is_key)
)
except Exception as e:
raise DHError(
e, message="Unsupported data type for column: " + data_type.j_name
) from e
if enum_values:
# TODO: Remove char logic if jpy.convert supports single-character string conversions into chars.
if data_type is dtypes.char:
char_enum_values = []
for enum_value in enum_values:
if type(enum_value) is not str or len(enum_value) != 1:
raise DHError(
message="Enum value inputs for character columns must be a single character string."
)
char_enum_values.append(ord(enum_value))
enum_values = char_enum_values
enum_values = [
jpy.convert(enum_value, data_type.j_name) for enum_value in enum_values
]
j_column_spec_builder.addEnumValues(enum_values)
self.j_column_spec = j_column_spec_builder.build()
[docs]
def name(self) -> str:
"""Returns the name of the column.
Returns:
str
"""
return self.j_column_spec.name()
[docs]
def data_type(self) -> DType:
"""Returns the DType of the column.
Returns:
DType: the DType of the column
"""
return _get_dtype_from_jclass_name(self.j_column_spec.dataType().getName())
[docs]
def is_key(self) -> bool:
"""Whether the column is a keyed column.
Returns:
bool
"""
return self.j_column_spec.isKey()
[docs]
def enum_values(self) -> list[Any]:
"""Gets the enum values for the column.
Returns:
list[Any]
"""
return _get_enum_values_from_j_column_spec(self.j_column_spec)
[docs]
class InputTableSpec:
"""A specification for input tables that can be passed into :meth:`.database.Database.add_input_table_schema` to
add an input table schema to the database.
See https://deephaven.io/enterprise/docs/coreplus/user-tables/#input-tables-1 for more information.
"""
@staticmethod
def _add_column_specs_and_build(
j_input_table_spec_builder: jpy.JType, column_specs: list[ColumnSpec]
) -> jpy.JType:
"""Adds column specs to an input table spec builder, builds it, and returns the built object.
Args:
j_input_table_spec_builder (jpy.JType): the input table spec builder to add column specs to
column_specs (list[ColumnSpec]): the column specs to add to the input table spec
Returns:
jpy.JType: the raw Java input table spec
"""
for column_spec in column_specs:
j_input_table_spec_builder.addColumnSpec(column_spec.j_column_spec)
return j_input_table_spec_builder.build()
[docs]
@staticmethod
def get_column_specs_from_j_input_table_spec(
j_input_table_spec: jpy.JType,
) -> dict[str, ColumnSpec]:
"""Extracts Java column specs from Java input table specs and returns them as Python ColumnSpecs.
Args:
j_input_table_spec (jpy.JType): Java input table spec to extract column specs from
Returns:
dict[str, ColumnSpec]: Python ColumnSpecs from the Java input table spec
"""
j_column_specs = []
j_it = j_input_table_spec.columnSpecs().values().iterator()
while j_it.hasNext():
j_column_specs.append(j_it.next())
column_specs = {}
for j_column_spec in j_column_specs:
enum_values = _get_enum_values_from_j_column_spec(j_column_spec)
if j_column_spec.enumValues().isEmpty():
column_specs[j_column_spec.name()] = ColumnSpec(
j_column_spec.name(),
_get_dtype_from_jclass_name(j_column_spec.dataType().getName()),
j_column_spec.isKey(),
)
else:
column_specs[j_column_spec.name()] = ColumnSpec(
j_column_spec.name(),
_get_dtype_from_jclass_name(j_column_spec.dataType().getName()),
j_column_spec.isKey(),
enum_values,
)
return column_specs
def __init__(self, column_specs: list[ColumnSpec]) -> None:
"""Creates an InputTableSpec with the provided ColumnSpecs.
Args:
column_specs (list[ColumnSpec]): the column specs to create the InputTableSpec
"""
self.j_input_table_spec = self._add_column_specs_and_build(
_JInputTableSpec.builder(), column_specs
)
[docs]
def column_specs(self) -> dict[str, ColumnSpec]:
"""Returns the ColumnSpecs for this input table.
Returns:
dict[str, ColumnSpec]: the ColumnSpecs for this input table
"""
return self.get_column_specs_from_j_input_table_spec(self.j_input_table_spec)
[docs]
def add_column_specs(self, column_specs: list[ColumnSpec]) -> InputTableSpec:
"""Returns a new InputTableSpec that is equal to this one, except with the specified additional ColumnSpecs.
Args:
column_specs (list[ColumnSpec]): specified ColumnSpecs to add to the InputTableSpec
Returns:
a new InputTableSpec
"""
return InputTableSpec(
list(
self.get_column_specs_from_j_input_table_spec(
self._add_column_specs_and_build(
self.j_input_table_spec.asBuilder(), column_specs
)
).values()
)
)
[docs]
def remove_column_specs(self, column_names: list[str]) -> InputTableSpec:
"""Returns an InputTableSpec that is equal to this one, except without the specified columns.
Args:
column_names (list[str]): specified columns to exclude from the returned InputTableSpec
Returns:
a new InputTableSpec
"""
return InputTableSpec(
list(
self.get_column_specs_from_j_input_table_spec(
self._add_column_specs_and_build(
self.j_input_table_spec.removeColumnSpecs(column_names), []
)
).values()
)
)