Skip to content

Commit ca8b14e

Browse files
committed
Updated code
1 parent ac98bb2 commit ca8b14e

File tree

4 files changed

+151
-7
lines changed

4 files changed

+151
-7
lines changed

cmd/mqttsub/main.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
)
1515

1616
var (
17-
Format = "%-10v %-40v %-28v\n"
17+
Format = "%-10v %-40v %-40v\n"
1818
Header sync.Once
1919
)
2020

@@ -56,13 +56,13 @@ func EventHandler(_ context.Context, app gopi.App, evt_ gopi.Event) {
5656

5757
Header.Do(func() {
5858
fmt.Printf(Format, "TYPE", "TOPIC", "DATA")
59-
fmt.Printf(Format, strings.Repeat("-", 10), strings.Repeat("-", 40), strings.Repeat("-", 28))
59+
fmt.Printf(Format, strings.Repeat("-", 10), strings.Repeat("-", 40), strings.Repeat("-", 40))
6060
})
6161
message := strings.TrimPrefix(fmt.Sprint(evt.Type()), "MOSQ_FLAG_EVENT_")
6262
topic := TruncateString(evt.Topic(), 40)
6363
data := "<nil>"
6464
if len(evt.Data()) > 0 {
65-
data = TruncateString(strconv.Quote(string(evt.Data())), 28)
65+
data = TruncateString(strconv.Quote(string(evt.Data())), 40)
6666
}
6767
fmt.Printf(Format, message, topic, data)
6868
}

mosquitto.go

+24-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package mosquitto
22

