21
21
import com .fasterxml .jackson .databind .node .ObjectNode ;
22
22
import java .io .*;
23
23
import java .net .URL ;
24
+ import java .nio .file .Files ;
24
25
import java .sql .DriverManager ;
25
- import java .sql .SQLException ;
26
26
import java .util .*;
27
27
import java .util .function .Function ;
28
28
import java .util .stream .Collectors ;
@@ -61,17 +61,13 @@ public class Neo4jAdminExampleIT {
61
61
62
62
private static final String SHARED_FOLDER = "/admin-import/" ;
63
63
64
- // neo4j-admin database import from Parquet files is an unreleased feature, and we are using a custom docker image
65
- // for that reason, until the feature is publicly available. Set this image name using
66
- // `NEO4J_PRE_RELEASE_DOCKER_IMAGE` environment variable.
67
64
@ Container
68
- private static final GenericContainer <?> NEO4J = new Neo4jContainer <>(DockerImageName .parse (
69
- Optional .ofNullable (System .getenv ("NEO4J_PRE_RELEASE_DOCKER_IMAGE" ))
70
- .orElseThrow (
71
- () -> new IllegalArgumentException (
72
- "Docker image name is not set through NEO4J_PRE_RELEASE_DOCKER_IMAGE environment variable!" )))
73
- .asCompatibleSubstituteFor ("neo4j" ))
65
+ private static final GenericContainer <?> NEO4J = new Neo4jContainer <>(
66
+ DockerImageName .parse ("neo4j:2025.03-enterprise" ))
74
67
.withEnv ("NEO4J_ACCEPT_LICENSE_AGREEMENT" , "yes" )
68
+ .withNeo4jConfig ("dbms.integrations.cloud_storage.gs.project_id" , "connectors-public" )
69
+ .withNeo4jConfig ("server.config.strict_validation.enabled" , "false" )
70
+ .withNeo4jConfig ("internal.dbms.cloud.storage.gs.host" , "https://storage.googleapis.com" )
75
71
.withAdminPassword ("letmein!" )
76
72
.withCreateContainerCmdModifier (cmd -> cmd .withUser ("neo4j" ))
77
73
.withFileSystemBind (
@@ -106,7 +102,7 @@ void runs_an_offline_import_of_dvd_rental_data_set() throws Exception {
106
102
var specification = ImportSpecificationDeserializer .deserialize (reader );
107
103
var sharedFolder = pathFor (SHARED_FOLDER );
108
104
var neo4jAdmin = new Neo4jAdmin (sharedFolder , driver , TARGET_DATABASE );
109
- neo4jAdmin .copyFiles (specification );
105
+ neo4jAdmin .createHeaderFiles (specification );
110
106
neo4jAdmin .executeImport (specification , NEO4J );
111
107
}
112
108
}
@@ -338,35 +334,35 @@ private void migrateTemporalProperties() {
338
334
}
339
335
}
340
336
341
- public void copyFiles (ImportSpecification specification ) throws Exception {
337
+ public void createHeaderFiles (ImportSpecification specification ) throws Exception {
342
338
Map <String , Source > indexedSources =
343
339
specification .getSources ().stream ().collect (Collectors .toMap (Source ::getName , Function .identity ()));
344
340
Map <String , NodeTarget > indexedNodes = specification .getTargets ().getNodes ().stream ()
345
341
.collect (Collectors .toMap (Target ::getName , Function .identity ()));
346
342
for (Target target : specification .getTargets ().getAll ()) {
347
343
switch (target ) {
348
- case NodeTarget nodeTarget -> copyFile (indexedSources , nodeTarget );
344
+ case NodeTarget nodeTarget -> createHeaderFile (indexedSources , nodeTarget );
349
345
case RelationshipTarget relationshipTarget ->
350
- copyFile (indexedSources , indexedNodes , relationshipTarget );
346
+ createHeaderFile (indexedSources , indexedNodes , relationshipTarget );
351
347
default -> throw new RuntimeException ("unsupported target type: %s" .formatted (target .getClass ()));
352
348
}
353
349
}
354
350
}
355
351
356
- private void copyFile (Map <String , Source > sources , NodeTarget nodeTarget ) throws Exception {
352
+ private void createHeaderFile (Map <String , Source > sources , NodeTarget nodeTarget ) throws Exception {
357
353
var source = sources .get (nodeTarget .getSource ());
358
354
assertThat (source ).isInstanceOf (ParquetSource .class );
359
- File parquetFile = new File (sharedFolder , fileName (nodeTarget ));
355
+ File parquetFile = new File (sharedFolder , headerFileName (nodeTarget ));
360
356
List <String > fields = readFieldNames (source );
361
357
Map <String , String > fieldMappings = computeFieldMappings (fields , nodeTarget );
362
358
363
- copyParquetSource (( ParquetSource ) source , parquetFile , fieldMappings );
359
+ createHeaderFile ( parquetFile , fieldMappings );
364
360
}
365
361
366
- private void copyFile (
362
+ private void createHeaderFile (
367
363
Map <String , Source > sources , Map <String , NodeTarget > nodes , RelationshipTarget relationshipTarget )
368
364
throws Exception {
369
- File parquetFile = new File (sharedFolder , fileName (relationshipTarget ));
365
+ File parquetFile = new File (sharedFolder , headerFileName (relationshipTarget ));
370
366
371
367
var source = sources .get (relationshipTarget .getSource ());
372
368
assertThat (source ).isInstanceOf (ParquetSource .class );
@@ -377,26 +373,14 @@ private void copyFile(
377
373
Map <String , String > fieldMappings =
378
374
computeFieldMappings (fields , relationshipTarget , startNodeTarget , endNodeTarget );
379
375
380
- copyParquetSource (( ParquetSource ) source , parquetFile , fieldMappings );
376
+ createHeaderFile ( parquetFile , fieldMappings );
381
377
}
382
378
383
- // 🐤
384
- private void copyParquetSource (ParquetSource source , File targetFile , Map <String , String > fieldMappings )
385
- throws SQLException {
386
- var renamedColumns = String .join (
387
- ", " ,
388
- fieldMappings .entrySet ().stream ()
389
- .map (e -> String .format ("%s AS \" %s\" " , e .getKey (), e .getValue ()))
390
- .toList ());
391
-
392
- try (var connection = DriverManager .getConnection ("jdbc:duckdb:" );
393
- var statement = connection .prepareStatement (String .format (
394
- "COPY (SELECT %s FROM read_parquet($1)) TO '%s' (FORMAT 'parquet', CODEC 'zstd')" ,
395
- renamedColumns , targetFile .getAbsolutePath ()))) {
379
+ private void createHeaderFile (File targetFile , Map <String , String > fieldMappings ) throws IOException {
396
380
397
- statement . setString ( 1 , source . uri () );
398
- statement . execute ( );
399
- }
381
+ var sortedKeys = fieldMappings . keySet (). stream (). sorted (). toList ( );
382
+ var values = sortedKeys . stream (). map ( fieldMappings :: get ). collect ( Collectors . joining ( "," ) );
383
+ Files . writeString ( targetFile . toPath (), values + " \n " + String . join ( "," , sortedKeys ));
400
384
}
401
385
402
386
private static String [] importCommand (ImportSpecification specification , String database ) {
@@ -409,14 +393,16 @@ private static String[] importCommand(ImportSpecification specification, String
409
393
command .append (" --nodes=" );
410
394
command .append (String .join (":" , nodeTarget .getLabels ()));
411
395
command .append ("=" );
412
- command .append ("/import/%s" .formatted (fileName (nodeTarget )));
396
+ command .append ("/import/%s," .formatted (headerFileName (nodeTarget )));
397
+ command .append ("%s" .formatted (sourceUri (specification , nodeTarget )));
413
398
}
414
399
415
400
for (RelationshipTarget relationshipTarget : targets .getRelationships ()) {
416
401
command .append (" --relationships=" );
417
402
command .append (relationshipTarget .getType ());
418
403
command .append ("=" );
419
- command .append ("/import/%s" .formatted (fileName (relationshipTarget )));
404
+ command .append ("/import/%s," .formatted (headerFileName (relationshipTarget )));
405
+ command .append ("%s" .formatted (sourceUri (specification , relationshipTarget )));
420
406
}
421
407
422
408
return command .toString ().split (" " );
@@ -493,8 +479,19 @@ private static Map<String, String> indexByField(List<PropertyMapping> properties
493
479
return result ;
494
480
}
495
481
496
- private static String fileName (Target target ) {
497
- return "%s.parquet" .formatted (target .getName ());
482
+ private static String headerFileName (Target target ) {
483
+ return "%s_header.csv" .formatted (target .getName ());
484
+ }
485
+
486
+ private static String sourceUri (ImportSpecification specification , Target target ) {
487
+ var maybeSource = specification .getSources ().stream ()
488
+ .filter (src -> src .getName ().equals (target .getSource ()))
489
+ .findFirst ();
490
+ assertThat (maybeSource ).isPresent ();
491
+ var rawSource = maybeSource .get ();
492
+ assertThat (rawSource ).isInstanceOf (ParquetSource .class );
493
+ var source = (ParquetSource ) rawSource ;
494
+ return source .uri ();
498
495
}
499
496
500
497
private static String idSpaceFor (NodeTarget nodeTarget ) {
0 commit comments