 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.*;
 import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
 import java.sql.DriverManager;
-import java.sql.SQLException;
 import java.util.*;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -61,17 +62,13 @@ public class Neo4jAdminExampleIT {
 
     private static final String SHARED_FOLDER = "/admin-import/";
 
-    // neo4j-admin database import from Parquet files is an unreleased feature, and we are using a custom docker image
-    // for that reason, until the feature is publicly available. Set this image name using
-    // `NEO4J_PRE_RELEASE_DOCKER_IMAGE` environment variable.
     @Container
-    private static final GenericContainer<?> NEO4J = new Neo4jContainer<>(DockerImageName.parse(
-                    Optional.ofNullable(System.getenv("NEO4J_PRE_RELEASE_DOCKER_IMAGE"))
-                            .orElseThrow(
-                                    () -> new IllegalArgumentException(
-                                            "Docker image name is not set through NEO4J_PRE_RELEASE_DOCKER_IMAGE environment variable!")))
-            .asCompatibleSubstituteFor("neo4j"))
+    private static final GenericContainer<?> NEO4J = new Neo4jContainer<>(
+                    DockerImageName.parse("neo4j:2025.03-enterprise"))
             .withEnv("NEO4J_ACCEPT_LICENSE_AGREEMENT", "yes")
+            .withNeo4jConfig("dbms.integrations.cloud_storage.gs.project_id", "connectors-public")
+            .withNeo4jConfig("server.config.strict_validation.enabled", "false")
+            .withNeo4jConfig("internal.dbms.cloud.storage.gs.host", "https://storage.googleapis.com")
             .withAdminPassword("letmein!")
             .withCreateContainerCmdModifier(cmd -> cmd.withUser("neo4j"))
             .withFileSystemBind(
@@ -106,7 +103,7 @@ void runs_an_offline_import_of_dvd_rental_data_set() throws Exception {
         var specification = ImportSpecificationDeserializer.deserialize(reader);
         var sharedFolder = pathFor(SHARED_FOLDER);
         var neo4jAdmin = new Neo4jAdmin(sharedFolder, driver, TARGET_DATABASE);
-        neo4jAdmin.copyFiles(specification);
+        neo4jAdmin.createHeaderFiles(specification);
         neo4jAdmin.executeImport(specification, NEO4J);
     }
 }
@@ -338,35 +335,35 @@ private void migrateTemporalProperties() {
         }
     }
 
-    public void copyFiles(ImportSpecification specification) throws Exception {
+    public void createHeaderFiles(ImportSpecification specification) throws Exception {
         Map<String, Source> indexedSources =
                 specification.getSources().stream().collect(Collectors.toMap(Source::getName, Function.identity()));
         Map<String, NodeTarget> indexedNodes = specification.getTargets().getNodes().stream()
                 .collect(Collectors.toMap(Target::getName, Function.identity()));
         for (Target target : specification.getTargets().getAll()) {
             switch (target) {
-                case NodeTarget nodeTarget -> copyFile(indexedSources, nodeTarget);
+                case NodeTarget nodeTarget -> createHeaderFile(indexedSources, nodeTarget);
                 case RelationshipTarget relationshipTarget ->
-                        copyFile(indexedSources, indexedNodes, relationshipTarget);
+                        createHeaderFile(indexedSources, indexedNodes, relationshipTarget);
                 default -> throw new RuntimeException("unsupported target type: %s".formatted(target.getClass()));
             }
         }
     }
 
-    private void copyFile(Map<String, Source> sources, NodeTarget nodeTarget) throws Exception {
+    private void createHeaderFile(Map<String, Source> sources, NodeTarget nodeTarget) throws Exception {
         var source = sources.get(nodeTarget.getSource());
         assertThat(source).isInstanceOf(ParquetSource.class);
-        File parquetFile = new File(sharedFolder, fileName(nodeTarget));
+        File parquetFile = new File(sharedFolder, headerFileName(nodeTarget));
         List<String> fields = readFieldNames(source);
         Map<String, String> fieldMappings = computeFieldMappings(fields, nodeTarget);
 
-        copyParquetSource((ParquetSource) source, parquetFile, fieldMappings);
+        createHeaderFile(parquetFile, fieldMappings);
     }
 
-    private void copyFile(
+    private void createHeaderFile(
             Map<String, Source> sources, Map<String, NodeTarget> nodes, RelationshipTarget relationshipTarget)
             throws Exception {
-        File parquetFile = new File(sharedFolder, fileName(relationshipTarget));
+        File parquetFile = new File(sharedFolder, headerFileName(relationshipTarget));
 
         var source = sources.get(relationshipTarget.getSource());
         assertThat(source).isInstanceOf(ParquetSource.class);
@@ -377,26 +374,16 @@ private void copyFile(
         Map<String, String> fieldMappings =
                 computeFieldMappings(fields, relationshipTarget, startNodeTarget, endNodeTarget);
 
-        copyParquetSource((ParquetSource) source, parquetFile, fieldMappings);
+        createHeaderFile(parquetFile, fieldMappings);
     }
 
-    // 🐤
-    private void copyParquetSource(ParquetSource source, File targetFile, Map<String, String> fieldMappings)
-            throws SQLException {
-        var renamedColumns = String.join(
-                ", ",
-                fieldMappings.entrySet().stream()
-                        .map(e -> String.format("%s AS \"%s\"", e.getKey(), e.getValue()))
-                        .toList());
-
-        try (var connection = DriverManager.getConnection("jdbc:duckdb:");
-                var statement = connection.prepareStatement(String.format(
-                        "COPY (SELECT %s FROM read_parquet($1)) TO '%s' (FORMAT 'parquet', CODEC 'zstd')",
-                        renamedColumns, targetFile.getAbsolutePath()))) {
+    private void createHeaderFile(File targetFile, Map<String, String> fieldMappings) throws IOException {
 
-            statement.setString(1, source.uri());
-            statement.execute();
-        }
+        var sortedKeys = fieldMappings.keySet().stream().sorted().toList();
+        var values = sortedKeys.stream().map(fieldMappings::get).collect(Collectors.joining(","));
+        Files.writeString(targetFile.toPath(), values);
+        Files.writeString(targetFile.toPath(), "\n", StandardOpenOption.APPEND);
+        Files.writeString(targetFile.toPath(), String.join(",", sortedKeys), StandardOpenOption.APPEND);
     }
 
     private static String[] importCommand(ImportSpecification specification, String database) {
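As an illustration of what the new createHeaderFile(File, Map<String, String>) above writes, here is a minimal standalone sketch with hypothetical field mappings (the real ones come from computeFieldMappings): the first line of the generated file carries the mapped column names, the second the alphabetically sorted source field names.

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.stream.Collectors;

    class HeaderFileSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical mappings: source Parquet field name -> mapped import column name.
            Map<String, String> fieldMappings = new TreeMap<>(Map.of(
                    "actor_id", "id",
                    "first_name", "firstName"));
            // Same two-line layout as createHeaderFile: mapped names first, sorted source fields second.
            var sortedKeys = fieldMappings.keySet().stream().sorted().toList();
            var values = sortedKeys.stream().map(fieldMappings::get).collect(Collectors.joining(","));
            Path header = Files.createTempFile("actor_nodes", "_header.csv");
            Files.writeString(header, values + "\n" + String.join(",", sortedKeys));
            System.out.println(Files.readString(header));
            // Prints:
            // id,firstName
            // actor_id,first_name
        }
    }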
@@ -409,14 +396,16 @@ private static String[] importCommand(ImportSpecification specification, String
             command.append(" --nodes=");
             command.append(String.join(":", nodeTarget.getLabels()));
             command.append("=");
-            command.append("/import/%s".formatted(fileName(nodeTarget)));
+            command.append("/import/%s,".formatted(headerFileName(nodeTarget)));
+            command.append("%s".formatted(sourceUri(specification, nodeTarget)));
         }
 
         for (RelationshipTarget relationshipTarget : targets.getRelationships()) {
             command.append(" --relationships=");
             command.append(relationshipTarget.getType());
             command.append("=");
-            command.append("/import/%s".formatted(fileName(relationshipTarget)));
+            command.append("/import/%s,".formatted(headerFileName(relationshipTarget)));
+            command.append("%s".formatted(sourceUri(specification, relationshipTarget)));
         }
 
         return command.toString().split(" ");
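With these appends, each --nodes / --relationships argument now pairs the generated header file with the raw source URI returned by sourceUri(). A hypothetical example of one resulting argument (target name, label, and bucket are made up; the gs:// scheme is an assumption based on the GCS settings configured on the container above):

    --nodes=Actor=/import/actor_nodes_header.csv,gs://example-bucket/actor.parquet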
@@ -493,8 +482,19 @@ private static Map<String, String> indexByField(List<PropertyMapping> properties
         return result;
     }
 
-    private static String fileName(Target target) {
-        return "%s.parquet".formatted(target.getName());
+    private static String headerFileName(Target target) {
+        return "%s_header.csv".formatted(target.getName());
+    }
+
+    private static String sourceUri(ImportSpecification specification, Target target) {
+        var maybeSource = specification.getSources().stream()
+                .filter(src -> src.getName().equals(target.getSource()))
+                .findFirst();
+        assertThat(maybeSource).isPresent();
+        var rawSource = maybeSource.get();
+        assertThat(rawSource).isInstanceOf(ParquetSource.class);
+        var source = (ParquetSource) rawSource;
+        return source.uri();
     }
 
     private static String idSpaceFor(NodeTarget nodeTarget) {