A4 - v3 - 消息推送系统

A4 - v3 - 消息推送系统

黄鹏宇 524 2022-04-09

一、需求分析

  1. 用户可以选择推送时间
  2. 推送条件为: 总是推送、当有未完成时推送
  3. 推送内容:与用户数据相关

image.png

二、核心功能

  1. 定时任务的创建时机
  2. 定时任务的持久化与重建
  3. 消息下发的渠道

三、具体实现

流程

image.png

数据模型

  1. 定时任务 apscheduler_jobs
    image.png

以用户openid为主键

  1. 用户消息提醒的配置 user_notification_config
    image.png

RPC调用

    {
            "id": "OPENID",
            "func": "task:send_a4_notification",
            "trigger": "cron",
            "day_of_week": "0-6",
            "month": "*",
            "hour": "14",
            "minute": "44",
            "second": "30",
            "replace_existing": "True"
        }

四、附录

1. 代码

目录
│  auth.py
│  config.py
│  db.py
│  init.py
│  task.py
│  auth.py 用户生成flask的鉴权
import base64

# 用户名密码用:分隔
string = "username:pwd".encode('utf-8')

a = base64.b64encode(string)
# print(a)
# b'Z3Vlc3Q6Z3Vlc3Q='
print(b"basic " + a)  # 最后必须拼接上basic
# b'basic Z3Vlc3Q6Z3Vlc3Q='

│  config.py
from flask_apscheduler.auth import HTTPBasicAuth
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

def send_ding_test():
    print("ding")

class Config(object):

    JOBS = [
        # cron式定时调度,类似linux的crontab
        # {
        #     'id': 'system-getAccessToken',
        #     'func': "task:getAccessToken",
        #     'trigger': 'cron',
        #     'day_of_week': '0-6',
        #     'month': '*',
        #     'hour': '14',
        #     'minute': '22',
        #     'second': '40',
        #     'replace_existing': True
        # }

        {
            'id': 'system-getAccessToken',
            'func': "task:getAccessToken",
            'trigger': 'interval',
            'minutes': 30,
            'replace_existing': True
        }
    ]

    # 存储定时任务(默认是存储在内存中)  
    SCHEDULER_JOBSTORES  = {'default':SQLAlchemyJobStore(url='mysql+pymysql://root@localhost:3306/a4?password=pssword')}
    # 设置时区,时区不一致会导致定时任务的时间错误
    SCHEDULER_TIMEZONE = 'Asia/Shanghai'
    # 一定要开启API功能,这样才可以用api的方式去查看和修改定时任务
    SCHEDULER_API_ENABLED = True
    # api前缀(默认是/scheduler)
    SCHEDULER_API_PREFIX = '/scheduler'
    # 配置允许执行定时任务的主机名
    SCHEDULER_ALLOWED_HOSTS = ['*']
    # auth验证。默认是关闭的,
    SCHEDULER_AUTH = HTTPBasicAuth()
    # 设置定时任务的执行器(默认是最大执行数量为10的线程池)
    SCHEDULER_EXECUTORS = {'default': {'type': 'threadpool', 'max_workers': 10}}
    # 另外flask-apscheduler内有日志记录器。name为apscheduler.scheduler和apscheduler.executors.default。如果需要保存日志,则需要对此日志记录器进行配置
    

│  db.py
import json
import mysql.connector
import datetime

db = mysql.connector.connect(
	host ="localhost",
	user ="root",
	passwd="passwd",
	database ="a4",
	charset ='utf8',
)

def insertAccess(accessToken):
	cursor = db.cursor()
	sql =''' 
	INSERT INTO access_token(access_token,expires_in,get_time) VALUES (%s,%s,%s)
	'''
	param = (accessToken,7200,datetime.datetime.now())
	cursor.execute(sql, param)
	db.commit()


def getAccessToken():
	cursor = db.cursor()
	sql = "SELECT access_token,get_time FROM access_token ORDER BY ID DESC LIMIT 1"
	cursor.execute(sql)
	return cursor.fetchone()
	cursor.close()


if __name__ == '__main__':
	insertAccess("hello")
│  init.py
import logging
from flask import Flask
from config import Config  # 上边的配置文件
import task
from flask_apscheduler.scheduler import APScheduler
from logging.handlers import RotatingFileHandler
from flask import abort, request


# flask-apscheduler内置有日志器,为了让内部的日志器打印的内容输出,我这里做了个配置
# 创建日志记录器,指明日志保存路径,每个日志大小,保存日志文件个数上限
file_log_handler = RotatingFileHandler('logs/runserver.log', maxBytes=1024*1024*100, backupCount=5)
# 创建日志的记录格式,],日志等级,记录时间,报错位置,行数,日志信息
formatter = logging.Formatter(
    fmt="%(asctime)s - %(levelname)s - %(threadName)s:%(thread)s - %(filename)s - %(funcName)s - %(message)s",
    datefmt='%Y-%m-%d %H:%M:%S %a'
)
# 为刚创建的日志记录器设置日志记录格式
file_log_handler.setFormatter(formatter)
# 为全局日志对象添加日志记录器
logger = logging.getLogger("apscheduler")
logger.addHandler(file_log_handler)
logger.setLevel(logging.INFO)


