Source code for deephaven_enterprise.pivot.pivot_table

"""
The pivot_table module allows you to create a :class:`PivotTable` using :meth:`make_pivot`.
"""

import jpy
from deephaven import DHError
from deephaven.table import Table
from deephaven.table import Aggregation, SortDirection, _sort_column
from deephaven.filters import Filter, and_
from deephaven._wrapper import JObjectWrapper

from typing import Union, Sequence

from deephaven.jcompat import to_sequence, j_array_list

_JPivotTable = jpy.get_type("io.deephaven.pivot.PivotTable")
_JPivotServicePlugin = jpy.get_type("io.deephaven.pivot.PivotServicePlugin")
_JColumnName = jpy.get_type("io.deephaven.api.ColumnName")


[docs] class PivotTable(JObjectWrapper): """ A PivotTable aggregates data in two dimensions. The row by-columns form rows of the table, with each column producing a hierarchical set of rows. Similarly, the column by-columns form columns of the table, forming a hierarchical set of columns. """ j_object_type = _JPivotTable @property def j_object(self) -> jpy.JType: return self._j_pivot def __init__(self, j_pivot: _JPivotTable): self._j_pivot = j_pivot
[docs] def filter( self, filters: Union[str, Filter, Sequence[Union[str, Filter]]] ) -> "PivotTable": """ Filter this PivotTable and produce a new PivotTable. Args: filters (Union[str, Filter, Sequence[str], Sequence[Filter]]): the filter to be applied to the PivotTable) Returns: a new PivotTable """ j_filter = and_(filters).j_filter try: j_result = self._j_pivot.apply( j_array_list(), j_array_list(), j_array_list([j_filter]) ) except Exception as e: raise DHError(e, "Failed to filter pivot") from e return PivotTable(j_result)
[docs] def sort_rows( self, order_by: Union[str, Sequence[str]], order: Union[SortDirection, Sequence[SortDirection]] = None, ) -> "PivotTable": """ Sort the rows of this table and produce a new PivotTable. Args: order_by (Union[str, Sequence[str]]): the column(s) to be sorted on order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for each sort column, default is None, meaning ascending order for all the sort columns. Returns: a new PivotTable """ try: j_result = self._j_pivot.apply( self._make_sort_columns(order, order_by), j_array_list(), j_array_list() ) except Exception as e: raise DHError(e, "Failed to sort pivot rows") from e return PivotTable(j_result)
[docs] def sort_cols( self, order_by: Union[str, Sequence[str]], order: Union[SortDirection, Sequence[SortDirection]] = None, ) -> "PivotTable": """ Sort the columns of this table and produce a new PivotTable. Args: order_by (Union[str, Sequence[str]]): the column(s) to be sorted on order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for each sort column, default is None, meaning ascending order for all the sort columns. Returns: a new PivotTable """ try: j_result = self._j_pivot.apply( j_array_list(), self._make_sort_columns(order, order_by), j_array_list() ) except Exception as e: raise DHError(e, "Failed to sort pivot columns") from e return PivotTable(j_result)
def _make_sort_columns(self, order, order_by): order_by = to_sequence(order_by) if not order: order = (SortDirection.ASCENDING,) * len(order_by) else: order = to_sequence(order) if any( [ o not in (SortDirection.ASCENDING, SortDirection.DESCENDING) for o in order ] ): raise DHError( message="The sort direction must be either 'ASCENDING' or 'DESCENDING'." ) if len(order_by) != len(order): raise DHError( message="The number of sort columns must be the same as the number of sort directions." ) sort_columns = [_sort_column(col, dir_) for col, dir_ in zip(order_by, order)] j_sc_list = j_array_list(sort_columns) return j_sc_list
[docs] def make_pivot( source: Table, aggregations: list[Aggregation], row_by_columns: list[str], column_by_columns: list[str], ) -> PivotTable: """ Create a new PivotTable. Args: source (Table): The table to aggregate. aggregations (list[Aggregation]): The list of aggregations to perform. row_by_columns (list[str]): The columns that serve as key columns for the row aggregations. Each unique value (and unique value prefix) creates another row in the output grid. column_by_columns (list[str]): The columns that serve as key columns for the column aggregations. Each unique value (and unique value prefix) creates another (logical) column in the output grid. Returns: PivotTable: A new PivotTable instance. """ j_row_by_cols = j_array_list([_JColumnName.of(col) for col in row_by_columns]) j_col_by_cols = j_array_list([_JColumnName.of(col) for col in column_by_columns]) j_aggs = j_array_list([aa.j_aggregation for aa in aggregations]) try: j_result = _JPivotTable.make( source.j_table, j_aggs, j_row_by_cols, j_col_by_cols ) except Exception as e: raise DHError(e, "Failed to create new PivotTable") from e return PivotTable(j_result)
[docs] class PivotServicePlugin(JObjectWrapper): """ A Python Wrapper for the PivotServicePlugin. It is not intended for the worker to interact with this object directly, but rather for it to be made available as a session variable for plugin clients to create new pivots. """ j_object_type = _JPivotServicePlugin @property def j_object(self) -> jpy.JType: return self._j_psp def __init__(self): self._j_psp = _JPivotServicePlugin()