Skip to content

Commit 075543c

Browse files
committed
fix: 2025.05 properly supports CSV header for Parquet, thx @meistermeier!
1 parent b97f40c commit 075543c

File tree

1 file changed

+34
-48
lines changed

1 file changed

+34
-48
lines changed

examples/neo4j-admin/src/test/java/org/neo4j/importer/Neo4jAdminExampleIT.java

Lines changed: 34 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,25 @@
1616
*/
1717
package org.neo4j.importer;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
20+
1921
import com.fasterxml.jackson.databind.node.ObjectNode;
22+
import java.io.File;
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.io.InputStreamReader;
26+
import java.net.URL;
27+
import java.nio.file.Files;
28+
import java.sql.DriverManager;
29+
import java.util.ArrayList;
30+
import java.util.HashMap;
31+
import java.util.HashSet;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.Set;
35+
import java.util.function.Function;
36+
import java.util.stream.Collectors;
37+
import java.util.stream.Stream;
2038
import org.junit.jupiter.api.AfterEach;
2139
import org.junit.jupiter.api.BeforeEach;
2240
import org.junit.jupiter.api.Test;
@@ -49,24 +67,6 @@
4967
import org.testcontainers.utility.DockerImageName;
5068
import org.testcontainers.utility.MountableFile;
5169

52-
import java.io.File;
53-
import java.io.InputStream;
54-
import java.io.InputStreamReader;
55-
import java.net.URL;
56-
import java.sql.DriverManager;
57-
import java.sql.SQLException;
58-
import java.util.ArrayList;
59-
import java.util.HashMap;
60-
import java.util.HashSet;
61-
import java.util.List;
62-
import java.util.Map;
63-
import java.util.Set;
64-
import java.util.function.Function;
65-
import java.util.stream.Collectors;
66-
import java.util.stream.Stream;
67-
68-
import static org.assertj.core.api.Assertions.assertThat;
69-
7070
@SuppressWarnings({"SameParameterValue", "resource"})
7171
@Testcontainers
7272
public class Neo4jAdminExampleIT {
@@ -75,7 +75,7 @@ public class Neo4jAdminExampleIT {
7575

7676
@Container
7777
private static final GenericContainer<?> NEO4J = new Neo4jContainer<>(
78-
DockerImageName.parse("neo4j:2025.04-enterprise"))
78+
DockerImageName.parse("neo4j:2025.05-enterprise")) // minimum is 2025.05
7979
.withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes")
8080
.withNeo4jConfig("dbms.integrations.cloud_storage.gs.project_id", "connectors-public")
8181
.withNeo4jConfig("server.config.strict_validation.enabled", "false")
@@ -356,7 +356,7 @@ public void createHeaderFiles(ImportSpecification specification) throws Exceptio
356356
switch (target) {
357357
case NodeTarget nodeTarget -> createHeaderFile(indexedSources, nodeTarget);
358358
case RelationshipTarget relationshipTarget ->
359-
createHeaderFile(indexedSources, indexedNodes, relationshipTarget);
359+
createHeaderFile(indexedSources, indexedNodes, relationshipTarget);
360360
default -> throw new RuntimeException("unsupported target type: %s".formatted(target.getClass()));
361361
}
362362
}
@@ -380,37 +380,23 @@ private void createHeaderFile(
380380
var source = sources.get(relationshipTarget.getSource());
381381
assertThat(source).isInstanceOf(ParquetSource.class);
382382

383-
var startNodeTarget = nodes.get(relationshipTarget.getStartNodeReference());
384-
var endNodeTarget = nodes.get(relationshipTarget.getEndNodeReference());
383+
var startNodeTarget =
384+
nodes.get(relationshipTarget.getStartNodeReference().getName());
385+
var endNodeTarget =
386+
nodes.get(relationshipTarget.getEndNodeReference().getName());
385387
List<String> fields = readFieldNames(source);
386388
Map<String, String> fieldMappings =
387389
computeFieldMappings(fields, relationshipTarget, startNodeTarget, endNodeTarget);
388390

389391
generateHeaderFile(parquetFile, fieldMappings);
390392
}
391393

