Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ doc-tools/missing-doclet/bin/
/modules/parquet-data-format/src/main/rust/debug
/modules/parquet-data-format/src/main/resources/native/
/modules/parquet-data-format/jni/target/debug
/modules/parquet-data-format/jni/target/.rustc_info.json

/modules/parquet-data-format/jni/target/release
**/Cargo.lock
Expand Down
1 change: 1 addition & 0 deletions modules/parquet-data-format/jni/target/.rustc_info.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"rustc_fingerprint":11842151147061415538,"outputs":{"17747080675513052775":{"success":true,"status":"","code":0,"stdout":"rustc 1.89.0 (29483883e 2025-08-04)\nbinary: rustc\ncommit-hash: 29483883eed69d5fb4db01964cdf2af4d86e9cb2\ncommit-date: 2025-08-04\nhost: aarch64-apple-darwin\nrelease: 1.89.0\nLLVM version: 20.1.7\n","stderr":""},"7971740275564407648":{"success":true,"status":"","code":0,"stdout":"___\nlib___.rlib\nlib___.dylib\nlib___.dylib\nlib___.a\nlib___.dylib\n/Users/darsaga/.rustup/toolchains/stable-aarch64-apple-darwin\noff\npacked\nunpacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"aarch64\"\ntarget_endian=\"little\"\ntarget_env=\"\"\ntarget_family=\"unix\"\ntarget_feature=\"aes\"\ntarget_feature=\"crc\"\ntarget_feature=\"dit\"\ntarget_feature=\"dotprod\"\ntarget_feature=\"dpb\"\ntarget_feature=\"dpb2\"\ntarget_feature=\"fcma\"\ntarget_feature=\"fhm\"\ntarget_feature=\"flagm\"\ntarget_feature=\"fp16\"\ntarget_feature=\"frintts\"\ntarget_feature=\"jsconv\"\ntarget_feature=\"lor\"\ntarget_feature=\"lse\"\ntarget_feature=\"neon\"\ntarget_feature=\"paca\"\ntarget_feature=\"pacg\"\ntarget_feature=\"pan\"\ntarget_feature=\"pmuv3\"\ntarget_feature=\"ras\"\ntarget_feature=\"rcpc\"\ntarget_feature=\"rcpc2\"\ntarget_feature=\"rdm\"\ntarget_feature=\"sb\"\ntarget_feature=\"sha2\"\ntarget_feature=\"sha3\"\ntarget_feature=\"ssbs\"\ntarget_feature=\"vh\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"macos\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"apple\"\nunix\n","stderr":""}},"successes":{}}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's add these files to gitignore. I think we can add parquet-data-format/jni/target/*.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed

3 changes: 3 additions & 0 deletions modules/parquet-data-format/jni/target/CACHEDIR.TAG
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Signature: 8a477f597d28d172789f06886806bc55
# This file is a cache directory tag created by cargo.
# For information about cache directory tags see https://bford.info/cachedir/
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package com.parquet.parquetdataformat;

import com.parquet.parquetdataformat.engine.ParquetDataFormat;
import com.parquet.parquetdataformat.fields.ParquetFieldUtil;
import com.parquet.parquetdataformat.fields.ArrowSchemaBuilder;
import com.parquet.parquetdataformat.engine.read.ParquetDataSourceCodec;
import com.parquet.parquetdataformat.writer.ParquetWriter;
import org.opensearch.index.engine.DataFormatPlugin;
Expand Down Expand Up @@ -61,7 +61,7 @@ public class ParquetDataFormatPlugin extends Plugin implements DataFormatPlugin,
@Override
@SuppressWarnings("unchecked")
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath) {
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(() -> ParquetFieldUtil.getSchema(mapperService), shardPath);
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(() -> ArrowSchemaBuilder.getSchema(mapperService), shardPath);
}

private Class<? extends DataFormat> getDataFormatType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,96 +8,148 @@

package com.parquet.parquetdataformat.fields;

import com.parquet.parquetdataformat.fields.number.ByteParquetField;
import com.parquet.parquetdataformat.fields.number.DoubleParquetField;
import com.parquet.parquetdataformat.fields.number.FloatParquetField;
import com.parquet.parquetdataformat.fields.number.HalfFloatParquetField;
import com.parquet.parquetdataformat.fields.number.IntegerParquetField;
import com.parquet.parquetdataformat.fields.number.LongParquetField;
import com.parquet.parquetdataformat.fields.number.ShortParquetField;
import com.parquet.parquetdataformat.fields.number.UnsignedLongParquetField;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.opensearch.index.mapper.BooleanFieldMapper;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.KeywordFieldMapper;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.index.mapper.TextFieldMapper;

