deephaven_enterprise.data_ingestion.kafka_options
This module provides utilities for configuring Kafka ingestion into Deephaven Data Import Servers (DIS). It includes functions and classes for building Kafka table writer options and for managing schema validation and updates.
- class KafkaTableWriterOptions(j_ktw_options)
  Bases: JObjectWrapper

  A wrapper object representing the options for a Kafka Table Writer.

  Note: It should not be instantiated directly by user code, but rather through calling kafka_table_writer_options().

  - j_object_type
    alias of KafkaTableWriter$Options
- class LastByView(name, cols)
  Bases: NamedTuple

  A named tuple representing a last-by view, with the name of the view and the key columns to last-by. The name can be None or an empty string if there will be just one last-by view on the table; otherwise, the name should be unique for each distinct set of key columns.

  - cols
    Alias for field number 1
  - name
    Alias for field number 0
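Since LastByView is a plain NamedTuple, constructing views is straightforward. A minimal sketch, using a structurally equivalent stand-in class so the example is self-contained (in real code, LastByView would come from this module):

```python
from typing import List, NamedTuple, Optional

# Structurally equivalent stand-in for this module's LastByView,
# defined locally so the example runs on its own.
class LastByView(NamedTuple):
    name: Optional[str]  # field number 0: view name (may be None for a single view)
    cols: List[str]      # field number 1: key columns to last-by

# A single last-by view on the table: the name may be None or empty.
single = LastByView(name=None, cols=["Symbol"])

# Multiple views on one table: each distinct set of key columns
# needs a unique name.
views = [
    LastByView(name="by_symbol", cols=["Symbol"]),
    LastByView(name="by_symbol_exchange", cols=["Symbol", "Exchange"]),
]
```

The resulting values (a single LastByView or a list of them) are what the last_by_views parameter of kafka_table_writer_options() accepts.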
- class SchemaHelper(opts, partition_col='Date', grouping_cols=None, non_symbol_table_cols=None, merge_key_formula=None)
  Bases: object

  SchemaHelper is a helper class for automatically creating and validating Deephaven Enterprise schemas from KafkaTableWriterOptions.

  Creates a new instance of SchemaHelper.
- Parameters:
opts (KafkaTableWriterOptions) – The KafkaTableWriterOptions object
partition_col (str) – The partition column for the schema, defaults to “Date”
grouping_cols (list[str]) – The grouping columns for the schema, defaults to None
non_symbol_table_cols (list[str]) – The columns to be written without a symbol table, defaults to None
merge_key_formula (str) – The key formula for the merge process, defaults to None, which means use the predefined “${autobalance_single}”. For more information, see the Deephaven documentation about “Partitioning Key Formulas”
- add_or_validate()
  Validates the schema against the latest schema known by the SchemaService. If the schema does not exist within the SchemaService, the schema will be added. If the schema does not match the latest schema, an appropriate exception is thrown.
  - Returns:
    True if the schema was added, False if the schema matched the latest schema
  - Return type:
    bool
  - Raises:
    DHError – If the schema does not match the latest schema
- validate_or_update()
  Validates the schema against the latest schema known by the SchemaService. If the schema does not exist within the SchemaService, the schema will be added. If the schema does not match the latest schema, an attempt is made to update the schema within the SchemaService. The only schema changes permitted are adding or removing columns and changes to SymbolTable, Grouping, and the merge KeyFormula; any other schema change will throw an exception.
  Note that existing partitions cannot be appended to after a schema change takes effect. A schema change should only be made before adding data for a given partition (e.g. “Date”). For more information, see the Deephaven documentation: https://deephaven.io/enterprise/docs/coreplus/coreplus-kafka/#schema-helper-tools
  - Returns:
    True if the schema was added or updated, False if the schema matched the latest schema
  - Return type:
    bool
  - Raises:
    DHError – If the schema does not match the latest schema and cannot be updated
- kafka_table_writer_options(kafka_properties, topic, table_name, namespace, key_spec, value_spec, partition_value=None, partition_func=None, partition_filter=None, resume_from=None, ignore_offset_from_checkpoints=False, ignore_offset_from_broker=False, partition_offset_fallback_func=None, commit_offset_to_broker=True, enable_transactions=False, cols_to_ignore=None, transformation=None, last_by_views=None)
  Builds a KafkaTableWriterOptions instance to be used by consume_to_dis() to ingest a Kafka topic and persistently write the data to a Deephaven Data Import Server. It can also be passed to add_or_validate() or validate_or_update() to add, update, or validate the schema of the target Deephaven table.
  - Parameters:
kafka_properties (dict[str, str]) – The properties for the underlying Kafka consumer, see https://kafka.apache.org/documentation/#consumerconfigs for more details on the properties.
namespace (str) – The namespace of the Deephaven table to ingest
table_name (str) – The name of the Deephaven table to ingest
topic (str) – The Kafka topic to consume
key_spec (KeyValueSpec) – Specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec(), or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_properties param should include values for dictionary keys ‘deephaven.key.column.name’ and ‘deephaven.key.column.type’, for the single resulting column name and type.
value_spec (KeyValueSpec) – Specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec() or json_spec(), or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which works the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_properties param should include values for dictionary keys ‘deephaven.value.column.name’ and ‘deephaven.value.column.type’, for the single resulting column name and type.
partition_value (str) – The fixed partition value to use for the Deephaven table, mutually exclusive with the ‘partition_func’ argument. Must be specified if ‘partition_func’ is None. Default is None.
partition_func (tuple[str, Callable[[int], str]]) – The input column name (must be of long type), and the function to dynamically determine which column partition a row belongs to. The function takes the column value as an argument and returns the partition value. Mutually exclusive with the ‘partition_value’ argument; must be specified if ‘partition_value’ is None. Default is None.
partition_filter (Callable[[int], bool]) – The function to use to determine if a Kafka partition should be ingested, default is None, meaning to ingest all Kafka partitions.
resume_from (Callable[[str], str]) – The function to determine the previous column partition, which is used to determine the starting offset for a new column partition. The function takes the current partition value as an argument and returns the previous partition value. If the previous partition has a checkpoint record, then the starting offset for the new partition will be the offset recorded in the checkpoint record.
ignore_offset_from_checkpoints (bool) – Whether to ignore the offset from the checkpoint, default is False
ignore_offset_from_broker (bool) – Whether to ignore the offset from the broker, default is False
partition_offset_fallback_func (Callable[[int], int]) – The function to determine the Kafka offset to consume from if the offset is not found in the checkpoint or the Kafka broker, or is explicitly skipped. The function takes the Kafka partition number as an argument and returns the offset to consume from. Default is None, meaning the consumer will start consuming from the beginning of the topic. There are 3 special values that can be returned by the function: SEEK_TO_BEGINNING, SEEK_TO_END, and DONT_SEEK.
The 4 parameters ‘resume_from’, ‘ignore_offset_from_checkpoints’, ‘ignore_offset_from_broker’ and ‘partition_offset_fallback_func’ together determine the starting offset for the Kafka consumer. For more details on how the starting offset is determined, see https://deephaven.io/enterprise/docs/coreplus/coreplus-kafka/#deephaven-table-partitions.
commit_offset_to_broker (bool) – Whether to commit the offset to the broker, default is True
enable_transactions (bool) – Whether to enforce transactional semantics when a Kafka message is expanded into multiple rows by a transformation, default is False.
cols_to_ignore (Union[str, list[str]]) – The column(s) to ignore from the Kafka message, default is None
transformation (Callable[[Table], Table]) – The function to transform the table before writing it to disk, default is None.
last_by_views (Union[LastByView, list[LastByView]]) – The last by view(s) to create, default is None.
- Returns:
  A KafkaTableWriterOptions instance
- Return type:
  KafkaTableWriterOptions
- Raises:
  DHError
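To illustrate the shapes the callable parameters must have, here is a hedged sketch of helper functions matching the signatures documented above for partition_func, partition_filter, and resume_from. The column name ‘KafkaTimestamp’, the epoch-milliseconds assumption, and the even-partition filter are illustrative choices, not part of the API:

```python
from datetime import datetime, timedelta, timezone

# partition_func: takes a long column value (assumed here to be epoch
# milliseconds in a hypothetical 'KafkaTimestamp' column) and returns
# the column-partition value for that row.
def to_date_partition(epoch_millis: int) -> str:
    return datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc).strftime("%Y-%m-%d")

# partition_filter: takes a Kafka partition number and returns whether
# it should be ingested; here, only even-numbered partitions.
def even_partitions_only(kafka_partition: int) -> bool:
    return kafka_partition % 2 == 0

# resume_from: takes the current column-partition value and returns the
# previous one, so ingestion can resume from its checkpointed offset.
def previous_date(partition: str) -> str:
    return (datetime.strptime(partition, "%Y-%m-%d") - timedelta(days=1)).strftime("%Y-%m-%d")

# These helpers would then be passed along, e.g. (not executed here):
# opts = kafka_table_writer_options(
#     ...,
#     partition_func=("KafkaTimestamp", to_date_partition),
#     partition_filter=even_partitions_only,
#     resume_from=previous_date,
# )
```

Note that partition_func and partition_value are mutually exclusive: use a fixed partition_value for a static partition, or a (column, function) pair like the one above for dynamic partitioning.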