Class MergeData

java.lang.Object
com.illumon.iris.importers.MergeData
Direct Known Subclasses:
MergeFromTable, MergeIntradayData, RemergeData

public abstract class MergeData
extends Object

Re-writes an input table to multi-day storage (merged, partitioned, grouped, and sorted as appropriate).

Notes on properties:

For now, I'm using (TODO: Replace these with optional arguments and switch to Commons-CLI):

  • merge.futureLookingDays (dateAdjustmentNDaysPrevious, default 0)
  • iris.concurrentWriteThreads (threadPoolSize, default 1)
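The two properties and their defaults can be illustrated with a minimal sketch. It assumes java.util.Properties as a stand-in for the Configuration class, and the class and method names (MergePropertiesSketch, intProperty, futureLookingDays, concurrentWriteThreads) are hypothetical; only the property keys and default values come from the notes above.

```java
import java.util.Properties;

// Sketch only: java.util.Properties stands in for the product's Configuration class.
final class MergePropertiesSketch {
    private MergePropertiesSketch() {}

    /** Returns the integer value stored under key, or defaultValue when the key is absent. */
    static int intProperty(final Properties properties, final String key, final int defaultValue) {
        final String raw = properties.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    /** merge.futureLookingDays, default 0 (per the class notes). */
    static int futureLookingDays(final Properties properties) {
        return intProperty(properties, "merge.futureLookingDays", 0);
    }

    /** iris.concurrentWriteThreads, default 1 (per the class notes). */
    static int concurrentWriteThreads(final Properties properties) {
        return intProperty(properties, "iris.concurrentWriteThreads", 1);
    }

    public static void main(final String[] args) {
        final Properties properties = new Properties();
        // Only one property is set, so the other falls back to its default.
        properties.setProperty("iris.concurrentWriteThreads", "4");
        System.out.println("futureLookingDays = " + futureLookingDays(properties));
        System.out.println("concurrentWriteThreads = " + concurrentWriteThreads(properties));
    }
}
```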

TODO: Experiment with sequential reading and splayed writing, instead of the current (reversed) operation

  • Constructor Summary

    Constructors 
    Constructor Description
    MergeData()  
  • Method Summary

    Modifier and Type Method Description
    protected File[] filterWritablePartitions​(File[] originalWritablePartitions)
    Filter the input array of candidate writable partitions.
    protected abstract Table getInputData​(com.fishlib.io.logger.Logger log, String logPrefix, Database db, TableDefinition tableDefinition, String namespace, String tableName, String partitioningColumnValue)
    Get the input data that should be merged.
    static ExecutorService makeExecutor​(String name, int poolSize)
    Make a suitable ExecutorService for use in merge processing.
    void merge​(com.fishlib.configuration.Configuration configuration, com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String... args)
    Read, partition, group, sort input data and re-write it into multi-day locations.
    void merge​(com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String namespace, String tableName, String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, String sortColumnFormula, Database db, StatusCallback statusCallback, Database.StorageFormat storageFormat, String parquetCodecName, SyncMode syncMode)
    Read, partition, group, sort input data and re-write it into multi-day locations or parquet.
    void merge​(com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String namespace, String tableName, String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, String sortColumnFormula, Database db, StatusCallback statusCallback, Database.StorageFormat storageFormat, String parquetCodecName, SyncMode syncMode, boolean lateCleanup)
    Read, partition, group, sort input data and re-write it into multi-day locations or parquet.
    void merge​(com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String namespace, String tableName, String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, String sortColumnFormula, StatusCallback statusCallback, Database.StorageFormat storageFormat, String parquetCodecName, SyncMode syncMode)
    Read, partition, group, sort input data and re-write it into multi-day locations.
    void merge​(com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String namespace, String tableName, String partitioningColumnValue, int threadPoolSize, int maximumConcurrentColumns, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, String sortColumnFormula, Database db, StatusCallback statusCallback, Database.StorageFormat storageFormat, String parquetCodecName, SyncMode syncMode, boolean lateCleanup, Table sourceTable)
    Read, partition, group, sort input data and re-write it into multi-day locations or parquet.
    void merge​(com.fishlib.io.logger.Logger log, com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, String namespace, String tableName, String partitioningColumnValue, int threadPoolSize, int maximumConcurrentColumns, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, String sortColumnFormula, StatusCallback statusCallback, Database.StorageFormat storageFormat, String parquetCodecName, SyncMode syncMode)
    Read, partition, group, sort input data and re-write it into multi-day locations.
    static Stream<String> supportedParquetCodecsStream​(com.fishlib.configuration.Configuration configuration)
    Determine the supported parquet codecs.

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

  • Method Details

    • merge

      public void merge​(@NotNull com.fishlib.configuration.Configuration configuration, @NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String... args)
      Read, partition, group, sort input data and re-write it into multi-day locations.
      Parameters:
      configuration - The configuration
      log - The log
      fatalErrorReporter - An error reporter
      args - Command-line args to parse
    • merge

      public void merge​(@NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, @Nullable String sortColumnFormula, @NotNull StatusCallback statusCallback, @Nullable Database.StorageFormat storageFormat, @Nullable String parquetCodecName, @Nullable SyncMode syncMode)
      Read, partition, group, sort input data and re-write it into multi-day locations.
      Parameters:
      log - The log
      fatalErrorReporter - An error reporter
      namespace - The namespace to merge to
      tableName - The table name to merge to
      partitioningColumnValue - The partitioning column value
      threadPoolSize - The number of parallel threads to use
      lowHeapUsageMode - Whether to prioritize heap conservation over throughput
      force - Whether to force merge when destinations already have data
      allowEmptyInput - Whether to allow merge to proceed if the input data is empty
      sortColumnFormula - Formula to apply for sorting, post-grouping
      statusCallback - A callback for status
      storageFormat - If non-null this specifies the storage format, otherwise use the schema's default
      parquetCodecName - If using Parquet, an optional Parquet codec name
      syncMode - SyncMode for column files, currently only used for Database.StorageFormat.DeephavenV1
    • merge

      public void merge​(@NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue, int threadPoolSize, int maximumConcurrentColumns, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, @Nullable String sortColumnFormula, @NotNull StatusCallback statusCallback, @Nullable Database.StorageFormat storageFormat, @Nullable String parquetCodecName, @Nullable SyncMode syncMode)
      Read, partition, group, sort input data and re-write it into multi-day locations.
      Parameters:
      log - The log
      fatalErrorReporter - An error reporter
      namespace - The namespace to merge to
      tableName - The table name to merge to
      partitioningColumnValue - The partitioning column value
      threadPoolSize - The number of parallel threads to use
      lowHeapUsageMode - Whether to prioritize heap conservation over throughput
      force - Whether to force merge when destinations already have data
      allowEmptyInput - Whether to allow merge to proceed if the input data is empty
      sortColumnFormula - Formula to apply for sorting, post-grouping
      statusCallback - A callback for status
      storageFormat - If non-null this specifies the storage format, otherwise use the schema's default
      parquetCodecName - If using Parquet, an optional Parquet codec name
      syncMode - SyncMode for column files, currently only used for Database.StorageFormat.DeephavenV1
    • merge

      public void merge​(@NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, @Nullable String sortColumnFormula, @NotNull Database db, @NotNull StatusCallback statusCallback, @Nullable Database.StorageFormat storageFormat, @Nullable String parquetCodecName, @Nullable SyncMode syncMode)
      Read, partition, group, sort input data and re-write it into multi-day locations or parquet.
      Parameters:
      log - The log
      fatalErrorReporter - An error reporter
      namespace - The namespace to merge to
      tableName - The table name to merge to
      partitioningColumnValue - The partitioning column value
      threadPoolSize - The number of parallel threads to use
      lowHeapUsageMode - Whether to prioritize heap conservation over throughput
      force - Whether to force merge when destinations already have data
      allowEmptyInput - Whether to allow merge to proceed if the input data is empty
      sortColumnFormula - Formula to apply for sorting, post-grouping
      db - A database to use when reading source data
      statusCallback - A callback for status
      storageFormat - If non-null this specifies the storage format, otherwise use the schema's default
      parquetCodecName - If using Parquet, an optional Parquet codec name
      syncMode - SyncMode for column files, currently only used for Database.StorageFormat.DeephavenV1
    • merge

      public void merge​(@NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue, int threadPoolSize, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, @Nullable String sortColumnFormula, @NotNull Database db, @NotNull StatusCallback statusCallback, @Nullable Database.StorageFormat storageFormat, @Nullable String parquetCodecName, @Nullable SyncMode syncMode, boolean lateCleanup)
      Read, partition, group, sort input data and re-write it into multi-day locations or parquet.
      Parameters:
      log - The log
      fatalErrorReporter - An error reporter
      namespace - The namespace to merge to
      tableName - The table name to merge to
      partitioningColumnValue - The partitioning column value
      threadPoolSize - The number of parallel threads to use
      lowHeapUsageMode - Whether to prioritize heap conservation over throughput
      force - Whether to force merge when destinations already have data
      allowEmptyInput - Whether to allow merge to proceed if the input data is empty
      sortColumnFormula - Formula to apply for sorting, post-grouping
      db - A database to use when reading source data
      statusCallback - A callback for status
      storageFormat - If non-null this specifies the storage format, otherwise use the schema's default
      parquetCodecName - If using Parquet, an optional Parquet codec name
      syncMode - SyncMode for column files, currently only used for Database.StorageFormat.DeephavenV1
      lateCleanup - Defers cleanup until ready to replace with newly merged data.
    • merge

      public void merge​(@NotNull com.fishlib.io.logger.Logger log, @NotNull com.fishlib.util.process.FatalErrorReporter fatalErrorReporter, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue, int threadPoolSize, int maximumConcurrentColumns, boolean lowHeapUsageMode, boolean force, boolean allowEmptyInput, @Nullable String sortColumnFormula, @NotNull Database db, @NotNull StatusCallback statusCallback, @Nullable Database.StorageFormat storageFormat, @Nullable String parquetCodecName, @Nullable SyncMode syncMode, boolean lateCleanup, @Nullable Table sourceTable)
      Read, partition, group, sort input data and re-write it into multi-day locations or parquet.
      Parameters:
      log - The log
      fatalErrorReporter - An error reporter
      namespace - The namespace to merge to
      tableName - The table name to merge to
      partitioningColumnValue - The partitioning column value
      threadPoolSize - The number of parallel threads to use
      maximumConcurrentColumns - The maximum number of columns to merge concurrently in maximum throughput mode
      lowHeapUsageMode - Whether to prioritize heap conservation over throughput
      force - Whether to force merge when destinations already have data
      allowEmptyInput - Whether to allow merge to proceed if the input data is empty
      sortColumnFormula - Formula to apply for sorting, post-grouping
      db - A database to use when reading source data
      statusCallback - A callback for status
      storageFormat - If non-null this specifies the storage format, otherwise use the schema's default
      parquetCodecName - If using Parquet, an optional Parquet codec name
      syncMode - SyncMode for column files, currently only used for Database.StorageFormat.DeephavenV1
      lateCleanup - Defers cleanup until ready to replace with newly merged data.
      sourceTable - A table to use as a source for the merge operation instead of calling getInputData(Logger, String, Database, TableDefinition, String, String, String)
    • filterWritablePartitions

      protected File[] filterWritablePartitions​(@NotNull File[] originalWritablePartitions)
      Filter the input array of candidate writable partitions. This can be used by an overriding class to merge only a subset of the possible partitions. Partitions removed from the originalWritablePartitions array will not be touched.
      Parameters:
      originalWritablePartitions - the list of writable partitions to filter
      Returns:
      all partitions for which the merge should continue.
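      The filtering contract described above can be sketched with plain java.io.File values. This is a standalone illustration, not code from the library: PartitionFilterSketch, keepNamed, and the directory names are hypothetical, and in a real subclass the same logic would sit inside an override of filterWritablePartitions(File[]).

```java
import java.io.File;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper showing one way to keep only a subset of candidate partitions.
final class PartitionFilterSketch {
    private PartitionFilterSketch() {}

    /** Returns the candidates whose directory name is in allowedNames, in their original order. */
    static File[] keepNamed(final File[] originalWritablePartitions, final Set<String> allowedNames) {
        return Arrays.stream(originalWritablePartitions)
                .filter(partition -> allowedNames.contains(partition.getName()))
                .toArray(File[]::new);
    }

    public static void main(final String[] args) {
        // The paths below are made up for the example; nothing is read from disk.
        final File[] candidates = {
                new File("/example/WritablePartitions/0"),
                new File("/example/WritablePartitions/1"),
                new File("/example/WritablePartitions/2")
        };
        final Set<String> allowed = new HashSet<>(Arrays.asList("0", "2"));
        for (final File kept : keepNamed(candidates, allowed)) {
            System.out.println(kept.getPath());
        }
    }
}
```

      Partitions left out of the returned array (here, the one named 1) are the ones the merge would not touch.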
    • getInputData

      @Nullable protected abstract Table getInputData​(@NotNull com.fishlib.io.logger.Logger log, @NotNull String logPrefix, @NotNull Database db, @NotNull TableDefinition tableDefinition, @NotNull String namespace, @NotNull String tableName, @NotNull String partitioningColumnValue)
      Get the input data that should be merged.
      Parameters:
      log - Logger for any useful output
      logPrefix - A prefix for any log output
      db - The Database instance to use when sourcing input data
      tableDefinition - The definition of the destination table
      namespace - The namespace of the destination table
      tableName - The name of the destination table
      partitioningColumnValue - The value of the destination partitioning column
      Returns:
      Input data in Table form, or null if no such data exists
    • supportedParquetCodecsStream

      public static Stream<String> supportedParquetCodecsStream​(com.fishlib.configuration.Configuration configuration)
      Determine the supported parquet codecs. Some codecs require additional jar files which aren't part of the base Deephaven installation, so by default the available codecs are limited through the property SUPPORTED_PARQUET_CODECS_PROPERTY.
      Parameters:
      configuration - the Configuration instance
      Returns:
      a Stream of available codecs
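      As a rough illustration of limiting codecs through a property, the sketch below reads a comma-separated list and exposes it as a Stream. Everything specific here is an assumption: java.util.Properties stands in for Configuration, the key "sketch.supportedParquetCodecs" is made up (the actual key is whatever SUPPORTED_PARQUET_CODECS_PROPERTY names), and the comma-separated format is a guess rather than documented behavior.

```java
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch only: not the MergeData.supportedParquetCodecsStream implementation.
final class CodecListSketch {
    // Hypothetical key used only by this example.
    static final String CODECS_KEY = "sketch.supportedParquetCodecs";

    private CodecListSketch() {}

    /** Splits the configured value on commas, trims each entry, and drops empty entries. */
    static Stream<String> supportedCodecs(final Properties properties) {
        final String raw = properties.getProperty(CODECS_KEY, "");
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(codec -> !codec.isEmpty());
    }

    public static void main(final String[] args) {
        final Properties properties = new Properties();
        properties.setProperty(CODECS_KEY, "SNAPPY, GZIP, UNCOMPRESSED");
        System.out.println(supportedCodecs(properties).collect(Collectors.toList()));
    }
}
```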
    • makeExecutor

      public static ExecutorService makeExecutor​(@NotNull String name, int poolSize)
      Make a suitable ExecutorService for use in merge processing.
      Parameters:
      name - The name of the executor to be used in thread naming
      poolSize - The number of threads that will be allowed in the executor (core and maximum)
      Returns:
      The new ExecutorService
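      An executor with the properties described above (a name used for thread naming, and a pool whose core and maximum sizes both equal poolSize) could look roughly like the following. This is a sketch built on java.util.concurrent only, not the library's implementation; NamedPoolSketch, makeNamedPool, the "<name>-<n>" naming scheme, the daemon flag, and the unbounded queue are all assumptions.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for illustration; not MergeData.makeExecutor itself.
final class NamedPoolSketch {
    private NamedPoolSketch() {}

    /** Builds a pool whose core and maximum sizes both equal poolSize. */
    static ExecutorService makeNamedPool(final String name, final int poolSize) {
        final AtomicInteger counter = new AtomicInteger();
        // Thread names follow "<name>-<n>"; this scheme is an assumption of the sketch.
        final ThreadFactory factory = runnable -> {
            final Thread thread = new Thread(runnable, name + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
        return new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(), factory);
    }

    public static void main(final String[] args) throws Exception {
        final ExecutorService pool = makeNamedPool("merge-sketch", 2);
        try {
            // Report the name of the worker thread that ran the task.
            final Callable<String> task = () -> Thread.currentThread().getName();
            System.out.println(pool.submit(task).get());
        } finally {
            pool.shutdown();
        }
    }
}
```

      Setting the core and maximum sizes to the same value keeps the thread count fixed, so submitted work queues up instead of spawning extra threads.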