From f24b713ffbbc4765d6c7a52023d3d0543b508bd7 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Wed, 19 Feb 2025 14:27:50 +0800 Subject: [PATCH 1/2] feat: support tmq config `msg.consume.rawdata` --- controller/ws/tmq/const.go | 6 +- controller/ws/tmq/tmq.go | 15 +- controller/ws/tmq/tmq_test.go | 404 +++++++++++++++++++++++++++++++++- driver/common/const.go | 1 + 4 files changed, 421 insertions(+), 5 deletions(-) diff --git a/controller/ws/tmq/const.go b/controller/ws/tmq/const.go index 5915f7ba..ee16a31f 100644 --- a/controller/ws/tmq/const.go +++ b/controller/ws/tmq/const.go @@ -4,9 +4,9 @@ const TaosTMQKey = "taos_tmq" const ( TMQSubscribe = "subscribe" TMQPoll = "poll" - TMQFetch = "fetch" + TMQFetch = "fetch" // fetch_raw_block TMQFetchBlock = "fetch_block" - TMQFetchRaw = "fetch_raw" + TMQFetchRaw = "fetch_raw" // tmq_get_raw TMQFetchJsonMeta = "fetch_json_meta" TMQCommit = "commit" TMQUnsubscribe = "unsubscribe" @@ -16,7 +16,7 @@ const ( TMQCommitted = "committed" TMQPosition = "position" TMQListTopics = "list_topics" - TMQFetchRawNew = "fetch_raw_data" + TMQFetchRawNew = "fetch_raw_data" // tmq_get_raw ) const ( TMQRawMessage = 3 diff --git a/controller/ws/tmq/tmq.go b/controller/ws/tmq/tmq.go index ad8baed5..430c739f 100644 --- a/controller/ws/tmq/tmq.go +++ b/controller/ws/tmq/tmq.go @@ -400,6 +400,7 @@ type TMQSubscribeReq struct { TZ string `json:"tz"` App string `json:"app"` IP string `json:"ip"` + MsgConsumeRawdata string `json:"msg_consume_rawdata"` } type TMQSubscribeResp struct { @@ -520,6 +521,9 @@ func (t *TMQ) subscribe(ctx context.Context, session *melody.Session, req *TMQSu if len(req.MaxPollIntervalMS) != 0 { tmqOptions["max.poll.interval.ms"] = req.MaxPollIntervalMS } + if len(req.MsgConsumeRawdata) != 0 { + tmqOptions["msg.consume.rawdata"] = req.MsgConsumeRawdata + } var errCode int32 for k, v := range tmqOptions { errCode = wrapper.TMQConfSet(tmqConfig, k, v) @@ -1090,6 +1094,11 @@ func (t *TMQ) fetchJsonMeta(ctx context.Context, session *melody.Session, req *T return } message := t.tmpMessage + if !canGetMeta(message.Type) { + logger.Errorf("message type can not get meta, type:%d", message.Type) + wsTMQErrorMsg(ctx, session, logger, 0xffff, fmt.Sprintf("message type can not get meta, type: %d", message.Type), action, req.ReqID, &req.MessageID) + return + } if message.Index != req.MessageID { logger.Errorf("message ID are not equal, req:%d, message:%d", req.MessageID, message.Index) wsTMQErrorMsg(ctx, session, logger, 0xffff, "message ID is not equal", action, req.ReqID, &req.MessageID) @@ -1368,9 +1377,13 @@ func canGetData(messageType int32) bool { return messageType == common.TMQ_RES_DATA || messageType == common.TMQ_RES_METADATA } +func canGetMeta(messageType int32) bool { + return messageType == common.TMQ_RES_TABLE_META || messageType == common.TMQ_RES_METADATA +} + func messageTypeIsValid(messageType int32) bool { switch messageType { - case common.TMQ_RES_DATA, common.TMQ_RES_TABLE_META, common.TMQ_RES_METADATA: + case common.TMQ_RES_DATA, common.TMQ_RES_TABLE_META, common.TMQ_RES_METADATA, common.TMQ_RES_RAWDATA: return true } return false diff --git a/controller/ws/tmq/tmq_test.go b/controller/ws/tmq/tmq_test.go index 1ebe1e3d..012da3c0 100644 --- a/controller/ws/tmq/tmq_test.go +++ b/controller/ws/tmq/tmq_test.go @@ -28,9 +28,11 @@ import ( "github.com/taosdata/taosadapter/v3/controller/ws/query" "github.com/taosdata/taosadapter/v3/controller/ws/wstool" "github.com/taosdata/taosadapter/v3/db" + "github.com/taosdata/taosadapter/v3/driver/common" "github.com/taosdata/taosadapter/v3/driver/common/parser" "github.com/taosdata/taosadapter/v3/driver/common/tmq" "github.com/taosdata/taosadapter/v3/log" + "github.com/taosdata/taosadapter/v3/tools/layout" "github.com/taosdata/taosadapter/v3/tools/parseblock" ) @@ -849,7 +851,7 @@ func TestMeta(t *testing.T) { SnapshotEnable: "true", WithTableName: "true", OffsetReset: "earliest", - EnableBatchMeta: "true", + EnableBatchMeta: "1", SessionTimeoutMS: "12000", MaxPollIntervalMS: "300000", } @@ -1009,6 +1011,11 @@ func writeRaw(t *testing.T, rawData []byte) { if strings.Contains(err.Error(), "use of closed network connection") { return } + var closeErr *websocket.CloseError + if errors.As(err, &closeErr) && closeErr.Code == websocket.CloseAbnormalClosure { + finish <- struct{}{} + return + } t.Error(err) finish <- struct{}{} return @@ -3405,3 +3412,398 @@ func TestPollError(t *testing.T) { assert.NoError(t, err) assert.NotEqual(t, 0, pollResp.Code, string(msg)) } + +func TestConsumeRawdata(t *testing.T) { + code, message := doHttpSql("create database if not exists test_ws_rawdata WAL_RETENTION_PERIOD 86400") + if code != 0 { + t.Fatalf("create database failed: %s", message) + } + code, message = doHttpSql("create topic if not exists test_tmq_meta_ws_topic with meta as DATABASE test_ws_rawdata") + if code != 0 { + t.Fatalf("create topic failed: %s", message) + } + + s := httptest.NewServer(router) + defer s.Close() + ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/tmq", nil) + if err != nil { + t.Error(err) + return + } + defer func() { + err = ws.Close() + assert.NoError(t, err) + }() + init := &TMQSubscribeReq{ + ReqID: 0, + User: "root", + Password: "taosdata", + GroupID: "test", + Topics: []string{"test_tmq_meta_ws_topic"}, + AutoCommit: "true", + AutoCommitIntervalMS: "5000", + SnapshotEnable: "true", + WithTableName: "true", + OffsetReset: "earliest", + EnableBatchMeta: "1", + SessionTimeoutMS: "12000", + MaxPollIntervalMS: "300000", + MsgConsumeRawdata: "1", + } + b, _ := json.Marshal(init) + msg, err := doWebSocket(ws, TMQSubscribe, b) + assert.NoError(t, err) + var subscribeResp TMQSubscribeResp + err = json.Unmarshal(msg, &subscribeResp) + assert.NoError(t, err) + assert.Equal(t, 0, subscribeResp.Code, subscribeResp.Message) + + code, message = doHttpSql("create table test_ws_rawdata.stb (ts timestamp," + + "c1 bool," + + "c2 tinyint," + + "c3 smallint," + + "c4 int," + + "c5 bigint," + + "c6 tinyint unsigned," + + "c7 smallint unsigned," + + "c8 int unsigned," + + "c9 bigint unsigned," + + "c10 float," + + "c11 double," + + "c12 binary(20)," + + "c13 nchar(20)" + + ")" + + "tags(tts timestamp," + + "tc1 bool," + + "tc2 tinyint," + + "tc3 smallint," + + "tc4 int," + + "tc5 bigint," + + "tc6 tinyint unsigned," + + "tc7 smallint unsigned," + + "tc8 int unsigned," + + "tc9 bigint unsigned," + + "tc10 float," + + "tc11 double," + + "tc12 binary(20)," + + "tc13 nchar(20)" + + ")") + if code != 0 { + t.Fatalf("create table failed: %s", message) + } + now := time.Now().Round(time.Millisecond).UTC() + nowStr := now.Format(time.RFC3339Nano) + code, message = doHttpSql(fmt.Sprintf("create table test_ws_rawdata.ctb using test_ws_rawdata.stb tags('%s', true,1,1,1,1,1,1,1,1,1,1,'tg','ntg')", nowStr)) + if code != 0 { + t.Fatalf("insert failed: %s", message) + } + code, message = doHttpSql(fmt.Sprintf("insert into test_ws_rawdata.ctb values('%s',true,1,1,1,1,1,1,1,1,1,1,'vl','nvl')", nowStr)) + if code != 0 { + t.Fatalf("insert failed: %s", message) + } + b, _ = json.Marshal(&TMQPollReq{ + ReqID: 3, + BlockingTime: 500, + }) + gotRawMessage := false + for i := 0; i < 5; i++ { + msg, err = doWebSocket(ws, TMQPoll, b) + assert.NoError(t, err) + var pollResp TMQPollResp + err = json.Unmarshal(msg, &pollResp) + assert.NoError(t, err) + assert.Equal(t, 0, pollResp.Code, string(msg)) + if pollResp.HaveMessage { + if pollResp.MessageType == common.TMQ_RES_RAWDATA { + gotRawMessage = true + // can not call fetch + b, _ = json.Marshal(TMQFetchReq{ReqID: 101, MessageID: pollResp.MessageID}) + msg, err = doWebSocket(ws, TMQFetch, b) + assert.NoError(t, err) + var fetchResp TMQFetchResp + err = json.Unmarshal(msg, &fetchResp) + assert.NoError(t, err) + assert.Equal(t, uint64(101), fetchResp.ReqID, fetchResp) + assert.NotEqual(t, 0, fetchResp.Code, fetchResp) + // can not call fetch_block + b, _ = json.Marshal(TMQFetchBlockReq{ReqID: 102, MessageID: pollResp.MessageID}) + msg, err = doWebSocket(ws, TMQFetchBlock, b) + assert.NoError(t, err) + var fetchBlockResp WSTMQErrorResp + err = json.Unmarshal(msg, &fetchBlockResp) + assert.NoError(t, err) + assert.Equal(t, uint64(102), fetchBlockResp.ReqID, fetchResp) + assert.NotEqual(t, 0, fetchBlockResp.Code, fetchBlockResp) + // can not call fetch_json_meta + b, _ = json.Marshal(TMQFetchJsonMetaReq{ReqID: 103, MessageID: pollResp.MessageID}) + msg, err = doWebSocket(ws, TMQFetchJsonMeta, b) + assert.NoError(t, err) + var fetchJsonMetaResp TMQFetchJsonMetaResp + err = json.Unmarshal(msg, &fetchJsonMetaResp) + assert.NoError(t, err) + assert.Equal(t, uint64(103), fetchJsonMetaResp.ReqID, fetchJsonMetaResp) + assert.NotEqual(t, 0, fetchJsonMetaResp.Code, fetchJsonMetaResp) + } + b, _ = json.Marshal(TMQFetchRawReq{ReqID: 100, MessageID: pollResp.MessageID}) + msg, err = doWebSocket(ws, TMQFetchRawNew, b) + assert.NoError(t, err) + resp := parseFetchRawNewResponse(msg) + assert.Equal(t, uint64(0xffffffffffffffff), resp.Flag, resp.Flag) + assert.Equal(t, uint32(0), resp.Code, resp.Message) + assert.Equal(t, uint16(1), resp.Version) + assert.Equal(t, uint64(TMQFetchRawNewMessage), resp.Action) + assert.Greater(t, resp.Time, uint64(0)) + assert.Equal(t, uint64(100), resp.ReqID) + assert.Equal(t, pollResp.MessageID, resp.MessageID) + assert.Equal(t, int(resp.RawBlockLength), len(resp.TMQRawBlock)) + + writeMsg := make([]byte, 30+resp.RawBlockLength) + binary.LittleEndian.PutUint64(writeMsg, resp.ReqID) + binary.LittleEndian.PutUint64(writeMsg[8:], resp.MessageID) + binary.LittleEndian.PutUint64(writeMsg[16:], TMQRawMessage) + binary.LittleEndian.PutUint32(writeMsg[24:], resp.RawBlockLength) + binary.LittleEndian.PutUint16(writeMsg[28:], resp.MetaType) + copy(writeMsg[30:], resp.TMQRawBlock) + writeConsumeRawdata(t, writeMsg) + } + } + if !assert.True(t, gotRawMessage) { + return + } + b, _ = json.Marshal(&TMQUnsubscribeReq{ + ReqID: 6, + }) + msg, err = doWebSocket(ws, TMQUnsubscribe, b) + assert.NoError(t, err) + var unsubscribeResp TMQUnsubscribeResp + err = json.Unmarshal(msg, &unsubscribeResp) + assert.NoError(t, err) + assert.Equal(t, 0, unsubscribeResp.Code, unsubscribeResp.Message) + + err = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + assert.NoError(t, err) + time.Sleep(time.Second * 5) + + w := httptest.NewRecorder() + body := strings.NewReader("describe stb") + req, _ := http.NewRequest(http.MethodPost, "/rest/sql/test_ws_rawdata_target", body) + req.RemoteAddr = "127.0.0.1:33333" + req.Header.Set("Authorization", "Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04") + router.ServeHTTP(w, req) + assert.Equal(t, 200, w.Code) + var resp wstool.TDEngineRestfulResp + err = jsoniter.Unmarshal(w.Body.Bytes(), &resp) + assert.NoError(t, err) + expect := [][]driver.Value{ + {"ts", "TIMESTAMP", float64(8), ""}, + {"c1", "BOOL", float64(1), ""}, + {"c2", "TINYINT", float64(1), ""}, + {"c3", "SMALLINT", float64(2), ""}, + {"c4", "INT", float64(4), ""}, + {"c5", "BIGINT", float64(8), ""}, + {"c6", "TINYINT UNSIGNED", float64(1), ""}, + {"c7", "SMALLINT UNSIGNED", float64(2), ""}, + {"c8", "INT UNSIGNED", float64(4), ""}, + {"c9", "BIGINT UNSIGNED", float64(8), ""}, + {"c10", "FLOAT", float64(4), ""}, + {"c11", "DOUBLE", float64(8), ""}, + {"c12", "VARCHAR", float64(20), ""}, + {"c13", "NCHAR", float64(20), ""}, + {"tts", "TIMESTAMP", float64(8), "TAG"}, + {"tc1", "BOOL", float64(1), "TAG"}, + {"tc2", "TINYINT", float64(1), "TAG"}, + {"tc3", "SMALLINT", float64(2), "TAG"}, + {"tc4", "INT", float64(4), "TAG"}, + {"tc5", "BIGINT", float64(8), "TAG"}, + {"tc6", "TINYINT UNSIGNED", float64(1), "TAG"}, + {"tc7", "SMALLINT UNSIGNED", float64(2), "TAG"}, + {"tc8", "INT UNSIGNED", float64(4), "TAG"}, + {"tc9", "BIGINT UNSIGNED", float64(8), "TAG"}, + {"tc10", "FLOAT", float64(4), "TAG"}, + {"tc11", "DOUBLE", float64(8), "TAG"}, + {"tc12", "VARCHAR", float64(20), "TAG"}, + {"tc13", "NCHAR", float64(20), "TAG"}, + } + for index, values := range expect { + for i := 0; i < 4; i++ { + assert.Equal(t, values[i], resp.Data[index][i]) + } + } + + w = httptest.NewRecorder() + body = strings.NewReader("select * from stb limit 1") + req, _ = http.NewRequest(http.MethodPost, "/rest/sql/test_ws_rawdata_target", body) + req.RemoteAddr = "127.0.0.1:33333" + req.Header.Set("Authorization", "Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04") + router.ServeHTTP(w, req) + assert.Equal(t, 200, w.Code) + err = jsoniter.Unmarshal(w.Body.Bytes(), &resp) + assert.NoError(t, err) + expect = [][]driver.Value{ + { + now.Format(layout.LayoutMillSecond), + true, + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + "vl", + "nvl", + now.Format(layout.LayoutMillSecond), + true, + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + float64(1), + "tg", + "ntg", + }, + } + assert.Equal(t, expect, resp.Data) + + for i := 0; i < 5; i++ { + time.Sleep(time.Second * 3) + code, message := doHttpSql("drop topic if exists test_tmq_meta_ws_topic") + if code != 0 { + t.Log(message) + continue + } + code, message = doHttpSql("drop database if exists test_ws_rawdata_target") + if code != 0 { + t.Log(message) + continue + } + code, message = doHttpSql("drop database if exists test_ws_rawdata") + if code != 0 { + t.Log(message) + continue + } + break + } +} + +func writeConsumeRawdata(t *testing.T, rawData []byte) { + w := httptest.NewRecorder() + body := strings.NewReader("create database if not exists test_ws_rawdata_target WAL_RETENTION_PERIOD 86400") + req, _ := http.NewRequest(http.MethodPost, "/rest/sql", body) + req.RemoteAddr = "127.0.0.1:33333" + req.Header.Set("Authorization", "Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04") + router.ServeHTTP(w, req) + assert.Equal(t, 200, w.Code) + s := httptest.NewServer(router) + defer s.Close() + ws, _, err := websocket.DefaultDialer.Dial("ws"+strings.TrimPrefix(s.URL, "http")+"/rest/ws", nil) + if err != nil { + t.Error(err) + return + } + defer func() { + err = ws.Close() + assert.NoError(t, err) + }() + const ( + AfterConnect = 1 + AfterWriteRaw = 2 + ) + + status := 0 + //total := 0 + finish := make(chan struct{}) + //var jsonResult [][]interface{} + testMessageHandler := func(_ int, message []byte) error { + //json + switch status { + case AfterConnect: + var d query.WSConnectResp + err = json.Unmarshal(message, &d) + if err != nil { + return err + } + if d.Code != 0 { + return fmt.Errorf("%s %d,%s", query.WSConnect, d.Code, d.Message) + } + //query + status = AfterWriteRaw + err = ws.WriteMessage(websocket.BinaryMessage, rawData) + if err != nil { + return err + } + case AfterWriteRaw: + var d query.WSWriteMetaResp + err = json.Unmarshal(message, &d) + if err != nil { + return err + } + if d.Code != 0 { + return fmt.Errorf("%s %d,%s", query.WSQuery, d.Code, d.Message) + } + finish <- struct{}{} + } + return nil + } + go func() { + for { + mt, message, err := ws.ReadMessage() + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { + return + } + var closeErr *websocket.CloseError + if errors.As(err, &closeErr) && closeErr.Code == websocket.CloseAbnormalClosure { + finish <- struct{}{} + return + } + t.Error(err) + finish <- struct{}{} + return + } + err = testMessageHandler(mt, message) + if err != nil { + if mt == websocket.BinaryMessage { + t.Error(err, message) + } else { + t.Error(err, string(message)) + } + finish <- struct{}{} + return + } + } + }() + + connect := &query.WSConnectReq{ + ReqID: 0, + User: "root", + Password: "taosdata", + DB: "test_ws_rawdata_target", + } + + b, _ := json.Marshal(connect) + action, _ := json.Marshal(&wstool.WSAction{ + Action: query.WSConnect, + Args: b, + }) + status = AfterConnect + err = ws.WriteMessage( + websocket.TextMessage, + action, + ) + if err != nil { + t.Error(err) + return + } + <-finish + err = ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + assert.NoError(t, err) +} diff --git a/driver/common/const.go b/driver/common/const.go index ae97d949..b650fd0c 100644 --- a/driver/common/const.go +++ b/driver/common/const.go @@ -37,6 +37,7 @@ const ( TMQ_RES_DATA = 1 TMQ_RES_TABLE_META = 2 TMQ_RES_METADATA = 3 + TMQ_RES_RAWDATA = 4 ) var TypeLengthMap = map[int]int{ From dfef255cc513507212f1fe2cb2d20ed286d45438 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Wed, 19 Feb 2025 16:28:48 +0800 Subject: [PATCH 2/2] enh: rename fetch_raw_data --- controller/ws/tmq/const.go | 2 +- controller/ws/tmq/tmq.go | 4 ++-- controller/ws/tmq/tmq_test.go | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/controller/ws/tmq/const.go b/controller/ws/tmq/const.go index ee16a31f..b0274b24 100644 --- a/controller/ws/tmq/const.go +++ b/controller/ws/tmq/const.go @@ -16,7 +16,7 @@ const ( TMQCommitted = "committed" TMQPosition = "position" TMQListTopics = "list_topics" - TMQFetchRawNew = "fetch_raw_data" // tmq_get_raw + TMQFetchRawData = "fetch_raw_data" // tmq_get_raw ) const ( TMQRawMessage = 3 diff --git a/controller/ws/tmq/tmq.go b/controller/ws/tmq/tmq.go index 430c739f..e1d7e161 100644 --- a/controller/ws/tmq/tmq.go +++ b/controller/ws/tmq/tmq.go @@ -131,7 +131,7 @@ func NewTMQController() *TMQController { return } t.fetchRawBlock(ctx, session, &req) - case TMQFetchRawNew: + case TMQFetchRawData: var req TMQFetchRawReq err = json.Unmarshal(action.Args, &req) if err != nil { @@ -1013,7 +1013,7 @@ func (t *TMQ) fetchRawBlock(ctx context.Context, session *melody.Session, req *T } func (t *TMQ) fetchRawBlockNew(ctx context.Context, session *melody.Session, req *TMQFetchRawReq) { - action := TMQFetchRawNew + action := TMQFetchRawData logger := t.logger.WithField("action", action).WithField(config.ReqIDKey, req.ReqID) logger.Tracef("fetch raw request:%+v", req) if t.consumer == nil { diff --git a/controller/ws/tmq/tmq_test.go b/controller/ws/tmq/tmq_test.go index 012da3c0..96593184 100644 --- a/controller/ws/tmq/tmq_test.go +++ b/controller/ws/tmq/tmq_test.go @@ -3057,7 +3057,7 @@ func TestTMQ_FetchRawNew(t *testing.T) { // fetch raw new b, _ = json.Marshal(TMQFetchRawReq{ReqID: 100, MessageID: pollResp.MessageID}) - msg, err = doWebSocket(ws, TMQFetchRawNew, b) + msg, err = doWebSocket(ws, TMQFetchRawData, b) assert.NoError(t, err) resp := parseFetchRawNewResponse(msg) assert.Equal(t, uint64(0xffffffffffffffff), resp.Flag, resp.Flag) @@ -3085,7 +3085,7 @@ func TestTMQ_FetchRawNew(t *testing.T) { // fetch wrong b, _ = json.Marshal(TMQFetchRawReq{ReqID: 100, MessageID: 8000}) - msg, err = doWebSocket(ws, TMQFetchRawNew, b) + msg, err = doWebSocket(ws, TMQFetchRawData, b) assert.NoError(t, err) resp = parseFetchRawNewResponse(msg) assert.Equal(t, uint64(0xffffffffffffffff), resp.Flag, resp.Flag) @@ -3545,7 +3545,7 @@ func TestConsumeRawdata(t *testing.T) { assert.NotEqual(t, 0, fetchJsonMetaResp.Code, fetchJsonMetaResp) } b, _ = json.Marshal(TMQFetchRawReq{ReqID: 100, MessageID: pollResp.MessageID}) - msg, err = doWebSocket(ws, TMQFetchRawNew, b) + msg, err = doWebSocket(ws, TMQFetchRawData, b) assert.NoError(t, err) resp := parseFetchRawNewResponse(msg) assert.Equal(t, uint64(0xffffffffffffffff), resp.Flag, resp.Flag)