# Source code for deephaven_enterprise.input_tables

#
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module supports the creation of input tables in Deephaven by providing classes for defining columns and input
table specifications.
"""

from __future__ import annotations

from typing import Any, Optional

import jpy
from deephaven import DHError, dtypes
from deephaven.dtypes import DType, _j_name_type_map

# Java classes, resolved through jpy, whose builders back the Python InputTableSpec and ColumnSpec wrappers below.
_JInputTableSpec = jpy.get_type("io.deephaven.enterprise.inputtables.InputTableSpec")
_JColumnSpec = jpy.get_type("io.deephaven.enterprise.inputtables.ColumnSpec")


def _get_dtype_from_jclass_name(jclass_name: str) -> DType:
    """Looks up the DType registered under a Java class name.

    Args:
        jclass_name (str): the Java class name to look up in the dtypes name-to-type map

    Returns:
        DType: the DType mapped to the given name

    Raises:
        KeyError: if the name is not in the map (assuming the map is a plain dict)
    """
    dtype = _j_name_type_map[jclass_name]
    return dtype


def _get_enum_values_from_j_column_spec(j_column_spec: jpy.JType) -> list[Any]:
    """Collects the enum values of a Java column spec into a Python list.

    Values of character columns are converted to single-character Python strings with ``chr``; values of every
    other column type are returned exactly as the Java iterator hands them back.

    Args:
        j_column_spec (jpy.JType): the Java column spec to read the enum values from

    Returns:
        list[Any]: the enum values in iteration order, empty if the column has none
    """
    j_it = j_column_spec.enumValues().iterator()
    if not j_it.hasNext():
        # Nothing to convert, so the column's data type never needs to be resolved.
        return []

    # The data type is a property of the column, not of the individual value, so resolve it once up front
    # instead of repeating the Java calls and map lookup for every element.
    is_char_column = (
        _get_dtype_from_jclass_name(j_column_spec.dataType().getName()) is dtypes.char
    )

    enum_values = []
    while j_it.hasNext():
        enum_value = j_it.next()
        enum_values.append(chr(enum_value) if is_char_column else enum_value)
    return enum_values


class ColumnSpec:
    """A ColumnSpec defines a column of an input table."""

    # DType.j_type.jclass is insufficient b/c it's unassigned for Java primitive types.
    _primitive_dtype_to_jclass_map = {
        dtypes.char: jpy.get_type("java.lang.Character").TYPE,
        dtypes.float32: jpy.get_type("java.lang.Float").TYPE,
        dtypes.float64: jpy.get_type("java.lang.Double").TYPE,
        dtypes.byte: jpy.get_type("java.lang.Byte").TYPE,
        dtypes.short: jpy.get_type("java.lang.Short").TYPE,
        dtypes.int32: jpy.get_type("java.lang.Integer").TYPE,
        dtypes.long: jpy.get_type("java.lang.Long").TYPE,
    }

    def __init__(
        self,
        name: str,
        data_type: DType,
        is_key: bool,
        enum_values: Optional[list[Any]] = None,
    ) -> None:
        """Creates a ColumnSpec with the provided name, data type, and key status.

        Args:
            name (str): the name of the column
            data_type (DType): the data type of the column
            is_key (bool): whether the column is a keyed column
            enum_values (list[Any]): the enum values for the column, defaults to None. For character columns each
                value must be a single-character string.

        Raises:
            DHError: if configuring the Java column spec builder with the name, data type and key status fails, or
                if an enum value given for a character column is not a single-character string
        """
        try:
            j_column_spec_builder = (
                _JColumnSpec.builder()
                .name(name)
                .dataType(
                    self._primitive_dtype_to_jclass_map[data_type]
                    if data_type in self._primitive_dtype_to_jclass_map
                    else data_type.j_type.jclass  # type: ignore[union-attr]
                )
                .isKey(is_key)
            )
        except Exception as e:
            # data_type may not be a DType at all (e.g. None). Fall back to the object itself so this handler
            # reports the documented DHError instead of failing with an AttributeError of its own.
            raise DHError(
                e,
                message="Unsupported data type for column: "
                + str(getattr(data_type, "j_name", data_type)),
            ) from e

        if enum_values:
            # TODO: Remove char logic if jpy.convert supports single-character string conversions into chars.
            if data_type is dtypes.char:
                char_enum_values = []
                for enum_value in enum_values:
                    if not isinstance(enum_value, str) or len(enum_value) != 1:
                        raise DHError(
                            message="Enum value inputs for character columns must be a single character string."
                        )
                    # Characters are handed to jpy.convert as their integer code points.
                    char_enum_values.append(ord(enum_value))
                enum_values = char_enum_values

            enum_values = [
                jpy.convert(enum_value, data_type.j_name) for enum_value in enum_values
            ]
            j_column_spec_builder.addEnumValues(enum_values)

        self.j_column_spec = j_column_spec_builder.build()

    def name(self) -> str:
        """Returns the name of the column.

        Returns:
            str
        """
        return self.j_column_spec.name()

    def data_type(self) -> DType:
        """Returns the DType of the column.

        Returns:
            DType: the DType of the column
        """
        return _get_dtype_from_jclass_name(self.j_column_spec.dataType().getName())

    def is_key(self) -> bool:
        """Whether the column is a keyed column.

        Returns:
            bool
        """
        return self.j_column_spec.isKey()

    def enum_values(self) -> list[Any]:
        """Gets the enum values for the column.

        Returns:
            list[Any]
        """
        return _get_enum_values_from_j_column_spec(self.j_column_spec)
class InputTableSpec:
    """A specification for input tables that can be passed into :meth:`.database.Database.add_input_table_schema` to
    add an input table schema to the database.

    See https://deephaven.io/enterprise/docs/coreplus/user-tables/#input-tables-1 for more information.
    """

    @staticmethod
    def _add_column_specs_and_build(
        j_input_table_spec_builder: jpy.JType, column_specs: list[ColumnSpec]
    ) -> jpy.JType:
        """Adds column specs to an input table spec builder, builds it, and returns the built object.

        Args:
            j_input_table_spec_builder (jpy.JType): the input table spec builder to add column specs to
            column_specs (list[ColumnSpec]): the column specs to add to the input table spec

        Returns:
            jpy.JType: the raw Java input table spec
        """
        for column_spec in column_specs:
            j_input_table_spec_builder.addColumnSpec(column_spec.j_column_spec)
        return j_input_table_spec_builder.build()

    @staticmethod
    def _from_j_input_table_spec(j_input_table_spec: jpy.JType) -> InputTableSpec:
        """Creates a Python InputTableSpec from the column specs of a Java input table spec.

        Args:
            j_input_table_spec (jpy.JType): Java input table spec to take the column specs from

        Returns:
            InputTableSpec: a new InputTableSpec built from those column specs
        """
        column_specs = InputTableSpec.get_column_specs_from_j_input_table_spec(
            j_input_table_spec
        )
        return InputTableSpec(list(column_specs.values()))

    @staticmethod
    def get_column_specs_from_j_input_table_spec(
        j_input_table_spec: jpy.JType,
    ) -> dict[str, ColumnSpec]:
        """Extracts Java column specs from Java input table specs and returns them as Python ColumnSpecs.

        Args:
            j_input_table_spec (jpy.JType): Java input table spec to extract column specs from

        Returns:
            dict[str, ColumnSpec]: Python ColumnSpecs from the Java input table spec, keyed by column name
        """
        column_specs = {}
        j_it = j_input_table_spec.columnSpecs().values().iterator()
        while j_it.hasNext():
            j_column_spec = j_it.next()
            column_name = j_column_spec.name()
            # Only read the enum values when the column has some; None is ColumnSpec's default for "no enum
            # values", so one constructor call covers both cases.
            enum_values = (
                None
                if j_column_spec.enumValues().isEmpty()
                else _get_enum_values_from_j_column_spec(j_column_spec)
            )
            column_specs[column_name] = ColumnSpec(
                column_name,
                _get_dtype_from_jclass_name(j_column_spec.dataType().getName()),
                j_column_spec.isKey(),
                enum_values,
            )
        return column_specs

    def __init__(self, column_specs: list[ColumnSpec]) -> None:
        """Creates an InputTableSpec with the provided ColumnSpecs.

        Args:
            column_specs (list[ColumnSpec]): the column specs to create the InputTableSpec
        """
        self.j_input_table_spec = self._add_column_specs_and_build(
            _JInputTableSpec.builder(), column_specs
        )

    def column_specs(self) -> dict[str, ColumnSpec]:
        """Returns the ColumnSpecs for this input table.

        Returns:
            dict[str, ColumnSpec]: the ColumnSpecs for this input table
        """
        return self.get_column_specs_from_j_input_table_spec(self.j_input_table_spec)

    def add_column_specs(self, column_specs: list[ColumnSpec]) -> InputTableSpec:
        """Returns a new InputTableSpec that is equal to this one, except with the specified additional ColumnSpecs.

        Args:
            column_specs (list[ColumnSpec]): specified ColumnSpecs to add to the InputTableSpec

        Returns:
            a new InputTableSpec
        """
        j_extended_spec = self._add_column_specs_and_build(
            self.j_input_table_spec.asBuilder(), column_specs
        )
        return self._from_j_input_table_spec(j_extended_spec)

    def remove_column_specs(self, column_names: list[str]) -> InputTableSpec:
        """Returns an InputTableSpec that is equal to this one, except without the specified columns.

        Args:
            column_names (list[str]): specified columns to exclude from the returned InputTableSpec

        Returns:
            a new InputTableSpec
        """
        # No column specs are added here; the helper is only used to build the result of the removal.
        j_reduced_spec = self._add_column_specs_and_build(
            self.j_input_table_spec.removeColumnSpecs(column_names), []
        )
        return self._from_j_input_table_spec(j_reduced_spec)