1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
import logging
from functools import partial

import sqlalchemy.orm as orm
from flask_sqlalchemy import SQLAlchemy, get_state
from sqlalchemy.sql.expression import Delete, Insert, Update
log = logging.getLogger(__name__)
class AutoRouteSession(orm.Session):
    """Session that chooses the ``master`` or ``slave`` engine per operation.

    Flushes and DML statements (INSERT / UPDATE / DELETE) are bound to the
    ``master`` bind; every other operation is bound to the ``slave`` bind.
    When the extension state or ``SQLALCHEMY_BINDS`` is unavailable, engine
    resolution is delegated to ``orm.Session.get_bind``.
    """

    def __init__(self, db, autocommit=False, autoflush=False, **options):
        """Create a session bound to the engines exposed by ``db``.

        Args:
            db: Extension object providing ``get_app()``, ``engine`` and
                ``get_binds(app)``.
            autocommit: Forwarded to ``orm.Session.__init__``.
            autoflush: Forwarded to ``orm.Session.__init__``.
            **options: Extra keyword arguments forwarded to
                ``orm.Session.__init__``.
        """
        self.app = db.get_app()
        # NOTE(review): presumably consumed by Flask-SQLAlchemy's model-change
        # tracking; nothing in this class reads it -- confirm before removing.
        self._model_changes = {}
        orm.Session.__init__(
            self,
            autocommit=autocommit,
            autoflush=autoflush,
            bind=db.engine,
            binds=db.get_binds(self.app),
            **options
        )

    def get_bind(self, mapper=None, clause=None):
        """Pick the engine according to configuration and read/write intent.

        Args:
            mapper: Optional mapper; only used when falling back to
                ``orm.Session.get_bind``.
            clause: Optional SQL expression being executed. INSERT, UPDATE
                and DELETE constructs are treated as writes.

        Returns:
            The ``master`` engine while flushing or when ``clause`` is a DML
            statement, the ``slave`` engine for any other operation, or the
            result of ``orm.Session.get_bind`` when the extension state or
            ``SQLALCHEMY_BINDS`` cannot be obtained.
        """
        try:
            state = get_state(self.app)
        except (AssertionError, AttributeError, TypeError) as err:
            log.info("获取配置失败,使用默认数据库:{}".format(err))
            return orm.Session.get_bind(self, mapper, clause)

        # .get() so that a missing SQLALCHEMY_BINDS key is handled like an
        # empty one instead of raising KeyError.
        if state is None or not self.app.config.get('SQLALCHEMY_BINDS'):
            # NOTE(review): the message is only logged when the app is NOT in
            # debug mode; kept as in the original -- confirm this is intended.
            if not self.app.debug:
                log.debug("未获取数据库绑定信息(SQLALCHEMY_BINDS),使用默认数据库")
            return orm.Session.get_bind(self, mapper, clause)

        # Statements run through session.execute() / Query.update() /
        # Query.delete() reach this method outside of a flush, so checking
        # _flushing alone would send those writes to the slave engine.
        is_write = self._flushing or isinstance(clause, (Delete, Insert, Update))
        if is_write:
            log.debug("当前使用master")
            return state.db.get_engine(self.app, bind='master')

        log.debug("当前使用slave")
        return state.db.get_engine(self.app, bind='slave')
class AutoRouteSQLAlchemy(SQLAlchemy):
    """SQLAlchemy extension whose scoped sessions are AutoRouteSession objects."""

    def create_scoped_session(self, options=None):
        """Build the scoped session using AutoRouteSession as the factory.

        Args:
            options: Optional dict of keyword arguments for the session
                constructor. A ``scopefunc`` entry, if present, is removed
                from this dict and passed to ``orm.scoped_session`` instead.

        Returns:
            An ``orm.scoped_session`` whose factory is a ``partial`` of
            ``AutoRouteSession`` bound to this extension instance and the
            remaining options.
        """
        session_kwargs = {} if options is None else options
        # scopefunc configures the registry, not the session itself, so it
        # must be taken out before the remaining options reach the session.
        scope = session_kwargs.pop('scopefunc', None)
        session_factory = partial(AutoRouteSession, self, **session_kwargs)
        return orm.scoped_session(session_factory, scopefunc=scope)
|