Class MergeData
Direct Known Subclasses:
MergeFromTable, MergeIntradayData, RemergeData
public abstract class MergeData extends Object
Rewrites an input table to multi-day storage (merged, partitioned, grouped, and sorted as appropriate).
Notes on properties:
For now, I'm using the following properties (TODO: replace these with optional arguments and switch to Commons-CLI):
merge.futureLookingDays (dateAdjustmentNDaysPrevious, default 0)
iris.concurrentWriteThreads (threadPoolSize, default 1)
TODO: Experiment with sequential reading and splayed writing instead of the current (reversed) operation.
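The property notes above can be illustrated with a minimal sketch that reads both settings with their documented defaults. It uses `java.util.Properties` as a stand-in for `com.fishlib.configuration.Configuration`, whose accessor methods are not documented here; the `MergeProperties` class and its `readInt` helper are hypothetical.

```java
import java.util.Properties;

// Hypothetical holder for the two merge properties listed in the notes.
// java.util.Properties stands in for com.fishlib.configuration.Configuration.
final class MergeProperties {
    static final String FUTURE_LOOKING_DAYS = "merge.futureLookingDays";
    static final String CONCURRENT_WRITE_THREADS = "iris.concurrentWriteThreads";

    final int dateAdjustmentNDaysPrevious;
    final int threadPoolSize;

    MergeProperties(Properties props) {
        // Documented defaults: 0 future-looking days, 1 concurrent write thread.
        this.dateAdjustmentNDaysPrevious = readInt(props, FUTURE_LOOKING_DAYS, 0);
        this.threadPoolSize = readInt(props, CONCURRENT_WRITE_THREADS, 1);
    }

    // Returns the integer value of key, or defaultValue when the key is absent.
    private static int readInt(Properties props, String key, int defaultValue) {
        String value = props.getProperty(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }
}
```

With no properties set, this yields `dateAdjustmentNDaysPrevious == 0` and `threadPoolSize == 1`; setting `iris.concurrentWriteThreads=4` changes only the thread count.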
-
Constructor Summary
Constructors:
MergeData()
-
Method Summary
Modifier and Type | Method | Description
protected File[] | filterWritablePartitions(File[] originalWritablePartitions) | Filter the input array of candidate writable partitions.
protected abstract Table | getInputData(com.fishlib.io.logger.Logger log, String logPrefix, Database db, TableDefinition tableDefinition, String namespace, String tableName, String partitioningColumnValue) | Get the input data that should be merged.
static ExecutorService | makeExecutor(String name, int poolSize) | Make a suitable ExecutorService for use in merge processing.
void | merge(com.fishlib.configuration.Configuration configuration, com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String... args) | Read, partition, group, and sort input data, and rewrite it into multi-day locations.
void | merge(com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String namespace, String tableName, String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, String sortColumnFormula, Database db, StatusCallback statusCallback, Database.StorageFormat storageFormat, String parquetCodecName, SyncMode syncMode) | Read, partition, group, and sort input data, and rewrite it into multi-day locations or Parquet.
void | merge(com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String namespace, String tableName, String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, String sortColumnFormula, Database db, StatusCallback statusCallback, Database.StorageFormat storageFormat, String parquetCodecName, SyncMode syncMode, boolean lateCleanup) | Read, partition, group, and sort input data, and rewrite it into multi-day locations or Parquet.
void | merge(com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String namespace, String tableName, String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, String sortColumnFormula, StatusCallback statusCallback, Database.StorageFormat storageFormat, String parquetCodecName, SyncMode syncMode) | Read, partition, group, and sort input data, and rewrite it into multi-day locations.
void | merge(com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String namespace, String tableName, String partitioningColumnValue, int threadPoolSize, int maximumConcurrentColumns, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, String sortColumnFormula, Database db, StatusCallback statusCallback, Database.StorageFormat storageFormat, String parquetCodecName, SyncMode syncMode, boolean lateCleanup, Table sourceTable) | Read, partition, group, and sort input data, and rewrite it into multi-day locations or Parquet.
void | merge(com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String namespace, String tableName, String partitioningColumnValue, int threadPoolSize, int maximumConcurrentColumns, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, String sortColumnFormula, StatusCallback statusCallback, Database.StorageFormat storageFormat, String parquetCodecName, SyncMode syncMode) | Read, partition, group, and sort input data, and rewrite it into multi-day locations.
static Stream<String> | supportedParquetCodecsStream(com.fishlib.configuration.Configuration configuration) | Determine the supported Parquet codecs.
-
Constructor Details
-
MergeData
public MergeData()
-
-
Method Details
-
merge
public void merge(@NotNull com.fishlib.configuration.Configuration configuration, @NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String... args)
Read, partition, group, and sort input data, and rewrite it into multi-day locations.
Parameters:
configuration - The configuration
log - The logger
fatalErrorReporter - An error reporter
args - Command-line arguments to parse
-
merge
public void merge(@NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, @Nullable String sortColumnFormula, @NotNull StatusCallback statusCallback, @Nullable Database.StorageFormat storageFormat, @Nullable String parquetCodecName, @Nullable SyncMode syncMode)
Read, partition, group, and sort input data, and rewrite it into multi-day locations.
Parameters:
log - The logger
fatalErrorReporter - An error reporter
namespace - The namespace to merge to
tableName - The table name to merge to
partitioningColumnValue - The partitioning column value
threadPoolSize - The number of parallel threads to use
lowHeapUsageMode - Whether to prioritize heap conservation over throughput
force - Whether to force the merge when destinations already have data
allowEmptyInput - Whether to allow the merge to proceed if the input data is empty
sortColumnFormula - Formula to apply for sorting, post-grouping
statusCallback - A callback for reporting status
storageFormat - If non-null, this specifies the storage format; otherwise the schema's default is used
parquetCodecName - If using Parquet, an optional Parquet codec name
syncMode - The SyncMode for column files; currently only used for Database.StorageFormat.DeephavenV1
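The nullable `storageFormat`, `parquetCodecName`, and `syncMode` parameters interact as described above: a null format falls back to the schema's default, the codec name matters only for Parquet, and the sync mode is currently used only for `DeephavenV1`. The sketch below models that resolution with local stand-in enums; the `Parquet` constant, the `SyncChoice` values, the `WriteOptions` class, and its `resolve` method are all hypothetical, not part of the documented API.

```java
import java.util.Objects;

// Hypothetical local stand-ins for Database.StorageFormat and SyncMode.
enum StorageFormat { DeephavenV1, Parquet }
enum SyncChoice { SYNC, NO_SYNC }

// Hypothetical resolved write settings for one merge.
final class WriteOptions {
    final StorageFormat format;
    final String parquetCodecName; // only meaningful for Parquet
    final SyncChoice syncMode;     // only meaningful for DeephavenV1

    private WriteOptions(StorageFormat format, String parquetCodecName, SyncChoice syncMode) {
        this.format = format;
        this.parquetCodecName = parquetCodecName;
        this.syncMode = syncMode;
    }

    static WriteOptions resolve(StorageFormat requested, StorageFormat schemaDefault,
                                String parquetCodecName, SyncChoice syncMode) {
        // A null storage format means "use the schema's default".
        StorageFormat format = requested != null ? requested : Objects.requireNonNull(schemaDefault);
        // Ignore settings that do not apply to the chosen format.
        String codec = format == StorageFormat.Parquet ? parquetCodecName : null;
        SyncChoice sync = format == StorageFormat.DeephavenV1 ? syncMode : null;
        return new WriteOptions(format, codec, sync);
    }
}
```

Dropping inapplicable settings, rather than rejecting them, is one design choice; a stricter variant could throw when a codec name is supplied for a non-Parquet format.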
-
merge
public void merge(@NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue, int threadPoolSize, int maximumConcurrentColumns, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, @Nullable String sortColumnFormula, @NotNull StatusCallback statusCallback, @Nullable Database.StorageFormat storageFormat, @Nullable String parquetCodecName, @Nullable SyncMode syncMode)
Read, partition, group, and sort input data, and rewrite it into multi-day locations.
Parameters:
log - The logger
fatalErrorReporter - An error reporter
namespace - The namespace to merge to
tableName - The table name to merge to
partitioningColumnValue - The partitioning column value
threadPoolSize - The number of parallel threads to use
maximumConcurrentColumns - The maximum number of columns to merge concurrently in maximum throughput mode
lowHeapUsageMode - Whether to prioritize heap conservation over throughput
force - Whether to force the merge when destinations already have data
allowEmptyInput - Whether to allow the merge to proceed if the input data is empty
sortColumnFormula - Formula to apply for sorting, post-grouping
statusCallback - A callback for reporting status
storageFormat - If non-null, this specifies the storage format; otherwise the schema's default is used
parquetCodecName - If using Parquet, an optional Parquet codec name
syncMode - The SyncMode for column files; currently only used for Database.StorageFormat.DeephavenV1
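The `force` flag guards against merging into destinations that already hold data. A minimal sketch of such a precondition check follows, assuming a destination partition is a directory and that "already has data" means the directory is non-empty; `ForceCheck` and `checkDestination` are hypothetical names, not the class's actual logic.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical precondition check modeled on the documented `force` parameter.
final class ForceCheck {
    static void checkDestination(Path destination, boolean force) throws IOException {
        if (force || !Files.isDirectory(destination)) {
            return; // forced, or nothing has been written there yet
        }
        // Files.list must be closed, so use try-with-resources.
        try (Stream<Path> entries = Files.list(destination)) {
            if (entries.findAny().isPresent()) {
                throw new IllegalStateException(
                        "Destination " + destination + " already has data; use force to overwrite");
            }
        }
    }
}
```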
-
merge
public void merge(@NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, @Nullable String sortColumnFormula, @NotNull Database db, @NotNull StatusCallback statusCallback, @Nullable Database.StorageFormat storageFormat, @Nullable String parquetCodecName, @Nullable SyncMode syncMode)
Read, partition, group, and sort input data, and rewrite it into multi-day locations or Parquet.
Parameters:
log - The logger
fatalErrorReporter - An error reporter
namespace - The namespace to merge to
tableName - The table name to merge to
partitioningColumnValue - The partitioning column value
threadPoolSize - The number of parallel threads to use
lowHeapUsageMode - Whether to prioritize heap conservation over throughput
force - Whether to force the merge when destinations already have data
allowEmptyInput - Whether to allow the merge to proceed if the input data is empty
sortColumnFormula - Formula to apply for sorting, post-grouping
db - A database to use when reading source data
statusCallback - A callback for reporting status
storageFormat - If non-null, this forces the given storage format; otherwise the schema's default is used
parquetCodecName - If using Parquet, an optional Parquet codec name
syncMode - The SyncMode for column files; currently only used for Database.StorageFormat.DeephavenV1
-
merge
public void merge(@NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, @Nullable String sortColumnFormula, @NotNull Database db, @NotNull StatusCallback statusCallback, @Nullable Database.StorageFormat storageFormat, @Nullable String parquetCodecName, @Nullable SyncMode syncMode, boolean lateCleanup)
Read, partition, group, and sort input data, and rewrite it into multi-day locations or Parquet.
Parameters:
log - The logger
fatalErrorReporter - An error reporter
namespace - The namespace to merge to
tableName - The table name to merge to
partitioningColumnValue - The partitioning column value
threadPoolSize - The number of parallel threads to use
lowHeapUsageMode - Whether to prioritize heap conservation over throughput
force - Whether to force the merge when destinations already have data
allowEmptyInput - Whether to allow the merge to proceed if the input data is empty
sortColumnFormula - Formula to apply for sorting, post-grouping
db - A database to use when reading source data
statusCallback - A callback for reporting status
storageFormat - If non-null, this forces the given storage format; otherwise the schema's default is used
parquetCodecName - If using Parquet, an optional Parquet codec name
syncMode - The SyncMode for column files; currently only used for Database.StorageFormat.DeephavenV1
lateCleanup - Whether to defer cleanup of existing data until the newly merged data is ready to replace it
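The `lateCleanup` option defers removing existing data until the newly merged data is ready to take its place. The sketch below contrasts the two orderings with plain `java.nio.file` operations; the sibling staging directory, the `.staging` suffix, and the `Cleanup` class are assumptions for illustration, not the documented implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical illustration of eager versus late cleanup of a destination directory.
final class Cleanup {
    // Recursively delete a directory tree if it exists.
    static void deleteTree(Path root) throws IOException {
        if (!Files.exists(root)) return;
        try (Stream<Path> walk = Files.walk(root)) {
            // Reverse order visits children before their parents.
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try { Files.delete(p); } catch (IOException e) { throw new UncheckedIOException(e); }
            });
        }
    }

    // Write new data into destination, removing old data either before or after the write.
    static void replace(Path destination, Consumer<Path> writer, boolean lateCleanup) throws IOException {
        if (!lateCleanup) {
            // Eager: old data is gone for the whole duration of the write.
            deleteTree(destination);
            Files.createDirectories(destination);
            writer.accept(destination);
            return;
        }
        // Late: write into a sibling staging directory first.
        Path staging = destination.resolveSibling(destination.getFileName() + ".staging");
        deleteTree(staging);
        Files.createDirectories(staging);
        writer.accept(staging);
        // Only now remove the old data and rename the new data into place.
        deleteTree(destination);
        Files.move(staging, destination);
    }
}
```

The trade-off is disk space versus availability: late cleanup briefly needs room for both copies, but readers see the old data until the new data is complete.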
-
merge
public void merge(@NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue, int threadPoolSize, int maximumConcurrentColumns, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, @Nullable String sortColumnFormula, @NotNull Database db, @NotNull StatusCallback statusCallback, @Nullable Database.StorageFormat storageFormat, @Nullable String parquetCodecName, @Nullable SyncMode syncMode, boolean lateCleanup, @Nullable Table sourceTable)
Read, partition, group, and sort input data, and rewrite it into multi-day locations or Parquet.
Parameters:
log - The logger
fatalErrorReporter - An error reporter
namespace - The namespace to merge to
tableName - The table name to merge to
partitioningColumnValue - The partitioning column value
threadPoolSize - The number of parallel threads to use
maximumConcurrentColumns - The maximum number of columns to merge concurrently in maximum throughput mode
lowHeapUsageMode - Whether to prioritize heap conservation over throughput
force - Whether to force the merge when destinations already have data
allowEmptyInput - Whether to allow the merge to proceed if the input data is empty
sortColumnFormula - Formula to apply for sorting, post-grouping
db - A database to use when reading source data
statusCallback - A callback for reporting status
storageFormat - If non-null, this forces the given storage format; otherwise the schema's default is used
parquetCodecName - If using Parquet, an optional Parquet codec name
syncMode - The SyncMode for column files; currently only used for Database.StorageFormat.DeephavenV1
lateCleanup - Whether to defer cleanup of existing data until the newly merged data is ready to replace it
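In maximum throughput mode, `threadPoolSize` and `maximumConcurrentColumns` bound parallelism separately. One plausible way to cap how many columns are in flight at once on a shared pool is a `Semaphore`; the sketch below assumes that design, and `ColumnScheduler`, `writeColumns`, and the returned peak count are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical scheduler: threadPoolSize threads, at most maximumConcurrentColumns columns at once.
final class ColumnScheduler {
    // Runs writeColumn for every column and returns the peak number of
    // columns observed running concurrently (for illustration only).
    static int writeColumns(List<String> columns, int threadPoolSize, int maximumConcurrentColumns,
                            Consumer<String> writeColumn) throws Exception {
        Semaphore permits = new Semaphore(maximumConcurrentColumns);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threadPoolSize);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (String column : columns) {
                permits.acquire(); // blocks the submitter until a column slot frees up
                futures.add(pool.submit(() -> {
                    try {
                        peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                        writeColumn.accept(column);
                    } finally {
                        active.decrementAndGet();
                        permits.release();
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // rethrows (wrapped) any failure from a column task
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }
}
```

Acquiring the permit on the submitting thread, rather than inside the task, keeps pool threads from blocking while they wait for a column slot.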
-
filterWritablePartitions
protected File[] filterWritablePartitions(File[] originalWritablePartitions)
Filter the input array of candidate writable partitions. An overriding class can use this to merge only a subset of the possible partitions; partitions removed from the originalWritablePartitions array will not be touched.
Parameters:
originalWritablePartitions - The array of writable partitions to filter
Returns:
All partitions for which the merge should continue
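An overriding class can use `filterWritablePartitions` to restrict the merge to a subset of partitions. The sketch below shows filtering logic such an override might contain, keeping only partitions whose directory name appears in an allow-list. It is written as a standalone class because `MergeData` is not available here; `PartitionFilter` and the allow-list are hypothetical.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Set;

// Hypothetical filter with the same method shape as MergeData.filterWritablePartitions.
final class PartitionFilter {
    private final Set<String> allowedNames;

    PartitionFilter(Set<String> allowedNames) {
        this.allowedNames = Set.copyOf(allowedNames);
    }

    // Partitions dropped from the returned array would be left untouched by the merge.
    File[] filterWritablePartitions(File[] originalWritablePartitions) {
        return Arrays.stream(originalWritablePartitions)
                .filter(partition -> allowedNames.contains(partition.getName()))
                .toArray(File[]::new);
    }
}
```

In an actual subclass, the same body would sit in an `@Override protected File[] filterWritablePartitions(File[] originalWritablePartitions)` method.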
-
getInputData
@Nullable protected abstract Table getInputData(@NotNull com.fishlib.io.logger.Logger log, @NotNull String logPrefix, @NotNull Database db, @NotNull TableDefinition tableDefinition, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue)
Get the input data that should be merged.
Parameters:
log - Logger for any useful output
logPrefix - A prefix for any log output
db - The Database instance to use when sourcing input data
tableDefinition - The definition of the destination table
namespace - The namespace of the destination table
tableName - The name of the destination table
partitioningColumnValue - The value of the destination partitioning column
Returns:
Input data in Table form, or null if no such data exists
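`getInputData` is the template-method hook each concrete subclass implements. The following sketch models that pattern with a `List<String>` standing in for `Table`. The simplified `merge` method, the `SimpleMerge` and `InMemoryMerge` classes, and the choice to treat null input and empty input alike under `allowEmptyInput` are assumptions rather than documented behavior.

```java
import java.util.List;

// Hypothetical, heavily simplified model of the MergeData template method.
// List<String> stands in for Table; the real signatures take many more arguments.
abstract class SimpleMerge {
    // Subclasses supply the data to merge, or null if none exists.
    protected abstract List<String> getInputData(String namespace, String tableName,
                                                 String partitioningColumnValue);

    // Returns the number of rows that would be written.
    final int merge(String namespace, String tableName, String partitioningColumnValue,
                    boolean allowEmptyInput) {
        List<String> input = getInputData(namespace, tableName, partitioningColumnValue);
        boolean empty = input == null || input.isEmpty();
        if (empty && !allowEmptyInput) {
            throw new IllegalStateException("No input data for " + namespace + "." + tableName
                    + " at " + partitioningColumnValue);
        }
        return empty ? 0 : input.size();
    }
}

// Hypothetical subclass that serves fixed in-memory rows.
final class InMemoryMerge extends SimpleMerge {
    private final List<String> rows;

    InMemoryMerge(List<String> rows) { this.rows = rows; }

    @Override
    protected List<String> getInputData(String namespace, String tableName,
                                        String partitioningColumnValue) {
        return rows; // may be null to signal "no such data"
    }
}
```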
-
supportedParquetCodecsStream
public static Stream<String> supportedParquetCodecsStream(com.fishlib.configuration.Configuration configuration)
Determine the supported Parquet codecs. Some codecs require additional jar files that are not part of the base Deephaven installation, so by default the available codecs are limited through the property SUPPORTED_PARQUET_CODECS_PROPERTY.
Parameters:
configuration - The Configuration instance
Returns:
A Stream of available codecs
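The codec restriction can be pictured as parsing a configured list. This sketch assumes the property value is a comma-separated list of codec names and that a caller-supplied fallback applies when it is unset; neither the format nor the fallback is documented above. The key used here is a placeholder, not the actual value of `SUPPORTED_PARQUET_CODECS_PROPERTY`, and `java.util.Properties` stands in for `Configuration`.

```java
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Stream;

// Hypothetical parser for a configured list of supported Parquet codecs.
final class ParquetCodecs {
    // Placeholder key; the real constant's value is not given in this document.
    static final String SUPPORTED_CODECS_KEY = "example.supportedParquetCodecs";

    static Stream<String> supportedCodecs(Properties config, String fallback) {
        String raw = config.getProperty(SUPPORTED_CODECS_KEY, fallback);
        // Split on commas, trim whitespace, and drop empty entries.
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty());
    }
}
```

A caller could then validate a requested `parquetCodecName` with `supportedCodecs(config, fallback).anyMatch(parquetCodecName::equals)` before starting the merge.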
-
makeExecutor
public static ExecutorService makeExecutor(String name, int poolSize)
Make a suitable ExecutorService for use in merge processing.
Parameters:
name - The name of the executor, used in thread naming
poolSize - The number of threads that will be allowed in the executor (core and maximum)
Returns:
The new ExecutorService
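The `makeExecutor` contract, a named pool whose core and maximum sizes both equal `poolSize`, maps directly onto `ThreadPoolExecutor`. A sketch under that reading follows; the `MergeExecutors` class, the `name-N` thread-name pattern, the daemon setting, and the unbounded queue are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor factory matching the documented makeExecutor contract.
final class MergeExecutors {
    static ExecutorService makeExecutor(String name, int poolSize) {
        AtomicInteger threadCount = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            // Threads are named "<name>-1", "<name>-2", ... for easier log reading.
            Thread thread = new Thread(runnable, name + "-" + threadCount.incrementAndGet());
            thread.setDaemon(true); // assumption: do not keep the JVM alive
            return thread;
        };
        // Core and maximum sizes are both poolSize; extra tasks wait in an unbounded queue.
        return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), factory);
    }
}
```

Because core and maximum sizes are equal, the pool never grows beyond `poolSize`, so the queue rather than extra threads absorbs bursts of work.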
-