16
16
*/
17
17
package org .neo4j .importer ;
18
18
19
- import static org .assertj .core .api .Assertions .assertThat ;
20
-
21
19
import com .fasterxml .jackson .databind .node .ObjectNode ;
22
- import java .io .*;
23
- import java .net .URL ;
24
- import java .sql .DriverManager ;
25
- import java .sql .SQLException ;
26
- import java .util .*;
27
- import java .util .function .Function ;
28
- import java .util .stream .Collectors ;
29
- import java .util .stream .Stream ;
30
20
import org .junit .jupiter .api .AfterEach ;
31
21
import org .junit .jupiter .api .BeforeEach ;
32
22
import org .junit .jupiter .api .Test ;
33
23
import org .neo4j .cypherdsl .core .Cypher ;
34
24
import org .neo4j .cypherdsl .core .internal .SchemaNames ;
35
- import org .neo4j .driver .*;
25
+ import org .neo4j .driver .AuthTokens ;
26
+ import org .neo4j .driver .Driver ;
27
+ import org .neo4j .driver .GraphDatabase ;
28
+ import org .neo4j .driver .QueryConfig ;
29
+ import org .neo4j .driver .SessionConfig ;
36
30
import org .neo4j .importer .v1 .ImportSpecification ;
37
31
import org .neo4j .importer .v1 .ImportSpecificationDeserializer ;
38
32
import org .neo4j .importer .v1 .actions .Action ;
55
49
import org .testcontainers .utility .DockerImageName ;
56
50
import org .testcontainers .utility .MountableFile ;
57
51
52
+ import java .io .File ;
53
+ import java .io .InputStream ;
54
+ import java .io .InputStreamReader ;
55
+ import java .net .URL ;
56
+ import java .sql .DriverManager ;
57
+ import java .sql .SQLException ;
58
+ import java .util .ArrayList ;
59
+ import java .util .HashMap ;
60
+ import java .util .HashSet ;
61
+ import java .util .List ;
62
+ import java .util .Map ;
63
+ import java .util .Set ;
64
+ import java .util .function .Function ;
65
+ import java .util .stream .Collectors ;
66
+ import java .util .stream .Stream ;
67
+
68
+ import static org .assertj .core .api .Assertions .assertThat ;
69
+
58
70
@ SuppressWarnings ({"SameParameterValue" , "resource" })
59
71
@ Testcontainers
60
72
public class Neo4jAdminExampleIT {
61
73
62
74
private static final String SHARED_FOLDER = "/admin-import/" ;
63
75
64
- // neo4j-admin database import from Parquet files is an unreleased feature, and we are using a custom docker image
65
- // for that reason, until the feature is publicly available. Set this image name using
66
- // `NEO4J_PRE_RELEASE_DOCKER_IMAGE` environment variable.
67
76
@ Container
68
- private static final GenericContainer <?> NEO4J = new Neo4jContainer <>(DockerImageName .parse (
69
- Optional .ofNullable (System .getenv ("NEO4J_PRE_RELEASE_DOCKER_IMAGE" ))
70
- .orElseThrow (
71
- () -> new IllegalArgumentException (
72
- "Docker image name is not set through NEO4J_PRE_RELEASE_DOCKER_IMAGE environment variable!" )))
73
- .asCompatibleSubstituteFor ("neo4j" ))
77
+ private static final GenericContainer <?> NEO4J = new Neo4jContainer <>(
78
+ DockerImageName .parse ("neo4j:2025.04-enterprise" ))
74
79
.withEnv ("NEO4J_ACCEPT_LICENSE_AGREEMENT" , "yes" )
80
+ .withNeo4jConfig ("dbms.integrations.cloud_storage.gs.project_id" , "connectors-public" )
81
+ .withNeo4jConfig ("server.config.strict_validation.enabled" , "false" )
82
+ .withNeo4jConfig ("internal.dbms.cloud.storage.gs.host" , "https://storage.googleapis.com" )
75
83
.withAdminPassword ("letmein!" )
76
84
.withCreateContainerCmdModifier (cmd -> cmd .withUser ("neo4j" ))
77
85
.withFileSystemBind (
@@ -106,7 +114,7 @@ void runs_an_offline_import_of_dvd_rental_data_set() throws Exception {
106
114
var specification = ImportSpecificationDeserializer .deserialize (reader );
107
115
var sharedFolder = pathFor (SHARED_FOLDER );
108
116
var neo4jAdmin = new Neo4jAdmin (sharedFolder , driver , TARGET_DATABASE );
109
- neo4jAdmin .copyFiles (specification );
117
+ neo4jAdmin .createHeaderFiles (specification );
110
118
neo4jAdmin .executeImport (specification , NEO4J );
111
119
}
112
120
}
@@ -181,9 +189,9 @@ private static void assertSchema(Driver driver) {
181
189
private static void assertNodeConstraint (Driver driver , String constraintType , String label , String property ) {
182
190
var records = driver .executableQuery (
183
191
"""
184
- SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties \
185
- WHERE type = $constraintType AND entityType = 'NODE' AND labelsOrTypes = [$label] AND properties = [$property] \
186
- RETURN count(*) = 1 AS result""" )
192
+ SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties \
193
+ WHERE type = $constraintType AND entityType = 'NODE' AND labelsOrTypes = [$label] AND properties = [$property] \
194
+ RETURN count(*) = 1 AS result""" )
187
195
.withConfig (QueryConfig .builder ().withDatabase (TARGET_DATABASE ).build ())
188
196
.withParameters (Map .of ("constraintType" , constraintType , "label" , label , "property" , property ))
189
197
.execute ()
@@ -195,9 +203,9 @@ RETURN count(*) = 1 AS result""")
195
203
private static void assertNodeTypeConstraint (Driver driver , String label , String property , String propertyType ) {
196
204
var records = driver .executableQuery (
197
205
"""
198
- SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties, propertyType \
199
- WHERE type = 'NODE_PROPERTY_TYPE' AND entityType = 'NODE' AND labelsOrTypes = [$label] AND properties = [$property] AND propertyType = $propertyType \
200
- RETURN count(*) = 1 AS result""" )
206
+ SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties, propertyType \
207
+ WHERE type = 'NODE_PROPERTY_TYPE' AND entityType = 'NODE' AND labelsOrTypes = [$label] AND properties = [$property] AND propertyType = $propertyType \
208
+ RETURN count(*) = 1 AS result""" )
201
209
.withConfig (QueryConfig .builder ().withDatabase (TARGET_DATABASE ).build ())
202
210
.withParameters (Map .of ("label" , label , "property" , property , "propertyType" , propertyType ))
203
211
.execute ()
@@ -210,9 +218,9 @@ private static void assertRelationshipConstraint(
210
218
Driver driver , String constraintType , String relType , String property ) {
211
219
var records = driver .executableQuery (
212
220
"""
213
- SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties \
214
- WHERE type = $constraintType AND entityType = 'RELATIONSHIP' AND labelsOrTypes = [$type] AND properties = [$property] \
215
- RETURN count(*) = 1 AS result""" )
221
+ SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties \
222
+ WHERE type = $constraintType AND entityType = 'RELATIONSHIP' AND labelsOrTypes = [$type] AND properties = [$property] \
223
+ RETURN count(*) = 1 AS result""" )
216
224
.withConfig (QueryConfig .builder ().withDatabase (TARGET_DATABASE ).build ())
217
225
.withParameters (Map .of ("constraintType" , constraintType , "type" , relType , "property" , property ))
218
226
.execute ()
@@ -225,9 +233,9 @@ private static void assertRelationshipTypeConstraint(
225
233
Driver driver , String relType , String property , String propertyType ) {
226
234
var records = driver .executableQuery (
227
235
"""
228
- SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties, propertyType \
229
- WHERE type = 'RELATIONSHIP_PROPERTY_TYPE' AND entityType = 'RELATIONSHIP' AND labelsOrTypes = [$type] AND properties = [$property] AND propertyType = $propertyType \
230
- RETURN count(*) = 1 AS result""" )
236
+ SHOW CONSTRAINTS YIELD type, entityType, labelsOrTypes, properties, propertyType \
237
+ WHERE type = 'RELATIONSHIP_PROPERTY_TYPE' AND entityType = 'RELATIONSHIP' AND labelsOrTypes = [$type] AND properties = [$property] AND propertyType = $propertyType \
238
+ RETURN count(*) = 1 AS result""" )
231
239
.withConfig (QueryConfig .builder ().withDatabase (TARGET_DATABASE ).build ())
232
240
.withParameters (Map .of ("type" , relType , "property" , property , "propertyType" , propertyType ))
233
241
.execute ()
@@ -320,53 +328,53 @@ private void migrateTemporalProperties() {
320
328
try (var session = driver .session (SessionConfig .forDatabase (targetDatabase ))) {
321
329
session .run (
322
330
"""
323
- MATCH (customer:Customer)
324
- CALL (customer) {
325
- MATCH (customer)
326
- SET customer.creation_date = date({year: 1970, month: 1, day: 1}) + duration({days: customer.creation_date})
327
- } IN TRANSACTIONS""" )
331
+ MATCH (customer:Customer)
332
+ CALL (customer) {
333
+ MATCH (customer)
334
+ SET customer.creation_date = date({year: 1970, month: 1, day: 1}) + duration({days: customer.creation_date})
335
+ } IN TRANSACTIONS""" )
328
336
.consume ();
329
337
330
338
session .run (
331
339
"""
332
- MATCH ()-[rental:HAS_RENTED]->()
333
- CALL (rental) {
334
- WITH rental
335
- SET rental.date = datetime({epochMillis: rental.date/1000})
336
- } IN TRANSACTIONS""" )
340
+ MATCH ()-[rental:HAS_RENTED]->()
341
+ CALL (rental) {
342
+ WITH rental
343
+ SET rental.date = datetime({epochMillis: rental.date/1000})
344
+ } IN TRANSACTIONS""" )
337
345
.consume ();
338
346
}
339
347
}
340
348
341
- public void copyFiles (ImportSpecification specification ) throws Exception {
349
+ public void createHeaderFiles (ImportSpecification specification ) throws Exception {
342
350
Map <String , Source > indexedSources =
343
351
specification .getSources ().stream ().collect (Collectors .toMap (Source ::getName , Function .identity ()));
344
352
Map <String , NodeTarget > indexedNodes = specification .getTargets ().getNodes ().stream ()
345
353
.collect (Collectors .toMap (Target ::getName , Function .identity ()));
346
354
for (Target target : specification .getTargets ().getAll ()) {
347
355
switch (target ) {
348
- case NodeTarget nodeTarget -> copyFile (indexedSources , nodeTarget );
356
+ case NodeTarget nodeTarget -> createHeaderFile (indexedSources , nodeTarget );
349
357
case RelationshipTarget relationshipTarget ->
350
- copyFile (indexedSources , indexedNodes , relationshipTarget );
358
+ createHeaderFile (indexedSources , indexedNodes , relationshipTarget );
351
359
default -> throw new RuntimeException ("unsupported target type: %s" .formatted (target .getClass ()));
352
360
}
353
361
}
354
362
}
355
363
356
- private void copyFile (Map <String , Source > sources , NodeTarget nodeTarget ) throws Exception {
364
+ private void createHeaderFile (Map <String , Source > sources , NodeTarget nodeTarget ) throws Exception {
357
365
var source = sources .get (nodeTarget .getSource ());
358
366
assertThat (source ).isInstanceOf (ParquetSource .class );
359
- File parquetFile = new File (sharedFolder , fileName (nodeTarget ));
367
+ File parquetFile = new File (sharedFolder , headerFileName (nodeTarget ));
360
368
List <String > fields = readFieldNames (source );
361
369
Map <String , String > fieldMappings = computeFieldMappings (fields , nodeTarget );
362
370
363
- copyParquetSource (( ParquetSource ) source , parquetFile , fieldMappings );
371
+ generateHeaderFile ( parquetFile , fieldMappings );
364
372
}
365
373
366
- private void copyFile (
374
+ private void createHeaderFile (
367
375
Map <String , Source > sources , Map <String , NodeTarget > nodes , RelationshipTarget relationshipTarget )
368
376
throws Exception {
369
- File parquetFile = new File (sharedFolder , fileName (relationshipTarget ));
377
+ File parquetFile = new File (sharedFolder , headerFileName (relationshipTarget ));
370
378
371
379
var source = sources .get (relationshipTarget .getSource ());
372
380
assertThat (source ).isInstanceOf (ParquetSource .class );
@@ -377,25 +385,30 @@ private void copyFile(
377
385
Map <String , String > fieldMappings =
378
386
computeFieldMappings (fields , relationshipTarget , startNodeTarget , endNodeTarget );
379
387
380
- copyParquetSource (( ParquetSource ) source , parquetFile , fieldMappings );
388
+ generateHeaderFile ( parquetFile , fieldMappings );
381
389
}
382
390
383
391
// 🐤
384
- private void copyParquetSource (ParquetSource source , File targetFile , Map <String , String > fieldMappings )
385
- throws SQLException {
386
- var renamedColumns = String .join (
387
- ", " ,
388
- fieldMappings .entrySet ().stream ()
389
- .map (e -> String .format ("%s AS \" %s\" " , e .getKey (), e .getValue ()))
390
- .toList ());
391
-
392
+ private void generateHeaderFile (File targetFile , Map <String , String > fieldMappings ) throws SQLException {
393
+ var keys = fieldMappings .keySet ().stream ().sorted ().toList ();
394
+ var values = keys .stream ().map (fieldMappings ::get ).toList ();
395
+ var sql = String .format (
396
+ "COPY (PIVOT (\n " +
397
+ " SELECT i,\n " +
398
+ " unnest(l) AS x,\n " +
399
+ " generate_subscripts(l, 1) AS index\n " +
400
+ " FROM (\n " +
401
+ " VALUES (1, ['%s']), (2, ['%s'])\n " +
402
+ " ) tbl(i,l)\n " +
403
+ ") on 'test_' || index using first(x)) TO '%s' (FORMAT 'parquet', CODEC 'zstd')" ,
404
+ String .join ("','" , keys ),
405
+ String .join ("','" , values ),
406
+ targetFile .getAbsolutePath ());
392
407
try (var connection = DriverManager .getConnection ("jdbc:duckdb:" );
393
- var statement = connection .prepareStatement (String .format (
394
- "COPY (SELECT %s FROM read_parquet($1)) TO '%s' (FORMAT 'parquet', CODEC 'zstd')" ,
395
- renamedColumns , targetFile .getAbsolutePath ()))) {
396
-
397
- statement .setString (1 , source .uri ());
398
- statement .execute ();
408
+ // courtesy of https://github.com/duckdb/duckdb/discussions/17047
409
+ // pivot statements cannot be parameterized hence the ugly interpolation
410
+ var statement = connection .createStatement ()) {
411
+ statement .execute (sql );
399
412
}
400
413
}
401
414
@@ -409,14 +422,16 @@ private static String[] importCommand(ImportSpecification specification, String
409
422
command .append (" --nodes=" );
410
423
command .append (String .join (":" , nodeTarget .getLabels ()));
411
424
command .append ("=" );
412
- command .append ("/import/%s" .formatted (fileName (nodeTarget )));
425
+ command .append ("/import/%s," .formatted (headerFileName (nodeTarget )));
426
+ command .append ("%s" .formatted (sourceUri (specification , nodeTarget )));
413
427
}
414
428
415
429
for (RelationshipTarget relationshipTarget : targets .getRelationships ()) {
416
430
command .append (" --relationships=" );
417
431
command .append (relationshipTarget .getType ());
418
432
command .append ("=" );
419
- command .append ("/import/%s" .formatted (fileName (relationshipTarget )));
433
+ command .append ("/import/%s," .formatted (headerFileName (relationshipTarget )));
434
+ command .append ("%s" .formatted (sourceUri (specification , relationshipTarget )));
420
435
}
421
436
422
437
return command .toString ().split (" " );
@@ -475,7 +490,7 @@ private static Set<String> getKeyProperties(NodeTarget nodeTarget) {
475
490
// 🐤
476
491
private static List <String > readFieldNames (Source source ) throws Exception {
477
492
try (var connection = DriverManager .getConnection ("jdbc:duckdb:" );
478
- var statement = connection .prepareStatement ("DESCRIBE (SELECT * FROM read_parquet($1))" )) {
493
+ var statement = connection .prepareStatement ("DESCRIBE (SELECT * FROM read_parquet($1))" )) {
479
494
statement .setString (1 , ((ParquetSource ) source ).uri ());
480
495
var fields = new ArrayList <String >();
481
496
try (var results = statement .executeQuery ()) {
@@ -493,8 +508,19 @@ private static Map<String, String> indexByField(List<PropertyMapping> properties
493
508
return result ;
494
509
}
495
510
496
- private static String fileName (Target target ) {
497
- return "%s.parquet" .formatted (target .getName ());
511
+ private static String headerFileName (Target target ) {
512
+ return "%s_header.parquet" .formatted (target .getName ());
513
+ }
514
+
515
+ private static String sourceUri (ImportSpecification specification , Target target ) {
516
+ var maybeSource = specification .getSources ().stream ()
517
+ .filter (src -> src .getName ().equals (target .getSource ()))
518
+ .findFirst ();
519
+ assertThat (maybeSource ).isPresent ();
520
+ var rawSource = maybeSource .get ();
521
+ assertThat (rawSource ).isInstanceOf (ParquetSource .class );
522
+ var source = (ParquetSource ) rawSource ;
523
+ return source .uri ();
498
524
}
499
525
500
526
private static String idSpaceFor (NodeTarget nodeTarget ) {
@@ -518,7 +544,7 @@ private static List<String> generateSchemaStatements(List<? extends Target> targ
518
544
.flatMap (target -> switch (target ) {
519
545
case NodeTarget nodeTarget -> generateNodeSchemaStatements (nodeTarget );
520
546
case RelationshipTarget relationshipTarget ->
521
- generateRelationshipSchemaStatements (relationshipTarget );
547
+ generateRelationshipSchemaStatements (relationshipTarget );
522
548
default -> Stream .empty ();
523
549
})
524
550
.toList ();
@@ -686,7 +712,7 @@ private static String propertyType(PropertyType propertyType) {
686
712
case ZONED_TIME -> "ZONED TIME" ;
687
713
case ZONED_TIME_ARRAY -> "LIST<ZONED TIME NOT NULL>" ;
688
714
default ->
689
- throw new IllegalArgumentException (String .format ("Unsupported property type: %s" , propertyType ));
715
+ throw new IllegalArgumentException (String .format ("Unsupported property type: %s" , propertyType ));
690
716
};
691
717
}
692
718
}
0 commit comments