博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
抽屉之Tornado实战(7)--form表单验证
阅读量:6322 次
发布时间:2019-06-22

本文共 14441 字,大约阅读时间需要 48 分钟。

在这里,我们把form表单验证的代码进行工具化了,以后稍微修改一下参数就可以拿来用了

  先贴上代码

forms.py

from backend.form import fields  class BaseForm:     def __init__(self):        self._value_dict = {}        self._error_dict = {}        self._valid_status = True     def valid(self, handler):         for field_name, field_obj in self.__dict__.items():            if field_name.startswith('_'):                continue             if type(field_obj) == fields.CheckBoxField:                post_value = handler.get_arguments(field_name, None)            elif type(field_obj) == fields.FileField:                post_value = []                file_list = handler.request.files.get(field_name, [])                for file_item in file_list:                    post_value.append(file_item['filename'])            else:                post_value = handler.get_argument(field_name, None)             field_obj.match(field_name, post_value)            if field_obj.is_valid:                self._value_dict[field_name] = field_obj.value            else:                self._error_dict[field_name] = field_obj.error                self._valid_status = False        return self._valid_status

fields.py

import reimport os class Field:     def __init__(self):         self.is_valid = False        self.name = None        self.value = None        self.error = None     def match(self, name, value):        self.name = name         if not self.required:            self.is_valid = True            self.value = value        else:            if not value:                if self.custom_error_dict.get('required', None):                    self.error = self.custom_error_dict['required']                else:                    self.error = "%s is required" % name            else:                ret = re.match(self.REGULAR, value)                if ret:                    self.is_valid = True                    self.value = value                else:                    if self.custom_error_dict.get('valid', None):                        self.error = self.custom_error_dict['valid']                    else:                        self.error = "%s is invalid" % name  class StringField(Field):     REGULAR = "^.*$"     def __init__(self, custom_error_dict=None, required=True):         self.custom_error_dict = {}  # {'required': 'IP不能为空', 'valid': 'IP格式错误'}        if custom_error_dict:            self.custom_error_dict.update(custom_error_dict)         self.required = required         super(StringField, self).__init__()  class IPField(Field):     REGULAR = "^(25[0-5]|2[0-4]\d|[0-1]?\d?\d)(\.(25[0-5]|2[0-4]\d|[0-1]?\d?\d)){3}$"     def __init__(self, custom_error_dict=None, required=True):         self.custom_error_dict = {}  # {'required': 'IP不能为空', 'valid': 'IP格式错误'}        if custom_error_dict:            self.custom_error_dict.update(custom_error_dict)         self.required = required        super(IPField, self).__init__()  class EmailField(Field):     REGULAR = "^\w+([-+.']\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*$"     def __init__(self, custom_error_dict=None, required=True):         self.custom_error_dict = {}  # {'required': 'IP不能为空', 'valid': 'IP格式错误'}        if custom_error_dict:            self.custom_error_dict.update(custom_error_dict)         self.required = required        super(EmailField, self).__init__()  class IntegerField(Field):     REGULAR = "^\d+$"     def __init__(self, custom_error_dict=None, required=True):         self.custom_error_dict = {}  # {'required': 'IP不能为空', 'valid': 'IP格式错误'}        if custom_error_dict:            self.custom_error_dict.update(custom_error_dict)         self.required = required        super(IntegerField, self).__init__()  class CheckBoxField(Field):     REGULAR = "^\d+$"     def __init__(self, custom_error_dict=None, required=True):         self.custom_error_dict = {}  # {'required': 'IP不能为空', 'valid': 'IP格式错误'}        if custom_error_dict:            self.custom_error_dict.update(custom_error_dict)         self.required = required        super(CheckBoxField, self).__init__()     def match(self, name, value):        self.name = name         if not self.required:            self.is_valid = True            self.value = value        else:            if not value:                if self.custom_error_dict.get('required', None):                    self.error = self.custom_error_dict['required']                else:                    self.error = "%s is required" % name            else:                if isinstance(name, list):                    self.is_valid = True                    self.value = value                else:                    if self.custom_error_dict.get('valid', None):                        self.error = self.custom_error_dict['valid']                    else:                        self.error = "%s is invalid" % name  class FileField(Field):     REGULAR = "^(\w+\.pdf)|(\w+\.mp3)|(\w+\.py)$"     def __init__(self, custom_error_dict=None, required=True):         self.custom_error_dict = {}  # {'required': 'IP不能为空', 'valid': 'IP格式错误'}        if custom_error_dict:            self.custom_error_dict.update(custom_error_dict)         self.required = required         super(FileField, self).__init__()     def match(self, name, file_name_list):        flag = True        self.name = name         if not self.required:            self.is_valid = True            self.value = file_name_list        else:            if not file_name_list:                if self.custom_error_dict.get('required', None):                    self.error = self.custom_error_dict['required']                else:                    self.error = "%s is required" % name                flag = False            else:                for file_name in file_name_list:                    if not file_name or not file_name.strip():                        if self.custom_error_dict.get('required', None):                            self.error = self.custom_error_dict['required']                        else:                            self.error = "%s is required" % name                        flag = False                        break                    else:                        ret = re.match(self.REGULAR, file_name)                        if not ret:                            if self.custom_error_dict.get('valid', None):                                self.error = self.custom_error_dict['valid']                            else:                                self.error = "%s is invalid" % name                            flag = False                            break             self.is_valid = flag     def save(self, request, upload_to=""):         file_metas = request.files[self.name]        for meta in file_metas:            file_name = meta['filename']            file_path_name = os.path.join(upload_to, file_name)            with open(file_path_name, 'wb') as up:                up.write(meta['body'])         upload_file_path_list = map(lambda path: os.path.join(upload_to, path), self.value)        self.value = list(upload_file_path_list)

