Skip to content

Fix subscribe:true flag for PUT operations #1767

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/core/src/client_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,8 @@ async fn process_open_request(

// Register subscription listener if subscribe=true
if subscribe {
// Note: The actual subscription to the contract happens in the PUT operation
// when it receives SuccessfulPut. Here we just register the listener for updates.
if let Some(subscription_listener) = subscription_listener {
tracing::debug!(%client_id, %contract_key, "Registering subscription for PUT with auto-subscribe");
let register_listener = op_manager
Expand Down
52 changes: 45 additions & 7 deletions crates/core/src/operations/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,24 +487,62 @@ impl Operation for PutOp {
);
}

// Start subscription if the contract is already seeded and the user requested it
if subscribe && is_seeding_contract {
// Start subscription if requested - should work for both new and existing contracts
if subscribe {
tracing::debug!(
tx = %id,
%key,
peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(),
"Starting subscription request"
was_already_seeding = %is_seeding_contract,
"Starting subscription for contract after successful PUT"
);
// TODO: Make put operation atomic by linking it to the completion of this subscription request.
// Currently we can't link one transaction to another transaction's result, which would be needed
// to make this fully atomic. This should be addressed in a future refactoring.

// The contract should now be stored locally. We need to:
// 1. Verify the contract is queryable locally
// 2. Start a subscription request to register with peers

// Verify contract is stored and queryable
let has_contract =
super::has_contract(op_manager, key).await.unwrap_or(false);

if !has_contract {
tracing::warn!(
tx = %id,
%key,
"Contract not queryable after PUT storage, attempting subscription anyway"
);
}

// Start subscription request
// Use try_get=true in case the contract isn't fully available yet
super::start_subscription_request(
op_manager,
key,
false,
true, // try_get: attempt to GET if subscription fails
HashSet::new(),
)
.await;

// Also ensure we're registered as a subscriber locally
// This helps with tracking who has the contract
let own_location =
op_manager.ring.connection_manager.own_location();
if let Err(e) =
op_manager.ring.add_subscriber(&key, own_location.clone())
{
tracing::debug!(
tx = %id,
%key,
"Could not add self as local subscriber: {:?}",
e
);
} else {
tracing::debug!(
tx = %id,
%key,
"Added self as local subscriber for contract"
);
}
}

tracing::info!(
Expand Down
230 changes: 230 additions & 0 deletions crates/core/tests/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1823,6 +1823,236 @@ async fn test_put_with_subscribe_flag() -> TestResult {
Ok(())
}

/// Test that a client can UPDATE a contract after PUT with subscribe:true
/// This verifies the fix for issue #1765
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_put_subscribe_enables_update() -> TestResult {
freenet::config::set_logger(Some(LevelFilter::INFO), None);

// Load test contract
const TEST_CONTRACT: &str = "test-contract-integration";
let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?;
let contract_key = contract.key();

// Create initial state
let initial_state = test_utils::create_empty_todo_list();
let wrapped_state = WrappedState::from(initial_state);

// Create network sockets
let network_socket_b = TcpListener::bind("127.0.0.1:0")?;
let ws_api_port_socket_a = TcpListener::bind("127.0.0.1:0")?;
let ws_api_port_socket_b = TcpListener::bind("127.0.0.1:0")?;

// Configure gateway node B
let (config_b, _preset_cfg_b, config_b_gw) = {
let (cfg, preset) = base_node_test_config(
true,
vec![],
Some(network_socket_b.local_addr()?.port()),
ws_api_port_socket_b.local_addr()?.port(),
)
.await?;
let public_port = cfg.network_api.public_port.unwrap();
let path = preset.temp_dir.path().to_path_buf();
(cfg, preset, gw_config(public_port, &path)?)
};

// Configure client node A
let (config_a, _preset_cfg_a) = base_node_test_config(
false,
vec![serde_json::to_string(&config_b_gw)?],
None,
ws_api_port_socket_a.local_addr()?.port(),
)
.await?;
let ws_api_port = config_a.ws_api.ws_api_port.unwrap();

// Free ports
std::mem::drop(ws_api_port_socket_a);
std::mem::drop(network_socket_b);
std::mem::drop(ws_api_port_socket_b);

// Start node A (client)
let node_a = async move {
let config = config_a.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.await?;
node.run().await
}
.boxed_local();

// Start node B (gateway)
let node_b = async {
let config = config_b.build().await?;
let node = NodeConfig::new(config.clone())
.await?
.build(serve_gateway(config.ws_api).await)
.await?;
node.run().await
}
.boxed_local();

let test = tokio::time::timeout(Duration::from_secs(180), async {
// Wait for nodes to start up
tokio::time::sleep(Duration::from_secs(20)).await;

// Connect to node A websocket API
let uri =
format!("ws://127.0.0.1:{ws_api_port}/v1/contract/command?encodingProtocol=native");
let (stream, _) = connect_async(&uri).await?;
let mut client_api = WebApi::start(stream);

// PUT contract with subscribe:true
make_put(
&mut client_api,
wrapped_state.clone(),
contract.clone(),
true, // subscribe:true - this is what we're testing
)
.await?;

// Wait for PUT response
tracing::info!("Waiting for PUT response with subscribe:true...");
let resp = tokio::time::timeout(Duration::from_secs(30), client_api.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => {
tracing::info!("PUT successful with subscribe:true for contract: {}", key);
assert_eq!(key, contract_key, "Contract key mismatch in PUT response");
}
Ok(Ok(other)) => {
bail!("Unexpected response while waiting for PUT: {:?}", other);
}
Ok(Err(e)) => {
bail!("Error receiving PUT response: {}", e);
}
Err(_) => {
bail!("Timeout waiting for PUT response");
}
}

// Small delay to ensure subscription is established
tokio::time::sleep(Duration::from_secs(2)).await;

// Now UPDATE the contract (this should work if subscribe:true worked correctly)
let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref())
.unwrap_or_else(|_| test_utils::TodoList {
tasks: Vec::new(),
version: 0,
});

// Add a task
todo_list.tasks.push(test_utils::Task {
id: 1,
title: "Test subscribe:true fix".to_string(),
description: "Verify UPDATE works after PUT with subscribe:true".to_string(),
completed: false,
priority: 5,
});

let updated_bytes = serde_json::to_vec(&todo_list).unwrap();
let updated_state = WrappedState::from(updated_bytes);

tracing::info!("Attempting UPDATE after PUT with subscribe:true...");
make_update(&mut client_api, contract_key, updated_state.clone()).await?;

// Wait for UPDATE response or notification
// We might receive an UpdateNotification if we're subscribed (which means our fix works!)
let mut update_confirmed = false;
let start = std::time::Instant::now();

while start.elapsed() < Duration::from_secs(30) && !update_confirmed {
let resp = tokio::time::timeout(Duration::from_secs(5), client_api.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse {
key,
summary: _,
}))) => {
tracing::info!("UPDATE successful after PUT with subscribe:true!");
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE response"
);
update_confirmed = true;
}
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
key,
update: _,
}))) => {
tracing::info!("Received UpdateNotification - this confirms we're subscribed!");
assert_eq!(
key, contract_key,
"Contract key mismatch in UPDATE notification"
);
// Getting a notification means we're properly subscribed - our fix is working!
update_confirmed = true;
}
Ok(Ok(other)) => {
tracing::debug!("Received other response: {:?}", other);
// Continue waiting for the update response/notification
}
Ok(Err(e)) => {
bail!("Error receiving UPDATE response: {}", e);
}
Err(_) => {
// Timeout on this iteration, continue if we haven't exceeded total time
}
}
}

