[NU-2154] Standalone Flink live data #8231

Open

wants to merge 23 commits into base: staging

Conversation

mgoworko
Contributor

Describe your changes

This change introduces a basic mechanism for synchronizing live data from standalone Flink jobs using the Designer DB. This mechanism may be suitable for demonstration purposes or low- to medium-traffic environments. It may serve as a base for future implementations of better synchronization methods, for example using Redis.

Checklist before merge

  • Related issue ID is placed at the beginning of PR title in [brackets] (can be GH issue or Nu Jira issue)
  • Code is cleaned from temporary changes and commented out lines
  • Parts of the code that are not easy to understand are documented in the code
  • Changes are covered by automated tests
  • Showcase in dev-application.conf added to demonstrate the feature
  • Documentation added or updated
  • Added entry in Changelog.md describing the change from the perspective of a public distribution user
  • Added MigrationGuide.md entry in the appropriate subcategory if introducing a breaking change
  • Verify that PR will be squashed during merge

@github-actions github-actions bot added client client main fe docs ui labels Jun 12, 2025
@@ -40,7 +43,26 @@ class FlinkScenarioJob(modelData: ModelData) {
env: StreamExecutionEnvironment,
processListeners: List[ProcessListener],
): JobExecutionResult = {
val compilerFactory = new FlinkProcessCompilerDataFactory(modelData, deploymentData, processListeners)
val liveDataCollectingListener =
Contributor Author

It should be a safe change, because nothing changes in Flink jobs when db uploading of results is disabled.

@@ -25,6 +37,477 @@ class DetectLargeTransactionSpec extends AnyFreeSpecLike with BaseE2ESpec with M
eventually {
val processedTransactions = client.readAllMessagesFromKafka("ProcessedTransactions")
processedTransactions should equal(largeAmountTransactions)
given()
Contributor Author

Added an assertion on live data in the E2E test using standalone Flink.

mgoworko added 4 commits June 12, 2025 15:47
# Conflicts:
#	designer/deployment-manager-api/src/main/scala/pl/touk/nussknacker/engine/api/deployment/DeploymentManager.scala
#	designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioTesting/ResultsWithCountsDto.scala
#	designer/server/src/main/scala/pl/touk/nussknacker/ui/api/description/scenarioTesting/ResultsWithCountsDtoCodecs.scala
#	designer/server/src/test/scala/pl/touk/nussknacker/ui/api/livedata/ScenarioLiveDataApiHttpServiceSpec.scala
#	engine/flink/management/src/main/scala/pl/touk/nussknacker/engine/management/jobrunner/FlinkMiniClusterScenarioJobRunner.scala
@github-actions github-actions bot removed ui client client main fe labels Jun 12, 2025
@mgoworko mgoworko requested a review from arkadius June 13, 2025 08:03
@@ -2039,9 +2040,8 @@ lazy val liveDataCollector = (project in file("designer/live-data-collector"))
),
)
.dependsOn(
deploymentManagerApi % Provided,
Member

Let's keep this dependency explicit even if it is transitively available via liveDataCollector. It should be easy to remove this feature if we decide to replace this collector-based approach with something else. BTW, I missed one thing in the previous PR. The collector module shouldn't be nested inside designer. We have a strategy for module organization:

  • nested inside the designer module - things that are not operational, only needed during designing the scenario
  • nested inside the engine module - things that are used during runtime
  • on the root level - things that are shared between the designer and operational part (scenario runtime).

Following this approach, the collector should be at the root level, the same as scenario-compiler.
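For illustration, a minimal build.sbt sketch of the suggested layout; `commonSettings` and the exact project settings are assumptions, only the root-level path and the explicit `deploymentManagerApi` dependency come from the discussion above:

```scala
// Sketch only: the collector module defined at the root level (like scenario-compiler),
// instead of being nested under designer/.
lazy val liveDataCollector = (project in file("live-data-collector"))
  .settings(commonSettings) // assumed shared settings of this build
  .dependsOn(
    deploymentManagerApi % Provided // kept explicit, as requested above
  )
```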

) extends LiveDataPreviewMode

final case class DbUploader(
Member

upload has a very strong connotation with files. We have a very similar mechanism that has a ready set of words for us - metrics. In the metrics domain, metrics are registered in the registry and then reported to some service responsible for exposing them. Based on that, we can name things in the same manner. Also, when you pick the noun that will be exposed in the configuration, always look at it from the perspective of how you would describe it in the documentation (even if you don't produce it yet) or during a casual talk with a user. You would rather use sentences such as "To configure the reporting mechanism/storage for live data, you should add ... configuration entry". The noun that is exposed is dedicated to "normal" users (administrators), not developers. It describes the mechanism instead of some synthetic role/class in the code. LSS - let's think about the domain model that we want to expose and pick names based on that, instead of thinking of the implementation and then exposing implementation details in the configuration.

Take a look at how the best do this: https://github.com/apache/flink/blob/master/docs/content/docs/deployment/config.md You can find nouns such as "metrics", "execution", "logging", "checkpointing".
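For illustration only, a hedged sketch of how the configuration model could be named following the metrics analogy; the names LiveDataReporter, DesignerDb and Redis are hypothetical and not part of this PR:

```scala
// Hypothetical naming sketch: describe the mechanism in the domain's own words
// (live data is "reported" to some storage), instead of exposing an implementation
// detail such as "DbUploader".
sealed trait LiveDataReporter

object LiveDataReporter {
  // report live data to the Designer database (the mechanism introduced in this PR)
  final case class DesignerDb(reportIntervalInSeconds: Int) extends LiveDataReporter
  // possible future backend, mentioned in the PR description
  final case class Redis(url: String) extends LiveDataReporter
}
```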

private class OneShotSource extends SourceFunction[String] {

override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
// emit once and sleep forever
Member

  1. Why sleep forever? BTW, it won't sleep forever, only until the Java process is interrupted. Is that OK?
  2. Let's use the new Source API instead of the legacy one.
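A minimal sketch of what the suggested change could look like with the new unified Source API, using Flink's built-in NumberSequenceSource as a bounded one-element source in place of a custom legacy SourceFunction; the payload and source name are illustrative:

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.connector.source.lib.NumberSequenceSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// Emit exactly one element using the new, unified Source API (FLIP-27)
// instead of the deprecated SourceFunction-based OneShotSource.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val oneShot = new NumberSequenceSource(0L, 0L) // bounded source producing a single Long

env
  .fromSource(oneShot, WatermarkStrategy.noWatermarks[java.lang.Long](), "one-shot-source")
  .map(new MapFunction[java.lang.Long, String] {
    override def map(value: java.lang.Long): String = "run-once" // the one-shot trigger payload
  })
```

Being bounded, such a source also finishes cleanly instead of sleeping until the process is interrupted.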

processIdWithName: ProcessIdWithName,
dbUploader: DbUploader,
): Unit = {
env
Member

Can you write a few words of comment on why you implemented it as a Flink data stream processing pipeline? E.g. why is it not a Thread?

@transient private var connection: Connection = _

override def open(openContext: OpenContext): Unit = {
Class.forName("org.postgresql.Driver")
Member

It looks like a mistake (dead code).

@@ -28,6 +28,22 @@ scenarioTypes {
componentId: sink-kafka
}
}
allowEndingScenarioWithoutSink: true
Member

This is our official, minimal configuration. We don't plan to enable it for everyone.

@@ -28,6 +28,22 @@ scenarioTypes {
componentId: sink-kafka
}
}
allowEndingScenarioWithoutSink: true
liveDataPreview {
Member

The same here.

allowEndingScenarioWithoutSink: true
liveDataPreview {
enabled: true
maxNumberOfSamples: 20
Member

We can hardcode these values as default values for LiveDataPreviewMode.Enabled. Thanks to that, only enabling will be necessary during configuration.
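For illustration, a minimal sketch of the suggestion, assuming LiveDataPreviewMode is a sealed trait as the diff suggests; maxNumberOfSamples and its default of 20 come from the showcased config, the uploadIntervalInSeconds default is a placeholder:

```scala
// Sketch: hardcode the showcased values as defaults, so enabling the feature
// in the configuration does not require repeating them.
sealed trait LiveDataPreviewMode

object LiveDataPreviewMode {
  case object Disabled extends LiveDataPreviewMode

  final case class Enabled(
      maxNumberOfSamples: Int = 20,     // default taken from the dev-application.conf showcase
      uploadIntervalInSeconds: Int = 10 // placeholder default; the actual value is not shown in this diff
  ) extends LiveDataPreviewMode
}
```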


class LiveDataEntity(tag: Tag) extends TableWithSchema[LiveDataEntityData](tag, "live_data") {

def scenarioId: Rep[ProcessId] = column[ProcessId]("scenario_id")
Member

For batch processing, we should have (collectorId, activityId) as a primary key. The activityId for a deployment is Flink's jobId. Let's leave a comment that currently this mechanism is limited to streaming scenarios.
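A hedged Slick sketch of what the suggested key could look like; apart from scenario_id, all column names, types and the row class are assumptions made for the example:

```scala
import slick.jdbc.PostgresProfile.api._

// Sketch only (not the PR's actual schema): a live_data table keyed by
// (collector_id, activity_id). For deployments, activityId corresponds to Flink's jobId;
// currently the mechanism is limited to streaming scenarios.
final case class LiveDataRow(collectorId: String, activityId: String, scenarioId: Long, data: String)

class LiveDataTable(tag: Tag) extends Table[LiveDataRow](tag, "live_data") {
  def collectorId: Rep[String] = column[String]("collector_id")
  def activityId: Rep[String]  = column[String]("activity_id") // Flink jobId
  def scenarioId: Rep[Long]    = column[Long]("scenario_id")
  def data: Rep[String]        = column[String]("data")

  def pk = primaryKey("pk_live_data", (collectorId, activityId))

  def * = (collectorId, activityId, scenarioId, data) <> (LiveDataRow.tupled, LiveDataRow.unapply)
}
```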

}
}

private def removeOldEntries(
Member

  1. What will happen if someone redeploys the scenario during uploadIntervalInSeconds? I guess you will aggregate results from the last job with the new one? Is that OK?
  2. For the mini cluster, we keep results after the scenario is stopped. For standalone Flink, we don't?

Member

WDYT about an approach where we collect live data for (jobId, collectorId) and, during reading, take the last deployment and return the result for jobId = deploymentId? Thanks to that we avoid weak assumptions, magic numbers, etc.
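For illustration, a hedged sketch of the read path described above, reusing the hypothetical table from the earlier sketch; lastDeploymentId is assumed to come from the deployment repository:

```scala
import slick.jdbc.PostgresProfile.api._

// Sketch of the suggested read path: resolve the scenario's last deployment id and
// return only the rows collected for that jobId (jobId == deploymentId),
// so no time-based cleanup heuristics or magic numbers are needed.
def liveDataForLastDeployment(
    lastDeploymentId: String,                // hypothetical: looked up from the deployment repository
    liveDataTable: TableQuery[LiveDataTable] // hypothetical table from the sketch above
): DBIO[Seq[LiveDataRow]] =
  liveDataTable
    .filter(_.activityId === lastDeploymentId) // jobId == deploymentId
    .result
```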

@@ -79,11 +60,8 @@ class FlinkMiniClusterScenarioJobRunner(

override def liveDataPreviewSupport: LiveDataPreviewSupport = {
Member

Why is liveDataPreviewSupport a part of ScenarioJobRunner if it is not related to it? I don't see any occurrences of this method being used from the job runner.

val processIdWithName = ProcessIdWithName(processVersion.processId, processVersion.processName)
dbUploaderOpt.foreach(PeriodicLiveDataUploader.register(env, processIdWithName, _))
Some(
LiveDataCollectingListenerHolder.createListenerFor(
Member

When do you close the allocated resources for jobs that have been run on a remote Flink cluster?

.filter(_.scenarioId === processIdWithName.id)
.filter(
_.updatedAt < Instant.now.getEpochSecond - uploadIntervalInSeconds - 5
) // Drop data older than interval + 5 seconds
Member
@arkadius arkadius Jun 17, 2025

Why drop? Why older than interval + 5 seconds? Why not 0 seconds or 1 minute? (magic number)

maxNumberOfSamples: Int,
): Map[NodeTransition, LiveDataForNodeTransition] = {
data.flatten
.groupBy(_._1)
Member

You can use toGroupedMap. Thanks to that, we avoid the meaningless _1, _2.
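A small sketch of the idea, assuming toGroupedMap is a project utility over a list of pairs; the standard-library groupMap below expresses the same grouping without positional accessors:

```scala
// Illustrative data: (nodeTransition, sample) pairs, simplified to strings here.
val samples: List[(String, String)] = List("a->b" -> "s1", "a->b" -> "s2", "b->c" -> "s3")

// Grouping with exposed tuple positions:
val positional: Map[String, List[String]] =
  samples.groupBy(_._1).view.mapValues(_.map(_._2)).toMap

// The same grouping without _1/_2 (Scala 2.13 groupMap, equivalent to what a
// toGroupedMap helper provides):
val byTransition: Map[String, List[String]] =
  samples.groupMap { case (transition, _) => transition } { case (_, sample) => sample }
```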
