Scala wrapper around Flink's REST API endpoints. This API is listed in the Flink documentation as the "Monitoring REST API", but it includes support for uploading JARs and managing jobs, so it's more fully-featured than the name would imply.
This project is meant to be an easy way for Scala developers to interact with Flink's REST API. It contains wrapper methods for API calls and returns responses in case classes for easy access.
This wrapper does not reference Flink as a library. This makes it for ideal for applications which need to invoke Flink jobs or report on Flink job statuses, but don't want to be tied to Flink at compile time.
This library was developed and tested against Flink 1.3.0. Compatibility with older or newer versions is not guaranteed.
Add an SBT reference:
libraryDependencies += "com.github.mjreid" %% "flink-wrapper" % "0.0.2"Create an instance of the FlinkRestClient:
val flinkClient = FlinkRestClient("http://flink-server.example.com:8081/")Use the FlinkRestClient to make a request, being sure to have an implicit ExecutionContext in scope:
import scala.concurrent.ExecutionContext.Implicits.global
val result = flinkClient.getJobsOverview()Do something with the result.
result.map { jobsOverview => println(jobsOverview) }This wrapper uses "basic" Scala idioms (namely, Future and Option) to communicate failures. Any caught exceptions in
a failed Future will be wrapped in a FlinkWrapperException or one of its subclasses.
The possible exceptions are FlinkWrapperInvalidJsonException when an unexpected JSON value is returned, and
FlinkWrapperUnexpectedStatusCodeException when an unexpected HTTP status code is returned.
This wrapper uses play-ws under the hood to do HTTP requests in a non-blocking fashion. As a result, all calls return
Future objects, which require an execution context in scope.
If you're making a small app, you can import the global execution context. However, as a best practice for larger apps, we recommend using a dedicated executor service. This will isolate your thread pools so that your application doesn't stall if the Flink REST endpoints take a long time to respond, for example.
The follow APIs are available in this wrapper. See the Scaladoc for more details.
runProgramstarts a Flink program with the specified parameters. Important note: ThejarIdargument is not what appears in the Flink UI -- a hidden GUID is appended. It's recommended to use theuploadJarmethod (see below), which will return the full filename that can be used as part ofrunProgram.cancelJobcancels a job immediately.cancelJobWithSavepointcancels a job with a savepoint. This method returns aCancelJobAcceptedobject; use thegetCancellationStatusmethod to query the status. Cancellations can fail if, for example, the savepoint files cannot be written.uploadJaruploads a JAR that can be executed viarunProgram.
getJobsListreturns all jobs in Flink, separated by state.getJobOverviewreturns all running and finished jobs in Flink with additional execution details.getJobDetailsgets the details of a specific Flink job.getJobPlangets the plan of a specific Flink job. (The plan data is also included ingetJobDetails.)getJobExceptionsgets any exceptions that occurred in a job.
getConfiggets the system-level Flink configuration.getClusterOverviewgets the Flink cluster information.getJobManagerConfiggets all the JobManager configurations.getClusterTaskManagersgets the information about all the TaskManagers inside the Flink cluster.
- Michael Reid (mjreid @github)
- Your name here? Pull requests welcome!
