이전까지 echo서버를 만들었다. 이제 스레드풀을 만들어서 멀티스레드로 작업할 수 있게 해보자.


나는 스레드의 구성을 메인스레드가 epoll_wait, 소켓에 read, write 전담을 하고, 워커스레드는 DB작업을 전담하도록 만들고자 한다. 이렇게 하는 이유는 내생각에 소켓통신보다 DB작업의 처리속도가 더 느릴거라 생각되서이다. DB를 내가 직접 만져보는 프로젝트는 처음이라 추측일 뿐이지만 gpt한테 물어보니 AI생각도 비슷했다.

 

아무튼 이번 목표는 스레드 풀을 만들고, 메인스레드가 소켓을 읽어서 일거리를 jobqueue에 집어넣으면 잠자고있던 워커스레드 하나가 깨어나서 일을 처리한다. 그리고 클라이언트에게 보낼 메시지를 Session의 WriteBufferQueue에 push하면 메인스레드의 epoll_wait가 write이벤트를 감지해서 클라이언트에게 보내주는 것이다.

WriteBufferQueue에 넣어서 클라에게 보내는건 이전에 완성했으니 이번엔 스레드풀과 잡큐만 잘 작동하게 만들면 된다.

 

스레드 풀 ThreadManager

처음에 이름을 ThreadPool로 지었는데 이것보다는 매니저가 더 적합한것 같아서 바꿨다.

그리고 여러 스레드가 접근 가능해야하기 때문에 extern을 사용해 전역변수로 만들었다. JobQueue도 마찬가지.

CoreGlobal.h

#pragma once

extern class ThreadManager*	GThreadManager;
extern class JobQueue*		GJobQueue;

CoreGlobal.cpp

#include "CoreGlobal.h"
#include "ThreadManager.h"
#include "JobQueue.h"

ThreadManager*	GThreadManager = nullptr;
JobQueue*	GJobQueue = nullptr;

class CoreGlobal
{
public:
	CoreGlobal()
	{
		GThreadManager = new ThreadManager();
		GJobQueue = new JobQueue();
	}
	~CoreGlobal()
	{
		delete GThreadManager;
		delete GJobQueue;
	}
}	GcoreGlobal;

 

ThreadManager.h

우선 어떻게 스레드를 cpu를 쓰지않고 자고있게 만들었다가 일이 생길때만 깨울 수 있는지 알아봤다.

mutex와 condition_variable을 사용하면 생각보다 쉽게 구현할 수 있었다.

void ThreadManager::func()
{
    std::unique_lock<std::mutex> lock(_m);
    _cv.wait(lock, [this]() { return !GJobQueue->Empty(); });
    /*...*/
}

 

mutex _m과 condition_variable _cv를 ThreadManager의 멤버변수로 선언해놓고 위와같이 코드를 짠다.

그렇게되면 _cv.wait(유니크락, 조건함수) 에서 조건이 false라면 스레드는 mutex를 unlock한 뒤 잠들고, true라면 mutex를 lock 한 뒤에 코드가 재개된다. 처음 스레드가 만들어졌을때는 당연하게도 잡큐에는 아무것도 들어있지 않을테니 모든 스레드는 mutex를 unlock하고, wait상태로 대기하게 될 것이다.

 

그러다 메인스레드가 GJobQueue에 Job을 넣으면서 아래와같이 _cv.notify_one(); 을 호출한다.

void	ThreadManager::InsertJob(JobRef	job)
{
	GJobQueue->PushJob(job);
	_cv.notify_one();
}

그러면 wait상태의 스레드들 중 하나가 깨어나서 코드를 진행한다.

 

이걸 활용해서 스레드 매니저를 만들어보자.

class ThreadManager
{
public:
	ThreadManager();
	~ThreadManager();
	
