授课语音

实现线程安全的队列(Queue)

线程安全的队列是指多个线程同时访问该队列时能够保证数据的完整性与一致性,避免竞态条件和数据不一致的情况。我们可以使用多种方法来实现线程安全的队列,比如使用互斥锁(mutex)、自旋锁(spinlock)或者条件变量(condition variable)等同步机制。

以下是几种常见的实现方式:

1. 使用互斥锁实现线程安全队列

1.1 队列基本结构

我们首先定义一个基本的队列结构,队列包括头尾指针、队列长度、队列的最大容量等字段。

1.2 使用互斥锁保护队列

为了保证多线程环境下对队列的安全访问,我们使用 pthread_mutex_t(互斥锁)来保证在操作队列时的同步,防止多个线程同时访问队列导致竞态条件。

1.3 实现代码

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>

// 队列节点结构
typedef struct Node {
    int data;
    struct Node* next;
} Node;

// 队列结构
typedef struct Queue {
    Node* front;          // 队列头
    Node* rear;           // 队列尾
    pthread_mutex_t lock; // 互斥锁,保证线程安全
} Queue;

// 初始化队列
void initQueue(Queue* q) {
    q->front = q->rear = NULL;
    pthread_mutex_init(&q->lock, NULL);
}

// 入队操作
void enqueue(Queue* q, int data) {
    pthread_mutex_lock(&q->lock); // 加锁,确保线程安全
    
    Node* newNode = (Node*)malloc(sizeof(Node));
    if (newNode == NULL) {
        printf("Memory allocation failed!\n");
        pthread_mutex_unlock(&q->lock); // 解锁
        return;
    }
    newNode->data = data;
    newNode->next = NULL;

    if (q->rear == NULL) {
        q->front = q->rear = newNode;
    } else {
        q->rear->next = newNode;
        q->rear = newNode;
    }

    pthread_mutex_unlock(&q->lock); // 解锁
}

// 出队操作
bool dequeue(Queue* q, int* data) {
    pthread_mutex_lock(&q->lock); // 加锁,确保线程安全

    if (q->front == NULL) {
        pthread_mutex_unlock(&q->lock); // 解锁
        return false; // 队列为空
    }

    Node* temp = q->front;
    *data = temp->data;
    q->front = q->front->next;

    if (q->front == NULL) {
        q->rear = NULL; // 队列为空时,rear 也应为 NULL
    }

    free(temp);
    pthread_mutex_unlock(&q->lock); // 解锁

    return true;
}

// 打印队列内容
void printQueue(Queue* q) {
    pthread_mutex_lock(&q->lock); // 加锁,确保线程安全

    Node* current = q->front;
    while (current != NULL) {
        printf("%d ", current->data);
        current = current->next;
    }
    printf("\n");

    pthread_mutex_unlock(&q->lock); // 解锁
}

// 销毁队列
void destroyQueue(Queue* q) {
    pthread_mutex_lock(&q->lock); // 加锁,确保线程安全

    Node* current = q->front;
    while (current != NULL) {
        Node* temp = current;
        current = current->next;
        free(temp);
    }

    pthread_mutex_unlock(&q->lock); // 解锁
    pthread_mutex_destroy(&q->lock); // 销毁锁
}

int main() {
    Queue q;
    initQueue(&q);

    enqueue(&q, 10);
    enqueue(&q, 20);
    enqueue(&q, 30);
    
    printQueue(&q);

    int value;
    if (dequeue(&q, &value)) {
        printf("Dequeued: %d\n", value);
    }
    
    printQueue(&q);

    destroyQueue(&q);
    return 0;
}

