Package io.deephaven.kafka.ingest
Class ResumeImportFrom
java.lang.Object
io.deephaven.kafka.ingest.ResumeImportFrom
public class ResumeImportFrom extends Object
Utility to copy source file records from one column partition's checkpoint to another's, with zero size, so that a Kafka import resumes from where it left off.
This must not be called while an ingester is running for the provided table.
Nested Class Summary

static class ResumeImportFrom.Behavior
    What behavior should we provide when an existing partition is encountered?

static class ResumeImportFrom.NoPriorPartitionsFoundException
Constructor Summary

ResumeImportFrom()
Method Summary

static void resumeAtOffset(String disName, String namespace, String tableName, String columnPartition, boolean overwrite, boolean truncate, String topic, int kafkaPartition, long newOffset)
    Write a new checkpoint record for the specified internal partition with a given topic and offset.

static void resumeFrom(com.fishlib.io.logger.Logger logger, String disName, String namespace, String tableName, String priorColumnPartition, String newColumnPartition, ResumeImportFrom.Behavior behavior)
    Copy the checkpoint records from a prior column partition to a new column partition.

static void resumeFrom(String disName, String namespace, String tableName, String priorColumnPartition, String newColumnPartition, ResumeImportFrom.Behavior behavior)
    Copy the checkpoint records from a prior column partition to a new column partition.
Constructor Details

ResumeImportFrom
public ResumeImportFrom()
Method Details
resumeFrom

public static void resumeFrom(String disName, String namespace, String tableName, String priorColumnPartition, String newColumnPartition, ResumeImportFrom.Behavior behavior)

Copy the checkpoint records from a prior column partition to a new column partition. This is intended to allow you to resume a Kafka stream from where you left off on a prior ingestion, without having to keep the prior data around until you are ready to ingest the new data.

Parameters:
disName - the name of the DIS configuration that defines our storage location
namespace - the namespace of the table
tableName - the table name
priorColumnPartition - the column partition to copy Kafka offsets from
newColumnPartition - the column partition to copy Kafka offsets to
behavior - the behavior when an existing new column partition is found
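A minimal sketch of how this overload might be called, e.g., to roll Kafka offsets from yesterday's column partition into today's before restarting the ingester. The DIS name, namespace, table name, and partition values are illustrative assumptions, not values from this page, and Behavior.SKIP is a guess at one of the enum's constants; consult ResumeImportFrom.Behavior for the actual options. Note that, per the class description, the ingester for this table must not be running when this is called.

import io.deephaven.kafka.ingest.ResumeImportFrom;

public class ResumeFromExample {
    public static void main(final String[] args) {
        // All argument values below are illustrative assumptions.
        // The ingester for this table must not be running at this point.
        ResumeImportFrom.resumeFrom(
                "Ingester1",   // disName: the DIS configuration that defines storage
                "MarketUS",    // namespace of the table
                "Trades",      // tableName
                "2021-01-04",  // priorColumnPartition: copy Kafka offsets from here
                "2021-01-05",  // newColumnPartition: copy Kafka offsets to here
                ResumeImportFrom.Behavior.SKIP); // assumed constant; verify against the enum
    }
}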
resumeFrom

public static void resumeFrom(com.fishlib.io.logger.Logger logger, String disName, String namespace, String tableName, String priorColumnPartition, String newColumnPartition, ResumeImportFrom.Behavior behavior)

Copy the checkpoint records from a prior column partition to a new column partition. This is intended to allow you to resume a Kafka stream from where you left off on a prior ingestion, without having to keep the prior data around until you are ready to ingest the new data.

Parameters:
logger - logger for skipped or overwritten locations
disName - the name of the DIS configuration that defines our storage location
namespace - the namespace of the table
tableName - the table name
priorColumnPartition - the column partition to copy Kafka offsets from
newColumnPartition - the column partition to copy Kafka offsets to
behavior - the behavior when an existing new column partition is found
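This overload is identical except that each skipped or overwritten location is reported to the supplied logger. A sketch, assuming a com.fishlib.io.logger.Logger is already available from the caller (constructing one is outside the scope of this page); the remaining argument values are the same illustrative assumptions as above.

import com.fishlib.io.logger.Logger;
import io.deephaven.kafka.ingest.ResumeImportFrom;

public class LoggedResumeFromExample {
    // log: a com.fishlib.io.logger.Logger supplied by the caller; this
    // sketch does not construct one. All other argument values are assumptions.
    static void rollOffsets(final Logger log) {
        ResumeImportFrom.resumeFrom(
                log,           // receives a message per skipped or overwritten location
                "Ingester1",   // disName
                "MarketUS",    // namespace
                "Trades",      // tableName
                "2021-01-04",  // priorColumnPartition
                "2021-01-05",  // newColumnPartition
                ResumeImportFrom.Behavior.SKIP); // assumed constant; verify against the enum
    }
}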
resumeAtOffset

public static void resumeAtOffset(String disName, String namespace, String tableName, String columnPartition, boolean overwrite, boolean truncate, String topic, int kafkaPartition, long newOffset)

Write a new checkpoint record for the specified internal partition with a given topic and offset. This is intended to allow you to resume a Kafka stream after a given message that is causing problems with ingestion.

If you specify truncate, then the partition will start empty. If you do not specify truncate, then the partition will be appended to when the ingester restarts. If truncate is true, then the existing checkpoint record size is ignored and the size is set to null. If truncate is false, then the existing size record is left intact, but the source file size is changed. If overwrite is false, existing source size records are not changed. If the checkpoint record is not compatible with the desired result, then an error is thrown. Otherwise, if the checkpoint record already has the desired offset, no changes are made and no error is thrown. If no existing source size record exists, then the overwrite flag has no effect on the behavior.

The internal partition is derived as the Kafka topic + "-" + partition. For example, a topic of "trades" and a partition of zero maps to an internal partition of "trades-0", which is consistent with the Deephaven Kafka ingester.

Parameters:
disName - the name of the DIS configuration that defines our storage location
namespace - the namespace of the table
tableName - the table name
columnPartition - the column partition to write the new offset to
overwrite - overwrite an existing partition
truncate - truncate the partition
topic - the topic that will be resumed from (must match the configured topic)
kafkaPartition - the Kafka partition that we are configuring an offset for
newOffset - the offset that we will resume from
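As a sketch, suppose a corrupt message at offset 123456 in partition 0 of topic "trades" is blocking ingestion; writing a checkpoint at offset 123457 lets the ingester skip past it when it restarts. All names and values below are illustrative assumptions. Because truncate is true here, the column partition restarts empty rather than being appended to.

import io.deephaven.kafka.ingest.ResumeImportFrom;

public class ResumeAtOffsetExample {
    public static void main(final String[] args) {
        // Assumed values throughout. This writes a checkpoint record for the
        // derived internal partition "trades-0" ("trades" + "-" + 0), so the
        // ingester resumes at offset 123457 on restart.
        ResumeImportFrom.resumeAtOffset(
                "Ingester1",   // disName: the DIS configuration that defines storage
                "MarketUS",    // namespace
                "Trades",      // tableName
                "2021-01-05",  // columnPartition receiving the checkpoint record
                true,          // overwrite: allow an existing source size record to change
                true,          // truncate: ignore the existing size; partition restarts empty
                "trades",      // topic, which must match the ingester's configured topic
                0,             // kafkaPartition
                123457L);      // newOffset: the first offset ingested on restart
    }
}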