리눅스OS에서 epoll 로I/O Multiplexing 구현한건 아래 게시글 참조.

 

I/O Multiplexing epoll 채팅서버 만들기 with.C/C++

개발환경은 도커 debian trixie 컨테이너 입니다.ai의 도움을 받아 공부하며 작성된 게시글입니다.I/O Multiplexing여러 파일 디스크립터를 동시에 감시하여, 블로킹 없이 준비된(바로 실행 가능한) I/O만

dodontak.tistory.com


I/O Multiplexing

여러 파일 디스크립터를 동시에 감시하여, 블로킹 없이 준비된(바로 실행 가능한) I/O만 처리할 수 있도록 하는 기술이다. 특히 데이터 도착 시점을 예측할 수 없어서 준비됐는지 아닌지 모르는 네트워크 소켓이나 파이프와 같은 이벤트 기반 I/O에 주로 사용된다.

IOCP

epoll과의 차이

select, poll, epoll이 전통적인 I/O Multiplexing 모델 이라면, IOCP는 한 단계 더 진화한 모델이다.

epoll은 등록한 소켓을 쓰기나 읽기를 할 수 있는지 감지하고, 감지된 소켓에 대해 읽거나 쓰는 행위는 유저레벨에서 코드로 구현해야한다. 하지만 IOCP는 이 작업을 커널에서 대신해주고, 쓰기나 읽기가 완료되면 그것을 통지만 해준다. 그래서 Completion이 붙는 것.

 

개인적으로 느낀 가장 큰 차이는 epoll은 소켓이 논블로킹이어도 읽기나 쓰기작업이 가능할 때는 write/send, read/recv 함수에서 어쩔 수 없이 읽기/쓰기 작업이 완료될때까지 대기해야한다(Synchronous 하다). 하지만 IOCP는 읽기/쓰기를 아예 커널에 맡기고 바로 리턴해버리고, read/write작업은 커널이 한다(Asynchronous 하다). 데이터 복사마저 비동기로 처리해주기 때문에 유저레벨에서는 그동안 다른 일을 할 수 있다. 멀티플랙싱에 비동기가 내장되어있는 느낌.

 

epoll의 방식을 Reactor 패턴 : "준비됐는지 알려주는데, 처리는 네가 해라"

IOCP의 방식을 Proactor 패턴 : "처리까지 다 마쳤다고 알림"


IOCP 에코서버 만들기

IOCP를 사용한다면 보통 멀티스레드를 쓰겠지만 반드시 멀티스레드일 필요는 없으니 부가적인건 빼고 IOCP 서버에서 딱 필요한 부분만 만들어보자.

 

사용하는 함수/구조체 알아보기

WSAStartup 함수

winsock2.h 에 있다.

Windows Sockets API 초기화하는 함수. 윈도우는 리눅스와 달리 네트워크 기능을 사용하기 위해 명시적으로 관련 라이브러리(DLL)를 메모리에 로드하고 설정하는 과정이 필요하다. 

int WSAStartup(
  [in]  WORD      wVersionRequired, // Winsock 버전 설정
  [out] LPWSADATA lpWSAData
);
#include <winsock2.h>
#pragma comment(lib, "ws2_32.lib") // 라이브러리 링크

int main()
{
	WSADATA wsaData;
	if (WSAStartup(MAKEWORD(2, 2), &wsaData))
		HandleError("Failed to initialize WSA");
    /*...*/
}

WORD는 2바이트 데이터. MAKEWORD로 앞 바이트와 뒷바이트에 각각 2, 2를 넣어서 사용할 버전을 설정하는 것.

  • 성공 시 0 반환 실패시 오류코드 반환.

CreateIoCompletionPort 함수

winsock2.h 에 있다.

IOCP핸들 만들때도 쓰이고, 소켓을 IOCP핸들에 등록할때도 쓰인다. 요즘 만들었다면 두 기능을 분리했겠지만 옛날에 만들어서 두개의 기능이 하나의 함수에 통합되어있다.

