Skip to content

Commit

Permalink
https auth cert error sreturned to client
Browse files Browse the repository at this point in the history
  • Loading branch information
kubesure committed May 23, 2022
1 parent cb61dcb commit 55feaf6
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 52 deletions.
1 change: 1 addition & 0 deletions api/httpforkjoin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ enum ErrorCode {
ConnectionError = 3;
ConcurrencyContextError = 4;
RequestAborted = 5;
AuthenticationError = 6;
}

message Error {
Expand Down
57 changes: 56 additions & 1 deletion cmd/httpInsecureserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,39 @@ func TestMutulAuth(t *testing.T) {
}
}

func TestInvalidMutulAuth(t *testing.T) {
conn := makeGrpcConn()
defer conn.Close()
c := h.NewHTTPForkJoinServiceClient(conn)

req := h.Request{Id: "BIN1", Messages: makeAuthErrMutualReq()}
stream, err := c.FanoutFanin(context.Background(), &req)
if err != nil {
t.Errorf("GRPC error call should have not failed with %v", err)
}

res := []*h.Response{}

for {
response, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
t.Errorf("%v.FanoutFanin = _, %v", c, err)
}
res = append(res, response)
}
if len(res) != 1 {
t.Error("there should be a 1 responses")
}

if res[0].Errors[0].Code != h.ErrorCode_AuthenticationError {
t.Errorf("there should be error code: %v", h.ErrorCode_AuthenticationError)
}

}