392-
// 🐤
393-
private void generateHeaderFile(File targetFile, Map<String, String> fieldMappings) throws SQLException {
394-
var keys = fieldMappings.keySet().stream().sorted().toList();
395-
var values = keys.stream().map(fieldMappings::get).toList();
396-
var sql = String.format(
397-
"COPY (PIVOT (\n" +
398-
" SELECT i,\n" +
399-
" unnest(l) AS x,\n" +
400-
" generate_subscripts(l, 1) AS index\n" +
401-
" FROM (\n" +
402-
" VALUES (1, ['%s']), (2, ['%s'])\n" +
403-
" ) tbl(i,l)\n" +
404-
") on 'test_' || index using first(x)) TO '%s' (FORMAT 'parquet', CODEC 'zstd')",
405-
String.join("','", keys),
406-
String.join("','", values),
407-
targetFile.getAbsolutePath());
408-
try (var connection = DriverManager.getConnection("jdbc:duckdb:");
409-
// courtesy of https://github.com/duckdb/duckdb/discussions/17047
410-
// pivot statements cannot be parameterized hence the ugly interpolation
411-
var statement = connection.createStatement()) {
412-
statement.execute(sql);
413-
}
394+
private void generateHeaderFile(File targetFile, Map<String, String> fieldMappings) throws IOException {
395+
var originalFields = fieldMappings.keySet().stream().sorted().toList();
396+
var mappedNames = originalFields.stream().map(fieldMappings::get).toList();
397+
Files.writeString(
398+
targetFile.toPath(),
399+
"%s%n%s".formatted(String.join(",", mappedNames), String.join(",", originalFields)));
414400
}
415401

416402
private static String[] importCommand(ImportSpecification specification, String database) {
@@ -487,7 +473,7 @@ private static Set<String> getKeyProperties(NodeTarget nodeTarget) {
487473
// 🐤
488474
private static List<String> readFieldNames(Source source) throws Exception {
489475
try (var connection = DriverManager.getConnection("jdbc:duckdb:");
490-
var statement = connection.prepareStatement("DESCRIBE (SELECT * FROM read_parquet($1))")) {
476+
var statement = connection.prepareStatement("DESCRIBE (SELECT * FROM read_parquet($1))")) {
491477
statement.setString(1, ((ParquetSource) source).uri());
492478
var fields = new ArrayList<String>();
493479
try (var results = statement.executeQuery()) {
@@ -506,7 +492,7 @@ private static Map<String, String> indexByField(List<PropertyMapping> properties
506492
}
507493

508494
private static String headerFileName(Target target) {
509-
return "%s_header.parquet".formatted(target.getName());
495+
return "%s_header.csv".formatted(target.getName());
510496
}
511497

512498
private static String sourceUri(ImportSpecification specification, Target target) {
@@ -541,7 +527,7 @@ private static List<String> generateSchemaStatements(List<? extends Target> targ
541527
.flatMap(target -> switch (target) {
542528
case NodeTarget nodeTarget -> generateNodeSchemaStatements(nodeTarget);
543529
case RelationshipTarget relationshipTarget ->
544-
generateRelationshipSchemaStatements(relationshipTarget);
530+
generateRelationshipSchemaStatements(relationshipTarget);
545531
default -> Stream.empty();
546532
})
547533
.toList();
@@ -705,7 +691,7 @@ private static String propertyType(PropertyType propertyType) {
705691
case ZONED_TIME -> "ZONED TIME";
706692
case ZONED_TIME_ARRAY -> "LIST<ZONED TIME NOT NULL>";
707693
default ->
708-
throw new IllegalArgumentException(String.format("Unsupported property type: %s", propertyType));
694+
throw new IllegalArgumentException(String.format("Unsupported property type: %s", propertyType));
709695
};
710696
}
711697
}

0 commit comments

Comments
 (0)