	void	Launch(); //스레드 1개 추가
	void	Join(); //스레드 Join용
	void	WorkerThread(); // 스레드에서 반복하는 메인함수
	void	InsertJob(std::function<void()> callback); //잡큐에 잡을 넣는함수
	void	InsertJob(JobRef job); //잡큐에 잡을 넣는함수2
private:
	static void	InitTLS(); // 스레드 로컬 변경 함수(스레드ID 등)
	static void	DestroyTLS();

	std::vector<std::thread>	_workerThreads;
	std::mutex					_m;
	std::condition_variable		_cv;
};

 

Job을 넣는걸 스레드매니저 클래스에 넣어놓은게 좀 이상하긴 한데, 스레드 매니저에 있는 _cv로 스레드를 관리하다 보니 좀 어쩔 수 없는 것 같다. 안그러면 스레드관리용 condition_variable을 전역변수로 둬야하는데 그건 그것대로 이상하다.

int main(int ac, char** av)
{
	if (ac != 2)
		handle_error((string(av[0]) + " [port]").c_str(), 1);
	for (int i = 0; i < 5; i++)
	{
		GThreadManager->Launch();
	}
	/*...*/
	GThreadManager->Join();
}

void	ThreadManager::Launch()
{
	_workerThreads.push_back(std::thread([=](){
		InitTLS();
		WorkerThread();
		DestroyTLS();
	}));
}

void	ThreadManager::WorkerThread()
{
	while (true)
	{
	    JobRef job = nullptr;
        {
            std::unique_lock<std::mutex> lock(_m);
            _cv.wait(lock, [this]() { return !GJobQueue->Empty(); });
            std::cout << "thread No." << LThreadId << " Awaken!" << std::endl;
            job = GJobQueue->PopJob();
        }
		job->Execute();
	}
}

우선 위와 같이 메인에서 스레드 수를 설정하여 잠들게 한다. 나는 5개로 해봤는데 보통은 CPU의 스레드 수의 1.5~2배정도로 설정한다고 한다. 스레드를 몇개까지 설정하는게 좋은지는 실제 테스트를 해보면서 조절해봐야 한다.

 

여기서 중요한게 lock을 job에서 꺼내는 부분까지만 유지되도록 만들어야해서 스코프를 추가하여 Execute는 mutex lock을 하지 않은 상태로 호출해야한다. 그렇지 않으면 db작업을 하는 동안 lock이 걸려있어서 다른스레드들이 lock때문에 멈춰있게 된다.

 

JobQueue는 진짜 단순하게 Job을 담는 queue를 가지고있고, PopJob과 PushJob 할때 뮤텍스 락을 걸어서 데이터 레이스가 발생하지 않게만 만들었다. Job에는 함수를 넣어서 실행하도록 만들었다.

JobQueue.h

#pragma once

#include "Types.h"
#include <queue>
#include <mutex>
#include <functional>

class Job
{
public:
	Job(std::function<void()> callback);
	
	void	Execute();
private:
	std::function<void()>	_callback;
};

class JobQueue
{
public:
	void	PushJob(JobRef job);
	JobRef	PopJob();
	bool	Empty() { return _jobQueue.empty(); }
private:
	std::mutex		m;
	std::queue<JobRef>	_jobQueue;
};

 

자 이렇게 메인스레드가 JobQueue에 Job을 넣으면, 워커스레드가 깨어나서 Job을 꺼내고, Job을 만들때 집어넣은 함수를 호출하도록 만들어봤다. 이제 이 이 함수들이 실제로 호출되는곳을 한번 보자.

void	Session::ProcessRead()
{
	SSL*	ssl = _sslObject->GetSsl();
	do {
		int	readLen = SSL_read(ssl, _readBuffer.ReadPos(), READ_SIZE);
		/* ... */
	} while (SSL_has_pending(ssl));

	JobRef	job = std::make_shared<Job>([this, self_weak = std::weak_ptr<Session>(shared_from_this())](){
		if (std::shared_ptr<Session> session = self_weak.lock())
		{
			WriteBufferRef	writeBuffer = make_shared<WriteBuffer>(_readBuffer.ReadPos(), _readBuffer.DataSize());
			Send(writeBuffer);
			_readBuffer.OnRead(_readBuffer.DataSize());
			_readBuffer.Clean();
		}
	});
	GThreadManager->InsertJob(job);
}

