一、需求分析
- 用户可以选择推送时间
- 推送条件为: 总是推送、当有未完成时推送
- 推送内容:与用户数据相关
二、核心功能
- 定时任务的创建时机
- 定时任务的持久化与重建
- 消息下发的渠道
三、具体实现
流程
数据模型
- 定时任务 apscheduler_jobs
以用户openid为主键
- 用户消息提醒的配置 user_notification_config
RPC调用
- url : http://127.0.0.1:7788/scheduler/jobs
- white-ip-list : 使用@app.before_request实现,在init.py
- body:
{
"id": "OPENID",
"func": "task:send_a4_notification",
"trigger": "cron",
"day_of_week": "0-6",
"month": "*",
"hour": "14",
"minute": "44",
"second": "30",
"replace_existing": "True"
}
四、附录
1. 代码
目录
│ auth.py
│ config.py
│ db.py
│ init.py
│ task.py
│ auth.py 用户生成flask的鉴权
import base64
# 用户名密码用:分隔
string = "username:pwd".encode('utf-8')
a = base64.b64encode(string)
# print(a)
# b'Z3Vlc3Q6Z3Vlc3Q='
print(b"basic " + a) # 最后必须拼接上basic
# b'basic Z3Vlc3Q6Z3Vlc3Q='
│ config.py
from flask_apscheduler.auth import HTTPBasicAuth
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
def send_ding_test():
print("ding")
class Config(object):
JOBS = [
# cron式定时调度,类似linux的crontab
# {
# 'id': 'system-getAccessToken',
# 'func': "task:getAccessToken",
# 'trigger': 'cron',
# 'day_of_week': '0-6',
# 'month': '*',
# 'hour': '14',
# 'minute': '22',
# 'second': '40',
# 'replace_existing': True
# }
{
'id': 'system-getAccessToken',
'func': "task:getAccessToken",
'trigger': 'interval',
'minutes': 30,
'replace_existing': True
}
]
# 存储定时任务(默认是存储在内存中)
SCHEDULER_JOBSTORES = {'default':SQLAlchemyJobStore(url='mysql+pymysql://root@localhost:3306/a4?password=pssword')}
# 设置时区,时区不一致会导致定时任务的时间错误
SCHEDULER_TIMEZONE = 'Asia/Shanghai'
# 一定要开启API功能,这样才可以用api的方式去查看和修改定时任务
SCHEDULER_API_ENABLED = True
# api前缀(默认是/scheduler)
SCHEDULER_API_PREFIX = '/scheduler'
# 配置允许执行定时任务的主机名
SCHEDULER_ALLOWED_HOSTS = ['*']
# auth验证。默认是关闭的,
SCHEDULER_AUTH = HTTPBasicAuth()
# 设置定时任务的执行器(默认是最大执行数量为10的线程池)
SCHEDULER_EXECUTORS = {'default': {'type': 'threadpool', 'max_workers': 10}}
# 另外flask-apscheduler内有日志记录器。name为apscheduler.scheduler和apscheduler.executors.default。如果需要保存日志,则需要对此日志记录器进行配置
│ db.py
import json
import mysql.connector
import datetime
db = mysql.connector.connect(
host ="localhost",
user ="root",
passwd="passwd",
database ="a4",
charset ='utf8',
)
def insertAccess(accessToken):
cursor = db.cursor()
sql ='''
INSERT INTO access_token(access_token,expires_in,get_time) VALUES (%s,%s,%s)
'''
param = (accessToken,7200,datetime.datetime.now())
cursor.execute(sql, param)
db.commit()
def getAccessToken():
cursor = db.cursor()
sql = "SELECT access_token,get_time FROM access_token ORDER BY ID DESC LIMIT 1"
cursor.execute(sql)
return cursor.fetchone()
cursor.close()
if __name__ == '__main__':
insertAccess("hello")
│ init.py
import logging
from flask import Flask
from config import Config # 上边的配置文件
import task
from flask_apscheduler.scheduler import APScheduler
from logging.handlers import RotatingFileHandler
from flask import abort, request
# flask-apscheduler内置有日志器,为了让内部的日志器打印的内容输出,我这里做了个配置
# 创建日志记录器,指明日志保存路径,每个日志大小,保存日志文件个数上限
file_log_handler = RotatingFileHandler('logs/runserver.log', maxBytes=1024*1024*100, backupCount=5)
# 创建日志的记录格式,],日志等级,记录时间,报错位置,行数,日志信息
formatter = logging.Formatter(
fmt="%(asctime)s - %(levelname)s - %(threadName)s:%(thread)s - %(filename)s - %(funcName)s - %(message)s",
datefmt='%Y-%m-%d %H:%M:%S %a'
)
# 为刚创建的日志记录器设置日志记录格式
file_log_handler.setFormatter(formatter)
# 为全局日志对象添加日志记录器
logger = logging.getLogger("apscheduler")
logger.addHandler(file_log_handler)
logger.setLevel(logging.INFO)
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
# 配置api权限验证的回调函数
@scheduler.authenticate
def authenticate(auth):
return auth['username'] == 'root' and auth['password'] == 'H'
@app.before_request
def limit_remote_addr():
if request.remote_addr != '127.0.0.1':
abort(403) # Forbidden
return app
def main():
app = create_app()
app.run(host='127.0.0.1', port='7788',debug = True)
if __name__ == '__main__':
main()
│ task.py
import db
import datetime
import time
import requests
import json
# 发邮件
import smtplib
from email.mime.text import MIMEText
from email.header import Header
# 日志
import logging
import os
# 日志配置
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s %(levelname)s %(message)s',
datefmt='%a %d %b %Y %H:%M:%S',filename=os.path.join(os.getcwd(),'access_token_log.txt'), filemode='a')
# 第三方 SMTP 服务
mail_host="smtp.163.com" #设置服务器
mail_user="mail_user" #用户名
mail_pass="mail_pass" #163的授权码
EXPIRE_DURATION = 30
APPID = "APPID "
#小程序密钥
SECRET = "SECRET "
GET_ACCESS_TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid="+APPID+"&secret="+SECRET
# 收件人 以逗号分割
receivers = ['以逗号分割']
# 发送模板消息的业务层
def send_a4_notification():
print("ding")
# 判断token是否已过期,expire_duration = 30
def tokenIsExpire(getTime):
if(getTime == None):
return true
expireDate = getTime + datetime.timedelta(minutes=EXPIRE_DURATION)
nowDate = datetime.datetime.now()
return nowDate > expireDate
def getWechatTokenAndInsertToDb():
# 发送请求
tokenResult=requests.get(GET_ACCESS_TOKEN_URL)
errorLog = tokenResult.text
# 转为json
tokenResultDict = json.loads(errorLog)
# 正常
if('access_token' in tokenResultDict):
# 存至数据库中
db.insertAccess(tokenResultDict['access_token'])
return tokenResultDict['access_token']
# 发生错误
else:
# 发送错误邮箱
sendEmail("获取accessToken失败------------"+tokenResult.text,"获取accessToken失败")
# 并记录
logging.debug(errorLog)
# 发邮件给老子(suyu_wxapp@163.com)
def sendEmail(content,subject):
try:
smtp = smtplib.SMTP_SSL(mail_host,994) #实例化smtp服务器
message = MIMEText(content,"plain","utf-8")
message['Subject'] = subject #邮件标题
message['To'] = ','.join(receivers)
message['From'] = mail_user #发件人
smtp = smtplib.SMTP_SSL("smtp.163.com",994) #实例化smtp服务器
smtp.login(mail_user,mail_pass)#发件人登录
smtp.sendmail(mail_user,message['To'].split(','),message.as_string()) #as_string 对 message 的消息进行了封装
print("邮件发送成功")
smtp.close()
except smtplib.SMTPException:
logging.debug("无法发送邮件")
def getAccessToken():
logging.info("getAccessToken")
accessToken = db.getAccessToken()
if(accessToken == None):
return getWechatTokenAndInsertToDb()
accessTokenStr = accessToken[0]
getTime = accessToken[1]
# 判断是否过期
if(tokenIsExpire(getTime)):
return getWechatTokenAndInsertToDb()
else:
return accessTokenStr
if __name__ == '__main__':
print(getAccessToken())
2. Flask APScheduler的API
要在header上加鉴权
AUTHORIZATION:basic xxx,通过auth.py生成
- /scheduler [GET] > returns basic information about the webapp
- /scheduler/jobs [POST json job data] > adds a job to the scheduler
- /scheduler/jobs/<job_id> [GET] > returns json of job details
- /scheduler/jobs [GET] > returns json with details of all jobs
- /scheduler/jobs/<job_id> [DELETE] > deletes job from scheduler
- /scheduler/jobs/<job_id> [PATCH json job data] > updates an already existing job
- /scheduler/jobs/<job_id>/pause [POST] > pauses a job, returns json of job details
- /scheduler/jobs/<job_id>/resume [POST] > resumes a job, returns json of job details
- /scheduler/jobs/<job_id>/run [POST] > runs a job now, returns json of job details