Appendix C: Data Merging Classes
The MergeIntradayData class is used to copy and reorganize data from intraday to historical storage.
This class is primarily used by merge queries created in the Query Configuration Editor or by the Schema Editor in the Deephaven UI. However, it can also be used by the XML-driven import and merge scripts. Merge queries are the preferred way to use the MergeIntradayData class, but it can also be called directly, used from custom scripts, or called from an application. This section details its functionality and arguments.
Intraday data is written into column files in the order in which it arrives. As such, there is no grouping of data, and the only sorting is by arrival time of the events. Since intraday data is typically used for data that is still receiving updates, the main priorities are around low latency in appending to the table and delivering those updates to running queries. Historical data does not receive updates, so the priority there is around organizing data for more efficient querying.
Functionality of the MergeIntradayData class
The MergeIntradayData class reads intraday data for a specified partition (usually a date) and writes it into historical storage. During this process, the class can group the data based on grouping columns specified in the schema, sort data based on an argument passed to the class, and distribute data across one or more writable partitions.
Note: These writable partitions are different from the column partition indicated by the partitioning column in the schema. Writable partitions are storage partitions set up to allow sharding of historical data for faster querying and potentially to provide for larger total storage capacity. See Historical Partitioning.
When sharding data, the MergeIntradayData class uses the partitioning keyFormula defined in the table's schema. This formula describes how data should be distributed when there are multiple writable partitions configured for historical storage. For more details, see Schemas.
In addition to the MergeIntradayData class, there is also a RemergeData class. It provides similar functionality, but merges data from existing historical partitions for one namespace and table into new historical partitions for a different namespace and/or table.
During merge processing, the source data is read, and a Deephaven byExternal method is used to create a new sub-table of results for each partition based on the target table's partitioning formula. Then, for each partition sub-table, groups are aggregated and ordered from largest to smallest (by number of rows in the group). Optional sorting is applied to rows within the groups, and the grouped data is then written out to disk. Note that columns used for grouping cannot be used for sorting.
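The steps described above can be illustrated with a minimal sketch in plain Python. It is not the Deephaven implementation: `plan_merge`, `partition_of`, and the sample rows are hypothetical names introduced for illustration, and a simple callable stands in for the schema's partitioning formula.

```python
from collections import defaultdict


def plan_merge(rows, partition_of, group_col, sort_col=None):
    """Return {partition: rows in output order} for a list of dict rows."""
    # Grouping columns cannot be used for sorting.
    if sort_col == group_col:
        raise ValueError("a grouping column cannot also be the sort column")

    # Step 1: split the source rows into one sub-table per destination partition.
    by_partition = defaultdict(list)
    for row in rows:
        by_partition[partition_of(row)].append(row)

    plan = {}
    for partition, sub_table in by_partition.items():
        # Step 2: gather rows into groups, keeping arrival order within each group.
        groups = defaultdict(list)
        for row in sub_table:
            groups[row[group_col]].append(row)
        # Step 3: order the groups from largest to smallest row count.
        ordered = sorted(groups.values(), key=len, reverse=True)
        # Step 4: optionally sort the rows inside each group.
        if sort_col is not None:
            ordered = [sorted(g, key=lambda r: r[sort_col]) for g in ordered]
        plan[partition] = [row for g in ordered for row in g]
    return plan


rows = [
    {"Sym": "A", "Price": 3.0},
    {"Sym": "B", "Price": 9.0},
    {"Sym": "A", "Price": 1.0},
    {"Sym": "C", "Price": 5.0},
    {"Sym": "A", "Price": 2.0},
    {"Sym": "C", "Price": 4.0},
]
# Hypothetical partitioning rule: symbols A and B go to partition 0, the rest to 1.
plan = plan_merge(rows, lambda r: 0 if r["Sym"] in ("A", "B") else 1, "Sym", sort_col="Price")
```

In this sketch, partition 0 receives the three-row group A before the one-row group B, and the rows inside each group are ordered by Price.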
Arguments and Property Settings
The MergeIntradayData class (com.illumon.iris.importers.MergeIntradayData) takes the following arguments:
Required (first three arguments, in this order):
<namespace> <tableName> <partitioning column value>
Optional (by name, after the three required arguments):
sortColumn=<column name or formula to sort by>
tableDef=<absolute path to table definition file>
lowHeapUsageMode=<true or false>
force=<true or false>
Parameter | Description |
---|---|
namespace | (required) Namespace in which to find the table whose data is to be merged. |
tableName | (required) Name of the table whose data is to be merged. |
partitioning column value | (required) This is typically a date string but could be some other unique string value. It indicates what column partition will be merged by this merge operation. |
sortColumn | (optional) A formula by which to sort rows within a group. If not provided, rows within a group will be sorted based on the order in which they were written into the intraday partition. This column cannot be one of the grouping columns. |
tableDef | (optional) If a different table definition is to be used for the merge operation, or the table definition is not accessible within the context in which the merge operation is being run, this value can provide a path to the definition file. |
lowHeapUsageMode | (optional) Reduces memory consumption of the merge process at the expense of throughput. Restricts most activities to single-threaded, one-by-one, and limits expansion of indexes in memory. |
force | (optional) By default, a merge will fail if any historical partition already exists for the data to be merged. |
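To make the argument layout concrete, the following sketch parses an argument list of the shape described above: three positional values followed by optional name=value pairs. The function `parse_merge_args`, the key `partitionValue`, and the sample namespace and table names are hypothetical. This mirrors the documented layout only and is not the class's actual parsing code.

```python
OPTIONAL_NAMES = {"sortColumn", "tableDef", "lowHeapUsageMode", "force"}
BOOLEAN_NAMES = {"lowHeapUsageMode", "force"}


def parse_merge_args(argv):
    """Split a MergeIntradayData-style argument list into a dict."""
    if len(argv) < 3:
        raise ValueError("expected <namespace> <tableName> <partitioning column value>")
    parsed = {"namespace": argv[0], "tableName": argv[1], "partitionValue": argv[2]}
    for arg in argv[3:]:
        # Split at the first '=' only, so a sort formula may itself contain '='.
        name, sep, value = arg.partition("=")
        if not sep or name not in OPTIONAL_NAMES:
            raise ValueError(f"unrecognized optional argument: {arg!r}")
        if name in BOOLEAN_NAMES:
            if value not in ("true", "false"):
                raise ValueError(f"{name} must be true or false, got {value!r}")
            value = value == "true"
        parsed[name] = value
    return parsed


# Hypothetical namespace, table, and partition value.
example = parse_merge_args(["Market", "Trades", "2019-08-20", "sortColumn=Timestamp", "force=true"])
```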
The main method of the RemergeData class (com.illumon.iris.importers.RemergeData) takes the following arguments:
Required (first five arguments, in this order):
<sourceNamespace> <sourceTableName> <namespace> <tableName> <partitioning column value>
Optional (by name, after the five required arguments):
sortColumn=<column name or formula to sort by>
tableDef=<absolute path to table definition file>
lowHeapUsageMode=<true or false>
force=<true or false>
Parameter | Description |
---|---|
sourceNamespace | (required) Namespace in which to find the table whose data is to be re-merged. |
sourceTableName | (required) Name of the table whose data is to be re-merged. |
namespace | (required) Namespace in which to find the table into which to re-merge the data. |
tableName | (required) Name of the table into which to re-merge the data. |
partitioning column value | (required) This is typically a date string but could be some other unique string value. It indicates what column partition will be merged by this merge operation. |
sortColumn | (optional) A formula by which to sort rows within a group. If not provided, rows within a group will be sorted based on the order in which they were written into the intraday partition. This column cannot be one of the grouping columns. |
tableDef | (optional) If a different table definition is to be used for the merge operation, or the table definition is not accessible within the context in which the merge operation is being run, this value can provide a path to the definition file. |
lowHeapUsageMode | (optional) Reduces memory consumption of the merge process at the expense of throughput. Restricts most activities to single-threaded, one-by-one, and limits expansion of indexes in memory. |
force | (optional) By default, a merge will fail if any historical partition already exists for the data to be merged. |
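A custom script that calls RemergeData needs to assemble the five positional values and any optional name=value pairs in the order shown above. The helper below is a hypothetical sketch of that assembly: `build_remerge_args` and the sample names are illustrative, and how the resulting list is passed to the class (classpath, JVM options) is outside its scope.

```python
def build_remerge_args(source_namespace, source_table, namespace, table,
                       partition_value, **options):
    """Return the argument list for a RemergeData-style invocation."""
    allowed = ("sortColumn", "tableDef", "lowHeapUsageMode", "force")
    unknown = set(options) - set(allowed)
    if unknown:
        raise ValueError(f"unsupported options: {sorted(unknown)}")
    # Required arguments come first, in this fixed order.
    args = [source_namespace, source_table, namespace, table, partition_value]
    # Optional arguments follow by name.
    for name in allowed:
        if name in options:
            value = options[name]
            if isinstance(value, bool):
                value = "true" if value else "false"
            args.append(f"{name}={value}")
    return args


# Hypothetical source and destination names.
remerge_args = build_remerge_args(
    "MarketOld", "Trades", "Market", "Trades", "2019-08-20",
    force=True, sortColumn="Timestamp",
)
```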
In addition to these arguments, both classes use the property iris.concurrentWriteThreads to control parallelization of processing. Steps that allow for parallel processing are executed via a thread pool whose maximum capacity, nConcurrentThreads, is defined by the value of that property.
The initial execution of byExternal, to partition the results, is single-threaded. Subsequent steps may be parallelized depending on nConcurrentThreads, the value of lowHeapUsageMode, and the number of destination partitions.
For each destination partition, the output ordering must be determined by grouping and sorting the input data that will be written to that partition. Following this, the data must be written according to said ordering.
In lowHeapUsageMode, only one destination partition is processed at a time. For each partition, the ordering is determined in a single thread, and then the thread pool is used to execute writing jobs for each column of the output table.
Otherwise (if lowHeapUsageMode is false), all destination partitions are processed in parallel, to the limit imposed by nConcurrentThreads. The ordering jobs for the destination partitions are executed via the thread pool, as are the writing jobs for each output column of each destination partition.
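The difference between the two modes can be sketched with Python's standard thread pool. This is an illustrative model of the scheduling described above, not the Deephaven code: `run_merge`, `order_fn`, `write_fn`, and `demo` are hypothetical names, and the ordering and writing work is replaced by simple logging.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_merge(partitions, columns, n_concurrent_threads, low_heap_usage_mode,
              order_fn, write_fn):
    with ThreadPoolExecutor(max_workers=n_concurrent_threads) as pool:
        if low_heap_usage_mode:
            # One destination partition at a time.
            for partition in partitions:
                # The ordering is determined in a single (calling) thread.
                ordering = order_fn(partition)
                # Column writes for this partition go through the pool.
                writes = [pool.submit(write_fn, partition, col, ordering) for col in columns]
                for job in writes:
                    job.result()
        else:
            # Ordering jobs for all partitions go through the pool.
            order_jobs = {pool.submit(order_fn, p): p for p in partitions}
            write_jobs = []
            for job in as_completed(order_jobs):
                partition = order_jobs[job]
                ordering = job.result()
                # As each ordering completes, queue that partition's column writes.
                write_jobs += [pool.submit(write_fn, partition, col, ordering) for col in columns]
            for job in write_jobs:
                job.result()


def demo(low_heap_usage_mode):
    log = []  # list.append is thread-safe in CPython

    def order_fn(partition):
        log.append(("order", partition))
        return f"ordering-{partition}"

    def write_fn(partition, col, ordering):
        log.append(("write", partition, col))

    run_merge([0, 1], ["Sym", "Price"], 2, low_heap_usage_mode, order_fn, write_fn)
    return log
```

Calling `demo(True)` logs the ordering and all column writes for partition 0 before anything is logged for partition 1. With `demo(False)` the same jobs run, but their interleaving depends on thread scheduling.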
Minimum heap usage for the merge process would be with iris.concurrentWriteThreads = 1 and lowHeapUsageMode = true. The other extreme of tuning would be lowHeapUsageMode = false and iris.concurrentWriteThreads set high enough that all output partitions and column writes could be processed in parallel. This "maximum throughput" configuration can easily consume large amounts of memory and dominate I/O channels. Configurations that move towards maximum throughput should be evaluated carefully.
Last Updated: 20 August 2019 09:54 -06:00 UTC Deephaven v.1.20180917
Deephaven Documentation Copyright 2016-2018 Deephaven Data Labs, LLC All Rights Reserved