阿里云函数计算 使用Python开发一个基于WSGI的HTTP触发器 (实战)

Sahar ·
更新时间:2024-09-20
· 760 次阅读

废话不多说,直接上干货

源码下载:

链接:https://pan.baidu.com/s/1OFMCtNglHHQnBlfedpdPmw 
提取码:w465

文章大纲:

1. 需求背景

2. 技术选型

3. 代码开发

4. 部署项目到阿里云

1. 需求背景

大致想实现一个 端口-手机号 配置的功能,并且可以接收到指定手机发来的短信,将消息过滤后转发至钉钉群。

ps:该项目仅是服务端,另一端是Android,Android配置硬件实现端口切换【一个端口对应一张手机卡,共16个端口】,使手机使用某张卡,随后将收到的短信发送至服务端,服务端过滤消息后,将消息转发到钉钉群。这样就可以方便整个公司的员工接收短信验证码,提高效率。

原型大致如下 :

(1)端口配置:

此处包含 增删改查 方法,相关数据存在redis,后文会介绍为什么选用redis。

(2)号码切换

切换到指定手机号,就可以收到该手机号发来的短信。

2. 技术选型

(1)Python 3.7,想挑战一下自己,并且可以学习一门新的语言,所以选择了Python

(2)WSGI,没有选用基于WSGI的框架(flask或者django),因为时间有限

(3)Redis,没有选用数据库,是因为该项目与业务关系不大,只是为了方便接收验证码

(4)阿里云函数计算 HTTP触发器

3. 代码开发

将项目部署到阿里云之前,需要让项目跑起来,并单测完成。

本地选用的是wsgiref.simple_server,来模拟一个服务。

实现simple_server有三种方式(一个函数,一个类,或者一个重载了__call__的类的实例),我选用的是最后一种方式。

ps:由于没有使用web框架,整个项目实现思想有点类似于Java Web

项目结构:

下面直接贴代码

(1)fcIndex.py

from sms_log import SMSLog from urllib.parse import unquote from redis_util import RedisUtil from port_phone_conf import PortPhoneConf from wsgiref.simple_server import make_server """路由配置""" URL_PATTERNS = ( ('/', 'list'), ('/list', 'list'), ('/insert', 'insert'), ('/update', 'update'), ('/delete', 'delete'), ('/sms', 'sms'), ('/set_current_conf', 'set_current_conf') ) class AppClass: @staticmethod def _match(path): """解析应该调用哪个app""" path = path.split('/')[1] # path = path[path.rfind('/'):] for url, app in URL_PATTERNS: if path in url: return app @staticmethod def parse_params(params): """解析请求参数""" if not params: return None result = {} items = params.split('&') for item in items: result[item.split("=")[0]] = item.split('=')[1] return result def __call__(self, environ, start_response): """入口""" path = environ.get('PATH_INFO', '/') app = self._match(path) if app: if app in ['list']: """get""" params = self.parse_params(environ['QUERY_STRING']) else: """post""" request_body_size = int(environ.get("CONTENT_LENGTH", 0)) request_body_str = unquote(environ["wsgi.input"].read(request_body_size).decode('utf-8')) params = self.parse_params(request_body_str) app = globals()[app] result = app(params) start_response("200 OK", [('Content-type', 'text/plain; charset=utf-8')]) return result else: start_response("404 NOT FOUND", [('Content-type', 'text/plain')]) return [b"Page dose not exists!"] def list(params): """端口配置列表展示 """ items = PortPhoneConf().list().encode() if not items: items = [] return [items, ] def insert(params): """新增配置""" try: port_no = params['port_no'] phone_no = params['phone_no'] key_words = params['key_words'] except Exception: raise Exception('参数错误,请刷新页面后重试') PortPhoneConf().insert(port_no, phone_no, key_words) return [b"success"] def update(params): """修改配置""" try: port_no = params['port_no'] phone_no = params['phone_no'] key_words = params['key_words'] except Exception: raise Exception('参数错误,请刷新页面后重试') PortPhoneConf().update(port_no, phone_no, key_words) return [b"success"] def delete(params): """删除配置""" try: port_no = params['port_no'] except Exception: raise Exception('参数错误,请刷新页面后重试') PortPhoneConf().delete(port_no) return [b"success"] def sms(params): """接收短信,并转发到钉钉群""" try: phone_no = params['phone_no'] msg = params['msg'] except Exception: raise Exception('参数错误,请刷新页面后重试') current_phone_no = RedisUtil.comm_get(RedisUtil.CURRENT_PORT_KEY) if phone_no == current_phone_no: raise Exception(phone_no + "对应端口未开启") SMSLog().redirect_to_ding_talk(msg) return [b"success"] def set_current_conf(params): """设置当前开启的端口""" try: phone_no = params['phone_no'] except Exception: raise Exception('参数错误,请刷新页面后重试') RedisUtil.comm_set(RedisUtil.CURRENT_PORT_KEY, phone_no) return [b"success"] if __name__ == "__main__": handler = AppClass() httpd = make_server('', 8080, handler) print("Serving on port 8080...") httpd.serve_forever()

