Package io.deephaven.kafka
Class KafkaTools
java.lang.Object
io.deephaven.kafka.KafkaTools
-
Nested Class Summary
- static class
- static interface: A callback which is invoked from the consumer loop, enabling clients to inject logic to be invoked by the Kafka consumer thread (KafkaTools.ConsumerLoopCallback).
- static interface: Determines the initial offset to seek to for a given KafkaConsumer and TopicPartition (KafkaTools.InitialOffsetLookup).
- static enum: Enum to specify operations that may apply to either of Kafka KEY or VALUE fields.
- static class
- static interface
- static class
- static interface
- static interface: Marker interface for StreamConsumer registrar provider objects (KafkaTools.StreamConsumerRegistrarProvider).
- static interface: Type for the result Table returned by Kafka consumers (KafkaTools.TableType).
-
Field Summary
All fields are static final. They include the column-name and column-type property keys and their defaults, serializer and deserializer class names, nested-field name separators, seek-offset constants (long), convenience partition-filter and initial-offset constants (IntPredicate, IntToLongFunction), and key-or-value specs (KafkaTools.Consume.KeyOrValueSpec), including FROM_PROPERTIES: the names for the key or value columns can be provided in the properties as "deephaven.key.column.name" or "deephaven.value.column.name", and otherwise default to "KafkaKey" or "KafkaValue". See the Field Details section for the complete list.
-
Constructor Summary
KafkaTools()
-
Method Summary
- static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Map<String, String> fieldPathToColumnNameOut, org.apache.avro.Schema schema, Function<String, String> requestedFieldPathToColumnName)
- static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, Map<String, String> fieldPathToColumnNameOut, org.apache.avro.Schema schema, Function<String, String> requestedFieldPathToColumnName, boolean useUTF8Strings)
- static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema) - Convert an Avro schema to a list of column definitions, mapping every Avro field to a column of the same name.
- static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema, Function<String, String> requestedFieldPathToColumnName) - Convert an Avro schema to a list of column definitions.
- static org.apache.avro.Schema columnDefinitionsToAvroSchema(Table t, String schemaName, String namespace, Properties colProps, Predicate<String> includeOnly, Predicate<String> exclude, org.apache.commons.lang3.mutable.MutableObject<Properties> colPropsOut)
- static void consume(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull KafkaTools.InitialOffsetLookup partitionToInitialOffset, KafkaTools.Consume.KeyOrValueSpec keySpec, KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider, @Nullable KafkaTools.ConsumerLoopCallback consumerLoopCallback) - Consume from Kafka to stream consumers supplied by the streamConsumerRegistrar.
- static PartitionedTable consumeToPartitionedTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, KafkaTools.Consume.KeyOrValueSpec keySpec, KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType) - Consume from Kafka to a Deephaven PartitionedTable containing one constituent Table per partition.
- static Table consumeToTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, KafkaTools.Consume.KeyOrValueSpec keySpec, KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType) - Consume from Kafka to a Deephaven Table.
- static KafkaTools.TableType friendlyNameToTableType(@NotNull String typeName) - Map a "Python-friendly" table type name to a KafkaTools.TableType.
- static org.apache.avro.Schema getAvroSchema(String avroSchemaAsJsonString) - Create an Avro schema object for a String containing a JSON encoded Avro schema definition.
- static TableDefinition getTableDefinition(@NotNull Properties kafkaProperties, KafkaTools.Consume.KeyOrValueSpec keySpec, KafkaTools.Consume.KeyOrValueSpec valueSpec) - Construct a TableDefinition based on the input Properties and KafkaTools.Consume.KeyOrValueSpec parameters.
- static String[] listTopics(@NotNull Properties kafkaProperties)
- static IntPredicate partitionFilterFromArray(int[] partitions)
- static IntToLongFunction partitionToOffsetFromParallelArrays(int[] partitions, long[] offsets)
- predicateFromSet(Set<String> set)
- static Runnable produceFromTable(@NotNull Table table, @NotNull Properties kafkaProperties, @NotNull String topic, KafkaTools.Produce.KeyOrValueSpec keySpec, KafkaTools.Produce.KeyOrValueSpec valueSpec, boolean lastByKeyColumns) - Produce a Kafka stream from a Deephaven table.
- static Runnable produceFromTable(KafkaPublishOptions options) - Produce a Kafka stream from a Deephaven table.
- topics(@NotNull Properties kafkaProperties)
-
Field Details
KAFKA_PARTITION_COLUMN_NAME_PROPERTY
KAFKA_PARTITION_COLUMN_NAME_DEFAULT
OFFSET_COLUMN_NAME_PROPERTY
OFFSET_COLUMN_NAME_DEFAULT
TIMESTAMP_COLUMN_NAME_PROPERTY
TIMESTAMP_COLUMN_NAME_DEFAULT
RECEIVE_TIME_COLUMN_NAME_PROPERTY
RECEIVE_TIME_COLUMN_NAME_DEFAULT
KEY_BYTES_COLUMN_NAME_PROPERTY
KEY_BYTES_COLUMN_NAME_DEFAULT
VALUE_BYTES_COLUMN_NAME_PROPERTY
VALUE_BYTES_COLUMN_NAME_DEFAULT
KEY_COLUMN_NAME_PROPERTY
KEY_COLUMN_NAME_DEFAULT
VALUE_COLUMN_NAME_PROPERTY
VALUE_COLUMN_NAME_DEFAULT
KEY_COLUMN_TYPE_PROPERTY
VALUE_COLUMN_TYPE_PROPERTY
SCHEMA_SERVER_PROPERTY
SHORT_DESERIALIZER
INT_DESERIALIZER
LONG_DESERIALIZER
FLOAT_DESERIALIZER
DOUBLE_DESERIALIZER
BYTE_ARRAY_DESERIALIZER
STRING_DESERIALIZER
BYTE_BUFFER_DESERIALIZER
AVRO_DESERIALIZER
DESERIALIZER_FOR_IGNORE
SHORT_SERIALIZER
INT_SERIALIZER
LONG_SERIALIZER
FLOAT_SERIALIZER
DOUBLE_SERIALIZER
BYTE_ARRAY_SERIALIZER
STRING_SERIALIZER
BYTE_BUFFER_SERIALIZER
AVRO_SERIALIZER
SERIALIZER_FOR_IGNORE
NESTED_FIELD_NAME_SEPARATOR
NESTED_FIELD_COLUMN_NAME_SEPARATOR
AVRO_LATEST_VERSION
SEEK_TO_BEGINNING
public static final long SEEK_TO_BEGINNING
DONT_SEEK
public static final long DONT_SEEK
SEEK_TO_END
public static final long SEEK_TO_END
ALL_PARTITIONS
ALL_PARTITIONS_SEEK_TO_BEGINNING
ALL_PARTITIONS_DONT_SEEK
ALL_PARTITIONS_SEEK_TO_END
DIRECT_MAPPING
FROM_PROPERTIES
The names for the key or value columns can be provided in the properties as "deephaven.key.column.name" or "deephaven.value.column.name", and otherwise default to "KafkaKey" or "KafkaValue". The types for key or value are either specified in the properties as "deephaven.key.column.type" or "deephaven.value.column.type" or deduced from the serializer classes for "key.deserializer" or "value.deserializer" in the provided Properties object.
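As a rough illustration of the property-driven behavior of FROM_PROPERTIES, the sketch below builds a Properties object with explicit Deephaven column-name overrides. The broker address, topic name, and the TableType.blink() factory are assumptions for this sketch and are not part of the documentation above.

import java.util.Properties;

import io.deephaven.engine.table.Table;
import io.deephaven.kafka.KafkaTools;

public class FromPropertiesExample {
    public static Table consumeWithPropertyDrivenColumns() {
        final Properties props = new Properties();
        // Standard Kafka consumer settings (the broker address is an assumption).
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "dh-example");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.DoubleDeserializer");
        // Deephaven-specific overrides: without these, the columns default to "KafkaKey"/"KafkaValue".
        // Column types are deduced from the deserializer classes above, or can be set explicitly
        // via "deephaven.key.column.type" / "deephaven.value.column.type".
        props.setProperty("deephaven.key.column.name", "Symbol");
        props.setProperty("deephaven.value.column.name", "Price");
        return KafkaTools.consumeToTable(
                props,
                "prices",                              // hypothetical topic
                KafkaTools.ALL_PARTITIONS,             // consume every partition
                KafkaTools.ALL_PARTITIONS_DONT_SEEK,   // do not seek; use the consumer's existing offsets
                KafkaTools.FROM_PROPERTIES,            // key spec driven by the properties above
                KafkaTools.FROM_PROPERTIES,            // value spec driven by the properties above
                KafkaTools.TableType.blink());         // assumed factory for a blink-table result
    }
}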
-
-
Constructor Details
-
KafkaTools
public KafkaTools()
-
-
Method Details
-
getAvroSchema
public static org.apache.avro.Schema getAvroSchema(String avroSchemaAsJsonString)
Create an Avro schema object for a String containing a JSON encoded Avro schema definition.
- Parameters:
avroSchemaAsJsonString - The JSON Avro schema definition
- Returns:
an Avro schema object
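For instance, a minimal sketch; the record layout below is illustrative only.

import org.apache.avro.Schema;

import io.deephaven.kafka.KafkaTools;

public class AvroSchemaExample {
    // Hypothetical two-field record schema, expressed as JSON.
    static final String QUOTE_SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"Quote\",\"namespace\":\"example\",\"fields\":["
                    + "{\"name\":\"Symbol\",\"type\":\"string\"},"
                    + "{\"name\":\"Price\",\"type\":\"double\"}]}";

    public static Schema quoteSchema() {
        // Parse the JSON definition into an org.apache.avro.Schema object.
        return KafkaTools.getAvroSchema(QUOTE_SCHEMA_JSON);
    }
}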
-
columnDefinitionsToAvroSchema
public static org.apache.avro.Schema columnDefinitionsToAvroSchema(Table t, String schemaName, String namespace, Properties colProps, Predicate<String> includeOnly, Predicate<String> exclude, org.apache.commons.lang3.mutable.MutableObject<Properties> colPropsOut)
-
avroSchemaToColumnDefinitions
-
avroSchemaToColumnDefinitions
-
avroSchemaToColumnDefinitions
public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema, Function<String, String> requestedFieldPathToColumnName)
Convert an Avro schema to a list of column definitions.
- Parameters:
columnsOut - Column definitions for output; should be empty on entry.
schema - Avro schema
requestedFieldPathToColumnName - An optional mapping to specify selection and naming of columns from Avro fields, or null to map all fields, using the field path as the column name.
-
avroSchemaToColumnDefinitions
public static void avroSchemaToColumnDefinitions(List<ColumnDefinition<?>> columnsOut, org.apache.avro.Schema schema)
Convert an Avro schema to a list of column definitions, mapping every Avro field to a column of the same name.
- Parameters:
columnsOut - Column definitions for output; should be empty on entry.
schema - Avro schema
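A short sketch of the two-argument overload, reusing a schema such as the illustrative Quote schema from the getAvroSchema example above.

import java.util.ArrayList;
import java.util.List;

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.kafka.KafkaTools;

public class AvroColumnsExample {
    public static List<ColumnDefinition<?>> columnsFromAvro(org.apache.avro.Schema schema) {
        // columnsOut should be empty on entry; every Avro field becomes a column of the same name.
        final List<ColumnDefinition<?>> columnsOut = new ArrayList<>();
        KafkaTools.avroSchemaToColumnDefinitions(columnsOut, schema);
        return columnsOut;
    }
}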
-
friendlyNameToTableType
@ScriptApi
public static KafkaTools.TableType friendlyNameToTableType(@NotNull String typeName)
Map a "Python-friendly" table type name to a KafkaTools.TableType. Supported values are:
- "blink"
- "stream" (deprecated; use "blink")
- "append"
- "ring:<capacity>", where capacity is an integer specifying the maximum number of trailing rows to include in the result
- Parameters:
typeName - The friendly name
- Returns:
The mapped KafkaTools.TableType
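A minimal sketch showing the mapping; the method and constant names come from this page, while the specific friendly-name strings passed in are illustrative.

import io.deephaven.kafka.KafkaTools;

public class TableTypeNames {
    public static KafkaTools.TableType fromConfig(String friendlyName) {
        // e.g. "blink", "append", or "ring:1024" (at most 1024 trailing rows retained)
        return KafkaTools.friendlyNameToTableType(friendlyName);
    }
}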
-
consumeToTable
public static Table consumeToTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType)
Consume from Kafka to a Deephaven Table.
- Parameters:
kafkaProperties - Properties to configure the result and also to be passed to create the KafkaConsumer
topic - Kafka topic name
partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
keySpec - Conversion specification for Kafka record keys
valueSpec - Conversion specification for Kafka record values
tableType - KafkaTools.TableType specifying the type of the expected result
- Returns:
The result Table containing Kafka stream data formatted according to tableType
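A minimal usage sketch. The broker address, topic name, column names, and the Consume.simpleSpec(...) and TableType.blink() factories are assumptions not shown on this page.

import java.util.Properties;

import io.deephaven.engine.table.Table;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.kafka.KafkaTools.Consume;
import io.deephaven.kafka.KafkaTools.TableType;

public class ConsumeToTableExample {
    public static Table quotesBlink() {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.setProperty("group.id", "dh-quotes");

        return KafkaTools.consumeToTable(
                props,
                "quotes",                                    // hypothetical topic
                KafkaTools.ALL_PARTITIONS,                   // consume every partition
                KafkaTools.ALL_PARTITIONS_DONT_SEEK,         // do not seek; use existing offsets
                Consume.simpleSpec("Symbol", String.class),  // key spec (assumed factory)
                Consume.simpleSpec("Price", double.class),   // value spec (assumed factory)
                TableType.blink());                          // blink-table result (assumed factory)
    }
}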
-
consumeToPartitionedTable
public static PartitionedTable consumeToPartitionedTable(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull IntToLongFunction partitionToInitialOffset, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.TableType tableType)
Consume from Kafka to a Deephaven PartitionedTable containing one constituent Table per partition.
- Parameters:
kafkaProperties - Properties to configure the result and also to be passed to create the KafkaConsumer
topic - Kafka topic name
partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
keySpec - Conversion specification for Kafka record keys
valueSpec - Conversion specification for Kafka record values
tableType - KafkaTools.TableType specifying the type of the expected result's constituent tables
- Returns:
The result PartitionedTable containing Kafka stream data formatted according to tableType
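A brief sketch analogous to consumeToTable; the Consume.ignoreSpec()/Consume.simpleSpec(...) and TableType.append() factories and the topic name are assumptions.

import java.util.Properties;

import io.deephaven.engine.table.PartitionedTable;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.kafka.KafkaTools.Consume;
import io.deephaven.kafka.KafkaTools.TableType;

public class ConsumeToPartitionedTableExample {
    public static PartitionedTable perPartitionAppend(Properties props) {
        // One constituent append-only Table per Kafka partition, starting from the beginning.
        return KafkaTools.consumeToPartitionedTable(
                props,
                "quotes",                                     // hypothetical topic
                KafkaTools.ALL_PARTITIONS,
                KafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING,
                Consume.ignoreSpec(),                         // drop the key (assumed factory)
                Consume.simpleSpec("Payload", String.class),  // value as a String column (assumed factory)
                TableType.append());                          // assumed factory
    }
}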
-
getTableDefinition
public static TableDefinition getTableDefinition(@NotNull Properties kafkaProperties, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec)
Construct a TableDefinition based on the input Properties and KafkaTools.Consume.KeyOrValueSpec parameters. Given the same input Properties and Consume.KeyOrValueSpec parameters, the returned TableDefinition is the same as the TableDefinition of the table produced by consumeToTable(Properties, String, IntPredicate, IntToLongFunction, Consume.KeyOrValueSpec, Consume.KeyOrValueSpec, TableType).
- Parameters:
kafkaProperties - Properties to configure this table
keySpec - Conversion specification for Kafka record keys
valueSpec - Conversion specification for Kafka record values
- Returns:
A TableDefinition derived from the input Properties and KeyOrValueSpec instances
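For instance, to preview the columns a planned consumer call would produce before starting it; the Consume.simpleSpec(...) factory and the column names are assumptions, as above.

import java.util.Properties;

import io.deephaven.engine.table.TableDefinition;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.kafka.KafkaTools.Consume;

public class TableDefinitionExample {
    public static TableDefinition previewDefinition(Properties props) {
        // The same Properties and specs as a planned consumeToTable(...) call yield the same definition.
        return KafkaTools.getTableDefinition(
                props,
                Consume.simpleSpec("Symbol", String.class),  // assumed factory
                Consume.simpleSpec("Price", double.class));  // assumed factory
    }
}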
-
consume
public static void consume(@NotNull Properties kafkaProperties, @NotNull String topic, @NotNull IntPredicate partitionFilter, @NotNull KafkaTools.InitialOffsetLookup partitionToInitialOffset, @NotNull KafkaTools.Consume.KeyOrValueSpec keySpec, @NotNull KafkaTools.Consume.KeyOrValueSpec valueSpec, @NotNull KafkaTools.StreamConsumerRegistrarProvider streamConsumerRegistrarProvider, @Nullable KafkaTools.ConsumerLoopCallback consumerLoopCallback)
Consume from Kafka to stream consumers supplied by the streamConsumerRegistrar.
- Parameters:
kafkaProperties - Properties to configure this table and also to be passed to create the KafkaConsumer
topic - Kafka topic name
partitionFilter - A predicate returning true for the partitions to consume. The convenience constant ALL_PARTITIONS is defined to facilitate requesting all partitions.
partitionToInitialOffset - A function specifying the desired initial offset for each partition consumed
keySpec - Conversion specification for Kafka record keys
valueSpec - Conversion specification for Kafka record values
streamConsumerRegistrarProvider - A provider for a function to register StreamConsumer instances. The registered stream consumers must accept chunk types that correspond to StreamChunkUtils.chunkTypeForColumnIndex(TableDefinition, int) for the supplied TableDefinition. See single and per-partition.
consumerLoopCallback - Callback to inject logic into the ingester's consumer loop
-
produceFromTable
public static Runnable produceFromTable(@NotNull Table table, @NotNull Properties kafkaProperties, @NotNull String topic, @NotNull KafkaTools.Produce.KeyOrValueSpec keySpec, @NotNull KafkaTools.Produce.KeyOrValueSpec valueSpec, boolean lastByKeyColumns)
Produce a Kafka stream from a Deephaven table.
Note that table must only change in ways that are meaningful when turned into a stream of events over Kafka. Two primary use cases are considered:
- A stream of changes (puts and removes) to a key-value data set. In order to handle this efficiently and allow for correct reconstruction of the state at a consumer, it is assumed that the input data is the result of a Deephaven aggregation, e.g. TableOperations.aggAllBy(io.deephaven.api.agg.spec.AggSpec), TableOperations.aggBy(io.deephaven.api.agg.Aggregation), or TableOperations.lastBy(). This means that key columns (as specified by keySpec) must not be modified, and no rows should be shifted if there are any key columns. Note that specifying lastByKeyColumns=true can make it easy to satisfy this constraint if the input data is not already aggregated.
- A stream of independent log records. In this case, the input table should either be a blink table or should only ever add rows (regardless of whether the attribute is specified).
If other use cases are identified, a publication mode or extensible listener framework may be introduced at a later date.
- Parameters:
table - The table used as a source of data to be sent to Kafka.
kafkaProperties - Properties to be passed to create the associated KafkaProducer.
topic - Kafka topic name
keySpec - Conversion specification for Kafka record keys from table column data.
valueSpec - Conversion specification for Kafka record values from table column data.
lastByKeyColumns - Whether to publish only the last record for each unique key. Ignored when keySpec is IGNORE. Otherwise, if lastByKeyColumns == true, this method will internally perform a lastBy aggregation on table grouped by the input columns of keySpec and publish to Kafka from the result.
- Returns:
A callback to stop producing and shut down the associated table listener; note that a caller should keep a reference to this return value to ensure liveness.
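A brief sketch of the key-value publication use case. The broker, topic, column names, and the Produce.simpleSpec(...) factory are assumptions not documented on this page; the returned Runnable is retained so publishing stays live and can later be stopped.

import java.util.Properties;

import io.deephaven.engine.table.Table;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.kafka.KafkaTools.Produce;

public class ProduceFromTableExample {
    public static Runnable publishLastPrices(Table prices) {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.DoubleSerializer");

        // lastByKeyColumns=true publishes only the latest row per key, performing the lastBy
        // aggregation internally as described above.
        final Runnable stop = KafkaTools.produceFromTable(
                prices,
                props,
                "last-prices",                 // hypothetical topic
                Produce.simpleSpec("Symbol"),  // key from the Symbol column (assumed factory)
                Produce.simpleSpec("Price"),   // value from the Price column (assumed factory)
                true);
        // Keep a reference to 'stop'; invoke stop.run() to cease publishing and release the listener.
        return stop;
    }
}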
-
produceFromTable
public static Runnable produceFromTable(KafkaPublishOptions options)
Produce a Kafka stream from a Deephaven table.
Note that table must only change in ways that are meaningful when turned into a stream of events over Kafka. Two primary use cases are considered:
- A stream of changes (puts and removes) to a key-value data set. In order to handle this efficiently and allow for correct reconstruction of the state at a consumer, it is assumed that the input data is the result of a Deephaven aggregation, e.g. TableOperations.aggAllBy(io.deephaven.api.agg.spec.AggSpec), TableOperations.aggBy(io.deephaven.api.agg.Aggregation), or TableOperations.lastBy(). This means that key columns (as specified by keySpec) must not be modified, and no rows should be shifted if there are any key columns. Note that specifying lastByKeyColumns=true can make it easy to satisfy this constraint if the input data is not already aggregated.
- A stream of independent log records. In this case, the input table should either be a blink table or should only ever add rows (regardless of whether the attribute is specified).
If other use cases are identified, a publication mode or extensible listener framework may be introduced at a later date.
- Parameters:
options - the options
- Returns:
A callback to stop producing and shut down the associated table listener; note that a caller should keep a reference to this return value to ensure liveness.
-
partitionFilterFromArray
public static IntPredicate partitionFilterFromArray(int[] partitions)
-
partitionToOffsetFromParallelArrays
public static IntToLongFunction partitionToOffsetFromParallelArrays(int[] partitions, long[] offsets)
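A small sketch combining the two helpers to consume only partitions 0 and 1, starting partition 0 at the beginning and partition 1 at offset 100; the partition numbers and offset value are illustrative.

import java.util.function.IntPredicate;
import java.util.function.IntToLongFunction;

import io.deephaven.kafka.KafkaTools;

public class PartitionHelpersExample {
    public static void partitionSelection() {
        final int[] partitions = {0, 1};
        final long[] offsets = {KafkaTools.SEEK_TO_BEGINNING, 100L};

        final IntPredicate filter = KafkaTools.partitionFilterFromArray(partitions);
        final IntToLongFunction initialOffsets =
                KafkaTools.partitionToOffsetFromParallelArrays(partitions, offsets);

        // filter and initialOffsets can be passed as the partitionFilter and
        // partitionToInitialOffset arguments of consumeToTable / consumeToPartitionedTable.
    }
}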
predicateFromSet
-
topics
-
listTopics
-