package kafka

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
    "os"
    "strconv"
    "strings"

    "github.com/Shopify/sarama"
    "github.com/project-flogo/core/support/log"
)

// todo: core should add support for shared connections and replace this.
// Note: this package-level cache is not protected by a lock, so concurrent
// calls to getKafkaConnection are not safe without external synchronization.
var connections = make(map[string]*KafkaConnection)
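
// KafkaConnection holds the sarama configuration, broker list, and the
// SyncProducer built from them, so the producer can be cached and reused.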
type KafkaConnection struct {
    kafkaConfig  *sarama.Config
    brokers      []string
    syncProducer sarama.SyncProducer
}
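
// Connection returns the underlying sarama SyncProducer.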
func (c *KafkaConnection) Connection() sarama.SyncProducer {
    return c.syncProducer
}
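
// Stop closes the underlying SyncProducer, shutting the connection down.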
func (c *KafkaConnection) Stop() error {
    return c.syncProducer.Close()
}
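
// getConnectionKey derives a cache key from the settings that determine
// connection identity: broker URLs, truststore, and SASL user.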
func getConnectionKey(settings *Settings) string {
    connKey := settings.BrokerUrls
    if settings.TrustStore != "" {
        connKey += settings.TrustStore
    }
    if settings.User != "" {
        connKey += settings.User
    }
    return connKey
}
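
// getKafkaConnection returns the cached connection for the given settings if
// one exists; otherwise it builds, verifies, and caches a new one.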
func getKafkaConnection(logger log.Logger, settings *Settings) (*KafkaConnection, error) {

    connKey := getConnectionKey(settings)

    if conn, ok := connections[connKey]; ok {
        logger.Debugf("Reusing cached Kafka connection [%s]", connKey)
        return conn, nil
    }

    newConn := &KafkaConnection{}
    newConn.kafkaConfig = sarama.NewConfig()
    newConn.kafkaConfig.Producer.Return.Errors = true
    newConn.kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll
    newConn.kafkaConfig.Producer.Retry.Max = 5
    newConn.kafkaConfig.Producer.Return.Successes = true

    // strings.Split never returns an empty slice, so reject an empty
    // BrokerUrls setting up front; at least one broker is required
    if settings.BrokerUrls == "" {
        return nil, fmt.Errorf("BrokerUrls [%s] is invalid, at least one broker is required", settings.BrokerUrls)
    }
    brokerUrls := strings.Split(settings.BrokerUrls, ",")

    brokers := make([]string, len(brokerUrls))
    for brokerNo, broker := range brokerUrls {
        err := validateBrokerUrl(broker)
        if err != nil {
            return nil, fmt.Errorf("BrokerUrl [%s] format invalid for reason: [%v]", broker, err)
        }
        brokers[brokerNo] = broker
    }
    newConn.brokers = brokers
    logger.Debugf("Kafka brokers: [%v]", brokers)

    // client keystore
    /*
        It is worth mentioning here that when the keystore for Kafka is created it must support RSA keys via
        the -keyalg RSA option. If it does not, there will be ZERO overlap in supported cipher suites with Java.
        See https://issues.apache.org/jira/browse/KAFKA-3647 for more info.
    */
    if settings.TrustStore != "" {
        if trustPool, err := getCerts(logger, settings.TrustStore); err == nil {
            config := tls.Config{
                RootCAs: trustPool,
                // note: when InsecureSkipVerify is set, crypto/tls skips
                // verification of the server certificate chain and host name
                InsecureSkipVerify: true,
            }
            newConn.kafkaConfig.Net.TLS.Enable = true
            newConn.kafkaConfig.Net.TLS.Config = &config
            logger.Debugf("Kafka initialized truststore from [%v]", settings.TrustStore)
        } else {
            return nil, err
        }
    }

    // SASL
    if settings.User != "" {
        if len(settings.Password) == 0 {
            return nil, fmt.Errorf("password not provided for user: %s", settings.User)
        }
        newConn.kafkaConfig.Net.SASL.Enable = true
        newConn.kafkaConfig.Net.SASL.User = settings.User
        newConn.kafkaConfig.Net.SASL.Password = settings.Password
        logger.Debugf("Kafka SASL params initialized; user [%v]", settings.User)
    }

    syncProducer, err := sarama.NewSyncProducer(newConn.brokers, newConn.kafkaConfig)
    if err != nil {
        return nil, fmt.Errorf("failed to create a Kafka SyncProducer, check any TLS or SASL parameters carefully, reason given: [%s]", err)
    }
    newConn.syncProducer = syncProducer

    connections[connKey] = newConn
    logger.Debugf("Caching Kafka connection [%s]", connKey)

    return newConn, nil
}

// validateBrokerUrl ensures that the string meets the host:port definition of
// a Kafka host spec. Kafka calls it a URL, but it is really just host:port,
// which for numeric IP addresses is, technically speaking, not a valid URI.
func validateBrokerUrl(broker string) error {
    hostPort := strings.Split(broker, ":")
    if len(hostPort) != 2 {
        return fmt.Errorf("BrokerUrl must be composed of sections like \"host:port\"")
    }
    i, err := strconv.Atoi(hostPort[1])
    if err != nil || i < 0 || i > 65535 {
        return fmt.Errorf("port specification [%s] must be a number between 0 and 65535", hostPort[1])
    }
    return nil
}
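
// getCerts loads every PEM certificate found in the trustStore directory into
// a new x509.CertPool.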
func getCerts(logger log.Logger, trustStore string) (*x509.CertPool, error) {
    certPool := x509.NewCertPool()
    fileInfo, err := os.Stat(trustStore)
    if err != nil {
        return certPool, fmt.Errorf("truststore [%s] does not exist", trustStore)
    }
    if !fileInfo.Mode().IsDir() {
        return certPool, fmt.Errorf("truststore [%s] is not a directory, it must be a directory containing trusted certificates in PEM format", trustStore)
    }
    trustedCertFiles, err := ioutil.ReadDir(trustStore)
    if err != nil || len(trustedCertFiles) == 0 {
        return certPool, fmt.Errorf("failed to read trusted certificates from [%s], it must be a directory containing trusted certificates in PEM format", trustStore)
    }
    for _, trustCertFile := range trustedCertFiles {
        fqfName := fmt.Sprintf("%s%c%s", trustStore, os.PathSeparator, trustCertFile.Name())
        trustCertBytes, err := ioutil.ReadFile(fqfName)
        if err != nil {
            logger.Warnf("Failed to read trusted certificate [%s] ... continuing", trustCertFile.Name())
        } else {
            certPool.AppendCertsFromPEM(trustCertBytes)
        }
    }
    if len(certPool.Subjects()) < 1 {
        return certPool, fmt.Errorf("failed to read trusted certificates from [%s], after processing all files in the directory no valid trusted certs were found", trustStore)
    }
    return certPool, nil
}
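
// Usage (sketch): how a caller might obtain a connection and publish a message
// with it. This is illustrative only; the "orders" topic and the settings
// values are hypothetical, and error handling is elided.
//
//	settings := &Settings{BrokerUrls: "localhost:9092"}
//	conn, err := getKafkaConnection(logger, settings)
//	if err != nil {
//	    // handle error
//	}
//	msg := &sarama.ProducerMessage{
//	    Topic: "orders", // hypothetical topic
//	    Value: sarama.StringEncoder("hello kafka"),
//	}
//	partition, offset, err := conn.Connection().SendMessage(msg)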