func TestInvalidHTTPURLForkJoin(t *testing.T) {
conn := makeGrpcConn()
defer conn.Close()
Expand Down Expand Up @@ -253,7 +286,7 @@ func makeInValidURLRequests() []*h.Message {
}

func makeSlowResponseRequests() []*h.Message {
delayedMesg := &h.Message{Method: h.Message_POST, URL: "http://localhost:8000/healthz"}
delayedMesg := &h.Message{Method: h.Message_POST, URL: "http://localhost:8000/healthz", ActiveDeadLineSeconds: 10}
msgs := []*h.Message{}
msgs = append(msgs, delayedMesg)
return msgs
Expand Down Expand Up @@ -289,3 +322,25 @@ func makeMutualAuthRequestsCfg() []*h.Message {
msgs = append(msgs, msg)
return msgs
}

func makeAuthErrMutualReq() []*h.Message {
msg := &h.Message{
Method: h.Message_GET,
URL: "https://localhost:8000/mutual",
Authentication: h.Message_MUTUAL,
ActiveDeadLineSeconds: 10, Id: "001"}

clientcrt, _ := ioutil.ReadFile("..//certs//client.crt")
clientkey, _ := ioutil.ReadFile("..//certs//client.key")
ca, _ := ioutil.ReadFile("..//certs//ca.crt")

mcreds := h.Message_MutualAuthCredentials{
ClientCertificate: string(clientcrt),
ClientKey: string(clientkey),
CACertificate: string(ca)}

msg.MutualAuthCredentials = &mcreds
msgs := []*h.Message{}
msgs = append(msgs, msg)
return msgs
}
23 changes: 14 additions & 9 deletions http/httpforkjoin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions http/httpforkjoin_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

98 changes: 58 additions & 40 deletions http/httpworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,76 +70,94 @@ func httpDispatch(ctx context.Context, reqMsg HTTPRequest, resultStream chan<- f
req.Header.Add(k, v)
}

var client *http.Client
client, cerr := newClient(reqMsg, req)

client = newClient(reqMsg, client, req, log)

res, err := client.Do(req)

if ctx.Err() != nil && res == nil {
log.LogAbortedRequest(RequestID(ctx), reqMsg.Message.ID,
fmt.Sprintf("Aborted. Too longer than active deadline %v", reqMsg.Message.ActiveDeadLine))
responseStream <- f.Result{ID: RequestID(ctx), X: makeErrorResponse(reqMsg, http.StatusRequestTimeout),
Err: &f.FJError{Code: f.RequestAborted,
Message: fmt.Sprintf("Request aborted took longer than %v seconds %v",
reqMsg.Message.ActiveDeadLine, err)},
if cerr != nil {
log.LogAuthenticationError(RequestID(ctx), reqMsg.Message.ID, cerr.Message)
responseStream <- f.Result{ID: RequestID(ctx),
X: makeErrorResponse(reqMsg, http.StatusRequestTimeout),
Err: &f.FJError{Code: f.AuthenticationError, Message: cerr.Message},
}
} else if err != nil {
log.LogRequestDispatchError(RequestID(ctx), reqMsg.Message.ID, err.Error())
responseStream <- f.Result{ID: RequestID(ctx), X: makeErrorResponse(reqMsg, http.StatusBadGateway),
Err: &f.FJError{Code: f.ConnectionError, Message: fmt.Sprintf("Error in dispatching request: %v", err)}}
} else {
bb, err := ioutil.ReadAll(res.Body)
if err != nil {
log.LogResponseError(RequestID(ctx), reqMsg.Message.ID, err.Error())
responseStream <- f.Result{ID: RequestID(ctx), X: makeResponse(reqMsg, res, nil),
Err: &f.FJError{Code: f.ResponseError, Message: fmt.Sprintf("Error reading http response: %v", err)}}
}
res, err := client.Do(req)
if ctx.Err() != nil && res == nil {
log.LogAbortedRequest(RequestID(ctx), reqMsg.Message.ID,
fmt.Sprintf("Aborted. Too longer than active deadline %v", reqMsg.Message.ActiveDeadLine))
responseStream <- f.Result{
ID: RequestID(ctx), X: makeErrorResponse(reqMsg, http.StatusRequestTimeout),
Err: &f.FJError{Code: f.RequestAborted,
Message: fmt.Sprintf("Request aborted took longer than %v seconds %v",
reqMsg.Message.ActiveDeadLine, err)},
}
} else if err != nil {
log.LogRequestDispatchError(RequestID(ctx), reqMsg.Message.ID, err.Error())
responseStream <- f.Result{ID: RequestID(ctx),
X: makeErrorResponse(reqMsg, http.StatusBadGateway),
Err: &f.FJError{Code: f.ConnectionError, Message: fmt.Sprintf("Error in dispatching request: %v", err)}}
} else {
bb, err := ioutil.ReadAll(res.Body)
if err != nil {
log.LogResponseError(RequestID(ctx), reqMsg.Message.ID, err.Error())
responseStream <- f.Result{ID: RequestID(ctx),
X: makeResponse(reqMsg, res, nil),
Err: &f.FJError{Code: f.ResponseError, Message: fmt.Sprintf("Error reading http response: %v", err)}}
}

hr := makeResponse(reqMsg, res, bb)
hr := makeResponse(reqMsg, res, bb)

for k, values := range res.Header {
for _, value := range values {
hr.Message.Add(k, value)
for k, values := range res.Header {
for _, value := range values {
hr.Message.Add(k, value)
}
}
defer res.Body.Close()
responseStream <- f.Result{ID: RequestID(ctx), X: hr}
}
defer res.Body.Close()
responseStream <- f.Result{ID: RequestID(ctx), X: hr}
}
}()

r := <-responseStream
resultStream <- r
}

func newClient(reqMsg HTTPRequest, client *http.Client, req *http.Request, log *f.StandardLogger) *http.Client {
func newClient(reqMsg HTTPRequest, req *http.Request) (*http.Client, *f.FJError) {
var client *http.Client
if reqMsg.Message.Authentication == NONE {
client = &http.Client{}
} else if reqMsg.Message.Authentication == BASIC {
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(reqMsg.Message.BasicAtuhCredentials.ServerCertificate))
client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
ok := caCertPool.AppendCertsFromPEM([]byte(reqMsg.Message.BasicAtuhCredentials.ServerCertificate))
if !ok {
return nil, &f.FJError{Code: f.AuthenticationError,
Message: "Failed to append server certificate"}
} else {
client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
},
},
},
}
req.SetBasicAuth(reqMsg.Message.BasicAtuhCredentials.UserName,
reqMsg.Message.BasicAtuhCredentials.Password)
return client, nil
}
req.SetBasicAuth(reqMsg.Message.BasicAtuhCredentials.UserName,
reqMsg.Message.BasicAtuhCredentials.Password)

} else if reqMsg.Message.Authentication == MUTUAL {
certificate, err := tls.X509KeyPair([]byte(reqMsg.Message.MutualAuthCredentials.ClientCertificate),
[]byte(reqMsg.Message.MutualAuthCredentials.ClientKey))
if err != nil {
log.Fatalf("could not load client key pair: %s", err)
return nil, &f.FJError{Code: f.AuthenticationError,
Message: "Error parsing client certificate"}
}

ca := []byte(reqMsg.Message.MutualAuthCredentials.CACertificate)

caCertPool := x509.NewCertPool()

if ok := caCertPool.AppendCertsFromPEM(ca); !ok {
log.Fatalf("failed to append ca certs")
return nil, &f.FJError{Code: f.AuthenticationError,
Message: "Failed to append CA certificate"}
}

client = &http.Client{
Expand All @@ -151,7 +169,7 @@ func newClient(reqMsg HTTPRequest, client *http.Client, req *http.Request, log *
},
}
}
return client
return client, nil
}

