#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
""" This module supports reading an external Parquet files into Deephaven tables and writing Deephaven tables out as
Parquet files. """
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional, Union, Dict
import jpy
from deephaven import DHError
from deephaven.column import Column
from deephaven.dtypes import DType
from deephaven.table import Table
_JParquetTools = jpy.get_type("io.deephaven.parquet.table.ParquetTools")
_JFile = jpy.get_type("java.io.File")
_JCompressionCodecName = jpy.get_type("org.apache.parquet.hadoop.metadata.CompressionCodecName")
_JParquetInstructions = jpy.get_type("io.deephaven.parquet.table.ParquetInstructions")
_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition")
[docs]@dataclass
class ColumnInstruction:
""" This class specifies the instructions for reading/writing a Parquet column. """
column_name: str = None
parquet_column_name: str = None
codec_name: str = None
codec_args: str = None
use_dictionary: bool = False
def _build_parquet_instructions(
col_instructions: List[ColumnInstruction] = None,
compression_codec_name: str = None,
max_dictionary_keys: int = None,
max_dictionary_size: int = None,
is_legacy_parquet: bool = False,
target_page_size: int = None,
is_refreshing: bool = False,
for_read: bool = True,
force_build: bool = False,
):
if not any(
[
force_build,
col_instructions,
compression_codec_name,
max_dictionary_keys is not None,
max_dictionary_size is not None,
is_legacy_parquet,
target_page_size is not None,
is_refreshing,
]
):
return None
builder = _JParquetInstructions.builder()
if col_instructions is not None:
for ci in col_instructions:
if for_read and not ci.parquet_column_name:
raise ValueError("must specify the parquet column name for read.")
if not for_read and not ci.column_name:
raise ValueError("must specify the table column name for write.")
builder.addColumnNameMapping(ci.parquet_column_name, ci.column_name)
if ci.column_name:
if ci.codec_name:
builder.addColumnCodec(ci.column_name, ci.codec_name, ci.codec_args)
builder.useDictionary(ci.column_name, ci.use_dictionary)
if compression_codec_name:
builder.setCompressionCodecName(compression_codec_name)
if max_dictionary_keys is not None:
builder.setMaximumDictionaryKeys(max_dictionary_keys)
if max_dictionary_size is not None:
builder.setMaximumDictionarySize(max_dictionary_size)
if is_legacy_parquet:
builder.setIsLegacyParquet(is_legacy_parquet)
if target_page_size is not None:
builder.setTargetPageSize(target_page_size)
if is_refreshing:
builder.setIsRefreshing(is_refreshing)
return builder.build()
def _j_table_definition(table_definition: Union[Dict[str, DType], List[Column], None]) -> Optional[jpy.JType]:
if table_definition is None:
return None
elif isinstance(table_definition, Dict):
return _JTableDefinition.of(
[
Column(name=name, data_type=dtype).j_column_definition
for name, dtype in table_definition.items()
]
)
elif isinstance(table_definition, List):
return _JTableDefinition.of(
[col.j_column_definition for col in table_definition]
)
else:
raise DHError(f"Unexpected table_definition type: {type(table_definition)}")
[docs]class ParquetFileLayout(Enum):
""" The parquet file layout. """
SINGLE_FILE = 1
""" A single parquet file. """
FLAT_PARTITIONED = 2
""" A single directory of parquet files. """
KV_PARTITIONED = 3
""" A key-value directory partitioning of parquet files. """
METADATA_PARTITIONED = 4
""" A directory containing a _metadata parquet file and an optional _common_metadata parquet file. """
[docs]def read(
path: str,
col_instructions: Optional[List[ColumnInstruction]] = None,
is_legacy_parquet: bool = False,
is_refreshing: bool = False,
file_layout: Optional[ParquetFileLayout] = None,
table_definition: Union[Dict[str, DType], List[Column], None] = None,
) -> Table:
""" Reads in a table from a single parquet, metadata file, or directory with recognized layout.
Args:
path (str): the file or directory to examine
col_instructions (Optional[List[ColumnInstruction]]): instructions for customizations while reading, None by
default.
is_legacy_parquet (bool): if the parquet data is legacy
is_refreshing (bool): if the parquet data represents a refreshing source
file_layout (Optional[ParquetFileLayout]): the parquet file layout, by default None. When None, the layout is
inferred.
table_definition (Union[Dict[str, DType], List[Column], None]): the table definition, by default None. When None,
the definition is inferred from the parquet file(s). Setting a definition guarantees the returned table will
have that definition. This is useful for bootstrapping purposes when the initial partitioned directory is
empty and is_refreshing=True. It is also useful for specifying a subset of the parquet definition. When set,
file_layout must also be set.
Returns:
a table
Raises:
DHError
"""
try:
read_instructions = _build_parquet_instructions(
col_instructions=col_instructions,
is_legacy_parquet=is_legacy_parquet,
is_refreshing=is_refreshing,
for_read=True,
force_build=True,
)
j_table_definition = _j_table_definition(table_definition)
if j_table_definition is not None:
if not file_layout:
raise DHError("Must provide file_layout when table_definition is set")
if file_layout == ParquetFileLayout.SINGLE_FILE:
j_table = _JParquetTools.readSingleFileTable(_JFile(path), read_instructions, j_table_definition)
elif file_layout == ParquetFileLayout.FLAT_PARTITIONED:
j_table = _JParquetTools.readFlatPartitionedTable(_JFile(path), read_instructions, j_table_definition)
elif file_layout == ParquetFileLayout.KV_PARTITIONED:
j_table = _JParquetTools.readKeyValuePartitionedTable(_JFile(path), read_instructions, j_table_definition)
elif file_layout == ParquetFileLayout.METADATA_PARTITIONED:
raise DHError(f"file_layout={ParquetFileLayout.METADATA_PARTITIONED} with table_definition not currently supported")
else:
raise DHError(f"Invalid parquet file_layout '{file_layout}'")
else:
if not file_layout:
j_table = _JParquetTools.readTable(path, read_instructions)
elif file_layout == ParquetFileLayout.SINGLE_FILE:
j_table = _JParquetTools.readSingleFileTable(_JFile(path), read_instructions)
elif file_layout == ParquetFileLayout.FLAT_PARTITIONED:
j_table = _JParquetTools.readFlatPartitionedTable(_JFile(path), read_instructions)
elif file_layout == ParquetFileLayout.KV_PARTITIONED:
j_table = _JParquetTools.readKeyValuePartitionedTable(_JFile(path), read_instructions)
elif file_layout == ParquetFileLayout.METADATA_PARTITIONED:
j_table = _JParquetTools.readPartitionedTableWithMetadata(_JFile(path), read_instructions)
else:
raise DHError(f"Invalid parquet file_layout '{file_layout}'")
return Table(j_table=j_table)
except Exception as e:
raise DHError(e, "failed to read parquet data.") from e
def _j_file_array(paths: List[str]):
return jpy.array("java.io.File", [_JFile(el) for el in paths])
[docs]def delete(path: str) -> None:
""" Deletes a Parquet table on disk.
Args:
path (str): path to delete
Raises:
DHError
"""
try:
_JParquetTools.deleteTable(_JFile(path))
except Exception as e:
raise DHError(e, f"failed to delete a parquet table: {path} on disk.") from e
[docs]def write(
table: Table,
path: str,
col_definitions: List[Column] = None,
col_instructions: List[ColumnInstruction] = None,
compression_codec_name: str = None,
max_dictionary_keys: int = None,
max_dictionary_size: int = None,
target_page_size: int = None,
) -> None:
""" Write a table to a Parquet file.
Args:
table (Table): the source table
path (str): the destination file path; the file name should end in a ".parquet" extension. If the path
includes non-existing directories they are created. If there is an error, any intermediate directories
previously created are removed; note this makes this method unsafe for concurrent use
col_definitions (List[Column]): the column definitions to use, default is None
col_instructions (List[ColumnInstruction]): instructions for customizations while writing, default is None
compression_codec_name (str): the default compression codec to use, if not specified, defaults to SNAPPY
max_dictionary_keys (int): the maximum dictionary keys allowed, if not specified, defaults to 2^20 (1,048,576)
max_dictionary_size (int): the maximum dictionary size (in bytes) allowed, defaults to 2^20 (1,048,576)
target_page_size (int): the target page size in bytes, if not specified, defaults to 2^20 bytes (1 MiB)
Raises:
DHError
"""
try:
write_instructions = _build_parquet_instructions(
col_instructions=col_instructions,
compression_codec_name=compression_codec_name,
max_dictionary_keys=max_dictionary_keys,
max_dictionary_size=max_dictionary_size,
target_page_size=target_page_size,
for_read=False,
)
table_definition = None
if col_definitions is not None:
table_definition = _JTableDefinition.of([col.j_column_definition for col in col_definitions])
if table_definition:
if write_instructions:
_JParquetTools.writeTable(table.j_table, path, table_definition, write_instructions)
else:
_JParquetTools.writeTable(table.j_table, _JFile(path), table_definition)
else:
if write_instructions:
_JParquetTools.writeTable(table.j_table, _JFile(path), write_instructions)
else:
_JParquetTools.writeTable(table.j_table, path)
except Exception as e:
raise DHError(e, "failed to write to parquet data.") from e
[docs]def batch_write(
tables: List[Table],
paths: List[str],
col_definitions: List[Column],
col_instructions: List[ColumnInstruction] = None,
compression_codec_name: str = None,
max_dictionary_keys: int = None,
max_dictionary_size: int = None,
target_page_size: int = None,
grouping_cols: List[str] = None,
):
""" Writes tables to disk in parquet format to a supplied set of paths.
If you specify grouping columns, there must already be grouping information for those columns in the sources.
This can be accomplished with .groupBy(<grouping columns>).ungroup() or .sort(<grouping column>).
Note that either all the tables are written out successfully or none is.
Args:
tables (List[Table]): the source tables
paths (List[str]): the destinations paths. Any non existing directories in the paths provided are
created. If there is an error, any intermediate directories previously created are removed; note this makes
this method unsafe for concurrent use
col_definitions (List[Column]): the column definitions to use
col_instructions (List[ColumnInstruction]): instructions for customizations while writing
compression_codec_name (str): the compression codec to use, if not specified, defaults to SNAPPY
max_dictionary_keys (int): the maximum dictionary keys allowed, if not specified, defaults to 2^20 (1,048,576)
max_dictionary_size (int): the maximum dictionary size (in bytes) allowed, defaults to 2^20 (1,048,576)
target_page_size (int): the target page size in bytes, if not specified, defaults to 2^20 bytes (1 MiB)
grouping_cols (List[str]): the group column names
Raises:
DHError
"""
try:
write_instructions = _build_parquet_instructions(
col_instructions=col_instructions,
compression_codec_name=compression_codec_name,
max_dictionary_keys=max_dictionary_keys,
max_dictionary_size=max_dictionary_size,
target_page_size=target_page_size,
for_read=False,
)
table_definition = _JTableDefinition.of([col.j_column_definition for col in col_definitions])
if grouping_cols:
_JParquetTools.writeParquetTables([t.j_table for t in tables], table_definition, write_instructions,
_j_file_array(paths), grouping_cols)
else:
_JParquetTools.writeTables([t.j_table for t in tables], table_definition,
_j_file_array(paths))
except Exception as e:
raise DHError(e, "write multiple tables to parquet data failed.") from e