diff --git a/http/httpworker.go b/http/httpworker.go index aede286..989e7d3 100644 --- a/http/httpworker.go +++ b/http/httpworker.go @@ -69,8 +69,10 @@ func httpDispatch(ctx context.Context, reqMsg f.HTTPRequest, resultStream chan<- } client := &http.Client{} + // TODO: add suport of secure connections res, err := client.Do(req) + // FIXME: goroutine leak if ctx.Err() != nil && res == nil { log.LogAbortedRequest(RequestID(ctx), reqMsg.Message.ID, "Aborted") responseStream <- f.Result{Err: &f.FJError{Code: f.RequestAborted, Message: fmt.Sprintf("Aborted %v", err)}} diff --git a/http/service.go b/http/service.go index c98da59..c38c148 100644 --- a/http/service.go +++ b/http/service.go @@ -44,41 +44,38 @@ func (s *DispatchServer) FanoutFanin(request *HTTPRequest, stream HTTPForkJoinSe resultStream := mtplx.Multiplex(ctx, nil) log.LogInfo(RequestID(ctx), "Forked") for result := range resultStream { + response, _ := result.X.(fj.HTTPResponse) if result.Err != nil { err := stream.Send(makeErrRes(result.Err.Code, result.Err.Message)) if err != nil { - //TODO: add message id to log if possible - log.LogResponseError(result.ID, "nil", fmt.Sprintf("Error while writing to stream: %v", err.Error())) - return status.Errorf(codes.Internal, "Error while writing to stream", err) + log.LogResponseError(result.ID, response.Message.ID, fmt.Sprintf("Error while writing to stream: %v", err.Error())) + return status.Errorf(codes.Internal, fmt.Sprintf("Error while request id: %s message id: %s writing to stream", result.ID, response.Message.ID), err) } } else { - response, ok := result.X.(fj.HTTPResponse) - if !ok { - // FIXME: duplicate logs message string + /*if !ok { log.LogResponseError(result.ID, "nil", "type assertion error http.Response not found") log.Println("type assertion err http.Response not found") err := stream.Send(makeErrRes(fj.InternalError, "type assertion error http.Response not found")) if err != nil { - //TODO: add message id to log if possible - log.LogResponseError(result.ID, "nil", fmt.Sprintf("Error while writing to stream: %v", err.Error())) + log.LogResponseError(result.ID, response.Message.ID, fmt.Sprintf("Error while writing to stream: %v", err.Error())) return status.Errorf(codes.Internal, "Error while writing to stream", err) } - } else { - method, _ := Message_Method_value[string(response.Message.Method)] - m := &Message{ - URL: response.Message.URL, - Method: Message_Method(method), - Headers: response.Message.Headers, - Payload: response.Message.Payload, - StatusCode: uint32(response.Message.StatusCode), - } - r := HTTPResponse{Message: m} - err := stream.Send(&r) - if err != nil { - return err - } + }*/ //else { + method, _ := Message_Method_value[string(response.Message.Method)] + m := &Message{ + URL: response.Message.URL, + Method: Message_Method(method), + Headers: response.Message.Headers, + Payload: response.Message.Payload, + StatusCode: uint32(response.Message.StatusCode), + } + r := HTTPResponse{Message: m} + err := stream.Send(&r) + if err != nil { + return err } } + //} } log.LogInfo(RequestID(ctx), "Joined") return nil