Class SimpleConsumerRecordToTableWriterAdapter

java.lang.Object
io.deephaven.kafka.ingest.SimpleConsumerRecordToTableWriterAdapter
All Implemented Interfaces:
ConsumerRecordToTableWriterAdapter

public class SimpleConsumerRecordToTableWriterAdapter
extends Object
implements ConsumerRecordToTableWriterAdapter
An adapter that maps keys and values to single Deephaven columns. Each Kafka record produces one Deephaven row.
  • Method Details

    • makeFactory

      public static Function<TableWriter,​ConsumerRecordToTableWriterAdapter> makeFactory​(String kafkaPartitionColumnName, String offsetColumnName, String timestampColumnName, String keyColumnName, @NotNull String valueColumnName)
      Create a ConsumerRecordToTableWriterAdapter that maps simple keys and values to single columns in a Deephaven table. Each Kafka record becomes a row in the table's output.
      Parameters:
      kafkaPartitionColumnName - the name of the Integer column representing the Kafka partition, if null the partition is not mapped to a Deephaven column
      offsetColumnName - the name of the Long column representing the Kafka offset, if null the offset is not mapped to a Deephaven column
      timestampColumnName - the name of the DateTime column representing the Kafka partition, if null the partition is not mapped to a Deephaven column
      keyColumnName - the name of the Deephaven column for the record's key
      valueColumnName - the name of the Deephaven column for the record's value
      Returns:
      an adapter for the TableWriter
    • consumeRecord

      public void consumeRecord​(org.apache.kafka.clients.consumer.ConsumerRecord<?,​?> record) throws IOException
      Description copied from interface: ConsumerRecordToTableWriterAdapter
      Consume a Kafka record, producing zero or more rows in the output.
      Specified by:
      consumeRecord in interface ConsumerRecordToTableWriterAdapter
      Parameters:
      record - the record received from KafkaConsumer.poll(Duration).
      Throws:
      IOException - if there was an error writing to the output table