import app # Replace with your actual application import severless_wsgi
# If you need to send additional content types as text, add them directly
# to the whitelist:
#
# serverless_wsgi.TEXT_MIME_TYPES.append("application/custom+json")
try: from urllib import urlencode except ImportError: from urllib.parse import urlencode
from flask import Flask
try: from cStringIO import StringIO except ImportError: try: from StringIO import StringIO except ImportError: from io import StringIO
from werkzeug.wrappers import BaseRequest
__version__ = '0.0.4'
def make_environ(event):
    """Collect the content headers of *event* into a WSGI-style environ dict.

    Each header name is normalised by replacing ``-`` with ``_`` and
    upper-casing it. Only ``CONTENT_TYPE`` and ``CONTENT_LENGTH`` are copied;
    every other header is ignored by this function.

    Args:
        event: Mapping with a ``'headers'`` entry that maps header names to
            values. A ``None`` value for ``'headers'`` is treated as empty.

    Returns:
        dict: The collected entries (empty when no content header is present).

    Raises:
        KeyError: If *event* has no ``'headers'`` key.
    """
    environ = {}
    # ``or {}`` keeps a null ``headers`` value from crashing on ``.items()``.
    for hdr_name, hdr_value in (event['headers'] or {}).items():
        hdr_name = hdr_name.replace('-', '_').upper()
        if hdr_name in ('CONTENT_TYPE', 'CONTENT_LENGTH'):
            environ[hdr_name] = hdr_value
    return environ
def all_casings(input_string):
    """Yield every upper/lower-case permutation of *input_string*.

    Characters whose lower- and upper-case forms are identical (digits,
    ``-``, ...) are kept as they are, so only cased characters multiply the
    number of results. The empty string yields a single ``""``.

    The order is deterministic: for each casing of the remaining characters,
    the lower-case form of the first character is yielded before the
    upper-case form, e.g. ``"ab"`` -> ``ab, Ab, aB, AB``.

    Args:
        input_string: The text to permute.

    Yields:
        str: One casing variation of *input_string* per iteration.
    """
    if not input_string:
        yield ""
        return

    head, tail = input_string[:1], input_string[1:]
    lower, upper = head.lower(), head.upper()
    # A caseless character contributes exactly one variant: itself.
    variants = (head,) if lower == upper else (lower, upper)

    # Recurses once per character, so depth equals len(input_string).
    for sub_casing in all_casings(tail):
        for variant in variants:
            yield variant + sub_casing
def split_headers(headers):
    """Flatten multi-valued headers into a dict with one value per key.

    If there are multiple occurrences of a header, create case-mutated
    variations of its name in order to pass them through APIGW. This is a
    hack that's currently needed. See:
    https://github.com/logandk/serverless-wsgi/issues/11

    Source:
    https://github.com/Miserlou/Zappa/blob/master/zappa/middleware.py

    Args:
        headers: Object exposing ``keys()`` and ``get_all(key)`` (the latter
            returning the list of values stored under ``key``).

    Returns:
        dict: Header name (or a casing variation of it) mapped to a single
        value.
    """
    new_headers = {}

    for key in headers.keys():
        values = headers.get_all(key)
        if len(values) > 1:
            # Pair every value with a distinct casing of the header name.
            # ``zip`` stops at the shorter input, so values beyond the number
            # of available casings are silently dropped.
            for value, casing in zip(values, all_casings(key)):
                new_headers[casing] = value
        elif len(values) == 1:
            new_headers[key] = values[0]

    return new_headers
def group_headers(headers):
    """Map every header name to the list of all values stored under it.

    Args:
        headers: Object exposing ``keys()`` and ``get_all(key)`` (the latter
            returning the list of values stored under ``key``).

    Returns:
        dict: Header name mapped to its list of values.
    """
    # ``keys()`` is called explicitly rather than iterating ``headers``
    # directly, since only the ``keys()``/``get_all()`` interface is relied on.
    # A name that ``keys()`` reports more than once simply rewrites the same
    # entry with the same list.
    return {key: headers.get_all(key) for key in headers.keys()}
def encode_query_string(event):
    """Return the URL-encoded query string described by *event*.

    Lookup order:

    1. ``multiValueQueryStringParameters`` -- a mapping of parameter name to
       a list of values; every (name, value) pair is encoded.
    2. ``queryString`` -- single-valued parameters (kept for backward
       compatibility with existing callers).
    3. ``queryStringParameters`` -- single-valued parameters as named in the
       API Gateway Lambda proxy event format.

    When none of these holds a non-empty value, an empty mapping is encoded.

    Args:
        event: The invocation event (a dict-like object supporting ``get``).

    Returns:
        Whatever ``url_encode`` returns for the selected parameters.
    """
    # NOTE(review): ``url_encode`` and ``MultiDict`` are not imported in the
    # visible part of this file -- presumably werkzeug helpers; confirm.
    multi = event.get(u"multiValueQueryStringParameters")
    if multi:
        return url_encode(MultiDict((i, j) for i in multi for j in multi[i]))

    params = (
        event.get(u"queryString")
        or event.get(u"queryStringParameters")
        or {}
    )
    return url_encode(params)