HANDLE WINAPI CreateIoCompletionPort(
  _In_     HANDLE    FileHandle,
  _In_opt_ HANDLE    ExistingCompletionPort,
  _In_     ULONG_PTR CompletionKey,
  _In_     DWORD     NumberOfConcurrentThreads
);
#include <winsock2.h>
#pragma comment(lib, "ws2_32.lib") // 동적라이브러리

int main()
{
	/*...*/
	HANDLE hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
	if (hIOCP == nullptr)
		HandleError("Failed to create IOCP Handle");
    /*...*/
}
  • 2번인자가 NULL일때 새 IOCP핸들을 반환
  • 2번인자에 IOCP핸들을 넣었을 때 1번인자로 넣은 핸들을 감시 등록하고, 1번인자를 그대로 반환
  • 실패 시 NULL 반환

WSASocket 함수

winsock2.h 에 있다.

Winsock에서 소켓 만드는 함수. 유닉스의 socket 함수보다 더 세세한 설정이 있다. 

SOCKET WSAAPI WSASocketW( //WSASocketW 랑 WSASocket 이랑 똑같은거임. define 되어있다.
  [in] int                 af, // 주소 체계 AF_INET AF_INET6
  [in] int                 type, // 소켓 타입(tcp, udp) SOCK_STREAM SOCK_DGRAM
  [in] int                 protocol, // 프로토콜. 0 넣으면 위 설정에따라 알아서 결정
  [in] LPWSAPROTOCOL_INFOW lpProtocolInfo, // 특정 프로토콜 상세정보 기입용. 일반적으로 NULL 넣음
  [in] GROUP               g, // 소켓 그룹 식별자. 현재는 대부분 0으로 넣음.
  [in] DWORD               dwFlags // 소켓 속성 지정. 비동기 입출력을 위해 WSA_FLAG_OVERLAPPED 사용
);

 

#include <winsock2.h>
#pragma comment(lib, "ws2_32.lib") // 동적라이브러리

int main()
{
	/*...*/
	SOCKET listenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (listenSocket == INVALID_SOCKET)
		HandleError("Failed to create listenSocket");
    /*...*/
}

일반적으로는 예시의 설정만 사용한다. 자세히 알고싶다면 구글링해서 msbuild 문서 보자. 뭐가 많더라.

  • 오류가 발생하지 않으면 새 소켓 반환
  • 오류 발생하면 INVALID_SOCKET 반환, WSAGetLastError 함수로 에러코드 확인 가능.

 

GetQueuedCompletionStatus 함수

winsock2.h 에 있다.

IOCP의 핵심. 완료 통지가 올때까지 대기하다가 완료통지가 커널의 queue에 들어가면 그걸 꺼내서 유저에게 알려주는 함수.

BOOL GetQueuedCompletionStatus(
  [in]  HANDLE       CompletionPort, // IOCP 핸들
        LPDWORD      lpNumberOfBytesTransferred, // 실제로 전송된 바이트 수
  [out] PULONG_PTR   lpCompletionKey, // CreateIoCompletionPort 호출 시 설정한 키 (예: 세션 정보)
  [out] LPOVERLAPPED *lpOverlapped, // 비동기 함수 호출 시 전달했던 OVERLAPPED 구조체 주소
  [in]  DWORD        dwMilliseconds // 대기 시간 (INFINITE 면 작업이 생길 때까지 무한 대기)
);

아직 소개하지 않았지만 WSASend, WSARecv, AcceptEx, ConnectEx, DisconnectEx 와 같은 함수를 사용해 IO작업을 커널에 요청한다. 그 뒤에 맡긴 일이 완료됐을 때 커널의 queue에 완료 통지가 쌓이면, 그걸 이 함수로 꺼내서 확인한다.

이중에 중요한건 3번째 인자인 Overlapped인데, 보통 요청할 때 완료 이후 필요한 정보를 Overlapped를 상속받은 구조체에 적어 넣기 때문에 IO작업 이후의 처리를 하는데 매우 유용하게 쓰인다. overlapped** 인자로 넣어 overlapped*를 OUT으로 받는다.

  • 성공하면 true(0이 아닌 값), 실패하면 false(0)를 반환한다. 실패 이유는 GetLastError 함수로 확인할 수 있다.

