lab4, producer/consumer fixed output

This commit is contained in:
2024-12-08 19:46:11 +02:00
parent c0670991b1
commit 9386abc939
5 changed files with 137 additions and 52 deletions

View File

@@ -5,18 +5,18 @@
#ifndef CONSUMER_H #ifndef CONSUMER_H
#define CONSUMER_H #define CONSUMER_H
#include "buffer.h"
#include <semaphore> #include <semaphore>
class Consumer { class Consumer {
public: public:
Consumer(int id, Buffer& buffer, std::counting_semaphore<5>& sem); Consumer(int id, int read_fd, std::counting_semaphore<5>& consumerSem);
void run() const; void run(std::atomic<int>& itemsConsumed, const std::atomic<bool>& stopFlag) const;
private: private:
int id; int consumerId;
Buffer& buffer; int readFd;
std::counting_semaphore<5>& consumerSem; std::counting_semaphore<5>& consumerSemaphore;
}; };
#endif //CONSUMER_H #endif

View File

@@ -5,18 +5,17 @@
#ifndef PRODUCER_H #ifndef PRODUCER_H
#define PRODUCER_H #define PRODUCER_H
#include "buffer.h"
#include <semaphore> #include <semaphore>
class Producer { class Producer {
public: public:
Producer(int id, Buffer& buffer, std::counting_semaphore<3>& sem); Producer(int id, int write_fd, std::counting_semaphore<3>& producerSem);
void run() const; void run(std::atomic<int>& itemsProduced, const std::atomic<bool>& stopFlag) const;
private: private:
int id; int producerId;
Buffer& buffer; int writeFd;
std::counting_semaphore<3>& producerSem; std::counting_semaphore<3>& producerSemaphore;
}; };
#endif //PRODUCER_H #endif

View File

