web开发拦截器的方法是什么-创新互联

这篇文章主要介绍“web开发拦截器的方法是什么”,在日常操作中,相信很多人在web开发拦截器的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”web开发拦截器的方法是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

站在用户的角度思考问题,与客户深入沟通,找到万柏林网站设计与万柏林网站推广的解决方案,凭借多年的经验,让设计与互联网技术结合,创造个性化、用户体验好的作品,建站类型包括:网站设计、做网站、企业官网、英文网站、手机端网站、网站推广、域名与空间、虚拟空间、企业邮箱。业务覆盖万柏林地区。

interceptor,拦截器:

在请求处理环节的某处加入处理,有可能是中断后续的处理;

类似java的structs框架中的拦截器,servelet中也有拦截器;

分类:

根据拦截点不同,分:请求挂载;响应拦截;

根据影响面,分:全局拦截(在Application中拦截);局部拦截(在Router中拦截);

不能在handler中作拦截,即不能在一个完整的功能模块中拦截;

在__call__()进去之后,__call__()return之前作拦截;

web开发拦截器的方法是什么

注:

依次执行,一环扣一环,上一步的输出是下一步的输入;

拦截器可以是多个,多个拦截器是有顺序的;

数据response之前执行的命名为pre_interceptor,之后的执行命名为post_interceptor;

某些特定功能要求最终返回给用户为404,经fn后rerturn None,这样达到目的就行;

加入拦截器功能的方式:

方式1:

Application和Router类中直接加入;

把拦截器的相关方法,属性分别添加到相关类中;

Router的拦截器是每个实例都不一样,适合用此种;

方式2:

Mixin;

Application和Router都需要这个拦截器功能,这两个类有无关系,可用mixin方式,将属性方法组合进来;

Application适合使用此种;

拦截器fn函数的设计:

def fn(app,request:Request)->Request: pass  #fn不能影响数据继续向下一级的传递,即是透明的,如handle的输入要是request,前面的拦截的输出也要是request,handle处理后是response,经拦截器最终到__call__()也要是response

def fn(app,reqeust:Request)->Request: pass  #引入app即Application的实例,是为以后从Application上获取一些全局信息

上下文支持:

为把一些应用数据、配置数据、数据库连接等全局共享数据提供给所有对象使用,增加一个字典,存储这些共享数据;

为方便访问,提供字典的属性化访问的类,且该字典可写;

例:

class Context(dict):

   def __getattr__(self, item):

       try:

           return self[item]

       except KeyError:

           raise AttributeError('Attribute {} Not Found'.fomrat(item))

   def __setattr__(self, key, value):

       self[key] = value

class Application:

   ctx = Context()

   ROUTERS = []

   def __init__(self, **kwargs):

       self.ctx.app = self

       for k, v in kwargs:

           self.ctx[k] = v

Router的每一个实例中增加上下文属性,实例自己使用;

Router实例如何使用全局上下文?

用新的处理方法,增加NestedContext类,每一个Router实例的上下文字典内部关联一个全局字典的引用,如果自己的字典中找不到,就去全局里找;

Router实例什么时候关联全局字典?

在路由注册时较合适,只需修改下注册函数即可

例,错误示例:

class Mixin:

        # def __init__(self): pass  #仅实现功能,不能有初始化

class A:

        def __init__(self): pass

class C(Mixin, A):

        def __init__(self):

                  super().__init__()  #错误,这样用的是Mixin的初始化方法,把A的覆盖掉了

例:

class Context(dict):

   def __getattr__(self, item):

       try:

           return self[item]

       except KeyError:

           raise AttributeError('Attribute {} Not Found'.format(item))

   def __setattr__(self, key, value):

       self[key] = value

class NestedContext(Context):

   def __init__(self, globalcontext:Context=None):

       super().__init__()  #此句可没有,父类中未初始化,按标准流程应写上

       self.relate(globalcontext)

   def relate(self, globalcontext:Context=None):

       self.globalcontext = globalcontext

   def __getattr__(self, item):

       if item in self.keys():

           return self[item]

       return self.globalcontext[item]

ctx = Context()

ctx.x = 6

ctx.y = 'a'

nc = NestedContext()

nc.relate(ctx)

nc.x = 8

print(nc)  #

print(nc.x)  #自己的

print(nc.y)  #全局的

print(nc.z)  #KeyError

输出:

{'globalcontext': {'y': 'a', 'x': 6}, 'x': 8}

8

a

Traceback (most recent call last):

 File "E:/git_practice/cmdb/example_wsgi_interceptor.py", line 37, in

   print(nc.z)

 File "E:/git_practice/cmdb/example_wsgi_interceptor.py", line 24, in __getattr__

   return self.globalcontext[item]

KeyError: 'z'

完整代码:

将如下代码改为单例(单例模式,只允许创建一个实例);

多线程时,要么锁要么信号量;

多进程时,用进程中的信号量;

例:

from wsgiref.simple_server import make_server

from webob import Request, Response, dec, exc

import re

class DictObj:

   def __init__(self, d: dict):

       if not isinstance(d, dict):

           self.__dict__['_dict'] = {}

       else:

           self.__dict__['_dict'] = d

   def __getattr__(self, item):

       try:

           return self._dict[item]

       except KeyError:

           raise AttributeError('Attribute {} Not Found '.format(self._dict))

   def __setattr__(self, key, value):

       raise NotImplementedError

class Context(dict):

   def __getattr__(self, item):

       try:

           return self[item]

       except KeyError:

           raise AttributeError('Attrubute {} Not found'.format(item))

   def __setattr__(self, key, value):

       self[key] = value

