Skip to content

✨ Support Windows using IOCP #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/compiler-support.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ jobs:
- { tag: "ubuntu-2004_clang-11", name: "Ubuntu 20.04 Clang 11", cxx: "/usr/bin/clang++-11", cc: "/usr/bin/clang-11", runs-on: "ubuntu-20.04" }
- { tag: "ubuntu-2004_clang-10", name: "Ubuntu 20.04 Clang 10", cxx: "/usr/bin/clang++-10", cc: "/usr/bin/clang-10", runs-on: "ubuntu-20.04" }
- { tag: "ubuntu-2004_gcc-10", name: "Ubuntu 20.04 G++ 10", cxx: "/usr/bin/g++-10", cc: "/usr/bin/gcc-10", runs-on: "ubuntu-20.04" }
#- { tag: "windows-2022_msvc17", name: "Windows Server 2022 MSVC 17", cxx: "", cc: "", runs-on: "windows-2022" }
#- { tag: "windows-2019_msvc16", name: "Windows Server 2019 MSVC 16", cxx: "", cc: "", runs-on: "windows-2019" }
- { tag: "windows-2022_msvc17", name: "Windows Server 2022 MSVC 17", cxx: "", cc: "", runs-on: "windows-2022" }
- { tag: "windows-2019_msvc16", name: "Windows Server 2019 MSVC 16", cxx: "", cc: "", runs-on: "windows-2019" }
- { tag: "macos-12_gcc-12", name: "MacOS 12 G++ 12", cxx: "g++-12", cc: "gcc-12", runs-on: "macos-12" }
#- { tag: "macos-12_gcc-13", name: "MacOS 12 G++ 13", cxx: "g++-13", cc: "gcc-13", runs-on: "macos-12" }
- { tag: "macos-12_gcc-14", name: "MacOS 12 G++ 14", cxx: "g++-14", cc: "gcc-14", runs-on: "macos-12" }
Expand Down
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@ add_library(
${CMAKE_CURRENT_SOURCE_DIR}/src/address.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/dns.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/file.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_engine_generic_unix.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_engine_select.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_engine_uring.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_engine_iocp.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_engine.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/io_service.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/socket.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp)
target_link_libraries(asyncpp_io PUBLIC asyncpp OpenSSL::SSL Threads::Threads)
if(WIN32)
target_link_libraries(asyncpp_io PUBLIC wsock32 ws2_32 ntdll)
endif()
target_include_directories(asyncpp_io
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_compile_features(asyncpp_io PUBLIC cxx_std_20)
Expand Down
12 changes: 6 additions & 6 deletions include/asyncpp/io/address.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ namespace asyncpp::io {
constexpr auto parse_part = [](std::string_view::const_iterator& it, std::string_view::const_iterator end) {
if (it == end || (*it < '0' && *it > '9')) return -1;
int32_t result = 0;
while (*it >= '0' && *it <= '9') {
while (it != end && *it >= '0' && *it <= '9') {
result = (result * 10) + (*it - '0');
it++;
}
Expand Down Expand Up @@ -157,7 +157,7 @@ namespace asyncpp::io {

constexpr std::span<const uint8_t, 16> data() const noexcept { return m_data; }
constexpr std::span<const uint8_t, 4> ipv4_data() const noexcept {
return std::span<const uint8_t, 4>{&m_data[12], &m_data[16]};
return std::span<const uint8_t, 4>{&m_data[12], &m_data[12] + 4};
}

constexpr uint64_t subnet_prefix() const noexcept {
Expand Down Expand Up @@ -189,7 +189,7 @@ namespace asyncpp::io {
}
constexpr ipv4_address mapped_ipv4() const noexcept {
if (!is_ipv4_mapped()) return ipv4_address();
return ipv4_address(std::span<const uint8_t, 4>(&m_data[12], &m_data[16]));
return ipv4_address(std::span<const uint8_t, 4>(&m_data[12], &m_data[12] + 4));
}

std::string to_string(bool full = false) const {
Expand Down Expand Up @@ -249,7 +249,7 @@ namespace asyncpp::io {
auto it = str.begin();
auto part_start = it;
bool is_v4_interop = false;
if (*it == ':') {
if (it != str.end() && *it == ':') {
dcidx = idx++;
it++;
if (it == str.end() || *it != ':') return std::nullopt;
Expand Down Expand Up @@ -508,7 +508,7 @@ namespace std {
template<>
struct hash<asyncpp::io::uds_address> {
size_t operator()(const asyncpp::io::uds_address& x) const noexcept {
size_t res = 0;
size_t res{};
for (auto e : x.data())
res = res ^ (e + 0x9e3779b99e3779b9ull + (res << 6) + (res >> 2));
return res;
Expand All @@ -518,7 +518,7 @@ namespace std {
template<>
struct hash<asyncpp::io::address> {
size_t operator()(const asyncpp::io::address& x) const noexcept {
size_t res;
size_t res{};
switch (x.type()) {
case asyncpp::io::address_type::ipv4: res = std::hash<asyncpp::io::ipv4_address>{}(x.ipv4()); break;
case asyncpp::io::address_type::ipv6: res = std::hash<asyncpp::io::ipv6_address>{}(x.ipv6()); break;
Expand Down
2 changes: 1 addition & 1 deletion include/asyncpp/io/detail/cancel_awaitable.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace asyncpp::io::detail {
bool await_ready() const noexcept { return m_child.await_ready(); }
bool await_suspend(coroutine_handle<> hdl) {
if (m_stop_token.stop_requested()) {
m_child.m_completion.result = -ECANCELED;
m_child.m_completion.result = std::make_error_code(std::errc::operation_canceled);
return false;
}
auto res = m_child.await_suspend(hdl);
Expand Down
66 changes: 59 additions & 7 deletions include/asyncpp/io/detail/io_engine.h
Original file line number Diff line number Diff line change
@@ -1,27 +1,62 @@
#pragma once
#include <asyncpp/io/endpoint.h>

#include <cstddef>
#include <ios>
#include <memory>
#include <system_error>

namespace asyncpp::io::detail {
class io_engine {
public:
#ifndef _WIN32
using file_handle_t = int;
constexpr static file_handle_t invalid_file_handle = -1;
using socket_handle_t = int;
constexpr static socket_handle_t invalid_socket_handle = -1;
#else
using file_handle_t = void*;
constexpr static file_handle_t invalid_file_handle = reinterpret_cast<void*>(static_cast<long long>(-1));
using socket_handle_t = unsigned long long;
constexpr static socket_handle_t invalid_socket_handle = ~static_cast<socket_handle_t>(0);

#endif
enum class fsync_flags { none, datasync };
enum class socket_type { stream, dgram, seqpacket };

struct completion_data {
completion_data(void (*cb)(void*) = nullptr, void* udata = nullptr) noexcept
: callback(cb), userdata(udata) {}

// Private data the engine can use to associate state
alignas(std::max_align_t) std::array<std::byte, 256> engine_state{};

// Info provided by caller
void (*callback)(void*);
void* userdata;
void (*callback)(void*){};
void* userdata{};

// Filled by io_engine
int result;
std::error_code result{};
union {
socket_handle_t result_handle{};
size_t result_size;
};

// Private data the engine can use to associate state
void* engine_state{};
template<typename T>
T* es_init() noexcept {
static_assert(std::is_standard_layout_v<T> && std::is_trivially_copyable_v<T> &&
std::is_trivially_destructible_v<T>);
static_assert(sizeof(T) <= std::tuple_size_v<decltype(engine_state)>);
engine_state.fill(std::byte{});
return new (engine_state.data()) T();
}
template<typename T>
T* es_get() noexcept {
static_assert(std::is_standard_layout_v<T> && std::is_trivially_copyable_v<T> &&
std::is_trivially_destructible_v<T>);
static_assert(sizeof(T) <= std::tuple_size_v<decltype(engine_state)>);
return reinterpret_cast<T*>(engine_state.data());
}
};

public:
Expand All @@ -33,6 +68,18 @@ namespace asyncpp::io::detail {
virtual void wake() = 0;

// Networking api
virtual socket_handle_t socket_create(address_type domain, socket_type type) = 0;
virtual std::pair<socket_handle_t, socket_handle_t> socket_create_connected_pair(address_type domain,
socket_type type) = 0;
virtual void socket_register(socket_handle_t socket) = 0;
virtual void socket_release(socket_handle_t socket) = 0;
virtual void socket_close(socket_handle_t socket) = 0;
virtual void socket_bind(socket_handle_t socket, endpoint ep) = 0;
virtual void socket_listen(socket_handle_t socket, size_t backlog) = 0;
virtual endpoint socket_local_endpoint(socket_handle_t socket) = 0;
virtual endpoint socket_remote_endpoint(socket_handle_t socket) = 0;
virtual void socket_enable_broadcast(socket_handle_t socket, bool enable) = 0;
virtual void socket_shutdown(socket_handle_t socket, bool receive, bool send) = 0;
virtual bool enqueue_connect(socket_handle_t socket, endpoint ep, completion_data* cd) = 0;
virtual bool enqueue_accept(socket_handle_t socket, completion_data* cd) = 0;
virtual bool enqueue_recv(socket_handle_t socket, void* buf, size_t len, completion_data* cd) = 0;
Expand All @@ -43,8 +90,13 @@ namespace asyncpp::io::detail {
completion_data* cd) = 0;

// Filesystem IO
virtual bool enqueue_readv(file_handle_t fd, void* buf, size_t len, off_t offset, completion_data* cd) = 0;
virtual bool enqueue_writev(file_handle_t fd, const void* buf, size_t len, off_t offset,
virtual file_handle_t file_open(const char* filename, std::ios_base::openmode mode) = 0;
virtual void file_register(file_handle_t fd) = 0;
virtual void file_release(file_handle_t fd) = 0;
virtual void file_close(file_handle_t fd) = 0;
virtual uint64_t file_size(file_handle_t fd) = 0;
virtual bool enqueue_readv(file_handle_t fd, void* buf, size_t len, uint64_t offset, completion_data* cd) = 0;
virtual bool enqueue_writev(file_handle_t fd, const void* buf, size_t len, uint64_t offset,
completion_data* cd) = 0;
virtual bool enqueue_fsync(file_handle_t fd, fsync_flags flags, completion_data* cd) = 0;

Expand Down
10 changes: 8 additions & 2 deletions include/asyncpp/io/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@ namespace asyncpp::io {
m_ipv6 = {addr.ipv6(), port};
m_type = address_type::ipv6;
break;
#ifndef _WIN32
case address_type::uds:
m_uds = addr.uds();
m_type = address_type::uds;
break;
#endif
}
}
explicit constexpr endpoint(ipv4_endpoint ep) noexcept : m_ipv4(ep), m_type(address_type::ipv4) {}
Expand All @@ -128,15 +130,19 @@ namespace asyncpp::io {
constexpr ipv4_endpoint ipv4() const noexcept {
switch (m_type) {
case address_type::ipv4: return m_ipv4;
case address_type::ipv6:
case address_type::ipv6: return {};
#ifndef _WIN32
case address_type::uds: return {};
#endif
}
}
constexpr ipv6_endpoint ipv6() const noexcept {
switch (m_type) {
case address_type::ipv4: return {};
case address_type::ipv6: return m_ipv6;
#ifndef _WIN32
case address_type::uds: return {};
#endif
}
}
#ifndef _WIN32
Expand Down Expand Up @@ -213,7 +219,7 @@ namespace std {
template<>
struct hash<asyncpp::io::endpoint> {
size_t operator()(const asyncpp::io::endpoint& x) const noexcept {
size_t res;
size_t res{};
switch (x.type()) {
case asyncpp::io::address_type::ipv4: res = std::hash<asyncpp::io::ipv4_endpoint>{}(x.ipv4()); break;
case asyncpp::io::address_type::ipv6: res = std::hash<asyncpp::io::ipv6_endpoint>{}(x.ipv6()); break;
Expand Down
33 changes: 15 additions & 18 deletions include/asyncpp/io/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ namespace asyncpp::io {
detail::io_engine::completion_data m_completion;

public:
constexpr file_read_awaitable(io_engine* engine, io_engine::file_handle_t fd, void* buf, size_t len,
uint64_t offset, std::error_code* ec) noexcept
file_read_awaitable(io_engine* engine, io_engine::file_handle_t fd, void* buf, size_t len, uint64_t offset,
std::error_code* ec) noexcept
: m_engine(engine), m_fd(fd), m_buf(buf), m_len(len), m_offset(offset), m_ec(ec), m_completion{} {}
bool await_ready() const noexcept { return false; }
bool await_suspend(coroutine_handle<> hdl) {
Expand All @@ -112,10 +112,9 @@ namespace asyncpp::io {
return !m_engine->enqueue_readv(m_fd, m_buf, m_len, m_offset, &m_completion);
}
size_t await_resume() {
if (m_completion.result >= 0) return static_cast<size_t>(m_completion.result);
if (m_ec == nullptr)
throw std::system_error(std::error_code(-m_completion.result, std::system_category()));
*m_ec = std::error_code(-m_completion.result, std::system_category());
if (!m_completion.result) return m_completion.result_size;
if (m_ec == nullptr) throw std::system_error(m_completion.result);
*m_ec = m_completion.result;
return 0;
}
};
Expand All @@ -140,8 +139,8 @@ namespace asyncpp::io {
detail::io_engine::completion_data m_completion;

public:
constexpr file_write_awaitable(io_engine* engine, io_engine::file_handle_t fd, const void* buf, size_t len,
uint64_t offset, std::error_code* ec) noexcept
file_write_awaitable(io_engine* engine, io_engine::file_handle_t fd, const void* buf, size_t len,
uint64_t offset, std::error_code* ec) noexcept
: m_engine(engine), m_fd(fd), m_buf(buf), m_len(len), m_offset(offset), m_ec(ec), m_completion{} {}
bool await_ready() const noexcept { return false; }
bool await_suspend(coroutine_handle<> hdl) {
Expand All @@ -150,10 +149,9 @@ namespace asyncpp::io {
return !m_engine->enqueue_writev(m_fd, m_buf, m_len, m_offset, &m_completion);
}
size_t await_resume() {
if (m_completion.result >= 0) return static_cast<size_t>(m_completion.result);
if (m_ec == nullptr)
throw std::system_error(std::error_code(-m_completion.result, std::system_category()));
*m_ec = std::error_code(-m_completion.result, std::system_category());
if (!m_completion.result) return m_completion.result_size;
if (m_ec == nullptr) throw std::system_error(m_completion.result);
*m_ec = m_completion.result;
return 0;
}
};
Expand All @@ -175,7 +173,7 @@ namespace asyncpp::io {
detail::io_engine::completion_data m_completion;

public:
constexpr file_fsync_awaitable(io_engine* engine, io_engine::file_handle_t fd, std::error_code* ec) noexcept
file_fsync_awaitable(io_engine* engine, io_engine::file_handle_t fd, std::error_code* ec) noexcept
: m_engine(engine), m_fd(fd), m_ec(ec), m_completion{} {}
bool await_ready() const noexcept { return false; }
bool await_suspend(coroutine_handle<> hdl) {
Expand All @@ -184,10 +182,9 @@ namespace asyncpp::io {
return !m_engine->enqueue_fsync(m_fd, io_engine::fsync_flags::none, &m_completion);
}
void await_resume() {
if (m_completion.result >= 0) return;
if (m_ec == nullptr)
throw std::system_error(std::error_code(-m_completion.result, std::system_category()));
*m_ec = std::error_code(-m_completion.result, std::system_category());
if (!m_completion.result) return;
if (m_ec == nullptr) throw std::system_error(m_completion.result);
*m_ec = m_completion.result;
}
};
} // namespace detail
Expand Down Expand Up @@ -269,7 +266,7 @@ namespace asyncpp::io {
file(const file&) = delete;
file(file&&) noexcept;
file& operator=(const file&) = delete;
file& operator=(file&&);
file& operator=(file&&) noexcept;
~file();

[[nodiscard]] io_service& service() const noexcept { return *m_io; }
Expand Down
Loading
Loading