#
# Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
#
from __future__ import annotations
from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional, Union
import pyarrow as pa
from deephaven_core.proto import table_pb2, table_pb2_grpc, ticket_pb2
from pydeephaven._arrow import map_arrow_type
from pydeephaven.agg import Aggregation
from pydeephaven.filters import Filter
from pydeephaven.updateby import UpdateByOperation
if TYPE_CHECKING:
from pydeephaven.table import MultiJoinInput, Table
class SortDirection(Enum):
    """An enum defining the sort ordering."""

    # Each member's value is the matching protobuf ``SortDescriptor.SortDirection``
    # constant, so ``member.value`` can be placed directly in a ``SortDescriptor``.
    DESCENDING = table_pb2.SortDescriptor.SortDirection.DESCENDING
    """Descending sort direction"""
    ASCENDING = table_pb2.SortDescriptor.SortDirection.ASCENDING
    """Ascending sort direction"""
class NaturalJoinType(Enum):
    """An Enum defining ways to handle duplicate right hand table values during natural join operations"""

    # Each member's value is the matching protobuf
    # ``NaturalJoinTablesRequest.JoinType`` constant.
    ERROR_ON_DUPLICATE = table_pb2.NaturalJoinTablesRequest.JoinType.ERROR_ON_DUPLICATE
    """Throw an error if a duplicate right hand table row is found. This is the default behavior if not specified"""
    FIRST_MATCH = table_pb2.NaturalJoinTablesRequest.JoinType.FIRST_MATCH
    """Match the first right hand table row and ignore later duplicates"""
    LAST_MATCH = table_pb2.NaturalJoinTablesRequest.JoinType.LAST_MATCH
    """Match the last right hand table row and ignore earlier duplicates"""
    EXACTLY_ONE_MATCH = table_pb2.NaturalJoinTablesRequest.JoinType.EXACTLY_ONE_MATCH
    """Match exactly one right hand table row; throw an error if there are zero or more than one matches"""
class TableOp(ABC):
    """Abstract base for table operations that are turned into gRPC requests.

    Concrete operations supply three things: the stub callable to invoke, the
    standalone request message, and the same request wrapped for a batch.
    """

    @classmethod
    @abstractmethod
    def get_stub_func(
        cls, table_service_stub: table_pb2_grpc.TableServiceStub
    ) -> Any:
        """Return the callable on ``table_service_stub`` that serves this operation."""

    @abstractmethod
    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the standalone request message for this operation.

        Args:
            result_id: ticket for the result, or None.
            source_id: reference to the source table, or None.
        """

    @abstractmethod
    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Build this operation's request in the form used inside a batch request.

        Args:
            result_id: ticket for the result, or None.
            source_id: reference to the source table.
        """