class NestedContext(Context):

   def __init__(self, globalcontext:Context=None):

       super().__init__()

       self.relate(globalcontext)

   def relate(self, globalcontext:Context=None):

       self.globalcontext = globalcontext

   def __getattr__(self, item):

       if item in self.keys():

           return self[item]

       return self.globalcontext[item]

class Router:

   pattern = '/({[^{}:]+:?[^{}:]*})' # /{name:str}

   regex = re.compile(pattern)

   TYPEPATTERNS = {

       'str': r'[^/]+',

       'word': r'\w+',

       'int': r'[+-]?\d+',

       'float': r'[+-]\d+.\d+',

       'any': r'.+'

   }

   TYPECAST = {

       'str': str,

       'word': str,

       'int': int,

       'float': float,

       'any': str

   }

   def _transform(self, kv: str):

       name, _, type = kv.strip('/{}').partition(':')

       return '/(?P<{}>{})'.format(name, self.TYPEPATTERNS.get(type, '\w+')), name, self.TYPECAST.get(type, str)

   def _parse(self, src: str):

       start = 0

       res = ''

       translator = {}

       while True:

           matcher = self.regex.search(src, start)

           if matcher:

               res += matcher.string[start: matcher.start()]

               tmp = self._transform(matcher.string[matcher.start():matcher.end()])

               res += tmp[0]

               translator[tmp[1]] = tmp[2]

               start = matcher.end()

           else:

               break

       if res:

           return res, translator

       else:

           return src, translator

   def __init__(self, prefix: str=''):

       self.__prefix = prefix.rstrip('/\\')

       self.__routertable = []  #[(methods, regex, translator, handler)]

       self.pre_interceptor = []

       self.post_interceptor = []

       self.ctx = NestedContext()

   @property

   def prefix(self):

       return self.__prefix

   def register_preinterceptor(self, fn):

       self.pre_interceptor.append(fn)

       return fn

   def register_postinterceptor(self, fn):

       self.post_interceptor.append(fn)

       return fn

   def route(self, rule, *methods):

       def wrapper(handler):

           pattern, translator = self._parse(rule)

           self.__routertable.append((methods, re.compile(pattern), translator, handler))

           return handler

       return wrapper

   def get(self, pattern):

       return self.route(pattern, 'GET')

   def post(self, pattern):

       return self.route(pattern, 'POST')

   def head(self, pattern):

       return self.route(pattern, 'HEAD')

   def match(self, request: Request)->Response:

       print(request.path)

       if not request.path.startswith(self.prefix):

           return

       for fn in self.pre_interceptor:

           request = fn(self.ctx, request)

       for methods, regex, translator, handler in self.__routertable:

           print(methods, regex, translator, handler)

           if not methods or request.method.upper() in methods:

               matcher = regex.search(request.path.replace(self.prefix, '', 1))

               if matcher:

                   print(matcher)

                   newdict = {}

                   for k, v in matcher.groupdict().items():

                       newdict[k] = translator[k](v)

                   print(newdict)

                   request.vars = DictObj(newdict)

                   return handler(request)

       # return

class Application:

   ctx = Context()

   ROUTERS = []

   def __init__(self, **kwargs):

       self.ctx.app = self

       for k, v in kwargs:

           self.ctx[k] = v

   PRE_INTERCEPTOR = []

   POST_INTERCEPTOR = []

   @classmethod

   def register_preinterceptor(cls, fn):

       cls.PRE_INTERCEPTOR.append(fn)

       return fn

   @classmethod

   def register_postinterceptor(cls, fn):

       cls.POST_INTERCEPTOR.append(fn)

       return fn

   @classmethod

   def register(cls, router: Router):

       router.ctx.relate(cls.ctx)

       router.ctx.router = router

       cls.ROUTERS.append(router)

   @dec.wsgify

   def __call__(self, request: Request) -> Response:

       for fn in self.PRE_INTERCEPTOR:

           request = fn(self.ctx, request)

       for router in self.ROUTERS:

           response = router.match(request)

           for fn in self.POST_INTERCEPTOR:

               response = fn(self.ctx, request, response)

           if response:

               return response

       raise exc.HTTPNotFound('

the page not found

')

idx = Router()

py = Router('/python')

Application.register(idx)

Application.register(py)

# @py.get('/{name:str}')

# @py.get('/{id:int}')

@py.get('/{name:str}/{id:int}')

def showpython(request):

   res = Response()

   # print(request.__dict__)

   # res.body = '

hello python; vars = {}

'.format(request.vars.name).encode()

   res.body = '

hello python; vars = {}

'.format(request.vars.id).encode()

   return res

@idx.route('^/$')

def index(request):

   res = Response()

   res.body = '

welcome

'.encode()

   return res

@Application.register_preinterceptor

def showheaders(ctx: Context, request: Request) -> Response:

   print(request.path)

   print(request.user_agent)

   return request

@py.register_preinterceptor

def showprefix(ctx: Context, request: Request)->Response:

   print('~~~~~~~prefix = {}'.format(ctx.router.prefix))

   return request

if __name__ == '__main__':

   ip = '127.0.0.1'

   port = 9999

   server = make_server(ip, port, Application())

   try:

       server.serve_forever()

   except Exception as e:

       print(e)

   finally:

       server.shutdown()

       server.server_close()

到此,关于“web开发拦截器的方法是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!

另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


当前名称:web开发拦截器的方法是什么-创新互联
文章源于:http://pwwzsj.com/article/ghdcg.html