Python 에서 Postgresql 를 다루기 위한 psycopg 라이브러리에 대해 공부해보자. Sync, Async 및 fastapi 에서의 사용 방법을 다룬다.
0. psycopg3 설치
1
2
3
$ pip install "psycopg[binary]"
$ pip install "psycopg[binary,pool]" # psycopg_pool 포함
1. psycopg 동기식 사용 (sync)
1) DB 접속
autocommit 옵션을 사용하면 모든 변경 사항이 즉시 반영된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import psycopg
from psycopg import connection, sql
from psycopg.rows import class_row
from pydantic import BaseModel
def connect_db(DATABASE_URL: str) -> connection.Connection | None:
"""Connect to the PostgreSQL database server
참고:
- current_date => datetime.date (time 데이터 없음)
- current_timestamp, now() => datetime.datetime
"""
try:
conn = psycopg.connect(DATABASE_URL, autocommit=True)
# Test connection
with conn.cursor() as cur:
cur.execute("select current_timestamp, 'ok' as result")
data = cur.fetchone()
print("data[0]:", data[0], type(data[0]))
print("data[1]:", data[1], type(data[1]))
assert data[1] == "ok"
return conn
except psycopg.Error as e:
print("Unable to connect!", e)
def main(DATABASE_URL: str):
conn = connect_db(DATABASE_URL)
if conn is None:
return
if __name__ == "__main__":
# load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")
main(DATABASE_URL)
2) pydantic 자료구조
dataclasses 또는 pydantic 을 사용하면 select 할 때 편리하다.
- insert 할 때는 클래스 사용이 별 도움이 못된다.
- pydantic 의 validator 데코레이터를 이용하면 제약사항을 기술할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from pydantic import BaseModel
from pytz import timezone
class TestRow(BaseModel):
id: int | None
logdate: datetime
info: dict
phones: List[str] = []
content: str | None = ""
@validator("content")
def content_default(cls, v):
print(f"validator(content): {v}") # => None
return v or "no data" # if null, set default value
def main(DATABASE_URL: str):
conn = connect_db(DATABASE_URL)
# 직접 정의하거나 dict 으로부터 parse_obj 로 생성
data = TestRow.parse_obj(
{
"logdate": datetime.now(timezone("Asia/Seoul")),
"info": {
"customer": "Alex Cross",
"items": {"product": "Tea", "qty": 6},
},
"phones": ["010-1234-5678", "064-1234-5678"],
"content": "얼어붙은 플레이어의 귀환 (미완) - `제리엠`",
}
)
print("data:", data)
3) insert 데이터
SQL 인젝션을 방어하기 위해 sql.Identifier
, sql.Literal
등을 적극 사용하자.
- json 데이터는 한번 dumps 시킨 후에 사용해야 한다
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from psycopg import connection, sql
def insert_data(conn: connection.Connection, data: TestRow):
with conn.cursor() as cur:
stmt = sql.SQL(
"INSERT INTO {} (logdate, info, phones, content) VALUES ({}, {}, {}, {})"
).format(
sql.Identifier("test"), # table name
sql.Literal(data.logdate),
sql.Literal(json.dumps(data.info)), # json -> str
sql.Literal(data.phones),
data.content,
)
# print("SQL:", stmt.as_string)
cur.execute(stmt)
def main(DATABASE_URL: str):
conn = connect_db(DATABASE_URL)
with conn:
try:
insert_data(conn, data)
except psycopg.Error as e:
print("Unable to insert data!", e)
4) select 데이터
row_factory
를 사용하여 class 생성자로 레코드를 가공하도록 하였다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from psycopg import connection, sql
from psycopg.rows import class_row
def select_data(conn: connection.Connection) -> TestRow | None:
# use row_factory with pydantic BaseModel
with conn.cursor(row_factory=class_row(TestRow)) as cur:
# Query the database and obtain data as Python objects.
cur.execute(
sql.SQL("SELECT * FROM {}").format(sql.Identifier("public", "test"))
)
obj = cur.fetchone()
if not obj:
print("No data!")
return None
return obj
def main(DATABASE_URL: str):
conn = connect_db(DATABASE_URL)
with conn:
try:
row = select_data(conn)
print("\n==>", row)
except psycopg.Error as e:
print("Unable to insert data!", e)
2. psycopg 비동기식 사용 (async)
psycopg3 에서는 asyncpg 등을 사용하지 않아도 자체적으로 비동기 처리를 지원한다.
1) DB 접속
autocommit=False
상태이면 반드시 commit()
을 해주어야 반영된다.
- autocommit 옵션의 기본값은 False 이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
import psycopg
from psycopg import AsyncConnection, sql
async def connect_db(DATABASE_URL: str):
"""Connect to the PostgreSQL database server
참고:
- current_date => datetime.date (time 데이터 없음)
- current_timestamp, now() => datetime.datetime
"""
try:
aconn = await AsyncConnection.connect(DATABASE_URL) # autocommit=False
async with aconn:
# Test connection
async with aconn.cursor() as cur:
await cur.execute("select current_timestamp, 'ok' as result")
data = await cur.fetchone()
print("data[0]:", data[0], type(data[0]))
print("data[1]:", data[1], type(data[1]))
assert data[1] == "ok"
return True
except psycopg.Error as e:
print("Unable to connect!", e)
return False
async def main(DATABASE_URL: str):
if not await connect_db(DATABASE_URL):
print("cannot connect to db!")
return
if __name__ == "__main__":
# load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")
# async call from main
loop = asyncio.get_event_loop()
asyncio.run(main(DATABASE_URL))
loop.close()
2) insert 데이터
비동기 연결 객체는 with 구문과 강하게 연결되어 있어서 함께 사용해야 한다.
- 다른 함수로 연결 객체를 전달하려면
with 구문
아래에서 해야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
async def insert_data(aconn: AsyncConnection, data: TestRow):
async with aconn.cursor() as cur:
stmt = sql.SQL(
"INSERT INTO {} (logdate, info, phones, content) VALUES ({}, {}, {}, {})"
).format(
sql.Identifier("test_async"), # table name
sql.Literal(data.logdate),
sql.Literal(json.dumps(data.info)), # json -> str
sql.Literal(data.phones),
data.content,
)
# print("SQL:", stmt.as_string)
await cur.execute(stmt)
await aconn.commit()
async def main(DATABASE_URL: str):
aconn = await AsyncConnection.connect(DATABASE_URL)
async with aconn:
loop.add_signal_handler(signal.SIGINT, aconn.cancel)
try:
await insert_data(aconn, data)
rows = await select_data(aconn)
for record in rows:
print(record)
except psycopg.Error as e:
print("Unable to insert data!", e)
if __name__ == "__main__":
# load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")
# async call from main
loop = asyncio.get_event_loop()
asyncio.run(main(DATABASE_URL))
loop.close()
3) select 데이터
async/await 키워드 외에 특별한 사항은 없다. (asyncio 인터페이스)
1
2
3
4
5
6
7
8
async def select_data(aconn: AsyncConnection) -> TestRow | None:
# use row_factory with pydantic BaseModel
async with aconn.cursor(row_factory=class_row(TestRow)) as cur:
# Query the database and obtain data as Python objects.
await cur.execute(
sql.SQL("SELECT * FROM {}").format(sql.Identifier("public", "test_async"))
)
return await cur.fetchall()
3. psycopg_pool 을 이용한 fastapi 와 연계 사용
fastapi 에서 psycopg 를 사용하기 위해서는 psycopg_pool
이 필요하다.
참고
1) AsyncConnectionPool 생성 및 해제
- startup 이벤트 : AsyncConnectionPool 생성
- shutdown 이벤트 : AsyncConnectionPool 해제
- endpoint 사용시 : Pool 에서 async connection 객체를 얻어 사용
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from psycopg_pool import AsyncConnectionPool
from fastapi import FastAPI
app = FastAPI()
@app.on_event("startup")
def open_pool():
"""create database connection pool"""
app.state.pool = AsyncConnectionPool(DATABASE_URL, max_size=500)
@app.on_event("shutdown")
async def close_pool():
"""close database connection pool"""
await app.state.pool.close()
2) Pool 로부터 비동기 연결 객체를 가져와 사용하기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@app.get("/my_data/")
async def get_my_data():
return await my_query(app.state.pool)
async def my_query(pool: AsyncConnectionPool):
async with pool.connection() as conn:
async with conn.cursor(row_factory=class_row(TestRow)) as cur:
await cur.execute("SELECT * FROM public.test_async")
rows = await cur.fetchall()
return {"data": rows}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)
9. Review
비동기를 사용하는 경우 psycopg3 가 asyncpg 보다 더 사용자 친화적이다. <출처>
- psycopg3 관련 자료가 얼마 없다. psycopg2 를 잘 사용하고 있는데 굳이 옮길 필요가 있느냐라는 분위기이다. <참고>
- 별점 : psycopg3 1k, asyncpg 6k, aiopg 1.3k (마지막 업데이트 2022년 10월)
psycopg2 와 psycopg3(버전 3.0.15)의 성능 비교 <출처>
- psycopg3으로 SQL 쿼리를 실행하는 것이 psycopg2를 사용하는 것보다 메모리 효율성이 4~5배 더 높다는 것을 알 수 있었다.
- 그러나 insert 에 대해서 2.4 ~ 2.5 배 더 느렸다. (최적화가 미흡한 상태라 추측)
- 현재 psycopg3 최신 버전은
3.2.0.dev1
이다. 실험 당시와는 많이 달라졌음.
- 현재 psycopg3 최신 버전은
트랜잭션은 transaction 구문을 사용하는 것이 훨씬 편한다.
트랜잭션 상태에 따라 commit 또는 rollback 이 적용된다
1
2
3
4
5
6
7
8
9
10
11
try:
async with pool.connection() as conn:
await set_role(conn)
# 정상 처리시 commit, 실패시 rollback
async with conn.transaction(tx_name):
await conn.execute(stmt_new_person)
await conn.execute(stmt_new_order)
return True
except Exception as e:
print(f" - transaction[{tx_name}] fail: {e}")
return False
끝! 읽어주셔서 감사합니다.