Class CsvImportProcessor
All Implemented Interfaces:
com.fishlib.base.log.LogOutputAppendable, DataImportProcessor, CheckpointRecord.SourceFileSizeRecord
A DataImportProcessor that handles streams of CSV text. The encoding and import properties are specified at the Tailer level.
Note that while this processor allows for skipping initial rows, it does _not_ allow for skipping footer rows.
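Skipping initial rows works naturally on a stream: the processor only has to count complete rows as they arrive, and the count can carry over between buffers. The sketch below illustrates that idea. The class `HeaderSkipper` and its `filter` method are hypothetical and are not part of this API. By contrast, dropping footer rows would require withholding the last rows of a stream until its end is known, which the sketch does not attempt.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical filter that drops the first N complete rows, even when they arrive in separate batches. */
public class HeaderSkipper {
    // Rows that still have to be discarded; carried over from one batch to the next.
    private long rowsLeftToSkip;

    public HeaderSkipper(long rowsToSkip) {
        if (rowsToSkip < 0) {
            throw new IllegalArgumentException("rowsToSkip must be >= 0");
        }
        this.rowsLeftToSkip = rowsToSkip;
    }

    /** Returns the rows of this batch that survive skipping. */
    public List<String> filter(List<String> completeRows) {
        List<String> kept = new ArrayList<>();
        for (String row : completeRows) {
            if (rowsLeftToSkip > 0) {
                rowsLeftToSkip--; // still inside the leading rows: discard
            } else {
                kept.add(row);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        HeaderSkipper skipper = new HeaderSkipper(2);
        // The two preamble rows are dropped; prints [id,name, 1,a]
        System.out.println(skipper.filter(List.of("# preamble", "# more preamble", "id,name", "1,a")));
        // Nothing left to skip in later batches; prints [2,b]
        System.out.println(skipper.filter(List.of("2,b")));
    }
}
```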
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | |
static enum | | The mode for which the CSV processor manages transactions.
Field Summary
Fields inherited from interface com.illumon.iris.db.tables.dataimport.logtailer.DataImportProcessor
END_OF_DATA_FLAG, TRUNCATED_FLAG
Method Summary
Modifier and Type | Method | Description
default com.fishlib.base.log.LogOutput | append(com.fishlib.base.log.LogOutput logOutput) |
 | getName() |
long | getSize() |
void | onNewFile(FilePosition filePosition) | Notify the import processor that a new file is available.
int | processContent(ByteBuffer dataBuffer, long bufferSentTimestamp) | Process input blocks of bytes from the stream.
boolean | shouldBeginTransaction(long nowMillis) | Check if a transaction should be started.
boolean | shouldEndTransaction(long nowMillis, boolean isNewFile) | Check if any currently in-progress transactions should be completed.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface com.illumon.iris.db.tables.dataimport.logtailer.DataImportProcessor
isDirty, setContext
Method Details
onNewFile
public void onNewFile(FilePosition filePosition)
Description copied from interface: DataImportProcessor
Notify the import processor that a new file is available.
Specified by:
onNewFile in interface DataImportProcessor
Parameters:
filePosition - The new file's initial position
shouldEndTransaction
public boolean shouldEndTransaction(long nowMillis, boolean isNewFile)
Description copied from interface: DataImportProcessor
Check if any currently in-progress transactions should be completed.
Specified by:
shouldEndTransaction in interface DataImportProcessor
Parameters:
nowMillis - The current time
isNewFile - Whether the method is being invoked because of a new file
Returns:
Whether the transaction should be completed
shouldBeginTransaction
public boolean shouldBeginTransaction(long nowMillis)
Description copied from interface: DataImportProcessor
Check if a transaction should be started.
Specified by:
shouldBeginTransaction in interface DataImportProcessor
Parameters:
nowMillis - The current time
Returns:
Whether a transaction should begin
processContent
public int processContent(ByteBuffer dataBuffer, long bufferSentTimestamp)
Process input blocks of bytes from the stream.
Some things to note about processing CSV this way: this code depends on using a flavor of BufferedReader combined with a particular string encoding. When a charset is involved, the base Java classes for this have several internal layers of buffering that can be neither reset nor rewound.
Because of this, the code needs some means of guaranteeing that the DIS always ends up in a valid position, aligned with a line boundary. The solution to this problem is the LineOrientedBufferedReader.
Note: although this class lives in the org.apache.commons.csv package, it is a Deephaven class integrated into a fork of that package.
This class reads from the underlying reader and buffers internally up to the end of a line. It provides one guarantee: if any data is returned, at least one complete row can be read.
This allows for a few assumptions:
- CsvParser.hasNext()/next() will return either EOS or a complete CSV line, unless some unexpected exception occurs, in which case processing bails out.
- When the CsvImportProcessor returns, its file position is guaranteed to be at the beginning of the following line.
This provides for the following process:
- Try to get the next CSVRecord.
- If we got it, there is at least one complete row; update the file position after it has been processed.
- If we did not get it, the underlying stream does not yet have a complete row, and the reader will continue buffering characters until it does.
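The line-boundary guarantee described above can be illustrated with a minimal sketch. It is not the LineOrientedBufferedReader implementation, and the class `LineAlignedBuffer` with its `accept` and `linePosition` methods is hypothetical. As a swapped-in technique, it splits on raw newline bytes before decoding, so no decoder state ever has to be rewound. This assumes UTF-8 (or another ASCII-compatible encoding in which byte 0x0A only ever means newline) and ignores quoted CSV fields that contain embedded newlines.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: hands out only complete lines and tracks the byte offset of the next line start. */
public class LineAlignedBuffer {
    // Bytes received so far that do not yet form a complete line.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    // Byte offset of the first byte in 'pending', i.e. the start of the next unreturned line.
    private long linePosition;

    public LineAlignedBuffer(long startPosition) {
        this.linePosition = startPosition;
    }

    /** Consumes a block of bytes and returns every line it completes, without the terminator. */
    public List<String> accept(ByteBuffer block) {
        List<String> lines = new ArrayList<>();
        while (block.hasRemaining()) {
            byte b = block.get();
            if (b == '\n') {
                byte[] raw = pending.toByteArray();
                int len = raw.length;
                // Drop a trailing '\r' so CRLF input yields the same lines as LF input.
                if (len > 0 && raw[len - 1] == '\r') {
                    len--;
                }
                // Decode only once the whole line is present (assumes UTF-8).
                lines.add(new String(raw, 0, len, StandardCharsets.UTF_8));
                // Advance past the line and its '\n': the position is back on a line boundary.
                linePosition += raw.length + 1;
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return lines;
    }

    /** Byte offset of the start of the next line that has not been returned yet. */
    public long linePosition() {
        return linePosition;
    }

    public static void main(String[] args) {
        LineAlignedBuffer buf = new LineAlignedBuffer(0);
        // The first block ends mid-row, so only "a,b" is complete.
        System.out.println(buf.accept(ByteBuffer.wrap("a,b\nc,".getBytes(StandardCharsets.UTF_8))));
        System.out.println(buf.linePosition());
        // The second block completes "c,d"; the partial row was buffered, not lost.
        System.out.println(buf.accept(ByteBuffer.wrap("d\n".getBytes(StandardCharsets.UTF_8))));
        System.out.println(buf.linePosition());
    }
}
```

Running `main` prints `[a,b]`, `4`, `[c,d]` and `8`: the partial row `c,` is held back until its newline arrives, and the reported position only ever moves to the start of a line. Because the split happens on bytes, a multi-byte UTF-8 character divided across two blocks is decoded only after the whole line has arrived.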
Specified by:
processContent in interface DataImportProcessor
Parameters:
dataBuffer - The data to process
bufferSentTimestamp - The timestamp of the buffer
Returns:
The reason for returning; will always be DataImportProcessor.END_OF_DATA_FLAG
getName
getSize
public long getSize()
append
default com.fishlib.base.log.LogOutput append(@NotNull com.fishlib.base.log.LogOutput logOutput)
Specified by:
append in interface com.fishlib.base.log.LogOutputAppendable