Skip to content

Commit

Permalink
support grpc/h2c with httputil.ReverseProxy
Browse files Browse the repository at this point in the history
Signed-off-by: Ahmet Alp Balkan <ahmetb@google.com>
  • Loading branch information
ahmetb committed Oct 12, 2020
1 parent af7f022 commit 91531d1
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 78 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module cloud_run_proxy
go 1.13

require (
github.com/elazarl/goproxy v0.0.0-20200809112317-0581fc3aee2d
github.com/google/go-cmp v0.4.0
github.com/miekg/dns v1.1.27
golang.org/x/net v0.0.0-20190923162816-aa69164e4478
Expand Down
5 changes: 1 addition & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
github.com/elazarl/goproxy v0.0.0-20200809112317-0581fc3aee2d h1:rtM8HsT3NG37YPjz8sYSbUSdElP9lUsQENYzJDZDUBE=
github.com/elazarl/goproxy v0.0.0-20200809112317-0581fc3aee2d/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8=
github.com/go-logr/logr v0.1.0 h1:M1Tv3VzNlEHg6uyACnRdtrploV2P7wZqH8BoQMtz0cg=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/miekg/dns v1.1.27 h1:aEH/kqUzUxGJ/UHcEKdJY+ugH6WEzsEBBSPa8zuy1aM=
github.com/miekg/dns v1.1.27/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand All @@ -26,6 +22,7 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
k8s.io/klog/v2 v2.0.0-20200127113903-12be8a0d907a h1:Mcu95Qw9AYB0+JYxeTNY0aooqKFu8zdFVDr1Kigx5iI=
k8s.io/klog/v2 v2.0.0-20200127113903-12be8a0d907a/go.mod h1:q4PVo0BneA7GsUJvFqoEvOCVmYJP0c5Y4VxrAYpJrIk=
2 changes: 1 addition & 1 deletion runsd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func main() {
klog.V(1).Infof("skipping http proxy server initialization")
} else {
proxy := newReverseProxy(projectHash, region, flInternalDomain)
handler := allowh2c(absolutify(proxy.newHandler()))
handler := allowh2c(proxy.newReverseProxyHandler(http.DefaultTransport))
go func() {
addr := net.JoinHostPort(net.IPv4(127, 0, 0, 1).String(), flHTTPProxyPort)
klog.Fatalf("reverse proxy (ipv4) fail: %v", http.ListenAndServe(addr, handler))
Expand Down
161 changes: 89 additions & 72 deletions runsd/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package main

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
"strings"
"time"

"github.com/elazarl/goproxy"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"k8s.io/klog/v2"
Expand All @@ -41,70 +43,94 @@ func newReverseProxy(projectHash, currentRegion, internalDomain string) *reverse
}
}

func (rp *reverseProxy) newHandler() http.Handler {
proxy := goproxy.NewProxyHttpServer()
proxy.OnRequest().DoFunc(func(req *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) {
ctx.UserData = time.Now()
klog.V(5).Infof("[proxy] start: method=%s url=%s headers=%d trailers=%d", req.Method, req.URL, len(req.Header), len(req.Trailer))
for k, v := range req.Header {
klog.V(5).Infof("[proxy] > hdr=%s v=%#v", k, v)
}
runHost, err := resolveCloudRunHost(rp.internalDomain, req.Host, rp.currentRegion, rp.projectHash)
if err != nil {
// this only fails due to region code not being registered –which would be handled
// by the DNS resolver so the request should not come here with an invalid region.
klog.Warningf("WARN: reverse proxy failed to find a Cloud Run URL for host=%s: %v", req.Host, err)
return nil, goproxy.NewResponse(req, "text/plain", http.StatusInternalServerError,
fmt.Sprintf("runsd doesn't know how to handle host=%q: %v", req.Host, err))
}
origHost := req.Host
req.URL.Scheme = "https"
req.URL.Host = runHost
req.Host = runHost
req.Header.Set("host", runHost)
klog.V(5).Infof("[proxy] rewrote host=%s to=%s newurl=%q", origHost, runHost, req.URL)

idToken, err := identityToken("https://" + req.Host)
if err != nil {
klog.V(1).Infof("WARN: failed to get ID token for host=%s: %v", req.Host, err)
resp := new(http.Response)
resp.Body = ioutil.NopCloser(strings.NewReader(fmt.Sprintf("failed to fetch metadata token: %v", err)))
resp.StatusCode = http.StatusInternalServerError
return nil, resp
}
if req.Header.Get("authorization") == "" {
req.Header.Set("authorization", "Bearer "+idToken)
}
ua := req.Header.Get("user-agent")
req.Header.Set("user-agent", fmt.Sprintf("runsd version=%s", version))
if ua != "" {
req.Header.Set("user-agent", req.Header.Get("user-agent")+"; "+ua)
}
return req, nil
})
proxy.OnResponse().DoFunc(func(resp *http.Response, ctx *goproxy.ProxyCtx) *http.Response {
start := ctx.UserData
var took time.Duration
if v, ok := start.(time.Time); ok {
took = time.Since(v)
}
var rcode, hdrs, trailers int
if resp != nil {
rcode = resp.StatusCode
hdrs = len(resp.Header)
trailers = len(resp.Trailer)
for k, v := range resp.Header {
klog.V(7).Infof("[proxy] < hdr=%s v=%#v", k, v)
}
for k, v := range resp.Trailer {
klog.V(7).Infof("[proxy] < trailer=%s v=%#v", k, v)
const (
ctxKeyEarlyResponse = `early-response`
)

func (rp *reverseProxy) newReverseProxyHandler(tr http.RoundTripper) http.Handler {
tokenInject := authenticatingTransport{next: tr}
transport := loggingTransport{next: tokenInject}

return &httputil.ReverseProxy{
Transport: transport,
FlushInterval: -1, // to support grpc streaming responses
Director: func(req *http.Request) {
klog.V(5).Infof("[proxy] start: method=%s url=%s headers=%d trailers=%d", req.Method, req.URL, len(req.Header), len(req.Trailer))
runHost, err := resolveCloudRunHost(rp.internalDomain, req.Host, rp.currentRegion, rp.projectHash)
if err != nil {
// this only fails due to region code not being registered –which would be handled
// by the DNS resolver so the request should not come here with an invalid region.
klog.Warningf("WARN: reverse proxy failed to find a Cloud Run URL for host=%s: %v", req.Host, err)
resp := &http.Response{
Request: req,
StatusCode: http.StatusInternalServerError,
Body: ioutil.NopCloser(bytes.NewReader([]byte(
fmt.Sprintf("runsd doesn't know how to handle host=%q: %v", req.Host, err)))),
}
newReq := req.WithContext(context.WithValue(req.Context(), ctxKeyEarlyResponse, resp))
*req = *newReq
return
}
origHost := req.Host
req.URL.Scheme = "https"
req.URL.Host = runHost
req.Host = runHost
req.Header.Set("host", runHost)
klog.V(5).Infof("[proxy] rewrote host=%s to=%s newurl=%q", origHost, runHost, req.URL)
},
}
}

type authenticatingTransport struct {
next http.RoundTripper
}

func (a authenticatingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
if v, ok := req.Context().Value(ctxKeyEarlyResponse).(*http.Response); ok {
return v, nil
}

idToken, err := identityToken("https://" + req.Host)
if err != nil {
klog.V(1).Infof("WARN: failed to get ID token for host=%s: %v", req.Host, err)
r := new(http.Response)
r.Body = ioutil.NopCloser(strings.NewReader(fmt.Sprintf("failed to fetch metadata token: %v", err)))
r.StatusCode = http.StatusInternalServerError
return r, nil
}
if req.Header.Get("authorization") == "" {
req.Header.Set("authorization", "Bearer "+idToken)
}
ua := req.Header.Get("user-agent")
req.Header.Set("user-agent", fmt.Sprintf("runsd version=%s", version))
if ua != "" {
req.Header.Set("user-agent", req.Header.Get("user-agent")+"; "+ua)
}
return a.next.RoundTrip(req)
}

type loggingTransport struct {
next http.RoundTripper
}

func (l loggingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
start := time.Now()
klog.V(5).Infof("[proxy] start: %s url=%s", req.Method, req.URL)
for k, v := range req.Header {
klog.V(6).Infof("[proxy] > hdr=%s v=%#v", k, v)
}
defer func() {
klog.V(5).Infof("[proxy] end: %s url=%s took=%s",
req.Method, req.URL, time.Since(start).Truncate(time.Millisecond))
}()

resp, err := l.next.RoundTrip(req)
if err != nil {
for k, v := range req.Header {
klog.V(6).Infof("[proxy] < hdr=%s v=%#v", k, v)
}
klog.V(5).Infof("[proxy] end: method=%s url=%s resp_status=%d headers=%d trailers=%d took=%v",
ctx.Req.Method, ctx.Req.URL, rcode, hdrs, trailers, took)
return resp
})
return proxy
}
return resp, err
}

func resolveCloudRunHost(internalDomain, hostname, curRegion, projectHash string) (string, error) {
Expand Down Expand Up @@ -138,15 +164,6 @@ func mkCloudRunHost(svc, regionCode, projectHash string) string {
return fmt.Sprintf("%s-%s-%s.a.run.app", svc, projectHash, regionCode)
}

func absolutify(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// make the request URL absolute by adding a schema
// otherwise goproxy complains it "does not respond to non-proxy requests".
r.URL.Scheme = "http"
next.ServeHTTP(w, r)
})
}

func allowh2c(next http.Handler) http.Handler {
h2server := &http2.Server{IdleTimeout: time.Second * 60}
return h2c.NewHandler(next, h2server)
Expand Down

0 comments on commit 91531d1

Please sign in to comment.