本文介绍: 在开发Flask应用中一定会遇到执行耗时任务,但是Flask轻量级同步框架,即在单个请求服务会阻被塞,直到任务完成(注意:当前请求阻塞不会影响到其他请求)。解决异步问题有两种思路,一种是借助外部工具实现异步例如消息队列(RabbitMQ)、 异步任务队列(Celery+Redis);另一种借助Python中的进程线程协程解决异步。我的是小项目选择因此选择了第二种方法。经过测试,我在Flask使用协程gevent)会被阻塞

1 问题说明

1.1 任务简述

开发Flask应用中一定会遇到执行耗时任务,但是Flask轻量级的同步框架,即在单个请求服务会阻被塞,直到任务完成(注意:当前请求被阻塞不会影响到其他请求)。

解决异步问题有两种思路,一种是借助外部工具实现异步例如消息队列(RabbitMQ)、 异步任务队列(Celery+Redis);另一种借助Python中的进程线程协程解决异步。我的是小项目选择因此选择了第二种方法。

经过测试,我在Flask使用协程gevent)会被阻塞使用进程multiprocessing)不会被阻塞操作数据库出了问题(有可能是我没操作正确的问题);最后选择使用线程threading)。

注意:使用线程会出现线程安全问题,

1.2 注意的问题

(1)线程安全

"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中
        # 2.1 直接使用Session是线程不安全的,不推荐使用
        self.session = Session(self.engine)

        # 2.2 直接使用scoped_session是线程安全的,推荐使用
        # 获取sessionmaker
        session_factory = sessionmaker(engine)
        # scoped_session是线程安全的
        # 注意,此session不能使用Query对象
        session = scoped_session(session_factory)
        
!!!
"""

(2)使用数据库

# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
# 否则无法插入数据库报错错误内容如下低版本不会出现此问题):
"""
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
"""

2 工程布局

项目布局如下
在这里插入图片描述

3 源代码

(1)main.py

from blueprint import init_blueprint
from config_app import app
from config_db import init_mysql_db


# 初始化MySQL
init_mysql_db()

init_blueprint()

if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True)

(2)config_app.py

from flask import Flask
from flask_cors import CORS


def create_app():
    flask_app = Flask(__name__)
    CORS(flask_app, supports_credentials=True)
    return flask_app


app = create_app()

(3)config_db.py


from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow

# 添加pymysql驱动连接MySQL数据库
import pymysql

from config_app import app

pymysql.install_as_MySQLdb()

"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中
        # 2.1 直接使用Session是线程不安全的,不推荐使用
        self.session = Session(self.engine)

        # 2.2 直接使用scoped_session是线程安全的,推荐使用
        # 获取sessionmaker
        session_factory = sessionmaker(engine)
        # scoped_session是线程安全的
        # 注意,此session不能使用Query对象
        session = scoped_session(session_factory)
        
!!!
"""

# 创建MySQL单实例
mysql_db = SQLAlchemy()

# 创建Schema
mysql_schema = Marshmallow()


# 创建数据库
"""
create database async default character set utf8mb4 collate utf8mb4_unicode_ci;
"""

class MysqlConf:
    acc = "root"
    pwd = "123456"
    host = "192.168.108.200"
    port = 3306
    db = "async"


mysql_conf = MysqlConf()


# 初始化MySQL数据库
def init_mysql_db():

    # 配置MySQL数据库url
    db_url = "mysql://" + mysql_conf.acc + ":" + mysql_conf.pwd + "@" + mysql_conf.host + ":" + str(mysql_conf.port) + "/" + mysql_conf.db
    app.config["SQLALCHEMY_DATABASE_URI"] = db_url

    # 关闭sqlalchemy自动跟踪数据库
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    # 显示底层执行的SQL语句
    app.config['SQLALCHEMY_ECHO'] = True

    # 解决‘No application found. Either work inside a view function or push an application context.’
    app.app_context().push()

    # 初始化app
    mysql_db.init_app(app)

    # 初始化schema
    mysql_schema.init_app(app)


# 初始化table
def init_table():
    # 删除
    mysql_db.drop_all()
    # 创建
    mysql_db.create_all()

(4)dao.py

import datetime

from config_db import mysql_db as db


# Create table of bm_record
class User(db.Model):
    # Record table
    __tablename__ = "as_user"

    id = db.Column("us_id", db.Integer, nullable=False, primary_key=True, autoincrement=True)
    name = db.Column("us_name", db.String(100))
    age = db.Column("us_age", db.Integer)
    create_time = db.Column("us_create_time", db.DateTime, default=datetime.datetime.now)


# 插入数据
def insert_record_dao(user: User):
    # Add data
    db.session.add(user)
    # Commit data
    db.session.commit()

(5)blueprint.py

# 构建蓝本
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from flask import Blueprint, jsonify

from config_app import app
from config_db import init_table
from dao import insert_record_dao, User

user = Blueprint("user", __name__)


# 注册蓝本
def init_blueprint():
    app.register_blueprint(user, url_prefix='/user')


@user.route("/initdb")
def init_db():
    init_table()
    return jsonify("success")


@user.route("/add")
def add_user():
    user: User = User()
    user.name = "zhangsan"
    user.age = 12

    time.sleep(10)
    insert_record_dao(user)

    return jsonify(user.id)


executor = ThreadPoolExecutor(3)
@user.route("/thread_pool")
def run_task_thread_pool():
    executor.submit(_run_thread_pool)
    return jsonify("success")


# 执行线程
def _run_thread_pool():
    print("Run thread pool")
    time.sleep(2)
    user: User = User()
    user.name = "thread pool"
    user.age = 12

    # 注意:必须使用”with app.app_context()“,否则无法插入数据库,并且不会报错
    with app.app_context():
        insert_record_dao(user)

    print("End thread pool")
    pass


@user.route("/thread")
def run_task_thread():
    task = threading.Thread(target=_run_thread, name='thread-01')
    task.start()

    return jsonify("success")


# 执行线程
def _run_thread():
    print("Run thread")
    time.sleep(2)

    user: User = User()
    user.name = "thread"
    user.age = 12

    # 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
    # 否则无法插入数据库报错错误内容如下低版本不会出现此问题):
    """
    This typically means that you attempted to use functionality that needed
    the current application. To solve this, set up an application context
    with app.app_context(). See the documentation for more information.
    """
    with app.app_context():
        insert_record_dao(user)

    print("End thread")
    pass

4 请求截图

(1)初始化数据

在这里插入图片描述

(2)添加用户

在这里插入图片描述

(3)使用线程池

在这里插入图片描述

(4)使用线程

在这里插入图片描述

原文地址:https://blog.csdn.net/make_progress/article/details/134756301

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_32718.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注