import java.util.HashMap;
import java.util.Map;
import com.parquet.parquetdataformat.plugins.fields.CoreDataFieldPlugin;
import com.parquet.parquetdataformat.plugins.fields.ParquetFieldPlugin;

public class ArrowFieldRegistry {
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Registry for mapping OpenSearch field types to their corresponding Parquet field implementations.
* This class maintains a centralized mapping between OpenSearch field type names and their
* Arrow/Parquet field representations, enabling efficient field type resolution during
* schema creation and data processing.
*
* <p>The registry is initialized once during class loading and provides thread-safe
* read-only access to field mappings.</p>
*/
public final class ArrowFieldRegistry {

private static final Map<String, FieldType> FIELD_TYPE_MAP = new HashMap<>();
private static final Map<String, ParquetField> PARQUET_FIELD_MAP = new HashMap<>();
/**
* All registered field mappings (thread-safe, mutable)
*/
private static final Map<String, ParquetField> FIELD_REGISTRY = new ConcurrentHashMap<>();

// Static initialization block to populate the field registry
static {
//TODO: darsaga check which fields can be nullable and which can not be

// Number types
FIELD_TYPE_MAP.put(NumberFieldMapper.NumberType.HALF_FLOAT.typeName(),
FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.HALF)));
FIELD_TYPE_MAP.put(NumberFieldMapper.NumberType.FLOAT.typeName(),
FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)));
FIELD_TYPE_MAP.put(NumberFieldMapper.NumberType.DOUBLE.typeName(),
FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)));
FIELD_TYPE_MAP.put(NumberFieldMapper.NumberType.BYTE.typeName(),
FieldType.nullable(new ArrowType.Int(8, true)));
FIELD_TYPE_MAP.put(NumberFieldMapper.NumberType.SHORT.typeName(),
FieldType.nullable(new ArrowType.Int(16, true)));
FIELD_TYPE_MAP.put(NumberFieldMapper.NumberType.INTEGER.typeName(),
FieldType.nullable(new ArrowType.Int(32, true)));
FIELD_TYPE_MAP.put(NumberFieldMapper.NumberType.LONG.typeName(),
FieldType.nullable(new ArrowType.Int(64, true)));
FIELD_TYPE_MAP.put(NumberFieldMapper.NumberType.UNSIGNED_LONG.typeName(),
FieldType.nullable(new ArrowType.Int(64, false)));

// Other types
FIELD_TYPE_MAP.put(DateFieldMapper.CONTENT_TYPE,
FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)));
FIELD_TYPE_MAP.put(BooleanFieldMapper.CONTENT_TYPE,
FieldType.nullable(new ArrowType.Bool()));
FIELD_TYPE_MAP.put(KeywordFieldMapper.CONTENT_TYPE,
FieldType.nullable(new ArrowType.Utf8()));
FIELD_TYPE_MAP.put(TextFieldMapper.CONTENT_TYPE,
FieldType.nullable(new ArrowType.Utf8()));

setUpParquetFieldMap();
initialize();
}

