deephaven_enterprise.data_ingestion.kafka_options

This module provides utilities for configuring Kafka ingestion into Deephaven Data Import Servers (DIS). It includes functions and classes for building Kafka table writer options and for managing schema validation and updates.

class KafkaTableWriterOptions(j_ktw_options)[source]

Bases: JObjectWrapper

A wrapper object representing the options for a Kafka Table Writer.

Note: It should not be instantiated directly by user code; instances should instead be obtained by calling kafka_table_writer_options().

j_object_type

alias of KafkaTableWriter$Options

class LastByView(name, cols)

Bases: NamedTuple

A named tuple to represent a last by view, with the name of the view and the key columns to last by. The name can be None or an empty string if there will be just one last by view on the table; otherwise, the name should be unique for each distinct set of key columns.

cols

Alias for field number 1

name

Alias for field number 0
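To make the field layout concrete, here is a self-contained sketch using a local NamedTuple with the same two fields as LastByView (a stand-in, so the example does not require a Deephaven Enterprise installation; the column names "Sym" and "Exchange" are hypothetical):

```python
from typing import NamedTuple, Optional

class LastByView(NamedTuple):
    """Stand-in with the same fields as kafka_options.LastByView."""
    name: Optional[str]   # field number 0
    cols: list            # field number 1

# A single, unnamed last-by view keyed on "Sym":
single = LastByView(name=None, cols=["Sym"])

# Multiple views on one table need a unique name per distinct key-column set:
by_sym = LastByView(name="by_sym", cols=["Sym"])
by_sym_exch = LastByView(name="by_sym_exch", cols=["Sym", "Exchange"])
```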

class SchemaHelper(opts, partition_col='Date', grouping_cols=None, non_symbol_table_cols=None, merge_key_formula=None)[source]

Bases: object

SchemaHelper is a helper class for automatically creating and validating Deephaven Enterprise Schemas from KafkaTableWriterOptions.

Creates a new instance of SchemaHelper

Parameters:
  • opts (KafkaTableWriterOptions) – The KafkaTableWriterOptions object

  • partition_col (str) – The partition column for the schema, defaults to “Date”

  • grouping_cols (list[str]) – The grouping columns for the schema, defaults to None

  • non_symbol_table_cols (list[str]) – The columns to be written without a symbol table, defaults to None

  • merge_key_formula (str) – The key formula for merge process, defaults to None, which means use the predefined “${autobalance_single}”. For more information, see the Deephaven documentation about “Partitioning Key Formulas”

add_or_validate()[source]

Validates the schema against the latest schema known by the SchemaService. If the schema does not exist within the SchemaService, the schema will be added. If the schema does not match the latest schema, then an appropriate exception is thrown.

Returns:

True if the schema was added, False if the schema matched the latest schema

Return type:

bool

Raises:

DHError – If the schema does not match the latest schema

table_definition()[source]

Constructs a TableDefinition from the schema.

Return type:

TableDefinition

validate_or_update()[source]

Validates the schema against the latest schema known by the SchemaService. If the schema does not exist within the SchemaService, the schema will be added. If the schema does not match the latest schema, then an attempt is made to update the schema within the SchemaService. The only schema changes permitted are adding or removing columns and changes to SymbolTable, Grouping, and the merge KeyFormula. Any other schema change will throw an exception.

Note that existing partitions cannot be appended to after a schema change takes effect. A schema change should only be made prior to adding data for a given partition (e.g. “Date”). For more information, see the Deephaven documentation: https://deephaven.io/enterprise/docs/coreplus/coreplus-kafka/#schema-helper-tools

Returns:

True if the schema was added or updated, False if the schema matched the latest schema

Return type:

bool

Raises:

DHError – If the schema does not match the latest schema and cannot be updated
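The two validation methods can be wrapped in a small helper, sketched below. This is illustrative only: the deephaven_enterprise import resolves only inside a Deephaven Enterprise installation, so it is deferred into the function; `opts` is assumed to be a KafkaTableWriterOptions built elsewhere.

```python
def ensure_schema(opts, allow_update: bool = False) -> bool:
    """Sketch: add the target table's schema if missing, then either
    strictly validate it (add_or_validate) or also apply the permitted
    in-place changes such as added/removed columns (validate_or_update).
    Returns True when the schema was added or updated, False when it
    already matched the latest schema."""
    # Deferred import: only available inside a Deephaven Enterprise install.
    from deephaven_enterprise.data_ingestion.kafka_options import SchemaHelper

    helper = SchemaHelper(opts, partition_col="Date")
    if allow_update:
        return helper.validate_or_update()
    return helper.add_or_validate()
```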

kafka_table_writer_options(kafka_properties, topic, table_name, namespace, key_spec, value_spec, partition_value=None, partition_func=None, partition_filter=None, resume_from=None, ignore_offset_from_checkpoints=False, ignore_offset_from_broker=False, partition_offset_fallback_func=None, commit_offset_to_broker=True, enable_transactions=False, cols_to_ignore=None, transformation=None, last_by_views=None)[source]

Builds a KafkaTableWriterOptions instance to be used by consume_to_dis() to ingest a Kafka topic and persistently write the data to a Deephaven Data Import Server. It can also be passed to add_or_validate() or validate_or_update() to add, update, or validate the schema of the target Deephaven table.

Parameters:
  • kafka_properties (dict[str, str]) – The properties for the underlying Kafka consumer, see https://kafka.apache.org/documentation/#consumerconfigs for more details on the properties.

  • namespace (str) – The namespace of the Deephaven table to ingest

  • table_name (str) – The name of the Deephaven table to ingest

  • topic (str) – The Kafka topic to consume

  • key_spec (KeyValueSpec) – Specifies how to map the Key field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec(), or json_spec(), or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which behaves the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_properties param should include values for the dictionary keys ‘deephaven.key.column.name’ and ‘deephaven.key.column.type’, giving the name and type of the single resulting column.

  • value_spec (KeyValueSpec) – Specifies how to map the Value field in Kafka messages to Deephaven column(s). It can be the result of calling one of the functions simple_spec(), avro_spec(), or json_spec(), or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None, which behaves the same as KeyValueSpec.FROM_PROPERTIES; in that case, the kafka_properties param should include values for the dictionary keys ‘deephaven.value.column.name’ and ‘deephaven.value.column.type’, giving the name and type of the single resulting column.

  • partition_value (str) – The fixed partition value to use for the Deephaven table; mutually exclusive with the ‘partition_func’ argument. Must be specified if ‘partition_func’ is None. Default is None.

  • partition_func (tuple[str, Callable[[int], str]]) – The input column name (must be of long type) and the function that dynamically determines which column partition a row belongs to. The function takes the column value as an argument and returns the partition value; mutually exclusive with the ‘partition_value’ argument. Must be specified if ‘partition_value’ is None. Default is None.

  • partition_filter (Callable[[int], bool]) – The function to use to determine if a Kafka partition should be ingested, default is None, meaning to ingest all Kafka partitions.

  • resume_from (Callable[[str], str]) – The function to determine the previous column partition, which is used to determine the starting offset for a new column partition. The function takes the current partition value as an argument and returns the previous partition value. If the previous partition has a checkpoint record, then the starting offset for the new partition will be the offset recorded in the checkpoint record.

  • ignore_offset_from_checkpoints (bool) – Whether to ignore the offset from the checkpoint, default is False

  • ignore_offset_from_broker (bool) – Whether to ignore the offset from the broker, default is False

  • partition_offset_fallback_func (Callable[[int], int]) –

    The function to determine the Kafka offset to consume from if the offset is not found in the checkpoint or the Kafka broker, or is explicitly skipped. The function takes the Kafka partition number as an argument and returns the offset to consume from. Default is None, meaning the consumer will start consuming from the beginning of the topic. Three special values can be returned by the function: SEEK_TO_BEGINNING, SEEK_TO_END, and DONT_SEEK.

    The four parameters ‘resume_from’, ‘ignore_offset_from_checkpoints’, ‘ignore_offset_from_broker’, and ‘partition_offset_fallback_func’ together determine the starting offset for the Kafka consumer. For more details on how the starting offset is determined, see https://deephaven.io/enterprise/docs/coreplus/coreplus-kafka/#deephaven-table-partitions.

  • commit_offset_to_broker (bool) – Whether to commit the offset to the broker, default is True

  • enable_transactions (bool) – Whether to enforce transactional semantics when a Kafka message is expanded into multiple rows by a transformation, default is False.

  • cols_to_ignore (Union[str, list[str]]) – The column(s) to ignore from the Kafka message, default is None

  • transformation (Callable[[Table], Table]) – The function to transform the table before writing it to disk, default is None.

  • last_by_views (Union[LastByView, list[LastByView]]) – The last by view(s) to create, default is None.

Return type:

KafkaTableWriterOptions

Returns:

KafkaTableWriterOptions

Raises:

DHError
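The pieces above can be put together as in the following sketch: a date-based partition_func over a long timestamp column, a resume_from callback that walks back one day, and key_spec/value_spec left as None so the column names and types come from the consumer properties. The broker address, group id, topic, namespace, and table/column names are all hypothetical, and the deephaven_enterprise import is deferred into a function because it only resolves inside a Deephaven Enterprise installation.

```python
from datetime import datetime, timedelta, timezone

def to_date_partition(epoch_nanos: int) -> str:
    """partition_func: map a long timestamp column value (epoch
    nanoseconds, assumed UTC) to a 'YYYY-MM-DD' column-partition value."""
    return datetime.fromtimestamp(
        epoch_nanos / 1_000_000_000, tz=timezone.utc
    ).strftime("%Y-%m-%d")

def previous_date_partition(partition: str) -> str:
    """resume_from: given the current 'YYYY-MM-DD' partition, return the
    previous day's partition so its checkpointed offset seeds this one."""
    day = datetime.strptime(partition, "%Y-%m-%d")
    return (day - timedelta(days=1)).strftime("%Y-%m-%d")

kafka_properties = {
    "bootstrap.servers": "kafka-broker:9092",  # hypothetical broker
    "group.id": "orders-ingester",             # hypothetical group id
    # With key_spec/value_spec of None (KeyValueSpec.FROM_PROPERTIES),
    # the single key/value columns are named and typed via properties:
    "deephaven.key.column.name": "OrderId",
    "deephaven.key.column.type": "long",
    "deephaven.value.column.name": "Payload",
    "deephaven.value.column.type": "String",
}

def build_options():
    # Deferred import: only available inside a Deephaven Enterprise install.
    from deephaven_enterprise.data_ingestion.kafka_options import (
        kafka_table_writer_options,
    )
    return kafka_table_writer_options(
        kafka_properties=kafka_properties,
        topic="orders",          # hypothetical topic
        table_name="Orders",     # hypothetical table name
        namespace="Market",      # hypothetical namespace
        key_spec=None,           # FROM_PROPERTIES, see kafka_properties
        value_spec=None,
        partition_func=("Timestamp", to_date_partition),  # long column
        resume_from=previous_date_partition,
    )
```

The resulting options object would then be handed to consume_to_dis(), or to a SchemaHelper to add or validate the table's schema before ingestion begins.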