Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/hjw/http action #14

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions examples/programming/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"errors"
"fmt"
"log"
"net/http"
"time"

"github.com/shiningrush/fastflow"
mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo"
"github.com/shiningrush/fastflow/pkg/actions/ahttp"
"github.com/shiningrush/fastflow/pkg/entity"
"github.com/shiningrush/fastflow/pkg/entity/run"
"github.com/shiningrush/fastflow/pkg/mod"
Expand All @@ -31,11 +33,12 @@ func main() {
// Register action
fastflow.RegisterAction([]run.Action{
&PrintAction{},
&ahttp.HTTP{},
})

// init keeper, it used to e
keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
Key: "worker-1",
Key: "worker-1",
// if your mongo does not set user/pwd, you should remove it
ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin",
Database: "mongo-demo",
Expand All @@ -57,6 +60,7 @@ func main() {
}

go createDagAndInstance()
go startTestHTTPServer()

// start fastflow
if err := fastflow.Start(&fastflow.InitialOption{
Expand All @@ -76,11 +80,20 @@ func createDagAndInstance() {
BaseInfo: entity.BaseInfo{
ID: "test-dag",
},
Name: "test",
Name: "test",
Status: entity.DagStatusNormal,
Tasks: []entity.Task{
{ID: "task1", ActionName: "PrintAction"},
{ID: "task2", ActionName: "PrintAction", DependOn: []string{"task1"}},
{ID: "task3", ActionName: "PrintAction", DependOn: []string{"task2"}},
{
ID: "task4",
ActionName: ahttp.ActionHTTP,
Params: map[string]interface{}{
"url": "http://localhost:12345",
},
DependOn: []string{"task3"},
},
},
}
if err := ensureDagCreated(dag); err != nil {
Expand Down Expand Up @@ -114,3 +127,15 @@ func ensureDagCreated(dag *entity.Dag) error {
}
return nil
}

func startTestHTTPServer() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
log.Printf("http server called")
_, _ = w.Write([]byte(`{"hello": "world"}`))
})

err := http.ListenAndServe(":12345", nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}
64 changes: 64 additions & 0 deletions pkg/actions/ahttp/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package ahttp

import (
"fmt"
"github.com/shiningrush/fastflow/pkg/entity/run"
)

// HTTP action
type HTTP struct {
}

// Name Action name
func (h *HTTP) Name() string {
return ActionHTTP

Check warning on line 14 in pkg/actions/ahttp/http.go

View check run for this annotation

Codecov / codecov/patch

pkg/actions/ahttp/http.go#L13-L14

Added lines #L13 - L14 were not covered by tests
}

// ParameterNew
func (h *HTTP) ParameterNew() interface{} {
return &HTTPParams{}

Check warning on line 19 in pkg/actions/ahttp/http.go

View check run for this annotation

Codecov / codecov/patch

pkg/actions/ahttp/http.go#L18-L19

Added lines #L18 - L19 were not covered by tests
}

// Run
func (h *HTTP) Run(ctx run.ExecuteContext, params interface{}) error {
p, ok := params.(*HTTPParams)
if !ok {
return fmt.Errorf("params type mismatch, want *HTTPParams, got %T", params)
}

Check warning on line 27 in pkg/actions/ahttp/http.go

View check run for this annotation

Codecov / codecov/patch

pkg/actions/ahttp/http.go#L26-L27

Added lines #L26 - L27 were not covered by tests
err := p.validate()
if err != nil {
err = fmt.Errorf("validate HTTP Params failed, %w", err)
ctx.Trace(err.Error())
return err
}

request, err := p.buildRequest(ctx.Context())
if err != nil {
err = fmt.Errorf("build request failed, %w", err)
ctx.Trace(err.Error())
return err
}

Check warning on line 40 in pkg/actions/ahttp/http.go

View check run for this annotation

Codecov / codecov/patch

pkg/actions/ahttp/http.go#L37-L40

Added lines #L37 - L40 were not covered by tests
cli := p.getClient()

ctx.Tracef("start request %v", request.URL)
response, err := cli.Do(request)
if err != nil {
err = fmt.Errorf("do http request failed, %w", err)
ctx.Trace(err.Error())
return err
}

httpResponse, err := ParseHTTPResponse(response, p)
if err != nil {
err = fmt.Errorf("parse http response failed, %w", err)
ctx.Trace(err.Error())
return err
}

key := p.RespSaveKey
if key == "" {
key = DefaultResponseSaveKey
}
ctx.WithValue(key, httpResponse)
return nil
}
197 changes: 197 additions & 0 deletions pkg/actions/ahttp/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package ahttp

import (
"context"
"encoding/json"
"io/ioutil"
"log"
"net/http"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/shiningrush/fastflow/pkg/entity/run"
"github.com/stretchr/testify/mock"
)

func TestMain(m *testing.M) {
go mockHTTPServer()
time.Sleep(time.Second)
os.Exit(m.Run())
}

const addr = "127.0.0.1:12345"

func TestHTTP_Run(t *testing.T) {
ctx := &run.MockExecuteContext{}
var (
saveKey string
response *HTTPResponse
)
ctx.On("WithValue", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
saveKey = args[0].(string)
response = args[1].(*HTTPResponse)
})

ctx.On("Tracef", mock.Anything, mock.Anything).Return()
ctx.On("Trace", mock.Anything, mock.Anything).Return()
ctx.On("WithValue", mock.Anything, mock.Anything).Return()

ctx.On("Context").Return(context.Background())

type args struct {
ctx run.ExecuteContext
params interface{}
}
url := "http://" + addr
tests := []struct {
name string
args args
wantSaveKey string
wantResponse *HTTPResponse
wantErr bool
}{
{
name: "get",
args: args{
ctx: ctx,
params: &HTTPParams{
URL: url,
Query: map[string]string{
"q": "test",
},
},
},
wantSaveKey: "httpResponse",
wantResponse: &HTTPResponse{Body: map[string]interface{}{
"method": "GET",
"query": "q=test",
}},
wantErr: false,
},
{
name: "post",
args: args{
ctx: ctx,
params: &HTTPParams{
Method: MethodPost,
URL: url,
Body: map[string]interface{}{
"a": 100,
},
RespSaveKey: "aa",
},
},
wantSaveKey: "aa",
wantResponse: &HTTPResponse{Body: map[string]interface{}{
"method": "POST",
"body": `{"a":100}`,
}},
wantErr: false,
},
{
name: "empty url",
args: args{
ctx: ctx,
params: &HTTPParams{
URL: "",
Query: map[string]string{
"q": "test",
},
},
},
wantSaveKey: "",
wantErr: true,
},
{
name: "err url",
args: args{
ctx: ctx,
params: &HTTPParams{
Method: MethodGet,
URL: "err",
Query: map[string]string{
"q": "test",
},
},
},
wantSaveKey: "",
wantErr: true,
},
{
name: "err status code",
args: args{
ctx: ctx,
params: &HTTPParams{
Method: MethodGet,
URL: url,
Query: map[string]string{
"q": "test",
},
RawBody: "error",
},
},
wantSaveKey: "",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
h := &HTTP{}
saveKey = ""
response = nil
err := h.Run(tt.args.ctx, tt.args.params)
if (err != nil) != tt.wantErr {
t.Errorf("Run() error = %v, wantErr %v", err, tt.wantErr)
} else {
t.Logf("want err: %v", err)
}
if tt.wantSaveKey != "" {
assert.Equal(t, tt.wantSaveKey, saveKey)
}

if tt.wantResponse != nil && response != nil {
if response.Body != nil {
assert.Equal(t, tt.wantResponse.Body, response.Body)
} else {
assert.Equal(t, tt.wantResponse.Raw, response.Raw)
}
} else {
t.Log(response)
}
})
}
}

func mockHTTPServer() {
type response struct {
Method string `json:"method,omitempty"`
Query string `json:"query,omitempty"`
Body string `json:"body,omitempty"`
}
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
bs, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("Error reading body: %v", err)
}
resp := &response{
Method: r.Method,
Query: r.URL.Query().Encode(),
Body: string(bs),
}

if string(bs) == "error" {
w.WriteHeader(504)
}

bytes, _ := json.Marshal(resp)
_, _ = w.Write(bytes)
})

err := http.ListenAndServe(addr, nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}
Loading