前言:什麼是 Celery?#
Celery 是一個開源的、非同步的任務佇列系統,主要用來處理大量的背景作業。它能夠讓你的應用程式在處理一些長時間運算或繁重的任務時不會阻塞主執行緒。這意味著,你可以把需要耗時的任務(如發送郵件、處理影像等)放到背景中執行,讓用戶端的請求不會因此被卡住。
簡單來說,Celery 是專門用來處理即時性較低、耗時的任務,像是:
- 發送電子郵件
- 影像或檔案處理
- 定時任務(例如每日報告)
- 大量數據處理
Celery 的運作依賴三個主要部分:
- Celery 客戶端:就像是一個發號施令的指揮官,負責將任務交給 Celery,通常是跟 Flask 一起運作,讓 Flask 可以發送需要處理的任務。
- Celery Worker:這些就像是工人,專門負責執行那些由客戶端交給 Celery 的任務。你可以啟動一個或多個工人來處理這些任務,當工作量變大時,也可以增加更多工人來分擔工作。
- 消息代理(Broker):它是負責在客戶端和工人之間傳遞訊息的中介,就像是送信的郵差。常用的消息代理有 RabbitMQ 和 Redis,它們幫助傳遞任務訊息給 Celery 工人。
參考Git Repo: repo#
環境設置#
首先,請確保你的 Python 版本是 3.7.x 或更高,並安裝 virtualenv 來建立虛擬環境:
pip install virtualenv
virtualenv venv
. venv/bin/activate
接著,安裝所需要的套件:
pip install flask
pip install celery
pip install flower
pip install redis
或是使用 requirements.txt 一次安裝:
pip install -r requirements.txt
建立 Flask 伺服器#
為了讓架構更加清晰,我們將 Flask 的初始化移到 app/init.py,以便後續擴充功能。
# main.py
from app import *
app = create_app('default')
@app.route('/')
def index():
return "Hello World"
if __name__ == '__main__':
app.run(app.config['HOST'], app.config['PORT'], app.config['DEBUG'])
在 create_app 函式中,你可以指定要使用的設定檔類別,這樣 Flask 就會載入相應的設定。
# config.py
config = {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig,
'default': DevelopmentConfig
}
新增 API#
接下來,我們建立一個測試 API,簡單回傳一個訊息:
# app/api/testapi.py
from flask import Blueprint, jsonify
testapi = Blueprint('testapi', __name__)
@testapi.route('/testapi', methods=['GET'])
def testapi_m():
try:
return jsonify({"msg": "testapi OK"})
except Exception as e:
current_app.logger.warning(e, exc_info=True)
return jsonify({"msg": "testapi fail"})
記得在 app/init.py 中註冊 Blueprint:
# app/__init__.py
from app.api.testapi import testapi
app.register_blueprint(testapi)
建立 Celery Worker#
在 celeryworker.py 中建立一個 Celery 實例並推入 Flask 應用程式的上下文中,這樣 Celery 就能夠訪問 Flask 設定和其他資源。
# celeryworker.py
from app import create_app, celery
app = create_app('default')
app.app_context().push()
建立一個 Task#
在 tasks 資料夾中建立一個 add.py,這個 Task 會將傳入的兩個數字相加後回傳結果。
# tasks/add.py
from app import celery
@celery.task(name='add')
def add(x, y):
print('Hello job add')
result = x + y
return result
設定 Celery 設定檔#
Celery 的設定檔可以讓我們設定許多選項,例如使用的訊息代理、時區等。將設定寫在 celeryconfig.py 中,以便維護和管理。
# tasks/celeryconfig.py
# 設定要匯入的 tasks
imports = (
'tasks.add',
'tasks.periodic'
)
# 設定時區
enable_utc = False
timezone = 'Asia/Taipei'
# 設定 Broker 和 Backend
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/0'
# 設定定時任務
from datetime import timedelta
from celery.schedules import crontab
beat_schedule = {
'printy-run every 10 seconds': {
'task': 'printy',
'schedule': timedelta(seconds=10),
'args': (8, 2)
}
}
在 app/init.py 中讀取這個設定檔:
# app/__init__.py
celery = Celery(__name__)
celery.config_from_object('tasks.celeryconfig')
啟動服務#
首先,在一個新的 terminal 中啟動 Flask 伺服器:
python main.py
接著,在另一個 terminal 中啟動 Celery Worker:
- Windows 使用者:
celery -A celeryworker.celery worker --loglevel=info --pool=solo
- Linux 使用者:
celery -A celeryworker.celery worker --loglevel=info
測試請求#
可以使用瀏覽器或 Postman 測試 API:
測試 testapi:
- 請求網址:http://127.0.0.1:5001/testapi
- 回應結果:
{
"msg": "testapi OK"
}
測試 add 任務:
- 請求網址:http://127.0.0.1:5001/test_add
- 回應結果:
{
"RESULT": 35
}
定時任務#
在 tasks 資料夾中建立 periodic.py,並在其中設定一個定時任務。這個任務每隔 10 秒執行一次,執行簡單的加法運算。
# tasks/periodic.py
from app import celery
@celery.task(name='printy')
def printy(a, b):
"""執行定時任務"""
print('job printy')
print(a + b)
return a + b
在 celeryconfig.py 中設定此任務的執行時間:
beat_schedule = {
'printy-run every 10 seconds': {
'task': 'printy',
'schedule': timedelta(seconds=10), # 每 10 秒執行一次
'args': (8, 2)
}
}
加上定時任務後,再次測試#
如果 Flask 和 Celery Worker 還在執行,可以直接執行以下命令啟動 Celery Beat:
celery -A celeryworker.celery beat -l info -s log/celerybeat-schedule
Flower 監控工具#
在生產環境中,我們可能無法時時刻刻查看 Celery 任務的狀態,這時可以使用 Flower 來監控 Celery。
啟動 Flower:
flower -A celeryworker.celery --port=5555
在瀏覽器中打開 http://localhost:5555/,你就可以看到所有任務的狀態,並進行即時監控。
結語#
到這裡,你的 Flask + Celery + Flower 配置就完成了!在開發和測試階段,你需要同時啟動多個 terminal,這時可以使用工具像是 supervisor 來管理服務,這樣能夠方便地管理多個進程和服務。
希望這篇文章幫助你理解如何在 Flask 中整合 Celery 來處理異步任務,並讓你的應用程式更加高效!