在form.py这个文件,做了一件什么事呢?代码就定义了父类,主要是判断要验证内容的类型,然后取值,然后又调用了在fields.py里类的match方法,最后把验证后结果信息返回(vaild方法)。

  而在fields.py文件里,主要是对为空检测,合法性检测,并把检测结果返回给form.py的vaild方法里(match方法)

 

首先

  • form组件只做为空检测,合法性检测,并没做超时检测,内部可获取验证状态_valid_status--True/False,错误信息_error_dict,验证通过时的用户数据_value_dict

  • 验证类型:字符串,IP,邮箱,数字,复选框,文件

再者,怎么用?

  • 分析你的应用场景,需要对哪几个类型进行验证,定义一个类,把需要的验证类型写入到构造方法里,记得继承一下BaseForm类,并且继承一下父类的构造方法

  • 在构造方法里,实例Field对象时,可以传入自定制错误类型信息custom_error_dict,required=False可为空设置

    from backend.form.forms import BaseFormfrom backend.form.fields import StringFieldfrom backend.form.fields import IntegerFieldfrom backend.form.fields import EmailField  class SendMsgForm(BaseForm):     def __init__(self):        self.email = EmailField(custom_error_dict={
    'required': '注册邮箱不能为空.', 'valid': '注册邮箱格式错误.'}) super(SendMsgForm, self).__init__() class RegisterForm(BaseForm): def __init__(self): self.username = StringField() self.email = EmailField() self.password = StringField() self.email_code = StringField() super(RegisterForm, self).__init__() class LoginForm(BaseForm): def __init__(self): self.user = StringField() self.pwd = StringField() self.code = StringField() super(LoginForm, self).__init__()

     最后

    • 在post方法里,调用一下form对象的vaild方法(把handler对象,也就是self传入),接下来只要根据form对象里检测完后的信息进行相应的操作