33
import (
44
"context"
5+
"fmt"
56
"strings"
7+
"time"
68

79
// Frameworks
810
"github.com/djthorpe/gopi/v2"
@@ -36,6 +38,12 @@ type Client interface {
3638
// Publish []byte data to topic and return request-id
3739
Publish(string, []byte, ...Opt) (int, error)
3840

41+
// Publish JSON data to topic and return request-id
42+
PublishJSON(string, interface{}, ...Opt) (int, error)
43+
44+
// Publish measurements in influxdata line protocol format and return request-id
45+
PublishInflux(string, string, map[string]interface{}, ...Opt) (int, error)
46+
3947
// Wait for a specific request-id or 0 for a connect or disconnect event
4048
// with context (for timeout)
4149
WaitFor(context.Context, int) (Event, error)
@@ -60,10 +68,12 @@ type Event interface {
6068

6169
// Function options
6270
type Opt struct {
63-
Type Option
64-
Int int
65-
Bool bool
66-
Flags Flags
71+
Type Option
72+
Int int
73+
Bool bool
74+
Flags Flags
75+
String string
76+
Timestamp time.Time
6777
}
6878

6979
////////////////////////////////////////////////////////////////////////////////
@@ -73,6 +83,10 @@ func OptQOS(value int) Opt { return Opt{Type: MOSQ_OPTION_QOS, Int: va
7383
func OptRetain(value bool) Opt { return Opt{Type: MOSQ_OPTION_RETAIN, Bool: value} }
7484
func OptFlags(value Flags) Opt { return Opt{Type: MOSQ_OPTION_FLAGS, Flags: value} }
7585
func OptKeepaliveSecs(value int) Opt { return Opt{Type: MOSQ_OPTION_KEEPALIVE, Int: value} }
86+
func OptTag(name, value string) Opt {
87+
return Opt{Type: MOSQ_OPTION_TAG, String: fmt.Sprintf("%s=%s", strings.TrimSpace(name), strings.TrimSpace(value))}
88+
}
89+
func OptTimestamp(value time.Time) Opt { return Opt{Type: MOSQ_OPTION_TIMESTAMP, Timestamp: value} }
7690

7791
////////////////////////////////////////////////////////////////////////////////
7892
// CONSTANTS
@@ -97,6 +111,8 @@ const (
97111
MOSQ_OPTION_RETAIN // BoolValue
98112
MOSQ_OPTION_FLAGS // FlagsValue
99113
MOSQ_OPTION_KEEPALIVE // IntValue
114+
MOSQ_OPTION_TAG // StringValue
115+
MOSQ_OPTION_TIMESTAMP // TimeValue
100116
)
101117

102118
////////////////////////////////////////////////////////////////////////////////
@@ -150,6 +166,10 @@ func (o Option) String() string {
150166
return "MOSQ_OPTION_FLAGS"
151167
case MOSQ_OPTION_KEEPALIVE:
152168
return "MOSQ_OPTION_KEEPALIVE"
169+
case MOSQ_OPTION_TAG:
170+
return "MOSQ_OPTION_TAG"
171+
case MOSQ_OPTION_TIMESTAMP:
172+
return "MOSQ_OPTION_TIMESTAMP"
153173
default:
154174
return "[?? Invalid Option value]"
155175
}

unit/mosquitto/mosquitto.go

+58
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package mosquitto
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"net"
78
"regexp"
89
"strconv"
10+
"strings"
911
"sync"
1012

1113
// Frameworks
@@ -339,6 +341,62 @@ func (this *mosquitto) Publish(topic string, data []byte, opts ...iface.Opt) (in
339341
}
340342
}
341343

344+
////////////////////////////////////////////////////////////////////////////////
345+
// PUBLISH JSON & INFLUX FORMATS
346+
347+
func (this *mosquitto) PublishJSON(topic string, data interface{}, opts ...iface.Opt) (int, error) {
348+
if json, err := json.Marshal(data); err != nil {
349+
return 0, err
350+
} else {
351+
return this.Publish(topic, json, opts...)
352+
}
353+
}
354+
355+
// Influx line protocol
356+
// https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/
357+
// Include one or more OptTag(name,value) for tags
358+
// Include one OptTimestamp(time.Time) to set timestamp
359+
func (this *mosquitto) PublishInflux(topic string, measurement string, fields map[string]interface{}, opts ...iface.Opt) (int, error) {
360+
// Check parameters
361+
if len(fields) == 0 {
362+
return 0, gopi.ErrBadParameter.WithPrefix("fields")
363+
}
364+
if measurement == "" {
365+
return 0, gopi.ErrBadParameter.WithPrefix("measurement")
366+
}
367+
368+
// Process options
369+
str := strings.TrimSpace(measurement)
370+
ts := ""
371+
other := make([]iface.Opt, 0, len(opts))
372+
for _, opt := range opts {
373+
switch opt.Type {
374+
case iface.MOSQ_OPTION_TAG:
375+
str += "," + opt.String
376+
case iface.MOSQ_OPTION_TIMESTAMP:
377+
ts = " " + fmt.Sprint(opt.Timestamp.UnixNano())
378+
default:
379+
other = append(other, opt)
380+
}
381+
}
382+
383+
// Process fields
384+
delim := " "
385+
for k, v := range fields {
386+
switch v.(type) {
387+
case float32, float64, bool, int, uint, int8, uint8, int16, uint16, int32, uint32, int64, uint64:
388+
str += delim + fmt.Sprintf("%v=%v", strings.TrimSpace(k), v)
389+
case string:
390+
str += delim + fmt.Sprintf("%v=%v", strings.TrimSpace(k), strconv.Quote(v.(string)))
391+
default:
392+
return 0, gopi.ErrBadParameter.WithPrefix(k)
393+
}
394+
delim = ","
395+
}
396+
397+
return this.Publish(topic, []byte(str+ts), other...)
398+
}
399+
342400
////////////////////////////////////////////////////////////////////////////////
343401
// WAIT FOR
344402

unit/mosquitto/mosquitto_test.go

+66
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,69 @@ func Main_Test_Mosquitto_002(app gopi.App, t *testing.T) {
8484
}
8585
}
8686
}
87+
88+
func Test_Mosquitto_003(t *testing.T) {
89+
args := []string{"-mqtt.broker", TEST_SERVER}
90+
if app, err := app.NewTestTool(t, Main_Test_Mosquitto_003, args, "mosquitto"); err != nil {
91+
t.Error(err)
92+
} else {
93+
app.Run()
94+
}
95+
}
96+
97+
func Main_Test_Mosquitto_003(app gopi.App, t *testing.T) {
98+
mosquitto := app.UnitInstance("mosquitto").(mosq.Client)
99+
bus := app.Bus()
100+
bus.DefaultHandler(gopi.EVENT_NS_DEFAULT, func(_ context.Context, _ gopi.App, evt gopi.Event) {
101+
t.Log(evt)
102+
})
103+
if err := mosquitto.Connect(); err != nil {
104+
t.Error(err)
105+
} else {
106+
time.Sleep(2 * time.Second)
107+
for i := 0; i < 10; i++ {
108+
if _, err := mosquitto.PublishJSON("test", 100.8); err != nil {
109+
t.Error(err)
110+
}
111+
time.Sleep(100 * time.Millisecond)
112+
}
113+
if err := mosquitto.Disconnect(); err != nil {
114+
t.Error(err)
115+
}
116+
}
117+
}
118+
119+
func Test_Mosquitto_004(t *testing.T) {
120+
args := []string{"-mqtt.broker", TEST_SERVER}
121+
if app, err := app.NewTestTool(t, Main_Test_Mosquitto_004, args, "mosquitto"); err != nil {
122+
t.Error(err)
123+
} else {
124+
app.Run()
125+
}
126+
}
127+
128+
func Main_Test_Mosquitto_004(app gopi.App, t *testing.T) {
129+
mosquitto := app.UnitInstance("mosquitto").(mosq.Client)
130+
bus := app.Bus()
131+
bus.DefaultHandler(gopi.EVENT_NS_DEFAULT, func(_ context.Context, _ gopi.App, evt gopi.Event) {
132+
t.Log(evt)
133+
})
134+
if err := mosquitto.Connect(); err != nil {
135+
t.Error(err)
136+
} else {
137+
time.Sleep(2 * time.Second)
138+
for i := 0; i < 10; i++ {
139+
fields := map[string]interface{}{
140+
"v1": i,
141+
"v2": float64(i) / 2,
142+
}
143+
if _, err := mosquitto.PublishInflux("test", "test", fields, mosq.OptTag("host", "rpi4.lan"), mosq.OptTimestamp(time.Now())); err != nil {
144+
t.Error(err)
145+
}
146+
time.Sleep(100 * time.Millisecond)
147+
}
148+
if err := mosquitto.Disconnect(); err != nil {
149+
t.Error(err)
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)