"""
The pivot_table module allows you to create a :class:`PivotTable` using :meth:`make_pivot`.
"""
import jpy
from deephaven import DHError
from deephaven.table import Table
from deephaven.table import Aggregation, SortDirection, _sort_column
from deephaven.filters import Filter, and_
from deephaven._wrapper import JObjectWrapper
from typing import Union, Sequence
from deephaven.jcompat import to_sequence, j_array_list
# Java types used by this module, looked up through jpy when the module is imported.
# Java class wrapped by PivotTable; also provides the static `make` factory used by make_pivot.
_JPivotTable = jpy.get_type("io.deephaven.pivot.PivotTable")
# Java class wrapped by PivotServicePlugin.
_JPivotServicePlugin = jpy.get_type("io.deephaven.pivot.PivotServicePlugin")
# Used to turn Python column-name strings into Java ColumnName objects (via `ColumnName.of`).
_JColumnName = jpy.get_type("io.deephaven.api.ColumnName")
[docs]
class PivotTable(JObjectWrapper):
    """
    A PivotTable aggregates data in two dimensions. The row by-columns form rows of the table,
    with each column producing a hierarchical set of rows. Similarly, the column by-columns
    form columns of the table, forming a hierarchical set of columns.

    :meth:`filter`, :meth:`sort_rows` and :meth:`sort_cols` each call the Java ``apply`` method and wrap
    its result in a new PivotTable; ``self`` keeps referring to the same Java object.
    """

    j_object_type = _JPivotTable

    def __init__(self, j_pivot: _JPivotTable):
        """
        Wrap an existing Java PivotTable.

        Args:
            j_pivot (jpy.JType): the Java PivotTable object to wrap
        """
        self._j_pivot = j_pivot

    @property
    def j_object(self) -> jpy.JType:
        """The wrapped Java PivotTable object."""
        return self._j_pivot

    def filter(
        self, filters: Union[str, Filter, Sequence[Union[str, Filter]]]
    ) -> "PivotTable":
        """
        Filter this PivotTable and produce a new PivotTable.

        Args:
            filters (Union[str, Filter, Sequence[str], Sequence[Filter]]): the filter(s) to be applied to the
                PivotTable; they are combined into a single filter with :func:`deephaven.filters.and_`

        Returns:
            a new PivotTable

        Raises:
            DHError: if the Java ``apply`` call fails
        """
        # The filters are collapsed into one conjunctive filter before being handed to Java.
        j_filter = and_(filters).j_filter
        try:
            # apply(row sorts, column sorts, filters): only the filter list is populated here.
            j_result = self._j_pivot.apply(
                j_array_list(), j_array_list(), j_array_list([j_filter])
            )
        except Exception as e:
            raise DHError(e, "Failed to filter pivot") from e
        return PivotTable(j_result)

    def sort_rows(
        self,
        order_by: Union[str, Sequence[str]],
        order: Union[SortDirection, Sequence[SortDirection], None] = None,
    ) -> "PivotTable":
        """
        Sort the rows of this table and produce a new PivotTable.

        Args:
            order_by (Union[str, Sequence[str]]): the column(s) to be sorted on
            order (Union[SortDirection, Sequence[SortDirection]], optional): the corresponding sort directions
                for each sort column, default is None, meaning ascending order for all the sort columns.

        Returns:
            a new PivotTable

        Raises:
            DHError: if the sort arguments are invalid or the Java ``apply`` call fails
        """
        try:
            # Argument validation happens inside the try, so its DHError is re-wrapped with the
            # "Failed to sort pivot rows" message as well.
            j_result = self._j_pivot.apply(
                self._make_sort_columns(order, order_by), j_array_list(), j_array_list()
            )
        except Exception as e:
            raise DHError(e, "Failed to sort pivot rows") from e
        return PivotTable(j_result)

    def sort_cols(
        self,
        order_by: Union[str, Sequence[str]],
        order: Union[SortDirection, Sequence[SortDirection], None] = None,
    ) -> "PivotTable":
        """
        Sort the columns of this table and produce a new PivotTable.

        Args:
            order_by (Union[str, Sequence[str]]): the column(s) to be sorted on
            order (Union[SortDirection, Sequence[SortDirection]], optional): the corresponding sort directions
                for each sort column, default is None, meaning ascending order for all the sort columns.

        Returns:
            a new PivotTable

        Raises:
            DHError: if the sort arguments are invalid or the Java ``apply`` call fails
        """
        try:
            # Same call shape as sort_rows, with the sort columns in the second (column-sort) slot.
            j_result = self._j_pivot.apply(
                j_array_list(), self._make_sort_columns(order, order_by), j_array_list()
            )
        except Exception as e:
            raise DHError(e, "Failed to sort pivot columns") from e
        return PivotTable(j_result)

    def _make_sort_columns(
        self,
        order: Union[SortDirection, Sequence[SortDirection], None],
        order_by: Union[str, Sequence[str]],
    ) -> jpy.JType:
        """
        Validate the sort arguments and build the Java list of sort columns.

        Note the parameter order: directions first, column names second.

        Args:
            order (Union[SortDirection, Sequence[SortDirection]], optional): the sort direction(s); a falsy
                value (None or an empty sequence) means ascending for every column
            order_by (Union[str, Sequence[str]]): the column(s) to be sorted on

        Returns:
            a Java list holding one sort column per entry of order_by

        Raises:
            DHError: if a direction is neither ASCENDING nor DESCENDING, or if the number of directions
                differs from the number of sort columns
        """
        order_by = to_sequence(order_by)
        if not order:
            order = (SortDirection.ASCENDING,) * len(order_by)
        else:
            order = to_sequence(order)
            valid_directions = (SortDirection.ASCENDING, SortDirection.DESCENDING)
            if any(o not in valid_directions for o in order):
                raise DHError(
                    message="The sort direction must be either 'ASCENDING' or 'DESCENDING'."
                )
        # Checked before zip() so a length mismatch is reported instead of silently truncated.
        if len(order_by) != len(order):
            raise DHError(
                message="The number of sort columns must be the same as the number of sort directions."
            )
        sort_columns = [_sort_column(col, dir_) for col, dir_ in zip(order_by, order)]
        return j_array_list(sort_columns)
