GH-615: Produce Avro core data types out of Arrow VSR #638

Merged
merged 89 commits into from
Apr 2, 2025
42187d7
Avro producers - interface and base class
Feb 15, 2025
c97c968
Avro producers - basic types
Feb 15, 2025
2b4d6ea
Avro producers - logical types
Feb 15, 2025
59374c1
Add the composite AVRO producer
Feb 15, 2025
d79a420
Add producers packages to the module exports
Feb 15, 2025
e0ad9ca
Protected constructor on abstract class
Feb 15, 2025
01d7c52
Use byte[] for fixed producer, matching fixed consumer
Feb 15, 2025
560f49f
Add the AVRO enum producer
Feb 15, 2025
26c9ba4
Add skip and set methods to the interface and base class
Feb 15, 2025
9d3bbe9
Add a specialized nullable producer
Feb 15, 2025
9aeabf4
Fix build warnings
martin-traverse Feb 25, 2025
6a6804b
Fix comment warnings
martin-traverse Feb 25, 2025
ef60f39
Utils to create producers for primitive and logical types
martin-traverse Feb 25, 2025
14682fe
Add producer for struct type
martin-traverse Feb 25, 2025
f0d5487
Add producer for arrays type
martin-traverse Feb 25, 2025
d4c2bea
Add producer for map type
martin-traverse Feb 25, 2025
1452137
Override methods in composite producers
martin-traverse Feb 25, 2025
8588d5a
Do not allow skipNull() on nullable producer
martin-traverse Feb 25, 2025
eedf575
Add a union type
martin-traverse Feb 26, 2025
2a7c7ee
Very basic first test case
martin-traverse Feb 26, 2025
2299a47
Fixes
martin-traverse Feb 26, 2025
c4cfe29
Maven spotless checks
martin-traverse Feb 26, 2025
a79ae69
Remove AutoClosable interface on producers
martin-traverse Feb 28, 2025
a171da1
First pass of Avro schema builder
martin-traverse Mar 1, 2025
b3bad23
Include create schema in high level RT test
martin-traverse Mar 1, 2025
8bdb17f
Fixes in create schema logic
martin-traverse Mar 1, 2025
22b339f
Spotless fixes
martin-traverse Mar 1, 2025
275e950
Simplify decimal producer - always output fixed width (this is what A…
martin-traverse Mar 1, 2025
d58c2ce
Update int, long and fixed producers to support logical types
martin-traverse Mar 1, 2025
f8c2191
Support for decimal and decimal 256 as logical types on top of the fi…
martin-traverse Mar 1, 2025
5af9db5
Support for date day and date millis as logical types, millis uses va…
martin-traverse Mar 1, 2025
8100f8d
Fix missing counter increments
martin-traverse Mar 2, 2025
be27d6e
Support all type width / unit combinations for time logical type
martin-traverse Mar 2, 2025
7b6d3e0
Update comment for date milli producer
martin-traverse Mar 2, 2025
2a5bffb
Producers for local timestamps
martin-traverse Mar 2, 2025
fab4e7b
Fixes for time producers
martin-traverse Mar 2, 2025
c523f40
Support for zone-aware timestamp types
martin-traverse Mar 2, 2025
f1ac401
Remove duration types from schema (not supported yet)
martin-traverse Mar 2, 2025
6af475e
Support all type widths for floating point types
martin-traverse Mar 2, 2025
8f82d0f
Support all type widths for signed integer types
martin-traverse Mar 2, 2025
c3620ca
Add support for all type widths of unsigned integers
martin-traverse Mar 2, 2025
c223ffb
Support list and fixed list, do not support large list for now
martin-traverse Mar 2, 2025
b974053
Do not include large types in schema generation
martin-traverse Mar 2, 2025
0a468ff
Include support for dense union
martin-traverse Mar 2, 2025
904b978
Rename fixed size binary producer to match convention
martin-traverse Mar 2, 2025
6866749
Remove boolean return flag on resetValueVector
martin-traverse Mar 23, 2025
1ace4d2
Remove time zone conversion from timestamp producers (both Arrow and …
martin-traverse Mar 23, 2025
f81389a
Use Java functions to handle conversion of unsigned int types
martin-traverse Mar 23, 2025
e58b9c2
Schema conversion tests for primitive and logical types
martin-traverse Mar 23, 2025
8f6d183
Fix schema building for map and record types
martin-traverse Mar 23, 2025
9de9a98
Fix schema building for unions with null members
martin-traverse Mar 23, 2025
fe39591
Schema tests for complex types containing simple types
martin-traverse Mar 23, 2025
0e362f0
Arrow to avro data tests for integers
martin-traverse Mar 23, 2025
67c5abf
Add tests for null, boolean and floating types
martin-traverse Mar 23, 2025
acaa6d9
Tests for remaining primitive types
martin-traverse Mar 23, 2025
8d85c43
Producer fixes
martin-traverse Mar 23, 2025
2995bc7
Tests for decimal types
martin-traverse Mar 23, 2025
c91e976
Test for logical date type
martin-traverse Mar 23, 2025
534acb8
Add tests for time data types
martin-traverse Mar 23, 2025
d21c658
Add tests for timestamp types
martin-traverse Mar 23, 2025
3c26e5d
Fix missing increment in timestamp sec TZ producer
martin-traverse Mar 23, 2025
e9e2954
Throw an error on overflow in date milli producer
martin-traverse Mar 23, 2025
94c214b
Include negative values in decimal tests
martin-traverse Mar 23, 2025
ce81b14
Revise logic for decimal producers
martin-traverse Mar 23, 2025
20bb46f
Use time-millis for time (sec) vectors instead of time-micros (consis…
martin-traverse Mar 23, 2025
976ac85
Tests for list and fixed list
martin-traverse Mar 24, 2025
aa03bdd
Producer fixes for fixed size list
martin-traverse Mar 24, 2025
5ccef03
Test for map types
martin-traverse Mar 24, 2025
a7ca151
Test for nullable map types
martin-traverse Mar 24, 2025
f26a2e1
Apply spotless
martin-traverse Mar 24, 2025
44cec9c
Add test for struct type
martin-traverse Mar 24, 2025
1d07493
Add test for nullable struct type
martin-traverse Mar 24, 2025
593d160
Update tests to use field writers (ensure correct vector layout)
martin-traverse Mar 24, 2025
cf559cb
Fix delegate offset for map producer
martin-traverse Mar 24, 2025
361d28c
Require key type = VARCHAR to encode Avro maps
martin-traverse Mar 24, 2025
81a3ce1
Use HTML table to type mapping comment
martin-traverse Mar 24, 2025
5470a78
Check bounds on index in setPosition()
martin-traverse Mar 24, 2025
fa7d470
Fix entry count in write map test
martin-traverse Mar 24, 2025
81065bf
Remove support for production of union data (pending fixes in the vec…
martin-traverse Mar 25, 2025
e36682a
Fix nano precision test in CI for time types
martin-traverse Mar 25, 2025
0b59da8
Improve tests for nullable lists and maps
martin-traverse Mar 27, 2025
bdb5a92
Set value counts for all complex vectors in the tests
martin-traverse Mar 27, 2025
ccf9c3a
Fix handling of child index updates for fixed size lists
martin-traverse Mar 27, 2025
7f0e3f0
Fixes for PR comments
martin-traverse Mar 27, 2025
98a99a5
Apply spotless
martin-traverse Mar 27, 2025
e1ca9b2
Correct comment on schemas for nullable fields
martin-traverse Mar 27, 2025
e39de26
Eliminate duplication in schema building methods
martin-traverse Apr 1, 2025
956e867
Test logical types using built-in field (not a regular schema prop)
martin-traverse Apr 1, 2025
00bbef0
Use structured types instead of raw props for logical type schemas (t…
martin-traverse Apr 1, 2025
2 changes: 2 additions & 0 deletions adapter/avro/src/main/java/module-info.java
@@ -18,6 +18,8 @@
module org.apache.arrow.adapter.avro {
  exports org.apache.arrow.adapter.avro.consumers;
  exports org.apache.arrow.adapter.avro.consumers.logical;
  exports org.apache.arrow.adapter.avro.producers;
  exports org.apache.arrow.adapter.avro.producers.logical;
  exports org.apache.arrow.adapter.avro;

  requires org.apache.arrow.memory.core;

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.avro.producers;

import java.io.IOException;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.avro.io.Encoder;

/**
 * Producer that produces long values from a {@link BigIntVector}, writes data to an Avro encoder.
 *
 * <p>Logical types are also supported, for vectors derived from {@link BaseFixedWidthVector} where
 * the internal representation matches BigIntVector and requires no conversion.
 */
public class AvroBigIntProducer extends BaseAvroProducer<BaseFixedWidthVector> {

  /** Instantiate an AvroBigIntProducer. */
  public AvroBigIntProducer(BigIntVector vector) {
    super(vector);
  }

  /** Protected constructor for logical types with a long representation. */
  protected AvroBigIntProducer(BaseFixedWidthVector vector) {
    super(vector);
    if (vector.getTypeWidth() != BigIntVector.TYPE_WIDTH) {
      throw new IllegalArgumentException(
          "AvroBigIntProducer requires type width = " + BigIntVector.TYPE_WIDTH);
    }
  }

  @Override
  public void produce(Encoder encoder) throws IOException {
    long value = vector.getDataBuffer().getLong(currentIndex * (long) BigIntVector.TYPE_WIDTH);
    encoder.writeLong(value);
    currentIndex++;
  }
}
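
The `produce` method above computes its byte offset as `currentIndex * (long) BigIntVector.TYPE_WIDTH`. The following is a minimal sketch of why the widening cast matters, written against plain `java.nio` rather than Arrow's buffer classes. `FixedWidthOffsetSketch` and all of its members are hypothetical names used only for illustration; they are not part of this PR.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical illustration only; not part of the Arrow Avro adapter. */
final class FixedWidthOffsetSketch {

  /** Width in bytes of one 64-bit value (assumed to mirror BigIntVector.TYPE_WIDTH). */
  static final int TYPE_WIDTH = 8;

  /** Byte offset computed in 64-bit arithmetic, as in the producer above. */
  static long offsetOf(int index) {
    return index * (long) TYPE_WIDTH;
  }

  /** The same product in 32-bit arithmetic, which wraps around for large indexes. */
  static int overflowingOffsetOf(int index) {
    return index * TYPE_WIDTH;
  }

  /** Reads the value at a logical index from a little-endian buffer of packed longs. */
  static long readLongAt(ByteBuffer data, int index) {
    // java.nio buffers are int-indexed, so the sketch narrows the offset with a checked cast.
    return data.getLong(Math.toIntExact(offsetOf(index)));
  }

  public static void main(String[] args) {
    ByteBuffer data = ByteBuffer.allocate(3 * TYPE_WIDTH).order(ByteOrder.LITTLE_ENDIAN);
    data.putLong(0, 10L).putLong(8, 20L).putLong(16, 30L);
    System.out.println(readLongAt(data, 2));
    System.out.println(offsetOf(300_000_000));
    System.out.println(overflowingOffsetOf(300_000_000));
  }
}
```

With an index of 300,000,000 the 64-bit product is 2,400,000,000, while the 32-bit product wraps to a negative number, which is the failure mode the cast avoids.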
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.avro.producers;

import java.io.IOException;
import org.apache.arrow.vector.BitVector;
import org.apache.avro.io.Encoder;

/**
 * Producer that produces boolean values from a {@link BitVector}, writes data to an Avro encoder.
 */
public class AvroBooleanProducer extends BaseAvroProducer<BitVector> {

  /** Instantiate an AvroBooleanProducer. */
  public AvroBooleanProducer(BitVector vector) {
    super(vector);
  }

  @Override
  public void produce(Encoder encoder) throws IOException {
    int bitValue = vector.get(currentIndex++);
    encoder.writeBoolean(bitValue != 0);
  }
}
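
The boolean producer reads an `int` from `vector.get(currentIndex++)` and compares it with zero because a bit vector stores one bit per value. As a rough sketch of that lookup, assuming the least-significant-bit-first packing described in the Arrow columnar format, the bit for a logical index can be found as follows. `PackedBitSketch` is a hypothetical helper for illustration and does not reproduce `BitVector`'s implementation.

```java
/** Hypothetical illustration only; not part of the Arrow Avro adapter. */
final class PackedBitSketch {

  /** Returns 0 or 1 for the bit at the given logical index (LSB-first within each byte). */
  static int getBit(byte[] packed, int index) {
    int byteIndex = index >> 3; // eight values per byte
    int bitIndex = index & 7; // position inside that byte
    return (packed[byteIndex] >> bitIndex) & 1;
  }

  /** Converts the raw bit to a boolean, as the producer does before writing. */
  static boolean getBoolean(byte[] packed, int index) {
    return getBit(packed, index) != 0;
  }

  public static void main(String[] args) {
    // Bits 0 and 2 are set in the first byte, bit 15 in the second.
    byte[] packed = {0b0000_0101, (byte) 0b1000_0000};
    for (int i = 0; i < 16; i++) {
      System.out.println(i + " -> " + getBoolean(packed, i));
    }
  }
}
```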
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.avro.producers;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.avro.io.Encoder;

/**
 * Producer that produces byte array values from a {@link VarBinaryVector}, writes data to an Avro
 * encoder.
 */
public class AvroBytesProducer extends BaseAvroProducer<VarBinaryVector> {

  /** Instantiate an AvroBytesProducer. */
  public AvroBytesProducer(VarBinaryVector vector) {
    super(vector);
  }

  @Override
  public void produce(Encoder encoder) throws IOException {
    // The nio ByteBuffer is created once per call, but underlying data is not copied
    long offset = vector.getStartOffset(currentIndex);
    long endOffset = vector.getEndOffset(currentIndex);
    int length = (int) (endOffset - offset);
    ByteBuffer nioBuffer = vector.getDataBuffer().nioBuffer(offset, length);
    encoder.writeBytes(nioBuffer);
    currentIndex++;
  }
}
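
The comment in `produce` notes that a new `ByteBuffer` is created per call while the underlying data is not copied. A minimal sketch of that idea with plain `java.nio`, assuming a variable-width layout where value `i` occupies the bytes between `offsets[i]` and `offsets[i + 1]`. `VarBinarySliceSketch` and `valueAt` are hypothetical names; the PR itself relies on the vector's own `getStartOffset`, `getEndOffset` and `nioBuffer` calls.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration only; not part of the Arrow Avro adapter. */
final class VarBinarySliceSketch {

  /** Returns a view over value {@code index}; the bytes are shared with {@code data}. */
  static ByteBuffer valueAt(ByteBuffer data, int[] offsets, int index) {
    int start = offsets[index];
    int end = offsets[index + 1];
    // duplicate() shares content but has its own position and limit,
    // so the caller's buffer state is left untouched.
    ByteBuffer view = data.duplicate();
    view.clear();
    view.limit(end);
    view.position(start);
    return view.slice();
  }

  public static void main(String[] args) {
    ByteBuffer data = ByteBuffer.wrap("abcdef".getBytes(StandardCharsets.US_ASCII));
    int[] offsets = {0, 2, 2, 6}; // three values: "ab", "" and "cdef"
    ByteBuffer third = valueAt(data, offsets, 2);
    System.out.println(third.remaining());
    System.out.println((char) third.get(0));
  }
}
```

Because the view shares storage with the source buffer, a later write to the source is visible through the view, so a view like this is only safe to hand to an encoder that consumes it before the source changes.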
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.avro.producers;

import java.io.IOException;
import org.apache.arrow.vector.IntVector;
import org.apache.avro.io.Encoder;

/**
 * Producer that produces enum values from a dictionary-encoded {@link IntVector}, writes data to an
 * Avro encoder.
 */
public class AvroEnumProducer extends BaseAvroProducer<IntVector> {

  /** Instantiate an AvroEnumProducer. */
  public AvroEnumProducer(IntVector vector) {
    super(vector);
  }

  @Override
  public void produce(Encoder encoder) throws IOException {
    encoder.writeEnum(vector.get(currentIndex++));
  }
}
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.avro.producers;

import java.io.IOException;
import org.apache.arrow.vector.BaseFixedWidthVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.avro.io.Encoder;

/**
 * Producer that produces fixed-size binary values from a {@link FixedSizeBinaryVector}, writes data
 * to an Avro encoder.
 *
 * <p>Logical types are also supported, for vectors derived from {@link BaseFixedWidthVector} where
 * the internal representation is fixed width bytes and requires no conversion.
 */
public class AvroFixedSizeBinaryProducer extends BaseAvroProducer<BaseFixedWidthVector> {

  private final byte[] reuseBytes;

  /** Instantiate an AvroFixedSizeBinaryProducer. */
  public AvroFixedSizeBinaryProducer(FixedSizeBinaryVector vector) {
    super(vector);
    reuseBytes = new byte[vector.getTypeWidth()];
  }

  /** Protected constructor for logical types with a fixed width representation. */
  protected AvroFixedSizeBinaryProducer(BaseFixedWidthVector vector) {
    super(vector);
    reuseBytes = new byte[vector.getTypeWidth()];
  }

  @Override
  public void produce(Encoder encoder) throws IOException {
    long offset = (long) currentIndex * vector.getTypeWidth();
    vector.getDataBuffer().getBytes(offset, reuseBytes);
    encoder.writeFixed(reuseBytes);
    currentIndex++;
  }
}
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.avro.producers;

import java.io.IOException;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.avro.io.Encoder;

/**
 * Producer that produces array values from a {@link FixedSizeListVector}, writes data to an Avro
 * encoder.
 */
public class AvroFixedSizeListProducer extends BaseAvroProducer<FixedSizeListVector> {

  private final Producer<? extends FieldVector> delegate;

  /** Instantiate an AvroFixedSizeListProducer. */
  public AvroFixedSizeListProducer(
      FixedSizeListVector vector, Producer<? extends FieldVector> delegate) {
    super(vector);
    this.delegate = delegate;
  }

  @Override
  public void produce(Encoder encoder) throws IOException {

    encoder.writeArrayStart();
    encoder.setItemCount(vector.getListSize());

    for (int i = 0; i < vector.getListSize(); i++) {
      encoder.startItem();
      delegate.produce(encoder);
    }

    encoder.writeArrayEnd();
    currentIndex++;
  }

  @Override
  public void skipNull() {
    super.skipNull();
    // Child vector contains a fixed number of elements for each entry
    int childIndex = currentIndex * vector.getListSize();
    delegate.setPosition(childIndex);
  }

  @Override
  public void setPosition(int index) {
    if (index < 0 || index > vector.getValueCount()) {
      throw new IllegalArgumentException("Index out of bounds");
    }
    super.setPosition(index);
    // Child vector contains a fixed number of elements for each entry
    int childIndex = currentIndex * vector.getListSize();
    delegate.setPosition(childIndex);
  }

  @Override
  @SuppressWarnings("unchecked")
  public void resetValueVector(FixedSizeListVector vector) {
    ((Producer<FieldVector>) delegate).resetValueVector(vector.getDataVector());
  }
}
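
In the fixed-size list producer, `skipNull()` and `setPosition()` both reposition the delegate at `currentIndex * vector.getListSize()`, since the child vector holds a fixed number of elements per entry, including null entries. The sketch below models only that index bookkeeping with two counters. `FixedSizeListCursorSketch` is a hypothetical stand-in and does not use any Arrow or Avro types.

```java
/** Hypothetical illustration only; not part of the Arrow Avro adapter. */
final class FixedSizeListCursorSketch {

  private final int listSize;
  private int parentIndex;
  private int childIndex;

  FixedSizeListCursorSketch(int listSize) {
    this.listSize = listSize;
  }

  /** Producing one entry consumes listSize child elements, then advances the parent. */
  void produce() {
    childIndex += listSize;
    parentIndex++;
  }

  /** A null entry still occupies listSize child slots, so the child cursor must jump too. */
  void skipNull() {
    parentIndex++;
    childIndex = parentIndex * listSize;
  }

  /** Repositioning the parent repositions the child at the start of that entry. */
  void setPosition(int index) {
    parentIndex = index;
    childIndex = parentIndex * listSize;
  }

  int getParentIndex() {
    return parentIndex;
  }

  int getChildIndex() {
    return childIndex;
  }

  public static void main(String[] args) {
    FixedSizeListCursorSketch cursor = new FixedSizeListCursorSketch(3);
    cursor.produce(); // entry 0
    cursor.skipNull(); // entry 1 is null
    cursor.produce(); // entry 2
    System.out.println(cursor.getParentIndex() + " " + cursor.getChildIndex());
  }
}
```

If `skipNull()` advanced only the parent counter, every entry after a null would read child elements belonging to the previous entry, which is the kind of misalignment the overrides guard against.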
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.adapter.avro.producers;

import java.io.IOException;
import org.apache.arrow.memory.util.Float16;
import org.apache.arrow.vector.Float2Vector;
import org.apache.avro.io.Encoder;

/**
 * Producer that produces float values from a {@link Float2Vector}, writes data to an Avro encoder.
 */
public class AvroFloat2Producer extends BaseAvroProducer<Float2Vector> {

  /** Instantiate an AvroFloat2Producer. */
  public AvroFloat2Producer(Float2Vector vector) {
    super(vector);
  }

  @Override
  public void produce(Encoder encoder) throws IOException {
    short rawValue = vector.getDataBuffer().getShort(currentIndex * (long) Float2Vector.TYPE_WIDTH);
    encoder.writeFloat(Float16.toFloat(rawValue));
    currentIndex++;
  }
}
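
The half-precision producer reads a raw 16-bit value and widens it to a 32-bit `float` before calling `encoder.writeFloat`. The PR delegates that conversion to `Float16.toFloat`; the sketch below is an independent, standalone version of the same IEEE 754 binary16 to binary32 widening, shown only to make the step concrete. `HalfFloatSketch` and `halfToFloat` are hypothetical names, and this is not Arrow's implementation.

```java
/** Hypothetical illustration only; not part of the Arrow Avro adapter. */
final class HalfFloatSketch {

  /** Widens an IEEE 754 binary16 bit pattern to a 32-bit float. */
  static float halfToFloat(short half) {
    int bits = half & 0xFFFF;
    int sign = (bits & 0x8000) << 16; // move the sign to bit 31
    int exponent = (bits >>> 10) & 0x1F; // 5-bit exponent, bias 15
    int mantissa = bits & 0x3FF; // 10-bit fraction

    if (exponent == 0x1F) {
      // Infinity or NaN: all-ones exponent, fraction carried over.
      return Float.intBitsToFloat(sign | 0x7F800000 | (mantissa << 13));
    }
    if (exponent == 0) {
      // Zero or subnormal: value is mantissa * 2^-24, with the sign applied.
      float magnitude = mantissa * 0x1p-24f;
      return sign != 0 ? -magnitude : magnitude;
    }
    // Normal number: re-bias the exponent from 15 to 127 and widen the fraction.
    return Float.intBitsToFloat(sign | ((exponent + 112) << 23) | (mantissa << 13));
  }

  public static void main(String[] args) {
    System.out.println(halfToFloat((short) 0x3C00));
    System.out.println(halfToFloat((short) 0xC000));
    System.out.println(halfToFloat((short) 0x7BFF));
  }
}
```

Every binary16 value is exactly representable as a binary32 value, so the widening is lossless.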