Skip to content

Commit

Permalink
removed type check of response
Browse files Browse the repository at this point in the history
  • Loading branch information
kubesure committed May 2, 2022
1 parent 3a76a69 commit 7d96b72
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 22 deletions.
2 changes: 2 additions & 0 deletions http/httpworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ func httpDispatch(ctx context.Context, reqMsg f.HTTPRequest, resultStream chan<-
}

client := &http.Client{}
// TODO: add suport of secure connections
res, err := client.Do(req)

// FIXME: goroutine leak
if ctx.Err() != nil && res == nil {
log.LogAbortedRequest(RequestID(ctx), reqMsg.Message.ID, "Aborted")
responseStream <- f.Result{Err: &f.FJError{Code: f.RequestAborted, Message: fmt.Sprintf("Aborted %v", err)}}
Expand Down
41 changes: 19 additions & 22 deletions http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,41 +44,38 @@ func (s *DispatchServer) FanoutFanin(request *HTTPRequest, stream HTTPForkJoinSe
resultStream := mtplx.Multiplex(ctx, nil)
log.LogInfo(RequestID(ctx), "Forked")
for result := range resultStream {
response, _ := result.X.(fj.HTTPResponse)
if result.Err != nil {
err := stream.Send(makeErrRes(result.Err.Code, result.Err.Message))
if err != nil {
//TODO: add message id to log if possible
log.LogResponseError(result.ID, "nil", fmt.Sprintf("Error while writing to stream: %v", err.Error()))
return status.Errorf(codes.Internal, "Error while writing to stream", err)
log.LogResponseError(result.ID, response.Message.ID, fmt.Sprintf("Error while writing to stream: %v", err.Error()))
return status.Errorf(codes.Internal, fmt.Sprintf("Error while request id: %s message id: %s writing to stream", result.ID, response.Message.ID), err)
}
} else {
response, ok := result.X.(fj.HTTPResponse)
if !ok {
// FIXME: duplicate logs message string
/*if !ok {
log.LogResponseError(result.ID, "nil", "type assertion error http.Response not found")
log.Println("type assertion err http.Response not found")
err := stream.Send(makeErrRes(fj.InternalError, "type assertion error http.Response not found"))
if err != nil {
//TODO: add message id to log if possible
log.LogResponseError(result.ID, "nil", fmt.Sprintf("Error while writing to stream: %v", err.Error()))
log.LogResponseError(result.ID, response.Message.ID, fmt.Sprintf("Error while writing to stream: %v", err.Error()))
return status.Errorf(codes.Internal, "Error while writing to stream", err)
}
} else {
method, _ := Message_Method_value[string(response.Message.Method)]
m := &Message{
URL: response.Message.URL,
Method: Message_Method(method),
Headers: response.Message.Headers,
Payload: response.Message.Payload,
StatusCode: uint32(response.Message.StatusCode),
}
r := HTTPResponse{Message: m}
err := stream.Send(&r)
if err != nil {
return err
}
}*/ //else {
method, _ := Message_Method_value[string(response.Message.Method)]
m := &Message{
URL: response.Message.URL,
Method: Message_Method(method),
Headers: response.Message.Headers,
Payload: response.Message.Payload,
StatusCode: uint32(response.Message.StatusCode),
}
r := HTTPResponse{Message: m}
err := stream.Send(&r)
if err != nil {
return err
}
}
//}
}
log.LogInfo(RequestID(ctx), "Joined")
return nil
Expand Down

0 comments on commit 7d96b72

Please sign in to comment.