void	ThreadManager::InsertJob(JobRef	job)
{
	GJobQueue->PushJob(job);
	_cv.notify_one();
}

read작업 후에 Job을 만들고, InsertJob으로 JobQueue에 집어넣는다.

void	ThreadManager::WorkerThread()
{
	while (true)
	{
		std::unique_lock<std::mutex> lock(_m);
		_cv.wait(lock, [this]() { return !GJobQueue->Empty(); });
		std::cout << "thread No." << LThreadId << " Awaken!" << std::endl;
		JobRef job = GJobQueue->PopJob();
		job->Execute();
	}
}

void	Job::Execute()
{
	_callback();
}
/*
    if (std::shared_ptr<Session> session = self_weak.lock())
    {
        WriteBufferRef	writeBuffer = make_shared<WriteBuffer>(_readBuffer.ReadPos(), _readBuffer.DataSize());
        Send(writeBuffer);
        _readBuffer.OnRead(_readBuffer.DataSize());
        _readBuffer.Clean();
    }
*/

그러면 워커스레드 하나가 깨어나서 위 함수를 마저 실행시킬 것이고, 여기서 실행되는 callback은 아까 JobRef를 생성할때 넣은 람다함수. 지금은 WriteBuffer를 만들고, read버퍼의 내용을 그대로 복사한 뒤에 session의 send함수로 WriteBufferQueue에 집어넣고 epoll_event write를 활성화 시켜준다 (이부분은 저번에 함).

 

이렇게 에코서버에 멀티스레드를 적용시켜봤다!

잘 된다!

 

추가로 만약 5개의 스레드가 있는데 일감을 동시에 100개 넣어버리면 5개만 처리하고 95개는 남아버리는거 아닌가? 하는 걱정이 있어서 테스트해봤다.

void	ThreadManager::WorkerThread()
{
	while (true)
	{
		JobRef job = nullptr;
		{
			std::unique_lock<std::mutex> lock(_m);
			_cv.wait(lock, [this]() { return !GJobQueue->Empty(); });
			job = GJobQueue->PopJob();
		}
		std::this_thread::sleep_for(std::chrono::seconds(3));
		std::cout << "thread No." << LThreadId << " Awaken!" << std::endl;
		job->Execute();
	}
}

이렇게 3초뒤에 execute함수를 실행시키도록 만들어서 서버가 echo 3초안에 스레드보다 더 많은 메시지를 보내봤다.

cv가 내부에서 알아서 잘 처리하는지 문제없이 모든 메시지에 대해 처리가 잘 된다.

다만 openssl s_client의 문제인지 echo로 돌아온 문자를 제대로 받지 못하긴 했다. 제대로 확인하려면 stdin과 socket write를 하는 스레드와 read와 stdout하는 스레드를 나눈 프로그램을 직접 만들어서 확인해봐야 하는데 지금 할 필요는 없으니 다음에 하도록 하자.

 


공부에 도움이 된 블로그 게시글

 

씹어먹는 C++ - <15 - 5. C++ 쓰레드풀(ThreadPool) 만들기>

모두의 코드 씹어먹는 C++ - <15 - 5. C++ 쓰레드풀(ThreadPool) 만들기> 작성일 : 2019-05-19 이 글은 68592 번 읽혔습니다. 이번 강좌에서는에 대해 다룹니다.안녕하세요 여러분! 이번 강좌에서는 여태까지

modoocode.com

 

'프로젝트 > Project_Island' 카테고리의 다른 글

8. PacketHandler  (0) 2026.03.04
7. Protobuf, PacketHeader, PacketSession  (0) 2026.03.04
5. WriteBuffer와 Send작업  (0) 2026.03.02
4. epoll기반 서버 기초다지기  (0) 2026.03.01
3. 인증서버 설계를 위한 공부  (0) 2026.02.21

+ Recent posts