[docs]
def make_pivot(
    source: Table,
    aggregations: Union[Aggregation, Sequence[Aggregation]],
    row_by_columns: Union[str, Sequence[str]],
    column_by_columns: Union[str, Sequence[str]],
) -> PivotTable:
    """
    Create a new PivotTable.

    Args:
        source (Table): The table to aggregate.
        aggregations (Union[Aggregation, Sequence[Aggregation]]): The aggregation(s) to perform.
        row_by_columns (Union[str, Sequence[str]]): The column(s) that serve as key columns for the row
            aggregations. Each unique value (and unique value prefix) creates another row in the output grid.
        column_by_columns (Union[str, Sequence[str]]): The column(s) that serve as key columns for the column
            aggregations. Each unique value (and unique value prefix) creates another (logical) column in the
            output grid.

    Returns:
        PivotTable: A new PivotTable instance.

    Raises:
        DHError: if the Java PivotTable cannot be created
    """
    # A bare string is itself iterable, so without normalisation "Sym" would become the three
    # columns "S", "y", "m". to_sequence wraps a single name the same way the sort methods do.
    row_by_columns = to_sequence(row_by_columns)
    column_by_columns = to_sequence(column_by_columns)
    # Likewise allow a single Aggregation to be passed without wrapping it in a list.
    if isinstance(aggregations, Aggregation):
        aggregations = [aggregations]

    j_row_by_cols = j_array_list([_JColumnName.of(col) for col in row_by_columns])
    j_col_by_cols = j_array_list([_JColumnName.of(col) for col in column_by_columns])
    j_aggs = j_array_list([aa.j_aggregation for aa in aggregations])
    try:
        j_result = _JPivotTable.make(
            source.j_table, j_aggs, j_row_by_cols, j_col_by_cols
        )
    except Exception as e:
        raise DHError(e, "Failed to create new PivotTable") from e
    return PivotTable(j_result)
[docs]
class PivotServicePlugin(JObjectWrapper):
    """
    Python-side wrapper around the Java PivotServicePlugin.

    Worker code is not meant to interact with this object directly. Instead, an instance is made
    available as a session variable so that plugin clients can use it to create new pivots.
    """

    j_object_type = _JPivotServicePlugin

    def __init__(self):
        """Instantiate the Java PivotServicePlugin and keep a reference to it."""
        j_plugin = _JPivotServicePlugin()
        self._j_psp = j_plugin

    @property
    def j_object(self) -> jpy.JType:
        """The wrapped Java PivotServicePlugin object."""
        return self._j_psp