if !update_confirmed {
bail!("Did not receive UPDATE response or notification within timeout");
}

// Verify the state was actually updated with GET
make_get(&mut client_api, contract_key, true, false).await?;

let resp = tokio::time::timeout(Duration::from_secs(30), client_api.recv()).await;
match resp {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
key,
state,
contract: _,
}))) => {
assert_eq!(key, contract_key);

// Verify the task was added
let retrieved_list: test_utils::TodoList = serde_json::from_slice(state.as_ref())?;
assert_eq!(retrieved_list.tasks.len(), 1, "Task should have been added");
assert_eq!(retrieved_list.tasks[0].title, "Test subscribe:true fix");

tracing::info!(
"GET confirmed UPDATE was successful - subscribe:true fix is working!"
);
}
_ => {
bail!("Failed to verify updated state with GET");
}
}

Ok::<_, anyhow::Error>(())
});

// Wait for test completion or node failures
select! {
a = node_a => {
let Err(a) = a;
return Err(anyhow!("Node A failed: {}", a).into());
}
b = node_b => {
let Err(b) = b;
return Err(anyhow!("Node B failed: {}", b).into());
}
r = test => {
r??;
tokio::time::sleep(Duration::from_secs(3)).await;
}
}

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_delegate_request() -> TestResult {
freenet::config::set_logger(Some(LevelFilter::INFO), None);
Expand Down
Loading