1.4 代码解释

  1. 结构定义

    • Queue:包含队列的头指针 front、尾指针 rear,以及 lock 互斥锁。
    • Node:每个队列元素的节点,包含 data 和指向下一个节点的指针。
  2. 初始化队列: 使用 initQueue 初始化队列,并创建一个互斥锁。

  3. 入队操作(enqueue)

    • 申请一个新节点,加入到队列尾部。
    • 使用 pthread_mutex_lockpthread_mutex_unlock 来确保在多线程环境下对队列的访问是安全的。
  4. 出队操作(dequeue)

    • 如果队列非空,从队列头部删除一个节点,并返回节点的值。
    • 同样使用锁来保证线程安全。
  5. 销毁队列

    • 使用 destroyQueue 来释放队列中所有节点的内存。

2. 使用自旋锁实现线程安全队列

与互斥锁不同,自旋锁是一种轻量级的同步机制,适用于锁持有时间较短的场景。它的特点是线程在等待锁的时候不会被挂起,而是会持续检查锁是否被释放,因此对短时间锁定的场景更加高效。

2.1 自旋锁队列实现代码示例

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdbool.h>

// 队列节点结构
typedef struct Node {
    int data;
    struct Node* next;
} Node;

// 队列结构
typedef struct Queue {
    Node* front;          // 队列头
    Node* rear;           // 队列尾
    pthread_spinlock_t lock; // 自旋锁,保证线程安全
} Queue;

// 初始化队列
void initQueue(Queue* q) {
    q->front = q->rear = NULL;
    pthread_spin_init(&q->lock, PTHREAD_PROCESS_PRIVATE);
}

// 入队操作
void enqueue(Queue* q, int data) {
    pthread_spin_lock(&q->lock); // 获取自旋锁
    
    Node* newNode = (Node*)malloc(sizeof(Node));
    if (newNode == NULL) {
        printf("Memory allocation failed!\n");
        pthread_spin_unlock(&q->lock); // 解锁
        return;
    }
    newNode->data = data;
    newNode->next = NULL;

    if (q->rear == NULL) {
        q->front = q->rear = newNode;
    } else {
        q->rear->next = newNode;
        q->rear = newNode;
    }

    pthread_spin_unlock(&q->lock); // 解锁
}

// 出队操作
bool dequeue(Queue* q, int* data) {
    pthread_spin_lock(&q->lock); // 获取自旋锁

    if (q->front == NULL) {
        pthread_spin_unlock(&q->lock); // 解锁
        return false; // 队列为空
    }

    Node* temp = q->front;
    *data = temp->data;
    q->front = q->front->next;

    if (q->front == NULL) {
        q->rear = NULL; // 队列为空时,rear 也应为 NULL
    }

    free(temp);
    pthread_spin_unlock(&q->lock); // 解锁

    return true;
}

// 打印队列内容
void printQueue(Queue* q) {
    pthread_spin_lock(&q->lock); // 获取自旋锁

    Node* current = q->front;
    while (current != NULL) {
        printf("%d ", current->data);
        current = current->next;
    }
    printf("\n");

    pthread_spin_unlock(&q->lock); // 解锁
}

// 销毁队列
void destroyQueue(Queue* q) {
    pthread_spin_lock(&q->lock); // 获取自旋锁

    Node* current = q->front;
    while (current != NULL) {
        Node* temp = current;
        current = current->next;
        free(temp);
    }

    pthread_spin_unlock(&q->lock); // 解锁
    pthread_spin_destroy(&q->lock); // 销毁自旋锁
}

int main() {
    Queue q;
    initQueue(&q);

    enqueue(&q, 10);
    enqueue(&q, 20);
    enqueue(&q, 30);
    
    printQueue(&q);

    int value;
    if (dequeue(&q, &value)) {
        printf("Dequeued: %d\n", value);
    }
    
    printQueue(&q);

    destroyQueue(&q);
    return 0;
}

3. 总结

  • 使用 互斥锁自旋锁 来保护队列的访问,确保线程安全。
  • 互斥锁适合持锁时间较长的场景,而自旋锁适合持锁时间较短的场景。
  • 可以通过适当选择同步机制提高并发效率,避免出现死锁、竞态条件等问题。
去1:1私密咨询

系列课程: