From 9386abc9394738a02d971285cb76b9354682300a Mon Sep 17 00:00:00 2001 From: Lumijiez Date: Sun, 8 Dec 2024 19:46:11 +0200 Subject: [PATCH] lab4, producer/consumer fixed output --- include/consumer.h | 14 ++++----- include/producer.h | 13 ++++----- main.cpp | 72 ++++++++++++++++++++++++++++++++++------------ src/consumer.cpp | 50 +++++++++++++++++++++++++------- src/producer.cpp | 40 +++++++++++++++++++------- 5 files changed, 137 insertions(+), 52 deletions(-) diff --git a/include/consumer.h b/include/consumer.h index 8e0b0fc..6e90efc 100644 --- a/include/consumer.h +++ b/include/consumer.h @@ -5,18 +5,18 @@ #ifndef CONSUMER_H #define CONSUMER_H -#include "buffer.h" #include class Consumer { public: - Consumer(int id, Buffer& buffer, std::counting_semaphore<5>& sem); - void run() const; + Consumer(int id, int read_fd, std::counting_semaphore<5>& consumerSem); + void run(std::atomic& itemsConsumed, const std::atomic& stopFlag) const; private: - int id; - Buffer& buffer; - std::counting_semaphore<5>& consumerSem; + int consumerId; + int readFd; + std::counting_semaphore<5>& consumerSemaphore; }; -#endif //CONSUMER_H +#endif + diff --git a/include/producer.h b/include/producer.h index 507b68e..ee293cf 100644 --- a/include/producer.h +++ b/include/producer.h @@ -5,18 +5,17 @@ #ifndef PRODUCER_H #define PRODUCER_H -#include "buffer.h" #include class Producer { public: - Producer(int id, Buffer& buffer, std::counting_semaphore<3>& sem); - void run() const; + Producer(int id, int write_fd, std::counting_semaphore<3>& producerSem); + void run(std::atomic& itemsProduced, const std::atomic& stopFlag) const; private: - int id; - Buffer& buffer; - std::counting_semaphore<3>& producerSem; + int producerId; + int writeFd; + std::counting_semaphore<3>& producerSemaphore; }; -#endif //PRODUCER_H +#endif diff --git a/main.cpp b/main.cpp index aff8b28..14926d1 100644 --- a/main.cpp +++ b/main.cpp @@ -7,6 +7,9 @@ #include "producer.h" #include "consumer.h" #include "reader_writer.h" +#include + +std::atomic stopProduction(false); [[noreturn]] void runSignalHandler() { setupSignalHandlers(); @@ -17,33 +20,67 @@ } void runProducerConsumer() { - Buffer buffer(10); - std::counting_semaphore<3> producerSem(3); - std::counting_semaphore<5> consumerSem(5); + const int NUM_PRODUCERS = 3; + const int NUM_CONSUMERS = 2; - std::vector producers; - std::vector consumers; + // Create pipe + int pipe_fd[2]; + if (pipe(pipe_fd) == -1) { + std::cerr << "Pipe creation failed" << std::endl; + return; + } - for (int i = 0; i < 3; ++i) { - producers.emplace_back([i, &buffer, &producerSem]() { - const Producer producer(i, buffer, producerSem); - producer.run(); + // Set pipe to non-blocking mode + int flags = fcntl(pipe_fd[1], F_GETFL, 0); + fcntl(pipe_fd[1], F_SETFL, flags | O_NONBLOCK); + + // Create semaphores + std::counting_semaphore<3> producerSemaphore(3); + std::counting_semaphore<5> consumerSemaphore(5); + + // Atomic flags for synchronization + std::atomic stopProduction(false); + std::atomic itemsProduced(0); + std::atomic itemsConsumed(0); + + // Create threads + std::vector producerThreads; + std::vector consumerThreads; + + // Create producer threads + for (int i = 0; i < NUM_PRODUCERS; ++i) { + producerThreads.emplace_back([&, i]() { + Producer producer(i, pipe_fd[1], producerSemaphore); + producer.run(itemsProduced, stopProduction); }); } - for (int i = 0; i < 3; ++i) { - consumers.emplace_back([i, &buffer, &consumerSem]() { - const Consumer consumer(i, buffer, consumerSem); - consumer.run(); + // Create consumer threads + for (int i = 0; i < NUM_CONSUMERS; ++i) { + consumerThreads.emplace_back([&, i]() { + Consumer consumer(i, pipe_fd[0], consumerSemaphore); + consumer.run(itemsConsumed, stopProduction); }); } - for (auto& p : producers) { - p.join(); + // Wait for producers to finish + for (auto& thread : producerThreads) { + thread.join(); } - for (auto& c : consumers) { - c.join(); + + // Signal stop and wait for consumers + stopProduction = true; + for (auto& thread : consumerThreads) { + thread.join(); } + + // Close pipe + close(pipe_fd[0]); + close(pipe_fd[1]); + + // Print final statistics + std::cout << "Total items produced: " << itemsProduced + << "\nTotal items consumed: " << itemsConsumed << std::endl; } void runReaderWriter() { @@ -74,7 +111,6 @@ int main() { case 1: std::cout << "Running Signal Handler...\n"; runSignalHandler(); - break; case 2: std::cout << "Running Producer/Consumer...\n"; runProducerConsumer(); diff --git a/src/consumer.cpp b/src/consumer.cpp index 2b2d75d..5862e4c 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -2,17 +2,47 @@ // Created by lumijiez on 11/16/24. // #include "consumer.h" -#include -#include -Consumer::Consumer(const int id, Buffer& buffer, std::counting_semaphore<5>& sem) - : id(id), buffer(buffer), consumerSem(sem) {} +#include +#include +#include +#include +#include +#include -void Consumer::run() const { - while (true) { - consumerSem.acquire(); - int item = buffer.consume(id); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - consumerSem.release(); +Consumer::Consumer(const int id, const int read_fd, std::counting_semaphore<5>& consumerSem) + : consumerId(id), readFd(read_fd), consumerSemaphore(consumerSem) {} + +void Consumer::run(std::atomic& itemsConsumed, const std::atomic& stopFlag) const { + const int flags = fcntl(readFd, F_GETFL, 0); + fcntl(readFd, F_SETFL, flags | O_NONBLOCK); + + while (!stopFlag || itemsConsumed < 60) { + consumerSemaphore.acquire(); + + int item; + + if (const ssize_t bytes_read = read(readFd, &item, sizeof(item)); bytes_read > 0) { + std::ostringstream output; + output << "Consumer " << consumerId << " consumed: " << item << std::endl; + std::cout << output.str(); + + ++itemsConsumed; + } + else if (bytes_read == 0) { + break; + } + else { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + std::ostringstream output; + output << "Consumer " << consumerId << "read error: " << strerror(errno) << std::endl; + std::cerr << output.str(); + break; + } + } + + consumerSemaphore.release(); + + usleep(150000); // 150ms } } diff --git a/src/producer.cpp b/src/producer.cpp index 0f7999d..21155c2 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -4,21 +4,41 @@ #include "producer.h" #include #include +#include +#include #include -Producer::Producer(const int id, Buffer& buffer, std::counting_semaphore<3>& sem) - : id(id), buffer(buffer), producerSem(sem) {} +Producer::Producer(const int id, const int write_fd, std::counting_semaphore<3>& producerSem) + : producerId(id), writeFd(write_fd), producerSemaphore(producerSem) {} + +void Producer::run(std::atomic& itemsProduced, const std::atomic& stopFlag) const { + signal(SIGPIPE, SIG_IGN); -void Producer::run() const { std::random_device rd; std::mt19937 gen(rd()); - std::uniform_int_distribution<> dis(1, 100); + std::uniform_int_distribution<> dis(1, 1000); - while (true) { - producerSem.acquire(); - const int item = dis(gen); - buffer.produce(id, item); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - producerSem.release(); + while (!stopFlag && itemsProduced < 60) { + producerSemaphore.acquire(); + + int item = dis(gen); + + if (const ssize_t bytes_written = write(writeFd, &item, sizeof(item)); bytes_written > 0) { + std::ostringstream output; + output << "Producer " << producerId << " produced: " << item << std::endl; + std::cout << output.str(); + + ++itemsProduced; + } else { + std::ostringstream output; + output << "Producer " << producerId << "failed to write to pipe." << std::endl; + std::cerr << output.str(); + if (errno != EAGAIN && errno != EINTR) { + break; + } + } + producerSemaphore.release(); + + usleep(100000); } }