(2)port_phone_conf.py

import json import time from redis_util import RedisUtil class PortPhoneConf: """端口-手机号操作类""" def insert(self, port_no, phone_no, key_words): """新增端口配置""" conf = {'port_no': port_no, 'phone_no': phone_no, 'key_words': key_words, 'gmt_create': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), 'gmt_modified': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} items = self.list() if not items: items = [] else: items = json.loads(items) items.append(conf) RedisUtil.comm_set(RedisUtil.CONF_KEY, items) def update(self, port_no, phone_no, key_words): """修改端口配置""" items = json.loads(self.list()) for item in items: if item["port_no"] == port_no: item["phone_no"] = phone_no item["key_words"] = key_words item["gmt_modified"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) RedisUtil.comm_set(RedisUtil.CONF_KEY, items) def delete(self, port_no): """删除端口配置""" global items if port_no: items = json.loads(self.list()) for item in items: if item["port_no"] == port_no: items.remove(item) break RedisUtil.comm_set(RedisUtil.CONF_KEY, items) def list(self): """获取端口配置列表""" return RedisUtil.comm_get(RedisUtil.CONF_KEY)

(3)redis_util.py

import json import redis class RedisUtil: """获取redis连接,并且操作redis""" CONF_KEY = "select::port_phone_conf" CURRENT_PORT_KEY = "select::port_phone_conf_current" @staticmethod def get_conn(): """返回连接""" conn = redis.Redis(host="106.15.176.xxx", port=6379) return conn @staticmethod def comm_set(key, values): """通用set方法""" global conn try: conn = RedisUtil.get_conn() conn.set(key, json.dumps(values, ensure_ascii=False).encode("utf-8")) finally: conn.close() @staticmethod def comm_get(key): """通用get方法""" global conn try: conn = RedisUtil.get_conn() items = conn.get(key) finally: conn.close() if items: items = items.decode("utf-8") return items

(4)sms_log.py

import json import requests class SMSLog: @staticmethod def redirect_to_ding_talk(msg): """过滤消息并转发到钉钉群""" if ("阿里云" in msg) or ("钉钉" in msg): url = 'https://oapi.dingtalk.com/robot/send?access_token=xxx' data = {"msgtype": "text", "text": {"content": msg}} headers = {'Content-Type': 'application/json'} requests.request("post", url, json=data, headers=headers)

(5)调试

接下来,就可以运行 fcIndex.py#__main__ 来测试功能了!

4. 部署项目到阿里云

本地单测完成,就可以部署到阿里云了,不要沾沾自喜,后面还有好多坑等着呢。

(1)创建函数和服务

进入阿里云函数计算控制台#函数_服务 #新建服务

新增函数

特别注意:函数入口一点要写对,不然会找不到入口(重要)

(2)下载自定义模块代码包