private static void setUpParquetFieldMap() {
// Private constructor to prevent instantiation of utility class.
// Throws (rather than silently succeeding) so even reflective
// instantiation attempts fail loudly.
private ArrowFieldRegistry() {
throw new UnsupportedOperationException("Registry class should not be instantiated");
}

//Number fields
PARQUET_FIELD_MAP.put(NumberFieldMapper.NumberType.HALF_FLOAT.typeName(), new HalfFloatParquetField());
PARQUET_FIELD_MAP.put(NumberFieldMapper.NumberType.FLOAT.typeName(), new FloatParquetField());
PARQUET_FIELD_MAP.put(NumberFieldMapper.NumberType.DOUBLE.typeName(), new DoubleParquetField());
PARQUET_FIELD_MAP.put(NumberFieldMapper.NumberType.BYTE.typeName(), new ByteParquetField());
PARQUET_FIELD_MAP.put(NumberFieldMapper.NumberType.SHORT.typeName(), new ShortParquetField());
PARQUET_FIELD_MAP.put(NumberFieldMapper.NumberType.INTEGER.typeName(), new IntegerParquetField());
PARQUET_FIELD_MAP.put(NumberFieldMapper.NumberType.LONG.typeName(), new LongParquetField());
PARQUET_FIELD_MAP.put(NumberFieldMapper.NumberType.UNSIGNED_LONG.typeName(), new UnsignedLongParquetField());
/**
* Initialize the registry with all available plugins.
* This method should be called during node startup after all plugins are loaded.
*
* <p>NOTE(review): the static initializer already invokes this method, and
* {@code registerPlugin} rejects duplicate field types with an
* {@code IllegalArgumentException} — so a second call to {@code initialize()}
* (e.g. from node startup) would throw. Confirm whether this method is meant
* to be idempotent.</p>
*/
public static synchronized void initialize() {
// Always register core plugins first
registerCorePlugins();
}

//Date field
PARQUET_FIELD_MAP.put(DateFieldMapper.CONTENT_TYPE, new DateParquetField());
/**
* Register core OpenSearch field plugins.
* These are always available and provide the foundation field type support.
*/
private static void registerCorePlugins() {
// Register core data fields
registerPlugin(new CoreDataFieldPlugin(), "CoreDataFields");
}
/**
* Register a single plugin's field types into the shared registry.
*
* <p>Each entry is validated before insertion. Insertion is atomic per entry:
* {@link java.util.concurrent.ConcurrentHashMap#putIfAbsent} replaces the
* original {@code containsKey}-then-{@code put} sequence, which was a
* check-then-act race on the concurrent map.</p>
*
* @param plugin     the plugin contributing field type mappings; a null or
*                   empty contribution is silently ignored
* @param pluginName human-readable plugin name used in error messages
* @throws IllegalArgumentException if an entry is invalid or its field type
*                                  is already registered by another source
*/
private static void registerPlugin(ParquetFieldPlugin plugin, String pluginName) {
    Map<String, ParquetField> fields = plugin.getParquetFields();

    if (fields == null || fields.isEmpty()) {
        return; // nothing to contribute
    }

    for (Map.Entry<String, ParquetField> entry : fields.entrySet()) {
        String fieldType = entry.getKey();
        ParquetField parquetField = entry.getValue();

        // Validate registration
        validateFieldRegistration(fieldType, parquetField, pluginName);

        // Atomic check-and-insert: a non-null result means another
        // registration already claimed this field type.
        if (FIELD_REGISTRY.putIfAbsent(fieldType, parquetField) != null) {
            throw new IllegalArgumentException(
                String.format("Field type [%s] is already registered. Plugin [%s] cannot override it.",
                    fieldType, pluginName)
            );
        }
    }
}

//Boolean field
PARQUET_FIELD_MAP.put(BooleanFieldMapper.CONTENT_TYPE, new BooleanParquetField());
/**
* Validates one field-type registration before it enters the registry.
*
* <p>Checks the type name, the implementation reference, and probes the
* implementation's Arrow type accessors so broken implementations are
* rejected at registration time rather than during schema creation.</p>
*
* @param fieldType    OpenSearch field type name being registered
* @param parquetField implementation backing that field type
* @param source       origin of the registration, used in error messages
* @throws IllegalArgumentException if any check fails
*/
private static void validateFieldRegistration(String fieldType, ParquetField parquetField, String source) {
    if (fieldType == null || fieldType.trim().isEmpty()) {
        throw new IllegalArgumentException("Field type name cannot be null or empty");
    }

    if (parquetField == null) {
        throw new IllegalArgumentException("ParquetField implementation cannot be null");
    }

    // Probe both accessors; any failure here means the implementation
    // cannot supply the Arrow types the registry relies on.
    try {
        parquetField.getArrowType();
        parquetField.getFieldType();
    } catch (Exception cause) {
        final String message = String.format(
            "Invalid ParquetField implementation for type [%s] from source [%s]: %s",
            fieldType, source, cause.getMessage());
        throw new IllegalArgumentException(message, cause);
    }
}

//Text field
PARQUET_FIELD_MAP.put(TextFieldMapper.CONTENT_TYPE, new TextParquetField());
/**
* Get registry statistics for monitoring and debugging.
*
* <p>Reconstructed here without the stray lines from the deleted version of
* this file that were interleaved into the method body by the diff view.</p>
*
* <p>NOTE(review): {@code FIELD_REGISTRY.size()} and the key set are read in
* two separate operations, so under concurrent registration the count and the
* set could briefly disagree — acceptable for a debug/monitoring snapshot.</p>
*
* @return a stats object holding the current field count and type names
*/
public static RegistryStats getStats() {
    Set<String> allTypes = getRegisteredFieldNames();

    return new RegistryStats(
        FIELD_REGISTRY.size(), // Single source of truth
        allTypes
    );
}

public static FieldType getFieldType(String typeName) {
return FIELD_TYPE_MAP.get(typeName);
/**
* Get all registered field type names.
*
* <p>Returns an unmodifiable <em>live view</em> of the registry's key set:
* callers cannot mutate it, but later registrations will be reflected in the
* returned set. Use a copy if a stable snapshot is required.</p>
*
* @return unmodifiable view of the registered field type names
*/
public static Set<String> getRegisteredFieldNames() {
return Collections.unmodifiableSet(FIELD_REGISTRY.keySet());
}

public static ParquetField getParquetField(String typeName) {
return PARQUET_FIELD_MAP.get(typeName);
/**
* Returns the ParquetField implementation for the specified OpenSearch field type, or null if not found.
*
* @param fieldType OpenSearch field type name (e.g. a mapper's {@code typeName()})
* @return the registered implementation, or {@code null} when the type is unregistered
*/
public static ParquetField getParquetField(String fieldType) {
return FIELD_REGISTRY.get(fieldType);
}

/**
 * Immutable snapshot of registry state for monitoring and debugging.
 */
public static class RegistryStats {
    private final int totalFields;
    private final Set<String> allFieldTypes;

    /**
     * @param totalFields   number of registered field types at snapshot time
     * @param allFieldTypes the registered field type names (caller supplies an
     *                      unmodifiable set; stored as-is)
     */
    public RegistryStats(int totalFields, Set<String> allFieldTypes) {
        this.totalFields = totalFields;
        this.allFieldTypes = allFieldTypes;
    }

    // Getters
    public int getTotalFields() { return totalFields; }
    public Set<String> getAllFieldTypes() { return allFieldTypes; }

    @Override
    public String toString() {
        // Fixed: previous format string "RegistryStats{total=%d, }" left a
        // dangling ", }" and never reported the field types.
        return String.format("RegistryStats{total=%d, types=%s}", totalFields, allFieldTypes);
    }
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package com.parquet.parquetdataformat.fields;

import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.MetadataFieldMapper;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * Utility class for creating Apache Arrow schemas from OpenSearch mapper services.
 * This class provides methods to convert OpenSearch field mappings into Arrow schema definitions
 * that can be used for Parquet data format operations.
 */
public final class ArrowSchemaBuilder {

    // Private constructor to prevent instantiation of utility class
    private ArrowSchemaBuilder() {
        throw new UnsupportedOperationException("Utility class should not be instantiated");
    }

    /**
     * Creates an Apache Arrow Schema from the provided MapperService.
     * This method extracts all non-metadata field mappers and converts them to Arrow fields.
     *
     * @param mapperService the OpenSearch mapper service containing field definitions
     * @return a new Schema containing Arrow field definitions for all mapped fields
     * @throws NullPointerException if mapperService is null
     *         (fixed: previously documented as IllegalArgumentException, but
     *         {@code Objects.requireNonNull} throws NullPointerException)
     * @throws IllegalStateException if no valid fields are found or if a field type is not supported
     */
    public static Schema getSchema(final MapperService mapperService) {
        Objects.requireNonNull(mapperService, "MapperService cannot be null");

        final List<Field> fields = extractFieldsFromMappers(mapperService);

        if (fields.isEmpty()) {
            throw new IllegalStateException("No valid fields found in mapper service");
        }

        return new Schema(fields);
    }

    /**
     * Extracts Arrow fields from the mapper service, filtering out metadata fields.
     *
     * @param mapperService the mapper service to extract fields from
     * @return a list of Arrow fields
     */
    private static List<Field> extractFieldsFromMappers(final MapperService mapperService) {
        final List<Field> fields = new ArrayList<>();

        for (final Mapper mapper : mapperService.documentMapper().mappers()) {
            if (isMetadataField(mapper)) {
                continue;
            }

            final Field arrowField = createArrowField(mapper);
            fields.add(arrowField);
        }

        return fields;
    }

    /**
     * Checks if the given mapper represents a metadata field.
     *
     * @param mapper the mapper to check
     * @return true if the mapper is a metadata field, false otherwise
     */
    private static boolean isMetadataField(final Mapper mapper) {
        return mapper instanceof MetadataFieldMapper;
    }

    /**
     * Creates an Arrow Field from an OpenSearch Mapper by resolving its type
     * through {@link ArrowFieldRegistry}.
     *
     * @param mapper the mapper to convert
     * @return a new Arrow Field (children are null — flat fields only)
     * @throws IllegalStateException if the mapper type is not supported
     */
    private static Field createArrowField(final Mapper mapper) {
        final ParquetField parquetField = ArrowFieldRegistry.getParquetField(mapper.typeName());

        if (parquetField == null) {
            throw new IllegalStateException(
                String.format("Unsupported field type '%s' for field '%s'",
                    mapper.typeName(), mapper.name())
            );
        }

        return new Field(mapper.name(), parquetField.getFieldType(), null);
    }
}

This file was deleted.

Loading
Loading