저번에 IOCP 비동기 send요청을 구현했고, 그걸 이용해서 간단한 채팅서버를 만들어봤다.

 

이번엔 멀티스레드를 적용시켜보자.


ThreadManager

AuthServer에서 거의 그대로 가져왔다.

Launch에 돌리고자 하는 함수를 넣어서 실행시킨다.

#pragma once

#include <functional>
#include <thread>

class ThreadManager
{
public:
	ThreadManager();
	~ThreadManager();

	void Launch(function<void()> callback);
	void Join();

	bool IsRunning() { return _running; }

private:
	static void InitTLS();
	static void DestroyTLS();
	bool _running;

	vector<thread> _workerThreads;
};
#include "pch.h"
#include "CoreTLS.h"
#include "ThreadManager.h"

ThreadManager::ThreadManager() : _running(true)
{
	InitTLS();
}

ThreadManager::~ThreadManager()
{
	Join();
}

void ThreadManager::Launch(function<void()> callback)
{
	_workerThreads.push_back(thread([=]()
		{
			InitTLS();
			callback();
			DestroyTLS();
		}
	));
}

void ThreadManager::Join()
{
	for (auto& t : _workerThreads)
	{
		t.join();
	}
}

void ThreadManager::InitTLS()
{
	static atomic<int> SThreadId(1);
	LThreadId = SThreadId.fetch_add(1);
}

void ThreadManager::DestroyTLS()
{
}

 

CoreTLS

#pragma once

extern thread_local int LThreadId;
#include "pch.h"
#include "CoreTLS.h"

thread_local int	LThreadId = 0;

멀티스레드 테스트

GameServer의 메인문에서 간단하게 테스트를 해보자.

아래 코드는 1초에 한번씩 "Hello Thread 스레드id" 를 출력하는 코드.

#include "pch.h"
#include "ThreadManager.h"
#include "CoreTLS.h"

void WorkerThread()
{
	while (1)
	{
		cout << "Hello Thread " << LThreadId << endl;
		this_thread::sleep_for(chrono::seconds(1));
	}
}

int main()
{
	ThreadManager manager;

	manager.Launch(WorkerThread);
	manager.Launch(WorkerThread);
}

의도한대로 잘 나온다!


채팅서버에 멀티스레드 적용하기

이전에는 Service->Start() 함수 안에서 while(1)로 iocpcore->dispatch()를 호출하여 GetQueuedCompletionStatus로 완료통지 받기를 반복시켰다. 방금 만든 ThreadManager를 사용해서 여러개의 스레드에서 대기하고있다가 하나씩 깨어나서 일을 하도록 해보자.

#include "pch.h"
#include "IocpCore.h"
#include "NetAddress.h"
#include "Listener.h"
#include "Service.h"
#include "ThreadManager.h"
#include "CoreTLS.h"

void WorkerThread(ServiceRef service)
{
	while (1)
	{
		service->GetIocpCore()->Dispatch();
		cout << "Worker thread " << LThreadId << " processed an I/O event" << endl;
	}
}

int main()
{
	ThreadManager threadManager;
	ServiceRef service = make_shared<Service>(NetAddress("0.0.0.0", 9000));
	service->Start();

	for (int i = 0; i < 5; i++)
	{
		threadManager.Launch([&service]() {
			WorkerThread(service);
			}
		);
	}
}

클라이언트로 연결해서 채팅을 쳐보면, 여러 스레드에서 일처리를 나눠서 하는걸 확인할 수 있다.


스레드 세이프 문제 해결

이제 스레드를 추가했으니, 지금까지 만들었던 코드들을 스레드세이프 하게 수정해줘야만 한다.

전체적으로 코드의 흐름을 따라가면서 수정했는데, 생각보다 뮤택스 락을 추가해야할 부분이 그렇게 많지는 않았다.

  • Service에서 새 세션이 accept될때마다 _session 벡터에 SessionRef를 넣는데 이부분 lock 처리
  • RegisterSend에서 _sendBuffer 건드릴 때 lock 처리

신경을 많이 써야했던 부분은 send파트였다.

sendEvent에 적힌 내용을 다 보낼때까지 sendEvent를 건드리면 안되기 때문에 Session에 

atomic<bool> _sendRegistered

를 추가하고, SendRegister 를 하기전에 매번 확인해서 아직 이전에 보낸 send작업이 완료되지 않았다면 Session의 sendbuffers queue에만 넣고, sendEvent는 건드리지 않아야한다. 그리고, 동시에 이전 send 작업을 완료한 다음, 그사이에 session에 쌓인 sendbuffer가 있다면 그걸 다시 처리해줘야한다.

 

그래서 아래와 같이 코드를 짜서 이미 다른 스레드가 registerSend를 호출했고, 그 이벤트가 아직 완료되지 않았다면 (processSend가 아직 호출되지 않았다면) Send Buffer를 하더라도 Session의 queue에만 넣고, registerSend는 호출하지 않도록 만들어줬다.

이 때 sendRegistered 변수는 확인과 변경이 원자적으로 일어나야 하기 때문에 compare_exchange_strong 을 사용했다.