# NOTE(review): the statements below read names that are not bound in the
# visible part of this file (path_info, script_name, event, headers, context,
# string_types, to_bytes, url_unquote, wsgi_encoding_dance, base64, sys,
# BytesIO). They presumably belong to the body of a request handler whose
# ``def`` line is out of view -- confirm placement and indentation on merge.

# Drop the script-name prefix from the request path; an empty remainder
# becomes the root path.
if path_info.startswith(script_name):
    path_info = path_info[len(script_name):] or "/"

# A missing, null or empty body is treated as an empty string.
body = event.get(u"body") or ""

# Decode a base64 payload first, then make sure the body is bytes so that
# its length and the input stream below are consistent.
if event.get("isBase64Encoded", False):
    body = base64.b64decode(body)
if isinstance(body, string_types):
    body = to_bytes(body, charset="utf-8")

environ = {
    "CONTENT_LENGTH": str(len(body)),
    "CONTENT_TYPE": headers.get(u"Content-Type", ""),
    "PATH_INFO": url_unquote(path_info),
    "QUERY_STRING": encode_query_string(event),
    "REMOTE_ADDR": event["requestContext"]
    .get(u"identity", {})
    .get(u"sourceIp", ""),
    "REMOTE_USER": event["requestContext"]
    .get(u"authorizer", {})
    .get(u"principalId", ""),
    "REQUEST_METHOD": event["httpMethod"],
    "SCRIPT_NAME": script_name,
    "SERVER_NAME": headers.get(u"Host", "lambda"),
    "SERVER_PORT": headers.get(u"X-Forwarded-Port", "80"),
    "SERVER_PROTOCOL": "HTTP/1.1",
    "wsgi.errors": sys.stderr,
    "wsgi.input": BytesIO(body),
    "wsgi.multiprocess": False,
    "wsgi.multithread": False,
    "wsgi.run_once": False,
    "wsgi.url_scheme": headers.get(u"X-Forwarded-Proto", "http"),
    "wsgi.version": (1, 0),
    "serverless.authorizer": event["requestContext"].get(u"authorizer"),
    "serverless.event": event,
    "serverless.context": context,
    # TODO: Deprecate the following entries, as they do not comply with the WSGI
    # spec. For custom variables, the spec says:
    #
    #   Finally, the environ dictionary may also contain server-defined variables.
    #   These variables should be named using only lower-case letters, numbers, dots,
    #   and underscores, and should be prefixed with a name that is unique to the
    #   defining server or gateway.
    "API_GATEWAY_AUTHORIZER": event["requestContext"].get(u"authorizer"),
    "event": event,
    "context": context,
}

# Re-encode every string value; only values are replaced, so the key set is
# unchanged while iterating.
for key, value in environ.items():
    if isinstance(value, string_types):
        environ[key] = wsgi_encoding_dance(value)

# Expose the request headers as HTTP_* variables. Content-Type and
# Content-Length are skipped because they already have dedicated entries.
for key, value in headers.items():
    key = "HTTP_" + key.upper().replace("-", "_")
    if key not in ("HTTP_CONTENT_TYPE", "HTTP_CONTENT_LENGTH"):
        environ[key] = value
# NOTE(review): ``event``, ``response``, ``returndict``, ``HTTP_STATUS_CODES``,
# ``TEXT_MIME_TYPES`` and ``base64`` are not bound in the visible part of this
# file; these statements presumably belong to the body of a request handler
# whose ``def`` line is out of view -- confirm placement and indentation on
# merge.

if event.get("requestContext").get("elb"):
    # If the request comes from ALB we need to add a status description
    returndict["statusDescription"] = u"%d %s" % (
        response.status_code,
        HTTP_STATUS_CODES[response.status_code],
    )

if response.data:
    mimetype = response.mimetype or "text/plain"
    is_text = mimetype.startswith("text/") or mimetype in TEXT_MIME_TYPES
    # Only textual payloads without a Content-Encoding are passed through as
    # text; everything else is base64-encoded.
    if is_text and not response.headers.get("Content-Encoding", ""):
        returndict["body"] = response.get_data(as_text=True)
        returndict["isBase64Encoded"] = False
    else:
        returndict["body"] = base64.b64encode(response.data).decode("utf-8")
        returndict["isBase64Encoded"] = True