Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions indexify/src/indexify/proto/executor_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ message FunctionCallResult {
message DesiredExecutorState {
repeated FunctionExecutorDescription function_executors = 1;
repeated Allocation allocations = 2;
repeated FunctionCallResult function_call_results = 3;
// Server supplied clock value used to deduplicate messages. Executor records max clock value
// it observed and ignores all the messages with clock value <= the max observed value.
optional uint64 clock = 4;
optional uint64 clock = 3;
repeated FunctionCallResult function_call_results = 4;
}

enum AllocationOutcomeCode {
Expand Down
2 changes: 1 addition & 1 deletion indexify/src/indexify/proto/executor_api_pb2.py

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions indexify/src/indexify/proto/executor_api_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -572,29 +572,29 @@ class FunctionCallResult(_message.Message):
) -> None: ...

class DesiredExecutorState(_message.Message):
__slots__ = ("function_executors", "allocations", "function_call_results", "clock")
__slots__ = ("function_executors", "allocations", "clock", "function_call_results")
FUNCTION_EXECUTORS_FIELD_NUMBER: _ClassVar[int]
ALLOCATIONS_FIELD_NUMBER: _ClassVar[int]
FUNCTION_CALL_RESULTS_FIELD_NUMBER: _ClassVar[int]
CLOCK_FIELD_NUMBER: _ClassVar[int]
FUNCTION_CALL_RESULTS_FIELD_NUMBER: _ClassVar[int]
function_executors: _containers.RepeatedCompositeFieldContainer[
FunctionExecutorDescription
]
allocations: _containers.RepeatedCompositeFieldContainer[Allocation]
clock: int
function_call_results: _containers.RepeatedCompositeFieldContainer[
FunctionCallResult
]
clock: int
def __init__(
self,
function_executors: _Optional[
_Iterable[_Union[FunctionExecutorDescription, _Mapping]]
] = ...,
allocations: _Optional[_Iterable[_Union[Allocation, _Mapping]]] = ...,
clock: _Optional[int] = ...,
function_call_results: _Optional[
_Iterable[_Union[FunctionCallResult, _Mapping]]
] = ...,
clock: _Optional[int] = ...,
) -> None: ...

class ExecutionPlanUpdate(_message.Message):
Expand Down
10 changes: 7 additions & 3 deletions server/proto/executor_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ message ExecutorState {
// reconciled by Executor. Not included into state_hash.
// Initial value on Executor startup is 0.
optional uint64 server_clock = 12;

// Catalog entry name that this executor is associated with.
optional string catalog_entry_name = 14;

repeated FunctionCallWatch function_call_watches = 15;
}

// Updates that Executor wants to report to Server. If report_executor_state RPC is successful
Expand All @@ -209,7 +214,6 @@ message FunctionCallWatch {
message ReportExecutorStateRequest {
optional ExecutorState executor_state = 1;
optional ExecutorUpdate executor_update = 2;
repeated FunctionCallWatch function_call_watches = 3;
}

// A message sent by Server to Executor to acknowledge the receipt of ReportExecutorStateRequest.
Expand Down Expand Up @@ -255,10 +259,10 @@ message FunctionCallResult {
message DesiredExecutorState {
repeated FunctionExecutorDescription function_executors = 1;
repeated Allocation allocations = 2;
repeated FunctionCallResult function_call_results = 3;
// Server supplied clock value used to deduplicate messages. Executor records max clock value
// it observed and ignores all the messages with clock value <= the max observed value.
optional uint64 clock = 4;
optional uint64 clock = 3;
repeated FunctionCallResult function_call_results = 4;
}

enum AllocationOutcomeCode {
Expand Down
2 changes: 1 addition & 1 deletion server/src/executor_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ impl ExecutorApi for ExecutorAPIService {
.ok_or(Status::invalid_argument("executor_update is required"))?;

let mut watch_function_calls = HashSet::new();
for function_call_watch in &request.get_ref().function_call_watches {
for function_call_watch in &executor_state.function_call_watches {
let executor_watch: ExecutorWatch = function_call_watch.try_into().unwrap();
watch_function_calls.insert(executor_watch);
}
Expand Down
15 changes: 11 additions & 4 deletions server/src/state_store/executor_watches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use std::collections::{HashMap, HashSet};

use tokio::sync::RwLock;

use crate::data_model::FunctionRunStatus;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ExecutorWatch {
pub namespace: String,
Expand Down Expand Up @@ -114,13 +116,18 @@ impl ExecutorWatches {
updated_request_states: &HashMap<String, crate::data_model::RequestCtx>,
) -> HashSet<String> {
// Build the set of ExecutorWatch objects for all updated function runs
let mut executor_watches = HashSet::new();
let mut possible_watches = HashSet::new();
for (ctx_key, function_run_ids) in updated_function_runs {
let Some(ctx) = updated_request_states.get(ctx_key) else {
continue;
};
for function_call_id in function_run_ids {
executor_watches.insert(ExecutorWatch {
if let Some(function_run) = ctx.function_runs.get(function_call_id) {
if function_run.status != FunctionRunStatus::Completed {
continue;
}
}
possible_watches.insert(ExecutorWatch {
namespace: ctx.namespace.clone(),
application: ctx.application_name.clone(),
request_id: ctx.request_id.clone(),
Expand All @@ -133,8 +140,8 @@ impl ExecutorWatches {
let executors_guard = self.executors.read().await;

let mut impacted_executors: HashSet<String> = HashSet::new();
for fc_id in executor_watches.iter() {
if let Some(executors) = requests_guard.get(fc_id) {
for possible_watch in possible_watches.iter() {
if let Some(executors) = requests_guard.get(possible_watch) {
for ex in executors {
// Include executor only if it currently has any watches
if let Some(watches) = executors_guard.get(ex) &&
Expand Down
6 changes: 6 additions & 0 deletions server/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@ impl IndexifyState {
.await;
changed_executors.extend(impacted_executors.into_iter().map(|e| e.into()));
}
if let RequestPayload::UpsertExecutor(req) = &request.payload {
if !req.watch_function_calls.is_empty() && req.update_executor_state {
changed_executors.insert(req.executor.id.clone());
}
}

// Notify the executors with state changes
{
let mut executor_states = self.executor_states.write().await;
Expand Down