-
Notifications
You must be signed in to change notification settings - Fork 12
Updated the repo to work with the application load balancer. #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
bd0df2c
8bef83f
01d6c4f
13de56d
51e5d64
9e50652
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
// | ||
// File.swift | ||
// | ||
// | ||
// Created by Ralph Küpper on 1/5/21. | ||
// | ||
|
||
import AWSLambdaEvents | ||
import AWSLambdaRuntimeCore | ||
import ExtrasBase64 | ||
import NIO | ||
import NIOHTTP1 | ||
import Vapor | ||
|
||
// MARK: - Handler - | ||
|
||
struct ALBHandler: EventLoopLambdaHandler { | ||
|
||
typealias In = ALB.TargetGroupRequest | ||
typealias Out = ALB.TargetGroupResponse | ||
|
||
private let application: Application | ||
private let responder: Responder | ||
|
||
init(application: Application, responder: Responder) { | ||
self.application = application | ||
self.responder = responder | ||
} | ||
|
||
public func handle(context: Lambda.Context, event: ALB.TargetGroupRequest) | ||
-> EventLoopFuture<ALB.TargetGroupResponse> | ||
{ | ||
let vaporRequest: Vapor.Request | ||
do { | ||
vaporRequest = try Vapor.Request(req: event, in: context, for: self.application) | ||
} catch { | ||
return context.eventLoop.makeFailedFuture(error) | ||
} | ||
|
||
return self.responder.respond(to: vaporRequest).flatMap { ALB.TargetGroupResponse.from(response: $0, in: context) } | ||
} | ||
} | ||
|
||
// MARK: - Request - | ||
|
||
extension Vapor.Request { | ||
private static let bufferAllocator = ByteBufferAllocator() | ||
|
||
convenience init(req: ALB.TargetGroupRequest, in ctx: Lambda.Context, for application: Application) throws { | ||
var buffer: NIO.ByteBuffer? | ||
switch (req.body, req.isBase64Encoded) { | ||
case (let .some(string), true): | ||
let bytes = try string.base64decoded() | ||
buffer = Vapor.Request.bufferAllocator.buffer(capacity: bytes.count) | ||
buffer!.writeBytes(bytes) | ||
|
||
case (let .some(string), false): | ||
buffer = Vapor.Request.bufferAllocator.buffer(capacity: string.utf8.count) | ||
buffer!.writeString(string) | ||
|
||
case (.none, _): | ||
break | ||
} | ||
|
||
var nioHeaders = NIOHTTP1.HTTPHeaders() | ||
req.headers?.forEach { key, value in | ||
nioHeaders.add(name: key, value: value) | ||
} | ||
|
||
/*if let cookies = req., cookies.count > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets remove this |
||
nioHeaders.add(name: "Cookie", value: cookies.joined(separator: "; ")) | ||
}*/ | ||
|
||
var url: String = req.path | ||
if req.queryStringParameters.count > 0 { | ||
url += "?" | ||
for key in req.queryStringParameters.keys { | ||
// It leaves an ampersand (&) at the end, but who cares? | ||
url += key + "=" + (req.queryStringParameters[key] ?? "") + "&" | ||
} | ||
} | ||
|
||
ctx.logger.debug("The constructed URL is: \(url)") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer this be at |
||
|
||
self.init( | ||
application: application, | ||
method: NIOHTTP1.HTTPMethod(rawValue: req.httpMethod.rawValue), | ||
url: Vapor.URI(path: url), | ||
version: HTTPVersion(major: 1, minor: 1), | ||
headers: nioHeaders, | ||
collectedBody: buffer, | ||
remoteAddress: nil, | ||
logger: ctx.logger, | ||
on: ctx.eventLoop | ||
) | ||
|
||
storage[ALB.TargetGroupRequest] = req | ||
} | ||
} | ||
|
||
extension ALB.TargetGroupRequest: Vapor.StorageKey { | ||
public typealias Value = ALB.TargetGroupRequest | ||
} | ||
|
||
// MARK: - Response - | ||
|
||
extension ALB.TargetGroupResponse { | ||
static func from(response: Vapor.Response, in context: Lambda.Context) -> EventLoopFuture<ALB.TargetGroupResponse> { | ||
// Create the headers | ||
var headers = [String: String]() | ||
response.headers.forEach { name, value in | ||
if let current = headers[name] { | ||
headers[name] = "\(current),\(value)" | ||
} else { | ||
headers[name] = value | ||
} | ||
} | ||
|
||
// Can we access the body right away? | ||
if let string = response.body.string { | ||
return context.eventLoop.makeSucceededFuture(.init( | ||
statusCode: AWSLambdaEvents.HTTPResponseStatus(code: response.status.code), | ||
headers: headers, | ||
body: string, | ||
isBase64Encoded: false | ||
)) | ||
} else if let bytes = response.body.data { | ||
return context.eventLoop.makeSucceededFuture(.init( | ||
statusCode: AWSLambdaEvents.HTTPResponseStatus(code: response.status.code), | ||
headers: headers, | ||
body: String(base64Encoding: bytes), | ||
isBase64Encoded: true | ||
)) | ||
} else { | ||
// See if it is a stream and try to gather the data | ||
return response.body.collect(on: context.eventLoop).map { (buffer) -> ALB.TargetGroupResponse in | ||
// Was there any content | ||
guard | ||
var buffer = buffer, | ||
let bytes = buffer.readBytes(length: buffer.readableBytes) | ||
else { | ||
return ALB.TargetGroupResponse( | ||
statusCode: AWSLambdaEvents.HTTPResponseStatus(code: response.status.code), | ||
headers: headers | ||
) | ||
} | ||
|
||
// Done | ||
return ALB.TargetGroupResponse( | ||
statusCode: AWSLambdaEvents.HTTPResponseStatus(code: response.status.code), | ||
headers: headers, | ||
body: String(base64Encoding: bytes), | ||
isBase64Encoded: true | ||
) | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,8 +66,8 @@ public extension Application.Lambda { | |
} | ||
} | ||
|
||
struct ConfigurationKey: StorageKey { | ||
typealias Value = LambdaServer.Configuration | ||
public struct ConfigurationKey: StorageKey { | ||
public typealias Value = LambdaServer.Configuration | ||
} | ||
} | ||
} | ||
|
@@ -79,13 +79,14 @@ public class LambdaServer: Server { | |
public enum RequestSource { | ||
case apiGateway | ||
case apiGatewayV2 | ||
// case applicationLoadBalancer // not in this release | ||
case applicationLoadBalancer | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🙌 |
||
case sqs | ||
} | ||
|
||
var requestSource: RequestSource | ||
var logger: Logger | ||
|
||
init(apiService: RequestSource = .apiGatewayV2, logger: Logger) { | ||
public init(apiService: RequestSource = .apiGatewayV2, logger: Logger) { | ||
self.requestSource = apiService | ||
self.logger = logger | ||
} | ||
|
@@ -115,6 +116,10 @@ public class LambdaServer: Server { | |
handler = APIGatewayHandler(application: application, responder: responder) | ||
case .apiGatewayV2: | ||
handler = APIGatewayV2Handler(application: application, responder: responder) | ||
case .applicationLoadBalancer: | ||
handler = ALBHandler(application: application, responder: responder) | ||
case .sqs: | ||
handler = SQSHandler(application: application, responder: responder) | ||
} | ||
|
||
self.lambdaLifecycle = Lambda.Lifecycle( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
// | ||
// File.swift | ||
// | ||
// | ||
// Created by Ralph Küpper on 10/23/21. | ||
// | ||
|
||
|
||
import AWSLambdaEvents | ||
import AWSLambdaRuntimeCore | ||
import ExtrasBase64 | ||
import NIO | ||
import NIOHTTP1 | ||
import Vapor | ||
|
||
// MARK: - Handler - | ||
|
||
struct SQSHandler: EventLoopLambdaHandler { | ||
|
||
typealias In = SQS.Event | ||
typealias Out = SQSResponse | ||
|
||
private let application: Application | ||
private let responder: Responder | ||
|
||
init(application: Application, responder: Responder) { | ||
self.application = application | ||
self.responder = responder | ||
} | ||
|
||
public func handle(context: Lambda.Context, event: SQS.Event) | ||
-> EventLoopFuture<SQSResponse> | ||
{ | ||
let vaporRequest: Vapor.Request | ||
do { | ||
vaporRequest = try Vapor.Request(req: event, in: context, for: self.application) | ||
} catch { | ||
return context.eventLoop.makeFailedFuture(error) | ||
} | ||
|
||
return self.responder.respond(to: vaporRequest).flatMap { SQSResponse.from(response: $0, in: context) } | ||
} | ||
} | ||
|
||
// MARK: - Request - | ||
|
||
extension Vapor.Request { | ||
private static let bufferAllocator = ByteBufferAllocator() | ||
|
||
convenience init(req: SQS.Event, in ctx: Lambda.Context, for application: Application) throws { | ||
let event = req.records.first! | ||
print("incoming events: ", req.records.count) | ||
/*var buffer: NIO.ByteBuffer? | ||
switch (req.body, req.isBase64Encoded) { | ||
case (let .some(string), true): | ||
let bytes = try string.base64decoded() | ||
buffer = Vapor.Request.bufferAllocator.buffer(capacity: bytes.count) | ||
buffer!.writeBytes(bytes) | ||
|
||
case (let .some(string), false): | ||
buffer = Vapor.Request.bufferAllocator.buffer(capacity: string.utf8.count) | ||
buffer!.writeString(string) | ||
|
||
case (.none, _): | ||
break | ||
} | ||
|
||
var nioHeaders = NIOHTTP1.HTTPHeaders() | ||
req.headers?.forEach { key, value in | ||
nioHeaders.add(name: key, value: value) | ||
} | ||
|
||
/*if let cookies = req., cookies.count > 0 { | ||
nioHeaders.add(name: "Cookie", value: cookies.joined(separator: "; ")) | ||
}*/ | ||
|
||
var url: String = req.path | ||
if req.queryStringParameters.count > 0 { | ||
url += "?" | ||
for key in req.queryStringParameters.keys { | ||
// It leaves an ampersand (&) at the end, but who cares? | ||
url += key + "=" + (req.queryStringParameters[key] ?? "") + "&" | ||
} | ||
}*/ | ||
var buffer: NIO.ByteBuffer? | ||
buffer = Vapor.Request.bufferAllocator.buffer(capacity: event.body.utf8.count) | ||
buffer!.writeString(event.body) | ||
|
||
let url = "/sqs" | ||
|
||
ctx.logger.debug("The constructed URL is: \(url)") | ||
|
||
self.init( | ||
application: application, | ||
method: NIOHTTP1.HTTPMethod.POST, | ||
url: Vapor.URI(path: url), | ||
version: HTTPVersion(major: 1, minor: 1), | ||
headers: [:], | ||
collectedBody: buffer, | ||
remoteAddress: nil, | ||
logger: ctx.logger, | ||
on: ctx.eventLoop | ||
) | ||
|
||
storage[SQS.Event] = req | ||
} | ||
} | ||
|
||
extension SQS.Event: Vapor.StorageKey { | ||
public typealias Value = SQS.Event | ||
} | ||
|
||
// MARK: - Response - | ||
|
||
struct SQSResponse: Codable { | ||
public var statusCode: HTTPResponseStatus | ||
public var statusDescription: String? | ||
public var headers: HTTPHeaders? | ||
public var multiValueHeaders: HTTPMultiValueHeaders? | ||
public var body: String | ||
public var isBase64Encoded: Bool | ||
|
||
public init( | ||
statusCode: HTTPResponseStatus, | ||
statusDescription: String? = nil, | ||
headers: HTTPHeaders? = nil, | ||
multiValueHeaders: HTTPMultiValueHeaders? = nil, | ||
body: String = "", | ||
isBase64Encoded: Bool = false | ||
) { | ||
self.statusCode = statusCode | ||
self.statusDescription = statusDescription | ||
self.headers = headers | ||
self.multiValueHeaders = multiValueHeaders | ||
self.body = body | ||
self.isBase64Encoded = isBase64Encoded | ||
} | ||
|
||
static func from(response: Vapor.Response, in context: Lambda.Context) -> EventLoopFuture<SQSResponse> { | ||
// Create the headers | ||
var headers: HTTPHeaders = [:] | ||
|
||
// Can we access the body right away? | ||
let string = response.body.string ?? "" | ||
return context.eventLoop.makeSucceededFuture(.init( | ||
statusCode: HTTPResponseStatus.ok, | ||
headers: headers, | ||
body: string, | ||
isBase64Encoded: false | ||
)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be changed back