From e28e863e3894542d85447e7c3ec42c5aae16a610 Mon Sep 17 00:00:00 2001 From: philhuan Date: Mon, 13 Feb 2023 22:19:14 +0800 Subject: [PATCH 1/3] feat: http action --- pkg/actions/http/http.go | 64 +++++++++++ pkg/actions/http/http_test.go | 197 ++++++++++++++++++++++++++++++++++ pkg/actions/http/param.go | 144 +++++++++++++++++++++++++ pkg/actions/http/response.go | 50 +++++++++ 4 files changed, 455 insertions(+) create mode 100644 pkg/actions/http/http.go create mode 100644 pkg/actions/http/http_test.go create mode 100644 pkg/actions/http/param.go create mode 100644 pkg/actions/http/response.go diff --git a/pkg/actions/http/http.go b/pkg/actions/http/http.go new file mode 100644 index 0000000..bc82934 --- /dev/null +++ b/pkg/actions/http/http.go @@ -0,0 +1,64 @@ +package http + +import ( + "fmt" + "github.com/shiningrush/fastflow/pkg/entity/run" +) + +// HTTP action +type HTTP struct { +} + +// Name Action name +func (h *HTTP) Name() string { + return ActionHTTP +} + +// ParameterNew +func (h *HTTP) ParameterNew() interface{} { + return &HTTPParams{} +} + +// Run +func (h *HTTP) Run(ctx run.ExecuteContext, params interface{}) error { + p, ok := params.(*HTTPParams) + if !ok { + return fmt.Errorf("params type mismatch, want *HTTPParams, got %T", params) + } + err := p.validate() + if err != nil { + err = fmt.Errorf("validate HTTP Params failed, %w", err) + ctx.Trace(err.Error()) + return err + } + + request, err := p.buildRequest(ctx.Context()) + if err != nil { + err = fmt.Errorf("build request failed, %w", err) + ctx.Trace(err.Error()) + return err + } + cli := p.getClient() + + ctx.Tracef("start request %v", request.URL) + response, err := cli.Do(request) + if err != nil { + err = fmt.Errorf("do http request failed, %w", err) + ctx.Trace(err.Error()) + return err + } + + httpResponse, err := ParseHTTPResponse(response, p) + if err != nil { + err = fmt.Errorf("parse http response failed, %w", err) + ctx.Trace(err.Error()) + return err + } + + key := p.RespSaveKey + if key == "" { + key = DefaultResponseSaveKey + } + ctx.WithValue(key, httpResponse) + return nil +} diff --git a/pkg/actions/http/http_test.go b/pkg/actions/http/http_test.go new file mode 100644 index 0000000..6e99805 --- /dev/null +++ b/pkg/actions/http/http_test.go @@ -0,0 +1,197 @@ +package http + +import ( + "context" + "encoding/json" + "io/ioutil" + "log" + "net/http" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/shiningrush/fastflow/pkg/entity/run" + "github.com/stretchr/testify/mock" +) + +func TestMain(m *testing.M) { + go mockHTTPServer() + time.Sleep(time.Second) + os.Exit(m.Run()) +} + +const addr = "127.0.0.1:12345" + +func TestHTTP_Run(t *testing.T) { + ctx := &run.MockExecuteContext{} + var ( + saveKey string + response *HTTPResponse + ) + ctx.On("WithValue", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + saveKey = args[0].(string) + response = args[1].(*HTTPResponse) + }) + + ctx.On("Tracef", mock.Anything, mock.Anything).Return() + ctx.On("Trace", mock.Anything, mock.Anything).Return() + ctx.On("WithValue", mock.Anything, mock.Anything).Return() + + ctx.On("Context").Return(context.Background()) + + type args struct { + ctx run.ExecuteContext + params interface{} + } + url := "http://" + addr + tests := []struct { + name string + args args + wantSaveKey string + wantResponse *HTTPResponse + wantErr bool + }{ + { + name: "get", + args: args{ + ctx: ctx, + params: &HTTPParams{ + URL: url, + Query: map[string]string{ + "q": "test", + }, + }, + }, + wantSaveKey: "httpResponse", + wantResponse: &HTTPResponse{Body: map[string]interface{}{ + "method": "GET", + "query": "q=test", + }}, + wantErr: false, + }, + { + name: "post", + args: args{ + ctx: ctx, + params: &HTTPParams{ + Method: MethodPost, + URL: url, + Body: map[string]interface{}{ + "a": 100, + }, + RespSaveKey: "aa", + }, + }, + wantSaveKey: "aa", + wantResponse: &HTTPResponse{Body: map[string]interface{}{ + "method": "POST", + "body": `{"a":100}`, + }}, + wantErr: false, + }, + { + name: "empty url", + args: args{ + ctx: ctx, + params: &HTTPParams{ + URL: "", + Query: map[string]string{ + "q": "test", + }, + }, + }, + wantSaveKey: "", + wantErr: true, + }, + { + name: "err url", + args: args{ + ctx: ctx, + params: &HTTPParams{ + Method: MethodGet, + URL: "err", + Query: map[string]string{ + "q": "test", + }, + }, + }, + wantSaveKey: "", + wantErr: true, + }, + { + name: "err status code", + args: args{ + ctx: ctx, + params: &HTTPParams{ + Method: MethodGet, + URL: url, + Query: map[string]string{ + "q": "test", + }, + RawBody: "error", + }, + }, + wantSaveKey: "", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + h := &HTTP{} + saveKey = "" + response = nil + err := h.Run(tt.args.ctx, tt.args.params) + if (err != nil) != tt.wantErr { + t.Errorf("Run() error = %v, wantErr %v", err, tt.wantErr) + } else { + t.Logf("want err: %v", err) + } + if tt.wantSaveKey != "" { + assert.Equal(t, tt.wantSaveKey, saveKey) + } + + if tt.wantResponse != nil && response != nil { + if response.Body != nil { + assert.Equal(t, tt.wantResponse.Body, response.Body) + } else { + assert.Equal(t, tt.wantResponse.Raw, response.Raw) + } + } else { + t.Log(response) + } + }) + } +} + +func mockHTTPServer() { + type response struct { + Method string `json:"method,omitempty"` + Query string `json:"query,omitempty"` + Body string `json:"body,omitempty"` + } + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + bs, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Printf("Error reading body: %v", err) + } + resp := &response{ + Method: r.Method, + Query: r.URL.Query().Encode(), + Body: string(bs), + } + + if string(bs) == "error" { + w.WriteHeader(504) + } + + bytes, _ := json.Marshal(resp) + _, _ = w.Write(bytes) + }) + + err := http.ListenAndServe(addr, nil) + if err != nil { + log.Fatal("ListenAndServe: ", err) + } +} diff --git a/pkg/actions/http/param.go b/pkg/actions/http/param.go new file mode 100644 index 0000000..b9d0b21 --- /dev/null +++ b/pkg/actions/http/param.go @@ -0,0 +1,144 @@ +package http + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +const ( + ActionHTTP = "http" + + DefaultTimeout float64 = 10 * 60 // 10 minutes + DefaultResponseContentType = "application/json" + DefaultResponseSaveKey = "httpResponse" + DefaultResponseHandler = ResponseHandlerJSON +) + +type Method string + +const ( + MethodGet Method = "GET" + MethodHead Method = "HEAD" + MethodPost Method = "POST" + MethodPut Method = "PUT" + MethodPatch Method = "PATCH" // RFC 5789 + MethodDelete Method = "DELETE" + MethodConnect Method = "CONNECT" + MethodOptions Method = "OPTIONS" + MethodTrace Method = "TRACE" +) + +const ( + HeaderContentTypeKey = "content-type" + + ContentTypeJSON = "application/json" +) + +type ResponseHandler string + +const ( + ResponseHandlerJSON = "json" + ResponseHandlerNone = "none" + ResponseHandlerXML = "xml" +) + +type HTTPParams struct { + Method Method `yaml:"method" json:"method"` + URL string `yaml:"url" json:"url"` + Path string `yaml:"path" json:"path"` + Query map[string]string `yaml:"query" json:"query"` + // Body 结构化的 body, 发送请求会用 json 序列化 + Body interface{} `yaml:"body" json:"body"` + RawBody string `yaml:"rawBody" json:"rawBody"` + Header http.Header `yaml:"header" json:"header"` + + // Client Options + TimeoutSec float64 `yaml:"timeoutSec" json:"timeoutSec"` + // Resp Options + UseNumber bool `yaml:"useNumber" json:"useNumber"` + RespHandler ResponseHandler `yaml:"responseHandler" json:"responseHandler"` + RespSaveKey string `yaml:"respKey" json:"respKey"` +} + +func (p *HTTPParams) validate() error { + if p.URL == "" { + return fmt.Errorf("url cannot be empty") + } + + if p.TimeoutSec == 0 { + p.TimeoutSec = DefaultTimeout + } + + if p.Method == "" { + p.Method = MethodGet + } + if p.Header == nil { + p.Header = http.Header{} + } + return nil +} + +func (p *HTTPParams) getURL() string { + urlBuilder := strings.Builder{} + urlBuilder.WriteString(p.URL) + if len(p.Path) > 0 { + if (!strings.HasSuffix(p.URL, "/")) && (!strings.HasPrefix(p.Path, "/")) { + urlBuilder.WriteString("/") + } + urlBuilder.WriteString(p.Path) + } + if len(p.Query) != 0 { + query := url.Values{} + for k, v := range p.Query { + query.Add(k, v) + } + urlBuilder.WriteString("?") + urlBuilder.WriteString(query.Encode()) + } + return urlBuilder.String() +} + +func (p *HTTPParams) buildRequest(ctx context.Context) (*http.Request, error) { + var ( + bodyReader io.Reader + header = p.Header + ) + if p.Body != nil { + bs, _ := json.Marshal(p.Body) + bodyReader = bytes.NewReader(bs) + header.Add(HeaderContentTypeKey, ContentTypeJSON) + } + + if len(p.RawBody) > 0 { + bodyReader = strings.NewReader(p.RawBody) + } + + request, err := http.NewRequestWithContext(ctx, string(p.Method), p.getURL(), bodyReader) + if err != nil { + return nil, fmt.Errorf("new request failed, %w", err) + } + if header != nil { + request.Header = header + } + return request, nil +} + +func (p *HTTPParams) getClient() *http.Client { + return &http.Client{ + Timeout: time.Duration(p.TimeoutSec * float64(time.Second)), + } +} + +func (p *HTTPParams) getResponseHandler() ResponseHandler { + if p.RespHandler == "" { + return DefaultResponseHandler + } + return p.RespHandler +} diff --git a/pkg/actions/http/response.go b/pkg/actions/http/response.go new file mode 100644 index 0000000..a6a3fca --- /dev/null +++ b/pkg/actions/http/response.go @@ -0,0 +1,50 @@ +package http + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" +) + +type HTTPResponse struct { + ResponseHandler ResponseHandler + Body map[string]interface{} + Raw []byte +} + +func ParseHTTPResponse(response *http.Response, p *HTTPParams) (*HTTPResponse, error) { + if response.StatusCode/100 != 2 { + bs, _ := ioutil.ReadAll(response.Body) + return nil, fmt.Errorf("http failed: code:%d, message:%s, body:%s", response.StatusCode, response.Status, string(bs)) + } + + contentType := response.Header.Get(HeaderContentTypeKey) + + respBytes, err := ioutil.ReadAll(response.Body) + if err != nil { + return nil, fmt.Errorf("read response body failed:%w", err) + } + + body := map[string]interface{}{} + resp := &HTTPResponse{ + ResponseHandler: p.getResponseHandler(), + Body: body, + Raw: respBytes, + } + + switch p.getResponseHandler() { + case ResponseHandlerNone: + return resp, nil + case ResponseHandlerJSON: + decoder := json.NewDecoder(bytes.NewReader(respBytes)) + if p.UseNumber { + decoder.UseNumber() + } + err := decoder.Decode(&body) + return resp, err + // TODO xml ... etc + } + return nil, fmt.Errorf("not supprt content type: %s, body: %s", contentType, string(respBytes)) +} From 79afe91618d7b097d3fdbfddffc4e569f2e6b1f0 Mon Sep 17 00:00:00 2001 From: philhuan Date: Mon, 13 Feb 2023 22:52:28 +0800 Subject: [PATCH 2/3] feat: modify pkg --- examples/programming/main.go | 29 ++++++++++++++++++++++-- pkg/actions/{http => ahttp}/http.go | 2 +- pkg/actions/{http => ahttp}/http_test.go | 2 +- pkg/actions/{http => ahttp}/param.go | 4 ++-- pkg/actions/{http => ahttp}/response.go | 2 +- 5 files changed, 32 insertions(+), 7 deletions(-) rename pkg/actions/{http => ahttp}/http.go (98%) rename pkg/actions/{http => ahttp}/http_test.go (99%) rename pkg/actions/{http => ahttp}/param.go (98%) rename pkg/actions/{http => ahttp}/response.go (98%) diff --git a/examples/programming/main.go b/examples/programming/main.go index 4f08f33..fa6ec10 100644 --- a/examples/programming/main.go +++ b/examples/programming/main.go @@ -3,7 +3,9 @@ package main import ( "errors" "fmt" + "github.com/shiningrush/fastflow/pkg/actions/ahttp" "log" + "net/http" "time" "github.com/shiningrush/fastflow" @@ -31,11 +33,12 @@ func main() { // Register action fastflow.RegisterAction([]run.Action{ &PrintAction{}, + &ahttp.HTTP{}, }) // init keeper, it used to e keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{ - Key: "worker-1", + Key: "worker-1", // if your mongo does not set user/pwd, you should remove it ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", @@ -57,6 +60,7 @@ func main() { } go createDagAndInstance() + go startTestHTTPServer() // start fastflow if err := fastflow.Start(&fastflow.InitialOption{ @@ -76,11 +80,20 @@ func createDagAndInstance() { BaseInfo: entity.BaseInfo{ ID: "test-dag", }, - Name: "test", + Name: "test", + Status: entity.DagStatusNormal, Tasks: []entity.Task{ {ID: "task1", ActionName: "PrintAction"}, {ID: "task2", ActionName: "PrintAction", DependOn: []string{"task1"}}, {ID: "task3", ActionName: "PrintAction", DependOn: []string{"task2"}}, + { + ID: "task4", + ActionName: ahttp.ActionHTTP, + Params: map[string]interface{}{ + "url": "http://localhost:12345", + }, + DependOn: []string{"task3"}, + }, }, } if err := ensureDagCreated(dag); err != nil { @@ -114,3 +127,15 @@ func ensureDagCreated(dag *entity.Dag) error { } return nil } + +func startTestHTTPServer() { + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + log.Printf("http server called") + _, _ = w.Write([]byte(`{"hello": "world"}`)) + }) + + err := http.ListenAndServe(":12345", nil) + if err != nil { + log.Fatal("ListenAndServe: ", err) + } +} diff --git a/pkg/actions/http/http.go b/pkg/actions/ahttp/http.go similarity index 98% rename from pkg/actions/http/http.go rename to pkg/actions/ahttp/http.go index bc82934..a3820c6 100644 --- a/pkg/actions/http/http.go +++ b/pkg/actions/ahttp/http.go @@ -1,4 +1,4 @@ -package http +package ahttp import ( "fmt" diff --git a/pkg/actions/http/http_test.go b/pkg/actions/ahttp/http_test.go similarity index 99% rename from pkg/actions/http/http_test.go rename to pkg/actions/ahttp/http_test.go index 6e99805..cdc986e 100644 --- a/pkg/actions/http/http_test.go +++ b/pkg/actions/ahttp/http_test.go @@ -1,4 +1,4 @@ -package http +package ahttp import ( "context" diff --git a/pkg/actions/http/param.go b/pkg/actions/ahttp/param.go similarity index 98% rename from pkg/actions/http/param.go rename to pkg/actions/ahttp/param.go index b9d0b21..f34e569 100644 --- a/pkg/actions/http/param.go +++ b/pkg/actions/ahttp/param.go @@ -1,4 +1,4 @@ -package http +package ahttp import ( "bytes" @@ -13,7 +13,7 @@ import ( ) const ( - ActionHTTP = "http" + ActionHTTP = "HTTP" DefaultTimeout float64 = 10 * 60 // 10 minutes DefaultResponseContentType = "application/json" diff --git a/pkg/actions/http/response.go b/pkg/actions/ahttp/response.go similarity index 98% rename from pkg/actions/http/response.go rename to pkg/actions/ahttp/response.go index a6a3fca..9d21152 100644 --- a/pkg/actions/http/response.go +++ b/pkg/actions/ahttp/response.go @@ -1,4 +1,4 @@ -package http +package ahttp import ( "bytes" From 3a6d9d0fa4965ae0795bd74c80445ea42426a1e9 Mon Sep 17 00:00:00 2001 From: philhuan Date: Mon, 13 Feb 2023 22:55:38 +0800 Subject: [PATCH 3/3] style: sort import --- examples/programming/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/programming/main.go b/examples/programming/main.go index fa6ec10..addfc69 100644 --- a/examples/programming/main.go +++ b/examples/programming/main.go @@ -3,13 +3,13 @@ package main import ( "errors" "fmt" - "github.com/shiningrush/fastflow/pkg/actions/ahttp" "log" "net/http" "time" "github.com/shiningrush/fastflow" mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo" + "github.com/shiningrush/fastflow/pkg/actions/ahttp" "github.com/shiningrush/fastflow/pkg/entity" "github.com/shiningrush/fastflow/pkg/entity/run" "github.com/shiningrush/fastflow/pkg/mod"