Class KafkaIngester
@ScriptApi public class KafkaIngester extends Object
The KafkaIngester is designed to work within a Live Query - Merge Server that has already created a DataImportServer. Each KafkaIngester is assigned a topic and a subset of Kafka partitions. Each Kafka partition is mapped to a Deephaven internal partition. The column partition can be set through the constructor, or defaults to DBTimeUtils.currentDateNy().
Automatic partition assignment and rebalancing are not supported, as the Deephaven checkpoint records store the Kafka partition and offset information on disk. Each Kafka ingester instance must uniquely control its checkpoint record, which is incompatible with rebalancing.
If no Deephaven checkpoint record exists for the partition, then an attempt is made to restore the most recently committed offset from the Kafka broker for the given Kafka partition.
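To make the lifecycle above concrete, here is a minimal setup sketch. It assumes a DataImportServer is already running in the Merge Server; the broker address, group id, table names, and the MyAdapter class are placeholder assumptions, not part of the real API, and the ingester construction itself is shown only in comments because it requires a live Deephaven environment.

```java
import java.util.Properties;

// Sketch: building Kafka consumer properties and (in comments) wiring a
// KafkaIngester to an existing DataImportServer.
class IngesterSetupSketch {

    // Standard Kafka consumer configuration keys; values are placeholders.
    // Auto-commit is often disabled when offsets are checkpointed externally,
    // as the Deephaven checkpoint records do here; adjust to your deployment.
    static Properties consumerProps() {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
        props.setProperty("group.id", "deephaven-ingest");           // placeholder
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        final Properties props = consumerProps();
        // With a logger and the server's DataImportServer in hand,
        // construction and startup would look roughly like this:
        //
        // final KafkaIngester ingester = new KafkaIngester(
        //         log, dataImportServer, props,
        //         "MarketData", "Quotes", "quotes",
        //         tableWriter -> new MyAdapter(tableWriter));
        // ingester.start(); // start at most once per ingester
        System.out.println(props.getProperty("group.id"));
    }
}
```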
Nested Class Summary

static class KafkaIngester.PartitionRange
    A predicate for handling a range of partitions.
static class KafkaIngester.PartitionRoundRobin
    A predicate for evenly distributing partitions among a set of ingesters.
static class KafkaIngester.SinglePartition
    A predicate for handling a single partition.
Field Summary

static IntPredicate ALL_PARTITIONS
    Constant predicate that returns true for all partitions.
Constructor Summary

KafkaIngester(com.fishlib.io.logger.Logger log, DataImportServer dataImportServer, Properties props, String namespace, String tableName, String topic, String columnPartition, Function<TableWriter,ConsumerRecordToTableWriterAdapter> adapterFactory)
    Creates a Kafka ingester for all partitions of a given topic.

KafkaIngester(com.fishlib.io.logger.Logger log, DataImportServer dataImportServer, Properties props, String namespace, String tableName, String topic, Function<TableWriter,ConsumerRecordToTableWriterAdapter> adapterFactory)
    Creates a Kafka ingester for all partitions of a given topic and a Deephaven column partition of DBTimeUtils.currentDateNy().

KafkaIngester(com.fishlib.io.logger.Logger log, DataImportServer dataImportServer, Properties props, String namespace, String tableName, String topic, IntPredicate partitionFilter, String columnPartition, Function<TableWriter,ConsumerRecordToTableWriterAdapter> adapterFactory)
    Creates a Kafka ingester for the given topic.

KafkaIngester(com.fishlib.io.logger.Logger log, DataImportServer dataImportServer, Properties props, String namespace, String tableName, String topic, IntPredicate partitionFilter, String columnPartition, Function<TableWriter,ConsumerRecordToTableWriterAdapter> adapterFactory, UnaryOperator<String> resumeFrom)
    Creates a Kafka ingester for the given topic.

KafkaIngester(com.fishlib.io.logger.Logger log, DataImportServer dataImportServer, Properties props, String namespace, String tableName, String topic, IntPredicate partitionFilter, Function<TableWriter,ConsumerRecordToTableWriterAdapter> adapterFactory)
    Creates a Kafka ingester for a given topic with a Deephaven column partition of DBTimeUtils.currentDateNy().
Method Summary

void start()
    Starts a consumer thread which replicates the consumed Kafka messages to Deephaven.
Field Details

ALL_PARTITIONS
Constant predicate that returns true for all partitions. This is the default: every partition that exists will be handled by the same ingester. Because Kafka consumers are inherently single-threaded, to scale beyond what a single consumer can handle, you must create multiple consumers, each with a subset of partitions, using KafkaIngester.PartitionRange, KafkaIngester.PartitionRoundRobin, KafkaIngester.SinglePartition, or a custom IntPredicate.
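Because the partition filter is just an IntPredicate, the built-in predicates are easy to approximate. The following are illustrative re-implementations of the behavior described above; the real PartitionRange, PartitionRoundRobin, and SinglePartition classes may differ in constructor signatures, and the modulo distribution shown for round-robin is an assumption about its semantics.

```java
import java.util.function.IntPredicate;

// Illustrative partition-filter predicates, mirroring the documented behavior.
class PartitionFilters {
    // All partitions: the documented behavior of ALL_PARTITIONS.
    static final IntPredicate ALL = p -> true;

    // A contiguous range of partitions, like KafkaIngester.PartitionRange.
    static IntPredicate range(int first, int last) {
        return p -> p >= first && p <= last;
    }

    // Exactly one partition, like KafkaIngester.SinglePartition.
    static IntPredicate single(int partition) {
        return p -> p == partition;
    }

    // Spread partitions across ingesterCount ingesters by modulo: one
    // plausible reading of KafkaIngester.PartitionRoundRobin.
    static IntPredicate roundRobin(int ingesterIndex, int ingesterCount) {
        return p -> p % ingesterCount == ingesterIndex;
    }
}
```

Any of these can be passed as the partitionFilter argument of the constructors below, since they only need to satisfy IntPredicate.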
Constructor Details

KafkaIngester
public KafkaIngester(com.fishlib.io.logger.Logger log, DataImportServer dataImportServer, Properties props, String namespace, String tableName, String topic, Function<TableWriter,ConsumerRecordToTableWriterAdapter> adapterFactory)

Creates a Kafka ingester for all partitions of a given topic and a Deephaven column partition of DBTimeUtils.currentDateNy().

Parameters:
log - a log for output
dataImportServer - the DataImportServer instance to attach our stream processor to
props - the properties used to create the KafkaConsumer
namespace - the namespace of the table to replicate to
tableName - the name of the table to replicate to
topic - the topic to replicate
adapterFactory - a function from the TableWriter for an internal partition to a suitable ConsumerRecordToTableWriterAdapter that handles records produced by the Kafka topic and transforms them into Deephaven rows
KafkaIngester
public KafkaIngester(com.fishlib.io.logger.Logger log, DataImportServer dataImportServer, Properties props, String namespace, String tableName, String topic, String columnPartition, Function<TableWriter,ConsumerRecordToTableWriterAdapter> adapterFactory)

Creates a Kafka ingester for all partitions of a given topic.

Parameters:
log - a log for output
dataImportServer - the DataImportServer instance to attach our stream processor to
props - the properties used to create the KafkaConsumer
namespace - the namespace of the table to replicate to
tableName - the name of the table to replicate to
topic - the topic to replicate
columnPartition - the column partition that we will replicate to
adapterFactory - a function from the TableWriter for an internal partition to a suitable ConsumerRecordToTableWriterAdapter that handles records produced by the Kafka topic and transforms them into Deephaven rows
KafkaIngester
public KafkaIngester(com.fishlib.io.logger.Logger log, DataImportServer dataImportServer, Properties props, String namespace, String tableName, String topic, IntPredicate partitionFilter, Function<TableWriter,ConsumerRecordToTableWriterAdapter> adapterFactory)

Creates a Kafka ingester for a given topic with a Deephaven column partition of DBTimeUtils.currentDateNy().

Parameters:
log - a log for output
dataImportServer - the DataImportServer instance to attach our stream processor to
props - the properties used to create the KafkaConsumer
namespace - the namespace of the table to replicate to
tableName - the name of the table to replicate to
topic - the topic to replicate
partitionFilter - a predicate indicating which partitions we should replicate
adapterFactory - a function from the TableWriter for an internal partition to a suitable ConsumerRecordToTableWriterAdapter that handles records produced by the Kafka topic and transforms them into Deephaven rows
KafkaIngester
public KafkaIngester(com.fishlib.io.logger.Logger log, DataImportServer dataImportServer, Properties props, String namespace, String tableName, String topic, IntPredicate partitionFilter, String columnPartition, Function<TableWriter,ConsumerRecordToTableWriterAdapter> adapterFactory)

Creates a Kafka ingester for the given topic.

Parameters:
log - a log for output
dataImportServer - the DataImportServer instance to attach our stream processor to
props - the properties used to create the KafkaConsumer
namespace - the namespace of the table to replicate to
tableName - the name of the table to replicate to
topic - the topic to replicate
partitionFilter - a predicate indicating which partitions we should replicate
columnPartition - the column partition that we will replicate to
adapterFactory - a function from the TableWriter for an internal partition to a suitable ConsumerRecordToTableWriterAdapter that handles records produced by the Kafka topic and transforms them into Deephaven rows
KafkaIngester
public KafkaIngester(com.fishlib.io.logger.Logger log, DataImportServer dataImportServer, Properties props, String namespace, String tableName, String topic, IntPredicate partitionFilter, String columnPartition, Function<TableWriter,ConsumerRecordToTableWriterAdapter> adapterFactory, UnaryOperator<String> resumeFrom)

Creates a Kafka ingester for the given topic.

Parameters:
log - a log for output
dataImportServer - the DataImportServer instance to attach our stream processor to
props - the properties used to create the KafkaConsumer
namespace - the namespace of the table to replicate to
tableName - the name of the table to replicate to
topic - the topic to replicate
partitionFilter - a predicate indicating which partitions we should replicate
columnPartition - the column partition that we will replicate to
adapterFactory - a function from the TableWriter for an internal partition to a suitable ConsumerRecordToTableWriterAdapter that handles records produced by the Kafka topic and transforms them into Deephaven rows
resumeFrom - given a column partition value, determines the prior column partition whose checkpoint record should be read to resume from when this ingester does not have its own checkpoint record
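The resumeFrom operator only needs to map one column partition value to its predecessor. Assuming daily, ISO-formatted date partitions (an assumption; the default of DBTimeUtils.currentDateNy() suggests date-valued partitions, but the exact format depends on the deployment), a sketch might look like this:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.function.UnaryOperator;

// Hypothetical resumeFrom operator for daily date-string column partitions:
// e.g. "2023-04-02" resumes from the checkpoint record of "2023-04-01".
class ResumeFromPreviousDay {
    static final DateTimeFormatter FMT = DateTimeFormatter.ISO_LOCAL_DATE;

    static final UnaryOperator<String> PREVIOUS_DAY =
            partition -> LocalDate.parse(partition, FMT).minusDays(1).format(FMT);
}
```

ResumeFromPreviousDay.PREVIOUS_DAY could then be passed as the resumeFrom argument, so an ingester starting a new daily partition without its own checkpoint record would look back to the prior day's partition.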
Method Details

start
public void start()

Starts a consumer thread which replicates the consumed Kafka messages to Deephaven. This method must not be called more than once on an ingester instance.