Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions vector/src/main/codegen/includes/vv_imports.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.arrow.vector.complex.*;
import org.apache.arrow.vector.complex.reader.*;
import org.apache.arrow.vector.complex.impl.*;
import org.apache.arrow.vector.complex.writer.*;
import org.apache.arrow.vector.complex.writer.BaseWriter.ExtensionWriter;
import org.apache.arrow.vector.complex.writer.BaseWriter.StructWriter;
import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter;
import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
Expand Down
4 changes: 4 additions & 0 deletions vector/src/main/codegen/templates/AbstractFieldReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ public void copyAsField(String name, ${name}Writer writer) {

</#list></#list>

/**
* Default implementation of the struct copy that takes an extension type writer factory.
*
* <p>Neither argument is used here; the method only calls {@code fail} with the label
* {@code "CopyAsValue StructWriter"}. NOTE(review): {@code fail} is defined outside this hunk and
* presumably raises an error for readers that do not override this method; confirm.
*
* @param writer struct writer the value would be copied into (unused here)
* @param writerFactory factory for extension type writers (unused here)
*/
public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory) {
fail("CopyAsValue StructWriter");
}

/**
* Default implementation of reading into an extension holder.
*
* <p>The holder is not touched; the method only calls {@code fail} with the label
* {@code "Extension"}. NOTE(review): {@code fail} is defined outside this hunk and presumably
* raises an error for readers that do not override this method; confirm.
*
* @param holder holder that would receive the value (unused here)
*/
public void read(ExtensionHolder holder) {
fail("Extension");
}
Expand Down
3 changes: 3 additions & 0 deletions vector/src/main/codegen/templates/BaseReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public interface RepeatedStructReader extends StructReader{
boolean next();
int size();
void copyAsValue(StructWriter writer);
void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory);
}

