Powered by Tachyonic Project Luxon Framework v1.4.0

Luxon Framework for rapid application development. (luxon)

Version

Source code for luxon.core.handlers.wsgi.response

# -*- coding: utf-8 -*-
# Copyright (c) 2018 Christiaan Frans Rademan.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holders nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
from io import BytesIO
from collections import OrderedDict
from http.cookies import SimpleCookie, CookieError

from luxon import __identity__
from luxon import constants as const
from luxon.utils.encoding import is_ascii
from luxon.utils.timezone import TimezoneGMT, to_gmt, format_http_datetime
from luxon.core.handlers.wsgi.redirects import Redirects
from luxon.structs.cidict import CiDict
from luxon.utils.encoding import if_unicode_to_bytes
from luxon.utils.http import (parse_cache_control_header,
                              ETags)
from luxon.utils import js

GMT_TIMEZONE = TimezoneGMT()


[docs]class Response(Redirects): """Represents an HTTP response to a client request. Args: env (dict): A WSGI environment dict passed in from the server. As per PEP-3333. start_response (function): callback function supplied by the server which takes the HTTP status and headers as arguments. Attributes: status (int): HTTP status code. (e.g. '200') Default 200. etags (obj): The luxon.utils.http.Etags obj associated with the Response. cache_control (str): The value of cache-control in the Response header. content_type (str): Mime format of the content (e.g. application/xml; charset=utf-8). expire (str): The value of expire in the Response header. last_modified (str): The value of last-modified in the Response header. age (int): The value of age in the Response header. """ _DEFAULT_CONTENT_TYPE = const.APPLICATION_JSON _BODILESS_STATUS_CODES = ( 100, 101, 204, 304, ) _STREAM_BLOCK_SIZE = 8 * 1024 # 8 KiB _DEFAULT_ENCODING = 'UTF-8' __slots__ = ( 'content_type', '_content_length', '_stream', '_headers', '_http_response_status_code', '_cookies', '_start_response', '_etags', ) def __init__(self, environ, start_response): self.content_type = None self._content_length = None self._stream = None self._headers = {} self._start_response = start_response # Default Response Status Used internally. self._http_response_status_code = 204 # Some Default Headers.. self._headers['X-Powered-By'] = __identity__ self._cookies = None self._etags = None def __repr__(self): return '<%s: %s>' % (self.__class__.__name__, self.status) def __str__(self): return '<%s: %s>' % (self.__class__.__name__, self.status) @property def content_length(self): """Value of bytes in response. Returns: int: total octects for response. """ if self._content_length: return str(self._content_length) try: return self._stream.getbuffer().nbytes except AttributeError: pass try: return len(self._stream) except TypeError: return None @content_length.setter def content_length(self, value): self._content_length = value
[docs] def body(self, obj): """Set Response Body. Accepts following objects: 'str', and 'bytes', if str will be encoded to bytes. file, iter like objects must return bytes. OrderedDict, dict and list will be translated json and encoded to 'UTF-8' Args: obj (object): Any valid object for response body. """ if self._http_response_status_code == 204: self._http_response_status_code = 200 if isinstance(obj, (str, bytes,)): # If Body is string, bytes. obj = if_unicode_to_bytes(obj) if self.content_type is None: self.content_type = self._DEFAULT_CONTENT_TYPE self._stream = obj elif isinstance(obj, (OrderedDict, dict, list, tuple,)): # If JSON serializeable object. self.content_type = const.APPLICATION_JSON self._stream = if_unicode_to_bytes(js.dumps(obj)) elif hasattr(obj, 'json'): # If JSON serializeable object. self.content_type = const.APPLICATION_JSON self._stream = if_unicode_to_bytes(obj.json) elif hasattr(obj, 'read') or hasattr(obj, '__iter__'): # If body content behaves like file. if self.content_type is None: self.content_type = const.APPLICATION_OCTET_STREAM self._stream = obj else: raise ValueError('resource not returning acceptable object %s' % type(obj))
[docs] def write(self, value): """Write bytes to response body. Args: value (bytes): Data to be written. Returns: int: The number of bytes written. """ value = if_unicode_to_bytes(value) if not isinstance(self._stream, BytesIO): self._stream = BytesIO() length = self._stream.write(value) if self._http_response_status_code == 204: self._http_response_status_code = 200 if self.content_type is None: self.content_type = self._DEFAULT_CONTENT_TYPE return length
@property def status(self): return self._http_response_status_code @status.setter def status(self, value): self._http_response_status_code = int(value)
[docs] def set_header(self, name, value): """Set a header for this response to a given value. Names and values must be convertable to 'str' or be str'. Strings must contain only US-ASCII characters. Args: name (str): Header name (case-insensitive). value (str): Value for the header. """ name = str(name) value = str(value) self._headers[name.title()] = value
@property def etag(self): if self._etags is None: self._etags = ETags(set_callback=self.append_header) return self._etags @etag.setter def etag(self, value): self.delete_header('etag') self._etags = ETags(value, set_callback=self.append_header) @property def cache_control(self): return parse_cache_control_header(self.get_header('cache-control')) @cache_control.setter def cache_control(self, options): self.append_header('cache-control', options) @property def expires(self): expires = self.get_header('expires') if expires is not None: return to_gmt(expires, src=TimezoneGMT()) @expires.setter def expires(self, value): self.set_header('expires', format_http_datetime(value)) @property def last_modified(self): last_modified = self.get_header('last-modified') if last_modified is not None: return to_gmt(last_modified, src=TimezoneGMT()) @last_modified.setter def last_modified(self, value): self.set_header('last-modified', format_http_datetime(value)) @property def age(self): try: return int(self.get_header('age')) except ValueError: raise ValueError('Invalid Integer Value for age') @age.setter def age(self, value): self.set_header('age', str(value))
[docs] def delete_header(self, name): """Delete a header that was previously set for this response. If the header was not previously set, nothing is done (no error is raised). Names and values must be convertable to 'str' or be str'. Strings must contain only US-ASCII characters. Args: name (str): Header name (case-insensitive). """ self._headers.pop(name.title(), None)
[docs] def append_header(self, name, value): """Set or append a header for this response. If the header already exists, the new value will be appended to it, delimited by a comma. Some header specifications support this format, Set-Cookie being the notable exceptions. Names and values must be convertable to 'str' or be str'. Strings must contain only US-ASCII characters. Args: name (str): Header name (case-insensitive). value (str): Value for the header. """ name = str(name) value = str(value) name = name.title() if name in self._headers: value = self._headers[name] + ',' + value self._headers[name] = value
[docs] def set_headers(self, headers): """Set several headers at once. Calling this method overwrites existing values, if any. Names and values must be convertable to 'str' or be str'. Strings must contain only US-ASCII characters. Args: headers (dict or list): A dictionary of header names and values to set, or a 'list' of (*name*, *value*) tuples. Raises: ValueError: `headers` was not a 'dict' or 'list' of 'tuple'. """ if isinstance(headers, (dict, CiDict,)): headers = headers.items() _headers = self._headers for name, value in headers: name = str(name) value = str(value) _headers[name.title()] = value
[docs] def get_header(self, name): """Retrieve the raw string value for the given header. Names and values must be convertable to 'str' or be str'. Strings must contain only US-ASCII characters. Args: name (str): Header name (case-insensitive). Returns: str: The header's value if set, otherwise None. """ return self._headers.get(name.title(), None)
def __call__(self): # Localized for a little more speed. status = self.status headers = self._headers content_type = self.content_type content_length = self.content_length # Set Content-Length Header. if content_length is not None: headers['Content-Length'] = str(content_length) # Set Content-Type Header. if content_type is not None: headers['Content-Type'] = content_type elif (content_type is None and status not in self._BODILESS_STATUS_CODES): headers['Content-Type'] = self._DEFAULT_CONTENT_TYPE headers = list(self._headers.items()) # Load cookies. if self._cookies is not None: headers += [('Set-Cookie', c.OutputString()) for c in self._cookies.values()] self._start_response("%s %s" % (status, const.HTTP_STATUS_CODES[status]), headers) return self def __iter__(self): stream = self._stream status = self.status _STREAM_BLOCK_SIZE = self._STREAM_BLOCK_SIZE if status not in self._BODILESS_STATUS_CODES: try: # Rewind file like object to beginning stream.seek(0) except AttributeError: pass try: while True: chunk = stream.read(_STREAM_BLOCK_SIZE) if not chunk: break yield chunk except AttributeError: # If iterable body... try: for i in range(0, len(stream), _STREAM_BLOCK_SIZE): yield stream[i:i + _STREAM_BLOCK_SIZE] except TypeError: pass yield b'' def close(self): if hasattr(self._stream, 'close'): self._stream.close()
[docs] def read(self): """Reads the entire Response body. """ try: self._stream.seek(0) return self._stream.read() except AttributeError: return self._stream