Package io.deephaven.kafka.ingest
Class SimpleConsumerRecordToTableWriterAdapter
java.lang.Object
    io.deephaven.kafka.ingest.SimpleConsumerRecordToTableWriterAdapter

All Implemented Interfaces:
    ConsumerRecordToTableWriterAdapter
public class SimpleConsumerRecordToTableWriterAdapter
extends Object
implements ConsumerRecordToTableWriterAdapter
An adapter that maps keys and values to single Deephaven columns. Each Kafka record produces one Deephaven row.
Method Summary

void consumeRecord(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, DBDateTime recvTime)
    Consume a Kafka record, producing zero or more rows in the output.

static Function<TableWriter, ConsumerRecordToTableWriterAdapter> makeFactory(String kafkaPartitionColumnName, String offsetColumnName, String timestampColumnName, String keyColumnName, String valueColumnName)

static Function<TableWriter, ConsumerRecordToTableWriterAdapter> makeFactory(String kafkaPartitionColumnName, String offsetColumnName, String timestampColumnName, String recvTimeColumnName, String keyColumnName, String valueColumnName)

static Function<TableWriter, ConsumerRecordToTableWriterAdapter> makeFactory(String kafkaPartitionColumnName, String offsetColumnName, String timestampColumnName, String recvTimeColumnName, String keyColumnName, String valueColumnName, Predicate<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>> recordPredicate)
    Create a ConsumerRecordToTableWriterAdapter that maps simple keys and values to single columns in a Deephaven table.

static Function<TableWriter, ConsumerRecordToTableWriterAdapter> makeFactory(String kafkaPartitionColumnName, String offsetColumnName, String timestampColumnName, String keyColumnName, String valueColumnName, Predicate<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>> recordPredicate)
    Create a ConsumerRecordToTableWriterAdapter that maps simple keys and values to single columns in a Deephaven table.

Methods inherited from class java.lang.Object
    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods inherited from interface io.deephaven.kafka.ingest.ConsumerRecordToTableWriterAdapter
    close, consumeRecord, defaultConsumeRecord, setAsyncErrorHandler, setAsyncStatusUpdater
Method Details
makeFactory

public static Function<TableWriter, ConsumerRecordToTableWriterAdapter> makeFactory(
        String kafkaPartitionColumnName,
        String offsetColumnName,
        String timestampColumnName,
        String keyColumnName,
        @NotNull String valueColumnName)

Delegates to makeFactory(String, String, String, String, String, String, Predicate). All records will be ingested.

Parameters:
    kafkaPartitionColumnName - the name of the Integer column representing the Kafka partition; if null, the partition is not mapped to a Deephaven column
    offsetColumnName - the name of the Long column representing the Kafka offset; if null, the offset is not mapped to a Deephaven column
    timestampColumnName - the name of the DateTime column representing the Kafka timestamp; if null, the timestamp is not mapped to a Deephaven column
    keyColumnName - the name of the Deephaven column for the record's key
    valueColumnName - the name of the Deephaven column for the record's value
Returns:
    a factory that produces an adapter for a given TableWriter
makeFactory

public static Function<TableWriter, ConsumerRecordToTableWriterAdapter> makeFactory(
        String kafkaPartitionColumnName,
        String offsetColumnName,
        String timestampColumnName,
        String recvTimeColumnName,
        String keyColumnName,
        @NotNull String valueColumnName)

Delegates to makeFactory(String, String, String, String, String, String, Predicate). All records will be ingested.

Parameters:
    kafkaPartitionColumnName - the name of the Integer column representing the Kafka partition; if null, the partition is not mapped to a Deephaven column
    offsetColumnName - the name of the Long column representing the Kafka offset; if null, the offset is not mapped to a Deephaven column
    timestampColumnName - the name of the DateTime column representing the Kafka timestamp; if null, the timestamp is not mapped to a Deephaven column
    recvTimeColumnName - the name of the DateTime column recording when the record was received by the ingester
    keyColumnName - the name of the Deephaven column for the record's key
    valueColumnName - the name of the Deephaven column for the record's value
Returns:
    a factory that produces an adapter for a given TableWriter
makeFactory

public static Function<TableWriter, ConsumerRecordToTableWriterAdapter> makeFactory(
        String kafkaPartitionColumnName,
        String offsetColumnName,
        String timestampColumnName,
        String keyColumnName,
        @NotNull String valueColumnName,
        @NotNull Predicate<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>> recordPredicate)

Create a ConsumerRecordToTableWriterAdapter that maps simple keys and values to single columns in a Deephaven table. Each Kafka record becomes a row in the table's output.

Parameters:
    kafkaPartitionColumnName - the name of the Integer column representing the Kafka partition; if null, the partition is not mapped to a Deephaven column
    offsetColumnName - the name of the Long column representing the Kafka offset; if null, the offset is not mapped to a Deephaven column
    timestampColumnName - the name of the DateTime column representing the Kafka timestamp; if null, the timestamp is not mapped to a Deephaven column
    keyColumnName - the name of the Deephaven column for the record's key
    valueColumnName - the name of the Deephaven column for the record's value
    recordPredicate - a predicate that returns true if the record should be ingested, false otherwise
Returns:
    a factory that produces an adapter for a given TableWriter
makeFactory

public static Function<TableWriter, ConsumerRecordToTableWriterAdapter> makeFactory(
        String kafkaPartitionColumnName,
        String offsetColumnName,
        String timestampColumnName,
        String recvTimeColumnName,
        String keyColumnName,
        @NotNull String valueColumnName,
        @Nullable Predicate<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>> recordPredicate)

Create a ConsumerRecordToTableWriterAdapter that maps simple keys and values to single columns in a Deephaven table. Each Kafka record becomes a row in the table's output.

Parameters:
    kafkaPartitionColumnName - the name of the Integer column representing the Kafka partition; if null, the partition is not mapped to a Deephaven column
    offsetColumnName - the name of the Long column representing the Kafka offset; if null, the offset is not mapped to a Deephaven column
    timestampColumnName - the name of the DateTime column representing the Kafka timestamp; if null, the timestamp is not mapped to a Deephaven column
    recvTimeColumnName - the name of the DateTime column recording when the record was received by the ingester
    keyColumnName - the name of the Deephaven column for the record's key
    valueColumnName - the name of the Deephaven column for the record's value
    recordPredicate - a predicate that returns true if the record should be ingested, false otherwise
Returns:
    a factory that produces an adapter for a given TableWriter
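The factory-plus-predicate shape above can be sketched in miniature. The stand-in types below (SimpleRecord, RowSink, RecordAdapter) are hypothetical simplifications of Kafka's ConsumerRecord, Deephaven's TableWriter, and ConsumerRecordToTableWriterAdapter, used only to show how a returned Function wires a writer to an adapter and how the predicate gates ingestion; they are not the real API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class FactorySketch {
    // Hypothetical stand-in for a Kafka ConsumerRecord
    record SimpleRecord(int partition, long offset, String key, String value) {}

    // Hypothetical stand-in for a Deephaven TableWriter
    static class RowSink {
        final List<String> rows = new ArrayList<>();
    }

    // Hypothetical stand-in for ConsumerRecordToTableWriterAdapter
    interface RecordAdapter {
        void consumeRecord(SimpleRecord r);
    }

    // Shaped like makeFactory: given a predicate, return a function from the
    // writer to an adapter; records failing the predicate are skipped, every
    // accepted record produces exactly one row.
    static Function<RowSink, RecordAdapter> makeFactory(Predicate<SimpleRecord> recordPredicate) {
        return sink -> r -> {
            if (!recordPredicate.test(r)) {
                return; // filtered out, no row produced
            }
            sink.rows.add(r.key() + "=" + r.value());
        };
    }

    public static void main(String[] args) {
        RowSink sink = new RowSink();
        RecordAdapter adapter = makeFactory(r -> r.value() != null).apply(sink);
        adapter.consumeRecord(new SimpleRecord(0, 1L, "k1", "v1"));
        adapter.consumeRecord(new SimpleRecord(0, 2L, "k2", null)); // filtered
        System.out.println(sink.rows); // prints [k1=v1]
    }
}
```

The indirection through Function means the column wiring and predicate can be configured once, while the TableWriter is supplied later by the ingestion framework.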
consumeRecord

public void consumeRecord(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, DBDateTime recvTime) throws IOException

Description copied from interface: ConsumerRecordToTableWriterAdapter
Consume a Kafka record, producing zero or more rows in the output.

Specified by:
    consumeRecord in interface ConsumerRecordToTableWriterAdapter
Parameters:
    record - the record received from KafkaConsumer.poll(Duration)
    recvTime - the time the record was received from KafkaConsumer.poll(Duration)
Throws:
    IOException - if there was an error writing to the output table
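The "zero or more rows" contract leaves room for adapters that filter or expand records; for this simple adapter, each accepted record yields one row. The stand-in below sketches the driving loop, assuming a hypothetical PolledRecord and a nanosecond receive time in place of the real KafkaConsumer.poll(Duration) batch and DBDateTime:

```java
import java.util.ArrayList;
import java.util.List;

public class ConsumeLoopSketch {
    // Hypothetical stand-in for a record returned by a poll
    record PolledRecord(String key, String value) {}

    static final List<String> outputRows = new ArrayList<>();

    // Stand-in for consumeRecord: one accepted record becomes one output row
    // carrying the receive time alongside the key and value.
    static void consumeRecord(PolledRecord r, long recvTimeNanos) {
        outputRows.add(recvTimeNanos + ":" + r.key() + ":" + r.value());
    }

    public static void main(String[] args) {
        List<PolledRecord> batch = List.of(
                new PolledRecord("k1", "v1"),
                new PolledRecord("k2", "v2"));
        long recvTime = System.nanoTime(); // captured once per poll, shared by the batch
        for (PolledRecord r : batch) {
            consumeRecord(r, recvTime); // one row per record
        }
        System.out.println(outputRows.size()); // prints 2
    }
}
```

Capturing recvTime once per poll (rather than per record) matches the interface's framing: it is the time the batch came back from the poll, not a per-record Kafka timestamp.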