Source code for deephaven_enterprise.input_tables
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
from typing import Any, Optional

import jpy
from deephaven import dtypes, DHError
from deephaven.dtypes import _j_name_type_map, DType
def _get_dtype_from_jclass_name(jclass_name: str) -> DType:
    """
    Looks up the DType registered in _j_name_type_map for a Java class name.

    A name that is not present in the map is not handled here; the lookup error propagates to the caller.

    :param jclass_name: the Java class name to look up (callers in this module pass Class.getName())
    :return: the DType mapped to the Java class name
    """
    dtype = _j_name_type_map[jclass_name]
    return dtype
def _get_enum_values_from_j_column_spec(j_column_spec: jpy.JType) -> list[Any]:
    """
    Collects the enum values of a Java column spec into a Python list.

    Enum values of char columns are converted to single-character strings with chr(); enum values of every
    other column type are returned exactly as the Java iterator hands them back.

    :param j_column_spec: the Java column spec to read the enum values from
    :return: the enum values in iteration order, or an empty list if the column spec has none
    """
    j_it = j_column_spec.enumValues().iterator()
    if not j_it.hasNext():
        # No values to convert, so the column's data type is never resolved for enum-less columns.
        return []

    # The data type is the same for every element, so resolve it once rather than once per enum value
    # (each resolution costs several calls into Java plus a map lookup).
    is_char = _get_dtype_from_jclass_name(j_column_spec.dataType().getName()) is dtypes.char

    enum_values = []
    while j_it.hasNext():
        enum_value = j_it.next()
        enum_values.append(chr(enum_value) if is_char else enum_value)
    return enum_values
[docs]
class ColumnSpec:
    """
    Representation of input table column specification.
    """

    _JColumnSpec = jpy.get_type("io.deephaven.enterprise.inputtables.ColumnSpec")

    # DType.j_type.jclass is insufficient b/c it's unassigned for Java primitive types.
    _primitive_dtype_to_jclass_map = {
        dtypes.char: jpy.get_type("java.lang.Character").TYPE,
        dtypes.float32: jpy.get_type("java.lang.Float").TYPE,
        dtypes.float64: jpy.get_type("java.lang.Double").TYPE,
        dtypes.byte: jpy.get_type("java.lang.Byte").TYPE,
        dtypes.short: jpy.get_type("java.lang.Short").TYPE,
        dtypes.int32: jpy.get_type("java.lang.Integer").TYPE,
        dtypes.long: jpy.get_type("java.lang.Long").TYPE,
    }

    def __init__(self, name: str, data_type: DType, is_key: bool, enum_values: Optional[list[Any]] = None) -> None:
        """
        Creates a ColumnSpec by building the underlying Java column spec.

        :param name: the name of the column
        :param data_type: the DType of the column
        :param is_key: whether the column is a keyed column
        :param enum_values: the enum values for the column; None or an empty list adds no enum values. For char
            columns every enum value must be a single-character string.
        :raises DHError: if data_type is char and an enum value is not a single-character string
        """
        # Only fall back to data_type.j_type.jclass for non-primitive types; it must not be touched for
        # primitives (see the note on _primitive_dtype_to_jclass_map).
        if data_type in self._primitive_dtype_to_jclass_map:
            j_class = self._primitive_dtype_to_jclass_map[data_type]
        else:
            j_class = data_type.j_type.jclass

        j_column_spec_builder = self._JColumnSpec.builder().name(name).setDataType(j_class).isKey(is_key)

        if enum_values:
            raw_enum_values = enum_values
            # TODO: Remove char logic if jpy.convert supports single-character string conversions into chars.
            if data_type is dtypes.char:
                for enum_value in enum_values:
                    if not isinstance(enum_value, str) or len(enum_value) != 1:
                        # DHError's first positional parameter is the cause, so the text is passed as message.
                        raise DHError(
                            message="Enum value inputs for character columns must be a single character string.")
                # Characters are handed to jpy.convert as their integer code points.
                raw_enum_values = [ord(enum_value) for enum_value in enum_values]
            j_enum_values = [jpy.convert(enum_value, data_type.j_name) for enum_value in raw_enum_values]
            j_column_spec_builder.addToEnumValues(j_enum_values)

        self.j_column_spec = j_column_spec_builder.build()

    def name(self) -> str:
        """
        Gets the name of the column.

        :return: the name of the column
        """
        return self.j_column_spec.name()

    def data_type(self) -> DType:
        """
        Gets the DType of the column.

        :return: the DType of the column
        """
        return _get_dtype_from_jclass_name(self.j_column_spec.dataType().getName())

    def is_key(self) -> bool:
        """
        Gets whether the column is a keyed column.

        :return: whether the column is a keyed column
        """
        return self.j_column_spec.isKey()

    def enum_values(self) -> list[Any]:
        """
        Gets the enum values for the column.

        :return: the enum values for the column
        """
        return _get_enum_values_from_j_column_spec(self.j_column_spec)
