Class GroupByReaggregateOperator
java.lang.Object
io.deephaven.engine.table.impl.by.GroupByReaggregateOperator
- All Implemented Interfaces:
GroupByOperator,IterativeChunkedAggregationOperator
An
IterativeChunkedAggregationOperator used to re-aggregate the results of an
AggGroup as part of a rollup.
The operator is fundamentally different than the GroupByChunkedOperator. Rather than examining row keys, it
listens to the rollup's base (or intermediate) level and reads the exposed RowSet column. The relevant RowSets are
added to a random builder for each state while processing an update (or initialization). At the end of the update
cycle, it builds the rowsets and updates an internal ObjectArraySource of RowSets.
The resulting column sources are once again AggregateColumnSource, which reuse the wrapped aggregated column
source from the source table (thus each level of the rollup uses the original table's sources as the input to the
AggregateColumnSources -- not the immediately prior level).
-
Nested Class Summary
Nested classes/interfaces inherited from interface io.deephaven.engine.table.impl.by.IterativeChunkedAggregationOperator
IterativeChunkedAggregationOperator.BucketedContext, IterativeChunkedAggregationOperator.SingletonContext -
Field Summary
Fields inherited from interface io.deephaven.engine.table.impl.by.IterativeChunkedAggregationOperator
ZERO_LENGTH_ITERATIVE_CHUNKED_AGGREGATION_OPERATOR_ARRAY -
Constructor Summary
ConstructorsConstructorDescriptionGroupByReaggregateOperator(@NotNull QueryTable inputTable, boolean registeredWithHelper, @Nullable String exposeRowSetsAs, @Nullable List<String> hiddenResults, @NotNull MatchPair... aggregatedColumnPairs) -
Method Summary
Modifier and TypeMethodDescriptionvoidaddChunk(IterativeChunkedAggregationOperator.BucketedContext bucketedContext, Chunk<? extends Values> values, LongChunk<? extends RowKeys> inputRowKeys, @NotNull IntChunk<RowKeys> destinations, @NotNull IntChunk<ChunkPositions> startPositions, @NotNull IntChunk<ChunkLengths> length, @NotNull WritableBooleanChunk<Values> stateModified) Aggregate a chunk of data into the result columns.booleanaddChunk(IterativeChunkedAggregationOperator.SingletonContext singletonContext, int chunkSize, Chunk<? extends Values> values, @NotNull LongChunk<? extends RowKeys> inputRowKeys, long destination) Aggregate a chunk of data into the result columns.voidensureCapacity(long tableSize) Ensure that this operator can handle destinations up to tableSize - 1.Map<String, ? extends ColumnSource<?>> Get a map from input column names to the corresponding outputColumnSource.Map<String, ? extends ColumnSource<?>> Return a map of result columns produced by this operator.booleanhasModifications(boolean columnsModified) Determine whether to propagate changes when input columns have been modified.initializeRefreshing(@NotNull QueryTable resultTable, @NotNull LivenessReferent aggregationUpdateListener) Initialize refreshing result support for this operator.voidmodifyChunk(IterativeChunkedAggregationOperator.BucketedContext bucketedContext, Chunk<? extends Values> previousValues, Chunk<? extends Values> newValues, LongChunk<? extends RowKeys> postShiftRowKeys, @NotNull IntChunk<RowKeys> destinations, @NotNull IntChunk<ChunkPositions> startPositions, @NotNull IntChunk<ChunkLengths> length, @NotNull WritableBooleanChunk<Values> stateModified) Modify a chunk of data previously aggregated into the result columns using a parallel chunk of new values.booleanmodifyChunk(IterativeChunkedAggregationOperator.SingletonContext singletonContext, int chunkSize, Chunk<? extends Values> previousValues, Chunk<? extends Values> newValues, LongChunk<? extends RowKeys> postShiftRowKeys, long destination) Modify a chunk of data previously aggregated into the result columns using a parallel chunk of new values.voidpropagateInitialState(@NotNull QueryTable resultTable, int startingDestinationsCount) Perform any internal state keeping needed for destinations that were added during initialization.voidpropagateUpdates(@NotNull TableUpdate downstream, @NotNull RowSet newDestinations) Perform any internal state keeping needed for destinations that were added (went from 0 keys to > 0), removed (went from > 0 keys to 0), or modified (keys added or removed, or keys modified) by this iteration.voidremoveChunk(IterativeChunkedAggregationOperator.BucketedContext bucketedContext, Chunk<? extends Values> values, LongChunk<? extends RowKeys> inputRowKeys, @NotNull IntChunk<RowKeys> destinations, @NotNull IntChunk<ChunkPositions> startPositions, @NotNull IntChunk<ChunkLengths> length, @NotNull WritableBooleanChunk<Values> stateModified) Remove a chunk of data previously aggregated into the result columns.booleanremoveChunk(IterativeChunkedAggregationOperator.SingletonContext singletonContext, int chunkSize, Chunk<? extends Values> values, @NotNull LongChunk<? extends RowKeys> inputRowKeys, long destination) Remove a chunk of data previously aggregated into the result columns.booleanresetForStep(@NotNull TableUpdate upstream, int startingDestinationsCount) Reset any per-step internal state.@NotNull IterativeChunkedAggregationOperatorresultExtractor(List<Pair> resultPairs) voidshiftChunk(IterativeChunkedAggregationOperator.BucketedContext bucketedContext, Chunk<? extends Values> previousValues, Chunk<? extends Values> newValues, @NotNull LongChunk<? extends RowKeys> preShiftRowKeys, @NotNull LongChunk<? extends RowKeys> postShiftRowKeys, @NotNull IntChunk<RowKeys> destinations, @NotNull IntChunk<ChunkPositions> startPositions, @NotNull IntChunk<ChunkLengths> length, @NotNull WritableBooleanChunk<Values> stateModified) Called with shifted row keys whenIterativeChunkedAggregationOperator.requiresRowKeys()returns true, including shifted same-slot modifies.booleanshiftChunk(IterativeChunkedAggregationOperator.SingletonContext singletonContext, Chunk<? extends Values> previousValues, Chunk<? extends Values> newValues, @NotNull LongChunk<? extends RowKeys> preShiftRowKeys, @NotNull LongChunk<? extends RowKeys> postShiftRowKeys, long destination) Shift a chunk of data previously aggregated into the result columns, including shifted same-slot modifies.voidCalled after initialization; when the operator's result columns must have previous tracking enabled.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface io.deephaven.engine.table.impl.by.IterativeChunkedAggregationOperator
addRowSet, makeBucketedContext, makeSingletonContext, modifyRowKeys, modifyRowKeys, propagateFailure, requiresRowKeys, requiresRunFinds, unchunkedRowSet
-
Constructor Details
-
GroupByReaggregateOperator
public GroupByReaggregateOperator(@NotNull @NotNull QueryTable inputTable, boolean registeredWithHelper, @Nullable @Nullable String exposeRowSetsAs, @Nullable @Nullable List<String> hiddenResults, @NotNull @NotNull MatchPair... aggregatedColumnPairs)
-
-
Method Details
-
addChunk
public void addChunk(IterativeChunkedAggregationOperator.BucketedContext bucketedContext, Chunk<? extends Values> values, LongChunk<? extends RowKeys> inputRowKeys, @NotNull @NotNull IntChunk<RowKeys> destinations, @NotNull @NotNull IntChunk<ChunkPositions> startPositions, @NotNull @NotNull IntChunk<ChunkLengths> length, @NotNull @NotNull WritableBooleanChunk<Values> stateModified) Description copied from interface:IterativeChunkedAggregationOperatorAggregate a chunk of data into the result columns.- Specified by:
addChunkin interfaceIterativeChunkedAggregationOperator- Parameters:
bucketedContext- the operator-specific contextvalues- a chunk of values to aggregateinputRowKeys- the input row keys, in post-shift spacedestinations- the destinations in resultColumn to aggregate into, parallel with startPositions and lengthstartPositions- the starting positions in the chunk for each destinationlength- the number of values in the chunk for each destinationstateModified- a boolean output array, parallel to destinations, which is set to true if the corresponding destination has been modified
-
removeChunk
public void removeChunk(IterativeChunkedAggregationOperator.BucketedContext bucketedContext, Chunk<? extends Values> values, LongChunk<? extends RowKeys> inputRowKeys, @NotNull @NotNull IntChunk<RowKeys> destinations, @NotNull @NotNull IntChunk<ChunkPositions> startPositions, @NotNull @NotNull IntChunk<ChunkLengths> length, @NotNull @NotNull WritableBooleanChunk<Values> stateModified) Description copied from interface:IterativeChunkedAggregationOperatorRemove a chunk of data previously aggregated into the result columns.- Specified by:
removeChunkin interfaceIterativeChunkedAggregationOperator- Parameters:
bucketedContext- the operator-specific contextvalues- a chunk of values that have been previously aggregated.inputRowKeys- the input row keys, in pre-shift spacedestinations- the destinations in resultColumn to remove the values from, parallel with startPositions and lengthstartPositions- the starting positions in the chunk for each destinationlength- the number of values in the chunk for each destinationstateModified- a boolean output array, parallel to destinations, which is set to true if the corresponding destination has been modified
-
modifyChunk
public void modifyChunk(IterativeChunkedAggregationOperator.BucketedContext bucketedContext, Chunk<? extends Values> previousValues, Chunk<? extends Values> newValues, LongChunk<? extends RowKeys> postShiftRowKeys, @NotNull @NotNull IntChunk<RowKeys> destinations, @NotNull @NotNull IntChunk<ChunkPositions> startPositions, @NotNull @NotNull IntChunk<ChunkLengths> length, @NotNull @NotNull WritableBooleanChunk<Values> stateModified) Description copied from interface:IterativeChunkedAggregationOperatorModify a chunk of data previously aggregated into the result columns using a parallel chunk of new values. Never includes modifies that have been shifted ifIterativeChunkedAggregationOperator.requiresRowKeys()returns true - those are handled inIterativeChunkedAggregationOperator.shiftChunk(BucketedContext, Chunk, Chunk, LongChunk, LongChunk, IntChunk, IntChunk, IntChunk, WritableBooleanChunk).- Specified by:
modifyChunkin interfaceIterativeChunkedAggregationOperator- Parameters:
bucketedContext- the operator-specific contextpreviousValues- a chunk of values that have been previously aggregatednewValues- a chunk of values to aggregatepostShiftRowKeys- the input row keys, in post-shift spacedestinations- the destinations in resultColumn to remove the values from, parallel with startPositions and lengthstartPositions- the starting positions in the chunk for each destinationlength- the number of values in the chunk for each destinationstateModified- a boolean output array, parallel to destinations, which is set to true if the corresponding destination has been modified
-
shiftChunk
public void shiftChunk(IterativeChunkedAggregationOperator.BucketedContext bucketedContext, Chunk<? extends Values> previousValues, Chunk<? extends Values> newValues, @NotNull @NotNull LongChunk<? extends RowKeys> preShiftRowKeys, @NotNull @NotNull LongChunk<? extends RowKeys> postShiftRowKeys, @NotNull @NotNull IntChunk<RowKeys> destinations, @NotNull @NotNull IntChunk<ChunkPositions> startPositions, @NotNull @NotNull IntChunk<ChunkLengths> length, @NotNull @NotNull WritableBooleanChunk<Values> stateModified) Description copied from interface:IterativeChunkedAggregationOperatorCalled with shifted row keys whenIterativeChunkedAggregationOperator.requiresRowKeys()returns true, including shifted same-slot modifies.- Specified by:
shiftChunkin interfaceIterativeChunkedAggregationOperator- Parameters:
bucketedContext- the operator-specific contextpreviousValues- a chunk of values that have been previously aggregated.newValues- a chunk of values to aggregatepreShiftRowKeys- the input row keys, in pre-shift spacepostShiftRowKeys- the input row keys, in post-shift spacedestinations- the destinations in resultColumn to aggregate into, parallel with startPositions and lengthstartPositions- the starting positions in the chunk for each destinationlength- the number of values in the chunk for each destinationstateModified- a boolean output array, parallel to destinations, which is set to true if the corresponding destination has been modified
-
addChunk
public boolean addChunk(IterativeChunkedAggregationOperator.SingletonContext singletonContext, int chunkSize, Chunk<? extends Values> values, @NotNull @NotNull LongChunk<? extends RowKeys> inputRowKeys, long destination) Description copied from interface:IterativeChunkedAggregationOperatorAggregate a chunk of data into the result columns.- Specified by:
addChunkin interfaceIterativeChunkedAggregationOperator- Parameters:
singletonContext- the operator-specific contextchunkSize- the size of the additionvalues- the values to aggregateinputRowKeys- the input row keys, in post-shift spacedestination- the destination in the result columns- Returns:
- true if the state was modified, false otherwise
-
removeChunk
public boolean removeChunk(IterativeChunkedAggregationOperator.SingletonContext singletonContext, int chunkSize, Chunk<? extends Values> values, @NotNull @NotNull LongChunk<? extends RowKeys> inputRowKeys, long destination) Description copied from interface:IterativeChunkedAggregationOperatorRemove a chunk of data previously aggregated into the result columns.- Specified by:
removeChunkin interfaceIterativeChunkedAggregationOperator- Parameters:
singletonContext- the operator-specific contextchunkSize- the size of the removalvalues- the values to remove from the aggregationinputRowKeys- the input row keys, in pre-shift spacedestination- the destination in the result columns- Returns:
- true if the state was modified, false otherwise
-
modifyChunk
public boolean modifyChunk(IterativeChunkedAggregationOperator.SingletonContext singletonContext, int chunkSize, Chunk<? extends Values> previousValues, Chunk<? extends Values> newValues, LongChunk<? extends RowKeys> postShiftRowKeys, long destination) Description copied from interface:IterativeChunkedAggregationOperatorModify a chunk of data previously aggregated into the result columns using a parallel chunk of new values. Never includes modifies that have been shifted ifIterativeChunkedAggregationOperator.requiresRowKeys()returns true - those are handled inIterativeChunkedAggregationOperator.shiftChunk(SingletonContext, Chunk, Chunk, LongChunk, LongChunk, long).- Specified by:
modifyChunkin interfaceIterativeChunkedAggregationOperator- Parameters:
singletonContext- the operator-specific contextchunkSize- the size of the modificationpreviousValues- a chunk of values that have been previously aggregated.newValues- a chunk of values to aggregatepostShiftRowKeys- the input row keys, in post-shift space- Returns:
- true if the state was modified, false otherwise
-
shiftChunk
public boolean shiftChunk(IterativeChunkedAggregationOperator.SingletonContext singletonContext, Chunk<? extends Values> previousValues, Chunk<? extends Values> newValues, @NotNull @NotNull LongChunk<? extends RowKeys> preShiftRowKeys, @NotNull @NotNull LongChunk<? extends RowKeys> postShiftRowKeys, long destination) Description copied from interface:IterativeChunkedAggregationOperatorShift a chunk of data previously aggregated into the result columns, including shifted same-slot modifies.- Specified by:
shiftChunkin interfaceIterativeChunkedAggregationOperator- Parameters:
singletonContext- the operator-specific contextpreviousValues- a chunk of values that have been previously aggregated.newValues- a chunk of values to aggregatepreShiftRowKeys- the input row keys, in pre-shift spacepostShiftRowKeys- the input row keys, in post-shift spacedestination- the destination in the result columns- Returns:
- true if the result should be considered modified
-
ensureCapacity
public void ensureCapacity(long tableSize) Description copied from interface:IterativeChunkedAggregationOperatorEnsure that this operator can handle destinations up to tableSize - 1.- Specified by:
ensureCapacityin interfaceIterativeChunkedAggregationOperator- Parameters:
tableSize- the new size of the table
-
getResultColumns
Description copied from interface:IterativeChunkedAggregationOperatorReturn a map of result columns produced by this operator.- Specified by:
getResultColumnsin interfaceIterativeChunkedAggregationOperator- Returns:
- a map of name to columns for the result table
-
startTrackingPrevValues
public void startTrackingPrevValues()Description copied from interface:IterativeChunkedAggregationOperatorCalled after initialization; when the operator's result columns must have previous tracking enabled.- Specified by:
startTrackingPrevValuesin interfaceIterativeChunkedAggregationOperator
-
initializeRefreshing
public UnaryOperator<ModifiedColumnSet> initializeRefreshing(@NotNull @NotNull QueryTable resultTable, @NotNull @NotNull LivenessReferent aggregationUpdateListener) Description copied from interface:IterativeChunkedAggregationOperatorInitialize refreshing result support for this operator. As a side effect, make a factory method for converting upstream modified column sets to result modified column sets, to be invoked whenever this operator reports a modification in order to determine the operator's contribution to the final result modified column set.- Specified by:
initializeRefreshingin interfaceIterativeChunkedAggregationOperator- Parameters:
resultTable- The resultQueryTableafter initializationaggregationUpdateListener- The aggregation update listener, which may be needed for referential integrity- Returns:
- A factory that produces a result modified column set from the upstream modified column set
-
getInputResultColumns
Description copied from interface:GroupByOperatorGet a map from input column names to the corresponding outputColumnSource.- Specified by:
getInputResultColumnsin interfaceGroupByOperator
-
hasModifications
public boolean hasModifications(boolean columnsModified) Description copied from interface:GroupByOperatorDetermine whether to propagate changes when input columns have been modified.- Specified by:
hasModificationsin interfaceGroupByOperator- Parameters:
columnsModified- have any of the input columns been modified (as per the MCS)?- Returns:
- true if we have modified our output (e.g., because of additions or modifications).
-
resetForStep
Description copied from interface:IterativeChunkedAggregationOperatorReset any per-step internal state. Note that the arguments to this method should not be mutated in any way.- Specified by:
resetForStepin interfaceIterativeChunkedAggregationOperator- Parameters:
upstream- The upstream ShiftAwareListener.UpdatestartingDestinationsCount- The number of used destinations at the beginning of this step- Returns:
- true if this operator must generate modifications on this cycle; typically an operator returns false and depends on the actual add/remove/modify/shift calls to determine modifications
-
propagateInitialState
public void propagateInitialState(@NotNull @NotNull QueryTable resultTable, int startingDestinationsCount) Description copied from interface:IterativeChunkedAggregationOperatorPerform any internal state keeping needed for destinations that were added during initialization.- Specified by:
propagateInitialStatein interfaceIterativeChunkedAggregationOperator- Parameters:
resultTable- The resultQueryTableafter initializationstartingDestinationsCount- The number of used destinations at the beginning of this step
-
propagateUpdates
public void propagateUpdates(@NotNull @NotNull TableUpdate downstream, @NotNull @NotNull RowSet newDestinations) Description copied from interface:IterativeChunkedAggregationOperatorPerform any internal state keeping needed for destinations that were added (went from 0 keys to > 0), removed (went from > 0 keys to 0), or modified (keys added or removed, or keys modified) by this iteration. Note that the arguments to this method should not be mutated in any way.- Specified by:
propagateUpdatesin interfaceIterativeChunkedAggregationOperator- Parameters:
downstream- The downstream TableUpdate (which does not have itsModifiedColumnSetfinalized yet)newDestinations- New destinations added on this update
-
getExposedRowSetsAs
-
getAggregatedColumnPairs
-
getHiddenResults
-
resultExtractor
@NotNull public @NotNull IterativeChunkedAggregationOperator resultExtractor(List<Pair> resultPairs)
-