void Session::Send(SendBufferRef sendBuffer)
{
	{
		lock_guard<mutex> lock(_m);
		_sendBuffers.push(sendBuffer);
	}

	bool expected = false;
	if (_sendRegistered.compare_exchange_strong(expected, true))
	{
		RegisterSend();
	}
}

void Session::RegisterSend()
{
	_sendEvent.Init();
	_sendEvent.SetOwner(shared_from_this());

	{
		lock_guard<mutex> lock(_m);
		while (!_sendBuffers.empty())
		{
			SendBufferRef sendBuffer = _sendBuffers.front();
			_sendBuffers.pop();
			_sendEvent.PushBack(sendBuffer);
		}
	}

	vector<WSABUF> wsaBufs;
	for (auto& sendBuffer : _sendEvent.GetSendBuffers())
	{
		WSABUF wsaBuf;
		wsaBuf.buf = reinterpret_cast<char*>(sendBuffer->GetBuffer());
		wsaBuf.len = sendBuffer->GetDataLen();
		wsaBufs.push_back(wsaBuf);
	}

	DWORD numOfBytes = 0;
	if (SOCKET_ERROR == WSASend(_socket, wsaBufs.data(), static_cast<DWORD>(wsaBufs.size()),
		OUT & numOfBytes, 0, (LPWSAOVERLAPPED)&_sendEvent, nullptr))
	{
		int errorCode = WSAGetLastError();
		if (errorCode != WSA_IO_PENDING)
		{
			//TODO 적절한 처리
		}
	}
}

void Session::ProcessSend(int32 numOfBytes)
{
	_sendEvent.Clear();
	{
		lock_guard<mutex> lock(_m);
		if (_sendBuffers.empty())
		{
			_sendRegistered.store(false);
			return;
		}
	}
	RegisterSend();
}

Partial Send 문제 해결

스레드와는 관계없는 내용이지만 생각난 김에 처리했다.

Partial Send 문제는 만약 (거의 그럴 일 없지만) 커널의 sendbuffer가 꽉 차서 내가 보내려는 버퍼를 전부 전송할 수 없을 때 일부만 send에 성공하는 문제다. 이 경우 GetQueuedCompletionStatus 에서 나오는 numOfByte가 내가 보내려 했던 바이트 수보다 적게 나온다. 완료통지는 정상적이지만, 실제로 내가 보내려한 데이터는 다 보내지 못한 것.

 그래서 ProcessSend에서 이에 대한 처리를 해줘야만 한다.

 

만약 보낸 바이트 < 보내려 했던 바이트 일 경우 sendEvent 에 있는 sendBuffers를 가져와서 보낸거는 pop 해주고, 부분적으로 보내진 버퍼는 보내지 못한 부분만 새로 만들어 sendBuffers의 맨앞에 배치한 뒤에 다시 RegisterSend를 호출하게 했다. ( 이 과정에서 그동안 새로 쌓인 Session의 sendBuffers도 처리된다.)

void Session::ProcessSend(int32 numOfBytes)
{
	// 보낸 바이트 수 < 보내려 했던 바이트 일 경우 처리.
	if (numOfBytes < _sendEvent.GetWantSendBytes())
	{
		uint32 sendedBytes = numOfBytes;
		deque<SendBufferRef>& sendBuffers = _sendEvent.GetSendBuffers();
		while (!sendBuffers.empty())
		{
			if (sendedBytes >= sendBuffers.front()->GetDataLen())
			{
				sendedBytes -= sendBuffers.front()->GetDataLen();
				sendBuffers.pop_front();
			}
			else
			{
				BYTE* pos = sendBuffers.front()->GetPosPtr(sendedBytes);
				int32 dataLen = sendBuffers.front()->GetDataLen() - sendedBytes;

				SendBufferRef newone = make_shared<SendBuffer>(pos, dataLen);
				sendBuffers.pop_front();
				_sendEvent.PushFront(newone);
				RegisterSend();
				return;
			}
		}
		return;
	}
	_sendEvent.Clear();

	{
		lock_guard<mutex> lock(_m);
		if (_sendBuffers.empty())
		{
			_sendRegistered.store(false);
			return;
		}
	}
	RegisterSend();
}

현재까지의 git 버전

 

Feat: 멀티스레딩, partial send 처리 · Dodontak/Project_Island_GameServer@baff21f

Session - Send 함수 추가. SendBufferRef 를 받아서 스레드 세이프하게 _sendBuffers 에 넣고, 내가 첫번째로 넣었다면, RegisterSend 호출하는 함수. - RegisterSend에서 _sendBuffers 건드릴 때 스레드세이프하게 수정

github.com

 

'프로젝트 > Project_Island' 카테고리의 다른 글

31. 더미 클라이언트  (0) 2026.03.31
30. Connect, Disconnect 구현  (0) 2026.03.31
28. Send, Broadcast 구현  (0) 2026.03.29
27. IOCP accept, recv까지 구현  (0) 2026.03.28
26. NetAddress  (0) 2026.03.27

+ Recent posts