🔄 Multithreading in C++
📖 Introduction
Multithreading lets a program perform multiple tasks concurrently, taking full advantage of multi-core CPUs. C++11 introduced the <thread> library, which provides powerful tools for working with threads in a standard, portable way.
Why Multithreading?
- Performance: take full advantage of multi-core CPUs
- Responsiveness: the UI stays responsive while heavy tasks run
- Parallel processing: process large data sets in parallel
- Concurrent operations: I/O operations, network requests
Practical applications:
- Game engines: rendering, physics, and AI running in parallel
- Web servers: handling many requests at once
- Image/Video processing: parallel pixel processing
- Scientific computing: matrix operations, simulations
Challenges of multithreading:
- Race conditions: multiple threads accessing shared data
- Deadlocks: threads waiting on each other forever
- Thread safety: keeping shared data consistent
- Debugging: concurrent code is hard to debug
🔧 Syntax
Required headers
#include <thread> // std::thread
#include <mutex> // std::mutex, std::lock_guard
#include <condition_variable> // std::condition_variable
#include <future> // std::async, std::future
#include <atomic> // std::atomic
#include <chrono> // std::chrono (for sleep)
Creating and managing threads
#include <iostream>
#include <thread>
#include <chrono>
using namespace std;
// Function to run in a thread
void simpleTask(int id) {
cout << "Thread " << id << " starting..." << endl;
this_thread::sleep_for(chrono::seconds(2));
cout << "Thread " << id << " finished!" << endl;
}
int main() {
// Create threads
thread t1(simpleTask, 1);
thread t2(simpleTask, 2);
// Wait for the threads to finish
t1.join(); // Block until t1 finishes
t2.join(); // Block until t2 finishes
cout << "All threads completed!" << endl;
return 0;
}
Lambda functions with threads
// Using a lambda
thread t1([]() {
cout << "Lambda thread running!" << endl;
});
// Lambda with a capture
int data = 42;
thread t2([&data]() {
cout << "Data from main: " << data << endl;
});
t1.join();
t2.join();
Mutex and Thread Safety
#include <mutex>
mutex mtx; // Global mutex
int sharedCounter = 0;
void incrementCounter(int times) {
for (int i = 0; i < times; i++) {
// Critical section
mtx.lock();
sharedCounter++;
mtx.unlock();
}
}
// Or use lock_guard (RAII)
void safeIncrement(int times) {
for (int i = 0; i < times; i++) {
lock_guard<mutex> lock(mtx); // Automatic unlock
sharedCounter++;
}
}
Condition Variables
#include <condition_variable>
#include <queue>
mutex queueMutex;
condition_variable cv;
queue<int> dataQueue;
bool finished = false;
// Producer thread
void producer() {
for (int i = 0; i < 10; i++) {
{
lock_guard<mutex> lock(queueMutex);
dataQueue.push(i);
cout << "Produced: " << i << endl;
}
cv.notify_one(); // Notify waiting consumer
this_thread::sleep_for(chrono::milliseconds(100));
}
{
lock_guard<mutex> lock(queueMutex);
finished = true;
}
cv.notify_all();
}
// Consumer thread
void consumer(int id) {
while (true) {
unique_lock<mutex> lock(queueMutex);
cv.wait(lock, []{ return !dataQueue.empty() || finished; });
if (!dataQueue.empty()) {
int data = dataQueue.front();
dataQueue.pop();
lock.unlock();
cout << "Consumer " << id << " consumed: " << data << endl;
} else if (finished) {
break;
}
}
}
Atomic Variables
#include <atomic>
atomic<int> atomicCounter(0);
void atomicIncrement(int times) {
for (int i = 0; i < times; i++) {
atomicCounter++; // Thread-safe without mutex
}
}
// Other operations
atomicCounter.store(100); // Set value
int value = atomicCounter.load(); // Get value
int old = atomicCounter.exchange(200); // Set new, return old
std::async and std::future
#include <future>
// Async function
int calculation(int n) {
int sum = 0;
for (int i = 1; i <= n; i++) {
sum += i;
}
return sum;
}
int main() {
// Launch async task
future<int> result = async(launch::async, calculation, 1000);
// Do other work while calculation runs
cout << "Calculation running in background..." << endl;
// Get result (blocks if not ready)
int sum = result.get();
cout << "Result: " << sum << endl;
return 0;
}
🔬 Analysis & Detailed Explanation
Thread Lifecycle
Thread States:
- Created: the thread object exists but has not started
- Running: the thread is executing
- Blocked: the thread is waiting for a resource (mutex, I/O)
- Finished: the thread has completed execution
Join vs Detach:
thread t(someFunction);
// Option 1: Join - wait for thread to finish
t.join(); // Main thread waits
// Option 2: Detach - let thread run independently
t.detach(); // Main thread continues, thread runs in background
Synchronization Mechanisms
Mutex Types:
- mutex: basic mutual exclusion
- recursive_mutex: the same thread can lock it multiple times
- timed_mutex: lock operations with a timeout
- shared_mutex: multiple readers, single writer (C++17)
Lock Types:
- lock_guard: RAII lock, unlocks automatically
- unique_lock: flexible lock, can be unlocked manually
- shared_lock: shared ownership (with shared_mutex)
Race Conditions and Data Races
Race Condition Example:
// UNSAFE - Race condition
int counter = 0;
void unsafeIncrement() {
for (int i = 0; i < 1000; i++) {
counter++; // Read-Modify-Write not atomic
}
}
// SAFE - Using mutex
mutex mtx;
void safeIncrement() {
for (int i = 0; i < 1000; i++) {
lock_guard<mutex> lock(mtx);
counter++;
}
}
// SAFE - Using atomic
atomic<int> atomicCounter(0);
void atomicIncrement() {
for (int i = 0; i < 1000; i++) {
atomicCounter++; // Atomic operation
}
}
Deadlock Prevention
Common Deadlock Scenario:
mutex mtx1, mtx2;
// Thread 1
void thread1() {
lock_guard<mutex> lock1(mtx1);
this_thread::sleep_for(chrono::milliseconds(10));
lock_guard<mutex> lock2(mtx2); // Might deadlock
}
// Thread 2
void thread2() {
lock_guard<mutex> lock2(mtx2);
this_thread::sleep_for(chrono::milliseconds(10));
lock_guard<mutex> lock1(mtx1); // Might deadlock
}
// SOLUTION: Always lock in same order
void safeThread1() {
lock_guard<mutex> lock1(mtx1); // Always lock mtx1 first
lock_guard<mutex> lock2(mtx2); // Then mtx2
}
void safeThread2() {
lock_guard<mutex> lock1(mtx1); // Same order
lock_guard<mutex> lock2(mtx2);
}
// C++17 alternative: std::scoped_lock lock(mtx1, mtx2); acquires both without deadlock
💻 Worked Examples
Example 1: A Multithreaded Download Manager
#include <iostream>
#include <thread>
#include <vector>
#include <string>
#include <chrono>
#include <mutex>
#include <atomic>
#include <iomanip>
using namespace std;
// Simulate file download
struct DownloadTask {
string filename;
size_t totalSize;
atomic<size_t> downloadedSize;
atomic<bool> completed;
chrono::steady_clock::time_point startTime;
DownloadTask(const string& name, size_t size)
: filename(name), totalSize(size), downloadedSize(0), completed(false) {}
double getProgress() const {
return static_cast<double>(downloadedSize.load()) / totalSize * 100.0;
}
double getSpeed() const {
auto now = chrono::steady_clock::now();
auto duration = chrono::duration_cast<chrono::seconds>(now - startTime);
if (duration.count() > 0) {
return static_cast<double>(downloadedSize.load()) / duration.count();
}
return 0.0;
}
};
class DownloadManager {
private:
vector<unique_ptr<DownloadTask>> tasks;
mutex consoleMutex;
atomic<int> activeDownloads;
const int MAX_CONCURRENT = 3;
public:
DownloadManager() : activeDownloads(0) {}
void addDownload(const string& filename, size_t size) {
tasks.push_back(make_unique<DownloadTask>(filename, size));
}
void downloadFile(DownloadTask& task) {
// Wait for available slot
while (activeDownloads.load() >= MAX_CONCURRENT) {
this_thread::sleep_for(chrono::milliseconds(100));
}
activeDownloads++;
task.startTime = chrono::steady_clock::now();
{
lock_guard<mutex> lock(consoleMutex);
cout << "🚀 Starting download: " << task.filename << endl;
}
// Simulate download with random speed
size_t chunkSize = task.totalSize / 100; // 1% chunks
while (task.downloadedSize.load() < task.totalSize) {
// Simulate network delay
this_thread::sleep_for(chrono::milliseconds(50 + rand() % 100));
// Download chunk
size_t remaining = task.totalSize - task.downloadedSize.load();
size_t downloadSize = min(chunkSize, remaining);
task.downloadedSize += downloadSize;
// Random chance of temporary slowdown
if (rand() % 20 == 0) {
this_thread::sleep_for(chrono::milliseconds(200));
}
}
task.completed = true;
activeDownloads--;
{
lock_guard<mutex> lock(consoleMutex);
cout << "✅ Completed: " << task.filename
<< " (" << task.totalSize << " bytes)" << endl;
}
}
void startAllDownloads() {
vector<thread> downloadThreads;
// Start download threads
for (auto& task : tasks) {
downloadThreads.emplace_back(&DownloadManager::downloadFile, this, ref(*task));
}
// Progress monitoring thread
thread progressThread(&DownloadManager::monitorProgress, this);
// Wait for all downloads to complete
for (auto& t : downloadThreads) {
t.join();
}
// Wait for the progress monitor to notice completion and exit
progressThread.join();
cout << "\n🎉 All downloads completed!" << endl;
}
private:
void monitorProgress() {
while (true) {
bool allCompleted = true;
{
lock_guard<mutex> lock(consoleMutex);
cout << "\n📊 DOWNLOAD PROGRESS:" << endl;
cout << string(60, '-') << endl;
for (const auto& task : tasks) {
if (!task->completed.load()) {
allCompleted = false;
}
cout << "📄 " << setw(20) << left << task->filename
<< " [" << setw(6) << fixed << setprecision(1)
<< task->getProgress() << "%] "
<< "(" << setw(8) << task->downloadedSize.load()
<< "/" << task->totalSize << " bytes) "
<< setw(8) << static_cast<int>(task->getSpeed()) << " B/s"
<< (task->completed.load() ? " ✅" : " ⬇️") << endl;
}
cout << "\nActive downloads: " << activeDownloads.load()
<< "/" << MAX_CONCURRENT << endl;
}
if (allCompleted) break;
this_thread::sleep_for(chrono::seconds(1));
}
}
};
int main() {
cout << "=== MULTITHREADED DOWNLOAD MANAGER ===" << endl;
DownloadManager manager;
// Add some downloads
manager.addDownload("document.pdf", 2500000);
manager.addDownload("video.mp4", 15000000);
manager.addDownload("software.zip", 8000000);
manager.addDownload("music.mp3", 4500000);
manager.addDownload("image.jpg", 1200000);
manager.addDownload("data.csv", 3000000);
cout << "Starting downloads with max " << 3 << " concurrent connections..." << endl;
auto startTime = chrono::steady_clock::now();
manager.startAllDownloads();
auto endTime = chrono::steady_clock::now();
auto duration = chrono::duration_cast<chrono::seconds>(endTime - startTime);
cout << "Total time: " << duration.count() << " seconds" << endl;
return 0;
}
Example 2: Producer-Consumer with a Thread Pool
#include <iostream>
#include <thread>
#include <queue>
#include <vector>
#include <functional>
#include <condition_variable>
#include <future>
#include <atomic>
using namespace std;
// Task type
using Task = function<void()>;
class ThreadPool {
private:
vector<thread> workers;
queue<Task> tasks;
mutex queueMutex;
condition_variable condition;
atomic<bool> stop;
public:
ThreadPool(size_t numThreads) : stop(false) {
// Create worker threads
for (size_t i = 0; i < numThreads; i++) {
workers.emplace_back([this, i] {
cout << "👷 Worker " << i << " started" << endl;
while (true) {
Task task;
{
unique_lock<mutex> lock(queueMutex);
condition.wait(lock, [this] { return stop.load() || !tasks.empty(); });
if (stop.load() && tasks.empty()) {
break;
}
task = move(tasks.front());
tasks.pop();
}
// Execute task
task();
}
cout << "👷 Worker " << i << " stopped" << endl;
});
}
}
~ThreadPool() {
shutdown();
}
// Note: std::result_of is deprecated in C++17 (removed in C++20); use std::invoke_result there
template<typename F, typename... Args>
auto enqueue(F&& f, Args&&... args) -> future<typename result_of<F(Args...)>::type> {
using return_type = typename result_of<F(Args...)>::type;
// Package task
auto task = make_shared<packaged_task<return_type()>>(
bind(forward<F>(f), forward<Args>(args)...)
);
future<return_type> result = task->get_future();
{
unique_lock<mutex> lock(queueMutex);
if (stop.load()) {
throw runtime_error("ThreadPool is stopped");
}
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return result;
}
void shutdown() {
{
unique_lock<mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (thread& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
size_t getQueueSize() {
lock_guard<mutex> lock(queueMutex);
return tasks.size();
}
};
// Example work functions
int heavyCalculation(int n, const string& name) {
cout << "🔢 " << name << " calculating sum of 1 to " << n << "..." << endl;
int sum = 0;
for (int i = 1; i <= n; i++) {
sum += i;
// Simulate heavy work
if (i % 10000 == 0) {
this_thread::sleep_for(chrono::milliseconds(10));
}
}
cout << "✅ " << name << " completed: " << sum << endl;
return sum;
}
void fileProcessing(const string& filename) {
cout << "📁 Processing file: " << filename << endl;
// Simulate file processing
this_thread::sleep_for(chrono::milliseconds(500 + rand() % 1000));
cout << "✅ File processed: " << filename << endl;
}
string dataTransform(const string& input) {
cout << "🔄 Transforming data: " << input << endl;
// Simulate data transformation
this_thread::sleep_for(chrono::milliseconds(200 + rand() % 300));
string result = input + "_transformed";
cout << "✅ Transform completed: " << result << endl;
return result;
}
int main() {
cout << "=== THREAD POOL DEMO ===" << endl;
// Create thread pool with 4 workers
ThreadPool pool(4);
vector<future<int>> calculationResults;
vector<future<void>> fileResults;
vector<future<string>> transformResults;
cout << "\n=== SUBMITTING TASKS ===" << endl;
// Submit calculation tasks
for (int i = 1; i <= 5; i++) {
string taskName = "Task_" + to_string(i);
auto future = pool.enqueue(heavyCalculation, i * 50000, taskName);
calculationResults.push_back(move(future));
}
// Submit file processing tasks
vector<string> files = {"data1.txt", "config.xml", "image.jpg", "log.txt", "backup.zip"};
for (const string& filename : files) {
auto future = pool.enqueue(fileProcessing, filename);
fileResults.push_back(move(future));
}
// Submit data transformation tasks
vector<string> data = {"dataset_A", "dataset_B", "dataset_C"};
for (const string& input : data) {
auto future = pool.enqueue(dataTransform, input);
transformResults.push_back(move(future));
}
cout << "📊 Queue size: " << pool.getQueueSize() << " tasks" << endl;
cout << "\n=== COLLECTING RESULTS ===" << endl;
// Collect calculation results
cout << "\n📊 Calculation Results:" << endl;
for (size_t i = 0; i < calculationResults.size(); i++) {
int result = calculationResults[i].get();
cout << "Result " << (i + 1) << ": " << result << endl;
}
// Wait for file processing
cout << "\n📁 File Processing Results:" << endl;
for (size_t i = 0; i < fileResults.size(); i++) {
fileResults[i].get(); // Just wait for completion
cout << "File " << (i + 1) << " processed" << endl;
}
// Collect transformation results
cout << "\n🔄 Transformation Results:" << endl;
for (size_t i = 0; i < transformResults.size(); i++) {
string result = transformResults[i].get();
cout << "Transform " << (i + 1) << ": " << result << endl;
}
cout << "\n=== ALL TASKS COMPLETED ===" << endl;
return 0;
}
Example 3: A Real-time Data Processing Pipeline
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>
#include <random>
#include <iomanip>
using namespace std;
// Data structures
struct SensorData {
int sensorId;
double temperature;
double humidity;
chrono::steady_clock::time_point timestamp;
SensorData(int id, double temp, double hum)
: sensorId(id), temperature(temp), humidity(hum),
timestamp(chrono::steady_clock::now()) {}
};
struct ProcessedData {
int sensorId;
double avgTemperature;
double avgHumidity;
bool alert;
chrono::steady_clock::time_point processTime;
ProcessedData(const SensorData& data, bool isAlert = false)
: sensorId(data.sensorId), avgTemperature(data.temperature),
avgHumidity(data.humidity), alert(isAlert),
processTime(chrono::steady_clock::now()) {}
};
// Thread-safe queue template
template<typename T>
class ThreadSafeQueue {
private:
queue<T> queue_;
mutable mutex mutex_;
condition_variable condition_;
public:
void push(T item) {
lock_guard<mutex> lock(mutex_);
queue_.push(item);
condition_.notify_one();
}
bool tryPop(T& item) {
lock_guard<mutex> lock(mutex_);
if (queue_.empty()) {
return false;
}
item = queue_.front();
queue_.pop();
return true;
}
void waitAndPop(T& item) {
unique_lock<mutex> lock(mutex_);
condition_.wait(lock, [this] { return !queue_.empty(); });
item = queue_.front();
queue_.pop();
}
bool empty() const {
lock_guard<mutex> lock(mutex_);
return queue_.empty();
}
size_t size() const {
lock_guard<mutex> lock(mutex_);
return queue_.size();
}
};
class DataProcessingPipeline {
private:
ThreadSafeQueue<SensorData> rawDataQueue;
ThreadSafeQueue<ProcessedData> processedDataQueue;
ThreadSafeQueue<ProcessedData> alertQueue;
atomic<bool> running;
atomic<int> totalProcessed;
atomic<int> totalAlerts;
// Threads
thread dataGeneratorThread;
thread dataProcessorThread;
thread alertHandlerThread;
thread monitorThread;
// Random number generator
mutable mt19937 rng;
mutable uniform_real_distribution<double> tempDist;
mutable uniform_real_distribution<double> humidityDist;
mutable uniform_int_distribution<int> sensorDist;
public:
DataProcessingPipeline()
: running(false), totalProcessed(0), totalAlerts(0),
rng(chrono::steady_clock::now().time_since_epoch().count()),
tempDist(15.0, 35.0), humidityDist(30.0, 80.0), sensorDist(1, 10) {}
~DataProcessingPipeline() {
stop();
}
void start() {
running = true;
// Start threads
dataGeneratorThread = thread(&DataProcessingPipeline::generateData, this);
dataProcessorThread = thread(&DataProcessingPipeline::processData, this);
alertHandlerThread = thread(&DataProcessingPipeline::handleAlerts, this);
monitorThread = thread(&DataProcessingPipeline::monitorSystem, this);
cout << "🚀 Data processing pipeline started!" << endl;
}
void stop() {
if (running.load()) {
running = false;
cout << "\n🛑 Stopping pipeline..." << endl;
// Join threads
if (dataGeneratorThread.joinable()) dataGeneratorThread.join();
if (dataProcessorThread.joinable()) dataProcessorThread.join();
if (alertHandlerThread.joinable()) alertHandlerThread.join();
if (monitorThread.joinable()) monitorThread.join();
cout << "✅ Pipeline stopped" << endl;
}
}
void runFor(int seconds) {
start();
this_thread::sleep_for(chrono::seconds(seconds));
stop();
}
private:
void generateData() {
cout << "🏭 Data generator started" << endl;
while (running.load()) {
// Generate random sensor data
int sensorId = sensorDist(rng);
double temp = tempDist(rng);
double humidity = humidityDist(rng);
// Occasionally generate extreme values for alerts
if (rng() % 20 == 0) {
temp = (rng() % 2 == 0) ? 50.0 : -10.0; // Extreme temperature
}
SensorData data(sensorId, temp, humidity);
rawDataQueue.push(data);
// Generate data every 100-300ms
this_thread::sleep_for(chrono::milliseconds(100 + rng() % 200));
}
cout << "🏭 Data generator stopped" << endl;
}
void processData() {
cout << "⚙️ Data processor started" << endl;
while (running.load() || !rawDataQueue.empty()) {
SensorData rawData;
if (rawDataQueue.tryPop(rawData)) {
// Simulate processing time
this_thread::sleep_for(chrono::milliseconds(50));
// Check for alerts
bool isAlert = (rawData.temperature > 40.0 || rawData.temperature < 0.0 ||
rawData.humidity > 90.0 || rawData.humidity < 20.0);
ProcessedData processed(rawData, isAlert);
processedDataQueue.push(processed);
if (isAlert) {
alertQueue.push(processed);
totalAlerts++;
}
totalProcessed++;
} else {
this_thread::sleep_for(chrono::milliseconds(10));
}
}
cout << "⚙️ Data processor stopped" << endl;
}
void handleAlerts() {
cout << "🚨 Alert handler started" << endl;
while (running.load() || !alertQueue.empty()) {
ProcessedData alertData;
if (alertQueue.tryPop(alertData)) {
cout << "🚨 ALERT! Sensor " << alertData.sensorId
<< " - Temp: " << fixed << setprecision(1) << alertData.avgTemperature
<< "°C, Humidity: " << alertData.avgHumidity << "%" << endl;
// Simulate alert handling (send email, SMS, etc.)
this_thread::sleep_for(chrono::milliseconds(100));
} else {
this_thread::sleep_for(chrono::milliseconds(50));
}
}
cout << "🚨 Alert handler stopped" << endl;
}
void monitorSystem() {
cout << "📊 System monitor started" << endl;
int elapsedSeconds = 0; // Track elapsed time so the rate is computed correctly
while (running.load()) {
this_thread::sleep_for(chrono::seconds(5));
elapsedSeconds += 5;
cout << "\n📊 SYSTEM STATUS:" << endl;
cout << "Raw data queue: " << rawDataQueue.size() << " items" << endl;
cout << "Processed queue: " << processedDataQueue.size() << " items" << endl;
cout << "Alert queue: " << alertQueue.size() << " items" << endl;
cout << "Total processed: " << totalProcessed.load() << endl;
cout << "Total alerts: " << totalAlerts.load() << endl;
cout << "Processing rate: " << (totalProcessed.load() / static_cast<double>(elapsedSeconds)) << " items/sec" << endl;
}
cout << "📊 System monitor stopped" << endl;
}
};
int main() {
cout << "=== REAL-TIME DATA PROCESSING PIPELINE ===" << endl;
DataProcessingPipeline pipeline;
cout << "Running pipeline for 20 seconds..." << endl;
pipeline.runFor(20);
cout << "\n🎉 Demo completed!" << endl;
return 0;
}
🏋️ Practice
Exercise 1: Parallel Matrix Multiplication
Implement parallel matrix multiplication using threads:
- Split the matrix into blocks
- Each thread computes one block
- Compare performance against the sequential version
Exercise 2: Web Server Simulator
Build a simple web server simulator:
- A thread pool to handle requests
- A thread-safe request queue
- Statistics tracking (requests/second, response times)
Exercise 3: Producer-Consumer with Multiple Producers
Extend the producer-consumer pattern:
- Multiple producer threads
- A single consumer thread
- A buffer with a size limit
- A graceful shutdown mechanism
Exercise 4: Parallel Image Processing
Simulate image processing:
- Divide the image into sections
- Each thread processes one section
- Apply filters (blur, sharpen, etc.)
- Combine the results
Detailed Solutions
Exercise 1 - Parallel Matrix Multiplication:
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
using namespace std;
class ParallelMatrix {
private:
static void multiplyBlock(const vector<vector<int>>& A,
const vector<vector<int>>& B,
vector<vector<int>>& C,
int startRow, int endRow,
int startCol, int endCol,
int K) {
for (int i = startRow; i < endRow; i++) {
for (int j = startCol; j < endCol; j++) {
C[i][j] = 0;
for (int k = 0; k < K; k++) {
C[i][j] += A[i][k] * B[k][j];
}
}
}
}
public:
static vector<vector<int>> multiply(const vector<vector<int>>& A,
const vector<vector<int>>& B,
int numThreads = 4) {
int N = A.size();
int M = B[0].size();
int K = B.size();
vector<vector<int>> C(N, vector<int>(M, 0));
vector<thread> threads;
int blockSize = N / numThreads;
for (int t = 0; t < numThreads; t++) {
int startRow = t * blockSize;
int endRow = (t == numThreads - 1) ? N : (t + 1) * blockSize;
threads.emplace_back(multiplyBlock,
ref(A), ref(B), ref(C),
startRow, endRow, 0, M, K);
}
for (auto& t : threads) {
t.join();
}
return C;
}
};
📋 Summary
Key Points
Core Threading Concepts:
- std::thread - create and manage threads
- std::mutex - mutual exclusion locks
- std::condition_variable - thread synchronization
- std::atomic - lock-free atomic operations
- std::async - high-level async programming
Synchronization Primitives:
- Mutex: Basic locking mechanism
- Lock Guards: RAII-style automatic locking
- Condition Variables: Wait/notify patterns
- Atomic Variables: Lock-free operations
Common Patterns:
- Producer-Consumer: Queue-based communication
- Thread Pool: Reusable worker threads
- Pipeline: Sequential processing stages
Best Practices
Thread Safety:
- Minimize shared state
- Use RAII locks (lock_guard, unique_lock)
- Prefer atomic operations over mutex when possible
- Design for deadlock prevention
Performance:
- Don't create too many threads (use thread pools)
- Balance workload across threads
- Consider cache locality
- Profile to find bottlenecks
Debugging:
- Use thread-safe logging
- Avoid shared global state
- Test with different thread counts
- Use thread sanitizers
Preparing for the Next Lesson
The next lesson covers Move Semantics - a key performance optimization:
- Rvalue References: T&& and move constructors
- std::move: explicit move operations
- Perfect Forwarding: std::forward and universal references
- Copy vs Move: when to use each
- Performance Benefits: avoiding unnecessary copies
Multithreading is an essential skill for high-performance applications. Keep practicing until writing safe concurrent code feels natural!