🔄 Multithreading in C++

📖 Introduction

Multithreading is a technique that lets a program perform multiple tasks concurrently, taking full advantage of multi-core CPUs. C++11 introduced the <thread> library, which provides powerful tools for working with threads in a standard, portable way.

Why Multithreading?

  • Performance: exploit multi-core CPUs
  • Responsiveness: the UI stays responsive during heavy tasks
  • Parallel processing: process large data sets in parallel
  • Concurrent operations: I/O operations, network requests

Practical applications:

  • Game engines: rendering, physics, and AI running in parallel
  • Web servers: handling many requests concurrently
  • Image/Video processing: parallel pixel processing
  • Scientific computing: matrix operations, simulations

Challenges of multithreading:

  • Race conditions: multiple threads accessing shared data
  • Deadlocks: threads waiting on each other forever
  • Thread safety: keeping shared data consistent
  • Debugging: concurrent code is hard to debug

🔧 Syntax

Required headers

cpp
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::lock_guard
#include <condition_variable> // std::condition_variable
#include <future>         // std::async, std::future
#include <atomic>         // std::atomic
#include <chrono>         // std::chrono (for sleep)

Creating and managing threads

cpp
#include <iostream>
#include <thread>
#include <chrono>
using namespace std;

// Function to run in a thread
void simpleTask(int id) {
    cout << "Thread " << id << " starting..." << endl;
    this_thread::sleep_for(chrono::seconds(2));
    cout << "Thread " << id << " finished!" << endl;
}

int main() {
    // Create threads
    thread t1(simpleTask, 1);
    thread t2(simpleTask, 2);
    
    // Wait for the threads to finish
    t1.join();  // Block until t1 finishes
    t2.join();  // Block until t2 finishes
    
    cout << "All threads completed!" << endl;
    return 0;
}
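
How many threads are worth creating depends on the hardware. As a minimal sketch (the function name `suggestedThreads` is illustrative), `std::thread::hardware_concurrency()` returns a hint for the number of concurrent threads supported, which may be 0 when unknown:

```cpp
#include <thread>
#include <algorithm>

// hardware_concurrency() may return 0 when the value cannot be
// determined, so clamp the result to at least 1.
unsigned suggestedThreads() {
    unsigned n = std::thread::hardware_concurrency();
    return std::max(n, 1u);
}
```

A thread pool (shown later in this lesson) is typically sized from this value rather than creating one thread per task.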

Lambda functions with threads

cpp
// Using a lambda
thread t1([]() {
    cout << "Lambda thread running!" << endl;
});

// Lambda with a capture
int data = 42;
thread t2([&data]() {
    cout << "Data from main: " << data << endl;
});

t1.join();
t2.join();

Mutex and Thread Safety

cpp
#include <mutex>

mutex mtx;  // Global mutex
int sharedCounter = 0;

void incrementCounter(int times) {
    for (int i = 0; i < times; i++) {
        // Critical section
        mtx.lock();
        sharedCounter++;
        mtx.unlock();
    }
}

// Or use lock_guard (RAII)
void safeIncrement(int times) {
    for (int i = 0; i < times; i++) {
        lock_guard<mutex> lock(mtx);  // Automatic unlock
        sharedCounter++;
    }
}

Condition Variables

cpp
#include <condition_variable>
#include <queue>

mutex queueMutex;
condition_variable cv;
queue<int> dataQueue;
bool finished = false;

// Producer thread
void producer() {
    for (int i = 0; i < 10; i++) {
        {
            lock_guard<mutex> lock(queueMutex);
            dataQueue.push(i);
            cout << "Produced: " << i << endl;
        }
        cv.notify_one();  // Notify waiting consumer
        this_thread::sleep_for(chrono::milliseconds(100));
    }
    
    {
        lock_guard<mutex> lock(queueMutex);
        finished = true;
    }
    cv.notify_all();
}

// Consumer thread
void consumer(int id) {
    while (true) {
        unique_lock<mutex> lock(queueMutex);
        cv.wait(lock, []{ return !dataQueue.empty() || finished; });
        
        if (!dataQueue.empty()) {
            int data = dataQueue.front();
            dataQueue.pop();
            lock.unlock();
            
            cout << "Consumer " << id << " consumed: " << data << endl;
        } else if (finished) {
            break;
        }
    }
}

Atomic Variables

cpp
#include <atomic>

atomic<int> atomicCounter(0);

void atomicIncrement(int times) {
    for (int i = 0; i < times; i++) {
        atomicCounter++;  // Thread-safe without mutex
    }
}

// Other operations
atomicCounter.store(100);         // Set value
int value = atomicCounter.load(); // Get value
int old = atomicCounter.exchange(200); // Set new, return old
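
Atomics also support read-modify-write primitives such as fetch_add and compare_exchange, which are the building blocks of lock-free algorithms. A sketch (the helper name `casSetTo` is illustrative) of the classic compare-and-swap retry loop:

```cpp
#include <atomic>

// Atomically replace the counter's value with `target` and return the
// value observed just before the successful swap.
// compare_exchange_weak only stores `target` if the current value still
// equals `expected`; on failure it reloads `expected` with the current
// value, so the loop simply retries.
int casSetTo(std::atomic<int>& counter, int target) {
    int expected = counter.load();
    while (!counter.compare_exchange_weak(expected, target)) {
        // `expected` now holds the latest value; retry the swap
    }
    return expected;
}
```

fetch_add works the same way under the hood: it returns the previous value and performs the addition as one indivisible step.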

std::async and std::future

cpp
#include <future>

// Async function
int calculation(int n) {
    int sum = 0;
    for (int i = 1; i <= n; i++) {
        sum += i;
    }
    return sum;
}

int main() {
    // Launch async task
    future<int> result = async(launch::async, calculation, 1000);
    
    // Do other work while calculation runs
    cout << "Calculation running in background..." << endl;
    
    // Get result (blocks if not ready)
    int sum = result.get();
    cout << "Result: " << sum << endl;
    
    return 0;
}
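
std::async also makes it easy to split independent work across futures. A sketch (partitioning scheme and the name `parallelSum` are illustrative) that sums one half of a vector in a background task while the calling thread sums the other half:

```cpp
#include <future>
#include <numeric>
#include <vector>

// Sum a vector using two partial sums: the lower half runs in an
// async task, the upper half runs on the current thread.
long long parallelSum(const std::vector<int>& v) {
    size_t mid = v.size() / 2;

    auto lower = std::async(std::launch::async, [&v, mid] {
        return std::accumulate(v.begin(), v.begin() + mid, 0LL);
    });

    // Work on the upper half while the task runs
    long long upper = std::accumulate(v.begin() + mid, v.end(), 0LL);

    return lower.get() + upper;  // get() blocks until the task finishes
}
```

Note the reference capture of `v` is safe here only because `get()` is called before `v` goes out of scope.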

🔬 Analysis & Detailed Explanation

Thread Lifecycle

Thread States:

  1. Created: thread object constructed but not yet running
  2. Running: thread is executing
  3. Blocked: thread is waiting for a resource (mutex, I/O)
  4. Finished: thread has completed execution

Join vs Detach:

cpp
thread t(someFunction);

// Option 1: Join - wait for thread to finish
t.join();    // Main thread waits

// Option 2: Detach - let thread run independently
t.detach();  // Main thread continues, thread runs in background
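
Only one of the two options may be used: calling join() or detach() on a thread that is not joinable throws, and destroying a still-joinable std::thread calls std::terminate. A minimal sketch (the helper name `finish` is illustrative) of the usual guard:

```cpp
#include <thread>

// Safely finish a thread: join only while it still owns a running
// thread. Every std::thread must be joined or detached exactly once
// before destruction, otherwise std::terminate is called.
void finish(std::thread& t) {
    if (t.joinable()) {
        t.join();
    }
}
```

C++20 adds std::jthread, which performs this join automatically in its destructor.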

Synchronization Mechanisms

Mutex Types:

  • mutex: Basic mutual exclusion
  • recursive_mutex: the same thread can lock it multiple times
  • timed_mutex: lock operations with a timeout
  • shared_mutex: Multiple readers, single writer (C++17)

Lock Types:

  • lock_guard: RAII lock, unlocks automatically
  • unique_lock: flexible lock, can be unlocked manually
  • shared_lock: shared ownership (with shared_mutex)
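
The reader/writer pair above can be sketched with C++17's shared_mutex: any number of threads may hold a shared_lock at once, while unique_lock is exclusive (class name `SharedCounter` is illustrative):

```cpp
#include <shared_mutex>
#include <mutex>

// A counter protected by a shared_mutex (C++17):
// readers take shared (concurrent) locks, writers take an exclusive lock.
class SharedCounter {
    mutable std::shared_mutex mtx_;
    int value_ = 0;
public:
    int get() const {
        std::shared_lock<std::shared_mutex> lock(mtx_);  // many readers at once
        return value_;
    }
    void increment() {
        std::unique_lock<std::shared_mutex> lock(mtx_);  // exclusive writer
        ++value_;
    }
};
```

This pays off when reads vastly outnumber writes; for write-heavy data a plain mutex is usually simpler and just as fast.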

Race Conditions and Data Races

Race Condition Example:

cpp
// UNSAFE - Race condition
int counter = 0;
void unsafeIncrement() {
    for (int i = 0; i < 1000; i++) {
        counter++;  // Read-Modify-Write not atomic
    }
}

// SAFE - Using mutex
mutex mtx;
void safeIncrement() {
    for (int i = 0; i < 1000; i++) {
        lock_guard<mutex> lock(mtx);
        counter++;
    }
}

// SAFE - Using atomic
atomic<int> atomicCounter(0);
void atomicIncrement() {
    for (int i = 0; i < 1000; i++) {
        atomicCounter++;  // Atomic operation
    }
}

Deadlock Prevention

Common Deadlock Scenario:

cpp
mutex mtx1, mtx2;

// Thread 1
void thread1() {
    lock_guard<mutex> lock1(mtx1);
    this_thread::sleep_for(chrono::milliseconds(10));
    lock_guard<mutex> lock2(mtx2);  // Might deadlock
}

// Thread 2
void thread2() {
    lock_guard<mutex> lock2(mtx2);
    this_thread::sleep_for(chrono::milliseconds(10));
    lock_guard<mutex> lock1(mtx1);  // Might deadlock
}

// SOLUTION: Always lock in same order
void safeThread1() {
    lock_guard<mutex> lock1(mtx1);  // Always lock mtx1 first
    lock_guard<mutex> lock2(mtx2);  // Then mtx2
}

void safeThread2() {
    lock_guard<mutex> lock1(mtx1);  // Same order
    lock_guard<mutex> lock2(mtx2);
}
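
When a fixed lock order is hard to guarantee across a codebase, the standard library can acquire several mutexes atomically: std::lock (C++11) and std::scoped_lock (C++17) use a deadlock-avoidance algorithm internally. A sketch (the globals and `transfer` are illustrative):

```cpp
#include <mutex>

std::mutex m1, m2;
int accountA = 0, accountB = 0;

// scoped_lock (C++17) locks both mutexes without risk of deadlock,
// regardless of the order other threads acquire them in.
void transfer(int amount) {
    std::scoped_lock lock(m1, m2);  // deadlock-free multi-lock
    accountA -= amount;
    accountB += amount;
}
```

On pre-C++17 compilers the equivalent is `std::lock(m1, m2)` followed by two `lock_guard`s constructed with `std::adopt_lock`.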

💻 Examples

Example 1: Download Manager with Multithreading

cpp
#include <iostream>
#include <thread>
#include <vector>
#include <string>
#include <chrono>
#include <mutex>
#include <atomic>
#include <iomanip>
using namespace std;

// Simulate file download
struct DownloadTask {
    string filename;
    size_t totalSize;
    atomic<size_t> downloadedSize;
    atomic<bool> completed;
    chrono::steady_clock::time_point startTime;
    
    DownloadTask(const string& name, size_t size) 
        : filename(name), totalSize(size), downloadedSize(0), completed(false) {}
    
    double getProgress() const {
        return static_cast<double>(downloadedSize.load()) / totalSize * 100.0;
    }
    
    double getSpeed() const {
        auto now = chrono::steady_clock::now();
        auto duration = chrono::duration_cast<chrono::seconds>(now - startTime);
        if (duration.count() > 0) {
            return static_cast<double>(downloadedSize.load()) / duration.count();
        }
        return 0.0;
    }
};

class DownloadManager {
private:
    vector<unique_ptr<DownloadTask>> tasks;
    mutex consoleMutex;
    atomic<int> activeDownloads;
    const int MAX_CONCURRENT = 3;
    
public:
    DownloadManager() : activeDownloads(0) {}
    
    void addDownload(const string& filename, size_t size) {
        tasks.push_back(make_unique<DownloadTask>(filename, size));
    }
    
    void downloadFile(DownloadTask& task) {
        // Wait for available slot
        while (activeDownloads.load() >= MAX_CONCURRENT) {
            this_thread::sleep_for(chrono::milliseconds(100));
        }
        
        activeDownloads++;
        task.startTime = chrono::steady_clock::now();
        
        {
            lock_guard<mutex> lock(consoleMutex);
            cout << "🚀 Starting download: " << task.filename << endl;
        }
        
        // Simulate download with random speed
        size_t chunkSize = task.totalSize / 100;  // 1% chunks
        
        while (task.downloadedSize.load() < task.totalSize) {
            // Simulate network delay
            this_thread::sleep_for(chrono::milliseconds(50 + rand() % 100));
            
            // Download chunk
            size_t remaining = task.totalSize - task.downloadedSize.load();
            size_t downloadSize = min(chunkSize, remaining);
            task.downloadedSize += downloadSize;
            
            // Random chance of temporary slowdown
            if (rand() % 20 == 0) {
                this_thread::sleep_for(chrono::milliseconds(200));
            }
        }
        
        task.completed = true;
        activeDownloads--;
        
        {
            lock_guard<mutex> lock(consoleMutex);
            cout << "✅ Completed: " << task.filename 
                 << " (" << task.totalSize << " bytes)" << endl;
        }
    }
    
    void startAllDownloads() {
        vector<thread> downloadThreads;
        
        // Start download threads
        for (auto& task : tasks) {
            downloadThreads.emplace_back(&DownloadManager::downloadFile, this, ref(*task));
        }
        
        // Progress monitoring thread
        thread progressThread(&DownloadManager::monitorProgress, this);
        
        // Wait for all downloads to complete
        for (auto& t : downloadThreads) {
            t.join();
        }
        
        // Stop progress monitoring (monitorProgress exits once all tasks complete)
        progressThread.join();
        
        cout << "\n🎉 All downloads completed!" << endl;
    }
    
private:
    void monitorProgress() {
        while (true) {
            bool allCompleted = true;
            
            {
                lock_guard<mutex> lock(consoleMutex);
                cout << "\n📊 DOWNLOAD PROGRESS:" << endl;
                cout << string(60, '-') << endl;
                
                for (const auto& task : tasks) {
                    if (!task->completed.load()) {
                        allCompleted = false;
                    }
                    
                    cout << "📄 " << setw(20) << left << task->filename 
                         << " [" << setw(6) << fixed << setprecision(1) 
                         << task->getProgress() << "%] "
                         << "(" << setw(8) << task->downloadedSize.load() 
                         << "/" << task->totalSize << " bytes) "
                         << setw(8) << static_cast<int>(task->getSpeed()) << " B/s"
                         << (task->completed.load() ? " ✅" : " ⬇️") << endl;
                }
                
                cout << "\nActive downloads: " << activeDownloads.load() 
                     << "/" << MAX_CONCURRENT << endl;
            }
            
            if (allCompleted) break;
            
            this_thread::sleep_for(chrono::seconds(1));
        }
    }
};

int main() {
    cout << "=== MULTITHREADED DOWNLOAD MANAGER ===" << endl;
    
    DownloadManager manager;
    
    // Add some downloads
    manager.addDownload("document.pdf", 2500000);
    manager.addDownload("video.mp4", 15000000);
    manager.addDownload("software.zip", 8000000);
    manager.addDownload("music.mp3", 4500000);
    manager.addDownload("image.jpg", 1200000);
    manager.addDownload("data.csv", 3000000);
    
    cout << "Starting downloads with max " << 3 << " concurrent connections..." << endl;
    
    auto startTime = chrono::steady_clock::now();
    manager.startAllDownloads();
    auto endTime = chrono::steady_clock::now();
    
    auto duration = chrono::duration_cast<chrono::seconds>(endTime - startTime);
    cout << "Total time: " << duration.count() << " seconds" << endl;
    
    return 0;
}

Example 2: Producer-Consumer with a Thread Pool

cpp
#include <iostream>
#include <thread>
#include <queue>
#include <vector>
#include <functional>
#include <type_traits>
#include <condition_variable>
#include <future>
#include <atomic>
using namespace std;

// Task type
using Task = function<void()>;

class ThreadPool {
private:
    vector<thread> workers;
    queue<Task> tasks;
    mutex queueMutex;
    condition_variable condition;
    atomic<bool> stop;
    
public:
    ThreadPool(size_t numThreads) : stop(false) {
        // Create worker threads
        for (size_t i = 0; i < numThreads; i++) {
            workers.emplace_back([this, i] {
                cout << "👷 Worker " << i << " started" << endl;
                
                while (true) {
                    Task task;
                    
                    {
                        unique_lock<mutex> lock(queueMutex);
                        condition.wait(lock, [this] { return stop.load() || !tasks.empty(); });
                        
                        if (stop.load() && tasks.empty()) {
                            break;
                        }
                        
                        task = move(tasks.front());
                        tasks.pop();
                    }
                    
                    // Execute task
                    task();
                }
                
                cout << "👷 Worker " << i << " stopped" << endl;
            });
        }
    }
    
    ~ThreadPool() {
        shutdown();
    }
    
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) -> future<invoke_result_t<F, Args...>> {
        // invoke_result_t (C++17) replaces result_of, which was removed in C++20
        using return_type = invoke_result_t<F, Args...>;
        
        // Package task
        auto task = make_shared<packaged_task<return_type()>>(
            bind(forward<F>(f), forward<Args>(args)...)
        );
        
        future<return_type> result = task->get_future();
        
        {
            unique_lock<mutex> lock(queueMutex);
            
            if (stop.load()) {
                throw runtime_error("ThreadPool is stopped");
            }
            
            tasks.emplace([task]() { (*task)(); });
        }
        
        condition.notify_one();
        return result;
    }
    
    void shutdown() {
        {
            unique_lock<mutex> lock(queueMutex);
            stop = true;
        }
        
        condition.notify_all();
        
        for (thread& worker : workers) {
            if (worker.joinable()) {
                worker.join();
            }
        }
    }
    
    size_t getQueueSize() {
        lock_guard<mutex> lock(queueMutex);
        return tasks.size();
    }
};

// Example work functions
int heavyCalculation(int n, const string& name) {
    cout << "🔢 " << name << " calculating sum of 1 to " << n << "..." << endl;
    
    int sum = 0;
    for (int i = 1; i <= n; i++) {
        sum += i;
        // Simulate heavy work
        if (i % 10000 == 0) {
            this_thread::sleep_for(chrono::milliseconds(10));
        }
    }
    
    cout << "✅ " << name << " completed: " << sum << endl;
    return sum;
}

void fileProcessing(const string& filename) {
    cout << "📁 Processing file: " << filename << endl;
    
    // Simulate file processing
    this_thread::sleep_for(chrono::milliseconds(500 + rand() % 1000));
    
    cout << "✅ File processed: " << filename << endl;
}

string dataTransform(const string& input) {
    cout << "🔄 Transforming data: " << input << endl;
    
    // Simulate data transformation
    this_thread::sleep_for(chrono::milliseconds(200 + rand() % 300));
    
    string result = input + "_transformed";
    cout << "✅ Transform completed: " << result << endl;
    return result;
}

int main() {
    cout << "=== THREAD POOL DEMO ===" << endl;
    
    // Create thread pool with 4 workers
    ThreadPool pool(4);
    
    vector<future<int>> calculationResults;
    vector<future<void>> fileResults;
    vector<future<string>> transformResults;
    
    cout << "\n=== SUBMITTING TASKS ===" << endl;
    
    // Submit calculation tasks
    for (int i = 1; i <= 5; i++) {
        string taskName = "Task_" + to_string(i);
        auto future = pool.enqueue(heavyCalculation, i * 50000, taskName);
        calculationResults.push_back(move(future));
    }
    
    // Submit file processing tasks
    vector<string> files = {"data1.txt", "config.xml", "image.jpg", "log.txt", "backup.zip"};
    for (const string& filename : files) {
        auto future = pool.enqueue(fileProcessing, filename);
        fileResults.push_back(move(future));
    }
    
    // Submit data transformation tasks
    vector<string> data = {"dataset_A", "dataset_B", "dataset_C"};
    for (const string& input : data) {
        auto future = pool.enqueue(dataTransform, input);
        transformResults.push_back(move(future));
    }
    
    cout << "📊 Queue size: " << pool.getQueueSize() << " tasks" << endl;
    
    cout << "\n=== COLLECTING RESULTS ===" << endl;
    
    // Collect calculation results
    cout << "\n📊 Calculation Results:" << endl;
    for (size_t i = 0; i < calculationResults.size(); i++) {
        int result = calculationResults[i].get();
        cout << "Result " << (i + 1) << ": " << result << endl;
    }
    
    // Wait for file processing
    cout << "\n📁 File Processing Results:" << endl;
    for (size_t i = 0; i < fileResults.size(); i++) {
        fileResults[i].get();  // Just wait for completion
        cout << "File " << (i + 1) << " processed" << endl;
    }
    
    // Collect transformation results
    cout << "\n🔄 Transformation Results:" << endl;
    for (size_t i = 0; i < transformResults.size(); i++) {
        string result = transformResults[i].get();
        cout << "Transform " << (i + 1) << ": " << result << endl;
    }
    
    cout << "\n=== ALL TASKS COMPLETED ===" << endl;
    
    return 0;
}

Example 3: Real-time Data Processing Pipeline

cpp
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>
#include <random>
#include <iomanip>
using namespace std;

// Data structures
struct SensorData {
    int sensorId;
    double temperature;
    double humidity;
    chrono::steady_clock::time_point timestamp;
    
    SensorData(int id, double temp, double hum) 
        : sensorId(id), temperature(temp), humidity(hum), 
          timestamp(chrono::steady_clock::now()) {}
};

struct ProcessedData {
    int sensorId;
    double avgTemperature;
    double avgHumidity;
    bool alert;
    chrono::steady_clock::time_point processTime;
    
    ProcessedData(const SensorData& data, bool isAlert = false) 
        : sensorId(data.sensorId), avgTemperature(data.temperature), 
          avgHumidity(data.humidity), alert(isAlert),
          processTime(chrono::steady_clock::now()) {}
};

// Thread-safe queue template
template<typename T>
class ThreadSafeQueue {
private:
    queue<T> queue_;
    mutable mutex mutex_;
    condition_variable condition_;
    
public:
    void push(T item) {
        lock_guard<mutex> lock(mutex_);
        queue_.push(item);
        condition_.notify_one();
    }
    
    bool tryPop(T& item) {
        lock_guard<mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        item = queue_.front();
        queue_.pop();
        return true;
    }
    
    void waitAndPop(T& item) {
        unique_lock<mutex> lock(mutex_);
        condition_.wait(lock, [this] { return !queue_.empty(); });
        item = queue_.front();
        queue_.pop();
    }
    
    bool empty() const {
        lock_guard<mutex> lock(mutex_);
        return queue_.empty();
    }
    
    size_t size() const {
        lock_guard<mutex> lock(mutex_);
        return queue_.size();
    }
};

class DataProcessingPipeline {
private:
    ThreadSafeQueue<SensorData> rawDataQueue;
    ThreadSafeQueue<ProcessedData> processedDataQueue;
    ThreadSafeQueue<ProcessedData> alertQueue;
    
    atomic<bool> running;
    atomic<int> totalProcessed;
    atomic<int> totalAlerts;
    
    // Threads
    thread dataGeneratorThread;
    thread dataProcessorThread;
    thread alertHandlerThread;
    thread monitorThread;
    
    // Random number generator
    mutable mt19937 rng;
    mutable uniform_real_distribution<double> tempDist;
    mutable uniform_real_distribution<double> humidityDist;
    mutable uniform_int_distribution<int> sensorDist;
    
public:
    DataProcessingPipeline() 
        : running(false), totalProcessed(0), totalAlerts(0),
          rng(chrono::steady_clock::now().time_since_epoch().count()),
          tempDist(15.0, 35.0), humidityDist(30.0, 80.0), sensorDist(1, 10) {}
    
    ~DataProcessingPipeline() {
        stop();
    }
    
    void start() {
        running = true;
        
        // Start threads
        dataGeneratorThread = thread(&DataProcessingPipeline::generateData, this);
        dataProcessorThread = thread(&DataProcessingPipeline::processData, this);
        alertHandlerThread = thread(&DataProcessingPipeline::handleAlerts, this);
        monitorThread = thread(&DataProcessingPipeline::monitorSystem, this);
        
        cout << "🚀 Data processing pipeline started!" << endl;
    }
    
    void stop() {
        if (running.load()) {
            running = false;
            
            cout << "\n🛑 Stopping pipeline..." << endl;
            
            // Join threads
            if (dataGeneratorThread.joinable()) dataGeneratorThread.join();
            if (dataProcessorThread.joinable()) dataProcessorThread.join();
            if (alertHandlerThread.joinable()) alertHandlerThread.join();
            if (monitorThread.joinable()) monitorThread.join();
            
            cout << "✅ Pipeline stopped" << endl;
        }
    }
    
    void runFor(int seconds) {
        start();
        this_thread::sleep_for(chrono::seconds(seconds));
        stop();
    }
    
private:
    void generateData() {
        cout << "🏭 Data generator started" << endl;
        
        while (running.load()) {
            // Generate random sensor data
            int sensorId = sensorDist(rng);
            double temp = tempDist(rng);
            double humidity = humidityDist(rng);
            
            // Occasionally generate extreme values for alerts
            if (rng() % 20 == 0) {
                temp = (rng() % 2 == 0) ? 50.0 : -10.0;  // Extreme temperature
            }
            
            SensorData data(sensorId, temp, humidity);
            rawDataQueue.push(data);
            
            // Generate data every 100-300ms
            this_thread::sleep_for(chrono::milliseconds(100 + rng() % 200));
        }
        
        cout << "🏭 Data generator stopped" << endl;
    }
    
    void processData() {
        cout << "⚙️ Data processor started" << endl;
        
        while (running.load() || !rawDataQueue.empty()) {
            SensorData rawData;
            
            if (rawDataQueue.tryPop(rawData)) {
                // Simulate processing time
                this_thread::sleep_for(chrono::milliseconds(50));
                
                // Check for alerts
                bool isAlert = (rawData.temperature > 40.0 || rawData.temperature < 0.0 ||
                               rawData.humidity > 90.0 || rawData.humidity < 20.0);
                
                ProcessedData processed(rawData, isAlert);
                processedDataQueue.push(processed);
                
                if (isAlert) {
                    alertQueue.push(processed);
                    totalAlerts++;
                }
                
                totalProcessed++;
            } else {
                this_thread::sleep_for(chrono::milliseconds(10));
            }
        }
        
        cout << "⚙️ Data processor stopped" << endl;
    }
    
    void handleAlerts() {
        cout << "🚨 Alert handler started" << endl;
        
        while (running.load() || !alertQueue.empty()) {
            ProcessedData alertData;
            
            if (alertQueue.tryPop(alertData)) {
                cout << "🚨 ALERT! Sensor " << alertData.sensorId 
                     << " - Temp: " << fixed << setprecision(1) << alertData.avgTemperature 
                     << "°C, Humidity: " << alertData.avgHumidity << "%" << endl;
                
                // Simulate alert handling (send email, SMS, etc.)
                this_thread::sleep_for(chrono::milliseconds(100));
            } else {
                this_thread::sleep_for(chrono::milliseconds(50));
            }
        }
        
        cout << "🚨 Alert handler stopped" << endl;
    }
    
    void monitorSystem() {
        cout << "📊 System monitor started" << endl;
        
        int lastCount = 0;
        while (running.load()) {
            int current = totalProcessed.load();
            cout << "\n📊 SYSTEM STATUS:" << endl;
            cout << "Raw data queue: " << rawDataQueue.size() << " items" << endl;
            cout << "Processed queue: " << processedDataQueue.size() << " items" << endl;
            cout << "Alert queue: " << alertQueue.size() << " items" << endl;
            cout << "Total processed: " << current << endl;
            cout << "Total alerts: " << totalAlerts.load() << endl;
            // Rate over the last 5-second monitoring interval, not a cumulative average
            cout << "Processing rate: " << ((current - lastCount) / 5.0) << " items/sec" << endl;
            lastCount = current;
            
            this_thread::sleep_for(chrono::seconds(5));
        }
        
        cout << "📊 System monitor stopped" << endl;
    }
};

int main() {
    cout << "=== REAL-TIME DATA PROCESSING PIPELINE ===" << endl;
    
    DataProcessingPipeline pipeline;
    
    cout << "Running pipeline for 20 seconds..." << endl;
    pipeline.runFor(20);
    
    cout << "\n🎉 Demo completed!" << endl;
    
    return 0;
}

🏋️ Exercises

Exercise 1: Parallel Matrix Multiplication

Implement parallel matrix multiplication using threads:

  • Split the matrix into blocks
  • Each thread computes one block
  • Compare performance against a sequential version

Exercise 2: Web Server Simulator

Build a simple web server simulator:

  • A thread pool to handle requests
  • A thread-safe request queue
  • Statistics tracking (requests/second, response times)

Exercise 3: Producer-Consumer with Multiple Producers

Extend the producer-consumer pattern:

  • Multiple producer threads
  • A single consumer thread
  • A bounded buffer (size limit)
  • A graceful shutdown mechanism
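
One possible shape for this exercise (a sketch, not a full solution; the class name `BoundedBuffer` and its interface are illustrative): producers block while the buffer is full, and a `closed` flag lets consumers drain remaining items before stopping:

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>

// Bounded, thread-safe buffer. push() blocks while full; pop() returns
// false once close() has been called and the buffer has drained,
// giving consumers a graceful shutdown signal.
class BoundedBuffer {
    std::queue<int> buf_;
    std::mutex mtx_;
    std::condition_variable notFull_, notEmpty_;
    size_t capacity_;
    bool closed_ = false;
public:
    explicit BoundedBuffer(size_t cap) : capacity_(cap) {}

    void push(int item) {
        std::unique_lock<std::mutex> lock(mtx_);
        notFull_.wait(lock, [this] { return buf_.size() < capacity_; });
        buf_.push(item);
        notEmpty_.notify_one();
    }

    bool pop(int& item) {
        std::unique_lock<std::mutex> lock(mtx_);
        notEmpty_.wait(lock, [this] { return !buf_.empty() || closed_; });
        if (buf_.empty()) return false;   // closed and fully drained
        item = buf_.front();
        buf_.pop();
        notFull_.notify_one();
        return true;
    }

    void close() {
        std::lock_guard<std::mutex> lock(mtx_);
        closed_ = true;
        notEmpty_.notify_all();
    }
};
```

Multiple producer threads can call push() concurrently; the single consumer loops on pop() until it returns false.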

Exercise 4: Parallel Image Processing

Simulate image processing:

  • Divide image into sections
  • Each thread processes a section
  • Apply filters (blur, sharpen, etc.)
  • Combine results

Detailed Solutions

Exercise 1 - Parallel Matrix Multiplication:

cpp
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
using namespace std;

class ParallelMatrix {
private:
    static void multiplyBlock(const vector<vector<int>>& A, 
                             const vector<vector<int>>& B,
                             vector<vector<int>>& C,
                             int startRow, int endRow,
                             int startCol, int endCol,
                             int K) {
        for (int i = startRow; i < endRow; i++) {
            for (int j = startCol; j < endCol; j++) {
                C[i][j] = 0;
                for (int k = 0; k < K; k++) {
                    C[i][j] += A[i][k] * B[k][j];
                }
            }
        }
    }
    
public:
    static vector<vector<int>> multiply(const vector<vector<int>>& A,
                                       const vector<vector<int>>& B,
                                       int numThreads = 4) {
        int N = A.size();
        int M = B[0].size();
        int K = B.size();
        
        vector<vector<int>> C(N, vector<int>(M, 0));
        vector<thread> threads;
        
        int blockSize = N / numThreads;
        
        for (int t = 0; t < numThreads; t++) {
            int startRow = t * blockSize;
            int endRow = (t == numThreads - 1) ? N : (t + 1) * blockSize;
            
            threads.emplace_back(multiplyBlock, 
                               ref(A), ref(B), ref(C),
                               startRow, endRow, 0, M, K);
        }
        
        for (auto& t : threads) {
            t.join();
        }
        
        return C;
    }
};
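
The exercise also asks for a comparison with a sequential version. A simple baseline and timing harness (function names are illustrative; loop order puts `k` in the middle to improve cache locality):

```cpp
#include <chrono>
#include <vector>
using namespace std;

// Plain triple-loop multiply, used as the sequential baseline.
vector<vector<int>> multiplySeq(const vector<vector<int>>& A,
                                const vector<vector<int>>& B) {
    int N = A.size(), M = B[0].size(), K = B.size();
    vector<vector<int>> C(N, vector<int>(M, 0));
    for (int i = 0; i < N; i++)
        for (int k = 0; k < K; k++)      // row of B reused across inner loop
            for (int j = 0; j < M; j++)
                C[i][j] += A[i][k] * B[k][j];
    return C;
}

// Time a callable and return elapsed milliseconds.
template<typename F>
long long timeMs(F f) {
    auto start = chrono::steady_clock::now();
    f();
    auto end = chrono::steady_clock::now();
    return chrono::duration_cast<chrono::milliseconds>(end - start).count();
}
```

Run both versions on the same random matrices and compare the timeMs results; speedups typically flatten once numThreads exceeds the number of physical cores.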

📋 Summary

Key Points

Core Threading Concepts:

  • std::thread - Create and manage threads
  • std::mutex - Mutual exclusion locks
  • std::condition_variable - Thread synchronization
  • std::atomic - Lock-free atomic operations
  • std::async - High-level async programming

Synchronization Primitives:

  • Mutex: Basic locking mechanism
  • Lock Guards: RAII-style automatic locking
  • Condition Variables: Wait/notify patterns
  • Atomic Variables: Lock-free operations

Common Patterns:

  • Producer-Consumer: Queue-based communication
  • Thread Pool: Reusable worker threads
  • Pipeline: Sequential processing stages

Best Practices

Thread Safety:

  • Minimize shared state
  • Use RAII locks (lock_guard, unique_lock)
  • Prefer atomic operations over mutex when possible
  • Design for deadlock prevention

Performance:

  • Don't create too many threads (use thread pools)
  • Balance workload across threads
  • Consider cache locality
  • Profile to find bottlenecks

Debugging:

  • Use thread-safe logging
  • Avoid shared global state
  • Test with different thread counts
  • Use thread sanitizers

Preparing for the Next Lesson

The next lesson explores Move Semantics - performance optimization:

  • Rvalue References: T&& and move constructors
  • std::move: explicit move operations
  • Perfect Forwarding: std::forward and universal references
  • Copy vs Move: when to use each
  • Performance Benefits: avoiding unnecessary copies

Multithreading is an essential skill for high-performance applications. Keep practicing to write safe concurrent code!
