지난번에 소켓을 만들고 서버와 TCP연결까지 확인했다.

이제 소켓 담당 클래스인 PacketSession을 만들고, 송수신을 멀티스레드로 처리하도록 만들어볼것.

멀티스레드로 만드는 이유는 FSocket의 send와 recv가 기본적으로 블로킹이라서 그렇다.


PacketSession

PacketSession에서 RecvWorker와 SendWorker 쉐어드포인터를 생성해서 들고있는다.

RecvWorker와 SendWorker는 FRunnable을 상속받아서 언리얼에서 제공하는 멀티스레드기능을 쓴다.

 

RecvWorker는 워커스레드에서 돌아가면서 recv를 받다가 1개 패킷만큼의 분량을 받아내면 (패킷 헤더를 읽어서) PacketSession의 RecvPacketQueue에 집어넣는다(TQueue는 기본값이 스레드 세이프함). 이후 처리는 메인스레드에서 꺼내서 알아서 해야한다. (패킷 핸들러를 써서 하면 될 것 같다.)

 

SendWorker는 메인스레드가 SendPacketQueue에 넣어놓은걸 멀테스레드가 돌면서 계속 확인하고, 있으면 꺼내서 보내준다.

 

둘다 BusyWait 문제가 있어서 sleep을 넣던지 하면 좋을 것 같다. (지금은 안돼있다.)

 

PacketSession.h

#pragma once

#include "CoreMinimal.h"
#include "Client.h"

class CLIENT_API PacketSession : public TSharedFromThis<PacketSession>
{
public:
	PacketSession(class FSocket* Socket);
	~PacketSession();

	void Run();
	void Disconnect();
	
	UFUNCTION(blueprintCallable)
	void HandleRecvPackets();
	
	void SendPacket(SendBufferRef SendBuffer);
public:
	class FSocket* Socket;
	
	TSharedPtr<class RecvWorker> RecvWorkerThread;
	TSharedPtr<class SendWorker> SendWorkerThread;
	TQueue<TArray<uint8>> RecvPacketQueue;
	TQueue<SendBufferRef> SendPacketQueue;
};

PacketSession.cpp

#include "PacketSession.h"
#include "NetworkWorker.h"

PacketSession::PacketSession(class FSocket* Socket) : Socket(Socket)
{
}

PacketSession::~PacketSession()
{
	Disconnect();
}

void PacketSession::Run()
{
	RecvWorkerThread = MakeShared<RecvWorker>(Socket, AsShared());
	SendWorkerThread = MakeShared<SendWorker>(Socket, AsShared());
}

void PacketSession::Disconnect()
{
	if (RecvWorkerThread)
	{
		RecvWorkerThread->Destroy();
		RecvWorkerThread = nullptr;
	}
	
	if (SendWorkerThread)
	{
		SendWorkerThread->Destroy();
		SendWorkerThread = nullptr;
	}
}

void PacketSession::HandleRecvPackets()
{
	while (true)
	{
		TArray<uint8> Packet;
		if (RecvPacketQueue.Dequeue(OUT Packet) == false)
			break;
		// TODO
		//Clientpackethandler::HandlePacket(Packet);
	}
}

void PacketSession::SendPacket(SendBufferRef SendBuffer)
{
	SendPacketQueue.Enqueue(SendBuffer);
}

NetworkWorker

RecvWorker와 SendWorker를 구현한 위치.

FRunnable을 상속받으면 Init, Run, Exit 가상함수를 구현해야하고, 

FRunnableThread* Thread = FRunnableThread::Create(this, TEXT("RecvWorkerThread"));

위와같이 Create를 호출하면 내부적으로 Init, Run, Exit을 순서대로 호출해준다고 한다.

 

NetworkWorker.h

#pragma once

#include "CoreMinimal.h"
#include "Client.h"

class PacketSession;

struct CLIENT_API FPacketHeader
{
	FPacketHeader() : PacketID(0), PacketSize(0)
	{
	}