查看函数计算的文档,我们会发现,Python只提供了标准模块和一些常用模块。如果需要使用自定义的模块,则需要将它们与代码一起打包,在这个项目中,自定义模块是requests和redis。官方推荐使用Fun,个人觉得代价有点高,放弃。文档中有这样一段话:如果没有 docker 环境,且不涉及动态链接库(.so)、编译二进制程序等,只是安装语言依赖,那么可以直接使用 pip install -t . PyMySQL 的方式进行安装。这种方式不管函数计算的执行环境中是否安装了这些 python 库,都会下载下来,会增加代码包的大小。

进入项目目录 ,执行以下命令,下载代码包。

pip install -t . redis; pip install -t . requests;

再来看看我们的项目目录:

(3)修改 fcIndex.py 文件

查看文档发现,不支持最后一种方式【ps:最后一种方式只是为了单测方便】,所以我们的代码需要变动一下,改变后的代码如下:

from sms_log import SMSLog from urllib.parse import unquote from redis_util import RedisUtil from port_phone_conf import PortPhoneConf from wsgiref.simple_server import make_server def handler(environ, start_response): """函数计算调用入口""" return AppClass(environ, start_response) """路由配置""" URL_PATTERNS = ( ('/', 'list'), ('/list', 'list'), ('/insert', 'insert'), ('/update', 'update'), ('/delete', 'delete'), ('/sms', 'sms'), ('/set_current_conf', 'set_current_conf') ) class AppClass: @staticmethod def _match(path): """解析应该调用哪个app""" path = path.split('/')[1] # path = path[path.rfind('/'):] for url, app in URL_PATTERNS: if path in url: return app @staticmethod def parse_params(params): """解析请求参数""" if not params: return None result = {} items = params.split('&') for item in items: result[item.split("=")[0]] = item.split('=')[1] return result def __init__(self, environ, start_response): self.environ = environ self.start_response = start_response def __iter__(self): path = self.environ.get('PATH_INFO', '/') app = self._match(path) if app: if app in ['list']: """get""" params = {} # params = self.parse_params(self.environ['QUERY_STRING']) else: """post""" request_body_size = int(self.environ.get("CONTENT_LENGTH", 0)) request_body_str = unquote(self.environ["wsgi.input"].read(request_body_size).decode('utf-8')) params = self.parse_params(request_body_str) app = globals()[app] result = app(params) self.start_response("200 OK", [('Content-type', 'text/plain; charset=utf-8')]) yield result else: self.start_response("404 NOT FOUND", [('Content-type', 'text/plain')]) yield b"Page dose not exists!" def list(params): """端口配置列表展示 """ items = PortPhoneConf().list().encode() if not items: items = [] return items def insert(params): """新增配置""" try: port_no = params['port_no'] phone_no = params['phone_no'] key_words = params['key_words'] except Exception: raise Exception('参数错误,请刷新页面后重试') PortPhoneConf().insert(port_no, phone_no, key_words) return b"success" def update(params): """修改配置""" try: port_no = params['port_no'] phone_no = params['phone_no'] key_words = params['key_words'] except Exception: raise Exception('参数错误,请刷新页面后重试') PortPhoneConf().update(port_no, phone_no, key_words) return b"success" def delete(params): """删除配置""" try: port_no = params['port_no'] except Exception: raise Exception('参数错误,请刷新页面后重试') PortPhoneConf().delete(port_no) return b"success" def sms(params): """接收短信,并转发到钉钉群""" try: phone_no = params['phone_no'] msg = params['msg'] except Exception: raise Exception('参数错误,请刷新页面后重试') current_phone_no = RedisUtil.comm_get(RedisUtil.CURRENT_PORT_KEY) if phone_no == current_phone_no: raise Exception(phone_no + "对应端口未开启") SMSLog().redirect_to_ding_talk(msg) return b"success" def set_current_conf(params): """设置当前开启的端口""" try: phone_no = params['phone_no'] except Exception: raise Exception('参数错误,请刷新页面后重试') RedisUtil.comm_set(RedisUtil.CURRENT_PORT_KEY, phone_no) return b"success"

(4)上传代码

快成功了。。。

(5)测试

保存之后,选择在线编辑,进行测试,nice


作者:新新许愿树



用python 阿里云 python开发 函数 HTTP wsgi 实战 阿里 Python

需要 登录 后方可回复, 如果你还没有账号请 注册新账号