OVERLAPPED 구조체

winsock2.h 에 있다. WSAOVERLAPPED 라는 것도 있는데 내부적으로 동일해서 호환이 된다고 한다.

typedef struct _OVERLAPPED {
  ULONG_PTR Internal; // [내부용] 운영체제가 작업 상태를 기록 (에러 코드 등)
  ULONG_PTR InternalHigh; // [내부용] 전송된 바이트 수를 기록
  union {
    struct {
      DWORD Offset; // 파일 입출력 시 사용 (소켓에서는 무시)
      DWORD OffsetHigh; // 파일 입출력 시 사용 (소켓에서는 무시)
    } DUMMYSTRUCTNAME;
    PVOID Pointer; // 예약됨
  } DUMMYUNIONNAME;
  HANDLE    hEvent; // [중요] 작업 완료 시 신호를 받을 이벤트 핸들
} OVERLAPPED, *LPOVERLAPPED;

이걸 상속하거나 포함하는 구조체를 만들어서 세션이나 enum값 같이 유저레벨에서 필요한 데이터를 더 담아 보낸다.

 

비동기 IO 요청 함수들

이 함수들로 IO 요청을 하고, GetQueuedCompletionStatus에서 완료통지를 받는다.

BOOL AcceptEx(
  [in]  SOCKET       sListenSocket,
  [in]  SOCKET       sAcceptSocket,
  [in]  PVOID        lpOutputBuffer,
  [in]  DWORD        dwReceiveDataLength,
  [in]  DWORD        dwLocalAddressLength,
  [in]  DWORD        dwRemoteAddressLength,
  [out] LPDWORD      lpdwBytesReceived,
  [in]  LPOVERLAPPED lpOverlapped
);

int WSAAPI WSARecv(
  [in]      SOCKET                             s,
  [in, out] LPWSABUF                           lpBuffers,
  [in]      DWORD                              dwBufferCount,
  [out]     LPDWORD                            lpNumberOfBytesRecvd,
  [in, out] LPDWORD                            lpFlags,
  [in]      LPWSAOVERLAPPED                    lpOverlapped,
  [in]      LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);

int WSAAPI WSASend(
  [in]  SOCKET                             s,
  [in]  LPWSABUF                           lpBuffers,
  [in]  DWORD                              dwBufferCount,
  [out] LPDWORD                            lpNumberOfBytesSent,
  [in]  DWORD                              dwFlags,
  [in]  LPWSAOVERLAPPED                    lpOverlapped,
  [in]  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);

이외에도 ConnectEx, DisconnectEx 가 있다. WSARecv와 WSASend는 winsock2.h에 있다.

AcceptEx, ConnectEx, DisconnectEx는   mswsock.h에 있고, 이 세개는 표준이 아니라 확장함수다. 내 환경에서는 그냥 사용할 수 있었는데,  mswsock.dll의 확장 함수들은 공급자마다 구현이 다를 수 있어서 런타임에 동적으로 가져와야 올바르게 동작한다고 한다. 런타임을 동적으로 가져오는 부분은 아래 소스코드에서 주석으로 처리해놨다.

소스코드

