0%

flask + SQLAlchemy设置读写分离

参考文档

https://www.jb51.net/article/174365.htm
https://gist.github.com/trustrachel/6828122#file-routing-py

步骤

在 Flask 配置中添加以下配置项(SQLALCHEMY_BINDS 中需包含 master 和 slave 两个键,分别指向主库和从库):

# Default database connection URI ('xxx' is a placeholder).
SQLALCHEMY_DATABASE_URI = 'xxx'
# Named binds. This must be a dict (not a set): the routing session in this
# file looks engines up by the keys 'master' (flush/writes) and 'slave'
# (everything else). Replace the 'xxx' placeholders with real URIs.
SQLALCHEMY_BINDS = {
    'master': 'xxx',
    'slave': 'xxx',
}

改写 session:自定义 Session 类,根据读写操作自动选择数据库引擎

from flask_sqlalchemy import SQLAlchemy, get_state
import sqlalchemy.orm as orm
from functools import partial

import logging

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


class AutoRouteSession(orm.Session):
    """Session that selects a database engine per operation (read/write split).

    While the session is flushing, ``get_bind`` returns the engine registered
    under the bind key ``'master'``; for every other operation it returns the
    engine registered under ``'slave'``. When no binds are configured, the
    stock ``orm.Session.get_bind`` resolution is used instead.
    """

    def __init__(self, db, autocommit=False, autoflush=False, **options):
        """Create a session wired to the engines exposed by *db*.

        Args:
            db: Extension object providing ``get_app()``, ``engine`` and
                ``get_binds(app)``.
            autocommit: Forwarded to ``orm.Session.__init__``.
            autoflush: Forwarded to ``orm.Session.__init__``.
            **options: Extra keyword arguments forwarded to
                ``orm.Session.__init__``.
        """
        self.app = db.get_app()
        # NOTE(review): presumably expected by Flask-SQLAlchemy's
        # model-change tracking; it is not read anywhere in this class.
        self._model_changes = {}
        orm.Session.__init__(
            self,
            autocommit=autocommit,
            autoflush=autoflush,
            bind=db.engine,
            binds=db.get_binds(self.app),
            **options
        )

    def get_bind(self, mapper=None, clause=None):
        """Return the engine to use, chosen from config and read/write state.

        Args:
            mapper: Passed through to ``orm.Session.get_bind`` on fallback.
            clause: Passed through to ``orm.Session.get_bind`` on fallback.

        Returns:
            The default bind when the extension state cannot be obtained or
            ``SQLALCHEMY_BINDS`` is missing/empty; the ``'master'`` engine
            while flushing; the ``'slave'`` engine otherwise.
        """
        try:
            state = get_state(self.app)
        except (AssertionError, AttributeError, TypeError) as err:
            log.info("获取配置失败,使用默认数据库:{}".format(err))
            return orm.Session.get_bind(self, mapper, clause)

        # Without SQLALCHEMY_BINDS, fall back to SQLALCHEMY_DATABASE_URI.
        # ``.get`` is used so a config that never defines the key falls back
        # too, instead of raising KeyError.
        if state is None or not self.app.config.get('SQLALCHEMY_BINDS'):
            if not self.app.debug:
                log.debug("未获取数据库绑定信息(SQLALCHEMY_BINDS),使用默认数据库")
            return orm.Session.get_bind(self, mapper, clause)

        # INSERT / UPDATE / DELETE are emitted during flush -> master.
        if self._flushing:
            log.debug("当前使用master")
            return state.db.get_engine(self.app, bind='master')

        # Everything else goes to the slave.
        # NOTE(review): this includes reads issued between a flush and its
        # commit; confirm that is acceptable for callers.
        log.debug("当前使用slave")
        return state.db.get_engine(self.app, bind='slave')


class AutoRouteSQLAlchemy(SQLAlchemy):
    """SQLAlchemy extension whose scoped sessions are ``AutoRouteSession``s."""

    def create_scoped_session(self, options=None):
        """Build the scoped session registry used by the extension.

        Args:
            options: Optional dict of keyword arguments for
                ``AutoRouteSession``. A ``'scopefunc'`` entry, if present, is
                taken out and handed to ``orm.scoped_session`` instead.

        Returns:
            An ``orm.scoped_session`` whose factory creates
            ``AutoRouteSession`` objects bound to this extension instance.
        """
        # Work on a copy so pop() below does not mutate the caller's dict.
        options = dict(options) if options else {}
        scopefunc = options.pop('scopefunc', None)
        return orm.scoped_session(
            partial(AutoRouteSession, self, **options), scopefunc=scopefunc
        )

实例化:使用 AutoRouteSQLAlchemy 代替默认的 SQLAlchemy

# Extension instance built from the routing subclass defined above.
db = AutoRouteSQLAlchemy()