import ioimport datetimeimport jsonfrom backend.utils import check_codefrom backend.core.request_handler import BaseRequestHandlerfrom forms import accountfrom backend.utils.response import BaseResponsefrom backend import commonsfrom models import chouti_orm as ORMfrom sqlalchemy import and_, or_  class CheckCodeHandler(BaseRequestHandler):    def get(self, *args, **kwargs):        stream = io.BytesIO()        img, code = check_code.create_validate_code()        img.save(stream, "png")        self.session["CheckCode"] = code        self.write(stream.getvalue())  class LoginHandler(BaseRequestHandler):    def post(self, *args, **kwargs):        #对象里有self.status=False,self.data=None,self.summary=None,self.message={}        rep = BaseResponse()        form = account.LoginForm()        if form.valid(self):            if form._value_dict['code'].lower() != self.session["CheckCode"].lower():                rep.message = {
'code': '验证码错误'} self.write(json.dumps(rep.__dict__)) return conn = ORM.session() obj = conn.query(ORM.UserInfo).filter( or_( and_(ORM.UserInfo.email == form._value_dict['user'], ORM.UserInfo.password == form._value_dict['pwd']), and_(ORM.UserInfo.username == form._value_dict['user'], ORM.UserInfo.password == form._value_dict['pwd']) )).first() if not obj: rep.message = {
'user': '用户名邮箱或密码错误'} self.write(json.dumps(rep.__dict__)) return self.session['is_login'] = True self.session['user_info'] = obj.__dict__ rep.status = True else: rep.message = form._error_dict self.write(json.dumps(rep.__dict__)) class RegisterHandler(BaseRequestHandler): def post(self, *args, **kwargs): rep = BaseResponse() form = account.RegisterForm() if form.valid(self): current_date = datetime.datetime.now() limit_day = current_date - datetime.timedelta(minutes=1) conn = ORM.session() is_valid_code = conn.query(ORM.SendMsg).filter(ORM.SendMsg.email == form._value_dict['email'], ORM.SendMsg.code == form._value_dict['email_code'], ORM.SendMsg.ctime > limit_day).count() if not is_valid_code: rep.message['email_code'] = '邮箱验证码不正确或过期' self.write(json.dumps(rep.__dict__)) return has_exists_email = conn.query(ORM.UserInfo).filter(ORM.UserInfo.email == form._value_dict['email']).count() if has_exists_email: rep.message['email'] = '邮箱已经存在' self.write(json.dumps(rep.__dict__)) return has_exists_username = conn.query(ORM.UserInfo).filter( ORM.UserInfo.username == form._value_dict['username']).count() if has_exists_username: rep.message['email'] = '用户名已经存在' self.write(json.dumps(rep.__dict__)) return form._value_dict['ctime'] = current_date form._value_dict.pop('email_code') obj = ORM.UserInfo(**form._value_dict) conn.add(obj) conn.query(ORM.SendMsg).filter_by(email=form._value_dict['email']).delete() conn.commit() self.session['is_login'] = True self.session['user_info'] = obj.__dict__ rep.status = True else: rep.message = form._error_dict self.write(json.dumps(rep.__dict__)) class SendMsgHandler(BaseRequestHandler): def post(self, *args, **kwargs): rep = BaseResponse() form = account.SendMsgForm() if form.valid(self): email = form._value_dict['email'] conn = ORM.session() has_exists_email = conn.query(ORM.UserInfo).filter(ORM.UserInfo.email == form._value_dict['email']).count() if has_exists_email: rep.summary = "此邮箱已经被注册" self.write(json.dumps(rep.__dict__)) return current_date = datetime.datetime.now() code = commons.random_code() count = conn.query(ORM.SendMsg).filter_by(**form._value_dict).count() if not count: insert = ORM.SendMsg(code=code, email=email, ctime=current_date) conn.add(insert) conn.commit() rep.status = True else: limit_day = current_date - datetime.timedelta(hours=1) times = conn.query(ORM.SendMsg).filter(ORM.SendMsg.email == email, ORM.SendMsg.ctime > limit_day, ORM.SendMsg.times >= 10, ).count() if times: rep.summary = "'已经超过今日最大次数(1小时后重试)'" else: unfreeze = conn.query(ORM.SendMsg).filter(ORM.SendMsg.email == email, ORM.SendMsg.ctime < limit_day).count() if unfreeze: conn.query(ORM.SendMsg).filter_by(email=email).update({
"times": 0}) conn.query(ORM.SendMsg).filter_by(email=email).update({
"times": ORM.SendMsg.times + 1, "code": code, "ctime": current_date}, synchronize_session="evaluate") conn.commit() rep.status = True else: rep.summary = form._error_dict['email'] self.write(json.dumps(rep.__dict__))

 

转载于:https://www.cnblogs.com/zcok168/p/9797711.html

你可能感兴趣的文章
ElasticSearch 版本选择及分布式环境搭建
查看>>
101.数据卷备份恢复 docker网络模式、报错处理、配置桥接
查看>>
usermod 命令、mkpasswd命令及用户密码管理
查看>>
Python学习,还在用正则或者bs4做爬虫吗?来试试css选择器吧
查看>>
windows系统redis sentinel 集群搭建
查看>>
Qt5开发及实例学习之标准字体对话框类QFontDialog:选择字体设置文本编辑器
查看>>
gdb常用命令
查看>>
如何从请求、传输、渲染3个方面提升Web前端性能
查看>>
java基础:基本类型占用字节数
查看>>
条形码设计软件BarTender实用教程——模板对象常见问题解答
查看>>
(五)java spring cloud版b2b2c社交电商spring cloud分布式微服务-路由网关(zuul)
查看>>
23种设计模式知识要点,你都了解了吗?
查看>>
给awstats增加纯真IP库qqwry.dat支持
查看>>
PhalApi-RabbitMQ基于PhalApi专业队列拓展
查看>>
ubuntu12.04--无法获得锁 /var/lib/dpkg/lock - open (1...
查看>>
git冲突 head
查看>>
OSChina 周二乱弹 —— 啥时候能低调的炫个富?
查看>>
OSChina 周五乱弹 —— 吃货的自我修养
查看>>
OSChina 周六乱弹 —— 为啥你没对象
查看>>
OSChina 周六乱弹 —— 生命诚可贵,啤酒价更高
查看>>