생각보다 소스코드 양이 좀 많은데 간략하게 흐름을 보자면 이렇다.

  1. WSA 초기화
  2. IOCP 핸들 생성
  3. listen 소켓 생성, bind, listen
  4. listen 소켓을 IOCP 핸들에 감시 등록
  5. AcceptEx로 비동기 accept 요청.
    기존 accept와 다른점은 함수가 클라이언트 소켓을 리턴하지 않고, 클라이언트 소켓을 미리 만들어서 넣어줘야 한다.
  6. 그리고 GetQueuedCompletionStatus에서 완료통지 대기함.
  7. 누군가 Connect를 하면 커널이 accept를 마치고 완료통지를 함.
  8. 대기중이던 GetQueuedCompletionStatus에서 깨어남. Overlapped를 확인하니 accept 요청의 완료 통지임을 확인.
  9. 새 클라이언트가 들어왔으니 해당 클라이언트에 대해 WSARecv로 읽기 요청. 그리고 또 accpet도 계속 해야하니 listen 소켓에 대해서도 AcceptEx 요청. accept는 5~9 루프를 돌며 계속 받아줄 수 있음.
  10. 클라이언트에게 메시지가 와서 recv 작업 마치면 GetQueuedCompletionStatus에서 Recv요청의 완료통지 확인.
  11. recv에 온 데이터를 그대로 클라이언트에게 WSASend로 send 비동기 요청 함.
  12. ...

이런식으로 IOCP 에코서버를 구현해봤다.

#include <array>
#include <vector>
#include <iostream>

// WSAStartup SOCKADDR_IN WSASocket bind listen 
// RecvEx SendEx CreateIoCompletionPort GetQueuedCompletionStatus
#include <winsock2.h>

// AcceptEx LPFN_CONNECTEX LPFN_DISCONNECTEX LPFN_ACCEPTEX
#include <mswsock.h>

#pragma comment(lib, "ws2_32.lib") // 동적라이브러리 링크용
#pragma comment(lib, "mswsock.lib") // 동적라이브러리 링크용

//LPFN_ACCEPTEX		fnAcceptEx = nullptr;

using namespace std;

enum class IO_TYPE { ACCEPT, RECV, SEND };

struct OverlappedEx {
	WSAOVERLAPPED overlapped; // [필수] 반드시 첫 번째 멤버!
	SOCKET socket;            // 어떤 소켓의 작업인가?
	IO_TYPE type;             // 읽기인가, 쓰기인가, 접속인가?
};

array<BYTE, 1000> recvBuffer;
array<BYTE, 1000> sendBuffer;

void HandleError(const char* errmsg);
void RegisterAccept(SOCKET listenSocket);
void RegisterRecv(SOCKET clientSocket);
void RegisterSend(SOCKET clientSocket, DWORD sendLen);