def create_app():
    app = Flask(__name__)
    app.config.from_object(Config)

    scheduler = APScheduler()
    scheduler.init_app(app)
    scheduler.start()
    
    # 配置api权限验证的回调函数
    @scheduler.authenticate
    def authenticate(auth):
        return auth['username'] == 'root' and auth['password'] == 'H'
    
    @app.before_request
    def limit_remote_addr():
        if request.remote_addr != '127.0.0.1':
            abort(403)  # Forbidden
    return app




def main():
    app = create_app()
    app.run(host='127.0.0.1', port='7788',debug = True)

if __name__ == '__main__':
    main()

│  task.py
import db
import datetime
import time
import requests
import json

# 发邮件
import smtplib
from email.mime.text import MIMEText
from email.header import Header

# 日志
import logging
import os

# 日志配置
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s %(levelname)s %(message)s',
                    datefmt='%a %d %b %Y %H:%M:%S',filename=os.path.join(os.getcwd(),'access_token_log.txt'), filemode='a')

# 第三方 SMTP 服务
mail_host="smtp.163.com"  #设置服务器
mail_user="mail_user"    #用户名
mail_pass="mail_pass"   #163的授权码


EXPIRE_DURATION = 30

APPID = "APPID "
#小程序密钥
SECRET = "SECRET "
GET_ACCESS_TOKEN_URL = "https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid="+APPID+"&secret="+SECRET

# 收件人 以逗号分割
receivers = ['以逗号分割']


# 发送模板消息的业务层
def send_a4_notification():
    print("ding")

# 判断token是否已过期,expire_duration = 30
def tokenIsExpire(getTime):
    if(getTime == None):
        return true
    expireDate = getTime + datetime.timedelta(minutes=EXPIRE_DURATION)
    nowDate = datetime.datetime.now()
    return nowDate > expireDate

def getWechatTokenAndInsertToDb():
    # 发送请求
    tokenResult=requests.get(GET_ACCESS_TOKEN_URL)
    errorLog = tokenResult.text
    # 转为json
    tokenResultDict = json.loads(errorLog)
    # 正常
    if('access_token' in tokenResultDict):
        # 存至数据库中
        db.insertAccess(tokenResultDict['access_token'])
        return tokenResultDict['access_token']
    # 发生错误
    else:
        # 发送错误邮箱
        sendEmail("获取accessToken失败------------"+tokenResult.text,"获取accessToken失败") 
        # 并记录   
        logging.debug(errorLog)

# 发邮件给老子(suyu_wxapp@163.com)
def sendEmail(content,subject):
    try:
        smtp = smtplib.SMTP_SSL(mail_host,994) #实例化smtp服务器
        message = MIMEText(content,"plain","utf-8")
        message['Subject'] = subject #邮件标题
        message['To'] = ','.join(receivers)
        message['From'] = mail_user #发件人
        smtp = smtplib.SMTP_SSL("smtp.163.com",994) #实例化smtp服务器
        smtp.login(mail_user,mail_pass)#发件人登录
        smtp.sendmail(mail_user,message['To'].split(','),message.as_string()) #as_string 对 message 的消息进行了封装
        print("邮件发送成功")
        smtp.close()
    except smtplib.SMTPException:
        logging.debug("无法发送邮件") 

def getAccessToken():
    logging.info("getAccessToken") 

    accessToken = db.getAccessToken()
    if(accessToken == None):
        return getWechatTokenAndInsertToDb()

    accessTokenStr = accessToken[0]
    getTime = accessToken[1]
    # 判断是否过期 
    if(tokenIsExpire(getTime)):
        return getWechatTokenAndInsertToDb()
    else:
        return accessTokenStr

if __name__ == '__main__':
    print(getAccessToken())


2. Flask APScheduler的API

要在header上加鉴权
AUTHORIZATION:basic xxx,通过auth.py生成

  • /scheduler [GET] > returns basic information about the webapp
  • /scheduler/jobs [POST json job data] > adds a job to the scheduler
  • /scheduler/jobs/<job_id> [GET] > returns json of job details
  • /scheduler/jobs [GET] > returns json with details of all jobs
  • /scheduler/jobs/<job_id> [DELETE] > deletes job from scheduler
  • /scheduler/jobs/<job_id> [PATCH json job data] > updates an already existing job
  • /scheduler/jobs/<job_id>/pause [POST] > pauses a job, returns json of job details
  • /scheduler/jobs/<job_id>/resume [POST] > resumes a job, returns json of job details
  • /scheduler/jobs/<job_id>/run [POST] > runs a job now, returns json of job details