[docs]
class InputTableSpec:
    """
    Representation of input table specification.
    """

    _JInputTableSpec = jpy.get_type("io.deephaven.enterprise.inputtables.InputTableSpec")

    @staticmethod
    def _add_column_specs_and_build(j_input_table_spec_builder: jpy.JType, column_specs: list[ColumnSpec]) -> jpy.JType:
        """
        Adds column specs to an input table spec builder, builds it, and returns the built object.

        :param j_input_table_spec_builder: Java input table spec builder to add the column specs to
        :param column_specs: ColumnSpecs to add to the input table spec
        :return: the input table spec
        """
        for column_spec in column_specs:
            j_input_table_spec_builder.addColumnSpec(column_spec.j_column_spec)
        return j_input_table_spec_builder.build()

    @staticmethod
    def _from_j_input_table_spec(j_input_table_spec: jpy.JType) -> "InputTableSpec":
        """
        Creates a Python InputTableSpec from the column specs of a Java input table spec.

        :param j_input_table_spec: Java input table spec to copy the column specs from
        :return: a new InputTableSpec built from Python ColumnSpecs equivalent to the Java column specs
        """
        column_specs = InputTableSpec.get_column_specs_from_j_input_table_spec(j_input_table_spec)
        return InputTableSpec(list(column_specs.values()))

    @staticmethod
    def get_column_specs_from_j_input_table_spec(j_input_table_spec: jpy.JType) -> dict[str, ColumnSpec]:
        """
        Extracts Java column specs from Java input table specs and returns them as Python ColumnSpecs.

        :param j_input_table_spec: Java input table spec to extract column specs from
        :return: Python ColumnSpecs from the Java input table spec, keyed by column name
        """
        column_specs = {}
        j_it = j_input_table_spec.columnSpecs().values().iterator()
        while j_it.hasNext():
            j_column_spec = j_it.next()
            # Read each property from Java once and reuse it.
            column_name = j_column_spec.name()
            data_type = _get_dtype_from_jclass_name(j_column_spec.dataType().getName())
            enum_values = _get_enum_values_from_j_column_spec(j_column_spec)
            # A column without enum values is created with enum_values=None rather than an empty list.
            column_specs[column_name] = ColumnSpec(column_name, data_type, j_column_spec.isKey(), enum_values or None)
        return column_specs

    def __init__(self, column_specs: list[ColumnSpec]) -> None:
        """
        Creates an InputTableSpec with the provided ColumnSpecs.

        :param column_specs: the column specs to create the InputTableSpec
        """
        self.j_input_table_spec = self._add_column_specs_and_build(self._JInputTableSpec.builder(), column_specs)

    def column_specs(self) -> dict[str, ColumnSpec]:
        """
        Gets the ColumnSpecs for this input table.

        :return: the ColumnSpecs for this input table
        """
        return self.get_column_specs_from_j_input_table_spec(self.j_input_table_spec)

    def add_column_specs(self, column_specs: list[ColumnSpec]) -> "InputTableSpec":
        """
        Returns an InputTableSpec that is equal to this one, except with the specified additional ColumnSpecs.

        :param column_specs: specified ColumnSpecs to add to the InputTableSpec
        :return: an InputTableSpec that is equal to this one, except with the specified additional ColumnSpecs
        """
        j_input_table_spec = self._add_column_specs_and_build(self.j_input_table_spec.asBuilder(), column_specs)
        return self._from_j_input_table_spec(j_input_table_spec)

    def remove_column_specs(self, column_names: list[str]) -> "InputTableSpec":
        """
        Returns an InputTableSpec that is equal to this one, except without the specified columns.

        :param column_names: specified columns to exclude from the returned InputTableSpec
        :return: an InputTableSpec that is equal to this one, except without the specified columns
        """
        # The result of removeColumnSpecs is handed to the builder helper with nothing to add, so it is only built.
        j_input_table_spec = self._add_column_specs_and_build(
            self.j_input_table_spec.removeColumnSpecs(column_names), [])
        return self._from_j_input_table_spec(j_input_table_spec)