func makeResponse(reqMsg HTTPRequest, res *http.Response, bb []byte) HTTPResponse {
Expand Down
1 change: 0 additions & 1 deletion http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type DispatchServer struct {
UnimplementedHTTPForkJoinServiceServer
}

// TODO: Validate Input
//FanoutFanin Fans out each http message to http dispatch works using the fork join interface
func (s *DispatchServer) FanoutFanin(request *Request, stream HTTPForkJoinService_FanoutFaninServer) error {
ctx := context.WithValue(context.Background(), CtxRequestID, request.Id)
Expand Down
12 changes: 11 additions & 1 deletion logwrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ var (
requestDispatchError = LogEvent{RequestError, "Request Id: %s Message Id: %s Request Dispatch Error: %v"}
abortRequest = LogEvent{RequestAborted, "Request Id: %s Message Id: %s Abort Request: %v"}
responseError = LogEvent{ResponseError, "Request Id: %s Message Id: %s Response error: %v"}
connectionError = LogEvent{ConnectionError, "Request Id: %s Message Id: %s Connection error: %v"}
connectionError = LogEvent{ConnectionError, "Request Id: %s Message Id: %s Connection Error: %v"}
infoRequest = LogEvent{RequestInfo, "Request Id: %s Message Id: %s: %s"}
info = LogEvent{Info, "Request Id: %s %s "}
heartBeatMsg = LogEvent{HeartBeatInfo, "Request Id: %s Message Id: %s Heat beat: %v"}
authenticationError = LogEvent{AuthenticationError, "Request Id: %s Message Id: %s Authentication Error: %v"}
)

func (l *StandardLogger) LogInvalidRequest(requestID, messageID, message string) {
Expand All @@ -42,6 +44,14 @@ func (l *StandardLogger) LogAbortedRequest(requestID, messageID, message string)
l.Errorf(abortRequest.message, requestID, messageID, message)
}

func (l *StandardLogger) LogHeartBeatMsg(requestID, messageID, message string) {
l.Errorf(heartBeatMsg.message, requestID, messageID, message)
}

func (l *StandardLogger) LogAuthenticationError(requestID, messageID, message string) {
l.Errorf(authenticationError.message, requestID, messageID, message)
}

//TODO: have one info or request and one generic
func (l *StandardLogger) LogRequestInfo(requestID, message string) {
l.Infof(infoRequest.message, requestID, message)
Expand Down
2 changes: 2 additions & 0 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ const (
ConnectionError
ConcurrencyContextError
RequestAborted
AuthenticationError
RequestInfo
Info
HeartBeatInfo
)

//composite object to hold data for multiplexed go routines
Expand Down

0 comments on commit 55feaf6

Please sign in to comment.