-
Notifications
You must be signed in to change notification settings - Fork 17
Fix parallelization config for writes #188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughThis change introduces a configurable Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant OpenFgaClient
participant ThreadPool
participant FGAService
User->>OpenFgaClient: writeNonTransaction(tuples, options)
OpenFgaClient->>OpenFgaClient: Split tuples into transaction chunks
alt maxParallelRequests > 1
OpenFgaClient->>ThreadPool: Submit write tasks (up to maxParallelRequests)
loop For each chunk
ThreadPool->>FGAService: WriteTransaction(chunk)
FGAService-->>ThreadPool: Response/Failure
end
ThreadPool-->>OpenFgaClient: Aggregate results
else maxParallelRequests <= 1
loop For each chunk
OpenFgaClient->>FGAService: WriteTransaction(chunk)
FGAService-->>OpenFgaClient: Response/Failure
end
end
OpenFgaClient-->>User: Final result/failure
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~15 minutes Possibly related issues
Suggested reviewers
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ast-grep (0.38.6)src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.javaNote ⚡️ Unit Test Generation is now available in beta!Learn more here, or try it out under "Finishing Touches" below. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
Codecov Report❌ Patch coverage is
❌ Your project status has failed because the head coverage (33.67%) is below the target coverage (80.00%). You can increase the head coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #188 +/- ##
============================================
- Coverage 33.73% 33.67% -0.06%
- Complexity 1005 1007 +2
============================================
Files 182 182
Lines 6900 6927 +27
Branches 778 781 +3
============================================
+ Hits 2328 2333 +5
- Misses 4467 4487 +20
- Partials 105 107 +2 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (5)
src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java (3)
439-443
: Consider validating maxParallelRequests value.While the code handles
maxParallelRequests <= 1
by falling back to sequential execution, negative values might indicate a configuration error. Consider adding validation to ensure the value is positive.int maxParallelRequests = options.getMaxParallelRequests() != null ? options.getMaxParallelRequests() : DEFAULT_MAX_METHOD_PARALLEL_REQS; + +if (maxParallelRequests < 1) { + throw new FgaInvalidParameterException("maxParallelRequests must be greater than 0"); +}
453-453
: Use newFixedThreadPool instead of newScheduledThreadPool.Since the code doesn't use any scheduling features,
newFixedThreadPool
would be more appropriate and efficient thannewScheduledThreadPool
.-var executor = Executors.newScheduledThreadPool(maxParallelRequests); +var executor = Executors.newFixedThreadPool(maxParallelRequests);
473-473
: Consider adding a timeout to prevent indefinite blocking.The
latch.await()
call could block indefinitely if a task fails to decrement the latch. Consider adding a reasonable timeout.-latch.await(); +if (!latch.await(60, TimeUnit.SECONDS)) { + throw new FgaApiException("Timeout waiting for parallel write operations to complete"); +}src/main/java/dev/openfga/sdk/api/configuration/ClientWriteOptions.java (2)
22-22
: Consider consistency with existing field patterns.The field declaration uses
Integer
(nullable) which is appropriate for optional configuration, but differs from the pattern used bytransactionChunkSize
(primitive int with default handling in getter). Consider whether this field should have similar default value handling.
66-68
: Consider providing default value handling consistent with other getters.The getter returns the raw nullable Integer, which means calling code must handle null values. This differs from
getTransactionChunkSize()
which provides a default value. Consider whether a default should be provided here for consistency.If a default value is appropriate, apply this pattern:
public Integer getMaxParallelRequests() { - return maxParallelRequests; + return maxParallelRequests != null && maxParallelRequests > 0 ? maxParallelRequests : null; }Or consider returning a primitive int with a default:
-public Integer getMaxParallelRequests() { - return maxParallelRequests; +public int getMaxParallelRequests() { + return maxParallelRequests != null && maxParallelRequests > 0 ? maxParallelRequests : 10; // or appropriate default }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
CHANGELOG.md
(1 hunks)README.md
(2 hunks)src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java
(2 hunks)src/main/java/dev/openfga/sdk/api/configuration/ClientWriteOptions.java
(2 hunks)src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java
(3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze (java)
🔇 Additional comments (9)
src/main/java/dev/openfga/sdk/api/configuration/ClientWriteOptions.java (3)
22-22
: LGTM! Field declaration follows consistent patterns.The
maxParallelRequests
field is appropriately declared asInteger
wrapper type, allowing for null values to represent unset configuration, which aligns with the optional nature of this parameter.
61-64
: LGTM! Setter method follows established builder pattern.The setter method correctly implements the fluent interface pattern consistent with other methods in the class. Accepting
Integer
parameter allows for null values representing unset configuration.
66-68
: LGTM! Simple getter follows standard accessor pattern.The getter correctly returns the raw
Integer
value, allowing consumers to handle null checks as needed. This is appropriate for optional configuration parameters.CHANGELOG.md (1)
4-4
: LGTM! Changelog entry is well-formatted and informative.The changelog entry correctly documents the new feature in the "Unreleased" section with appropriate reference to issue #187. The description clearly communicates the nature of the change.
src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java (3)
1219-1222
: LGTM! Appropriate test configuration for sequential execution.The addition of
maxParallelRequests(1)
to theClientWriteOptions
correctly tests the new parallelization configuration feature. Setting it to 1 ensures sequential execution, which is appropriate for this test that needs deterministic verification of request ordering and chunking behavior.
1289-1292
: LGTM! Sequential execution ensures predictable failure testing.The addition of
maxParallelRequests(1)
is essential for this failure scenario test. Sequential execution ensures that the test can reliably verify the expected behavior: first request succeeds, second fails with a 400 error, and third is skipped. Parallel execution would make this test non-deterministic.
2012-2013
: LGTM! Sequential execution ensures deterministic batch processing.The addition of
maxParallelRequests(1)
toClientBatchCheckOptions
ensures that batches are processed sequentially, which is necessary for this test to reliably verify the WireMock stubs based on specific correlation IDs. This allows predictable testing of the batch splitting functionality.Note: This change extends the parallelization configuration to batch check operations, which appears to be broader than the "writes" focus mentioned in the PR description, but this is likely intentional for consistency across operations.
README.md (1)
545-545
: Documentation accurately reflects the new parallelization feature.The changes clearly explain the new behavior where the SDK splits writes into smaller transactions and sends them with limited parallelization, controlled by the
maxParallelRequests
option.Also applies to: 573-574
src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java (1)
417-419
: Good default configuration for non-transactional writes.Setting
transactionChunkSize
to 1 andmaxParallelRequests
to the default value ensures backward compatibility while enabling the new parallelization feature.
var executor = Executors.newScheduledThreadPool(maxParallelRequests); | ||
var latch = new CountDownLatch(transactions.size()); | ||
var failure = new AtomicReference<Throwable>(); | ||
var lastResponse = new AtomicReference<ClientWriteResponse>(); | ||
|
||
Consumer<ClientWriteRequest> singleWriteRequest = | ||
tx -> this.writeTransactions(storeId, tx, options).whenComplete((response, throwable) -> { | ||
try { | ||
if (throwable != null) { | ||
failure.compareAndSet(null, throwable); | ||
} else { | ||
lastResponse.set(response); | ||
} | ||
} finally { | ||
latch.countDown(); | ||
} | ||
}); | ||
|
||
try { | ||
transactions.forEach(tx -> executor.execute(() -> singleWriteRequest.accept(tx))); | ||
latch.await(); | ||
if (failure.get() != null) { | ||
return CompletableFuture.failedFuture(failure.get()); | ||
} | ||
return CompletableFuture.completedFuture(lastResponse.get()); | ||
} catch (Exception e) { | ||
return CompletableFuture.failedFuture(e); | ||
} finally { | ||
executor.shutdown(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Critical: Only the last response is returned in parallel execution mode.
When executing multiple transactions in parallel, the current implementation only returns the last successful response (line 477), discarding results from all other transactions. This means:
- The caller cannot determine which specific writes/deletes succeeded
- The response doesn't accurately represent the complete operation
- Information about partial successes is lost
Consider aggregating all responses or returning a composite response that includes results from all transactions.
🤖 Prompt for AI Agents
In src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java between lines 453
and 482, the code currently returns only the last successful response from
parallel transaction executions, losing information about other transaction
results. To fix this, modify the implementation to collect and aggregate all
individual ClientWriteResponse objects from each transaction into a composite
response or a list, and return this aggregated result instead of just the last
one. This ensures the caller receives complete information about all transaction
outcomes.
public ClientWriteOptions maxParallelRequests(Integer maxParallelRequests) { | ||
this.maxParallelRequests = maxParallelRequests; | ||
return this; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add input validation for the setter method.
The setter accepts any Integer value including negative numbers, which could lead to invalid configurations. Consider adding validation to ensure only positive values are accepted.
public ClientWriteOptions maxParallelRequests(Integer maxParallelRequests) {
+ if (maxParallelRequests != null && maxParallelRequests < 1) {
+ throw new IllegalArgumentException("maxParallelRequests must be greater than 0");
+ }
this.maxParallelRequests = maxParallelRequests;
return this;
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
public ClientWriteOptions maxParallelRequests(Integer maxParallelRequests) { | |
this.maxParallelRequests = maxParallelRequests; | |
return this; | |
} | |
public ClientWriteOptions maxParallelRequests(Integer maxParallelRequests) { | |
if (maxParallelRequests != null && maxParallelRequests < 1) { | |
throw new IllegalArgumentException("maxParallelRequests must be greater than 0"); | |
} | |
this.maxParallelRequests = maxParallelRequests; | |
return this; | |
} |
🤖 Prompt for AI Agents
In src/main/java/dev/openfga/sdk/api/configuration/ClientWriteOptions.java
around lines 61 to 64, the setter method maxParallelRequests currently accepts
any Integer, including negative values. Add input validation to check if the
provided maxParallelRequests is positive (greater than zero) before setting the
field. If the value is invalid, throw an IllegalArgumentException with a clear
message indicating that only positive integers are allowed.
Few questions in addition to coderabbit's:
Also wondering about the new thread pool, looks like we are creating a new one for each write operation, perhaps there's a more effecient way? I think we could also add more comprehensive tests for parallel execution paths? |
Summary
maxParallelRequests
inClientWriteOptions
maxParallelRequests
is 1AI Fix for #187
Testing
./gradlew build test-integration
(fails: java.net.http.HttpTimeoutException)https://chatgpt.com/codex/tasks/task_e_688a7a20d2c4832292d36c24590f5f74
Summary by CodeRabbit
Documentation
New Features
Tests