Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7c88a39
signals(dedup): deduplicate signals and ensure all signals received
spkane31 Sep 17, 2025
ee1f660
code generation from git diff
spkane31 Sep 17, 2025
8496648
fixing go executor test
spkane31 Sep 17, 2025
5990328
using only a single map
spkane31 Sep 17, 2025
c2d83e5
implementations for ts/dotnet/java/python
spkane31 Sep 18, 2025
805ea1e
linting errors
spkane31 Sep 18, 2025
364e06c
fixes to typescript hopefully and logging
spkane31 Sep 18, 2025
ccb7e94
fixes for python and typescript impl
spkane31 Sep 18, 2025
3a2e45d
java and dotnet impl
spkane31 Sep 18, 2025
45ad4cf
merge conflict
spkane31 Sep 22, 2025
276d632
simplifying and addressing roeys comments
spkane31 Sep 23, 2025
737cda2
applying protobuf creation
spkane31 Sep 23, 2025
fa7b018
java linter
spkane31 Sep 23, 2025
7f904a8
fixing build error
spkane31 Sep 23, 2025
70ae344
java fixes
spkane31 Sep 25, 2025
93c4952
removing non-go changes
spkane31 Sep 25, 2025
208250c
protobufs
spkane31 Sep 25, 2025
5f8094f
simplifying the validation logic
spkane31 Sep 25, 2025
05373a6
not implemented for the non-go langs
spkane31 Sep 25, 2025
172dfd4
fixing other langs
spkane31 Sep 25, 2025
6aafbd0
Merge branch 'main' of github.com:temporalio/omes into spk/count-dedu…
spkane31 Sep 25, 2025
c6c2d4e
safeguard the signals_complete key in the workflow state
spkane31 Sep 26, 2025
cca8312
Merge branch 'main' of github.com:temporalio/omes into spk/count-dedu…
spkane31 Sep 26, 2025
c68590c
renaming
spkane31 Sep 26, 2025
f9d5847
Update workers/go/kitchensink/kitchen_sink.go
spkane31 Sep 29, 2025
cf1d323
addressing roeys comments and simplifying signal sending
spkane31 Sep 29, 2025
4832607
remove extraneous key
spkane31 Sep 29, 2025
7b71342
reverting back to last good state and starting over, throughput and g…
spkane31 Oct 2, 2025
fe5d64b
more protobuf changes
spkane31 Oct 2, 2025
9e9362d
signal dedupe not supported for non-go
spkane31 Oct 2, 2025
94b327c
linting in java
spkane31 Oct 3, 2025
71304e1
self review
spkane31 Oct 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions loadgen/kitchen-sink-gen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::protos::temporal::{
api::common::v1::{Memo, Payload, Payloads},
omes::kitchen_sink::{
action, awaitable_choice, client_action, do_actions_update, do_query, do_signal,
do_signal::do_signal_actions, do_update, execute_activity_action, with_start_client_action, Action, ActionSet,
do_signal::do_signal_actions, do_signal::DoSignalActions, do_update, execute_activity_action, with_start_client_action, Action, ActionSet,
AwaitWorkflowState, AwaitableChoice, ClientAction, ClientActionSet, ClientSequence,
DoQuery, DoSignal, DoUpdate, ExecuteActivityAction, ExecuteChildWorkflowAction,
HandlerInvocation, RemoteActivityOptions, ReturnResultAction, SetPatchMarkerAction,
Expand Down Expand Up @@ -335,7 +335,12 @@ impl<'a> Arbitrary<'a> for WorkflowInput {
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
let num_actions = 1..=ARB_CONTEXT.with_borrow(|c| c.config.max_initial_actions);
let initial_actions = vec_of_size(u, num_actions)?;
Ok(Self { initial_actions })
Ok(Self {
initial_actions,
expected_signal_count: 0,
expected_signal_ids: vec![],
received_signal_ids: vec![]
})
}
}

Expand All @@ -362,6 +367,9 @@ impl<'a> Arbitrary<'a> for ClientActionSet {
initial_actions: vec![mk_action_set([action::Variant::SetWorkflowState(
ARB_CONTEXT.with_borrow(|c| c.cur_workflow_state.clone()),
)])],
expected_signal_count: 0,
expected_signal_ids: vec![],
received_signal_ids: vec![]
},
"temporal.omes.kitchen_sink.WorkflowInput",
)],
Expand Down Expand Up @@ -436,11 +444,17 @@ impl<'a> Arbitrary<'a> for DoSignal {
// Half of that in the handler half in main
if u.ratio(50, 100)? {
do_signal::Variant::DoSignalActions(
Some(do_signal_actions::Variant::DoActions(u.arbitrary()?)).into(),
DoSignalActions {
signal_id: u.arbitrary()?,
variant: Some(do_signal_actions::Variant::DoActions(u.arbitrary()?)),
}
)
} else {
do_signal::Variant::DoSignalActions(
Some(do_signal_actions::Variant::DoActionsInMain(u.arbitrary()?)).into(),
DoSignalActions {
signal_id: u.arbitrary()?,
variant: Some(do_signal_actions::Variant::DoActionsInMain(u.arbitrary()?)),
}
)
}
} else {
Expand Down Expand Up @@ -631,6 +645,9 @@ impl<'a> Arbitrary<'a> for ExecuteChildWorkflowAction {
],
concurrent: false,
}],
expected_signal_count: 0,
expected_signal_ids: vec![],
received_signal_ids: vec![]
};
let input = to_proto_payload(input, "temporal.omes.kitchen_sink.WorkflowInput");
Ok(Self {
Expand Down Expand Up @@ -850,10 +867,12 @@ fn mk_client_signal_action(actions: impl IntoIterator<Item = action::Variant>) -
ClientAction {
variant: Some(client_action::Variant::DoSignal(DoSignal {
variant: Some(do_signal::Variant::DoSignalActions(
Some(do_signal_actions::Variant::DoActionsInMain(mk_action_set(
actions,
)))
.into(),
DoSignalActions {
signal_id: 0, // Default signal_id for client actions
variant: Some(do_signal_actions::Variant::DoActionsInMain(mk_action_set(
actions,
))),
}
)),
with_start: false,
})),
Expand Down
62 changes: 62 additions & 0 deletions loadgen/kitchen_sink_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,68 @@ func TestKitchenSink(t *testing.T) {
},
historyMatcher: PartialHistoryMatcher(`WorkflowExecutionSignaled`),
},
{
name: "ClientSequence/Signal/Deduplication",
testInput: &TestInput{
WorkflowInput: &WorkflowInput{
ExpectedSignalCount: 10,
InitialActions: ListActionSet(
NewTimerAction(2000),
),
},
ClientSequence: &ClientSequence{
ActionSets: []*ClientActionSet{
{
Actions: NewSignalActionsWithIDs(10),
},
},
},
},
historyMatcher: PartialHistoryMatcher(`
WorkflowExecutionSignaled
WorkflowExecutionSignaled
WorkflowExecutionSignaled
WorkflowExecutionSignaled
WorkflowExecutionSignaled
WorkflowExecutionSignaled
WorkflowExecutionSignaled
WorkflowExecutionSignaled
WorkflowExecutionSignaled
WorkflowExecutionSignaled`),
expectedUnsupportedErrs: map[clioptions.Language]string{
clioptions.LangJava: "signal deduplication not implemented",
clioptions.LangPython: "signal deduplication not implemented",
clioptions.LangTypeScript: "signal deduplication not implemented",
clioptions.LangDotNet: "signal deduplication not implemented",
},
},
{
name: "ClientSequence/Signal/Deduplication/MissingSignal",
testInput: &TestInput{
WorkflowInput: &WorkflowInput{
ExpectedSignalCount: 3,
InitialActions: ListActionSet(
NewTimerAction(1000),
),
},
ClientSequence: &ClientSequence{
ActionSets: []*ClientActionSet{
{
Actions: NewSignalActionsWithIDs(2), // Only send 2 signals, expect 3 and WF should fail
},
},
},
},
historyMatcher: PartialHistoryMatcher(`
WorkflowExecutionSignaled
WorkflowExecutionSignaled`),
expectedUnsupportedErrs: map[clioptions.Language]string{
clioptions.LangJava: "signal deduplication not implemented",
clioptions.LangPython: "signal deduplication not implemented",
clioptions.LangTypeScript: "signal deduplication not implemented",
clioptions.LangDotNet: "signal deduplication not implemented",
},
},
{
name: "ClientSequence/Signal/Custom",
testInput: &TestInput{
Expand Down
23 changes: 23 additions & 0 deletions loadgen/kitchensink/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,29 @@ func NewAwaitWorkflowStateAction(key, value string) *Action {
}
}

func NewSignalActionsWithIDs(ids int32) []*ClientAction {
actions := make([]*ClientAction, ids)
for i := range ids {
actions[i] = &ClientAction{
Variant: &ClientAction_DoSignal{
DoSignal: &DoSignal{
Variant: &DoSignal_DoSignalActions_{
DoSignalActions: &DoSignal_DoSignalActions{
SignalId: i + 1,
Variant: &DoSignal_DoSignalActions_DoActions{
DoActions: SingleActionSet(
NewSetWorkflowStateAction(fmt.Sprintf("signal_%d", i), "received"),
),
},
},
},
},
},
}
}
return actions
}

func ConvertToPayload(newInput any) *common.Payload {
payload, err := jsonPayloadConverter.ToPayload(newInput)
if err != nil {
Expand Down
Loading
Loading