	FPacketHeader(uint16 PacketID, uint16 PacketSize) : PacketID(PacketID), PacketSize(PacketSize)
	{
	}

	friend FArchive& operator<<(FArchive& Ar, FPacketHeader& Header)
	{
		Ar << Header.PacketID;
		Ar << Header.PacketSize;
		return Ar;
	}
	uint16 PacketID;
	uint16 PacketSize;
};

/*----------------------------------------------------------------------------*\
|                                                                              |
|                                 RecvWorker                                   |
|                                                                              |
\*----------------------------------------------------------------------------*/
class CLIENT_API RecvWorker : public FRunnable
{
public:
	RecvWorker(FSocket* Socket, TSharedPtr<PacketSession> Session);
	~RecvWorker();

	virtual bool Init() override;
	virtual uint32 Run() override;
	virtual void Exit() override;
	
	void Destroy();
	
private:
	bool ReceivePacket(TArray<uint8>& OutPayload);
	bool ReceiveDesiredBytes(uint8* Results, int32 Size);

protected:
	FRunnableThread* Thread = nullptr;
	bool Running = true;
	
	FSocket* Socket;
	TWeakPtr<PacketSession> SessionRef;
};

/*----------------------------------------------------------------------------*\
|                                                                              |
|                                 SendWorker                                   |
|                                                                              |
\*----------------------------------------------------------------------------*/
class CLIENT_API SendWorker : public FRunnable
{
public:
	SendWorker(FSocket* Socket, TSharedPtr<PacketSession> Session);
	~SendWorker();

	virtual bool Init() override;
	virtual uint32 Run() override;
	virtual void Exit() override;

	bool SendPacket(SendBufferRef SendBuffer);

	void Destroy();

private:
	bool SendDesiredBytes(const uint8* Buffer, int32 Size);

protected:
	FRunnableThread* Thread = nullptr;
	bool Running = true;

	FSocket* Socket;
	TWeakPtr<PacketSession> SessionRef;
};

NetworkWorker.cpp

#include "NetworkWorker.h"
#include "PacketSession.h"
#include "Sockets.h"

/*----------------------------------------------------------------------------*\
|                                                                              |
|                                 RecvWorker                                   |
|                                                                              |
\*----------------------------------------------------------------------------*/

RecvWorker::RecvWorker(FSocket* Socket, TSharedPtr<PacketSession> Session)
	: Socket(Socket), SessionRef(Session)
{
	Thread = FRunnableThread::Create(this, TEXT("RecvWorkerThread"));
}

RecvWorker::~RecvWorker()
{
}

bool RecvWorker::Init()
{
	GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, TEXT("Recv Thread Init"));

	return true;
}

uint32 RecvWorker::Run()
{
	while (Running)
	{
		TArray<uint8> Packet;
		if (ReceivePacket(OUT Packet)) //내부에서 온전한 패킷 하나만큼 담을때까지 대기/while 돔
		{
			if (TSharedPtr<PacketSession> Session = SessionRef.Pin())
			{
				Session->RecvPacketQueue.Enqueue(Packet);
				GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, TEXT("Recv Some Data From Server!"));
			}
		}
	}
	return 0;
}

void RecvWorker::Exit() {}

bool RecvWorker::ReceiveDesiredBytes(uint8* Results, int32 Size)
{
	uint32 PendingDataSize;
	//연결 종료되면 packetrecvsize 0으로 옴 -> 연결 끊겼으면 종료.
	if (Socket->HasPendingData(PendingDataSize) == false || PendingDataSize <= 0)
		return false;
	int32 Offset = 0;

	while (Size > 0)
	{
		int32 NumRead = 0;
		Socket->Recv(Results + Offset, Size, OUT NumRead);
		check(NumRead <= Size); //읽은게 size보다 크면 크래시

		if (NumRead <= 0)
			return false;
		Offset += NumRead;
		Size -= NumRead;
	}
	
	return true;
}