int main()
{
	// WSA 초기화 및 IOCP 생성
	WSADATA wsaData;
	if (WSAStartup(MAKEWORD(2, 2), &wsaData))
		HandleError("Failed to initialize WSA");

	HANDLE hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
	if (hIOCP == nullptr)
		HandleError("Failed to create IOCP Handle");

	// listenSocket 생성
	SOCKET listenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
	if (listenSocket == INVALID_SOCKET)
		HandleError("Failed to create listenSocket");

	SOCKADDR_IN serverAddr;
	memset(&serverAddr, 0, sizeof(serverAddr));
	serverAddr.sin_family = AF_INET;
	serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
	serverAddr.sin_port = htons(9000);

	if (bind(listenSocket, (SOCKADDR*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR)
		HandleError("Failed to bind listenSocket");

	if (listen(listenSocket, SOMAXCONN) == SOCKET_ERROR)
		HandleError("Failed to listen listenSocket");

	// listenSocket을 IOCP에 등록
	CreateIoCompletionPort((HANDLE)listenSocket, hIOCP, (ULONG_PTR)listenSocket, 0);

	/*
	 AcceptEx 함수 포인터 가져오기
	SOCKET dummySocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
	GUID guid = WSAID_ACCEPTEX;
	LPVOID fn = reinterpret_cast<LPVOID*>(&fnAcceptEx);
	DWORD bytes = 0;
	WSAIoctl(dummySocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &guid, sizeof(guid), fn, sizeof(fn), & bytes, NULL, NULL);
	closesocket(dummySocket);
	*/

	RegisterAccept(listenSocket);

	vector<SOCKET> clientSockets;

	while (true)
	{
		DWORD numOfBytes = 0;
		ULONG_PTR completionKey;
		OverlappedEx* overlapped = nullptr;

		BOOL bRet = GetQueuedCompletionStatus(hIOCP, &numOfBytes, &completionKey, (LPOVERLAPPED*)&overlapped, INFINITE);

		if (overlapped->type == IO_TYPE::ACCEPT)
		{
			// AcceptEx 완료. 새 클라이언트 접속 성공.
			SOCKET clientSocket = overlapped->socket;
			clientSockets.push_back(clientSocket);
			// 새 클라이언트 소켓을 IOCP에 등록
			CreateIoCompletionPort((HANDLE)clientSocket, hIOCP, (ULONG_PTR)clientSocket, 0);

			RegisterRecv(clientSocket);
			RegisterAccept(listenSocket);
		}
		else if (overlapped->type == IO_TYPE::RECV)
		{
			// 클라이언트로부터 데이터 수신 완료 recvBuffer에 데이터가 담겨있음
			SOCKET clientSocket = overlapped->socket;
			memcpy(sendBuffer.data(), recvBuffer.data(), numOfBytes); // 받은 데이터를 그대로 보내는 에코 서버

			RegisterSend(clientSocket, numOfBytes);
			RegisterRecv(clientSocket);
		}
		else if (overlapped->type == IO_TYPE::SEND)
		{

		}
	}
}

void HandleError(const char* errmsg)
{
	cerr << errmsg << endl;
	exit(1);
}

void RegisterAccept(SOCKET listenSocket)
{
	SOCKET newClientSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
	OverlappedEx* acceptOverlapped = new OverlappedEx();
	acceptOverlapped->type = IO_TYPE::ACCEPT;
	acceptOverlapped->socket = newClientSocket;
	char acceptBuffer[(sizeof(SOCKADDR_IN) + 16) * 2]; // AcceptEx가 요구하는 버퍼 크기
	DWORD bytes = 0;
	AcceptEx(listenSocket, newClientSocket, acceptBuffer, 0, sizeof(SOCKADDR_IN) + 16,
		sizeof(SOCKADDR_IN) + 16, &bytes, &acceptOverlapped->overlapped);
}

void RegisterRecv(SOCKET clientSocket)
{
	WSABUF wsaBuf;
	wsaBuf.buf = reinterpret_cast<char*>(recvBuffer.data());
	wsaBuf.len = recvBuffer.size();

	DWORD numOfBytes = 0;
	DWORD flags = 0;

	OverlappedEx* recvOverlapped = new OverlappedEx();
	recvOverlapped->type = IO_TYPE::RECV;
	recvOverlapped->socket = clientSocket;

	if (SOCKET_ERROR == WSARecv(clientSocket, &wsaBuf, 1, OUT & numOfBytes, OUT & flags,
		(LPWSAOVERLAPPED)recvOverlapped, nullptr))
	{
		int errorCode = WSAGetLastError();
		if (errorCode != WSA_IO_PENDING)
			HandleError("Failed to post WSARecv");
	}
}

void RegisterSend(SOCKET clientSocket, DWORD sendLen)
{
	OverlappedEx* sendOverlapped = new OverlappedEx();
	sendOverlapped->type = IO_TYPE::SEND;
	sendOverlapped->socket = clientSocket;

	vector<WSABUF> wsaBufs;
	WSABUF wsaBuf;
	wsaBuf.buf = reinterpret_cast<char*>(sendBuffer.data());
	wsaBuf.len = sendLen;
	wsaBufs.push_back(wsaBuf);

	DWORD numOfBytes = 0;

	if (SOCKET_ERROR == WSASend(clientSocket, wsaBufs.data(), static_cast<DWORD>(wsaBufs.size()),
		OUT & numOfBytes, 0, (LPWSAOVERLAPPED)sendOverlapped, nullptr))
	{
		int errorCode = WSAGetLastError();
		if (errorCode != WSA_IO_PENDING)
			HandleError("Failed to WSASend");
	}
}

 

버퍼를 전역으로 recv send 하나씩만 들고있고, 제대로된 에러처리나 disconnect 처리도 안했으니 주의. 아마 여러 클라가 동시에 사용하면 문제가 생길거다.

잘 작동한다!

 

 

 

+ Recent posts