1717package za .co .absa .spline .admin
1818
1919import ch .qos .logback .classic .{Level , Logger }
20- import org .apache .http .Consts
21- import org .apache .http .entity .ContentType
2220import org .slf4j .Logger .ROOT_LOGGER_NAME
2321import org .slf4j .LoggerFactory
2422import org .slf4s .Logging
@@ -30,26 +28,22 @@ import za.co.absa.spline.common.rest.RESTClientApacheHttpImpl
3028import za .co .absa .spline .common .scala13 .Option
3129import za .co .absa .spline .common .security .TLSUtils
3230import za .co .absa .spline .persistence .AuxiliaryDBAction ._
33- import za .co .absa .spline .persistence .DefaultJsonSerDe ._
3431import za .co .absa .spline .persistence .OnDBExistsAction .{Drop , Fail , Skip }
3532import za .co .absa .spline .persistence .{ArangoConnectionURL , ArangoManagerFactory , ArangoManagerFactoryImpl }
36- import za .co .absa .spline .producer .rest .ProducerAPI
3733
3834import java .io .File
3935import java .net .URL
40- import java .nio .charset .StandardCharsets
41- import java .nio .file .Files
42- import scala .concurrent .ExecutionContext .Implicits ._
36+ import java .util .concurrent .{ExecutorService , Executors }
4337import scala .concurrent .duration ._
44- import scala .concurrent .{Await , Future }
45- import scala .jdk .CollectionConverters ._
38+ import scala .concurrent .{Await , ExecutionContext }
4639
4740object AdminCLI extends App {
4841
4942 case class AdminCLIConfig (
 // Parsed command-line configuration that the option parser builds up via `copy`.
 // NOTE: the parser actions and `checkConfig` match this class positionally with four
 // fields (`AdminCLIConfig(cmd, _, _, _)`), so adding or reordering fields means
 // updating every one of those patterns.
 //
 // cmd: the selected command; stays `null` until a command is parsed, which
 // `checkConfig` rejects with "No command given".
5043 cmd : Command = null ,
 // logLevel: logback level applied via `setLevel(conf.logLevel)` before the command runs.
5144 logLevel : Level = Level .INFO ,
 // disableSslValidation: when true, `TLSUtils.TrustingAllSSLContext` is used as the SSL context.
5245 disableSslValidation : Boolean = false ,
 // parallelism: parallelism level passed to `Executors.newWorkStealingPool`; defaults to the
 // number of processors available to the JVM and is overridden by the `threads` option (must be > 0).
46+ parallelism : Int = Runtime .getRuntime.availableProcessors(),
5347 )
5448
5549 implicit class OptionParserOps (val p : OptionParser [AdminCLIConfig ]) extends AnyVal {
@@ -59,11 +53,11 @@ object AdminCLI extends App {
5953 p.arg[String ](" <db_url>" )
6054 required()
6155 text s " ArangoDB connection string in the format: ${ArangoConnectionURL .HumanReadableFormat }"
62- action { case (url, c@ AdminCLIConfig (cmd : DBCommand , _, _)) => c.copy(cmd.dbUrl = ArangoConnectionURL (url)) }
56+ action { case (url, c@ AdminCLIConfig (cmd : DBCommand , _, _, _ )) => c.copy(cmd.dbUrl = ArangoConnectionURL (url)) }
6357 )
6458 }
6559
66- private val dbManagerFactoryImpl = new ArangoManagerFactoryImpl ()
60+ private val dbManagerFactoryImpl = new ArangoManagerFactoryImpl ()( ExecutionContext .global)
6761 private val maybeConsole = InputConsole .systemConsoleIfAvailable()
6862
6963 val dbManagerFactory = maybeConsole
@@ -110,6 +104,11 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) extends Logging {
110104 text s " Disable validation of self-signed SSL certificates. (Don't use on production). "
111105 action { case (_, conf) => conf.copy(disableSslValidation = true ) })
112106
107+ (opt[Int ](" threads" )
108+ text s " Number of threads to use for parallel processing. Default is the maximum number of processors available to the JVM; never smaller than 1. "
109+ validate (p => if (p > 0 ) success else failure(" Number of threads must be a positive integer" ))
110+ action ((p, conf) => conf.copy(parallelism = p)))
111+
113112 this .placeNewLine()
114113
115114 (cmd(" db-init" )
@@ -118,10 +117,10 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) extends Logging {
118117 children(
119118 opt[Unit ]('f' , " force" )
120119 text " Re-create the database if one already exists."
121- action { case (_, c@ AdminCLIConfig (cmd : DBInit , _, _)) => c.copy(cmd.copy(force = true )) },
120+ action { case (_, c@ AdminCLIConfig (cmd : DBInit , _, _, _ )) => c.copy(cmd.copy(force = true )) },
122121 opt[Unit ]('s' , " skip" )
123122 text " Skip existing database. Don't throw error, just end."
124- action { case (_, c@ AdminCLIConfig (cmd : DBInit , _, _)) => c.copy(cmd.copy(skip = true )) })
123+ action { case (_, c@ AdminCLIConfig (cmd : DBInit , _, _, _ )) => c.copy(cmd.copy(skip = true )) })
125124 children (this .dbCommandOptions: _* )
126125 )
127126
@@ -140,22 +139,22 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) extends Logging {
140139 children(
141140 opt[Unit ](" check-access" )
142141 text " Check access to the database"
143- action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _)) => c.copy(cmd.addAction(CheckDBAccess )) },
142+ action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _, _ )) => c.copy(cmd.addAction(CheckDBAccess )) },
144143 opt[Unit ](" foxx-reinstall" )
145144 text " Reinstall Foxx services"
146- action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _)) => c.copy(cmd.addAction(FoxxReinstall )) },
145+ action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _, _ )) => c.copy(cmd.addAction(FoxxReinstall )) },
147146 opt[Unit ](" indices-delete" )
148147 text " Delete indices"
149- action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _)) => c.copy(cmd.addAction(IndicesDelete )) },
148+ action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _, _ )) => c.copy(cmd.addAction(IndicesDelete )) },
150149 opt[Unit ](" indices-create" )
151150 text " Create indices"
152- action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _)) => c.copy(cmd.addAction(IndicesCreate )) },
151+ action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _, _ )) => c.copy(cmd.addAction(IndicesCreate )) },
153152 opt[Unit ](" views-delete" )
154153 text " Delete views"
155- action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _)) => c.copy(cmd.addAction(ViewsDelete )) },
154+ action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _, _ )) => c.copy(cmd.addAction(ViewsDelete )) },
156155 opt[Unit ](" views-create" )
157156 text " Create views"
158- action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _)) => c.copy(cmd.addAction(ViewsCreate )) })
157+ action { case (_, c@ AdminCLIConfig (cmd : DBExec , _, _, _ )) => c.copy(cmd.addAction(ViewsCreate )) })
159158 children (this .dbCommandOptions: _* )
160159 )
161160
@@ -168,11 +167,14 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) extends Logging {
168167 opt[File ](" dir" )
169168 text " Path to the directory containing the lineage data files to import."
170169 required()
171- action { case (dir, c@ AdminCLIConfig (cmd : LineageImport , _, _)) => c.copy(cmd.copy(lineageDumpPath = dir)) },
170+ action { case (dir, c@ AdminCLIConfig (cmd : LineageImport , _, _, _ )) => c.copy(cmd.copy(lineageDumpPath = dir)) },
172171 opt[URL ](" producer-url" )
173172 text " Producer API base URL to which the lineage data files will be posted."
174173 required()
175- action { case (url, c@ AdminCLIConfig (cmd : LineageImport , _, _)) => c.copy(cmd.copy(producerApiUrl = url)) }
174+ action { case (url, c@ AdminCLIConfig (cmd : LineageImport , _, _, _)) => c.copy(cmd.copy(producerApiUrl = url)) },
175+ opt[Unit ](" fail-fast" )
176+ text " Fail on the first error during import. If not specified, the import will continue on errors."
177+ action { case (_, c@ AdminCLIConfig (cmd : LineageImport , _, _, _)) => c.copy(cmd.copy(failOnErrors = true )) },
176178 ))
177179
178180 this .placeNewLine()
@@ -184,19 +186,22 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) extends Logging {
184186 opt[File ](" dir" )
185187 text " Path to the directory where the lineage data files will be exported."
186188 required()
187- action { case (dir, c@ AdminCLIConfig (cmd : LineageExport , _, _)) => c.copy(cmd.copy(lineageDumpPath = dir)) },
189+ action { case (dir, c@ AdminCLIConfig (cmd : LineageExport , _, _, _ )) => c.copy(cmd.copy(lineageDumpPath = dir)) },
188190 opt[URL ](" producer-url" )
189191 text " Producer API base URL from which the lineage data files will be fetched."
190192 required()
191- action { case (url, c@ AdminCLIConfig (cmd : LineageExport , _, _)) => c.copy(cmd.copy(producerApiUrl = url)) }
193+ action { case (url, c@ AdminCLIConfig (cmd : LineageExport , _, _, _)) => c.copy(cmd.copy(producerApiUrl = url)) },
194+ opt[Unit ](" fail-fast" )
195+ text " Fail on the first error during export. If not specified, the export will continue on errors."
196+ action { case (_, c@ AdminCLIConfig (cmd : LineageExport , _, _, _)) => c.copy(cmd.copy(failOnErrors = true )) },
192197 ))
193198
194199 checkConfig {
195- case AdminCLIConfig (null , _, _) =>
200+ case AdminCLIConfig (null , _, _, _ ) =>
196201 failure(" No command given" )
197- case AdminCLIConfig (cmd : DBCommand , _, _) if cmd.dbUrl == null =>
202+ case AdminCLIConfig (cmd : DBCommand , _, _, _ ) if cmd.dbUrl == null =>
198203 failure(" DB connection string is required" )
199- case AdminCLIConfig (cmd : DBInit , _, _) if cmd.force && cmd.skip =>
204+ case AdminCLIConfig (cmd : DBInit , _, _, _ ) if cmd.force && cmd.skip =>
200205 failure(" Options '--force' and '--skip' cannot be used together" )
201206 case _ =>
202207 success
@@ -213,6 +218,9 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) extends Logging {
213218 .setLevel(conf.logLevel)
214219
215220 val sslCtxOpt = Option .when(conf.disableSslValidation)(TLSUtils .TrustingAllSSLContext )
221+ implicit val threadPool : ExecutorService = Executors .newWorkStealingPool(conf.parallelism)
222+ implicit val execContext : ExecutionContext = ExecutionContext .fromExecutorService(threadPool)
223+
216224
217225 conf.cmd match {
218226 case DBInit (url, force, skip) =>
@@ -229,93 +237,27 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) extends Logging {
229237 val dbManager = dbManagerFactory.create(url, sslCtxOpt)
230238 Await .result(dbManager.upgrade(), Duration .Inf )
231239
232- case LineageImport (producerApiBaseUrl, path) =>
233- val dir = path.toPath
234-
240+ case LineageImport (producerApiBaseUrl, path, failOnErrors) =>
235241 val restClient = new RESTClientApacheHttpImpl (
236242 uri = producerApiBaseUrl.toURI,
237243 maybeSslContext = sslCtxOpt,
238244 maybeCredentials = None
239245 )
246+ val importer = new LineageImporter (restClient, failOnErrors)
247+ val eventualResult = importer.importFrom(path)
248+ val (nPlans, nEvents) = Await .result(eventualResult, Duration .Inf )
249+ println(ansi " Imported %bold{ $nPlans} execution plans and %bold{ $nEvents} execution events " )
240250
241- def process (pattern : String , url : String , fileContentToBodyFn : String => String ): Future [Int ] = {
242- val dirStream = Files .newDirectoryStream(dir, pattern)
243- try {
244- dirStream.asScala
245- .map(_.toFile)
246- .filter(_.isFile)
247- .foldLeft(Future .successful(0 )) { (prevFut, file) =>
248- prevFut.flatMap { n =>
249- val rawJsonStr = Files .readString(file.toPath).trim
250- restClient.post(
251- path = url,
252- body = fileContentToBodyFn(rawJsonStr),
253- contentType = ContentType .create(ProducerAPI .MimeTypeV1_1 , Consts .UTF_8 )
254- ).map(_ => n + 1 )
255- }
256- }
257- } finally {
258- dirStream.close()
259- }
260- }
261-
262- val resFuture = for {
263- nPlans <- process(" plan-*.json" , " execution-plans" , identity)
264- _ <- process(" event-*.json" , " execution-events" , s => if (s startsWith " [" ) s else s " [ $s] " )
265- } yield nPlans
266-
267- val nPlans = Await .result(resFuture, Duration .Inf )
268- println(ansi " %green{Imported $nPlans execution plans with events from $path} " )
269-
270- case LineageExport (producerApiBaseUrl, path) =>
271- path.mkdirs()
272-
251+ case LineageExport (producerApiBaseUrl, path, failOnErrors) =>
273252 val restClient = new RESTClientApacheHttpImpl (
274253 uri = producerApiBaseUrl.toURI,
275254 maybeSslContext = sslCtxOpt,
276255 maybeCredentials = None
277256 )
278-
279- val resFuture = restClient
280- .get(" execution-plans" )
281- .map(_.fromJson[Array [String ]])
282- .flatMap((ids : Array [String ]) => {
283- if (ids.isEmpty) {
284- println(ansi " %yellow{No lineage data found in the database} " )
285- Future .successful((0 , 0 ))
286- } else {
287- println(s " Found ${ids.length} execution plans in the database. Exporting to $path/ ... " )
288- ids.foldLeft(Future .successful((0 , 0 ))) { (prevFut, planId) =>
289- prevFut.flatMap { case (nPlans, nEvents) =>
290- log.debug(s " Exporting execution plan with id: $planId" )
291- val eventualPlanJson = restClient.get(s " execution-plans/ $planId" )
292- val eventualEventJsons = restClient.get(s " execution-plans/ $planId/events " )
293- for {
294- planJson <- eventualPlanJson
295- events <- eventualEventJsons.map(_.fromJson[Seq [Map [String , Any ]]])
296- } yield {
297- Files .writeString(
298- path.toPath.resolve(s " plan- $planId.json " ),
299- planJson,
300- StandardCharsets .UTF_8
301- )
302- events.foreach(event => {
303- val eventJson = event.toJson
304- Files .writeString(
305- path.toPath.resolve(s " event- $planId- ${event(" timestamp" )}.json " ),
306- eventJson,
307- StandardCharsets .UTF_8
308- )
309- })
310- (nPlans + 1 , nEvents + events.length)
311- }
312- }
313- }
314- }
315- })
316-
317- val (nPlans, nEvents) = Await .result(resFuture, Duration .Inf )
318- println(ansi " %green{Exported $nPlans execution plans and $nEvents execution events} " )
257+ val exporter = new LineageExporter (restClient, failOnErrors)
258+ val eventualResult = exporter.exportTo(path)
259+ val (nPlans, nEvents) = Await .result(eventualResult, Duration .Inf )
260+ println(ansi " Exported %bold{ $nPlans} execution plans and %bold{ $nEvents} execution events " )
319261
320262 case DBExec (url, actions) =>
321263 val dbManager = dbManagerFactory.create(url, sslCtxOpt)
0 commit comments