Source code for test_server.server

# Copyright 2015-2018 Gregory Petukhov (lorien@lorien.name)
# *
# Licensed under the MIT License
import logging
from threading import Thread, Event
import time
import types
from six.moves.urllib.parse import urljoin
from collections import defaultdict
try:
    from collections.abc import Iterable
except ImportError:
    from collections import Iterable

import six
from webtest.http import StopableWSGIServer
from waitress import task
import bottle

from test_server.error import TestServerError

__all__ = ('TestServer', 'WaitTimeoutError')
logger = logging.getLogger('test_server.server') # pylint: disable=invalid-name

if six.PY3:
    # Original (from waitress.compat.tobytes):
    # def tobytes(s):
    #    return bytes(s, 'latin-1')
    task.tobytes = lambda x: bytes(x, 'utf-8')


def _hval_custom(value):
    value = bottle.tonat(value)
    if '\n' in value or '\r' in value:# or '\0' in value:
        raise ValueError(
            'Header value must not contain control characters: %r' % value
        )
    return value


bottle._hval_origin = bottle._hval # pylint: disable=protected-access
bottle._hval = _hval_custom # pylint: disable=protected-access



[docs]class WaitTimeoutError(Exception): pass
def bytes_to_unicode(obj, charset): if isinstance(obj, six.text_type): return obj elif isinstance(obj, six.binary_type): return obj.decode(charset) elif isinstance(obj, list): return [bytes_to_unicode(x, charset) for x in obj] elif isinstance(obj, tuple): return tuple(bytes_to_unicode(x, charset) for x in obj) elif isinstance(obj, dict): return dict(bytes_to_unicode(x, charset) for x in obj.items()) else: return obj class WebApplication(bottle.Bottle): # pylint: disable=abstract-method,protected-access def __init__(self, test_server): self._server = test_server super(WebApplication, self).__init__() def get_param(self, key, method='get', clear_once=True): method_key = '%s.%s' % (method, key) if method_key in self._server.response_once: value = self._server.response_once[method_key] if clear_once: del self._server.response_once[method_key] return value elif key in self._server.response_once: value = self._server.response_once[key] if clear_once: del self._server.response_once[key] return value elif method_key in self._server.response: return self._server.response[method_key] elif key in self._server.response: return self._server.response[key] else: raise TestServerError('Parameter %s does not exists in ' 'server response data' % key) ## pylint: disable=arguments-differ #def decode_argument(self, value, **kwargs): # # pylint: disable=unused-argument # return value.decode(self._server.request['charset']) def handle_any_request(self, path): from test_server import __version__ from bottle import request, LocalResponse method = request.method.lower() sleep = self.get_param('sleep', method) if sleep: time.sleep(sleep) self._server.request['client_ip'] = ( request.environ.get('REMOTE_ADDR') ) self._server.request['args'] = {} self._server.request['args_binary'] = {} for key in request.params.keys(): # pylint: disable=no-member self._server.request['args'][key] = ( request.params.getunicode(key) # pylint: disable=no-member ) #self._server.request['args_binary'][key] = request.params[key] self._server.request['headers'] = request.headers path = request.fullpath if isinstance(path, six.binary_type): path = path.decode('utf-8') self._server.request['path'] = path self._server.request['method'] = method.upper() cookies = {} for key, value in request.cookies.items(): # pylint: disable=no-member cookies[key] = {} cookies[key]['name'] = key cookies[key]['value'] = value self._server.request['cookies'] = cookies self._server.request['data'] = ( request.body.read() # pylint: disable=no-member ) self._server.request['files'] = defaultdict(list) for file_ in request.files.values(): # pylint: disable=no-member self._server.request['files'][file_.name].append({ 'name': file_.name, 'raw_filename': file_.raw_filename, 'content_type': file_.content_type, 'filename': file_.filename, 'content': file_.file.read(), }) callback = self.get_param('callback', method) if callback: res = callback() if not isinstance(res, types.GeneratorType): res = [res] for item in res: assert ( isinstance(item, dict) and 'type' in item and item['type'] in ('response',) ) bottle_res = LocalResponse() if item['type'] == 'response': assert all( x in ('type', 'status', 'headers', 'cookies', 'body') for x in item.keys() ) if 'status' in item: bottle_res.status = item['status'] if 'headers' in item: for key, val in item['headers']: bottle_res.add_header(key, val) if 'cookies' in item: for key, val in item['cookies']: bottle_res.set_cookie(key, val) if 'body' in item: # use list `[foo]`, see comments below bottle_res.body = [item['body']] self._server.request['done'] = True return bottle_res else: response = { 'code': 200, 'headers': [], 'data': b'', } response['code'] = self.get_param('code', method) for key, val in self.get_param('cookies', method): # Set-Cookie: name=newvalue; expires=date; # path=/; domain=.example.org. response['headers'].append( ('Set-Cookie', '%s=%s' % (key, val))) for key, value in self.get_param('headers', method): response['headers'].append((key, value)) response['headers'].append( ('Listen-Port', str(self._server.port))) data = self.get_param('data', method) if isinstance(data, six.string_types): response['data'] = data elif isinstance(data, six.binary_type): response['data'] = data elif isinstance(data, Iterable): try: response['data'] = next(data) except StopIteration: response['code'] = 503 else: raise TestServerError('Data parameter should ' 'be string or iterable ' 'object') header_keys = [x[0].lower() for x in response['headers']] if 'content-type' not in header_keys: response['headers'].append( ('Content-Type', 'text/html; charset=%s' % self._server.response['charset']) ) if 'server' not in header_keys: response['headers'].append( ('Server', 'TestServer/%s' % __version__)) bottle_response = LocalResponse() bottle_response.status = response['code'] # Use list because if use just scalar object # then on python3 there is an strange error # unsupported response type int bottle_response.body = [response['data']] for key, val in response['headers']: bottle_response.add_header(key, val) self._server.request['done'] = True return bottle_response
[docs]class TestServer(object): def __init__(self, port=0, address='127.0.0.1', engine=None): if engine: # be nice to expectations of old API import warnings text = "value of parameter engine is ignored with version 0.0.31" text += " and param might get removed later" warnings.warn(text, DeprecationWarning) self.request = {} self.response = {} self.response_once = {} self.port = port self.address = address self._handler = None self._thread = None self._server = None self._started = Event() self.config = {} self.config.update({ 'port': self.port, }) self.reset()
[docs] def reset(self): self.request.clear() self.request.update({ 'args': {}, 'args_binary': {}, 'headers': {}, 'cookies': None, 'path': None, 'method': None, 'data': None, 'files': {}, 'client_ip': None, 'done': False, 'charset': 'utf-8', }) self.response.clear() self.response.update({ 'code': 200, 'data': '', 'headers': [], 'cookies': [], 'callback': None, 'sleep': None, 'charset': 'utf-8', }) self.response_once.clear()
def _build_web_app(self): """Build bottle web application that is served by HTTP server""" app = WebApplication(self) app.route('<path:re:.*>', method='ANY')(app.handle_any_request) return app
[docs] def server_thread(self, server_created): """Ask HTTP server start processing requests This function is supposed to be run in separate thread. """ # pylint: disable=line-too-long # params: https://github.com/Pylons/waitress/blob/master/waitress/adjustments.py#L79 self._server = StopableWSGIServer( host=self.address, port=self.port, threads=1, expose_tracebacks=False,#True, application=self._build_web_app() ) server_created.set() self._server.run()
[docs] def start(self, daemon=True): """Start the HTTP server.""" server_created = Event() self._thread = Thread( target=self.server_thread, args=[server_created] ) self._thread.daemon = daemon self._thread.start() if not server_created.wait(2): raise Exception('Could not create test server app instance') self._server.wait()
[docs] def stop(self): """Stop tornado loop and wait for thread finished it work.""" self._server.shutdown() self._thread.join()
[docs] def get_url(self, path='', port=None): """Build URL that is served by HTTP server.""" if port is None: port = self.port return urljoin('http://%s:%d/' % (self.address, port), path)
[docs] def wait_request(self, timeout): """Stupid implementation that eats CPU.""" start = time.time() while True: if self.request['done']: break time.sleep(0.01) if time.time() - start > timeout: raise WaitTimeoutError('No request processed in %d seconds' % timeout)