// Copyright 2021 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. #include #include "gtest/gtest.h" #include "pw_containers/vector.h" #include "pw_multisink/multisink.h" #include "pw_multisink/test_thread.h" #include "pw_string/string_builder.h" #include "pw_thread/thread.h" #include "pw_thread/yield.h" namespace pw::multisink { namespace { constexpr size_t kEntryBufferSize = sizeof("message 000"); constexpr size_t kMaxMessageCount = 250; constexpr size_t kBufferSize = kMaxMessageCount * kEntryBufferSize; using MessageSpan = std::span>; void CompareSentAndReceivedMessages(const MessageSpan& sent_messages, const MessageSpan& received_messages) { ASSERT_EQ(sent_messages.size(), received_messages.size()); for (size_t i = 0; i < sent_messages.size(); ++i) { ASSERT_EQ(sent_messages[i].size(), received_messages[i].size()); EXPECT_EQ(std::string_view(sent_messages[i]), std::string_view(received_messages[i])); } } } // namespace // Static message pool to avoid recreating messages for every test and avoids // using std::string. class MessagePool { public: static MessagePool& Instance() { static MessagePool instance; return instance; } MessagePool(const MessagePool&) = delete; MessagePool& operator=(const MessagePool&) = delete; MessagePool(MessagePool&&) = delete; MessagePool& operator=(MessagePool&&) = delete; MessageSpan GetMessages(size_t message_count) const { PW_ASSERT(message_count <= messages_.size()); return MessageSpan(messages_.begin(), message_count); } private: MessagePool() { for (size_t i = 0; i < kMaxMessageCount; ++i) { messages_.emplace_back(); messages_.back() << "message %u" << static_cast(i); } } Vector, kMaxMessageCount> messages_; }; // Continuously reads logs from a multisink, using PopEntry() and stores copies // of the retrieved messages for later verification. The thread stops when the // the number of read messages and total drop count matches the expected count. class LogPopReaderThread : public thread::ThreadCore { public: LogPopReaderThread(MultiSink& multisink, uint32_t expected_message_and_drop_count) : multisink_(multisink), total_drop_count_(0), expected_message_and_drop_count_(expected_message_and_drop_count) { PW_ASSERT(expected_message_and_drop_count_ <= kMaxMessageCount); } uint32_t drop_count() { return total_drop_count_; } const MessageSpan received_messages() { return MessageSpan(received_messages_.begin(), received_messages_.size()); } void Run() override { multisink_.AttachDrain(drain_); ReadAllEntries(); }; virtual void ReadAllEntries() { do { uint32_t drop_count = 0; const Result possible_entry = drain_.PopEntry(entry_buffer_, drop_count); total_drop_count_ += drop_count; if (possible_entry.status().IsOutOfRange()) { pw::this_thread::yield(); continue; } ASSERT_EQ(possible_entry.status(), OkStatus()); if (received_messages_.full()) { return; } received_messages_.emplace_back(); received_messages_.back() << std::string_view( reinterpret_cast(possible_entry.value().data()), possible_entry.value().size()); pw::this_thread::yield(); } while (total_drop_count_ + received_messages_.size() < expected_message_and_drop_count_); } protected: MultiSink::Drain drain_; MultiSink& multisink_; std::array entry_buffer_; uint32_t total_drop_count_; const uint32_t expected_message_and_drop_count_; Vector, kMaxMessageCount> received_messages_; }; class LogPeekAndCommitReaderThread : public LogPopReaderThread { public: LogPeekAndCommitReaderThread(MultiSink& multisink, uint32_t expected_message_and_drop_count) : LogPopReaderThread(multisink, expected_message_and_drop_count) {} virtual void ReadAllEntries() { do { uint32_t drop_count = 0; const Result possible_entry = drain_.PeekEntry(entry_buffer_, drop_count); total_drop_count_ += drop_count; if (possible_entry.status().IsOutOfRange()) { pw::this_thread::yield(); continue; } ASSERT_EQ(possible_entry.status(), OkStatus()); if (received_messages_.full()) { return; } pw::this_thread::yield(); received_messages_.emplace_back(); received_messages_.back() << std::string_view( reinterpret_cast(possible_entry.value().entry().data()), possible_entry.value().entry().size()); ASSERT_EQ(drain_.PopEntry(possible_entry.value()), OkStatus()); pw::this_thread::yield(); } while (total_drop_count_ + received_messages_.size() < expected_message_and_drop_count_); } }; // Adds the provided messages to the shared multisink. class LogWriterThread : public thread::ThreadCore { public: LogWriterThread(MultiSink& multisink, const MessageSpan& message_stack) : multisink_(multisink), message_stack_(message_stack) {} void Run() override { for (const auto& message : message_stack_) { multisink_.HandleEntry( std::as_bytes(std::span(std::string_view(message)))); pw::this_thread::yield(); } }; private: MultiSink& multisink_; const MessageSpan& message_stack_; }; class MultiSinkTest : public ::testing::Test { protected: MultiSinkTest() : multisink_(buffer_) {} std::byte buffer_[kBufferSize]; MultiSink multisink_; private: }; TEST_F(MultiSinkTest, SingleWriterSingleReader) { const uint32_t log_count = 100; const uint32_t drop_count = 5; const uint32_t expected_message_and_drop_count = log_count + drop_count; const auto message_stack = MessagePool::Instance().GetMessages(log_count); // Start reader thread. LogPopReaderThread reader_thread_core(multisink_, expected_message_and_drop_count); thread::Thread reader_thread(test::MultiSinkTestThreadOptions(), reader_thread_core); // Start writer thread. LogWriterThread writer_thread_core(multisink_, message_stack); thread::Thread writer_thread(test::MultiSinkTestThreadOptions(), writer_thread_core); // Wait for writer thread to end. writer_thread.join(); multisink_.HandleDropped(drop_count); reader_thread.join(); EXPECT_EQ(reader_thread_core.drop_count(), drop_count); CompareSentAndReceivedMessages(message_stack, reader_thread_core.received_messages()); } TEST_F(MultiSinkTest, SingleWriterSinglePeekAndCommitReader) { const uint32_t log_count = 100; const uint32_t drop_count = 5; const uint32_t expected_message_and_drop_count = log_count + drop_count; const auto message_stack = MessagePool::Instance().GetMessages(log_count); // Start reader thread. LogPeekAndCommitReaderThread reader_thread_core( multisink_, expected_message_and_drop_count); thread::Thread reader_thread(test::MultiSinkTestThreadOptions(), reader_thread_core); // Start writer thread. LogWriterThread writer_thread_core(multisink_, message_stack); thread::Thread writer_thread(test::MultiSinkTestThreadOptions(), writer_thread_core); // Wait for writer thread to end. writer_thread.join(); multisink_.HandleDropped(drop_count); reader_thread.join(); EXPECT_EQ(reader_thread_core.drop_count(), drop_count); CompareSentAndReceivedMessages(message_stack, reader_thread_core.received_messages()); } TEST_F(MultiSinkTest, SingleWriterMultipleReaders) { const uint32_t log_count = 100; const uint32_t drop_count = 5; const uint32_t expected_message_and_drop_count = log_count + drop_count; const auto message_stack = MessagePool::Instance().GetMessages(log_count); // Start reader threads. LogPopReaderThread reader_thread_core1(multisink_, expected_message_and_drop_count); thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(), reader_thread_core1); LogPopReaderThread reader_thread_core2(multisink_, expected_message_and_drop_count); thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(), reader_thread_core2); LogPeekAndCommitReaderThread reader_thread_core3( multisink_, expected_message_and_drop_count); thread::Thread reader_thread3(test::MultiSinkTestThreadOptions(), reader_thread_core3); // Start writer thread. LogWriterThread writer_thread_core(multisink_, message_stack); thread::Thread writer_thread(test::MultiSinkTestThreadOptions(), writer_thread_core); // Wait for writer thread to end. writer_thread.join(); multisink_.HandleDropped(drop_count); reader_thread1.join(); reader_thread2.join(); reader_thread3.join(); EXPECT_EQ(reader_thread_core1.drop_count(), drop_count); CompareSentAndReceivedMessages(message_stack, reader_thread_core1.received_messages()); EXPECT_EQ(reader_thread_core2.drop_count(), drop_count); CompareSentAndReceivedMessages(message_stack, reader_thread_core2.received_messages()); EXPECT_EQ(reader_thread_core3.drop_count(), drop_count); CompareSentAndReceivedMessages(message_stack, reader_thread_core3.received_messages()); } TEST_F(MultiSinkTest, MultipleWritersMultipleReaders) { const uint32_t log_count = 100; const uint32_t drop_count = 7; const uint32_t expected_message_and_drop_count = 2 * log_count + drop_count; const auto message_stack = MessagePool::Instance().GetMessages(log_count); // Start reader threads. LogPopReaderThread reader_thread_core1(multisink_, expected_message_and_drop_count); thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(), reader_thread_core1); LogPopReaderThread reader_thread_core2(multisink_, expected_message_and_drop_count); thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(), reader_thread_core2); LogPeekAndCommitReaderThread reader_thread_core3( multisink_, expected_message_and_drop_count); thread::Thread reader_thread3(test::MultiSinkTestThreadOptions(), reader_thread_core3); // Start writer threads. LogWriterThread writer_thread_core1(multisink_, message_stack); thread::Thread writer_thread1(test::MultiSinkTestThreadOptions(), writer_thread_core1); LogWriterThread writer_thread_core2(multisink_, message_stack); thread::Thread writer_thread2(test::MultiSinkTestThreadOptions(), writer_thread_core2); // Wait for writer thread to end. writer_thread1.join(); writer_thread2.join(); multisink_.HandleDropped(drop_count); reader_thread1.join(); reader_thread2.join(); reader_thread3.join(); EXPECT_EQ(reader_thread_core1.drop_count(), drop_count); EXPECT_EQ(reader_thread_core2.drop_count(), drop_count); EXPECT_EQ(reader_thread_core3.drop_count(), drop_count); // Since we don't know the order that messages came in, we can't check them. EXPECT_EQ(reader_thread_core1.received_messages().size(), expected_message_and_drop_count - drop_count); EXPECT_EQ(reader_thread_core2.received_messages().size(), expected_message_and_drop_count - drop_count); EXPECT_EQ(reader_thread_core3.received_messages().size(), expected_message_and_drop_count - drop_count); } TEST_F(MultiSinkTest, OverflowMultisink) { // Expect the multisink to overflow and readers to not fail when poping, or // peeking and commiting entries. const size_t log_count = kMaxMessageCount; const size_t max_buffer_entry_count = 20; std::byte small_multisink_buffer[max_buffer_entry_count * kEntryBufferSize]; MultiSink small_multisink(small_multisink_buffer); const auto message_stack = MessagePool::Instance().GetMessages(log_count); // Start reader threads. LogPeekAndCommitReaderThread reader_thread_core1(small_multisink, log_count); thread::Thread reader_thread1(test::MultiSinkTestThreadOptions(), reader_thread_core1); LogPopReaderThread reader_thread_core2(small_multisink, log_count); thread::Thread reader_thread2(test::MultiSinkTestThreadOptions(), reader_thread_core2); // Start writer threads. LogWriterThread writer_thread_core1(small_multisink, message_stack); thread::Thread writer_thread1(test::MultiSinkTestThreadOptions(), writer_thread_core1); LogWriterThread writer_thread_core2(small_multisink, message_stack); thread::Thread writer_thread2(test::MultiSinkTestThreadOptions(), writer_thread_core2); // Wait for writer thread to end. writer_thread1.join(); writer_thread2.join(); reader_thread1.join(); reader_thread2.join(); // Verifying received messages and drop message counts is unreliable as we // can't control the order threads will operate. } } // namespace pw::multisink