지난번에 JobQueue를 만들어 Room이 이를 상속받고 Room에 대한 멤버함수는 모두 JobQueue의 DoAsync를 사용하도록 수정해서 10개 스레드가 room의 함수를 호출하면 1개만 일하고 9개는 멈춰서 대기하는 상황을 해결했다. DoAsync를 사용함으로써 1개 스레드가 일할 때 9개는 JobQueue에 job을 넣고 빠져나온 뒤에 자기 할 일을 한다!
그리고 TLS 관련 버그 도 해결했다. (심각한 성능 이슈가 예상되지만)
아무튼 JobQueue에 대해 다시 생각해보면 1개가 일하고 9개는 Job을 넣고 빠져나오는데, 만약 1개 스레드가 처리하는 job의 양보다 잡큐에 넣어지는 job의 양이 더 많다면 그 하나의 스레드는 끝도없이 일하게 될 것이다. 그 말은 room이 100개가 있다 치면 그중에 몇개의 방이나 특정 지역만 돌아가고, 나머지는 아예 멈춰있게 될 수도 있다는 말.
애초에 이런 상황이 생기면 최대한 잘 해결해봐야 서버 전체에 전체적으로 렉이 걸리게 되지 않을까 싶긴 한데 어찌됐든 한 지역만 돌아가는것보단 나으니까 해결해보자.
구상은 이렇다.
- JobQueue에서 Job을 처리하는데 시간이 너무 많이 지남.
- GlobalQueue에 일감이 남아있는 JobQueue를 넣어서 여유있는 다른 스레드가 처리하도록 함.
- 다른 스레드에서 IocpCore->Dispatch를 처리하고, 시간이 남았다면 GlobalQueue에서 JobQueue를 꺼내 Execute함.
이 과정에서 한 스레드가 Dispatch를 처리하는동안 해당 JobQueue에 Push하고, 다른 스레드는 GlobalQueue를 통해 처리하려 하면 하나의 JobQueue를 두개의 스레드가 처리하는 문제가 발생하는 것 아닌가? 하고 의심했는데, JobQueue::Execute()와 Push()를 보면 그런 상황에서는 _jobCount가 0일 수가 없기 때문에 다른 스레드에서 push를 한다 해도 execute까지 실행하지는 않는다. 그래서 해당 JobQueue에 대한 일처리가 진행되려면 반드시 GlobalQueue에서 꺼내서 실행해야만 한다.
void JobQueue::Execute()
{
LCurrentJobQueue = this;
while (true)
{
vector<JobRef> jobs;
_jobs.PopAll(OUT jobs);
const int32 jobCount = static_cast<int32>(jobs.size());
for (int32 i = 0; i < jobCount; i++)
jobs[i]->Execute();
// 남은 일감이 0개라면 종료
if (_jobCount.fetch_sub(jobCount) == jobCount)
{
LCurrentJobQueue = nullptr;
return;
}
//TODO 일감이 남았는데 시간이 지났다면 글로벌 큐에 등록하고 빠져나오기
}
}
void JobQueue::Push(JobRef job, bool pushOnly)
{
const int32 prevCount = _jobCount.fetch_add(1);
_jobs.Push(job); // lock
// 첫번째 Job을 넣은 쓰레드가 실행까지 담당
if (prevCount == 0)
{
/*...*/
}
}
스레드 로컬 변수 LEndTickCount 추가
스레드가 얼마나 오랫동안 일할지 정하는 용도.
CoreTLS
extern thread_local uint64 LEndTickCount;
thread_local uint64 LEndTickCount = 0;
GameServer.cpp
GetTickCount64함수를 사용해 워커 스레드가 언제까지 일을 하게 할지 설정한다.
enum
{ //64ms넘어가면 다른 스레드에 일감을 넘김.
WORKER_TICK = 64
};
void WorkerThread(ServiceRef service)
{
while (true)
{
LEndTickCount = ::GetTickCount64() + WORKER_TICK;
service->GetIocpCore()->Dispatch();
//글로벌 잡에서 꺼내서 execute시키는 함수.(구현예정)
ThreadManager::DoGlobalQueueWork();
}
}
IocpCore->Dispatch 함수 수정
지금은 네트워크 패킷이 들어올때까지 무한정 대기하는데, 적당히 대기하고 나서 다른 일(글로벌 큐)을 하도록 수정해준다.
void WorkerThread(ServiceRef service)
{
while (true)
{
LEndTickCount = ::GetTickCount64() + WORKER_TICK;
service->GetIocpCore()->Dispatch(10);
ThreadManager::DoGlobalQueueWork();
}
}
bool IocpCore::Dispatch(uint32 timeoutMs)
{
DWORD numOfBytes = 0;
ULONG_PTR completionKey;
OVERLAPPED* overlapped = nullptr;
bool result = GetQueuedCompletionStatus(_iocpHandle, &numOfBytes, &completionKey,
&overlapped, timeoutMs); // INFINITE 에서 timeoutMs로 수정됨.
if (result != 0)
{
/*...*/
}
else
{
int32 errCode = ::WSAGetLastError();
if (errCode == WAIT_TIMEOUT)
return false; // 타임아웃인 경우 그냥 종료하도록 추가
/*...*/
}
return true;
}
GlobalQueue
단순히 jobQueue를 넣었다 뺐다 할 수 있게 만든 클래스다.
#pragma once
#include "LockQueue.h"
class GlobalQueue
{
public:
GlobalQueue() {}
~GlobalQueue() {}
void Push(JobQueueRef jobQueue) { _jobQueues.Push(jobQueue); }
JobQueueRef Pop() { return _jobQueues.Pop(); }
private:
LockQueue<JobQueueRef> _jobQueues;
};
JobQueue 수정
void JobQueue::Push(JobRef job, bool pushOnly)
{
const int32 prevCount = _jobCount.fetch_add(1);
_jobs.Push(job); // lock
// 첫번째 Job을 넣은 쓰레드가 실행까지 담당
if (prevCount == 0)
{
// 이미 실행중인 JobQueue가 없으면 실행
if (LCurrentJobQueue == nullptr && pushOnly == false)
{
Execute();
}
else
{// 잡큐에 처음 넣긴 했는데 실행은 안할거면 글로벌 큐에 넣어서 다른 스레드가 실행하게 함
GGlobalQueue->Push(shared_from_this());
}
}
}
void JobQueue::Execute()
{
LCurrentJobQueue = this;
while (true)
{
vector<JobRef> jobs;
_jobs.PopAll(OUT jobs);
const int32 jobCount = static_cast<int32>(jobs.size());
for (int32 i = 0; i < jobCount; i++)
jobs[i]->Execute();
// 남은 일감이 0개라면 종료
if (_jobCount.fetch_sub(jobCount) == jobCount)
{
LCurrentJobQueue = nullptr;
return;
}
// 정해진 시간보다 오래 걸렸다면 다른 스레드에 일감을 넘김
const uint64 now = ::GetTickCount64();
if (now >= LEndTickCount)
{
LCurrentJobQueue = nullptr;
GGlobalQueue->Push(shared_from_this());
break;
}
}
}
ThreadManager에 DoGlobalQueueWork 함수 추가
GGlobalQueue 에서 JobQueue를 꺼내서 다른 스레드가 바빠서 못한 Execute를 해준다.
void ThreadManager::DoGlobalQueueWork()
{
while (true)
{
uint64 now = ::GetTickCount64();
if (now > LEndTickCount)
break;
JobQueueRef jobQueue = GGlobalQueue->Pop();
if (jobQueue == nullptr)
break;
jobQueue->Execute();
}
}
테스트
엄청나게 패킷이 몰리지 않으면 웬만하면 64ms만에 처리하지 못할 일이 없으니까 sleep을 넣어서 강제로 발생시켜봤다.
더미 클라이언트에서 서버로 모든 클라이언트가 C_CHAT을 200ms마다 보내고 있는데 서버가 받으면 아래와 같은 과정을 거친다.
- Handdle_C_CHAT 에서 player->ChatTest 호출
- player가 속한 Room의 DoAsync( Room::BroadCast, 패킷sendBuffer) 호출
- BroadCast 잡 만들어서 넣고, 처음 넣은 스레드가 꺼내서 호출.
여기서 BroadCast 끝에 sleep 70ms을 넣어서 처리를 느리게 만들어서 일이 빠르게 처리되지 않고 쌓이게 만든 다음에 글로벌 큐에서 정상적으로 꺼내서 쓰는지 확인해봤다.
void Room::Broadcast(SendBufferRef sendBuffer)
{
for (auto& player : _players)
{
if (auto owner = player.second->_owner.lock())
{
owner->Send(sendBuffer);
}
}
this_thread::sleep_for(chrono::milliseconds(70));
}

잘 된다. 근데 sleep 70ms를 줘도 생각보다 자주 글로벌 큐에 넣는 경우가 발생하지는 않았다. 가끔 생긴다.
현재까지의 git 버전 (테스트용 sleep과 print는 뺌)
Feat: GlobalJobQueue · Dodontak/Project_Island_GameServer@c97d5f2
스레드에서 JobQueue의 일을 너무 오랫동안 처리하면 GlobalQueue로 JobQueue를 넣어서 다른 스레드가 처리하도록 넘김.
github.com
'프로젝트 > Project_Island' 카테고리의 다른 글
| 45. 언리얼 클라이언트 만들기 (0) | 2026.04.20 |
|---|---|
| 44. JobTimer (0) | 2026.04.18 |
| 42. JobQueue(1) (0) | 2026.04.16 |
| 41. Room, Player 만들기, TLS 버그해결 (0) | 2026.04.14 |
| 40. 외부 라이브러리들 dll -> lib 변경 (0) | 2026.04.13 |