AcWing
  • 首页
  • 活动
  • 题库
  • 竞赛
  • 应用
  • 更多
    • 题解
    • 分享
    • 商店
    • 问答
    • 吐槽
  • App
  • 登录/注册

线程池C++98

作者: 作者的头像   半醒的狐狸 ,  2023-03-19 02:59:05 ,  所有人可见 ,  阅读 54


0


/*
线程池 pthread_create
pthread_cond_wait  // 通过条件变量让线程睡眠,等待任务到来
pthread_cond_signal  // 唤醒线程

线程创建 互斥锁 条件变量 消息队列
同一时刻只能由一个人来操作从队头取出,所以要加互斥锁;
而使用条件变量实现休眠/唤醒线程
*/

#include <iostream>
#include <queue>
#include <pthread.h>
#include <condition_variable>
#include <cstring>
using namespace std;

struct Task {
    String name;
    String type;
}

// 消息队列类
class TaskQueue
{
public:
    TaskQueue() {
        pthread_mutex_init(&m, NULL);
    }
    ~TaskQueue() {
        pthread_mutex_destory(&m);
    }

    // 添加任务
    void addTask(Task task) {
        pthread_mutex_lock(&m); 
        mq.push(task);
        pthread_mutex_unlock(&m);

    }
    // 取出任务
    Task getTask() {
        Task t;
        if (mq.size()) {
            pthread_mutex_lock(&m);
            t = mq.front();
            mq.pop();
            pthread_mutex_unlock(&m);
        }
        return t;
    }
    // 查询当前任务个数
    int getTaskNums() {
        return mq.size();
    }

private:
    queue<Task> mq;
    pthread_mutex_t m;
    condition_variable cv;
};

// 线程池结构体: 分为两个类,一个线程相关属性类,一个任务队列类
struct ThreadPool
{
public:
    // 创建线程池并初始化
    ThreadPool(int min, int max) {
        // 实例化任务队列和IDs
        taskQ = new TaskQueue;
        // 初始化线程池参数
        do {
            threadIDs = new pthread_t[max];
            if (threadIDs == nullptr) {
                cout << "malloc threadIDs fail..." << endl;
                break;
            }
            memset(threadIDs, 0, sizeof(pthread_t) * max);  // 申请的内存初始化为0
            minNum = min;
            maxNum = max;
            busyNum = 0;
            liveNum = min;
            exitNum = 0;
            // 互斥锁和条件变量初始化判定
            if (pthread_mutex_init(&mPool) != 0 || pthread_cond_init(&notEmpty) != 0) {
                cout << "mutex or condition init fail..." << endl;
                break;
            }

            shutdown = false;

            //创建线程:管理线程,和最小存活线程
            pthread_create(&managerID, NULL, manager, this);
            for (int i = 0; i < min; i ++ ) {
                pthread_create(&threadIDs[i], NULL, worker, this);
            }
            return ;
        } while (0);

        // 释放资源
        if (threadIDs) delete[] threadIDs;
        if (taskQ) delete taskQ;
    }

    // 给线程池添加任务(就是往消息队列里面加任务)
    void addTask(Task task) {
        if (shutdown) return ;
        taskQ->addTask(task);
        pthread_cond_signal(&notEmpty);
    }

    // 获得当前池中工作线程数
    int getBusyNums() {
        pthread_mutex_lock(&mPool);
        int BusyNums = this->busyNum;
        pthread_mutex_unlock(&mPool);
        return BusyNums;
    }

    // 获得当前池中存活线程数
    int getLiveNums() {
        pthread_mutex_lock(&mPool);
        int AliveNums = this->liveNum;
        pthread_mutex_unlock(&mPool);
        return AliveNums;
    }

    // 销毁线程池
    ~ThreadPool() {
        shutdown = true;
        pthread_join(managerID, NULL);
        for (int i = 0; i < liveNum; i ++ ) {
            pthread_cond_signal(&notEmpty);
        }

        if (taskQ) delete taskQ;
        if (threadIDs) delete[] threadIDs;

        pthread_mutex_destory(&mPool);
        pthread_cond_destory(&notEmpty);
    }
private:
    // 工作线程(消费者线程)任务函数
    static void* worker(void* arg);

    // 管理者线程数
    static void* manager(void* arg);

    // 单线程退出
    void threadExit() {
        pthread_t tid = pthread_self();
        for (int i = 0; i < maxNum; i ++ ) {
            if (threadIDs[i] == tid) {
                threadIDs[i] = 0;
                break;
            }
        }

        pthread_exit(NULL);
    }


private:
    TaskQueue *taskQ;           // 实例化消息队列

    pthread_t managerID;        // 管理者线程ID
    pthread_t *threadIDs;       // 工作线程IDs
    int minNum;                 // 最小线程数
    int maxNum;                 // 最大线程数
    int liveNum;                // 存活线程数
    int busyNum;                // 工作线程数
    int exitNum;                // 准备销毁的线程数
    pthread_mutex_t mPool;      // 锁整个线程池
    pthread_cond_t notEmpty;    // 消息队列是否为空

    bool shutdown;              // 是否关闭线程池
};

int main() 
{

    return 0;
}

0 评论

你确定删除吗?
1024
x

© 2018-2023 AcWing 版权所有  |  京ICP备17053197号-1
用户协议  |  隐私政策  |  常见问题  |  联系我们
AcWing
请输入登录信息
更多登录方式: 微信图标 qq图标
请输入绑定的邮箱地址
请输入注册信息