JobTimer 클래스 추가

ms뒤에 JobQueue에 등록됐으면 하는 Job을 JobTimer에 시간순으로 저장한 뒤에, 스레드워커에서 실행시킨다.

#pragma once

struct JobData
{
	JobData(weak_ptr<JobQueue> owner, JobRef job) : owner(owner), job(job) {}

	weak_ptr<JobQueue>	owner;
	JobRef				job;
};

struct TimerItem
{
	bool operator<(const TimerItem& other) const
	{
		return executeTick > other.executeTick;
	}

	uint64 executeTick = 0;
	JobData* jobData = nullptr;
};

class JobTimer
{
public:
	void			Reserve(uint64 tickAfter, weak_ptr<JobQueue> owner, JobRef job);
	void			Distribute(uint64 now);
	void			Clear();

private:
	mutex _m;
	priority_queue<TimerItem>	_items;
	atomic<bool>				_distributing = false;
};
#include "pch.h"
#include "JobTimer.h"
#include "JobQueue.h"

void JobTimer::Reserve(uint64 tickAfter, weak_ptr<JobQueue> owner, JobRef job)
{
	const uint64 executeTick = ::GetTickCount64() + tickAfter;
	JobData* jobData = new JobData(owner, job);

	lock_guard<mutex> lock(_m);

	_items.push(TimerItem{ executeTick, jobData });
}

void JobTimer::Distribute(uint64 now)
{
	// 한 번에 1 쓰레드만 통과
	if (_distributing.exchange(true) == true)
		return;

	vector<TimerItem> items;

	{
		lock_guard<mutex> lock(_m);

		while (_items.empty() == false)
		{
			const TimerItem& timerItem = _items.top();
			if (now < timerItem.executeTick)
				break;

			items.push_back(timerItem);
			_items.pop();
		}
	}

	for (TimerItem& item : items)
	{
		if (JobQueueRef owner = item.jobData->owner.lock())
			owner->Push(item.jobData->job);

		delete item.jobData;
	}

	// 끝났으면 풀어준다
	_distributing.store(false);
}

void JobTimer::Clear()
{
	lock_guard<mutex> lock(_m);

	while (_items.empty() == false)
	{
		const TimerItem& timerItem = _items.top();
		delete timerItem.jobData;
		_items.pop();
	}
}

 

CoreGlobal

#pragma once

extern class DBConnectionPool* GDBConnectionPool;
extern class GlobalQueue* GGlobalQueue;
extern class JobTimer* GJobTimer;
DBConnectionPool* GDBConnectionPool = nullptr;
GlobalQueue* GGlobalQueue = nullptr;
JobTimer* GJobTimer = nullptr;

class CoreGlobal
{
public:
	CoreGlobal()
	{
		GDBConnectionPool = new DBConnectionPool();
		GGlobalQueue = new GlobalQueue();
		GJobTimer = new JobTimer();
	}
	~CoreGlobal()
	{
		delete GDBConnectionPool;
		delete GGlobalQueue;
		delete GJobTimer;
	}
}	GcoreGlobal;

JobQueue에 DoTimer 함수 추가

class JobQueue : public enable_shared_from_this<JobQueue>
{
public:
	/*...*/
	void DoTimer(uint64 tickAfter, function<void()>&& callback)
	{
		JobRef job = make_shared<Job>(std::move(callback));
		GJobTimer->Reserve(tickAfter, shared_from_this(), job);
	}

	template<typename T, typename Ret, typename... Args>
	void DoTimer(uint64 tickAfter, Ret(T::* memFunc)(Args...), Args... args)
	{
		shared_ptr<T> owner = static_pointer_cast<T>(shared_from_this());
		JobRef job = make_shared<Job>(owner, memFunc, std::forward<Args>(args)...);
		GJobTimer->Reserve(tickAfter, shared_from_this(), job);
	}

	void					ClearJobs() { _jobs.Clear(); }

public:
	/*...*/
};

ThreadManager

void ThreadManager::DistributeReservedJobs()
{
	const uint64 now = ::GetTickCount64();

	GJobTimer->Distribute(now);
}

GameServer

void WorkerThread(ServiceRef service)
{
	while (true)
	{
		LEndTickCount = ::GetTickCount64() + WORKER_TICK;
		service->GetIocpCore()->Dispatch(10);

		ThreadManager::DoGlobalQueueWork();

		ThreadManager::DistributeReservedJobs();
	}
}

테스트

일단 더미클라이언트에서 Chat을 보내는 부분은 주석처리해놓자.

지금은 클라이언트가 TCP/TLS 접속에 성공하면 Login 패킷을 보내고, 패킷이 오간다음 EnterRoom을 한다.

void Handle_C_ENTER_ROOM(const PacketSessionRef& session, const Protocol::C_ENTER_ROOM& pkt)
{
	Protocol::S_ENTER_ROOM response;
	/*...*/

	//room->DoAsync(&Room::Enter, player);
	room->DoTimer(5000, &Room::Enter, player); //5초뒤에 S_CHAT 패킷 전달

	response.set_success(true);
    
	this_thread::sleep_for(chrono::seconds(2)); //2초 뒤에 ENTER_ROOM 성공 패킷 전달
	session->Send(ClientPacketHandler::MakeSendBuffer(response)); 
}

기존에 패킷핸들러에서 DoAsync로 enter를 호출했던걸 DoTimer로 변경했다.

시간차이를 확인할 수 있도록 S_ENTER_ROOM 패킷을 보내기 전에 2초 sleep을 넣어줬다.

DoAsync (주석처리) 사용 시
DoTimer 5000 (5초 딜레이) 사용 시

 

DoAsync를 사용하면 Chat 패킷이 먼저 전송되기 때문에 Player x entered Room 이 먼저 출력되고

DoTimer를 사용하면 ENTER_ROOM 성공 패킷이 먼저 전송되기 때문에 enter room success 패킷이 먼저 전송된다.


지금까지의 git 버전 (테스트용 코드는 뺐음)

 

Feat: JobTimer · Dodontak/Project_Island_GameServer@7434a4a

JobTimer 추가. 지정한 시간이 지나면 JobQueue에 Job을 Push해준다.

github.com

 

 

+ Recent posts