Skip to content

Commit ac98bb2

Browse files
committed
Updated code
1 parent d80b5f4 commit ac98bb2

File tree

6 files changed

+77
-61
lines changed

6 files changed

+77
-61
lines changed

cmd/mqttpub/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func Main(app gopi.App, args []string) error {
2323
return gopi.ErrBadParameter.WithPrefix("topic")
2424
} else if len(args) == 0 {
2525
return gopi.ErrHelp
26-
} else if err := client.Connect(mosquitto.MOSQ_FLAG_EVENT_ALL); err != nil {
26+
} else if err := client.Connect(); err != nil {
2727
return err
2828
} else {
2929
// Wait for connect

cmd/mqttsub/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func Main(app gopi.App, args []string) error {
2828

2929
// Connect client
3030
client := app.UnitInstance("mosquitto").(mosquitto.Client)
31-
if err := client.Connect(mosquitto.MOSQ_FLAG_EVENT_ALL); err != nil {
31+
if err := client.Connect(); err != nil {
3232
return err
3333
}
3434

mosquitto.go

+21-11
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ type (
2121

2222
// Client implements an MQTT client
2323
type Client interface {
24-
// Connect to MQTT broker with event flags
25-
Connect(Flags) error
24+
// Connect to MQTT broker with options
25+
Connect(...Opt) error
2626

2727
// Disconnect from MQTT broker
2828
Disconnect() error
@@ -36,7 +36,8 @@ type Client interface {
3636
// Publish []byte data to topic and return request-id
3737
Publish(string, []byte, ...Opt) (int, error)
3838

39-
// Wait for a specific request-id
39+
// Wait for a specific request-id or 0 for a connect or disconnect event
40+
// with context (for timeout)
4041
WaitFor(context.Context, int) (Event, error)
4142

4243
// Implements gopi.Unit
@@ -59,16 +60,19 @@ type Event interface {
5960

6061
// Function options
6162
type Opt struct {
62-
Type Option
63-
Int int
64-
Bool bool
63+
Type Option
64+
Int int
65+
Bool bool
66+
Flags Flags
6567
}
6668

6769
////////////////////////////////////////////////////////////////////////////////
6870
// MQTT Options
6971

70-
func OptQOS(value int) Opt { return Opt{Type: MOSQ_OPTION_QOS, Int: value} }
71-
func OptRetain(value bool) Opt { return Opt{Type: MOSQ_OPTION_RETAIN, Bool: value} }
72+
func OptQOS(value int) Opt { return Opt{Type: MOSQ_OPTION_QOS, Int: value} }
73+
func OptRetain(value bool) Opt { return Opt{Type: MOSQ_OPTION_RETAIN, Bool: value} }
74+
func OptFlags(value Flags) Opt { return Opt{Type: MOSQ_OPTION_FLAGS, Flags: value} }
75+
func OptKeepaliveSecs(value int) Opt { return Opt{Type: MOSQ_OPTION_KEEPALIVE, Int: value} }
7276

7377
////////////////////////////////////////////////////////////////////////////////
7478
// CONSTANTS
@@ -88,9 +92,11 @@ const (
8892
)
8993

9094
const (
91-
MOSQ_OPTION_NONE Option = iota
92-
MOSQ_OPTION_QOS // IntValue
93-
MOSQ_OPTION_RETAIN // BoolValue
95+
MOSQ_OPTION_NONE Option = iota
96+
MOSQ_OPTION_QOS // IntValue
97+
MOSQ_OPTION_RETAIN // BoolValue
98+
MOSQ_OPTION_FLAGS // FlagsValue
99+
MOSQ_OPTION_KEEPALIVE // IntValue
94100
)
95101

96102
////////////////////////////////////////////////////////////////////////////////
@@ -140,6 +146,10 @@ func (o Option) String() string {
140146
return "MOSQ_OPTION_QOS"
141147
case MOSQ_OPTION_RETAIN:
142148
return "MOSQ_OPTION_RETAIN"
149+
case MOSQ_OPTION_FLAGS:
150+
return "MOSQ_OPTION_FLAGS"
151+
case MOSQ_OPTION_KEEPALIVE:
152+
return "MOSQ_OPTION_KEEPALIVE"
143153
default:
144154
return "[?? Invalid Option value]"
145155
}

unit/mosquitto/init.go

+5-11
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package mosquitto
22

33
import (
4-
"time"
5-
64
// Frameworks
75
gopi "github.com/djthorpe/gopi/v2"
86
)
@@ -14,19 +12,15 @@ func init() {
1412
Config: func(app gopi.App) error {
1513
app.Flags().FlagString("mqtt.user", "", "Username")
1614
app.Flags().FlagString("mqtt.password", "", "Password")
17-
app.Flags().FlagString("mqtt.host", "", "MQTT Broke hostname")
18-
app.Flags().FlagUint("mqtt.port", 0, "MQTT Broker port")
19-
app.Flags().FlagDuration("mqtt.keepalive", 60*time.Second, "MQTT Broker keepalive")
15+
app.Flags().FlagString("mqtt.broker", "", "Broker host:port")
2016
return nil
2117
},
2218
New: func(app gopi.App) (gopi.Unit, error) {
2319
return gopi.New(Mosquitto{
24-
User: app.Flags().GetString("mqtt.user", gopi.FLAG_NS_DEFAULT),
25-
Password: app.Flags().GetString("mqtt.password", gopi.FLAG_NS_DEFAULT),
26-
Host: app.Flags().GetString("mqtt.host", gopi.FLAG_NS_DEFAULT),
27-
Port: app.Flags().GetUint("mqtt.port", gopi.FLAG_NS_DEFAULT),
28-
Keepalive: app.Flags().GetDuration("mqtt.keepalive", gopi.FLAG_NS_DEFAULT),
29-
Bus: app.Bus(),
20+
User: app.Flags().GetString("mqtt.user", gopi.FLAG_NS_DEFAULT),
21+
Password: app.Flags().GetString("mqtt.password", gopi.FLAG_NS_DEFAULT),
22+
Broker: app.Flags().GetString("mqtt.broker", gopi.FLAG_NS_DEFAULT),
23+
Bus: app.Bus(),
3024
}, app.Log().Clone(Mosquitto{}.Name()))
3125
},
3226
})

unit/mosquitto/mosquitto.go

+44-31
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package mosquitto
33
import (
44
"context"
55
"fmt"
6+
"net"
7+
"regexp"
68
"strconv"
79
"sync"
8-
"time"
910

1011
// Frameworks
1112
gopi "github.com/djthorpe/gopi/v2"
@@ -18,20 +19,17 @@ import (
1819
// TYPES
1920

2021
type Mosquitto struct {
21-
ClientId string
22-
User string
23-
Password string
24-
Host string
25-
Port uint
26-
Keepalive time.Duration
27-
Bus gopi.Bus
22+
ClientId string
23+
User string
24+
Password string
25+
Broker string
26+
Bus gopi.Bus
2827
}
2928

3029
type mosquitto struct {
3130
host string
3231
port uint
3332
client *mosq.Client
34-
keepalive time.Duration
3533
connected bool
3634
bus gopi.Bus
3735

@@ -40,6 +38,13 @@ type mosquitto struct {
4038
sync.WaitGroup
4139
}
4240

41+
////////////////////////////////////////////////////////////////////////////////
42+
// GLOBALS
43+
44+
var (
45+
reHostPort = regexp.MustCompile("^([^\\:]+)\\:(\\d+)$")
46+
)
47+
4348
////////////////////////////////////////////////////////////////////////////////
4449
// IMPLEMENTATION gopi.Unit
4550

@@ -60,7 +65,6 @@ func (config Mosquitto) New(log gopi.Logger) (gopi.Unit, error) {
6065
// IMPLEMENTATION mosquitto.Client
6166

6267
func (this *mosquitto) Init(config Mosquitto) error {
63-
6468
// Bus
6569
if config.Bus == nil {
6670
return gopi.ErrBadParameter.WithPrefix("bus")
@@ -71,29 +75,26 @@ func (this *mosquitto) Init(config Mosquitto) error {
7175
// Initialize
7276
if err := mosq.Init(); err != nil {
7377
return err
74-
} else if client, err := mosq.New(config.ClientId, true, uintptr(0)); err != nil {
78+
} else if client, err := mosq.New(config.ClientId, true, 0); err != nil {
7579
return fmt.Errorf("New: %w", err)
7680
} else {
7781
this.client = client
7882
}
7983

80-
// Check host and port
81-
if config.Host == "" {
82-
return gopi.ErrBadParameter.WithPrefix("host")
83-
} else {
84-
this.host = config.Host
85-
}
86-
if config.Port == 0 {
87-
this.port = mosq.MOSQ_DEFAULT_PORT
88-
} else {
89-
this.port = config.Port
84+
// Add the port on the end if not added
85+
if config.Broker == "" {
86+
return gopi.ErrBadParameter.WithPrefix("-mqtt.broker")
87+
} else if reHostPort.MatchString(config.Broker) == false {
88+
config.Broker = fmt.Sprintf("%v:%v", config.Broker, mosq.MOSQ_DEFAULT_PORT)
9089
}
91-
92-
// Set keep alive
93-
if config.Keepalive == 0 {
94-
return gopi.ErrBadParameter.WithPrefix("keepalive")
90+
// Check host and port
91+
if host, port, err := net.SplitHostPort(config.Broker); err != nil {
92+
return gopi.ErrBadParameter.WithPrefix("-mqtt.broker")
93+
} else if port_, err := strconv.ParseUint(port, 10, 32); err != nil {
94+
return gopi.ErrBadParameter.WithPrefix("-mqtt.broker")
9595
} else {
96-
this.keepalive = config.Keepalive
96+
this.host = host
97+
this.port = uint(port_)
9798
}
9899

99100
// Set credentials
@@ -129,10 +130,24 @@ func (this *mosquitto) Close() error {
129130
////////////////////////////////////////////////////////////////////////////////
130131
// CONNECT AND DISCONNECT
131132

132-
func (this *mosquitto) Connect(flags iface.Flags) error {
133+
func (this *mosquitto) Connect(opts ...iface.Opt) error {
133134
this.Mutex.Lock()
134135
defer this.Mutex.Unlock()
135136

137+
// Process options
138+
flags := iface.MOSQ_FLAG_EVENT_ALL
139+
keepalive_secs := int(60)
140+
for _, opt := range opts {
141+
switch opt.Type {
142+
case iface.MOSQ_OPTION_FLAGS:
143+
flags = opt.Flags
144+
case iface.MOSQ_OPTION_KEEPALIVE:
145+
keepalive_secs = opt.Int
146+
default:
147+
return gopi.ErrBadParameter.WithPrefix(fmt.Sprint(opt.Type))
148+
}
149+
}
150+
136151
// Set flags
137152
if flags&iface.MOSQ_FLAG_EVENT_CONNECT == iface.MOSQ_FLAG_EVENT_CONNECT {
138153
this.client.SetConnectCallback(func(userInfo uintptr, rc int) {
@@ -196,9 +211,7 @@ func (this *mosquitto) Connect(flags iface.Flags) error {
196211
}
197212

198213
// Perform connection, start loop
199-
if keepalive_secs := int(this.keepalive.Seconds()); keepalive_secs < 1 {
200-
return gopi.ErrBadParameter.WithPrefix("keepalive")
201-
} else if err := this.client.LoopStart(); err != nil {
214+
if err := this.client.LoopStart(); err != nil {
202215
return err
203216
} else if err := this.client.Connect(this.host, int(this.port), keepalive_secs, false); err != nil {
204217
this.client.LoopStop(true)
@@ -244,7 +257,7 @@ func (this *mosquitto) Version() string {
244257
func (this *mosquitto) String() string {
245258
str := "<mosq.Client"
246259
str += " version=" + strconv.Quote(this.Version())
247-
str += " host=" + fmt.Sprintf("%v:%v", this.host, this.port)
260+
str += " broker=" + fmt.Sprintf("%v:%v", this.host, this.port)
248261
str += " connected=" + fmt.Sprint(this.connected)
249262
return str + ">"
250263
}

unit/mosquitto/mosquitto_test.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,15 @@ import (
1717
)
1818

1919
const (
20-
TEST_SERVER = "test.mosquitto.org"
21-
TEST_PORT_PLAINTEXT = 1883
20+
TEST_SERVER = "test.mosquitto.org:1883"
2221
)
2322

2423
func Test_Mosquitto_000(t *testing.T) {
2524
t.Log("Test_Mosquitto_000")
2625
}
2726

2827
func Test_Mosquitto_001(t *testing.T) {
29-
args := []string{"-mqtt.host", TEST_SERVER, "-mqtt.port", fmt.Sprint(TEST_PORT_PLAINTEXT)}
28+
args := []string{"-mqtt.broker", TEST_SERVER}
3029
if app, err := app.NewTestTool(t, Main_Test_Mosquitto_001, args, "mosquitto"); err != nil {
3130
t.Error(err)
3231
} else {
@@ -40,7 +39,7 @@ func Main_Test_Mosquitto_001(app gopi.App, t *testing.T) {
4039
bus.DefaultHandler(gopi.EVENT_NS_DEFAULT, func(_ context.Context, _ gopi.App, evt gopi.Event) {
4140
t.Log(evt)
4241
})
43-
if err := mosquitto.Connect(mosq.MOSQ_FLAG_EVENT_ALL); err != nil {
42+
if err := mosquitto.Connect(); err != nil {
4443
t.Error(err)
4544
} else if _, err := mosquitto.Subscribe("#"); err != nil {
4645
t.Error(err)
@@ -50,7 +49,7 @@ func Main_Test_Mosquitto_001(app gopi.App, t *testing.T) {
5049
}
5150

5251
func Test_Mosquitto_002(t *testing.T) {
53-
args := []string{"-mqtt.host", TEST_SERVER, "-mqtt.port", fmt.Sprint(TEST_PORT_PLAINTEXT)}
52+
args := []string{"-mqtt.broker", TEST_SERVER}
5453
if app, err := app.NewTestTool(t, Main_Test_Mosquitto_002, args, "mosquitto"); err != nil {
5554
t.Error(err)
5655
} else {
@@ -64,7 +63,7 @@ func Main_Test_Mosquitto_002(app gopi.App, t *testing.T) {
6463
bus.DefaultHandler(gopi.EVENT_NS_DEFAULT, func(_ context.Context, _ gopi.App, evt gopi.Event) {
6564
t.Log(evt)
6665
})
67-
if err := mosquitto.Connect(mosq.MOSQ_FLAG_EVENT_ALL); err != nil {
66+
if err := mosquitto.Connect(); err != nil {
6867
t.Error(err)
6968
} else {
7069
time.Sleep(2 * time.Second)

0 commit comments

Comments
 (0)