diff --git a/api/httpforkjoin.proto b/api/httpforkjoin.proto index 37ca722..827b366 100644 --- a/api/httpforkjoin.proto +++ b/api/httpforkjoin.proto @@ -62,6 +62,7 @@ enum ErrorCode { ConnectionError = 3; ConcurrencyContextError = 4; RequestAborted = 5; + AuthenticationError = 6; } message Error { diff --git a/cmd/httpInsecureserver_test.go b/cmd/httpInsecureserver_test.go index fbcc0e8..9e79a58 100644 --- a/cmd/httpInsecureserver_test.go +++ b/cmd/httpInsecureserver_test.go @@ -106,6 +106,39 @@ func TestMutulAuth(t *testing.T) { } } +func TestInvalidMutulAuth(t *testing.T) { + conn := makeGrpcConn() + defer conn.Close() + c := h.NewHTTPForkJoinServiceClient(conn) + + req := h.Request{Id: "BIN1", Messages: makeAuthErrMutualReq()} + stream, err := c.FanoutFanin(context.Background(), &req) + if err != nil { + t.Errorf("GRPC error call should have not failed with %v", err) + } + + res := []*h.Response{} + + for { + response, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + t.Errorf("%v.FanoutFanin = _, %v", c, err) + } + res = append(res, response) + } + if len(res) != 1 { + t.Error("there should be a 1 responses") + } + + if res[0].Errors[0].Code != h.ErrorCode_AuthenticationError { + t.Errorf("there should be error code: %v", h.ErrorCode_AuthenticationError) + } + +} + func TestInvalidHTTPURLForkJoin(t *testing.T) { conn := makeGrpcConn() defer conn.Close() @@ -253,7 +286,7 @@ func makeInValidURLRequests() []*h.Message { } func makeSlowResponseRequests() []*h.Message { - delayedMesg := &h.Message{Method: h.Message_POST, URL: "http://localhost:8000/healthz"} + delayedMesg := &h.Message{Method: h.Message_POST, URL: "http://localhost:8000/healthz", ActiveDeadLineSeconds: 10} msgs := []*h.Message{} msgs = append(msgs, delayedMesg) return msgs @@ -289,3 +322,25 @@ func makeMutualAuthRequestsCfg() []*h.Message { msgs = append(msgs, msg) return msgs } + +func makeAuthErrMutualReq() []*h.Message { + msg := &h.Message{ + Method: h.Message_GET, + URL: "https://localhost:8000/mutual", + Authentication: h.Message_MUTUAL, + ActiveDeadLineSeconds: 10, Id: "001"} + + clientcrt, _ := ioutil.ReadFile("..//certs//client.crt") + clientkey, _ := ioutil.ReadFile("..//certs//client.key") + ca, _ := ioutil.ReadFile("..//certs//ca.crt") + + mcreds := h.Message_MutualAuthCredentials{ + ClientCertificate: string(clientcrt), + ClientKey: string(clientkey), + CACertificate: string(ca)} + + msg.MutualAuthCredentials = &mcreds + msgs := []*h.Message{} + msgs = append(msgs, msg) + return msgs +} diff --git a/http/httpforkjoin.pb.go b/http/httpforkjoin.pb.go index 39b9796..c3a9e70 100644 --- a/http/httpforkjoin.pb.go +++ b/http/httpforkjoin.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.25.0-devel -// protoc v3.14.0 +// protoc-gen-go v1.28.0 +// protoc v3.19.4 // source: api/httpforkjoin.proto package http @@ -29,6 +29,7 @@ const ( ErrorCode_ConnectionError ErrorCode = 3 ErrorCode_ConcurrencyContextError ErrorCode = 4 ErrorCode_RequestAborted ErrorCode = 5 + ErrorCode_AuthenticationError ErrorCode = 6 ) // Enum value maps for ErrorCode. @@ -40,6 +41,7 @@ var ( 3: "ConnectionError", 4: "ConcurrencyContextError", 5: "RequestAborted", + 6: "AuthenticationError", } ErrorCode_value = map[string]int32{ "InternalError": 0, @@ -48,6 +50,7 @@ var ( "ConnectionError": 3, "ConcurrencyContextError": 4, "RequestAborted": 5, + "AuthenticationError": 6, } ) @@ -688,7 +691,7 @@ var file_api_httpforkjoin_proto_rawDesc = []byte{ 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x43, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, - 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2a, 0x89, 0x01, 0x0a, 0x09, 0x45, 0x72, 0x72, 0x6f, 0x72, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2a, 0xa2, 0x01, 0x0a, 0x09, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x11, 0x0a, 0x0d, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x10, 0x00, 0x12, 0x10, 0x0a, 0x0c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x10, 0x01, 0x12, 0x11, 0x0a, 0x0d, 0x52, 0x65, 0x73, @@ -697,12 +700,14 @@ var file_api_httpforkjoin_proto_rawDesc = []byte{ 0x03, 0x12, 0x1b, 0x0a, 0x17, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x78, 0x74, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x10, 0x04, 0x12, 0x12, 0x0a, 0x0e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x41, 0x62, 0x6f, 0x72, 0x74, 0x65, 0x64, - 0x10, 0x05, 0x32, 0x45, 0x0a, 0x13, 0x48, 0x54, 0x54, 0x50, 0x46, 0x6f, 0x72, 0x6b, 0x4a, 0x6f, - 0x69, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x2e, 0x0a, 0x0b, 0x46, 0x61, 0x6e, - 0x6f, 0x75, 0x74, 0x46, 0x61, 0x6e, 0x69, 0x6e, 0x12, 0x0d, 0x2e, 0x68, 0x74, 0x74, 0x70, 0x2e, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0e, 0x2e, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x3b, 0x68, - 0x74, 0x74, 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x10, 0x05, 0x12, 0x17, 0x0a, 0x13, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6e, 0x74, 0x69, 0x63, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x10, 0x06, 0x32, 0x45, 0x0a, 0x13, 0x48, + 0x54, 0x54, 0x50, 0x46, 0x6f, 0x72, 0x6b, 0x4a, 0x6f, 0x69, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x12, 0x2e, 0x0a, 0x0b, 0x46, 0x61, 0x6e, 0x6f, 0x75, 0x74, 0x46, 0x61, 0x6e, 0x69, + 0x6e, 0x12, 0x0d, 0x2e, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x0e, 0x2e, 0x68, 0x74, 0x74, 0x70, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x3b, 0x68, 0x74, 0x74, 0x70, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/http/httpforkjoin_grpc.pb.go b/http/httpforkjoin_grpc.pb.go index e687612..d0c741a 100644 --- a/http/httpforkjoin_grpc.pb.go +++ b/http/httpforkjoin_grpc.pb.go @@ -1,4 +1,8 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.4 +// source: api/httpforkjoin.proto package http diff --git a/http/httpworker.go b/http/httpworker.go index 1aca52e..4002cb7 100644 --- a/http/httpworker.go +++ b/http/httpworker.go @@ -70,41 +70,49 @@ func httpDispatch(ctx context.Context, reqMsg HTTPRequest, resultStream chan<- f req.Header.Add(k, v) } - var client *http.Client + client, cerr := newClient(reqMsg, req) - client = newClient(reqMsg, client, req, log) - - res, err := client.Do(req) - - if ctx.Err() != nil && res == nil { - log.LogAbortedRequest(RequestID(ctx), reqMsg.Message.ID, - fmt.Sprintf("Aborted. Too longer than active deadline %v", reqMsg.Message.ActiveDeadLine)) - responseStream <- f.Result{ID: RequestID(ctx), X: makeErrorResponse(reqMsg, http.StatusRequestTimeout), - Err: &f.FJError{Code: f.RequestAborted, - Message: fmt.Sprintf("Request aborted took longer than %v seconds %v", - reqMsg.Message.ActiveDeadLine, err)}, + if cerr != nil { + log.LogAuthenticationError(RequestID(ctx), reqMsg.Message.ID, cerr.Message) + responseStream <- f.Result{ID: RequestID(ctx), + X: makeErrorResponse(reqMsg, http.StatusRequestTimeout), + Err: &f.FJError{Code: f.AuthenticationError, Message: cerr.Message}, } - } else if err != nil { - log.LogRequestDispatchError(RequestID(ctx), reqMsg.Message.ID, err.Error()) - responseStream <- f.Result{ID: RequestID(ctx), X: makeErrorResponse(reqMsg, http.StatusBadGateway), - Err: &f.FJError{Code: f.ConnectionError, Message: fmt.Sprintf("Error in dispatching request: %v", err)}} } else { - bb, err := ioutil.ReadAll(res.Body) - if err != nil { - log.LogResponseError(RequestID(ctx), reqMsg.Message.ID, err.Error()) - responseStream <- f.Result{ID: RequestID(ctx), X: makeResponse(reqMsg, res, nil), - Err: &f.FJError{Code: f.ResponseError, Message: fmt.Sprintf("Error reading http response: %v", err)}} - } + res, err := client.Do(req) + if ctx.Err() != nil && res == nil { + log.LogAbortedRequest(RequestID(ctx), reqMsg.Message.ID, + fmt.Sprintf("Aborted. Too longer than active deadline %v", reqMsg.Message.ActiveDeadLine)) + responseStream <- f.Result{ + ID: RequestID(ctx), X: makeErrorResponse(reqMsg, http.StatusRequestTimeout), + Err: &f.FJError{Code: f.RequestAborted, + Message: fmt.Sprintf("Request aborted took longer than %v seconds %v", + reqMsg.Message.ActiveDeadLine, err)}, + } + } else if err != nil { + log.LogRequestDispatchError(RequestID(ctx), reqMsg.Message.ID, err.Error()) + responseStream <- f.Result{ID: RequestID(ctx), + X: makeErrorResponse(reqMsg, http.StatusBadGateway), + Err: &f.FJError{Code: f.ConnectionError, Message: fmt.Sprintf("Error in dispatching request: %v", err)}} + } else { + bb, err := ioutil.ReadAll(res.Body) + if err != nil { + log.LogResponseError(RequestID(ctx), reqMsg.Message.ID, err.Error()) + responseStream <- f.Result{ID: RequestID(ctx), + X: makeResponse(reqMsg, res, nil), + Err: &f.FJError{Code: f.ResponseError, Message: fmt.Sprintf("Error reading http response: %v", err)}} + } - hr := makeResponse(reqMsg, res, bb) + hr := makeResponse(reqMsg, res, bb) - for k, values := range res.Header { - for _, value := range values { - hr.Message.Add(k, value) + for k, values := range res.Header { + for _, value := range values { + hr.Message.Add(k, value) + } } + defer res.Body.Close() + responseStream <- f.Result{ID: RequestID(ctx), X: hr} } - defer res.Body.Close() - responseStream <- f.Result{ID: RequestID(ctx), X: hr} } }() @@ -112,26 +120,35 @@ func httpDispatch(ctx context.Context, reqMsg HTTPRequest, resultStream chan<- f resultStream <- r } -func newClient(reqMsg HTTPRequest, client *http.Client, req *http.Request, log *f.StandardLogger) *http.Client { +func newClient(reqMsg HTTPRequest, req *http.Request) (*http.Client, *f.FJError) { + var client *http.Client if reqMsg.Message.Authentication == NONE { client = &http.Client{} } else if reqMsg.Message.Authentication == BASIC { caCertPool := x509.NewCertPool() - caCertPool.AppendCertsFromPEM([]byte(reqMsg.Message.BasicAtuhCredentials.ServerCertificate)) - client = &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - RootCAs: caCertPool, + ok := caCertPool.AppendCertsFromPEM([]byte(reqMsg.Message.BasicAtuhCredentials.ServerCertificate)) + if !ok { + return nil, &f.FJError{Code: f.AuthenticationError, + Message: "Failed to append server certificate"} + } else { + client = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: caCertPool, + }, }, - }, + } + req.SetBasicAuth(reqMsg.Message.BasicAtuhCredentials.UserName, + reqMsg.Message.BasicAtuhCredentials.Password) + return client, nil } - req.SetBasicAuth(reqMsg.Message.BasicAtuhCredentials.UserName, - reqMsg.Message.BasicAtuhCredentials.Password) + } else if reqMsg.Message.Authentication == MUTUAL { certificate, err := tls.X509KeyPair([]byte(reqMsg.Message.MutualAuthCredentials.ClientCertificate), []byte(reqMsg.Message.MutualAuthCredentials.ClientKey)) if err != nil { - log.Fatalf("could not load client key pair: %s", err) + return nil, &f.FJError{Code: f.AuthenticationError, + Message: "Error parsing client certificate"} } ca := []byte(reqMsg.Message.MutualAuthCredentials.CACertificate) @@ -139,7 +156,8 @@ func newClient(reqMsg HTTPRequest, client *http.Client, req *http.Request, log * caCertPool := x509.NewCertPool() if ok := caCertPool.AppendCertsFromPEM(ca); !ok { - log.Fatalf("failed to append ca certs") + return nil, &f.FJError{Code: f.AuthenticationError, + Message: "Failed to append CA certificate"} } client = &http.Client{ @@ -151,7 +169,7 @@ func newClient(reqMsg HTTPRequest, client *http.Client, req *http.Request, log * }, } } - return client + return client, nil } func makeResponse(reqMsg HTTPRequest, res *http.Response, bb []byte) HTTPResponse { diff --git a/http/service.go b/http/service.go index 6ca078e..10a8401 100644 --- a/http/service.go +++ b/http/service.go @@ -21,7 +21,6 @@ type DispatchServer struct { UnimplementedHTTPForkJoinServiceServer } -// TODO: Validate Input //FanoutFanin Fans out each http message to http dispatch works using the fork join interface func (s *DispatchServer) FanoutFanin(request *Request, stream HTTPForkJoinService_FanoutFaninServer) error { ctx := context.WithValue(context.Background(), CtxRequestID, request.Id) diff --git a/logwrapper.go b/logwrapper.go index d697a2e..2331c07 100644 --- a/logwrapper.go +++ b/logwrapper.go @@ -21,9 +21,11 @@ var ( requestDispatchError = LogEvent{RequestError, "Request Id: %s Message Id: %s Request Dispatch Error: %v"} abortRequest = LogEvent{RequestAborted, "Request Id: %s Message Id: %s Abort Request: %v"} responseError = LogEvent{ResponseError, "Request Id: %s Message Id: %s Response error: %v"} - connectionError = LogEvent{ConnectionError, "Request Id: %s Message Id: %s Connection error: %v"} + connectionError = LogEvent{ConnectionError, "Request Id: %s Message Id: %s Connection Error: %v"} infoRequest = LogEvent{RequestInfo, "Request Id: %s Message Id: %s: %s"} info = LogEvent{Info, "Request Id: %s %s "} + heartBeatMsg = LogEvent{HeartBeatInfo, "Request Id: %s Message Id: %s Heat beat: %v"} + authenticationError = LogEvent{AuthenticationError, "Request Id: %s Message Id: %s Authentication Error: %v"} ) func (l *StandardLogger) LogInvalidRequest(requestID, messageID, message string) { @@ -42,6 +44,14 @@ func (l *StandardLogger) LogAbortedRequest(requestID, messageID, message string) l.Errorf(abortRequest.message, requestID, messageID, message) } +func (l *StandardLogger) LogHeartBeatMsg(requestID, messageID, message string) { + l.Errorf(heartBeatMsg.message, requestID, messageID, message) +} + +func (l *StandardLogger) LogAuthenticationError(requestID, messageID, message string) { + l.Errorf(authenticationError.message, requestID, messageID, message) +} + //TODO: have one info or request and one generic func (l *StandardLogger) LogRequestInfo(requestID, message string) { l.Infof(infoRequest.message, requestID, message) diff --git a/types.go b/types.go index 6e81472..ccf95a0 100644 --- a/types.go +++ b/types.go @@ -36,8 +36,10 @@ const ( ConnectionError ConcurrencyContextError RequestAborted + AuthenticationError RequestInfo Info + HeartBeatInfo ) //composite object to hold data for multiplexed go routines