class NoneOp(TableOp):
    """A placeholder operation that only holds a table.

    None of its request-building methods are meant to be invoked; each one
    raises ``AssertionError`` if it is.
    """

    def __init__(self, table: Table):
        self.table = table

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Always raise ``AssertionError``; this op has no stub function."""
        raise AssertionError("should never be called.")

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Always raise ``AssertionError``; this op has no request."""
        raise AssertionError("should never be called.")

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Always raise ``AssertionError``; this op has no batch request."""
        raise AssertionError("should never be called.")
class TimeTableOp(TableOp):
    """Builds ``TimeTableRequest`` messages.

    Both the start time and the period may be given either as an int (sent in
    the ``*_nanos`` fields) or as a str (sent in the ``*_string`` fields).
    """

    start_time: Union[int, str]

    def __init__(
        self,
        start_time: Optional[Union[int, str]],
        period: Union[int, str],
        blink_table: bool = False,
    ):
        # Force a missing start time to zero to trigger `now()` behavior.
        self.start_time = 0 if start_time is None else start_time
        self.period = period
        self.blink_table = blink_table

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``TimeTable`` callable."""
        return table_service_stub.TimeTable

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``TimeTableRequest``; ``source_id`` is not used."""
        start, period = self.start_time, self.period
        start_is_text = isinstance(start, str)
        period_is_text = isinstance(period, str)
        # For each of start/period exactly one of the nanos/string fields gets
        # the value; the other is passed as None.
        return table_pb2.TimeTableRequest(
            result_id=result_id,
            start_time_nanos=None if start_is_text else start,  # type: ignore[arg-type]
            start_time_string=start if start_is_text else None,  # type: ignore[arg-type]
            period_nanos=None if period_is_text else period,  # type: ignore[arg-type]
            period_string=period if period_is_text else None,  # type: ignore[arg-type]
            blink_table=self.blink_table,
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``time_table``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(time_table=request)
class EmptyTableOp(TableOp):
    """Builds ``EmptyTableRequest`` messages for a given size."""

    def __init__(self, size: int):
        self.size = size

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``EmptyTable`` callable."""
        return table_service_stub.EmptyTable

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``EmptyTableRequest``; ``source_id`` is not used."""
        request = table_pb2.EmptyTableRequest(result_id=result_id, size=self.size)
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``empty_table``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(empty_table=request)
class DropColumnsOp(TableOp):
    """Builds ``DropColumnsRequest`` messages for a list of column names."""

    def __init__(self, column_names: list[str]):
        self.column_names = column_names

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``DropColumns`` callable."""
        return table_service_stub.DropColumns

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``DropColumnsRequest`` for the stored column names."""
        request = table_pb2.DropColumnsRequest(
            result_id=result_id,
            source_id=source_id,
            column_names=self.column_names,
        )
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``drop_columns``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(drop_columns=request)
class USVOp(TableOp):
    """Shared base for ops whose request is a ``SelectOrUpdateRequest``.

    Subclasses set ``column_specs`` and provide the stub function and the
    batch wrapper.
    """

    column_specs: list[str]

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``SelectOrUpdateRequest`` carrying ``column_specs``."""
        request = table_pb2.SelectOrUpdateRequest(
            result_id=result_id,
            source_id=source_id,
            column_specs=self.column_specs,
        )
        return request
