-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathIF2Client.java
148 lines (129 loc) · 4.12 KB
/
IF2Client.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package intercor.if2.client;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * Abstract base for IF2 AMQP clients. It stores the connection properties
 * (seeded with defaults and optionally overridden by the caller) and manages a
 * single broker {@link Connection} and {@link Channel} that subclasses can use
 * through the protected fields.
 *
 * @author Igor Passchier
 * @copyright (c) Tass International BV
 */
public abstract class IF2Client {
    /**
     * Property key for the user name used to connect to the AMQP broker.
     * Defaults to prosumer
     */
    private static final String USER = "USER";
    /**
     * Property key for the password used to connect to the AMQP broker.
     * Defaults to prosumerpw
     */
    private static final String PASSWORD = "PASSWORD";
    /**
     * Property key for the virtual host to connect to. Defaults to test
     */
    public static final String VIRTUALHOST = "VIRTUALHOST";
    /**
     * Property key for the hostname or IP address of the broker. Defaults to
     * localhost
     */
    public static final String HOST = "HOST";
    /**
     * Property key for the TCP port of the broker. Defaults to 5671 when SSL is
     * used, and 5672 when not. Note that the value must be a {@link Number}
     * (typically an Integer).
     */
    public static final String PORT = "PORT";
    /**
     * Property key for the exchange to connect to. This is only used by the
     * consumer, the publisher uses the message type for publication. Defaults
     * to SPAT
     */
    public static final String EXCHANGE = "EXCHANGE";
    /**
     * Property key selecting SSL for the connection. Note that although SSL is
     * used, certificates are not validated. Defaults to true. The value must be
     * a Boolean.
     */
    public static final String USESSL = "USESSL";

    /** Open broker connection, or {@code null} when not connected. */
    protected Connection connection;
    /** Channel created on {@link #connection}, or {@code null} when not connected. */
    protected Channel channel;
    /** Effective connection properties, keyed by the constants of this class. */
    protected Map<String, Object> props = new HashMap<>();

    /**
     * Creates a client configured with the default properties: localhost:5671,
     * user prosumer, virtual host test, exchange SPAT, SSL enabled.
     */
    public IF2Client() {
        props.put(HOST, "localhost");
        props.put(PORT, 5671);
        props.put(USER, "prosumer");
        props.put(PASSWORD, "prosumerpw");
        props.put(VIRTUALHOST, "test");
        props.put(EXCHANGE, "SPAT");
        props.put(USESSL, true);
    }

    /**
     * Creates a client with the default properties overridden by the given ones.
     *
     * @param properties overrides, keyed by the constants of this class; when
     *                   {@link #USESSL} is explicitly {@code false} the default
     *                   port becomes 5672, unless {@link #PORT} is also supplied
     */
    public IF2Client(Map<String, Object> properties) {
        this();
        /* Change the default port, based on whether SSL is requested */
        if (Boolean.FALSE.equals(properties.get(USESSL))) {
            props.put(PORT, 5672);
        }
        // Applied last so that an explicitly supplied PORT always wins.
        this.props.putAll(properties);
    }

    /**
     * Builds a human-readable AMQP URI for log messages from the factory
     * settings. The password is deliberately left out so that credentials never
     * end up in log output.
     *
     * @param factory the configured connection factory
     * @return a URI of the form {@code amqp[s]://user@host:port/vhost}; when the
     *         virtual host is {@code "/"} the URI ends right after the slash that
     *         follows the port
     */
    private String reconstructUri(ConnectionFactory factory) {
        StringBuilder uri = new StringBuilder();
        uri.append("amqp").append(factory.isSSL() ? "s" : "").append("://");
        uri.append(factory.getUsername()).append('@');
        uri.append(factory.getHost()).append(':').append(factory.getPort()).append('/');
        String virtualHost = factory.getVirtualHost();
        if (!"/".equals(virtualHost)) {
            // The path separator has already been written; adding another one
            // would yield "host:port//vhost".
            uri.append(virtualHost);
        }
        return uri.toString();
    }

    /**
     * Connect to the broker, and generate a channel. On failure any partially
     * opened connection or channel is torn down via {@link #disconnect()}
     * before the exception is rethrown.
     *
     * @throws Exception if the SSL context cannot be set up (for example
     *                   {@link NoSuchAlgorithmException} or
     *                   {@link KeyManagementException}), or if opening the
     *                   connection or the channel fails
     */
    public void connect() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(props.get(HOST).toString());
        factory.setUsername(props.get(USER).toString());
        factory.setPassword(props.get(PASSWORD).toString());
        String virtualHost = props.get(VIRTUALHOST).toString();
        // An empty virtual host keeps the factory's own default.
        if (!virtualHost.isEmpty()) {
            factory.setVirtualHost(virtualHost);
        }
        factory.setPort(((Number) props.get(PORT)).intValue());

        String uri = "not defined yet";
        try {
            if ((Boolean) props.get(USESSL)) {
                factory.useSslProtocol();
            }
            // Built after the SSL setup so that the scheme reflects it.
            uri = reconstructUri(factory);
            final String url = uri;

            connection = factory.newConnection();
            connection.addShutdownListener((ShutdownSignalException cause) -> {
                System.out.println("Connection closed to " + url);
            });
            System.out.println("Connection opened to " + url);

            channel = connection.createChannel();
            channel.addShutdownListener((ShutdownSignalException cause) -> {
                System.out.println("Channel closed");
            });
            System.out.println("Channel opened");
        } catch (Exception e) {
            System.out.println("Exception in connect to " + uri);
            disconnect();
            throw e;
        }
    }

    /**
     * Close the channel, and disconnect. Safe to call when not connected. The
     * connection is aborted even when aborting the channel fails, and both
     * fields are reset to {@code null} afterwards in every case.
     */
    public void disconnect() {
        try {
            try {
                if (channel != null) {
                    channel.abort();
                }
            } finally {
                // Must run even if the channel abort threw; otherwise the
                // connection would stay open while its reference is dropped.
                if (connection != null) {
                    connection.abort();
                }
            }
            System.out.println("Disconnected");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            connection = null;
            channel = null;
        }
    }
}