DB Pool
DB connection Pool(이하 컨넥션 풀)은 데이터베이스와의 통신을 위한 Connection 변수의 모음을 말한다.
데이터베이스와 단순 통신을 한다면 어떠한 요청이 있을 때 컨넥션 변수를 만들어 연결 및 처리 후 해제되는 과정을 거치는데, 프로그램이 종료될 때 까지 계속 데이터베이스를 사용한다면 연결/해제의 과정이 필요할까?
프로그램이 실행될 때 컨넥션을 미리 만들어 두었다가 요청이 있을 때 마다 처리만 해준다면 “생성/해제”라는 무지막지한 비용을 아낄 수 있을 것이다.
미리 만들어 두었다가 요청이 있을 때 마다 처리
사실 전에 만들었던 쓰레드 풀에 데이터베이스 컨넥션만 추가한 것이 되겠다.
환경
데이터베이스 풀을 직접 만들어보기 위해 아래와 같이 환경을 구성 하였습니다.
- Windows 11 Pro (x64)
- MSYS2
- base-devel
- Mingw-w64-x86_64-toolchain
- Mingw-w64-x86_64-cmake
- Mingw-w64-x86_64-spdlog
- Mingw-w64-x86_64-postgresql
- PostgreSQL 13.5
- VS Code
- MSYS2
설계
디렉토리 및 파일 구성
- / (Root)
- build
- include
- common.h
- pool.h
- lib
- src
- main.cpp
- pool.cpp
- CMakelists.txt
파일 설명
- common.h
- src 폴더 하위 모든 소스에 공통적으로 사용할 객체 및 함수들의 선언이 되어있는 헤더
- pool.h / pool.cpp
- 데이터베이스 풀에 관련 내용 선언 및 정의가 되어있는 파일들
- main.cpp
- 데이터베이스 풀을 테스트할 메인 함수가 들어있는 파일
- CMakelists.txt
- cmake config file
구현
/** * @file common.h * @author JongBin Park (jongbin@devbin.kr) * @brief * @version 0.1 * @date 2022-03-29 * * @copyright Copyright (c) 2022 * */ #ifndef COMMON_H #define COMMON_H #include <spdlog/spdlog.h> #include <unistd.h> #endif /* COMMON_H */
- spdlog : C++에서 사용할 수 있는 아주 유용한 Log 라이브러리
- unistd.h : POSIX 운영체제 API
* 본인은 windows.h 대신 사용했고, 사실상 sleep() 밖에 쓰지 않음
pool.h
/** * @file pool.h * @author JongBin Park (jongbin@devbin.kr) * @brief * @version 0.1 * @date 2022-03-29 * * @copyright Copyright (c) 2022 * */ #ifndef POOL_H #define POOL_H #define WILL_DELETE_OBJECT #define MAX_CONNECTION 10 #define CONN_STR "dbname = 데이터베이스 명 \ user = 데이터베이스 유저명 \ password = 데이터베이스 패스워드 \ host = 데이터베이스 호스트 \ hostaddr = 호스트 IP \ port = 5432" #include "common.h" #include <string> #include "libpq-fe.h" #include <thread> #include <mutex> #include <condition_variable> #include <queue> #include <vector> #include <string> using namespace std; typedef struct tParam_s { unsigned int id; PGconn *conn; PGresult *pgResult; } tParam_t; typedef struct query_s { string query; void *(*callBack)(void *); } query_t; typedef vector<vector<char *>> * ResultTable; class Pool { private: static thread **pool; /* Critical section */ static mutex mt; static condition_variable cond; static queue<query_t *> queue; static void dequeueQuery(void *tParam); /* Critical section */ public: Pool(); ~Pool(); static void init(); static void enqueueQuery(string query, void *(*callBack)(void *)); }; #endif /* POOL_H */
- #define WILL_DELETE_OBJECT
- 동적으로 만든 객체들이 추 후 delete 가 되는지 확인하기 위한 더미 매크로
- #define MAX_CONNECTION 10
- 데이터베이스 컨텍션 풀의 갯수를 결정하는 매크로, 한 개의 쓰레드당 한 개의 컨넥션 풀을 가진다.
- #define CONN_STR
- PostgreSQL 에 접속하기 위한 정보가 들어간 매크로
- #include “libpq-fe.h”
- PostgreSQL 데이터베이스 C/C++ 라이브러리
- #include <thread>
- C++11 부터 적용된 내장된 쓰레드 호환성을 위해 pthread.h를 사용해도 상관 없음
- #include <mutex>
- 생성된 여러 쓰레드들의 동시 접근 제어를 하기위한 뮤텍스 헤더
- #include <condition_variable>\
- 특정 조건에 쓰레드를 제어하기 위한 상태 변수 헤더
- typedef struct sParam_s {} tParam_t
- 쓰레드가 생성될 때 갖는 구조체
- typedef query_s {} query_t
- 데이터베이스에 어떤 요청을 할 때 필요한 정보를 갖는 구제초
- typedef vector<vector<char *>> * ResultTable
- 데이터베이스 Select 결과를 담는 변수
- class Pool
- 데이터베이스 컨넥션 풀 클래스
pool.cpp
/** * @file pool.cpp * @author JongBin Park (jongbin@devbin.kr) * @brief connection pool * @version 0.1 * @date 2022-04-02 * * @copyright Copyright (c) 2022 * */ #include "pool.h" thread **Pool::pool; /* Critical section */ mutex Pool::mt; condition_variable Pool::cond; queue<query_t *> Pool::queue; /* Critical section */ void Pool::enqueueQuery(string query, void *(*callBack)(void *)) { spdlog::trace("start enqueueQuery()"); query_t *tmp = new query_t; tmp->query = query; tmp->callBack = callBack; mt.lock(); queue.push(tmp); cond.notify_one(); mt.unlock(); spdlog::trace("end enqueueQuery()"); return; } void Pool::dequeueQuery(void *tParam) { tParam_t *t = (tParam_t *)tParam; string query; void *(*callBack)(void *) = nullptr; spdlog::trace("start dequeueQuery() [ID : {:d}]", t->id); while(true) { unique_lock<mutex> lock(mt); if(queue.empty()) cond.wait(lock); else { query = queue.front()->query; callBack = queue.front()->callBack; delete queue.front(); queue.pop(); lock.unlock(); t->pgResult = PQexec(t->conn, query.c_str()); if(PQresultStatus(t->pgResult) == PGRES_EMPTY_QUERY) spdlog::info("empty result [ID : {:d}]", t->id); else if(PQresultStatus(t->pgResult) == PGRES_COMMAND_OK) spdlog::info("success to execute but not result [ID : {:d}]", t->id); else if(PQresultStatus(t->pgResult) == PGRES_TUPLES_OK) { spdlog::info("success to execute and have result [ID : {:d}]", t->id); unsigned int i, j, row, col = 0; row = PQntuples(t->pgResult); col = PQnfields(t->pgResult); if(row != 0) { /* FIXME: will be deleted : NO */ vector<vector<char *>> *executedQueryResult = new vector(row, vector<char *>(col, nullptr)); for(i=0;i<row;i++) { for(j=0;j<col;j++) { (*executedQueryResult)[i][j] = PQgetvalue(t->pgResult, i, j); spdlog::trace("{:s}", PQgetvalue(t->pgResult, i, j)); } } spdlog::info("sucess to fetch db result"); if(callBack != nullptr) { callBack((vector<vector<char *>> *)executedQueryResult); spdlog::info("sucess to execute callback function about query result"); } else spdlog::error("callback function pointer is nullptr"); } else spdlog::warn("result is empty"); } else if(PQresultStatus(t->pgResult) == PGRES_BAD_RESPONSE) { spdlog::error("server bad response [ID : {:d}]", t->id); spdlog::error(PQresultErrorMessage(t->pgResult)); } else if(PQresultStatus(t->pgResult) == PGRES_NONFATAL_ERROR) { spdlog::error("fail to execute but try again [ID : {:d}]", t->id); spdlog::error(PQresultErrorMessage(t->pgResult)); } else if(PQresultStatus(t->pgResult) == PGRES_FATAL_ERROR) { spdlog::error("fatal error [ID : {:d}]", t->id); spdlog::error(PQresultErrorMessage(t->pgResult)); } spdlog::trace(query.c_str()); PQclear(t->pgResult); } } spdlog::trace("end dequeueQuery() [ID : {:d}]", t->id); return; } Pool::Pool() { init(); return; } Pool::~Pool() { spdlog::trace("start ~Pool()"); unsigned int i = 0; for(i=0;i<MAX_CONNECTION;i++) { delete *(pool + i); } delete pool; spdlog::info("Success to destory DB pool"); spdlog::trace("end ~Pool()"); return; } void Pool::init() { spdlog::trace("start Pool()"); unsigned int i = 0; tParam_t *tmpParam; pool = new thread*[MAX_CONNECTION]; /* will be deleted : OK */ for(i=0;i<MAX_CONNECTION;i++) { tmpParam = new tParam_t; /* will be deleted : OK */ tmpParam->id = i; tmpParam->conn = PQconnectdb(CONN_STR); if(PQstatus(tmpParam->conn) != CONNECTION_OK) { spdlog::critical("fail to connect db server"); spdlog::critical("check the server and configure"); delete tmpParam; continue; } else { *(pool + i) = new thread(dequeueQuery, tmpParam); /* will be deleted : OK */ (*(pool + i))->detach(); usleep(100); spdlog::info("Success to create DB pool [ID : {:d}]", i); } } spdlog::trace("end Pool()"); return; }
- init()
- Pool 클래스 초기화
- static 으로 선언 되어 있기 때문에 필요한 것들은 대부분 new를 통해 동적할당 해줌
- enqueueQuery()
- 생성된 쓰레드가 바라보고 있는 큐에 요청을 넣는 함수
- 쿼리와 콜백 함수가 들어있는 구조체를 동적으로 할당 후 처리가 완료되면 delete
- dequeueQuery()
- 실제 생성되는 쓰레드 함수
- 무한으로 루프를 돌며 큐에 적재된 요청을 꺼내 처리
main.cpp
#include "common.h" #include "pool.h" void *printResult(void *param) { while(1==1) { if(param != nullptr) { spdlog::critical("value : {:s} [address={:p}]", ((*(ResultTable)param))[0][0], (void*)param); delete (ResultTable)param; break; } else { sleep(1); } } return nullptr; } int main() { spdlog::set_level(spdlog::level::info); spdlog::info("start main()"); unsigned int i = 0; Pool::init(); sleep(2); for(i=0;i<100000000000;i++) { Pool::enqueueQuery("select * from test2 where 1=1", printResult); } sleep(3600); spdlog::info("end main()"); return 0; }
- printResult()
- 콜백함수
- 큐에 넣은 요청이 완료 되면 실행되는 함수이며 select 의 결과 값을 인자로 받아 사용
- main()
- 프로그램이 시작될 때 실행되는 함수
- 컨넥션 풀을 초기화하고 10개의 컨넥션 풀을 갖고 천 억번의 요청