class UpdateOp(USVOp):
    """Select-or-update op dispatched through the stub's ``Update`` callable."""

    def __init__(self, column_specs: list[str]):
        self.column_specs = column_specs

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``Update`` callable."""
        return table_service_stub.Update

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``update``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(update=request)
class LazyUpdateOp(USVOp):
    """Select-or-update op dispatched through the stub's ``LazyUpdate`` callable."""

    def __init__(self, column_specs: list[str]):
        self.column_specs = column_specs

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``LazyUpdate`` callable."""
        return table_service_stub.LazyUpdate

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``lazy_update``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(lazy_update=request)
class ViewOp(USVOp):
    """Select-or-update op dispatched through the stub's ``View`` callable."""

    def __init__(self, column_specs: list[str]):
        self.column_specs = column_specs

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``View`` callable."""
        return table_service_stub.View

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``view``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(view=request)
class UpdateViewOp(USVOp):
    """Select-or-update op dispatched through the stub's ``UpdateView`` callable."""

    def __init__(self, column_specs: list[str]):
        self.column_specs = column_specs

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``UpdateView`` callable."""
        return table_service_stub.UpdateView

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``update_view``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(update_view=request)
class SelectOp(USVOp):
    """Select-or-update op dispatched through the stub's ``Select`` callable."""

    def __init__(self, column_specs: list[str]):
        self.column_specs = column_specs

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``Select`` callable."""
        return table_service_stub.Select

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``select``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(select=request)
class SelectDistinctOp(TableOp):
    """Builds ``SelectDistinctRequest`` messages for a list of column names."""

    def __init__(self, column_names: list[str]):
        self.column_names = column_names

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``SelectDistinct`` callable."""
        return table_service_stub.SelectDistinct

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``SelectDistinctRequest`` for the stored column names."""
        request = table_pb2.SelectDistinctRequest(
            result_id=result_id,
            source_id=source_id,
            column_names=self.column_names,
        )
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``select_distinct``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(select_distinct=request)
class UnstructuredFilterOp(TableOp):
    """Builds ``UnstructuredFilterTableRequest`` messages from filter strings."""

    def __init__(self, filters: list[str]):
        self.filters = filters

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``UnstructuredFilter`` callable."""
        return table_service_stub.UnstructuredFilter

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``UnstructuredFilterTableRequest`` with the stored filters."""
        request = table_pb2.UnstructuredFilterTableRequest(
            result_id=result_id,
            source_id=source_id,
            filters=self.filters,
        )
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``unstructured_filter``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(unstructured_filter=request)
class FilterOp(TableOp):
    """Builds ``FilterTableRequest`` messages from ``Filter`` objects."""

    def __init__(self, filters: list[Filter]):
        self.filters = filters

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``Filter`` callable."""
        return table_service_stub.Filter

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``FilterTableRequest``.

        Each stored filter contributes the message returned by its
        ``make_grpc_message()``.
        """
        pb_filters = [flt.make_grpc_message() for flt in self.filters]
        return table_pb2.FilterTableRequest(
            result_id=result_id,
            source_id=source_id,
            filters=pb_filters,
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``filter``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(filter=request)
class SortOp(TableOp):
    """Builds ``SortTableRequest`` messages from column names and directions."""

    def __init__(self, column_names: list[str], directions: list[SortDirection]):
        self.column_names = column_names
        self.directions = directions

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``Sort`` callable."""
        return table_service_stub.Sort

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``SortTableRequest``.

        Columns and directions are paired positionally. A column without a
        direction sorts ascending; pairing stops at the first missing or
        otherwise falsy column name, so surplus directions are ignored.
        """
        from itertools import zip_longest

        sort_descriptors = []
        for column, direction in zip_longest(self.column_names, self.directions):
            if not column:
                # Either the columns ran out or the name is empty: stop here.
                break
            effective = direction or SortDirection.ASCENDING
            sort_descriptors.append(
                table_pb2.SortDescriptor(
                    column_name=column, direction=effective.value
                )
            )
        return table_pb2.SortTableRequest(
            result_id=result_id, source_id=source_id, sorts=sort_descriptors
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``sort``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(sort=request)
class HeadOrTailOp(TableOp):
    """Shared base for ops whose request is a ``HeadOrTailRequest``.

    Subclasses set ``num_rows`` and provide the stub function and the batch
    wrapper.
    """

    num_rows: int

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``HeadOrTailRequest`` carrying ``num_rows``."""
        request = table_pb2.HeadOrTailRequest(
            result_id=result_id,
            source_id=source_id,
            num_rows=self.num_rows,
        )
        return request
class HeadOp(HeadOrTailOp):
    """Head-or-tail op dispatched through the stub's ``Head`` callable."""

    def __init__(self, num_rows: int):
        self.num_rows = num_rows

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``Head`` callable."""
        return table_service_stub.Head

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``head``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(head=request)
class TailOp(HeadOrTailOp):
    """Head-or-tail op dispatched through the stub's ``Tail`` callable."""

    def __init__(self, num_rows: int):
        self.num_rows = num_rows

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``Tail`` callable."""
        return table_service_stub.Tail

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``tail``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(tail=request)
class HeadOrTailByOp(TableOp):
    """Shared base for ops whose request is a ``HeadOrTailByRequest``.

    Subclasses set ``num_rows`` and ``column_names`` and provide the stub
    function and the batch wrapper.
    """

    num_rows: int
    column_names: list[str]

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``HeadOrTailByRequest``.

        ``column_names`` is sent as the request's ``group_by_column_specs``.
        """
        request = table_pb2.HeadOrTailByRequest(
            result_id=result_id,
            source_id=source_id,
            num_rows=self.num_rows,
            group_by_column_specs=self.column_names,
        )
        return request
