RDBMS 다루기 - SQLAlchemy 활용
pip install sqlalchemy
pip install mysql-connector-python
MySqlWorkBench에서 실행
/* sample db 및 데이터 생성 */
DROP DATABASE if exists blog_db;
CREATE DATABASE blog_db;
use blog_db;
DROP TABLE if exists blog;
CREATE TABLE blog (
id int(11) NOT NULL AUTO_INCREMENT,
title varchar(200) NOT NULL,
author varchar(100) NOT NULL,
content varchar(4000) NOT NULL,
image_loc varchar(300) DEFAULT NULL,
modified_dt datetime NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
INSERT INTO blog_db.blog(title, author, content, modified_dt) values('테스트 title 1', '둘리', '테스트 컨텐츠 1', now());
INSERT INTO blog_db.blog(title, author, content, modified_dt) values('테스트 title 2', '길동', '테스트 컨텐츠 2', now());
INSERT INTO blog_db.blog(title, author, content, modified_dt) values('테스트 title 3', '도넛', '테스트 컨텐츠 3', now());
INSERT INTO blog_db.blog(title, author, content, modified_dt) values('테스트 title 4', '희동', '테스트 컨텐츠 4', now());
COMMIT;
DB_Fundamentals/db_basic.py
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
from sqlalchemy.exc import SQLAlchemyError
# database connection URL
# dialect+driver://username:password@host:port/database
DATABASE_CONN = "mysql+mysqlconnector://root:root1234@127.0.0.1:3306/blog_db"
# Engine 생성
engine = create_engine(DATABASE_CONN, poolclass=QueuePool,
pool_size=10, max_overflow=0)
try:
# Connection 얻기
conn = engine.connect()
# SQL 선언 및 text로 감싸기
query = "select id, title from blog"
stmt = text(query)
# SQL 호출하여 CursorResult 반환.
result = conn.execute(stmt)
print("type result:", result)
rows = result.fetchall()
print(rows)
# print(type(rows[0]))
# print(rows[0].id, rows[0].title)
# print(rows[0][0], rows[0][1])
# print(rows[0]._key_to_index)
result.close()
except SQLAlchemyError as e:
print(e)
finally:
# close() 메소드 호출하여 connection 반환.
conn.close()
DB_Fundamentals/pool_practice.py
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.pool import QueuePool, NullPool
# database connection URL
DATABASE_CONN = "mysql+mysqlconnector://root:root1234@localhost:3306/blog_db"
# engine = create_engine(DATABASE_CONN)
engine = create_engine(DATABASE_CONN,
poolclass=QueuePool,
#poolclass=NullPool, # Connection Pool 사용하지 않음.
pool_size=10, max_overflow=2
)
print("#### engine created")
def direct_execute_sleep(is_close: bool = False):
conn = engine.connect()
query = "select sleep(5)"
# text()는 SQLAlchemy에서 SQL 문자열을 안전하게 실행 가능한 객체로 감싸는 함수
result = conn.execute(text(query))
# rows = result.fetchall()
# print(rows)
result.close()
# 인자로 is_close가 True일 때만 connection close()
if is_close:
conn.close()
print("conn closed")
# is_close=True 값이 True, False 일때 비교
for ind in range(20):
print("loop index:", ind)
direct_execute_sleep(is_close=True)
print("end of loop")
MYSQL Workbench의 root 계정으로 접속해서 확인
/* connection 모니터링 스크립트. root로 수행 필요. */
select * from sys.session where db='blog_db' order by conn_id;
is_close=True 
is_close=False 
DB_Fundamentals/context_practice.py
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.pool import QueuePool, NullPool
# database connection URL
DATABASE_CONN = "mysql+mysqlconnector://root:root1234@localhost:3306/blog_db"
engine = create_engine(DATABASE_CONN,
echo=True, # 내부적으로 동작하는 sql문을 보여줌
poolclass=QueuePool,
#poolclass=NullPool,
pool_size=10, max_overflow=0)
# conn.close() 하지 않아도 with절을 빠져나가면 자동으로 close() 처리
def context_execute_sleep():
with engine.connect() as conn:
query = "select sleep(5)"
result = conn.execute(text(query))
result.close()
#conn.close()
for ind in range(20):
print("loop index:", ind)
context_execute_sleep()
print("end of loop")
DB엔진 모듈화
DB_Fundamentals/database.py
from sqlalchemy import create_engine, Connection
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.pool import QueuePool, NullPool
from contextlib import contextmanager
# database connection URL
DATABASE_CONN = "mysql+mysqlconnector://root:root1234@localhost:3306/blog_db"
engine = create_engine(DATABASE_CONN,
poolclass=QueuePool,
#poolclass=NullPool, # Connection Pool 사용하지 않음.
pool_size=10, max_overflow=0)
def direct_get_conn():
try:
conn = engine.connect()
return conn
except SQLAlchemyError as e:
print(e)
raise e
@contextmanager
def context_get_conn():
try:
conn = engine.connect()
yield conn
except SQLAlchemyError as e:
print(e)
raise e
finally:
conn.close()
DB_Fundamentals/module_direct.py
from sqlalchemy import text, Connection
from sqlalchemy.exc import SQLAlchemyError
from database import direct_get_conn
def execute_query(conn: Connection):
query = "select * from blog"
stmt = text(query)
# SQL 호출하여 CursorResult 반환.
result = conn.execute(stmt)
rows = result.fetchall()
print(rows)
result.close()
def execute_sleep(conn: Connection):
query = "select sleep(5)"
result = conn.execute(text(query))
result.close()
for ind in range(20):
try:
conn = direct_get_conn()
execute_sleep(conn)
print("loop index:", ind)
except SQLAlchemyError as e:
print(e)
finally:
conn.close()
print("connection is closed inside finally")
print("end of loop")
DB_Fundamentals/module_context.py
from sqlalchemy import text, Connection
from sqlalchemy.exc import SQLAlchemyError
from database import context_get_conn
def execute_query(conn: Connection):
query = "select * from blog"
stmt = text(query)
# SQL 호출하여 CursorResult 반환.
result = conn.execute(stmt)
rows = result.fetchall()
print(rows)
result.close()
def execute_sleep(conn: Connection):
query = "select sleep(5)"
result = conn.execute(text(query))
result.close()
for ind in range(20):
try:
with context_get_conn() as conn:
execute_sleep(conn)
print("loop index:", ind)
except SQLAlchemyError as e:
print(e)
finally:
#conn.close()
#print("connection is closed inside finally")
pass
print("end of loop")
DB_Fundamentals/cursor_fetch.py
from sqlalchemy import text, Connection
from sqlalchemy.exc import SQLAlchemyError
from database import direct_get_conn
try:
# Connection 얻기
conn = direct_get_conn()
# SQL 선언 및 text로 감싸기
query = "select id, title from blog"
stmt = text(query)
# SQL 호출하여 CursorResult 반환.
result = conn.execute(stmt)
rows = result.fetchall() # row Set을 개별 원소로 가지는 List로 반환.
#rows = result.fetchone() # row Set 단일 원소 반환
#rows = result.fetchmany(2) # row Set을 개별 원소로 가지는 List로 반환.
# rows = [row for row in result] # List Comprehension으로 row Set을 개별 원소로 가지는 List로 반환
print(rows)
print(type(rows))
# 개별 row를 컬럼명를 key로 가지는 dict로 반환하기
# row_dict = result.mappings().fetchall()
# print(row_dict)
# 코드레벨에서 컬럼명 명시화
# row = result.fetchone()
# print(row._key_to_index)
# rows = [(row.id, row.title) for row in result]
# print(rows)
result.close()
except SQLAlchemyError as e:
print("############# ", e)
#raise e
finally:
# close() 메소드 호출하여 connection 반환.
conn.close()
DB_Fundamentals/bind_variable.py
from sqlalchemy import text, Connection
from sqlalchemy.exc import SQLAlchemyError
from database import direct_get_conn
from datetime import datetime
try:
# Connection 얻기
conn = direct_get_conn()
# SQL 선언 및 text로 감싸기
# 1, 2, 3, 4 | '둘리', '길동'
query = '''select id, title, author from blog where id = :id and author = :author
and modified_dt < :modified_dt'''
stmt = text(query)
bind_stmt = stmt.bindparams(id=1, author='둘리', modified_dt=datetime.now())
# SQL 호출하여 CursorResult 반환.
result = conn.execute(bind_stmt)
rows = result.fetchall() # row Set을 개별 원소로 가지는 List로 반환.
print(rows)
result.close()
except SQLAlchemyError as e:
print("############# ", e)
#raise e
finally:
# close() 메소드 호출하여 connection 반환.
conn.close()