Merging Data

Both batch and streaming data are generally placed in intraday tables when imported. Once imported, merge functionality is used on a day-to-day basis to move data from the intraday database to the historical database. See the Deephaven Data Lifecycle for further details. Merge functionality is provided by several scripts, classes, and the "Data Merge" persistent query type.

Merging is normally performed for persistent data on a nightly basis. Similar to imports, merges may be performed by Persistent Query jobs ("Data Merge Job"), from the command line, or scripted for maximum flexibility.

Merging is closely related to Validation. After each merge operation, it is useful to verify the data quality and, optionally, delete the intraday source data.

Data Merge Query

When the persistent query type "Data Merge" is selected, the Persistent Query Configuration Editor window shows options similar to those presented for the CSVImport and Import - JDBC configuration types, with the addition of the Merge Settings tab.

Note: Data Merges for multi-partition imports cannot be run from a persistent query at present. This must be done from the command line.

Clicking the Merge Settings tab presents a panel with options pertaining to merging data into a table that already exists in Deephaven:

Merge Settings

  • Namespace:  This is the namespace for the data being merged.
  • Table:  This is the table for the data being merged.
  • Partition Value Formula:  This is the formula needed to partition the data being merged. If a specific partition value is used it will need to be surrounded by quotes. In most cases, the previous day's data will be merged. For example:
    • com.illumon.util.calendar.Calendars.calendar("USNYSE").previousDay(1) - merge the previous day's data based on the USNYSE calendar
    • "2017-01-01" - merge the data for the date "2017-01-01" (the quotes are required)
  • Table Data Service Configuration:  Specifies how input data to merge should be found and read. The drop-down menu is populated from the Data Routing Configuration YAML file and contains all of the items from the tableDataService and dataImportServer sections, unless at least one item has the "merge" tag; if any merge tag is present, only items with that tag are included.
  • Sort Column Formula:  An optional formula to sort on after applying column groupings. If no formula is supplied, data will be in source order except where reordered by grouping.
  • Format: Specifies the format in which the data should be written.
    • Default: use the default merge format from the schema. If none is defined, use Deephaven.
    • Deephaven: write the data in Deephaven format.
    • Parquet: write the data in Parquet format.
  • Codec: For a Parquet merge, it specifies the compression codec to be used.
    • DEFAULT: use the default codec (either from the schema, or if none is defined there, SNAPPY).
    • <Other values>: use the chosen codec. See Parquet Codecs for further details.
  • Low Heap Mode: Whether to run in low heap usage mode, which makes trade-offs to minimize the RAM required for the merge JVM's heap, rather than maximizing throughput.
  • Force: Whether to allow overwrite of existing merged data.
  • Thread Pool Size: The number of concurrent threads to use when computing output order (i.e. grouping and sorting) or transferring data from input partitions to output partitions. More threads are not always better, especially if there is significant reordering of input data which may degrade read cache efficiency.

Once you have configured all the settings needed for your query, click the OK button in the lower right corner of the Persistent Query Configuration Editor window. 

For batch data, a Data Merge query will typically have a dependency on an import query, configured in the Scheduling tab. For streamed data, a merge query may simply run at a specified time.

Data Merge queries can also be created from an import query. After creating an import query with the Schema Editor, you will be prompted to create the corresponding merge. If you have an import query without a corresponding merge, an option to Create Merge Query will be available in the Query Config panel's context (right-click) menu.

Parquet Codecs

Parquet allows the use of various compression codecs. Supported codecs include UNCOMPRESSED and SNAPPY.

Merge from Script

The most flexible method for merging data into Deephaven is via Groovy/Python scripting. Data Merge scripts may be deployed via the command line or through a Batch Query (RunAndDone) persistent query. Merge tasks are most easily executed from a script by using a "builder" class. The underlying merge logic is identical to that accessible from the command line tools and persistent queries.

Command Line Execution

To run a script from the command line, use the iris_exec program as follows, where ./script.groovy is a local script file to execute on the server:

iris_exec run_local_script -- -s ./script.groovy -h <merge server host> -p <merge server port>

See Running Local Script for more details on options.

Note: Merging data requires the appropriate write permissions. This means these scripts should be run on a "merge server", not a "query server". Since the latter is the default in the local script runner, you must specify an alternate host and/or port to reference a merge server. See the -queryHost and -queryPort options to run_local_script.

Example

The following example shows how to execute a merge from a Groovy or Python script. Note: there are many options not illustrated here. Please refer to Merge API Reference for all the variations available.

Python:

from deephaven import *

MergeData.Builder(db, "Test", "MyTable")\
    .setPartitionColumnValue(partition)\
    .build()\
    .run()

print("Done merging!")

Groovy:

import com.illumon.iris.importers.util.MergeData
import com.illumon.iris.importers.ImportOutputMode

new MergeData.Builder(db, "Test", "MyTable")
    .setPartitionColumnValue(partition)
    .build()
    .run()

println "Done merging!"

This script assumes the partition is set via a command-line argument. It might be executed from the command line for the 2018-05-01 partition as follows (the port is specified to connect to the appropriate Deephaven merge server):

iris_exec run_local_script -- -s ~/myscripts/merge_single.groovy -p 30002 partition "2018-05-01"

Merge API Reference

The Merge API is fairly simple, and analogous to the Import API. There is a single MergeData class with a builder method that returns a MergeDataBuilder object. MergeDataBuilder produces a MergeData object when build() is called; the merge is then executed by calling the run() method on the MergeData object. Note that this MergeData class is in the com.illumon.iris.importers.util package (there is another MergeData class in the com.illumon.iris.importers package that is not intended for use from a script). The general pattern for performing a merge is:

MergeData.Builder(db, <namespace>, <table>)
    .set<param>(<param value>)
    ...
    .build()
    .run()

Merge Options

  • setPartitionColumnValue (String; optional*): A literal string used to select the column partition to merge. Often a date, e.g., "2018-05-01".
  • setPartitionColumnFormula (String; optional*): An expression that will be evaluated to specify the partition to merge. For example, currentDateNy().
  • setThreadPoolSize (int; optional; default 4): The maximum number of parallel threads to use during the merge process.
  • setLowHeapUsage (boolean; optional; default false): Whether to prioritize heap conservation over throughput.
  • setForce (boolean; optional; default false): Whether to force the merge when destination(s) already have data.
  • setAllowEmptyInput (boolean; optional; default true): Whether to allow the merge to proceed if the input data is empty.
  • setSortColumnFormula (String; optional): Formula to apply for sorting, post-grouping. For example, to sort on Timestamp, binned by second: upperBin(Timestamp, 1000000000L).
  • setStorageFormat (String; optional): Sets the merge format. Refer to the defaultMergeFormat in the schemas for valid values.
  • setCodecName (String; optional): For Parquet, sets the compression codec. This can be a valid Parquet codec, such as UNCOMPRESSED. If this isn't used, the default codec from the schema is used, or SNAPPY if none is defined in the schema.

* Either a partition column value or formula must be specified (not both).
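
As an illustration combining several of these options, a Groovy sketch might look like the following (the namespace, table, and option values are placeholders rather than recommendations):

import com.illumon.iris.importers.util.MergeData

new MergeData.Builder(db, "Test", "MyTable")
    .setPartitionColumnFormula("currentDateNy()")              // merge the current day's partition
    .setThreadPoolSize(8)                                      // allow up to 8 concurrent threads
    .setSortColumnFormula("upperBin(Timestamp, 1000000000L)")  // sort within groups, binned by second
    .setStorageFormat("Parquet")                               // write Parquet rather than the schema default
    .setCodecName("SNAPPY")                                    // Parquet compression codec
    .build()
    .run()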

Data Merging Classes

Note: It is recommended that merges should be executed by Persistent Query jobs or by scripting, not by directly calling these classes. This information is provided primarily for administrators who want an in-depth understanding of the merge processing.

The MergeIntradayData class is used to copy and reorganize data from intraday to historical storage.

This class is primarily used by merge queries created in the Query Configuration Editor or by the Schema Editor in the Deephaven UI, and it is also used by the XML-driven import and merge scripts. Merge queries are the preferred way to use the MergeIntradayData class, but it can also be called directly, used from custom scripts, or called from an application. This section details its functionality and arguments.

Intraday data is written into column files in the order in which it arrives. As such, there is no grouping of data, and the only sorting is by arrival time of the events. Since intraday data is typically used for data that is still receiving updates, the main priorities are around low latency in appending to the table and delivering those updates to running queries. Historical data does not receive updates, so the priority there is around organizing data for more efficient querying.

Functionality of the MergeIntradayData class

The MergeIntradayData class reads intraday data for a specified partition (usually a date) and writes it into historical storage. During this process, the class can group the data based on grouping columns specified in the schema, sort data based on an argument passed to the class, and distribute data across one or more writable partitions.

Note: These writable partitions are different from the column partition indicated by the partitioning column in the schema. Writable partitions are storage partitions set up to allow sharding of historical data for faster querying and potentially to provide for larger total storage capacity. See:  Historical Partitioning.

When sharding data, the MergeIntradayData class uses the partitioning keyFormula defined in the table's schema. This formula describes how data should be distributed when there are multiple writable partitions configured for historical storage. For more details, see Schemas.
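
For illustration only, such a formula typically hashes a column value across the set of writable partitions. The snippet below is a hypothetical sketch (the token names and the USym column are assumptions; consult the Schemas documentation for the exact tokens your version supports):

__WRITABLE_PARTITIONS__[abs(USym.hashCode()) % __NUM_WRITABLE_PARTITIONS__]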

In addition to the MergeIntradayData class, there is also a RemergeData class. It provides similar functionality, but merges data from existing historical partitions for one namespace and table into new historical partitions for a different namespace and/or table.

During merge processing, the source data is read, and a Deephaven byExternal method is used to create a new sub-table of results for each partition based on the target table's partitioning formula. Then, for each partition sub-table, groups are aggregated and ordered from largest to smallest (by number of rows in the group). Optional sorting is applied to rows within the groups, and the grouped data is then written out to disk. Note that columns used for grouping cannot be used for sorting.

Arguments and Property Settings

The main method of the MergeIntradayData class (com.illumon.iris.importers.MergeIntradayData) takes the following arguments:

Required (first three arguments, in this order):

<namespace> <tableName> <partitioning column value>

Optional (by name, after the three required arguments):

sortColumn=<column name or formula to sort by>
tableDef=<absolute path to table definition file>
lowHeapUsageMode=<true or false>
force=<true or false>
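
For example, a merge of the 2018-05-01 partition might be invoked as sketched below. The java launcher, classpath, and JVM options are deployment-specific, and the namespace, table, and sort column are illustrative:

java -cp <Deephaven classpath> com.illumon.iris.importers.MergeIntradayData Test MyTable 2018-05-01 sortColumn=Timestamp force=true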

  • namespace (required): Namespace in which to find the table whose data is to be merged.
  • tableName (required): Name of the table whose data is to be merged.
  • partitioning column value (required): Typically a date string, but it could be some other unique string value. It indicates which column partition will be merged by this merge operation.
  • sortColumn (optional): A formula by which to sort rows within a group. If not provided, rows within a group will be sorted based on the order in which they were written into the intraday partition. This column cannot be one of the grouping columns.
  • tableDef (optional): If a different table definition is to be used for the merge operation, or the table definition is not accessible within the context in which the merge operation is being run, this value can provide a path to the definition file.
  • lowHeapUsageMode (optional): Reduces memory consumption of the merge process at the expense of throughput. Restricts most activities to single-threaded, one-by-one processing, and limits expansion of indexes in memory.
  • force (optional): By default, a merge will fail if any historical partition already exists for the data to be merged. Force will cause the merge to overwrite the existing destination(s).
  • mergeFormat (optional): Sets the merge format. Refer to the defaultMergeFormat in the schemas for valid values.
  • parquetCodec (optional): For Parquet, sets the compression codec. This can be a valid Parquet codec, such as UNCOMPRESSED. If this isn't used, the default codec from the schema is used, or SNAPPY if none is defined in the schema.

The main method of the RemergeData class (com.illumon.iris.importers.RemergeData) takes the following arguments:

Required (first five arguments, in this order):

<sourceNamespace> <sourceTableName> <namespace> <tableName> <partitioning column value>

Optional (by name, after the five required arguments):

sortColumn=<column name or formula to sort by>
tableDef=<absolute path to table definition file>
lowHeapUsageMode=<true or false>
force=<true or false>
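
As with MergeIntradayData, an illustrative sketch of an invocation (the classpath, namespaces, and table names are placeholders) might be:

java -cp <Deephaven classpath> com.illumon.iris.importers.RemergeData Test MyTable TestNew MyTableNew 2018-05-01 force=true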

  • sourceNamespace (required): Namespace in which to find the table whose data is to be re-merged.
  • sourceTableName (required): Name of the table whose data is to be re-merged.
  • namespace (required): Namespace in which to find the table into which to re-merge the data.
  • tableName (required): Name of the table into which to re-merge the data.
  • partitioning column value (required): Typically a date string, but it could be some other unique string value. It indicates which column partition will be merged by this merge operation.
  • sortColumn (optional): A formula by which to sort rows within a group. If not provided, rows within a group will be sorted based on the order in which they were written into the intraday partition. This column cannot be one of the grouping columns.
  • tableDef (optional): If a different table definition is to be used for the merge operation, or the table definition is not accessible within the context in which the merge operation is being run, this value can provide a path to the definition file.
  • lowHeapUsageMode (optional): Reduces memory consumption of the merge process at the expense of throughput. Restricts most activities to single-threaded, one-by-one processing, and limits expansion of indexes in memory.
  • force (optional): By default, a merge will fail if any historical partition already exists for the data to be merged. Force will cause the merge to overwrite the existing destination(s).

In addition to these arguments, both classes use the property iris.concurrentWriteThreads to control parallelization: steps that allow for parallel processing are executed via a thread pool whose maximum capacity, nConcurrentThreads, is defined by the value of that property.

The initial execution of byExternal, to partition the results, is single threaded. Subsequent steps may be parallelized depending on nConcurrentThreads, the value of lowHeapUsageMode, and the number of destination partitions.

For each destination partition, the output ordering must be determined by grouping and sorting the input data that will be written to that partition. Following this, the data must be written according to said ordering.

In lowHeapUsageMode, only one destination partition is processed at a time. For each partition, the ordering is determined in a single thread, and then the thread pool is used to execute writing jobs for each column of the output table.

Otherwise (if lowHeapUsageMode is false), all destination partitions are processed in parallel, to the limit imposed by nConcurrentThreads. The ordering jobs for the destination partitions are executed via the thread pool, as are the writing jobs for each output column of each destination partition.

Minimum heap usage for the merge process would be with iris.concurrentWriteThreads = 1 and lowHeapUsageMode = true. The other extreme of tuning would be lowHeapUsageMode = false and concurrentWriteThreads set high enough that all output partitions and column writes could be processed in parallel. This "maximum throughput" configuration can easily consume large amounts of memory and dominate I/O channels. Configurations that move toward maximum throughput should be evaluated carefully.

Merge Heap Calculations and Merge Throughput Optimizations

For merge optimization, the following considerations matter:

  • Total available heap
  • Data buffer cache
  • Number of concurrent threads
  • Low heap usage mode (or, alternatively, maximum throughput mode)
  • Number of output partitions, and their relative performance

A good rule of thumb for determining the best value for your merge heap size is to double the value of the data buffer pool size.

buffer pool size = max_over_all_columns(column file data size) * nWritingThreads / nOutputPartitions / 0.85

heap size = 2*buffer pool size

You will need to give your process enough heap to accommodate the requisite data buffer pool. (See Data Buffer Pool Configuration.) The data buffer pool must be large enough to hold buffers for all of the columns you will write concurrently.
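
For example, with illustrative numbers, suppose the largest column file data size is 2 GB, nWritingThreads is 8, and there are 4 output partitions:

buffer pool size = 2 GB * 8 / 4 / 0.85 ≈ 4.7 GB

heap size = 2 * 4.7 GB ≈ 9.4 GB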

See also: Merge Optimization

