Source code for deephaven_enterprise.input_tables

#  Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending

from deephaven import dtypes, DHError
from deephaven.dtypes import _j_name_type_map, DType
from typing import Any
import jpy


def _get_dtype_from_jclass_name(jclass_name: str) -> DType:
    return _j_name_type_map[jclass_name]


def _get_enum_values_from_j_column_spec(j_column_spec: jpy.JType) -> list[Any]:
    enum_values = []
    j_it = j_column_spec.enumValues().iterator()
    while j_it.hasNext():
        enum_value = j_it.next()
        if _get_dtype_from_jclass_name(j_column_spec.dataType().getName()) is dtypes.char:
            enum_value = chr(enum_value)
        enum_values.append(enum_value)
    return enum_values


[docs] class ColumnSpec: """ Representation of input table column specification. """ _JColumnSpec = jpy.get_type("io.deephaven.enterprise.inputtables.ColumnSpec") # DType.j_type.jclass is insufficient b/c it's unassigned for Java primitive types. _primitive_dtype_to_jclass_map = { dtypes.char: jpy.get_type("java.lang.Character").TYPE, dtypes.float32: jpy.get_type("java.lang.Float").TYPE, dtypes.float64: jpy.get_type("java.lang.Double").TYPE, dtypes.byte: jpy.get_type("java.lang.Byte").TYPE, dtypes.short: jpy.get_type("java.lang.Short").TYPE, dtypes.int32: jpy.get_type("java.lang.Integer").TYPE, dtypes.long: jpy.get_type("java.lang.Long").TYPE } def __init__(self, name: str, data_type: DType, is_key: bool, enum_values: list[Any] = None) -> None: j_column_spec_builder = self._JColumnSpec.builder() \ .name(name) \ .setDataType(self._primitive_dtype_to_jclass_map[data_type] if data_type in self._primitive_dtype_to_jclass_map else data_type.j_type.jclass)\ .isKey(is_key) if enum_values: # TODO: Remove char logic if jpy.convert supports single-character string conversions into chars. if data_type is dtypes.char: char_enum_values = [] for enum_value in enum_values: if type(enum_value) is not str or len(enum_value) != 1: raise DHError("Enum value inputs for character columns must be a single character string.") char_enum_values.append(ord(enum_value)) enum_values = char_enum_values enum_values = [jpy.convert(enum_value, data_type.j_name) for enum_value in enum_values] j_column_spec_builder.addToEnumValues(enum_values) self.j_column_spec = j_column_spec_builder.build()
[docs] def name(self) -> str: """ Gets the name of the column. :return: the name of the column """ return self.j_column_spec.name()
[docs] def data_type(self) -> DType: """ Gets the DType of the column. :return: the DType of the column """ return _get_dtype_from_jclass_name(self.j_column_spec.dataType().getName())
[docs] def is_key(self) -> bool: """ Gets whether the column is a keyed column. :return: whether the column is a keyed column """ return self.j_column_spec.isKey()
[docs] def enum_values(self) -> list[Any]: """ Gets the enum values for the column. :return: the enum values for the column """ return _get_enum_values_from_j_column_spec(self.j_column_spec)
[docs] class InputTableSpec: """ Representation of input table specification. """ _JInputTableSpec = jpy.get_type("io.deephaven.enterprise.inputtables.InputTableSpec") @staticmethod def _add_column_specs_and_build(j_input_table_spec_builder: jpy.JType, column_specs: list[ColumnSpec]) -> jpy.JType: """ Adds column specs to an input table spec builder, builds it, and returns the built object. :param column_specs: ColumnSpecs to add to the input table spec :return: the input table spec """ for column_spec in column_specs: j_input_table_spec_builder.addColumnSpec(column_spec.j_column_spec) return j_input_table_spec_builder.build()
[docs] @staticmethod def get_column_specs_from_j_input_table_spec(j_input_table_spec: jpy.JType) -> dict[str, ColumnSpec]: """ Extracts Java column specs from Java input table specs and returns them as Python ColumnSpecs. :param j_input_table_spec: Java input table spec to extract column specs from :return: Python ColumnSpecs from the Java input table spec """ j_column_specs = [] j_it = j_input_table_spec.columnSpecs().values().iterator() while j_it.hasNext(): j_column_specs.append(j_it.next()) column_specs = {} for j_column_spec in j_column_specs: enum_values = _get_enum_values_from_j_column_spec(j_column_spec) if j_column_spec.enumValues().isEmpty(): column_specs[j_column_spec.name()] = ColumnSpec(j_column_spec.name(), _get_dtype_from_jclass_name(j_column_spec.dataType().getName()), j_column_spec.isKey()) else: column_specs[j_column_spec.name()] = ColumnSpec(j_column_spec.name(), _get_dtype_from_jclass_name(j_column_spec.dataType().getName()), j_column_spec.isKey(), enum_values) return column_specs
def __init__(self, column_specs: list[ColumnSpec]) -> None: """ Creates an InputTableSpec with the provided ColumnSpecs. :param column_specs: the column specs to create the InputTableSpec """ self.j_input_table_spec = self._add_column_specs_and_build(self._JInputTableSpec.builder(), column_specs)
[docs] def column_specs(self) -> dict[str, ColumnSpec]: """ Gets the ColumnSpecs for this input table. :return: the ColumnSpecs for this input table """ return self.get_column_specs_from_j_input_table_spec(self.j_input_table_spec)
[docs] def add_column_specs(self, column_specs: list[ColumnSpec]) -> "InputTableSpec": """ Returns an InputTableSpec that is equal to this one, except with the specified additional ColumnSpecs. :param column_specs: specified ColumnSpecs to add to the InputTableSpec :return: an InputTableSpec that is equal to this one, except with the specified additional ColumnSpecs """ return InputTableSpec(list(self.get_column_specs_from_j_input_table_spec(self._add_column_specs_and_build(self.j_input_table_spec.asBuilder(), column_specs)).values()))
[docs] def remove_column_specs(self, column_names: list[str]) -> "InputTableSpec": """ Returns an InputTableSpec that is equal to this one, except without the specified columns. :param column_names: specified columns to exclude from the returned InputTableSpec :return: an InputTableSpec that is equal to this one, except without the specified columns """ return InputTableSpec(list(self.get_column_specs_from_j_input_table_spec(self._add_column_specs_and_build(self.j_input_table_spec.removeColumnSpecs(column_names), [])).values()))