Skip to content

Commit

Permalink
feat: support tmq config msg.consume.rawdata
Browse files Browse the repository at this point in the history
  • Loading branch information
huskar-t committed Feb 19, 2025
1 parent 6210295 commit 529d651
Show file tree
Hide file tree
Showing 4 changed files with 385 additions and 5 deletions.
6 changes: 3 additions & 3 deletions controller/ws/tmq/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ const TaosTMQKey = "taos_tmq"
const (
TMQSubscribe = "subscribe"
TMQPoll = "poll"
TMQFetch = "fetch"
TMQFetch = "fetch" // fetch_raw_block
TMQFetchBlock = "fetch_block"
TMQFetchRaw = "fetch_raw"
TMQFetchRaw = "fetch_raw" // tmq_get_raw
TMQFetchJsonMeta = "fetch_json_meta"
TMQCommit = "commit"
TMQUnsubscribe = "unsubscribe"
Expand All @@ -16,7 +16,7 @@ const (
TMQCommitted = "committed"
TMQPosition = "position"
TMQListTopics = "list_topics"
TMQFetchRawNew = "fetch_raw_data"
TMQFetchRawNew = "fetch_raw_data" // tmq_get_raw
)
const (
TMQRawMessage = 3
Expand Down
6 changes: 5 additions & 1 deletion controller/ws/tmq/tmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ type TMQSubscribeReq struct {
TZ string `json:"tz"`
App string `json:"app"`
IP string `json:"ip"`
MsgConsumeRawdata string `json:"msg_consume_rawdata"`
}

type TMQSubscribeResp struct {
Expand Down Expand Up @@ -520,6 +521,9 @@ func (t *TMQ) subscribe(ctx context.Context, session *melody.Session, req *TMQSu
if len(req.MaxPollIntervalMS) != 0 {
tmqOptions["max.poll.interval.ms"] = req.MaxPollIntervalMS
}
if len(req.MsgConsumeRawdata) != 0 {
tmqOptions["msg.consume.rawdata"] = req.MsgConsumeRawdata
}
var errCode int32
for k, v := range tmqOptions {
errCode = wrapper.TMQConfSet(tmqConfig, k, v)
Expand Down Expand Up @@ -1370,7 +1374,7 @@ func canGetData(messageType int32) bool {

func messageTypeIsValid(messageType int32) bool {
switch messageType {
case common.TMQ_RES_DATA, common.TMQ_RES_TABLE_META, common.TMQ_RES_METADATA:
case common.TMQ_RES_DATA, common.TMQ_RES_TABLE_META, common.TMQ_RES_METADATA, common.TMQ_RES_RAWDATA:
return true
}
return false
Expand Down
Loading

0 comments on commit 529d651

Please sign in to comment.