From 91531d1f432c0988c6a3adfbbc77306e4b58d509 Mon Sep 17 00:00:00 2001 From: Ahmet Alp Balkan Date: Mon, 12 Oct 2020 11:36:56 -0700 Subject: [PATCH] support grpc/h2c with httputil.ReverseProxy Signed-off-by: Ahmet Alp Balkan --- go.mod | 1 - go.sum | 5 +- runsd/main.go | 2 +- runsd/proxy.go | 161 +++++++++++++++++++++++++++---------------------- 4 files changed, 91 insertions(+), 78 deletions(-) diff --git a/go.mod b/go.mod index 241ae1e..1549c9f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module cloud_run_proxy go 1.13 require ( - github.com/elazarl/goproxy v0.0.0-20200809112317-0581fc3aee2d github.com/google/go-cmp v0.4.0 github.com/miekg/dns v1.1.27 golang.org/x/net v0.0.0-20190923162816-aa69164e4478 diff --git a/go.sum b/go.sum index d880a24..3ad0d12 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,9 @@ -github.com/elazarl/goproxy v0.0.0-20200809112317-0581fc3aee2d h1:rtM8HsT3NG37YPjz8sYSbUSdElP9lUsQENYzJDZDUBE= -github.com/elazarl/goproxy v0.0.0-20200809112317-0581fc3aee2d/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM= -github.com/elazarl/goproxy/ext v0.0.0-20190711103511-473e67f1d7d2/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8= github.com/go-logr/logr v0.1.0 h1:M1Tv3VzNlEHg6uyACnRdtrploV2P7wZqH8BoQMtz0cg= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/miekg/dns v1.1.27 h1:aEH/kqUzUxGJ/UHcEKdJY+ugH6WEzsEBBSPa8zuy1aM= github.com/miekg/dns v1.1.27/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= -github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -26,6 +22,7 @@ golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= k8s.io/klog/v2 v2.0.0-20200127113903-12be8a0d907a h1:Mcu95Qw9AYB0+JYxeTNY0aooqKFu8zdFVDr1Kigx5iI= k8s.io/klog/v2 v2.0.0-20200127113903-12be8a0d907a/go.mod h1:q4PVo0BneA7GsUJvFqoEvOCVmYJP0c5Y4VxrAYpJrIk= diff --git a/runsd/main.go b/runsd/main.go index 0da5e61..0475136 100644 --- a/runsd/main.go +++ b/runsd/main.go @@ -200,7 +200,7 @@ func main() { klog.V(1).Infof("skipping http proxy server initialization") } else { proxy := newReverseProxy(projectHash, region, flInternalDomain) - handler := allowh2c(absolutify(proxy.newHandler())) + handler := allowh2c(proxy.newReverseProxyHandler(http.DefaultTransport)) go func() { addr := net.JoinHostPort(net.IPv4(127, 0, 0, 1).String(), flHTTPProxyPort) klog.Fatalf("reverse proxy (ipv4) fail: %v", http.ListenAndServe(addr, handler)) diff --git a/runsd/proxy.go b/runsd/proxy.go index 9e73c33..e5d08f3 100644 --- a/runsd/proxy.go +++ b/runsd/proxy.go @@ -15,13 +15,15 @@ package main import ( + "bytes" + "context" "fmt" "io/ioutil" "net/http" + "net/http/httputil" "strings" "time" - "github.com/elazarl/goproxy" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "k8s.io/klog/v2" @@ -41,70 +43,94 @@ func newReverseProxy(projectHash, currentRegion, internalDomain string) *reverse } } -func (rp *reverseProxy) newHandler() http.Handler { - proxy := goproxy.NewProxyHttpServer() - proxy.OnRequest().DoFunc(func(req *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) { - ctx.UserData = time.Now() - klog.V(5).Infof("[proxy] start: method=%s url=%s headers=%d trailers=%d", req.Method, req.URL, len(req.Header), len(req.Trailer)) - for k, v := range req.Header { - klog.V(5).Infof("[proxy] > hdr=%s v=%#v", k, v) - } - runHost, err := resolveCloudRunHost(rp.internalDomain, req.Host, rp.currentRegion, rp.projectHash) - if err != nil { - // this only fails due to region code not being registered –which would be handled - // by the DNS resolver so the request should not come here with an invalid region. - klog.Warningf("WARN: reverse proxy failed to find a Cloud Run URL for host=%s: %v", req.Host, err) - return nil, goproxy.NewResponse(req, "text/plain", http.StatusInternalServerError, - fmt.Sprintf("runsd doesn't know how to handle host=%q: %v", req.Host, err)) - } - origHost := req.Host - req.URL.Scheme = "https" - req.URL.Host = runHost - req.Host = runHost - req.Header.Set("host", runHost) - klog.V(5).Infof("[proxy] rewrote host=%s to=%s newurl=%q", origHost, runHost, req.URL) - - idToken, err := identityToken("https://" + req.Host) - if err != nil { - klog.V(1).Infof("WARN: failed to get ID token for host=%s: %v", req.Host, err) - resp := new(http.Response) - resp.Body = ioutil.NopCloser(strings.NewReader(fmt.Sprintf("failed to fetch metadata token: %v", err))) - resp.StatusCode = http.StatusInternalServerError - return nil, resp - } - if req.Header.Get("authorization") == "" { - req.Header.Set("authorization", "Bearer "+idToken) - } - ua := req.Header.Get("user-agent") - req.Header.Set("user-agent", fmt.Sprintf("runsd version=%s", version)) - if ua != "" { - req.Header.Set("user-agent", req.Header.Get("user-agent")+"; "+ua) - } - return req, nil - }) - proxy.OnResponse().DoFunc(func(resp *http.Response, ctx *goproxy.ProxyCtx) *http.Response { - start := ctx.UserData - var took time.Duration - if v, ok := start.(time.Time); ok { - took = time.Since(v) - } - var rcode, hdrs, trailers int - if resp != nil { - rcode = resp.StatusCode - hdrs = len(resp.Header) - trailers = len(resp.Trailer) - for k, v := range resp.Header { - klog.V(7).Infof("[proxy] < hdr=%s v=%#v", k, v) - } - for k, v := range resp.Trailer { - klog.V(7).Infof("[proxy] < trailer=%s v=%#v", k, v) +const ( + ctxKeyEarlyResponse = `early-response` +) + +func (rp *reverseProxy) newReverseProxyHandler(tr http.RoundTripper) http.Handler { + tokenInject := authenticatingTransport{next: tr} + transport := loggingTransport{next: tokenInject} + + return &httputil.ReverseProxy{ + Transport: transport, + FlushInterval: -1, // to support grpc streaming responses + Director: func(req *http.Request) { + klog.V(5).Infof("[proxy] start: method=%s url=%s headers=%d trailers=%d", req.Method, req.URL, len(req.Header), len(req.Trailer)) + runHost, err := resolveCloudRunHost(rp.internalDomain, req.Host, rp.currentRegion, rp.projectHash) + if err != nil { + // this only fails due to region code not being registered –which would be handled + // by the DNS resolver so the request should not come here with an invalid region. + klog.Warningf("WARN: reverse proxy failed to find a Cloud Run URL for host=%s: %v", req.Host, err) + resp := &http.Response{ + Request: req, + StatusCode: http.StatusInternalServerError, + Body: ioutil.NopCloser(bytes.NewReader([]byte( + fmt.Sprintf("runsd doesn't know how to handle host=%q: %v", req.Host, err)))), + } + newReq := req.WithContext(context.WithValue(req.Context(), ctxKeyEarlyResponse, resp)) + *req = *newReq + return } + origHost := req.Host + req.URL.Scheme = "https" + req.URL.Host = runHost + req.Host = runHost + req.Header.Set("host", runHost) + klog.V(5).Infof("[proxy] rewrote host=%s to=%s newurl=%q", origHost, runHost, req.URL) + }, + } +} + +type authenticatingTransport struct { + next http.RoundTripper +} + +func (a authenticatingTransport) RoundTrip(req *http.Request) (*http.Response, error) { + if v, ok := req.Context().Value(ctxKeyEarlyResponse).(*http.Response); ok { + return v, nil + } + + idToken, err := identityToken("https://" + req.Host) + if err != nil { + klog.V(1).Infof("WARN: failed to get ID token for host=%s: %v", req.Host, err) + r := new(http.Response) + r.Body = ioutil.NopCloser(strings.NewReader(fmt.Sprintf("failed to fetch metadata token: %v", err))) + r.StatusCode = http.StatusInternalServerError + return r, nil + } + if req.Header.Get("authorization") == "" { + req.Header.Set("authorization", "Bearer "+idToken) + } + ua := req.Header.Get("user-agent") + req.Header.Set("user-agent", fmt.Sprintf("runsd version=%s", version)) + if ua != "" { + req.Header.Set("user-agent", req.Header.Get("user-agent")+"; "+ua) + } + return a.next.RoundTrip(req) +} + +type loggingTransport struct { + next http.RoundTripper +} + +func (l loggingTransport) RoundTrip(req *http.Request) (*http.Response, error) { + start := time.Now() + klog.V(5).Infof("[proxy] start: %s url=%s", req.Method, req.URL) + for k, v := range req.Header { + klog.V(6).Infof("[proxy] > hdr=%s v=%#v", k, v) + } + defer func() { + klog.V(5).Infof("[proxy] end: %s url=%s took=%s", + req.Method, req.URL, time.Since(start).Truncate(time.Millisecond)) + }() + + resp, err := l.next.RoundTrip(req) + if err != nil { + for k, v := range req.Header { + klog.V(6).Infof("[proxy] < hdr=%s v=%#v", k, v) } - klog.V(5).Infof("[proxy] end: method=%s url=%s resp_status=%d headers=%d trailers=%d took=%v", - ctx.Req.Method, ctx.Req.URL, rcode, hdrs, trailers, took) - return resp - }) - return proxy + } + return resp, err } func resolveCloudRunHost(internalDomain, hostname, curRegion, projectHash string) (string, error) { @@ -138,15 +164,6 @@ func mkCloudRunHost(svc, regionCode, projectHash string) string { return fmt.Sprintf("%s-%s-%s.a.run.app", svc, projectHash, regionCode) } -func absolutify(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // make the request URL absolute by adding a schema - // otherwise goproxy complains it "does not respond to non-proxy requests". - r.URL.Scheme = "http" - next.ServeHTTP(w, r) - }) -} - func allowh2c(next http.Handler) http.Handler { h2server := &http2.Server{IdleTimeout: time.Second * 60} return h2c.NewHandler(next, h2server)