@@ -7,6 +7,9 @@
#include "producer.h" #include "producer.h"
#include "consumer.h" #include "consumer.h"
#include "reader_writer.h" #include "reader_writer.h"
#include <fcntl.h>
std::atomic stopProduction(false);
[[noreturn]] void runSignalHandler() { [[noreturn]] void runSignalHandler() {
setupSignalHandlers(); setupSignalHandlers();
@@ -17,33 +20,67 @@
} }
void runProducerConsumer() { void runProducerConsumer() {
Buffer buffer(10); const int NUM_PRODUCERS = 3;
std::counting_semaphore<3> producerSem(3); const int NUM_CONSUMERS = 2;
std::counting_semaphore<5> consumerSem(5);
std::vector<std::thread> producers; // Create pipe
std::vector<std::thread> consumers; int pipe_fd[2];
if (pipe(pipe_fd) == -1) {
std::cerr << "Pipe creation failed" << std::endl;
return;
}
for (int i = 0; i < 3; ++i) { // Set pipe to non-blocking mode
producers.emplace_back([i, &buffer, &producerSem]() { int flags = fcntl(pipe_fd[1], F_GETFL, 0);
const Producer producer(i, buffer, producerSem); fcntl(pipe_fd[1], F_SETFL, flags | O_NONBLOCK);
producer.run();
// Create semaphores
std::counting_semaphore<3> producerSemaphore(3);
std::counting_semaphore<5> consumerSemaphore(5);
// Atomic flags for synchronization
std::atomic<bool> stopProduction(false);
std::atomic<int> itemsProduced(0);
std::atomic<int> itemsConsumed(0);
// Create threads
std::vector<std::thread> producerThreads;
std::vector<std::thread> consumerThreads;
// Create producer threads
for (int i = 0; i < NUM_PRODUCERS; ++i) {
producerThreads.emplace_back([&, i]() {
Producer producer(i, pipe_fd[1], producerSemaphore);
producer.run(itemsProduced, stopProduction);
}); });
} }
for (int i = 0; i < 3; ++i) { // Create consumer threads
consumers.emplace_back([i, &buffer, &consumerSem]() { for (int i = 0; i < NUM_CONSUMERS; ++i) {
const Consumer consumer(i, buffer, consumerSem); consumerThreads.emplace_back([&, i]() {
consumer.run(); Consumer consumer(i, pipe_fd[0], consumerSemaphore);
consumer.run(itemsConsumed, stopProduction);
}); });
} }
for (auto& p : producers) { // Wait for producers to finish
p.join(); for (auto& thread : producerThreads) {
thread.join();
} }
for (auto& c : consumers) {
c.join(); // Signal stop and wait for consumers
stopProduction = true;
for (auto& thread : consumerThreads) {
thread.join();
} }
// Close pipe
close(pipe_fd[0]);
close(pipe_fd[1]);
// Print final statistics
std::cout << "Total items produced: " << itemsProduced
<< "\nTotal items consumed: " << itemsConsumed << std::endl;
} }
void runReaderWriter() { void runReaderWriter() {
@@ -74,7 +111,6 @@ int main() {
case 1: case 1:
std::cout << "Running Signal Handler...\n"; std::cout << "Running Signal Handler...\n";
runSignalHandler(); runSignalHandler();
break;
case 2: case 2:
std::cout << "Running Producer/Consumer...\n"; std::cout << "Running Producer/Consumer...\n";
runProducerConsumer(); runProducerConsumer();

View File

@@ -2,17 +2,47 @@
// Created by lumijiez on 11/16/24. // Created by lumijiez on 11/16/24.
// //
#include "consumer.h" #include "consumer.h"
#include <chrono>
#include <thread>
Consumer::Consumer(const int id, Buffer& buffer, std::counting_semaphore<5>& sem) #include <iostream>
: id(id), buffer(buffer), consumerSem(sem) {} #include <cstring>
#include <unistd.h>
#include <atomic>
#include <fcntl.h>
#include <sstream>
void Consumer::run() const { Consumer::Consumer(const int id, const int read_fd, std::counting_semaphore<5>& consumerSem)
while (true) { : consumerId(id), readFd(read_fd), consumerSemaphore(consumerSem) {}
consumerSem.acquire();
int item = buffer.consume(id); void Consumer::run(std::atomic<int>& itemsConsumed, const std::atomic<bool>& stopFlag) const {
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); const int flags = fcntl(readFd, F_GETFL, 0);
consumerSem.release(); fcntl(readFd, F_SETFL, flags | O_NONBLOCK);
while (!stopFlag || itemsConsumed < 60) {
consumerSemaphore.acquire();
int item;
if (const ssize_t bytes_read = read(readFd, &item, sizeof(item)); bytes_read > 0) {
std::ostringstream output;
output << "Consumer " << consumerId << " consumed: " << item << std::endl;
std::cout << output.str();
++itemsConsumed;
}
else if (bytes_read == 0) {
break;
}
else {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
std::ostringstream output;
output << "Consumer " << consumerId << "read error: " << strerror(errno) << std::endl;
std::cerr << output.str();
break;
}
}
consumerSemaphore.release();
usleep(150000); // 150ms
} }
} }

View File

@@ -4,21 +4,41 @@
#include "producer.h" #include "producer.h"
#include <random> #include <random>
#include <chrono> #include <chrono>
#include <csignal>
#include <iostream>
#include <thread> #include <thread>
Producer::Producer(const int id, Buffer& buffer, std::counting_semaphore<3>& sem) Producer::Producer(const int id, const int write_fd, std::counting_semaphore<3>& producerSem)
: id(id), buffer(buffer), producerSem(sem) {} : producerId(id), writeFd(write_fd), producerSemaphore(producerSem) {}
void Producer::run(std::atomic<int>& itemsProduced, const std::atomic<bool>& stopFlag) const {
signal(SIGPIPE, SIG_IGN);
void Producer::run() const {
std::random_device rd; std::random_device rd;
std::mt19937 gen(rd()); std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 100); std::uniform_int_distribution<> dis(1, 1000);
while (true) { while (!stopFlag && itemsProduced < 60) {
producerSem.acquire(); producerSemaphore.acquire();
const int item = dis(gen);
buffer.produce(id, item); int item = dis(gen);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
producerSem.release(); if (const ssize_t bytes_written = write(writeFd, &item, sizeof(item)); bytes_written > 0) {
std::ostringstream output;
output << "Producer " << producerId << " produced: " << item << std::endl;
std::cout << output.str();
++itemsProduced;
} else {
std::ostringstream output;
output << "Producer " << producerId << "failed to write to pipe." << std::endl;
std::cerr << output.str();
if (errno != EAGAIN && errno != EINTR) {
break;
}
}
producerSemaphore.release();
usleep(100000);
} }
} }