class HeadByOp(HeadOrTailByOp):
    """Head-or-tail-by op dispatched through the stub's ``HeadBy`` callable."""

    def __init__(self, num_rows: int, column_names: list[str]):
        self.num_rows = num_rows
        self.column_names = column_names

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``HeadBy`` callable."""
        return table_service_stub.HeadBy

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``head_by``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(head_by=request)
class TailByOp(HeadOrTailByOp):
    """Head-or-tail-by op dispatched through the stub's ``TailBy`` callable."""

    def __init__(self, num_rows: int, column_names: list[str]):
        self.num_rows = num_rows
        self.column_names = column_names

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``TailBy`` callable."""
        return table_service_stub.TailBy

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``tail_by``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(tail_by=request)
class UngroupOp(TableOp):
    """Builds ``UngroupRequest`` messages."""

    def __init__(self, column_names: list[str], null_fill: bool = True):
        self.column_names = column_names
        self.null_fill = null_fill

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``Ungroup`` callable."""
        return table_service_stub.Ungroup

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``UngroupRequest``.

        ``column_names`` is sent as the request's ``columns_to_ungroup``.
        """
        request = table_pb2.UngroupRequest(
            result_id=result_id,
            source_id=source_id,
            null_fill=self.null_fill,
            columns_to_ungroup=self.column_names,
        )
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``ungroup``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(ungroup=request)
class MergeTablesOp(TableOp):
    """Builds ``MergeTablesRequest`` messages for a list of tables."""

    def __init__(self, tables: list[Table], key_column: str = ""):
        self.tables = tables
        self.key_column = key_column

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``MergeTables`` callable."""
        return table_service_stub.MergeTables

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``MergeTablesRequest``.

        The sources are references built from each stored table's
        ``pb_ticket``; the ``source_id`` argument is not used.
        """
        table_references = [
            table_pb2.TableReference(ticket=member.pb_ticket)
            for member in self.tables
        ]
        return table_pb2.MergeTablesRequest(
            result_id=result_id,
            source_ids=table_references,
            key_column=self.key_column,
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``merge``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(merge=request)
class NaturalJoinOp(TableOp):
    """Builds ``NaturalJoinTablesRequest`` messages.

    The source table is the left side; the table given at construction is the
    right side.
    """

    def __init__(
        self,
        table: Table,
        keys: list[str],
        columns_to_add: list[str],
        type: NaturalJoinType,
    ):
        self.table = table
        self.keys = keys
        self.columns_to_add = columns_to_add
        self.type = type

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``NaturalJoinTables`` callable."""
        return table_service_stub.NaturalJoinTables

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``NaturalJoinTablesRequest``.

        ``source_id`` becomes ``left_id``; ``right_id`` references the stored
        table's ``pb_ticket``. The join type is sent as the enum's value.
        """
        right_ref = table_pb2.TableReference(ticket=self.table.pb_ticket)
        return table_pb2.NaturalJoinTablesRequest(
            result_id=result_id,
            left_id=source_id,
            right_id=right_ref,
            columns_to_match=self.keys,
            columns_to_add=self.columns_to_add,
            join_type=self.type.value,
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``natural_join``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(natural_join=request)
class ExactJoinOp(TableOp):
    """Builds ``ExactJoinTablesRequest`` messages.

    The source table is the left side; the table given at construction is the
    right side.
    """

    def __init__(self, table: Table, keys: list[str], columns_to_add: list[str]):
        self.table = table
        self.keys = keys
        self.columns_to_add = columns_to_add

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``ExactJoinTables`` callable."""
        return table_service_stub.ExactJoinTables

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``ExactJoinTablesRequest``.

        ``source_id`` becomes ``left_id``; ``right_id`` references the stored
        table's ``pb_ticket``.
        """
        right_ref = table_pb2.TableReference(ticket=self.table.pb_ticket)
        return table_pb2.ExactJoinTablesRequest(
            result_id=result_id,
            left_id=source_id,
            right_id=right_ref,
            columns_to_match=self.keys,
            columns_to_add=self.columns_to_add,
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``exact_join``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(exact_join=request)
class CrossJoinOp(TableOp):
    """Builds ``CrossJoinTablesRequest`` messages.

    The source table is the left side; the table given at construction is the
    right side.
    """

    def __init__(
        self,
        table: Table,
        keys: Optional[list[str]] = None,
        columns_to_add: Optional[list[str]] = None,
        reserve_bits: int = 10,
    ):
        """Store the join parameters.

        Args:
            table: the right-hand table.
            keys: columns to match on; omitted or None means an empty list.
            columns_to_add: columns to add; omitted or None means an empty list.
            reserve_bits: value sent as the request's ``reserve_bits``.
        """
        self.table = table
        # A fresh list per instance: a literal ``[]`` default would be one
        # object shared by every op constructed without these arguments.
        self.keys = [] if keys is None else keys
        self.columns_to_add = [] if columns_to_add is None else columns_to_add
        self.reserve_bits = reserve_bits

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``CrossJoinTables`` callable."""
        return table_service_stub.CrossJoinTables

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``CrossJoinTablesRequest``.

        ``source_id`` becomes ``left_id``; ``right_id`` references the stored
        table's ``pb_ticket``.
        """
        left_id = source_id
        right_id = table_pb2.TableReference(ticket=self.table.pb_ticket)
        return table_pb2.CrossJoinTablesRequest(
            result_id=result_id,
            left_id=left_id,
            right_id=right_id,
            columns_to_match=self.keys,
            columns_to_add=self.columns_to_add,
            reserve_bits=self.reserve_bits,
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``cross_join``."""
        return table_pb2.BatchTableRequest.Operation(
            cross_join=self.make_grpc_request(result_id=result_id, source_id=source_id)
        )
