threaded, process and buffer producers/consumers

This commit is contained in:
2024-12-08 21:08:10 +02:00
parent 9386abc939
commit eeebe5604e
8 changed files with 255 additions and 22 deletions

View File

@@ -16,6 +16,12 @@ add_executable(
src/producer.cpp src/producer.cpp
include/reader_writer.h include/reader_writer.h
src/reader_writer.cpp src/reader_writer.cpp
include/p_producer_consumer.h
src/p_producer_consumer.cpp
include/bconsumer.h
include/bproducer.h
src/bconsumer.cpp
src/bproducer.cpp
) )
target_link_libraries(os-labs PRIVATE m) target_link_libraries(os-labs PRIVATE m)
target_include_directories(os-labs PRIVATE ${PROJECT_SOURCE_DIR}/include) target_include_directories(os-labs PRIVATE ${PROJECT_SOURCE_DIR}/include)

22
include/bconsumer.h Normal file
View File

@@ -0,0 +1,22 @@
//
// Created by lumijiez on 12/8/24.
//
#ifndef BCONSUMER_H
#define BCONSUMER_H
#include "buffer.h"
#include <semaphore>
class BConsumer {
public:
BConsumer(int id, Buffer& buffer, std::counting_semaphore<5>& sem);
void run() const;
private:
int id;
Buffer& buffer;
std::counting_semaphore<5>& consumerSem;
};
#endif //BCONSUMER_H

22
include/bproducer.h Normal file
View File

@@ -0,0 +1,22 @@
//
// Created by lumijiez on 12/8/24.
//
#ifndef BPRODUCER_H
#define BPRODUCER_H
#include "buffer.h"
#include <semaphore>
class BProducer {
public:
BProducer(int id, Buffer& buffer, std::counting_semaphore<3>& sem);
void run() const;
private:
int id;
Buffer& buffer;
std::counting_semaphore<3>& producerSem;
};
#endif //BPRODUCER_H

View File

@@ -0,0 +1,20 @@
//
// Created by lumijiez on 12/8/24.
//
#ifndef P_PRODUCER_CONSUMER_H
#define P_PRODUCER_CONSUMER_H
#define PNUM_PRODUCERS 3
#define PNUM_CONSUMERS 2
#define PBUFFER_SIZE 10
#define PSEM_MUTEX_NAME "/buffer_mutex"
#define PSEM_FULL_NAME "/buffer_full"
#define PSEM_EMPTY_NAME "/buffer_empty"
void pproducer(int producer_id, int write_fd);
void pconsumer(int consumer_id, int read_fd);
#endif //P_PRODUCER_CONSUMER_H

119
main.cpp
View File

