diff --git a/CMakeLists.txt b/CMakeLists.txt index 4dd962c..89d65f1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,6 +16,12 @@ add_executable( src/producer.cpp include/reader_writer.h src/reader_writer.cpp + include/p_producer_consumer.h + src/p_producer_consumer.cpp + include/bconsumer.h + include/bproducer.h + src/bconsumer.cpp + src/bproducer.cpp ) target_link_libraries(os-labs PRIVATE m) target_include_directories(os-labs PRIVATE ${PROJECT_SOURCE_DIR}/include) \ No newline at end of file diff --git a/include/bconsumer.h b/include/bconsumer.h new file mode 100644 index 0000000..09c54f7 --- /dev/null +++ b/include/bconsumer.h @@ -0,0 +1,22 @@ +// +// Created by lumijiez on 12/8/24. +// + +#ifndef BCONSUMER_H +#define BCONSUMER_H + +#include "buffer.h" +#include + +class BConsumer { +public: + BConsumer(int id, Buffer& buffer, std::counting_semaphore<5>& sem); + void run() const; + +private: + int id; + Buffer& buffer; + std::counting_semaphore<5>& consumerSem; +}; + +#endif //BCONSUMER_H diff --git a/include/bproducer.h b/include/bproducer.h new file mode 100644 index 0000000..f9ffd9c --- /dev/null +++ b/include/bproducer.h @@ -0,0 +1,22 @@ +// +// Created by lumijiez on 12/8/24. +// + +#ifndef BPRODUCER_H +#define BPRODUCER_H + +#include "buffer.h" +#include + +class BProducer { +public: + BProducer(int id, Buffer& buffer, std::counting_semaphore<3>& sem); + void run() const; + +private: + int id; + Buffer& buffer; + std::counting_semaphore<3>& producerSem; +}; + +#endif //BPRODUCER_H diff --git a/include/p_producer_consumer.h b/include/p_producer_consumer.h new file mode 100644 index 0000000..62506d4 --- /dev/null +++ b/include/p_producer_consumer.h @@ -0,0 +1,20 @@ +// +// Created by lumijiez on 12/8/24. +// + +#ifndef P_PRODUCER_CONSUMER_H +#define P_PRODUCER_CONSUMER_H + +#define PNUM_PRODUCERS 3 +#define PNUM_CONSUMERS 2 +#define PBUFFER_SIZE 10 + +#define PSEM_MUTEX_NAME "/buffer_mutex" +#define PSEM_FULL_NAME "/buffer_full" +#define PSEM_EMPTY_NAME "/buffer_empty" + +void pproducer(int producer_id, int write_fd); + +void pconsumer(int consumer_id, int read_fd); + +#endif //P_PRODUCER_CONSUMER_H diff --git a/main.cpp b/main.cpp index 14926d1..6a26266 100644 --- a/main.cpp +++ b/main.cpp @@ -8,6 +8,12 @@ #include "consumer.h" #include "reader_writer.h" #include +#include +#include + +#include "bconsumer.h" +#include "bproducer.h" +#include "p_producer_consumer.h" std::atomic stopProduction(false); @@ -20,65 +26,54 @@ std::atomic stopProduction(false); } void runProducerConsumer() { - const int NUM_PRODUCERS = 3; - const int NUM_CONSUMERS = 2; + constexpr int NUM_PRODUCERS = 3; + constexpr int NUM_CONSUMERS = 2; - // Create pipe int pipe_fd[2]; if (pipe(pipe_fd) == -1) { std::cerr << "Pipe creation failed" << std::endl; return; } - // Set pipe to non-blocking mode - int flags = fcntl(pipe_fd[1], F_GETFL, 0); + const int flags = fcntl(pipe_fd[1], F_GETFL, 0); fcntl(pipe_fd[1], F_SETFL, flags | O_NONBLOCK); - // Create semaphores std::counting_semaphore<3> producerSemaphore(3); std::counting_semaphore<5> consumerSemaphore(5); - // Atomic flags for synchronization - std::atomic stopProduction(false); - std::atomic itemsProduced(0); - std::atomic itemsConsumed(0); + std::atomic stopProduction(false); + std::atomic itemsProduced(0); + std::atomic itemsConsumed(0); - // Create threads std::vector producerThreads; std::vector consumerThreads; - // Create producer threads for (int i = 0; i < NUM_PRODUCERS; ++i) { producerThreads.emplace_back([&, i]() { - Producer producer(i, pipe_fd[1], producerSemaphore); + const Producer producer(i, pipe_fd[1], producerSemaphore); producer.run(itemsProduced, stopProduction); }); } - // Create consumer threads for (int i = 0; i < NUM_CONSUMERS; ++i) { consumerThreads.emplace_back([&, i]() { - Consumer consumer(i, pipe_fd[0], consumerSemaphore); + const Consumer consumer(i, pipe_fd[0], consumerSemaphore); consumer.run(itemsConsumed, stopProduction); }); } - // Wait for producers to finish for (auto& thread : producerThreads) { thread.join(); } - // Signal stop and wait for consumers stopProduction = true; for (auto& thread : consumerThreads) { thread.join(); } - // Close pipe close(pipe_fd[0]); close(pipe_fd[1]); - // Print final statistics std::cout << "Total items produced: " << itemsProduced << "\nTotal items consumed: " << itemsConsumed << std::endl; } @@ -95,12 +90,84 @@ void runReaderWriter() { rw.run(); } +void runProcessProducerConsumer() { + const sem_t *mutex = sem_open(PSEM_MUTEX_NAME, O_CREAT, 0644, 1); + const sem_t *full = sem_open(PSEM_FULL_NAME, O_CREAT, 0644, PBUFFER_SIZE); + + if (const sem_t *empty = sem_open(PSEM_EMPTY_NAME, O_CREAT, 0644, 0); mutex == SEM_FAILED || full == SEM_FAILED || empty == SEM_FAILED) { + perror("Failed to create semaphores"); + return; + } + + int pipe_fd[2]; + if (pipe(pipe_fd) == -1) { + perror("Failed to create pipe"); + return; + } + + for (int i = 0; i < PNUM_PRODUCERS; i++) { + if (const pid_t pid = fork(); pid == 0) { + close(pipe_fd[0]); + pproducer(i, pipe_fd[1]); + exit(0); + } + } + + for (int i = 0; i < PNUM_CONSUMERS; i++) { + if (const pid_t pid = fork(); pid == 0) { + close(pipe_fd[1]); + pconsumer(i, pipe_fd[0]); + exit(0); + } + } + + for (int i = 0; i < PNUM_PRODUCERS + PNUM_CONSUMERS; i++) { + wait(nullptr); + } + + sem_unlink(PSEM_MUTEX_NAME); + sem_unlink(PSEM_FULL_NAME); + sem_unlink(PSEM_EMPTY_NAME); +} + +void runProducerConsumerBuffer() { + Buffer buffer(10); + std::counting_semaphore<3> producerSem(3); + std::counting_semaphore<5> consumerSem(5); + + std::vector producers; + std::vector consumers; + + for (int i = 0; i < 3; ++i) { + producers.emplace_back([i, &buffer, &producerSem]() { + const BProducer bproducer(i, buffer, producerSem); + bproducer.run(); + }); + } + + for (int i = 0; i < 3; ++i) { + consumers.emplace_back([i, &buffer, &consumerSem]() { + const BConsumer bconsumer(i, buffer, consumerSem); + bconsumer.run(); + }); + } + + for (auto& p : producers) { + p.join(); + } + for (auto& c : consumers) { + c.join(); + } +} + int main() { while (true) { std::cout << "=== Main Menu ===\n"; std::cout << "1. Run Signal Handler\n"; - std::cout << "2. Run Producer/Consumer\n"; - std::cout << "3. Run Reader/Writer\n"; + std::cout << "2. Run Threaded Producer/Consumer\n"; + std::cout << "3. Run Process Producer/Consumer\n"; + std::cout << "4. Run Buffered Producer/Consumer\n"; + std::cout << "5. Run Reader/Writer\n"; std::cout << "0. Exit\n"; std::cout << "Enter your choice: "; @@ -112,10 +179,18 @@ int main() { std::cout << "Running Signal Handler...\n"; runSignalHandler(); case 2: - std::cout << "Running Producer/Consumer...\n"; + std::cout << "Running Threaded Producer/Consumer...\n"; runProducerConsumer(); break; case 3: + std::cout << "Running Process Producer/Consumer...\n"; + runProcessProducerConsumer(); + break; + case 4: + std::cout << "Running Buffered Producer/Consumer...\n"; + runProducerConsumerBuffer(); + break; + case 5: std::cout << "Running Reader-Writer...\n"; runReaderWriter(); break; diff --git a/src/bconsumer.cpp b/src/bconsumer.cpp new file mode 100644 index 0000000..3b7e895 --- /dev/null +++ b/src/bconsumer.cpp @@ -0,0 +1,19 @@ +// +// Created by lumijiez on 12/8/24. +// + +#include "bconsumer.h" +#include +#include + +BConsumer::BConsumer(const int id, Buffer& buffer, std::counting_semaphore<5>& sem) + : id(id), buffer(buffer), consumerSem(sem) {} + +void BConsumer::run() const { + while (true) { + consumerSem.acquire(); + int item = buffer.consume(id); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + consumerSem.release(); + } +} \ No newline at end of file diff --git a/src/bproducer.cpp b/src/bproducer.cpp new file mode 100644 index 0000000..d1ed734 --- /dev/null +++ b/src/bproducer.cpp @@ -0,0 +1,25 @@ +// +// Created by lumijiez on 12/8/24. +// + +#include "bproducer.h" +#include +#include +#include + +BProducer::BProducer(const int id, Buffer& buffer, std::counting_semaphore<3>& sem) + : id(id), buffer(buffer), producerSem(sem) {} + +void BProducer::run() const { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(1, 100); + + while (true) { + producerSem.acquire(); + const int item = dis(gen); + buffer.produce(id, item); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + producerSem.release(); + } +} \ No newline at end of file diff --git a/src/p_producer_consumer.cpp b/src/p_producer_consumer.cpp new file mode 100644 index 0000000..cb6c4bd --- /dev/null +++ b/src/p_producer_consumer.cpp @@ -0,0 +1,44 @@ +// +// Created by lumijiez on 12/8/24. +// + +#include "p_producer_consumer.h" + +#include +#include +#include +#include +#include + +void pproducer(const int producer_id, const int write_fd) { + srand(time(nullptr) * getpid()); + + while (true) { + const int item = rand() % 100; + char item_str[32]; + snprintf(item_str, sizeof(item_str), "%d", item); + + write(write_fd, item_str, strlen(item_str) + 1); + + printf("Producer %d produced: %d\n", producer_id, item); + + sleep(1); + } +} + +void pconsumer(const int consumer_id, const int read_fd) { + while (true) { + char item_str[32]; + + if (const ssize_t bytesRead = read(read_fd, item_str, sizeof(item_str)); bytesRead > 0) { + printf("Consumer %d consumed: %s\n", consumer_id, item_str); + } else { + if (bytesRead == 0) { + break; + } + perror("Error reading from pipe"); + } + + sleep(2); + } +} \ No newline at end of file