class AjOp(TableOp):
    """Builds ``AjRajTablesRequest`` messages dispatched through ``AjTables``.

    The source table is the left side; the table given at construction is the
    right side. The last key is the as-of column and all preceding keys are
    exact-match columns.
    """

    def __init__(
        self,
        table: Table,
        keys: Optional[list[str]] = None,
        columns_to_add: Optional[list[str]] = None,
    ):
        """Store the join parameters.

        Args:
            table: the right-hand table.
            keys: match columns, as-of column last; omitted or None means an
                empty list (which cannot be turned into a request).
            columns_to_add: columns to add; omitted or None means an empty list.
        """
        self.table = table
        # A fresh list per instance: a literal ``[]`` default would be one
        # object shared by every op constructed without these arguments.
        self.keys = [] if keys is None else keys
        self.columns_to_add = [] if columns_to_add is None else columns_to_add

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``AjTables`` callable."""
        return table_service_stub.AjTables

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``AjRajTablesRequest``.

        ``source_id`` becomes ``left_id``; ``right_id`` references the stored
        table's ``pb_ticket``.

        Raises:
            ValueError: if no keys were provided, since the last key supplies
                the as-of column.
        """
        if not self.keys:
            # Without this guard ``self.keys[-1]`` fails with a bare IndexError.
            raise ValueError(
                "at least one key column is required; the last key is used as "
                "the as-of column."
            )
        left_id = source_id
        right_id = table_pb2.TableReference(ticket=self.table.pb_ticket)
        return table_pb2.AjRajTablesRequest(
            result_id=result_id,
            left_id=left_id,
            right_id=right_id,
            exact_match_columns=self.keys[:-1],
            as_of_column=self.keys[-1],
            columns_to_add=self.columns_to_add,
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``aj``."""
        return table_pb2.BatchTableRequest.Operation(
            aj=self.make_grpc_request(result_id=result_id, source_id=source_id)
        )
class RajOp(TableOp):
    """Builds ``AjRajTablesRequest`` messages dispatched through ``RajTables``.

    The source table is the left side; the table given at construction is the
    right side. The last key is the as-of column and all preceding keys are
    exact-match columns.
    """

    def __init__(
        self,
        table: Table,
        keys: Optional[list[str]] = None,
        columns_to_add: Optional[list[str]] = None,
    ):
        """Store the join parameters.

        Args:
            table: the right-hand table.
            keys: match columns, as-of column last; omitted or None means an
                empty list (which cannot be turned into a request).
            columns_to_add: columns to add; omitted or None means an empty list.
        """
        self.table = table
        # A fresh list per instance: a literal ``[]`` default would be one
        # object shared by every op constructed without these arguments.
        self.keys = [] if keys is None else keys
        self.columns_to_add = [] if columns_to_add is None else columns_to_add

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``RajTables`` callable."""
        return table_service_stub.RajTables

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``AjRajTablesRequest``.

        ``source_id`` becomes ``left_id``; ``right_id`` references the stored
        table's ``pb_ticket``.

        Raises:
            ValueError: if no keys were provided, since the last key supplies
                the as-of column.
        """
        if not self.keys:
            # Without this guard ``self.keys[-1]`` fails with a bare IndexError.
            raise ValueError(
                "at least one key column is required; the last key is used as "
                "the as-of column."
            )
        left_id = source_id
        right_id = table_pb2.TableReference(ticket=self.table.pb_ticket)
        return table_pb2.AjRajTablesRequest(
            result_id=result_id,
            left_id=left_id,
            right_id=right_id,
            exact_match_columns=self.keys[:-1],
            as_of_column=self.keys[-1],
            columns_to_add=self.columns_to_add,
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``raj``."""
        return table_pb2.BatchTableRequest.Operation(
            raj=self.make_grpc_request(result_id=result_id, source_id=source_id)
        )
class FlattenOp(TableOp):
    """Builds ``FlattenRequest`` messages; takes no parameters of its own."""

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``Flatten`` callable."""
        return table_service_stub.Flatten

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``FlattenRequest`` from the result and source ids."""
        request = table_pb2.FlattenRequest(result_id=result_id, source_id=source_id)
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``flatten``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(flatten=request)
class FetchTableOp(TableOp):
    """Builds ``FetchTableRequest`` messages; takes no parameters of its own."""

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``FetchTable`` callable."""
        return table_service_stub.FetchTable

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``FetchTableRequest`` from the result and source ids."""
        request = table_pb2.FetchTableRequest(result_id=result_id, source_id=source_id)
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``fetch_table``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(fetch_table=request)
class UpdateByOp(TableOp):
    """Builds ``UpdateByRequest`` messages from update-by operations."""

    def __init__(
        self, operations: list[UpdateByOperation], by: Optional[list[str]] = None
    ):
        self.operations = operations
        self.by = by

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``UpdateBy`` callable."""
        return table_service_stub.UpdateBy

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``UpdateByRequest``.

        Each stored operation contributes the message returned by its
        ``make_grpc_message()``; ``by`` is sent as ``group_by_columns``.
        """
        pb_operations = [
            operation.make_grpc_message() for operation in self.operations
        ]
        return table_pb2.UpdateByRequest(
            result_id=result_id,
            source_id=source_id,
            operations=pb_operations,
            group_by_columns=self.by,
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``update_by``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(update_by=request)
class SnapshotTableOp(TableOp):
    """Builds ``SnapshotTableRequest`` messages; takes no parameters of its own."""

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``Snapshot`` callable."""
        return table_service_stub.Snapshot

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``SnapshotTableRequest`` from the result and source ids."""
        request = table_pb2.SnapshotTableRequest(
            result_id=result_id, source_id=source_id
        )
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``snapshot``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(snapshot=request)
class SnapshotWhenTableOp(TableOp):
    """Builds ``SnapshotWhenTableRequest`` messages.

    The source table is the base; the table given at construction is the
    trigger.
    """

    def __init__(
        self,
        trigger_table: Table,
        stamp_cols: Optional[list[str]] = None,
        initial: bool = False,
        incremental: bool = False,
        history: bool = False,
    ):
        self.trigger_table = trigger_table
        self.stamp_cols = stamp_cols
        self.initial = initial
        self.incremental = incremental
        self.history = history

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``SnapshotWhen`` callable."""
        return table_service_stub.SnapshotWhen

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``SnapshotWhenTableRequest``.

        ``source_id`` becomes ``base_id``; ``trigger_id`` references the
        trigger table's ``pb_ticket``. ``stamp_cols`` is sent as
        ``stamp_columns``.
        """
        trigger_ref = table_pb2.TableReference(ticket=self.trigger_table.pb_ticket)
        return table_pb2.SnapshotWhenTableRequest(
            result_id=result_id,
            base_id=source_id,
            trigger_id=trigger_ref,
            initial=self.initial,
            incremental=self.incremental,
            history=self.history,
            stamp_columns=self.stamp_cols,
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``snapshot_when``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(snapshot_when=request)
class AggregateOp(TableOp):
    """Builds ``AggregateRequest`` messages from a list of aggregations."""

    def __init__(self, aggs: list[Aggregation], by: list[str]):
        self.aggs = aggs
        self.by = by

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``Aggregate`` callable."""
        return table_service_stub.Aggregate

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``AggregateRequest``.

        Each stored aggregation contributes the message returned by its
        ``make_grpc_message()``; ``by`` is sent as ``group_by_columns``.
        """
        pb_aggregations = [
            aggregation.make_grpc_message() for aggregation in self.aggs
        ]
        return table_pb2.AggregateRequest(
            result_id=result_id,
            source_id=source_id,
            aggregations=pb_aggregations,
            group_by_columns=self.by,
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``aggregate``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(aggregate=request)
class AggregateAllOp(TableOp):
    """Builds ``AggregateAllRequest`` messages from a single aggregation.

    Only the aggregation's ``agg_spec`` is kept.
    """

    def __init__(self, agg: Aggregation, by: list[str]):
        self.agg_spec = agg.agg_spec
        self.by = by

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``AggregateAll`` callable."""
        return table_service_stub.AggregateAll

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``AggregateAllRequest``.

        The stored spec is sent as ``spec`` and ``by`` as ``group_by_columns``.
        """
        request = table_pb2.AggregateAllRequest(
            result_id=result_id,
            source_id=source_id,
            spec=self.agg_spec,
            group_by_columns=self.by,
        )
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``aggregate_all``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(aggregate_all=request)
class CreateInputTableOp(TableOp):
    """Builds ``CreateInputTableRequest`` messages.

    The table definition comes from either an Arrow schema or an initial
    table. The input table kind is blink, in-memory key-backed, or in-memory
    append-only depending on ``blink`` and ``key_cols``.
    """

    def __init__(
        self,
        schema: Optional[pa.Schema] = None,
        init_table: Optional[Table] = None,
        key_cols: Optional[list[str]] = None,
        blink: bool = False,
    ):
        """Store the creation parameters.

        Raises:
            ValueError: if ``blink`` is set together with non-empty ``key_cols``.
        """
        if blink and key_cols:
            raise ValueError("key columns are not supported for blink input tables.")
        self.schema = schema
        self.init_table = init_table
        self.key_cols = key_cols
        self.blink = blink

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``CreateInputTable`` callable."""
        return table_service_stub.CreateInputTable

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``CreateInputTableRequest``; ``source_id`` is not used.

        A truthy schema takes precedence over ``init_table``.

        Raises:
            ValueError: if neither a truthy schema nor an init table is set.
        """
        kind_cls = table_pb2.CreateInputTableRequest.InputTableKind

        # Pick the table kind: blink wins, then key-backed, else append-only.
        if self.blink:
            input_table_kind = kind_cls(blink=kind_cls.Blink())
        elif self.key_cols:
            input_table_kind = kind_cls(
                in_memory_key_backed=kind_cls.InMemoryKeyBacked(
                    key_columns=self.key_cols
                )
            )
        else:
            input_table_kind = kind_cls(
                in_memory_append_only=kind_cls.InMemoryAppendOnly()
            )

        if self.schema:
            # Rebuild each field with the metadata produced by map_arrow_type
            # for its type, then serialize the resulting schema to bytes.
            dh_fields = [
                pa.field(name=f.name, type=f.type, metadata=map_arrow_type(f.type))
                for f in self.schema
            ]
            schema = pa.schema(dh_fields).serialize().to_pybytes()
            return table_pb2.CreateInputTableRequest(
                result_id=result_id, schema=schema, kind=input_table_kind
            )

        if self.init_table:
            source_table_id = table_pb2.TableReference(
                ticket=self.init_table.pb_ticket
            )
            return table_pb2.CreateInputTableRequest(
                result_id=result_id,
                source_table_id=source_table_id,
                kind=input_table_kind,
            )

        raise ValueError("either schema or init_table must be provided")

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``create_input_table``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(create_input_table=request)
class WhereInTableOp(TableOp):
    """Builds ``WhereInRequest`` messages.

    The source table is the left side; the filter table given at construction
    is the right side.
    """

    def __init__(self, filter_table: Table, cols: list[str], inverted: bool):
        self.filter_table = filter_table
        self.cols = cols
        self.inverted = inverted

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``WhereIn`` callable."""
        return table_service_stub.WhereIn

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``WhereInRequest``.

        ``source_id`` becomes ``left_id``; ``right_id`` references the filter
        table's ``pb_ticket``. ``cols`` is sent as ``columns_to_match``.
        """
        filter_ref = table_pb2.TableReference(ticket=self.filter_table.pb_ticket)
        request = table_pb2.WhereInRequest(
            result_id=result_id,
            left_id=source_id,
            right_id=filter_ref,
            inverted=self.inverted,
            columns_to_match=self.cols,
        )
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``where_in``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(where_in=request)
class SliceOp(TableOp):
    """Builds ``SliceRequest`` messages for a first/last position pair."""

    def __init__(self, first_position_inclusive: int, last_position_exclusive: int):
        self.first_position_inclusive = first_position_inclusive
        self.last_position_exclusive = last_position_exclusive

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``Slice`` callable."""
        return table_service_stub.Slice

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``SliceRequest`` with the stored positions."""
        request = table_pb2.SliceRequest(
            result_id=result_id,
            source_id=source_id,
            first_position_inclusive=self.first_position_inclusive,
            last_position_exclusive=self.last_position_exclusive,
        )
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``slice``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(slice=request)
class MetaTableOp(TableOp):
    """Builds ``MetaTableRequest`` messages; takes no parameters of its own."""

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``MetaTable`` callable."""
        return table_service_stub.MetaTable

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``MetaTableRequest`` from the result and source ids."""
        request = table_pb2.MetaTableRequest(result_id=result_id, source_id=source_id)
        return request

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``meta_table``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(meta_table=request)
class MultijoinTablesOp(TableOp):
    """Builds ``MultiJoinTablesRequest`` messages from multi-join inputs."""

    def __init__(self, multi_join_inputs: list[MultiJoinInput]):
        self.multi_join_inputs = multi_join_inputs

    @classmethod
    def get_stub_func(cls, table_service_stub: table_pb2_grpc.TableServiceStub) -> Any:
        """Return the stub's ``MultiJoinTables`` callable."""
        return table_service_stub.MultiJoinTables

    def make_grpc_request(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: Optional[table_pb2.TableReference],
    ) -> Any:
        """Build the ``MultiJoinTablesRequest``.

        Each input contributes a ``MultiJoinInput`` message referencing its
        table's ticket, with ``on`` as ``columns_to_match`` and ``joins`` as
        ``columns_to_add``. The ``source_id`` argument is not read.
        """
        pb_inputs = [
            table_pb2.MultiJoinInput(
                source_id=table_pb2.TableReference(
                    ticket=entry.table.ticket.pb_ticket
                ),
                columns_to_match=entry.on,
                columns_to_add=entry.joins,
            )
            for entry in self.multi_join_inputs
        ]
        return table_pb2.MultiJoinTablesRequest(
            result_id=result_id, multi_join_inputs=pb_inputs
        )

    def make_grpc_request_for_batch(
        self,
        result_id: Optional[ticket_pb2.Ticket],
        source_id: table_pb2.TableReference,
    ) -> Any:
        """Wrap the request in a batch operation under ``multi_join``."""
        request = self.make_grpc_request(result_id=result_id, source_id=source_id)
        return table_pb2.BatchTableRequest.Operation(multi_join=request)