diff --git a/pw_async2/public/pw_async2/dispatcher_base.h b/pw_async2/public/pw_async2/dispatcher_base.h index 7e081bb99..a833859e4 100644 --- a/pw_async2/public/pw_async2/dispatcher_base.h +++ b/pw_async2/public/pw_async2/dispatcher_base.h @@ -257,7 +257,7 @@ class Waker { friend class DispatcherImpl; public: - Waker() = default; + constexpr Waker() = default; Waker(Waker&& other) noexcept PW_LOCKS_EXCLUDED(dispatcher_lock()); /// Replace this ``Waker`` with another. diff --git a/pw_async2_epoll/dispatcher.cc b/pw_async2_epoll/dispatcher.cc index b4fce14f8..6fc2eb071 100644 --- a/pw_async2_epoll/dispatcher.cc +++ b/pw_async2_epoll/dispatcher.cc @@ -124,15 +124,24 @@ Status Dispatcher::NativeWaitForWake() { PW_CHECK_INT_EQ( bytes_read, 1, "Dispatcher failed to read wake notification"); PW_DCHECK_INT_EQ(unused, kNotificationSignal); - } else { - if ((event.events & (EPOLLIN | EPOLLRDHUP)) != 0) { - NativeFindAndWakeFileDescriptor(event.data.fd, - FileDescriptorType::kReadable); - } - if ((event.events & EPOLLOUT) != 0) { - NativeFindAndWakeFileDescriptor(event.data.fd, - FileDescriptorType::kWritable); - } + continue; + } + + // Debug log for missed events. + if (PW_LOG_LEVEL >= PW_LOG_LEVEL_DEBUG && + wakers_[event.data.fd].read.IsEmpty() && + wakers_[event.data.fd].write.IsEmpty()) { + PW_LOG_DEBUG( + "Received an event for registered file descriptor %d, but there is " + "no task to wake", + event.data.fd); + } + + if ((event.events & (EPOLLIN | EPOLLRDHUP)) != 0) { + std::move(wakers_[event.data.fd].read).Wake(); + } + if ((event.events & EPOLLOUT) != 0) { + std::move(wakers_[event.data.fd].write).Wake(); } } @@ -167,35 +176,10 @@ Status Dispatcher::NativeUnregisterFileDescriptor(int fd) { PW_LOG_ERROR("Failed to unregister epoll event: %s", std::strerror(errno)); return Status::Internal(); } - - auto fd_waker = std::find_if(fd_wakers_.begin(), - fd_wakers_.end(), - [fd](auto& f) { return f.fd == fd; }); - if (fd_waker != fd_wakers_.end()) { - fd_wakers_.erase(fd_waker); - } - + wakers_.erase(fd); return OkStatus(); } -void Dispatcher::NativeFindAndWakeFileDescriptor(int fd, - FileDescriptorType type) { - auto fd_waker = - std::find_if(fd_wakers_.begin(), fd_wakers_.end(), [fd, type](auto& f) { - return f.fd == fd && f.type == type; - }); - if (fd_waker == fd_wakers_.end()) { - PW_LOG_WARN( - "Received an event for registered file descriptor %d, but there is no " - "task to wake", - fd); - return; - } - - std::move(fd_waker->waker).Wake(); - fd_wakers_.erase(fd_waker); -} - void Dispatcher::DoWake() { // Perform a write to unblock the waiting dispatcher. ssize_t bytes_written = write(notify_fd_, &kNotificationSignal, 1); diff --git a/pw_async2_epoll/public_overrides/pw_async2/dispatcher_native.h b/pw_async2_epoll/public_overrides/pw_async2/dispatcher_native.h index 7c9672ce9..397c6641d 100644 --- a/pw_async2_epoll/public_overrides/pw_async2/dispatcher_native.h +++ b/pw_async2_epoll/public_overrides/pw_async2/dispatcher_native.h @@ -13,7 +13,7 @@ // the License. #pragma once -#include +#include #include "pw_assert/assert.h" #include "pw_async2/dispatcher_base.h" @@ -41,22 +41,19 @@ class Dispatcher final : public DispatcherImpl { Status NativeUnregisterFileDescriptor(int fd); void NativeAddReadWakerForFileDescriptor(int fd, Waker&& waker) { - NativeAddWakerForFileDescriptor( - fd, FileDescriptorType::kReadable, std::move(waker)); + wakers_[fd].read = std::move(waker); } void NativeAddWriteWakerForFileDescriptor(int fd, Waker&& waker) { - NativeAddWakerForFileDescriptor( - fd, FileDescriptorType::kWritable, std::move(waker)); + wakers_[fd].write = std::move(waker); } private: static constexpr size_t kMaxEventsToProcessAtOnce = 5; - struct FdWaker { - int fd; - FileDescriptorType type; - Waker waker; + struct ReadWriteWaker { + Waker read; + Waker write; }; void DoWake() final; @@ -67,17 +64,11 @@ class Dispatcher final : public DispatcherImpl { Status NativeWaitForWake(); void NativeFindAndWakeFileDescriptor(int fd, FileDescriptorType type); - void NativeAddWakerForFileDescriptor(int fd, - FileDescriptorType type, - Waker&& waker) { - fd_wakers_.push_back({fd, type, std::move(waker)}); - } - int epoll_fd_; int notify_fd_; int wait_fd_; - std::vector fd_wakers_; + std::unordered_map wakers_; }; } // namespace pw::async2