Memory Access Violation in ranged_parallel_group_launch After Completion Handler Invocation. #1630

@ogis-futamata

Overview

While ranged_parallel_group_launch is still starting the operations in its range, the completion handler passed to make_parallel_group(...).async_wait can be invoked immediately, before the function has finished.
When that happens, the coroutine machinery runs first, and the remaining part of ranged_parallel_group_launch executes only after control returns to it.
By that point, memory used by the remaining code may already have been freed, so the function can access freed memory.
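
The hazard described above can be modelled without Asio. The following is a minimal sketch under stated assumptions, not the library's code: `operation`, `completion`, `owner`, and `launch` are hypothetical names, and a `std::weak_ptr` is used to observe safely that an object was destroyed before the post-loop code ran, instead of actually touching freed memory.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical model, not Asio code: an operation is started by handing it
// a completion callback.
using completion = std::function<void()>;
using operation = std::function<void(completion)>;

// Stands in for whatever the caller's completion handler keeps alive.
struct owner { int slot = 0; };

// Starts every operation, then reaches the post-loop code. Returns true if
// the watched owner had already been destroyed at that point.
bool launch(std::vector<operation> ops, completion on_all_done,
            std::weak_ptr<owner> watched)
{
    assert(!ops.empty());  // precondition of this model only
    auto remaining = std::make_shared<std::size_t>(ops.size());
    for (auto& op : ops)
        op([remaining, on_all_done] {
            if (--*remaining == 0)
                on_all_done();  // may run before launch() has finished
        });
    // The real function would use handler-related objects here.
    return watched.expired();
}

// The operation completes inline, so the final handler runs inside the loop.
bool owner_gone_with_inline_completion()
{
    auto o = std::make_shared<owner>();
    std::weak_ptr<owner> w = o;
    operation inline_op = [](completion done) { done(); };
    return launch({inline_op}, [&o] { o.reset(); }, w);
}

// The operation queues its completion, so it runs after launch() returns.
bool owner_gone_with_deferred_completion()
{
    auto o = std::make_shared<owner>();
    std::weak_ptr<owner> w = o;
    std::vector<completion> queue;  // stands in for an event loop
    operation deferred_op = [&queue](completion done) {
        queue.push_back(std::move(done));
    };
    bool gone = launch({deferred_op}, [&o] { o.reset(); }, w);
    for (auto& c : queue)
        c();
    return gone;
}
```

In this model, `owner_gone_with_inline_completion()` reports that the owner was already destroyed when the post-loop code was reached, while `owner_gone_with_deferred_completion()` does not. The real case involves nested parallel groups and coroutine frames, so this only illustrates the ordering problem, not the exact object that is freed.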

Environment

Boost 1.88

  • Linux arch 6.12.10-arch1-1
    • clang version 19.1.7
    • g++ (GCC) 15.1.1 20250425

Boost 1.87

  • Godbolt

Reproducible Code

Using Boost 1.87 (latest on Godbolt):
Without sanitizer: https://godbolt.org/z/Evq8M9Tqq
With sanitizer: https://godbolt.org/z/3MYbW8r6Y

#include <boost/asio.hpp>
#include <boost/asio/experimental/parallel_group.hpp>

#include <chrono>
#include <utility>
#include <vector>

namespace asio = boost::asio;

asio::awaitable<void> inner_make_parallel_group() {
    auto exe = co_await asio::this_coro::executor;

    auto f = [&]() -> asio::awaitable<void> {
        co_await asio::dispatch(exe, asio::deferred);
        co_return;
    };

    using op_type = decltype(
        asio::co_spawn(
            exe,
            f,
            asio::deferred
        )
    );

    std::vector<op_type> ops;
    ops.push_back(
        asio::co_spawn(
            exe,
            f,
            asio::deferred
        )
    );

    co_await asio::experimental::make_parallel_group(
        std::move(ops)
    ).async_wait(
        asio::experimental::wait_for_all(),
        asio::deferred
    );
    co_return;
}

asio::awaitable<void> outer_make_parallel_group() {
    auto exe = co_await asio::this_coro::executor;

    auto f = [&]() -> asio::awaitable<void> {
        asio::steady_timer timer(exe, std::chrono::seconds(1));
        co_await timer.async_wait(asio::deferred);

        co_await inner_make_parallel_group();
        co_return;
    };

    using op_type = decltype(
        asio::co_spawn(
            exe,
            f,
            asio::deferred
        )
    );

    std::vector<op_type> ops;
    ops.push_back(
        asio::co_spawn(
            exe,
            f,
            asio::deferred
        )
    );
    ops.push_back(
        asio::co_spawn(
            exe,
            f,
            asio::deferred
        )
    );

    co_await asio::experimental::make_parallel_group(
        std::move(ops)
    ).async_wait(
        asio::experimental::wait_for_all(),
        asio::deferred
    );
    co_return;
}

asio::awaitable<void> coro_main() {
    co_await outer_make_parallel_group();
    co_return;
}

int main() {
    asio::io_context ioc;
    asio::co_spawn(
        ioc,
        coro_main,
        asio::detached
    );
    ioc.run();
}

Current Behavior

  1. ranged_parallel_group_launch starts each asynchronous operation in the range.
    void ranged_parallel_group_launch(Condition cancellation_condition,
        Handler handler, Range&& range, const Allocator& allocator)
    {
      // Get the user's completion handler's cancellation slot, so that we can allow
      // cancellation of the entire group.
      associated_cancellation_slot_t<Handler> slot
        = asio::get_associated_cancellation_slot(handler);
      // The type of the asynchronous operation.
      typedef decay_t<decltype(*declval<typename Range::iterator>())> op_type;
      // Create the shared state for the operation.
      typedef ranged_parallel_group_state<Condition,
        Handler, op_type, Allocator> state_type;
      std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
          asio::detail::recycling_allocator<state_type,
            asio::detail::thread_info_base::parallel_group_tag>(),
          std::move(cancellation_condition),
          std::move(handler), range.size(), allocator);
      std::size_t idx = 0;
      std::size_t range_size = range.size();
      for (auto&& op : std::forward<Range>(range))
      {
        typedef associated_executor_t<op_type> ex_type;
        ex_type ex = asio::get_associated_executor(op);
        std::move(op)(
            ranged_parallel_group_op_handler_with_executor<
              ex_type, Condition, Handler, op_type, Allocator>(
                state, std::move(ex), idx++));
      }
  2. If the asynchronous operation (co_spawn) completes immediately, its completion handler is invoked on the current thread.
    awaitable<awaitable_thread_entry_point, Executor> co_spawn_entry_point(
        awaitable<void, Executor>*, co_spawn_state<Handler, Executor, Function> s)
    {
      (void) co_await co_spawn_dispatch{};
      (co_await awaitable_thread_has_context_switched{}) = false;
      std::exception_ptr e = nullptr;
    #if !defined(ASIO_NO_EXCEPTIONS)
      try
    #endif // !defined(ASIO_NO_EXCEPTIONS)
      {
        co_await s.function();
      }
    #if !defined(ASIO_NO_EXCEPTIONS)
      catch (...)
      {
        e = std::current_exception();
      }
    #endif // !defined(ASIO_NO_EXCEPTIONS)
      bool switched = (co_await awaitable_thread_has_context_switched{});
      if (!switched)
      {
        co_await this_coro::throw_if_cancelled(false);
        (void) co_await co_spawn_post();
      }
      (dispatch)(s.handler_work.get_executor(),
          [handler = std::move(s.handler), e]() mutable
          {
            std::move(handler)(e);
          });
    }
  3. The remaining part of ranged_parallel_group_launch runs only after the completion handler has been invoked, and it then accesses freed memory.
      // Check if any of the operations has already requested cancellation, and if
      // so, emit a signal for each operation in the group.
      if ((state->cancellations_requested_ -= range_size) > 0)
        for (auto& signal : state->cancellation_signals_)
          signal.emit(state->cancel_type_);
      // Register a handler with the user's completion handler's cancellation slot.
      if (slot.is_connected())
        slot.template emplace<
          ranged_parallel_group_cancellation_handler<
            Condition, Handler, op_type, Allocator>>(state);
    }