void RecvWorker::Destroy()
{
	Running = false;
}

bool RecvWorker::ReceivePacket(TArray<uint8>& OutPacket)
{
	// 패킷 헤더 파싱
	const int32 HeaderSize = sizeof(FPacketHeader);
	TArray<uint8> HeaderBuffer;
	HeaderBuffer.AddZeroed(HeaderSize);

	if (ReceiveDesiredBytes(HeaderBuffer.GetData(), HeaderSize) == false)
		return false;
	//Id, Size 추출
	FPacketHeader Header;
	
	
	{
		FMemoryReader Reader(HeaderBuffer);
		Reader << Header;
		UE_LOG(LogTemp, Log, TEXT("Recv PacketId : %d, PacketSize : %d"), Header.PacketID, Header.PacketSize);
	}
	//패킷 헤더 복사
	OutPacket = HeaderBuffer;
	//패킷 내용 파싱
	TArray<uint8> PayloadBuffer;
	const int32 PayloadSize = Header.PacketSize - HeaderSize;
	if (PayloadSize == 0)
		return true;
	OutPacket.AddZeroed(PayloadSize);

	if (ReceiveDesiredBytes(&OutPacket[HeaderSize], PayloadSize))
		return true;
	return false;
}

/*----------------------------------------------------------------------------*\
|                                                                              |
|                                 SendWorker                                   |
|                                                                              |
\*----------------------------------------------------------------------------*/
SendWorker::SendWorker(FSocket* Socket, TSharedPtr<PacketSession> Session) : Socket(Socket), SessionRef(Session)
{
	Thread = FRunnableThread::Create(this, TEXT("SendWorkerThread"));
}

SendWorker::~SendWorker()
{
}

bool SendWorker::Init()
{
	GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, FString::Printf(TEXT("Send Thread Init")));

	return true;
}

uint32 SendWorker::Run()
{
	while (Running)
	{
		SendBufferRef SendBuffer;

		if (TSharedPtr<PacketSession> Session = SessionRef.Pin())
		{
			if (Session->SendPacketQueue.Dequeue(OUT SendBuffer))
			{
				SendPacket(SendBuffer);
			}
		}
		// Sleep?
	}
	return 0;
}

void SendWorker::Exit() {}

bool SendWorker::SendPacket(SendBufferRef SendBuffer)
{
	if (SendDesiredBytes(SendBuffer->GetBuffer(), SendBuffer->GetDataLen()) == false)
		return false;
	return true;
}

void SendWorker::Destroy()
{
	Running = false;
}

bool SendWorker::SendDesiredBytes(const uint8* Buffer, int32 Size)
{
	while (Size > 0)
	{
		int32 BytesSent = 0;
		if (Socket->Send(Buffer, Size, BytesSent) == false)
			return false;

		Size -= BytesSent;
		Buffer += BytesSent;
	}
	return true;
}

 

Client.h

언리얼 프로젝트 생성하면 처음부터 있는 파일인데, 왜인지 게임서버강의에서는 이 위치에 SendBuffer, PacketHeader 그리고 쉐어드포인터 매크로를 넣어놨다. 여기있는것들은 모두 ServerPacketHandler에서 쓰이는 것들. 나중에 서버패킷핸들러 헤더에  include한다.

 

SendBuffer는 게임서버에 있던걸 그대로 가져왔고 몇가지 언리얼용으로 바꾼게 전부. 기능은 똑같다.

#pragma once

#include "CoreMinimal.h"

struct PacketHeader
{
	uint16 id;
	uint16 size;
};