@@ -8,6 +8,12 @@
#include "consumer.h" #include "consumer.h"
#include "reader_writer.h" #include "reader_writer.h"
#include <fcntl.h> #include <fcntl.h>
#include <atomic>
#include <sys/wait.h>
#include "bconsumer.h"
#include "bproducer.h"
#include "p_producer_consumer.h"
std::atomic stopProduction(false); std::atomic stopProduction(false);
@@ -20,65 +26,54 @@ std::atomic stopProduction(false);
} }
void runProducerConsumer() { void runProducerConsumer() {
const int NUM_PRODUCERS = 3; constexpr int NUM_PRODUCERS = 3;
const int NUM_CONSUMERS = 2; constexpr int NUM_CONSUMERS = 2;
// Create pipe
int pipe_fd[2]; int pipe_fd[2];
if (pipe(pipe_fd) == -1) { if (pipe(pipe_fd) == -1) {
std::cerr << "Pipe creation failed" << std::endl; std::cerr << "Pipe creation failed" << std::endl;
return; return;
} }
// Set pipe to non-blocking mode const int flags = fcntl(pipe_fd[1], F_GETFL, 0);
int flags = fcntl(pipe_fd[1], F_GETFL, 0);
fcntl(pipe_fd[1], F_SETFL, flags | O_NONBLOCK); fcntl(pipe_fd[1], F_SETFL, flags | O_NONBLOCK);
// Create semaphores
std::counting_semaphore<3> producerSemaphore(3); std::counting_semaphore<3> producerSemaphore(3);
std::counting_semaphore<5> consumerSemaphore(5); std::counting_semaphore<5> consumerSemaphore(5);
// Atomic flags for synchronization std::atomic stopProduction(false);
std::atomic<bool> stopProduction(false); std::atomic itemsProduced(0);
std::atomic<int> itemsProduced(0); std::atomic itemsConsumed(0);
std::atomic<int> itemsConsumed(0);
// Create threads
std::vector<std::thread> producerThreads; std::vector<std::thread> producerThreads;
std::vector<std::thread> consumerThreads; std::vector<std::thread> consumerThreads;
// Create producer threads
for (int i = 0; i < NUM_PRODUCERS; ++i) { for (int i = 0; i < NUM_PRODUCERS; ++i) {
producerThreads.emplace_back([&, i]() { producerThreads.emplace_back([&, i]() {
Producer producer(i, pipe_fd[1], producerSemaphore); const Producer producer(i, pipe_fd[1], producerSemaphore);
producer.run(itemsProduced, stopProduction); producer.run(itemsProduced, stopProduction);
}); });
} }
// Create consumer threads
for (int i = 0; i < NUM_CONSUMERS; ++i) { for (int i = 0; i < NUM_CONSUMERS; ++i) {
consumerThreads.emplace_back([&, i]() { consumerThreads.emplace_back([&, i]() {
Consumer consumer(i, pipe_fd[0], consumerSemaphore); const Consumer consumer(i, pipe_fd[0], consumerSemaphore);
consumer.run(itemsConsumed, stopProduction); consumer.run(itemsConsumed, stopProduction);
}); });
} }
// Wait for producers to finish
for (auto& thread : producerThreads) { for (auto& thread : producerThreads) {
thread.join(); thread.join();
} }
// Signal stop and wait for consumers
stopProduction = true; stopProduction = true;
for (auto& thread : consumerThreads) { for (auto& thread : consumerThreads) {
thread.join(); thread.join();
} }
// Close pipe
close(pipe_fd[0]); close(pipe_fd[0]);
close(pipe_fd[1]); close(pipe_fd[1]);
// Print final statistics
std::cout << "Total items produced: " << itemsProduced std::cout << "Total items produced: " << itemsProduced
<< "\nTotal items consumed: " << itemsConsumed << std::endl; << "\nTotal items consumed: " << itemsConsumed << std::endl;
} }
@@ -95,12 +90,84 @@ void runReaderWriter() {
rw.run(); rw.run();
} }
void runProcessProducerConsumer() {
const sem_t *mutex = sem_open(PSEM_MUTEX_NAME, O_CREAT, 0644, 1);
const sem_t *full = sem_open(PSEM_FULL_NAME, O_CREAT, 0644, PBUFFER_SIZE);
if (const sem_t *empty = sem_open(PSEM_EMPTY_NAME, O_CREAT, 0644, 0); mutex == SEM_FAILED || full == SEM_FAILED || empty == SEM_FAILED) {
perror("Failed to create semaphores");
return;
}
int pipe_fd[2];
if (pipe(pipe_fd) == -1) {
perror("Failed to create pipe");
return;
}
for (int i = 0; i < PNUM_PRODUCERS; i++) {
if (const pid_t pid = fork(); pid == 0) {
close(pipe_fd[0]);
pproducer(i, pipe_fd[1]);
exit(0);
}
}
for (int i = 0; i < PNUM_CONSUMERS; i++) {
if (const pid_t pid = fork(); pid == 0) {
close(pipe_fd[1]);
pconsumer(i, pipe_fd[0]);
exit(0);
}
}
for (int i = 0; i < PNUM_PRODUCERS + PNUM_CONSUMERS; i++) {
wait(nullptr);
}
sem_unlink(PSEM_MUTEX_NAME);
sem_unlink(PSEM_FULL_NAME);
sem_unlink(PSEM_EMPTY_NAME);
}
void runProducerConsumerBuffer() {
Buffer buffer(10);
std::counting_semaphore<3> producerSem(3);
std::counting_semaphore<5> consumerSem(5);
std::vector<std::thread> producers;
std::vector<std::thread> consumers;
for (int i = 0; i < 3; ++i) {
producers.emplace_back([i, &buffer, &producerSem]() {
const BProducer bproducer(i, buffer, producerSem);
bproducer.run();
});
}
for (int i = 0; i < 3; ++i) {
consumers.emplace_back([i, &buffer, &consumerSem]() {
const BConsumer bconsumer(i, buffer, consumerSem);
bconsumer.run();
});
}
for (auto& p : producers) {
p.join();
}
for (auto& c : consumers) {
c.join();
}
}
int main() { int main() {
while (true) { while (true) {
std::cout << "=== Main Menu ===\n"; std::cout << "=== Main Menu ===\n";
std::cout << "1. Run Signal Handler\n"; std::cout << "1. Run Signal Handler\n";
std::cout << "2. Run Producer/Consumer\n"; std::cout << "2. Run Threaded Producer/Consumer\n";
std::cout << "3. Run Reader/Writer\n"; std::cout << "3. Run Process Producer/Consumer\n";
std::cout << "4. Run Buffered Producer/Consumer\n";
std::cout << "5. Run Reader/Writer\n";
std::cout << "0. Exit\n"; std::cout << "0. Exit\n";
std::cout << "Enter your choice: "; std::cout << "Enter your choice: ";
@@ -112,10 +179,18 @@ int main() {
std::cout << "Running Signal Handler...\n"; std::cout << "Running Signal Handler...\n";
runSignalHandler(); runSignalHandler();
case 2: case 2:
std::cout << "Running Producer/Consumer...\n"; std::cout << "Running Threaded Producer/Consumer...\n";
runProducerConsumer(); runProducerConsumer();
break; break;
case 3: case 3:
std::cout << "Running Process Producer/Consumer...\n";
runProcessProducerConsumer();
break;
case 4:
std::cout << "Running Buffered Producer/Consumer...\n";
runProducerConsumerBuffer();
break;
case 5:
std::cout << "Running Reader-Writer...\n"; std::cout << "Running Reader-Writer...\n";
runReaderWriter(); runReaderWriter();
break; break;

19
src/bconsumer.cpp Normal file
View File

@@ -0,0 +1,19 @@
//
// Created by lumijiez on 12/8/24.
//
#include "bconsumer.h"
#include <chrono>
#include <thread>
BConsumer::BConsumer(const int id, Buffer& buffer, std::counting_semaphore<5>& sem)
: id(id), buffer(buffer), consumerSem(sem) {}
void BConsumer::run() const {
while (true) {
consumerSem.acquire();
int item = buffer.consume(id);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
consumerSem.release();
}
}

25
src/bproducer.cpp Normal file
View File

@@ -0,0 +1,25 @@
//
// Created by lumijiez on 12/8/24.
//
#include "bproducer.h"
#include <random>
#include <chrono>
#include <thread>
BProducer::BProducer(const int id, Buffer& buffer, std::counting_semaphore<3>& sem)
: id(id), buffer(buffer), producerSem(sem) {}
void BProducer::run() const {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 100);
while (true) {
producerSem.acquire();
const int item = dis(gen);
buffer.produce(id, item);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
producerSem.release();
}
}

View File

@@ -0,0 +1,44 @@
//
// Created by lumijiez on 12/8/24.
//
#include "p_producer_consumer.h"
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
#include <cstring>
void pproducer(const int producer_id, const int write_fd) {
srand(time(nullptr) * getpid());
while (true) {
const int item = rand() % 100;
char item_str[32];
snprintf(item_str, sizeof(item_str), "%d", item);
write(write_fd, item_str, strlen(item_str) + 1);
printf("Producer %d produced: %d\n", producer_id, item);
sleep(1);
}
}
void pconsumer(const int consumer_id, const int read_fd) {
while (true) {
char item_str[32];
if (const ssize_t bytesRead = read(read_fd, item_str, sizeof(item_str)); bytesRead > 0) {
printf("Consumer %d consumed: %s\n", consumer_id, item_str);
} else {
if (bytesRead == 0) {
break;
}
perror("Error reading from pipe");
}
sleep(2);
}
}