diff --git a/crates/core/src/client_events/mod.rs b/crates/core/src/client_events/mod.rs index bb6e8aebd..485d1a031 100644 --- a/crates/core/src/client_events/mod.rs +++ b/crates/core/src/client_events/mod.rs @@ -376,6 +376,8 @@ async fn process_open_request( // Register subscription listener if subscribe=true if subscribe { + // Note: The actual subscription to the contract happens in the PUT operation + // when it receives SuccessfulPut. Here we just register the listener for updates. if let Some(subscription_listener) = subscription_listener { tracing::debug!(%client_id, %contract_key, "Registering subscription for PUT with auto-subscribe"); let register_listener = op_manager diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 415e1f7c1..2c7e24840 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -487,24 +487,62 @@ impl Operation for PutOp { ); } - // Start subscription if the contract is already seeded and the user requested it - if subscribe && is_seeding_contract { + // Start subscription if requested - should work for both new and existing contracts + if subscribe { tracing::debug!( tx = %id, %key, peer = %op_manager.ring.connection_manager.get_peer_key().unwrap(), - "Starting subscription request" + was_already_seeding = %is_seeding_contract, + "Starting subscription for contract after successful PUT" ); - // TODO: Make put operation atomic by linking it to the completion of this subscription request. - // Currently we can't link one transaction to another transaction's result, which would be needed - // to make this fully atomic. This should be addressed in a future refactoring. + + // The contract should now be stored locally. We need to: + // 1. Verify the contract is queryable locally + // 2. Start a subscription request to register with peers + + // Verify contract is stored and queryable + let has_contract = + super::has_contract(op_manager, key).await.unwrap_or(false); + + if !has_contract { + tracing::warn!( + tx = %id, + %key, + "Contract not queryable after PUT storage, attempting subscription anyway" + ); + } + + // Start subscription request + // Use try_get=true in case the contract isn't fully available yet super::start_subscription_request( op_manager, key, - false, + true, // try_get: attempt to GET if subscription fails HashSet::new(), ) .await; + + // Also ensure we're registered as a subscriber locally + // This helps with tracking who has the contract + let own_location = + op_manager.ring.connection_manager.own_location(); + if let Err(e) = + op_manager.ring.add_subscriber(&key, own_location.clone()) + { + tracing::debug!( + tx = %id, + %key, + "Could not add self as local subscriber: {:?}", + e + ); + } else { + tracing::debug!( + tx = %id, + %key, + "Added self as local subscriber for contract" + ); + } } tracing::info!( diff --git a/crates/core/tests/operations.rs b/crates/core/tests/operations.rs index 3cc7c7a59..0b9aa9e88 100644 --- a/crates/core/tests/operations.rs +++ b/crates/core/tests/operations.rs @@ -1823,6 +1823,236 @@ async fn test_put_with_subscribe_flag() -> TestResult { Ok(()) } +/// Test that a client can UPDATE a contract after PUT with subscribe:true +/// This verifies the fix for issue #1765 +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_put_subscribe_enables_update() -> TestResult { + freenet::config::set_logger(Some(LevelFilter::INFO), None); + + // Load test contract + const TEST_CONTRACT: &str = "test-contract-integration"; + let contract = test_utils::load_contract(TEST_CONTRACT, vec![].into())?; + let contract_key = contract.key(); + + // Create initial state + let initial_state = test_utils::create_empty_todo_list(); + let wrapped_state = WrappedState::from(initial_state); + + // Create network sockets + let network_socket_b = TcpListener::bind("127.0.0.1:0")?; + let ws_api_port_socket_a = TcpListener::bind("127.0.0.1:0")?; + let ws_api_port_socket_b = TcpListener::bind("127.0.0.1:0")?; + + // Configure gateway node B + let (config_b, _preset_cfg_b, config_b_gw) = { + let (cfg, preset) = base_node_test_config( + true, + vec![], + Some(network_socket_b.local_addr()?.port()), + ws_api_port_socket_b.local_addr()?.port(), + ) + .await?; + let public_port = cfg.network_api.public_port.unwrap(); + let path = preset.temp_dir.path().to_path_buf(); + (cfg, preset, gw_config(public_port, &path)?) + }; + + // Configure client node A + let (config_a, _preset_cfg_a) = base_node_test_config( + false, + vec![serde_json::to_string(&config_b_gw)?], + None, + ws_api_port_socket_a.local_addr()?.port(), + ) + .await?; + let ws_api_port = config_a.ws_api.ws_api_port.unwrap(); + + // Free ports + std::mem::drop(ws_api_port_socket_a); + std::mem::drop(network_socket_b); + std::mem::drop(ws_api_port_socket_b); + + // Start node A (client) + let node_a = async move { + let config = config_a.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + node.run().await + } + .boxed_local(); + + // Start node B (gateway) + let node_b = async { + let config = config_b.build().await?; + let node = NodeConfig::new(config.clone()) + .await? + .build(serve_gateway(config.ws_api).await) + .await?; + node.run().await + } + .boxed_local(); + + let test = tokio::time::timeout(Duration::from_secs(180), async { + // Wait for nodes to start up + tokio::time::sleep(Duration::from_secs(20)).await; + + // Connect to node A websocket API + let uri = + format!("ws://127.0.0.1:{ws_api_port}/v1/contract/command?encodingProtocol=native"); + let (stream, _) = connect_async(&uri).await?; + let mut client_api = WebApi::start(stream); + + // PUT contract with subscribe:true + make_put( + &mut client_api, + wrapped_state.clone(), + contract.clone(), + true, // subscribe:true - this is what we're testing + ) + .await?; + + // Wait for PUT response + tracing::info!("Waiting for PUT response with subscribe:true..."); + let resp = tokio::time::timeout(Duration::from_secs(30), client_api.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::PutResponse { key }))) => { + tracing::info!("PUT successful with subscribe:true for contract: {}", key); + assert_eq!(key, contract_key, "Contract key mismatch in PUT response"); + } + Ok(Ok(other)) => { + bail!("Unexpected response while waiting for PUT: {:?}", other); + } + Ok(Err(e)) => { + bail!("Error receiving PUT response: {}", e); + } + Err(_) => { + bail!("Timeout waiting for PUT response"); + } + } + + // Small delay to ensure subscription is established + tokio::time::sleep(Duration::from_secs(2)).await; + + // Now UPDATE the contract (this should work if subscribe:true worked correctly) + let mut todo_list: test_utils::TodoList = serde_json::from_slice(wrapped_state.as_ref()) + .unwrap_or_else(|_| test_utils::TodoList { + tasks: Vec::new(), + version: 0, + }); + + // Add a task + todo_list.tasks.push(test_utils::Task { + id: 1, + title: "Test subscribe:true fix".to_string(), + description: "Verify UPDATE works after PUT with subscribe:true".to_string(), + completed: false, + priority: 5, + }); + + let updated_bytes = serde_json::to_vec(&todo_list).unwrap(); + let updated_state = WrappedState::from(updated_bytes); + + tracing::info!("Attempting UPDATE after PUT with subscribe:true..."); + make_update(&mut client_api, contract_key, updated_state.clone()).await?; + + // Wait for UPDATE response or notification + // We might receive an UpdateNotification if we're subscribed (which means our fix works!) + let mut update_confirmed = false; + let start = std::time::Instant::now(); + + while start.elapsed() < Duration::from_secs(30) && !update_confirmed { + let resp = tokio::time::timeout(Duration::from_secs(5), client_api.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateResponse { + key, + summary: _, + }))) => { + tracing::info!("UPDATE successful after PUT with subscribe:true!"); + assert_eq!( + key, contract_key, + "Contract key mismatch in UPDATE response" + ); + update_confirmed = true; + } + Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification { + key, + update: _, + }))) => { + tracing::info!("Received UpdateNotification - this confirms we're subscribed!"); + assert_eq!( + key, contract_key, + "Contract key mismatch in UPDATE notification" + ); + // Getting a notification means we're properly subscribed - our fix is working! + update_confirmed = true; + } + Ok(Ok(other)) => { + tracing::debug!("Received other response: {:?}", other); + // Continue waiting for the update response/notification + } + Ok(Err(e)) => { + bail!("Error receiving UPDATE response: {}", e); + } + Err(_) => { + // Timeout on this iteration, continue if we haven't exceeded total time + } + } + } + + if !update_confirmed { + bail!("Did not receive UPDATE response or notification within timeout"); + } + + // Verify the state was actually updated with GET + make_get(&mut client_api, contract_key, true, false).await?; + + let resp = tokio::time::timeout(Duration::from_secs(30), client_api.recv()).await; + match resp { + Ok(Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { + key, + state, + contract: _, + }))) => { + assert_eq!(key, contract_key); + + // Verify the task was added + let retrieved_list: test_utils::TodoList = serde_json::from_slice(state.as_ref())?; + assert_eq!(retrieved_list.tasks.len(), 1, "Task should have been added"); + assert_eq!(retrieved_list.tasks[0].title, "Test subscribe:true fix"); + + tracing::info!( + "GET confirmed UPDATE was successful - subscribe:true fix is working!" + ); + } + _ => { + bail!("Failed to verify updated state with GET"); + } + } + + Ok::<_, anyhow::Error>(()) + }); + + // Wait for test completion or node failures + select! { + a = node_a => { + let Err(a) = a; + return Err(anyhow!("Node A failed: {}", a).into()); + } + b = node_b => { + let Err(b) = b; + return Err(anyhow!("Node B failed: {}", b).into()); + } + r = test => { + r??; + tokio::time::sleep(Duration::from_secs(3)).await; + } + } + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_delegate_request() -> TestResult { freenet::config::set_logger(Some(LevelFilter::INFO), None);