class SendBuffer : public TSharedFromThis<SendBuffer>
{
public:
	SendBuffer() = default;
	SendBuffer(BYTE* buffer, uint32 dataLen);
	SendBuffer(uint32 dataLen);//for AppendBuffer
	~SendBuffer();

public: //use at fill buffer
	bool AppendBuffer(BYTE* buffer, uint32 dataLen);
	BYTE* WritePos() { return &_buffer[_writePos]; }

public: //use at write
	BYTE* GetBuffer() { return &_buffer[0]; }
	BYTE* GetPosPtr(uint32 pos) { return &_buffer[pos]; }
	uint32 GetFreeSize() { return _allocSize - _writePos; }
	uint32 GetDataLen() { return _writePos; }
	bool OnWrite(uint32 dataLen);

private:
	uint32			_writePos = 0;
	uint32			_allocSize = 0;
	TArray<BYTE>	_buffer;
};

#define USING_SHARED_PTR(name) using name##Ref = TSharedPtr<name>;

class Session;
class PacketSession;

USING_SHARED_PTR(Session);
USING_SHARED_PTR(PacketSession);
USING_SHARED_PTR(SendBuffer);

Client.cpp

#include "Client.h"
#include "Modules/ModuleManager.h"

IMPLEMENT_PRIMARY_GAME_MODULE( FDefaultGameModuleImpl, Client, "Client" );


SendBuffer::SendBuffer(BYTE* buffer, uint32 dataLen)
	: _writePos(dataLen), _allocSize(dataLen)
{
	_buffer.SetNum(dataLen);
	memcpy(&_buffer[0], buffer, dataLen);
}

SendBuffer::SendBuffer(uint32 dataLen) : _writePos(0), _allocSize(dataLen)
{
	_buffer.SetNum(dataLen);
}

SendBuffer::~SendBuffer() {}

bool	SendBuffer::AppendBuffer(BYTE* buffer, uint32 dataLen)
{
	if (_writePos + dataLen > _allocSize)
		return false;
	memcpy(&_buffer[_writePos], buffer, dataLen);
	_writePos += dataLen;
	return true;
}

bool SendBuffer::OnWrite(uint32 dataLen)
{
	if (dataLen > GetFreeSize())
		return false;
	_writePos += dataLen;
	return true;
}

수신 테스트

이렇게 하고, 우선 패킷 수신을 확인해보기 위해 서버 메인스레드에서 아래와 같이 메인문에 추가했다.

1초마다 연결된 클라이언트들에게 HelloWorld! 를 보낸다.

while (true)
{
	Protocol::S_CHAT pkt;
	pkt.set_msg("HelloWorld!");
	auto sendBuffer = ClientPacketHandler::MakeSendBuffer(pkt);

	service->broad_cast_test(sendBuffer);
	this_thread::sleep_for(1s);
}

 

클라이언트의 아래 코드에서 확인한다.

RecvWorker가 1개의 온전한 패킷을 받으면 패킷세션의 큐에 넣는데 그때마다 출력되게 만들었다. (위에도 똑같이 있음)

uint32 RecvWorker::Run()
{
	while (Running)
	{
		TArray<uint8> Packet;
		if (ReceivePacket(OUT Packet)) //내부에서 온전한 패킷 하나만큼 담을때까지 대기/while 돔
		{
			if (TSharedPtr<PacketSession> Session = SessionRef.Pin())
			{
				Session->RecvPacketQueue.Enqueue(Packet);
				GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, TEXT("Recv Some Data From Server!"));
			}
		}
	}
	return 0;
}

 

빌드를 하고 서버를 켜놓은 상태로 언리얼에디터에서 스타트 해본다.

1초마다 보낸 패킷을 언리얼 클라에서 정상적으로 받고있다.


패킷을 받아서 큐에 넣는 것만 해봤는데, 다음엔 서버패킷핸들러를 살려서 쓰고, 패킷 송신도 해보겠다.

지금까지의 git 버전

언리얼 클라이언트

 

Feat: PacketSession, NetworkWorkers · Dodontak/Project_Island_Client@714c8bd

패킷 세션과 RecvWorker, SendWorker 추가.

github.com

서버는 수신 테스트 부분만 바뀌어서 새로 Commit하지 않았음.

 

+ Recent posts