public interface ListReader extends BaseReader{
Expand All @@ -59,6 +60,7 @@ public interface RepeatedListReader extends ListReader{
boolean next();
int size();
void copyAsValue(ListWriter writer);
void copyAsValue(ListWriter writer, ExtensionTypeWriterFactory writerFactory);
}

public interface MapReader extends BaseReader{
Expand All @@ -69,6 +71,7 @@ public interface RepeatedMapReader extends MapReader{
boolean next();
int size();
void copyAsValue(MapWriter writer);
void copyAsValue(MapWriter writer, ExtensionTypeWriterFactory writerFactory);
}

public interface ScalarReader extends
Expand Down
42 changes: 35 additions & 7 deletions vector/src/main/codegen/templates/ComplexCopier.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ public class ComplexCopier {
* @param output field to write to
*/
public static void copy(FieldReader input, FieldWriter output) {
writeValue(input, output);
writeValue(input, output, null);
}

private static void writeValue(FieldReader reader, FieldWriter writer) {
/**
* Copies the value at the reader's position into the writer, using the supplied factory for
* extension type values.
*
* <p>This is a thin public entry point: it forwards all three arguments to {@code writeValue}.
*
* @param input field to read from
* @param output field to write to
* @param extensionTypeWriterFactory factory handed to the writer before an extension value is
*     written; may be {@code null}, in which case {@code writeValue} throws
*     {@link UnsupportedOperationException} when it encounters an EXTENSIONTYPE value
*/
public static void copy(FieldReader input, FieldWriter output, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
writeValue(input, output, extensionTypeWriterFactory);
}

private static void writeValue(FieldReader reader, FieldWriter writer, ExtensionTypeWriterFactory extensionTypeWriterFactory) {
final MinorType mt = reader.getMinorType();

switch (mt) {
Expand All @@ -61,7 +65,7 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
FieldReader childReader = reader.reader();
FieldWriter childWriter = getListWriterForReader(childReader, writer);
if (childReader.isSet()) {
writeValue(childReader, childWriter);
writeValue(childReader, childWriter, extensionTypeWriterFactory);
} else {
childWriter.writeNull();
}
Expand All @@ -79,8 +83,8 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
FieldReader structReader = reader.reader();
if (structReader.isSet()) {
writer.startEntry();
writeValue(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()));
writeValue(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()));
writeValue(mapReader.key(), getMapWriterForReader(mapReader.key(), writer.key()), extensionTypeWriterFactory);
writeValue(mapReader.value(), getMapWriterForReader(mapReader.value(), writer.value()), extensionTypeWriterFactory);
writer.endEntry();
} else {
writer.writeNull();
Expand All @@ -99,7 +103,7 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
if (childReader.getMinorType() != Types.MinorType.NULL) {
FieldWriter childWriter = getStructWriterForReader(childReader, writer, name);
if (childReader.isSet()) {
writeValue(childReader, childWriter);
writeValue(childReader, childWriter, extensionTypeWriterFactory);
} else {
childWriter.writeNull();
}
Expand All @@ -110,6 +114,21 @@ private static void writeValue(FieldReader reader, FieldWriter writer) {
writer.writeNull();
}
break;
case EXTENSIONTYPE:
if (extensionTypeWriterFactory == null) {
throw new UnsupportedOperationException(
"EXTENSIONTYPE are not supported yet. Please provide an ExtensionTypeWriterFactory." );
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can ever support extension types without the factory, right?

Suggested change
throw new UnsupportedOperationException(
"EXTENSIONTYPE are not supported yet. Please provide an ExtensionTypeWriterFactory." );
throw new IllegalArgumentException(
"Must provide ExtensionTypeWriterFactory" );

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, we need a factory to determine the writer implementation for an extension type.

}
if (reader.isSet()) {
Object value = reader.readObject();
if (value != null) {
writer.addExtensionTypeWriterFactory(extensionTypeWriterFactory);
writer.writeExtension(value);
}
} else {
writer.writeNull();
}
break;
<#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
<#assign fields = minor.fields!type.fields />
<#assign uncappedName = name?uncap_first/>
Expand Down Expand Up @@ -162,6 +181,9 @@ private static FieldWriter getStructWriterForReader(FieldReader reader, StructWr
return (FieldWriter) writer.map(name);
case LISTVIEW:
return (FieldWriter) writer.listView(name);
case EXTENSIONTYPE:
ExtensionWriter extensionWriter = writer.extension(name, reader.getField().getType());
return (FieldWriter) extensionWriter;
default:
throw new UnsupportedOperationException(reader.getMinorType().toString());
}
Expand All @@ -185,7 +207,10 @@ private static FieldWriter getListWriterForReader(FieldReader reader, ListWriter
case NULL:
return (FieldWriter) writer.list();
case LISTVIEW:
return (FieldWriter) writer.listView();
return (FieldWriter) writer.listView();
case EXTENSIONTYPE:
ExtensionWriter extensionWriter = writer.extension(reader.getField().getType());
return (FieldWriter) extensionWriter;
default:
throw new UnsupportedOperationException(reader.getMinorType().toString());
}
Expand All @@ -211,6 +236,9 @@ private static FieldWriter getMapWriterForReader(FieldReader reader, MapWriter w
return (FieldWriter) writer.listView();
case MAP:
return (FieldWriter) writer.map(false);
case EXTENSIONTYPE:
ExtensionWriter extensionWriter = writer.extension(reader.getField().getType());
return (FieldWriter) extensionWriter;
default:
throw new UnsupportedOperationException(reader.getMinorType().toString());
}
Expand Down
2 changes: 2 additions & 0 deletions vector/src/main/codegen/templates/NullReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public void read(int arrayIndex, Nullable${name}Holder holder){
}
</#list></#list>

/** Intentionally does nothing: the body is empty, so both arguments are ignored. */
public void copyAsValue(StructWriter writer, ExtensionTypeWriterFactory writerFactory){}

/**
* Marks the given holder as unset by assigning {@code 0} to {@code holder.isSet}; no other
* holder state is written.
*
* @param holder extension holder to mark as unset
*/
public void read(ExtensionHolder holder) {
holder.isSet = 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ReferenceManager;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.util.DataSizeRoundingUtil;
import org.apache.arrow.vector.util.TransferPair;
Expand Down Expand Up @@ -260,6 +261,18 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
throw new UnsupportedOperationException();
}

@Override
public void copyFrom(
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
throw new UnsupportedOperationException();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be abstract methods instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because this method is used only with complex-type vectors, and they have their own implementations; for non-complex vectors it's not supported, and that behavior is covered here.

}

/**
* Not supported by this class: the arguments are ignored and the call always fails.
*
* @param fromIndex position to copy from in source vector (unused)
* @param thisIndex position to copy to in this vector (unused)
* @param from source vector (unused)
* @param writerFactory extension type writer factory (unused)
* @throws UnsupportedOperationException always
*/
@Override
public void copyFromSafe(
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
throw new UnsupportedOperationException();
}

/**
* Transfer the validity buffer from `validityBuffer` to the target vector's `validityBuffer`.
* Start at `startIndex` and copy `length` number of elements. If the starting index is 8 byte
Expand Down
13 changes: 13 additions & 0 deletions vector/src/main/java/org/apache/arrow/vector/NullVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
import org.apache.arrow.vector.complex.impl.NullReader;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
Expand Down Expand Up @@ -329,6 +330,18 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
throw new UnsupportedOperationException();
}

/**
* Not supported by this vector: the arguments are ignored and the call always fails.
*
* @param fromIndex position to copy from in source vector (unused)
* @param thisIndex position to copy to in this vector (unused)
* @param from source vector (unused)
* @param writerFactory extension type writer factory (unused)
* @throws UnsupportedOperationException always
*/
@Override
public void copyFrom(
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
throw new UnsupportedOperationException();
}

/**
* Not supported by this vector: the arguments are ignored and the call always fails.
*
* @param fromIndex position to copy from in source vector (unused)
* @param thisIndex position to copy to in this vector (unused)
* @param from source vector (unused)
* @param writerFactory extension type writer factory (unused)
* @throws UnsupportedOperationException always
*/
@Override
public void copyFromSafe(
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
throw new UnsupportedOperationException();
}

@Override
public String getName() {
return this.getField().getName();
Expand Down
25 changes: 25 additions & 0 deletions vector/src/main/java/org/apache/arrow/vector/ValueVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.memory.util.hash.ArrowBufHasher;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.Field;
Expand Down Expand Up @@ -309,6 +310,30 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
*/
void copyFromSafe(int fromIndex, int thisIndex, ValueVector from);

/**
* Copy a cell value from a particular index in source vector to a particular position in this
* vector.
*
* @param fromIndex position to copy from in source vector
* @param thisIndex position to copy to in this vector
* @param from source vector
* @param writerFactory the extension type writer factory to use for copying extension type values
* @throws UnsupportedOperationException if the implementing vector does not support this
*     operation
*/
void copyFrom(
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory);

/**
* Same as {@link #copyFrom(int, int, ValueVector, ExtensionTypeWriterFactory)} except that it
* handles the case when the capacity of the vector needs to be expanded before copy.
*
* @param fromIndex position to copy from in source vector
* @param thisIndex position to copy to in this vector
* @param from source vector
* @param writerFactory the extension type writer factory to use for copying extension type values
* @throws UnsupportedOperationException if the implementing vector does not support this
*     operation
*/
void copyFromSafe(
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory);

/**
* Accept a generic {@link VectorVisitor} and return the result.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.arrow.vector.DensityAwareVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.ArrowType.FixedSizeList;
Expand Down Expand Up @@ -151,6 +152,18 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
throw new UnsupportedOperationException();
}

/**
* Not supported by this class: the arguments are ignored and the call always fails.
*
* @param fromIndex position to copy from in source vector (unused)
* @param thisIndex position to copy to in this vector (unused)
* @param from source vector (unused)
* @param writerFactory extension type writer factory (unused)
* @throws UnsupportedOperationException always
*/
@Override
public void copyFrom(
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
throw new UnsupportedOperationException();
}

/**
* Not supported by this class: the arguments are ignored and the call always fails.
*
* @param fromIndex position to copy from in source vector (unused)
* @param thisIndex position to copy to in this vector (unused)
* @param from source vector (unused)
* @param writerFactory extension type writer factory (unused)
* @throws UnsupportedOperationException always
*/
@Override
public void copyFromSafe(
int fromIndex, int thisIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
throw new UnsupportedOperationException();
}

@Override
public String getName() {
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.complex.impl.ComplexCopier;
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
import org.apache.arrow.vector.complex.impl.UnionLargeListReader;
import org.apache.arrow.vector.complex.impl.UnionLargeListWriter;
import org.apache.arrow.vector.complex.reader.FieldReader;
Expand Down Expand Up @@ -482,12 +483,42 @@ public void copyFromSafe(int inIndex, int outIndex, ValueVector from) {
*/
@Override
public void copyFrom(int inIndex, int outIndex, ValueVector from) {
// This overload has no extension type writer factory to offer, so it delegates to the
// four-argument overload with null. That overload hands the null to ComplexCopier.copy, whose
// writeValue throws UnsupportedOperationException if it meets an EXTENSIONTYPE value.
copyFrom(inIndex, outIndex, from, null);
}

/**
* Copy a cell value from a particular index in source vector to a particular position in this
* vector.
*
* @param inIndex position to copy from in source vector
* @param outIndex position to copy to in this vector
* @param from source vector
* @param writerFactory the extension type writer factory to use for copying extension type values
*/
@Override
public void copyFrom(
int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
Preconditions.checkArgument(this.getMinorType() == from.getMinorType());
FieldReader in = from.getReader();
in.setPosition(inIndex);
UnionLargeListWriter out = getWriter();
out.setPosition(outIndex);
ComplexCopier.copy(in, out);
ComplexCopier.copy(in, out, writerFactory);
}

/**
* Same as {@link #copyFrom(int, int, ValueVector, ExtensionTypeWriterFactory)} except that it
* handles the case when the capacity of the vector needs to be expanded before copy.
*
* <p>This implementation forwards all arguments unchanged to that overload. NOTE(review): no
* explicit capacity handling happens here; presumably the writer used by {@code copyFrom} grows
* the vector as needed. Confirm.
*
* @param inIndex position to copy from in source vector
* @param outIndex position to copy to in this vector
* @param from source vector
* @param writerFactory the extension type writer factory to use for copying extension type values
*/
@Override
public void copyFromSafe(
int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
copyFrom(inIndex, outIndex, from, writerFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.complex.impl.ExtensionTypeWriterFactory;
import org.apache.arrow.vector.complex.impl.UnionLargeListViewReader;
import org.apache.arrow.vector.complex.impl.UnionLargeListViewWriter;
import org.apache.arrow.vector.complex.impl.UnionListReader;
Expand Down Expand Up @@ -346,6 +347,20 @@ public void copyFrom(int inIndex, int outIndex, ValueVector from) {
"LargeListViewVector does not support copyFrom operation yet.");
}

/**
* Not implemented for this vector yet: the arguments are ignored and the call always fails.
*
* @param inIndex position to copy from in source vector (unused)
* @param outIndex position to copy to in this vector (unused)
* @param from source vector (unused)
* @param writerFactory extension type writer factory (unused)
* @throws UnsupportedOperationException always
*/
@Override
public void copyFromSafe(
int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
throw new UnsupportedOperationException(
"LargeListViewVector does not support copyFromSafe operation yet.");
}

/**
* Not implemented for this vector yet: the arguments are ignored and the call always fails.
*
* @param inIndex position to copy from in source vector (unused)
* @param outIndex position to copy to in this vector (unused)
* @param from source vector (unused)
* @param writerFactory extension type writer factory (unused)
* @throws UnsupportedOperationException always
*/
@Override
public void copyFrom(
int inIndex, int outIndex, ValueVector from, ExtensionTypeWriterFactory writerFactory) {
throw new UnsupportedOperationException(
"LargeListViewVector does not support copyFrom operation yet.");
}

@Override
public FieldVector getDataVector() {
return vector;
Expand Down
Loading