diff --git a/c/src/main/java/org/apache/arrow/c/ArrayImporter.java b/c/src/main/java/org/apache/arrow/c/ArrayImporter.java index b74fb1b47..f31a8a1fa 100644 --- a/c/src/main/java/org/apache/arrow/c/ArrayImporter.java +++ b/c/src/main/java/org/apache/arrow/c/ArrayImporter.java @@ -58,7 +58,6 @@ void importArray(ArrowArray src) { ArrowArray ownedArray = ArrowArray.allocateNew(allocator); ownedArray.save(snapshot); src.markReleased(); - src.close(); recursionLevel = 0; diff --git a/c/src/main/java/org/apache/arrow/c/ArrowArrayStreamReader.java b/c/src/main/java/org/apache/arrow/c/ArrowArrayStreamReader.java index 07a88cd8d..34a9c4ec0 100644 --- a/c/src/main/java/org/apache/arrow/c/ArrowArrayStreamReader.java +++ b/c/src/main/java/org/apache/arrow/c/ArrowArrayStreamReader.java @@ -44,7 +44,6 @@ final class ArrowArrayStreamReader extends ArrowReader { this.ownedStream = ArrowArrayStream.allocateNew(allocator); this.ownedStream.save(snapshot); stream.markReleased(); - stream.close(); } @Override diff --git a/c/src/main/java/org/apache/arrow/c/Data.java b/c/src/main/java/org/apache/arrow/c/Data.java index 0b4da33b4..f9d2ee454 100644 --- a/c/src/main/java/org/apache/arrow/c/Data.java +++ b/c/src/main/java/org/apache/arrow/c/Data.java @@ -231,6 +231,22 @@ public static void exportArrayStream( new ArrayStreamExporter(allocator).export(out, reader); } + /** + * Equivalent to calling {@link #importField(BufferAllocator, ArrowSchema, + * CDataDictionaryProvider, boolean) importField(allocator, schema, provider, true)}. 
+ * + * @param allocator Buffer allocator for allocating dictionary vectors + * @param schema C data interface struct representing the field [inout] + * @param provider A dictionary provider will be initialized with empty dictionary vectors + * (optional) + * @return Imported field object + * @see #importField(BufferAllocator, ArrowSchema, CDataDictionaryProvider, boolean) + */ + public static Field importField( + BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider provider) { + return importField(allocator, schema, provider, true); + } + /** * Import Java Field from the C data interface. * @@ -241,19 +257,42 @@ public static void exportArrayStream( * @param schema C data interface struct representing the field [inout] * @param provider A dictionary provider will be initialized with empty dictionary vectors * (optional) + * @param closeImportedStructs if true, the ArrowSchema struct will be closed when this method + * completes. * @return Imported field object */ public static Field importField( - BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider provider) { + BufferAllocator allocator, + ArrowSchema schema, + CDataDictionaryProvider provider, + boolean closeImportedStructs) { try { SchemaImporter importer = new SchemaImporter(allocator); return importer.importField(schema, provider); } finally { schema.release(); - schema.close(); + if (closeImportedStructs) { + schema.close(); + } } } + /** + * Equivalent to calling {@link #importSchema(BufferAllocator, ArrowSchema, + * CDataDictionaryProvider, boolean) importSchema(allocator, schema, provider, true)}. 
+ * + * @param allocator Buffer allocator for allocating dictionary vectors + * @param schema C data interface struct representing the field + * @param provider A dictionary provider will be initialized with empty dictionary vectors + * (optional) + * @return Imported schema object + * @see #importSchema(BufferAllocator, ArrowSchema, CDataDictionaryProvider, boolean) + */ + public static Schema importSchema( + BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider provider) { + return importSchema(allocator, schema, provider, true); + } + /** * Import Java Schema from the C data interface. * @@ -264,11 +303,16 @@ public static Field importField( * @param schema C data interface struct representing the field * @param provider A dictionary provider will be initialized with empty dictionary vectors * (optional) + * @param closeImportedStructs if true, the ArrowSchema struct will be closed when this method + * completes. * @return Imported schema object */ public static Schema importSchema( - BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider provider) { - Field structField = importField(allocator, schema, provider); + BufferAllocator allocator, + ArrowSchema schema, + CDataDictionaryProvider provider, + boolean closeImportedStructs) { + Field structField = importField(allocator, schema, provider, closeImportedStructs); if (structField.getType().getTypeID() != ArrowTypeID.Struct) { throw new IllegalArgumentException( "Cannot import schema: ArrowSchema describes non-struct type"); @@ -276,24 +320,67 @@ public static Schema importSchema( return new Schema(structField.getChildren(), structField.getMetadata()); } + /** + * Equivalent to calling {@link #importIntoVector(BufferAllocator, ArrowArray, FieldVector, + * DictionaryProvider, boolean) importIntoVector(allocator, array, vector, provider, true)}. 
+ * + * @param allocator Buffer allocator + * @param array C data interface struct holding the array data + * @param vector Imported vector object [out] + * @param provider Dictionary provider to load dictionary vectors to (optional) + * @see #importIntoVector(BufferAllocator, ArrowArray, FieldVector, DictionaryProvider, boolean) + */ + public static void importIntoVector( + BufferAllocator allocator, + ArrowArray array, + FieldVector vector, + DictionaryProvider provider) { + importIntoVector(allocator, array, vector, provider, true); + } + /** * Import Java vector from the C data interface. * - *
The ArrowArray struct has its contents moved (as per the C data interface specification) to - * a private object held alive by the resulting array. + *
On successful completion, the ArrowArray struct will have been moved (as per the C data + * interface specification) to a private object held alive by the resulting array. * * @param allocator Buffer allocator * @param array C data interface struct holding the array data * @param vector Imported vector object [out] * @param provider Dictionary provider to load dictionary vectors to (optional) + * @param closeImportedStructs if true, the ArrowArray struct will be closed when this method + * completes successfully. */ public static void importIntoVector( BufferAllocator allocator, ArrowArray array, FieldVector vector, - DictionaryProvider provider) { + DictionaryProvider provider, + boolean closeImportedStructs) { ArrayImporter importer = new ArrayImporter(allocator, vector, provider); importer.importArray(array); + if (closeImportedStructs) { + array.close(); + } + } + + /** + * Equivalent to calling {@link #importVector(BufferAllocator, ArrowArray, ArrowSchema, + * CDataDictionaryProvider, boolean) importVector(allocator, array, schema, provider, true)}. 
+ * + * @param allocator Buffer allocator for allocating the output FieldVector + * @param array C data interface struct holding the array data + * @param schema C data interface struct holding the array type + * @param provider Dictionary provider to load dictionary vectors to (optional) + * @return Imported vector object + * @see #importVector(BufferAllocator, ArrowArray, ArrowSchema, CDataDictionaryProvider, boolean) + */ + public static FieldVector importVector( + BufferAllocator allocator, + ArrowArray array, + ArrowSchema schema, + CDataDictionaryProvider provider) { + return importVector(allocator, array, schema, provider, true); } /** @@ -307,19 +394,42 @@ public static void importIntoVector( * @param array C data interface struct holding the array data * @param schema C data interface struct holding the array type * @param provider Dictionary provider to load dictionary vectors to (optional) + * @param closeImportedStructs if true, the ArrowArray struct will be closed when this method + * completes successfully and the ArrowSchema struct will always be closed. * @return Imported vector object */ public static FieldVector importVector( BufferAllocator allocator, ArrowArray array, ArrowSchema schema, - CDataDictionaryProvider provider) { - Field field = importField(allocator, schema, provider); + CDataDictionaryProvider provider, + boolean closeImportedStructs) { + Field field = importField(allocator, schema, provider, closeImportedStructs); FieldVector vector = field.createVector(allocator); - importIntoVector(allocator, array, vector, provider); + importIntoVector(allocator, array, vector, provider, closeImportedStructs); return vector; } + /** + * Equivalent to calling {@link #importIntoVectorSchemaRoot(BufferAllocator, ArrowArray, + * VectorSchemaRoot, DictionaryProvider, boolean) importIntoVectorSchemaRoot(allocator, array, + * root, provider, true)}. 
+ * + * @param allocator Buffer allocator + * @param array C data interface struct holding the record batch data + * @param root vector schema root to load into + * @param provider Dictionary provider to load dictionary vectors to (optional) + * @see #importIntoVectorSchemaRoot(BufferAllocator, ArrowArray, VectorSchemaRoot, + * DictionaryProvider, boolean) + */ + public static void importIntoVectorSchemaRoot( + BufferAllocator allocator, + ArrowArray array, + VectorSchemaRoot root, + DictionaryProvider provider) { + importIntoVectorSchemaRoot(allocator, array, root, provider, true); + } + /** * Import record batch from the C data interface into vector schema root. * @@ -333,15 +443,18 @@ public static FieldVector importVector( * @param array C data interface struct holding the record batch data * @param root vector schema root to load into * @param provider Dictionary provider to load dictionary vectors to (optional) + * @param closeImportedStructs if true, the ArrowArray struct will be closed when this method + * completes successfully */ public static void importIntoVectorSchemaRoot( BufferAllocator allocator, ArrowArray array, VectorSchemaRoot root, - DictionaryProvider provider) { + DictionaryProvider provider, + boolean closeImportedStructs) { try (StructVector structVector = StructVector.emptyWithDuplicates("", allocator)) { structVector.initializeChildrenFromFields(root.getSchema().getFields()); - importIntoVector(allocator, array, structVector, provider); + importIntoVector(allocator, array, structVector, provider, closeImportedStructs); StructVectorUnloader unloader = new StructVectorUnloader(structVector); VectorLoader loader = new VectorLoader(root); try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) { @@ -350,6 +463,21 @@ public static void importIntoVectorSchemaRoot( } } + /** + * Equivalent to calling {@link #importVectorSchemaRoot(BufferAllocator, ArrowSchema, + * CDataDictionaryProvider, boolean) importVectorSchemaRoot(allocator, schema, 
provider, true)}. + * + * @param allocator Buffer allocator for allocating the output VectorSchemaRoot + * @param schema C data interface struct holding the record batch schema + * @param provider Dictionary provider to load dictionary vectors to (optional) + * @return Imported vector schema root + * @see #importVectorSchemaRoot(BufferAllocator, ArrowSchema, CDataDictionaryProvider, boolean) + */ + public static VectorSchemaRoot importVectorSchemaRoot( + BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider provider) { + return importVectorSchemaRoot(allocator, schema, provider, true); + } + /** * Import Java vector schema root from a C data interface Schema. * @@ -360,11 +488,37 @@ public static void importIntoVectorSchemaRoot( * @param allocator Buffer allocator for allocating the output VectorSchemaRoot * @param schema C data interface struct holding the record batch schema * @param provider Dictionary provider to load dictionary vectors to (optional) + * @param closeImportedStructs if true, the ArrowSchema struct will be closed when this method + * completes * @return Imported vector schema root */ public static VectorSchemaRoot importVectorSchemaRoot( - BufferAllocator allocator, ArrowSchema schema, CDataDictionaryProvider provider) { - return importVectorSchemaRoot(allocator, null, schema, provider); + BufferAllocator allocator, + ArrowSchema schema, + CDataDictionaryProvider provider, + boolean closeImportedStructs) { + return importVectorSchemaRoot(allocator, null, schema, provider, closeImportedStructs); + } + + /** + * Equivalent to calling {@link #importVectorSchemaRoot(BufferAllocator, ArrowArray, ArrowSchema, + * CDataDictionaryProvider, boolean) importVectorSchemaRoot(allocator, array, schema, provider, + * true)}. 
+ * + * @param allocator Buffer allocator for allocating the output VectorSchemaRoot + * @param array C data interface struct holding the record batch data (optional) + * @param schema C data interface struct holding the record batch schema + * @param provider Dictionary provider to load dictionary vectors to (optional) + * @return Imported vector schema root + * @see #importVectorSchemaRoot(BufferAllocator, ArrowArray, ArrowSchema, CDataDictionaryProvider, + * boolean) + */ + public static VectorSchemaRoot importVectorSchemaRoot( + BufferAllocator allocator, + ArrowArray array, + ArrowSchema schema, + CDataDictionaryProvider provider) { + return importVectorSchemaRoot(allocator, array, schema, provider, true); } /** @@ -383,29 +537,56 @@ public static VectorSchemaRoot importVectorSchemaRoot( * @param array C data interface struct holding the record batch data (optional) * @param schema C data interface struct holding the record batch schema * @param provider Dictionary provider to load dictionary vectors to (optional) + * @param closeImportedStructs if true, the ArrowArray struct will be closed when this method + * completes successfully and the ArrowSchema struct will always be closed. * @return Imported vector schema root */ public static VectorSchemaRoot importVectorSchemaRoot( BufferAllocator allocator, ArrowArray array, ArrowSchema schema, - CDataDictionaryProvider provider) { + CDataDictionaryProvider provider, + boolean closeImportedStructs) { VectorSchemaRoot vsr = - VectorSchemaRoot.create(importSchema(allocator, schema, provider), allocator); + VectorSchemaRoot.create( + importSchema(allocator, schema, provider, closeImportedStructs), allocator); if (array != null) { - importIntoVectorSchemaRoot(allocator, array, vsr, provider); + importIntoVectorSchemaRoot(allocator, array, vsr, provider, closeImportedStructs); } return vsr; } /** - * Import an ArrowArrayStream as an {@link ArrowReader}. 
+ * Equivalent to calling {@link #importArrayStream(BufferAllocator, ArrowArrayStream, boolean) + * importArrayStream(allocator, stream, true)}. * * @param allocator Buffer allocator for allocating the output data. * @param stream C stream interface struct to import. * @return Imported reader + * @see #importArrayStream(BufferAllocator, ArrowArrayStream, boolean) */ public static ArrowReader importArrayStream(BufferAllocator allocator, ArrowArrayStream stream) { - return new ArrowArrayStreamReader(allocator, stream); + return importArrayStream(allocator, stream, true); + } + + /** + * Import an ArrowArrayStream as an {@link ArrowReader}. + * + *
On successful completion, the ArrowArrayStream struct will have been moved (as per the C + * data interface specification) to a private object held alive by the resulting ArrowReader. + * + * @param allocator Buffer allocator for allocating the output data. + * @param stream C stream interface struct to import. + * @param closeImportedStructs if true, the ArrowArrayStream struct will be closed when this + * method completes successfully + * @return Imported reader + */ + public static ArrowReader importArrayStream( + BufferAllocator allocator, ArrowArrayStream stream, boolean closeImportedStructs) { + ArrowArrayStreamReader reader = new ArrowArrayStreamReader(allocator, stream); + if (closeImportedStructs) { + stream.close(); + } + return reader; } } diff --git a/c/src/test/java/org/apache/arrow/c/RoundtripTest.java b/c/src/test/java/org/apache/arrow/c/RoundtripTest.java index 6d68449c0..010a30549 100644 --- a/c/src/test/java/org/apache/arrow/c/RoundtripTest.java +++ b/c/src/test/java/org/apache/arrow/c/RoundtripTest.java @@ -17,9 +17,7 @@ package org.apache.arrow.c; import static org.apache.arrow.vector.testing.ValueVectorDataPopulator.setVector; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -958,6 +956,50 @@ public void testVectorSchemaRootWithDuplicatedFieldNames() { @Test public void testSchema() { + Schema schema = createSchema(); + // Consumer allocates empty ArrowSchema + try (ArrowSchema consumerArrowSchema = ArrowSchema.allocateNew(allocator)) { + // Producer fills the schema with data + exportSchema(schema, consumerArrowSchema); + + // Consumer imports schema + Schema importedSchema = Data.importSchema(allocator, consumerArrowSchema, null); + assertEquals(schema.toJson(), importedSchema.toJson()); + } + } + + 
@Test + public void testSchemaStructReuse() { + Schema schema = createSchema(); + // Consumer allocates empty ArrowSchema + try (ArrowSchema consumerArrowSchema = ArrowSchema.allocateNew(allocator)) { + // Producer fills the schema with data + exportSchema(schema, consumerArrowSchema); + + // Consumer imports schema + Schema importedSchema = Data.importSchema(allocator, consumerArrowSchema, null, false); + assertEquals(schema.toJson(), importedSchema.toJson()); + + // Imported struct should be released but not closed + assertEquals(0, consumerArrowSchema.snapshot().release); + assertNotEquals(0, consumerArrowSchema.memoryAddress()); + + // Export and import again + exportSchema(schema, consumerArrowSchema); + importedSchema = Data.importSchema(allocator, consumerArrowSchema, null, false); + assertEquals(schema.toJson(), importedSchema.toJson()); + assertEquals(0, consumerArrowSchema.snapshot().release); + assertNotEquals(0, consumerArrowSchema.memoryAddress()); + } + } + + private void exportSchema(Schema schema, ArrowSchema targetArrowSchema) { + try (ArrowSchema arrowSchema = ArrowSchema.wrap(targetArrowSchema.memoryAddress())) { + Data.exportSchema(allocator, schema, null, arrowSchema); + } + } + + private static Schema createSchema() { Field decimalField = new Field("inner1", FieldType.nullable(new ArrowType.Decimal(19, 4, 128)), null); Field strField = new Field("inner2", FieldType.nullable(new ArrowType.Utf8()), null); @@ -968,16 +1010,7 @@ public void testSchema() { Arrays.asList(decimalField, strField)); Field intField = new Field("col2", FieldType.nullable(new ArrowType.Int(32, true)), null); Schema schema = new Schema(Arrays.asList(itemField, intField)); - // Consumer allocates empty ArrowSchema - try (ArrowSchema consumerArrowSchema = ArrowSchema.allocateNew(allocator)) { - // Producer fills the schema with data - try (ArrowSchema arrowSchema = ArrowSchema.wrap(consumerArrowSchema.memoryAddress())) { - Data.exportSchema(allocator, schema, null, 
arrowSchema); - } - // Consumer imports schema - Schema importedSchema = Data.importSchema(allocator, consumerArrowSchema, null); - assertEquals(schema.toJson(), importedSchema.toJson()); - } + return schema; } @Test @@ -1002,12 +1035,8 @@ public void testImportReleasedArray() { try (ArrowSchema consumerArrowSchema = ArrowSchema.allocateNew(allocator); ArrowArray consumerArrowArray = ArrowArray.allocateNew(allocator)) { // Producer creates structures from existing memory pointers - try (ArrowSchema arrowSchema = ArrowSchema.wrap(consumerArrowSchema.memoryAddress()); - ArrowArray arrowArray = ArrowArray.wrap(consumerArrowArray.memoryAddress())) { - // Producer exports vector into the C Data Interface structures - try (final NullVector vector = new NullVector()) { - Data.exportVector(allocator, vector, null, arrowArray, arrowSchema); - } + try (final NullVector vector = new NullVector()) { + exportFieldVector(vector, consumerArrowSchema, consumerArrowArray); } // Release array structure @@ -1025,6 +1054,45 @@ public void testImportReleasedArray() { } } + @Test + public void testArrayStructReuse() { + // Consumer allocates empty structures + try (ArrowSchema consumerArrowSchema = ArrowSchema.allocateNew(allocator); + ArrowArray consumerArrowArray = ArrowArray.allocateNew(allocator)) { + // Producer creates structures from existing memory pointers + try (final NullVector vector = new NullVector()) { + exportFieldVector(vector, consumerArrowSchema, consumerArrowArray); + } + Data.importVector(allocator, consumerArrowArray, consumerArrowSchema, null, false); + + // Imported structs should be released but not closed + assertEquals(0, consumerArrowSchema.snapshot().release); + assertNotEquals(0, consumerArrowSchema.memoryAddress()); + assertEquals(0, consumerArrowArray.snapshot().release); + assertNotEquals(0, consumerArrowArray.memoryAddress()); + + try (final NullVector vector = new NullVector()) { + exportFieldVector(vector, consumerArrowSchema, consumerArrowArray); + } 
+ Data.importVector(allocator, consumerArrowArray, consumerArrowSchema, null, false); + + // Imported structs should be released but not closed + assertEquals(0, consumerArrowSchema.snapshot().release); + assertNotEquals(0, consumerArrowSchema.memoryAddress()); + assertEquals(0, consumerArrowArray.snapshot().release); + assertNotEquals(0, consumerArrowArray.memoryAddress()); + } + } + + private void exportFieldVector( + FieldVector vector, ArrowSchema consumerArrowSchema, ArrowArray consumerArrowArray) { + try (ArrowSchema arrowSchema = ArrowSchema.wrap(consumerArrowSchema.memoryAddress()); + ArrowArray arrowArray = ArrowArray.wrap(consumerArrowArray.memoryAddress())) { + // Producer exports vector into the C Data Interface structures + Data.exportVector(allocator, vector, null, arrowArray, arrowSchema); + } + } + private VectorSchemaRoot createTestVSR() { BitVector bitVector = new BitVector("boolean", allocator);