Skip to content

Commit c7d0423

Browse files
authored
Merge pull request #18 from iot-dsa-v2/develop
1.1.4
2 parents e1761ea + f2ee0a6 commit c7d0423

File tree

10 files changed

+131
-78
lines changed

10 files changed

+131
-78
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ dependencies {
2626
//implementation 'org.iot-dsa:dslink-v2-websocket:+' //for a locally installed sdk
2727
implementation 'com.github.iot-dsa-v2.sdk-dslink-java-v2:dslink-v2-websocket:+'
2828
implementation 'commons-logging:commons-logging:+'
29-
implementation 'org.apache.commons:commons-lang3:3.6'
29+
implementation 'org.apache.commons:commons-lang3:3.8.1'
3030
implementation 'commons-io:commons-io:2.6'
3131
implementation 'com.squareup.okhttp3:okhttp:3.14.0'
3232
//implementation 'org.apache.cxf:cxf-rt-rs-security-oauth2:3.1.7'

dslink.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "dslink-java-v2-restadapter",
3-
"version": "1.1.2",
3+
"version": "1.1.4",
44
"description": "Java DSA to REST adpater DSLink",
55
"main": "bin/dslink-java-v2-restadapter",
66
"configs": {

src/main/java/org/iot/dsa/dslink/restadapter/AbstractRuleNode.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,28 @@
11
package org.iot.dsa.dslink.restadapter;
22

33
import org.apache.commons.lang3.RandomStringUtils;
4+
import org.iot.dsa.dslink.DSIRequester;
45
import org.iot.dsa.node.DSBool;
56
import org.iot.dsa.node.DSIObject;
67
import org.iot.dsa.node.DSIValue;
78
import org.iot.dsa.node.DSInfo;
89
import org.iot.dsa.node.DSLong;
910
import org.iot.dsa.node.DSNode;
10-
import okhttp3.Response;
1111

1212
public abstract class AbstractRuleNode extends DSNode {
1313
private DSInfo bufferEnabled = getInfo(Constants.USE_BUFFER);
1414
private DSInfo maxBatchSize = getInfo(Constants.MAX_BATCH_SIZE);
1515
private String id;
16+
17+
public DSIRequester getRequester() {
18+
return MainNode.getRequester();
19+
}
1620

1721
public WebClientProxy getWebClientProxy() {
1822
return ((ConnectionNode) getParent()).getWebClientProxy();
1923
}
2024

21-
public abstract void responseRecieved(Response resp, int rowNum);
25+
public abstract void responseRecieved(ResponseWrapper resp, int rowNum);
2226

2327
@Override
2428
protected void declareDefaults() {

src/main/java/org/iot/dsa/dslink/restadapter/MainNode.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ protected void declareDefaults() {
8080
@Override
8181
protected void onStarted() {
8282
instance = this;
83-
getLink().getUpstream().subscribe(new DSEventFilter(
83+
getLink().getConnection().subscribe(new DSEventFilter(
8484
((event, node, child, data) -> MainNode.setRequester(
85-
getLink().getUpstream().getRequester())),
85+
getLink().getConnection().getRequester())),
8686
DSLinkConnection.CONNECTED_EVENT,
8787
null));
8888
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.iot.dsa.dslink.restadapter;
2+
3+
import java.io.IOException;
4+
import org.iot.dsa.logging.DSLogger;
5+
import org.iot.dsa.time.DSDateTime;
6+
import okhttp3.Response;
7+
8+
public class OkHttpResponseWrapper extends DSLogger implements ResponseWrapper {
9+
10+
private Response response;
11+
private String body = null;
12+
13+
public OkHttpResponseWrapper(Response response) {
14+
this.response = response;
15+
}
16+
17+
public Response getResponse() {
18+
return response;
19+
}
20+
21+
@Override
22+
public int getCode() {
23+
return response.code();
24+
}
25+
26+
@Override
27+
public String getData() {
28+
if (body == null) {
29+
try {
30+
body = response.body().string();
31+
} catch (IOException e) {
32+
warn("", e);
33+
} finally {
34+
response.close();
35+
}
36+
}
37+
return body;
38+
}
39+
40+
@Override
41+
public DSDateTime getTS() {
42+
return DSDateTime.valueOf(response.receivedResponseAtMillis());
43+
}
44+
45+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.iot.dsa.dslink.restadapter;
2+
3+
import org.iot.dsa.time.DSDateTime;
4+
5+
public interface ResponseWrapper {
6+
7+
public int getCode();
8+
9+
public String getData();
10+
11+
public DSDateTime getTS();
12+
13+
}

src/main/java/org/iot/dsa/dslink/restadapter/RuleNode.java

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.iot.dsa.dslink.restadapter;
22

3-
import java.io.IOException;
43
import org.iot.dsa.node.DSDouble;
54
import org.iot.dsa.node.DSIObject;
65
import org.iot.dsa.node.DSInfo;
@@ -12,7 +11,6 @@
1211
import org.iot.dsa.node.action.ActionResult;
1312
import org.iot.dsa.node.action.DSAction;
1413
import org.iot.dsa.time.DSDateTime;
15-
import okhttp3.Response;
1614

1715
public class RuleNode extends AbstractRuleNode {
1816

@@ -134,25 +132,17 @@ public double getMaxRefreshRate() {
134132
}
135133

136134
@Override
137-
public void responseRecieved(Response resp, int rowNum) {
135+
public void responseRecieved(ResponseWrapper resp, int rowNum) {
138136
if (resp == null) {
139137
put(lastRespCode, DSInt.valueOf(-1));
140138
put(lastRespData, DSString.valueOf("Failed to send update"));
141139
put(lastRespTs, DSString.valueOf(DSDateTime.currentTime()));
142140
} else {
143-
int status = resp.code();
144-
String data = null;
145-
try {
146-
data = resp.body().string();
147-
} catch (IOException e) {
148-
warn("", e);
149-
} finally {
150-
resp.close();
151-
}
152-
141+
int status = resp.getCode();
142+
String data = resp.getData();
153143
put(lastRespCode, DSInt.valueOf(status));
154144
put(lastRespData, DSString.valueOf(data));
155-
put(lastRespTs, DSString.valueOf(DSDateTime.valueOf(resp.receivedResponseAtMillis())));
145+
put(lastRespTs, DSString.valueOf(resp.getTS()));
156146
}
157147
}
158148

src/main/java/org/iot/dsa/dslink/restadapter/RuleTableNode.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.iot.dsa.dslink.restadapter;
22

3-
import java.io.IOException;
43
import java.util.ArrayList;
54
import java.util.List;
65
import org.iot.dsa.node.DSElement;
@@ -12,7 +11,6 @@
1211
import org.iot.dsa.node.action.ActionResult;
1312
import org.iot.dsa.node.action.DSAction;
1413
import org.iot.dsa.time.DSDateTime;
15-
import okhttp3.Response;
1614

1715
public class RuleTableNode extends AbstractRuleNode {
1816

@@ -28,7 +26,7 @@ public RuleTableNode(DSList table) {
2826
}
2927

3028
@Override
31-
public void responseRecieved(Response resp, int rowNum) {
29+
public void responseRecieved(ResponseWrapper resp, int rowNum) {
3230
DSList respTable = lastResponses.getElement().toList();
3331
DSMap respMap = respTable.getMap(rowNum);
3432

@@ -37,19 +35,12 @@ public void responseRecieved(Response resp, int rowNum) {
3735
respMap.put(Constants.LAST_RESPONSE_DATA, "Failed to send update");
3836
respMap.put(Constants.LAST_RESPONSE_TS, DSDateTime.currentTime().toString());
3937
} else {
40-
int status = resp.code();
41-
String data = null;
42-
try {
43-
data = resp.body().string();
44-
} catch (IOException e) {
45-
warn("", e);
46-
} finally {
47-
resp.close();
48-
}
38+
int status = resp.getCode();
39+
String data = resp.getData();
4940

5041
respMap.put(Constants.LAST_RESPONSE_CODE, status);
5142
respMap.put(Constants.LAST_RESPONSE_DATA, data);
52-
respMap.put(Constants.LAST_RESPONSE_TS, DSDateTime.valueOf(resp.receivedResponseAtMillis()).toString());
43+
respMap.put(Constants.LAST_RESPONSE_TS, resp.getTS().toString());
5344
}
5445
fire(VALUE_CHANGED_EVENT, lastResponses, null);
5546
}

src/main/java/org/iot/dsa/dslink/restadapter/SubscriptionRule.java

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,22 @@
77
import org.iot.dsa.DSRuntime;
88
import org.iot.dsa.DSRuntime.Timer;
99
import org.iot.dsa.dslink.DSIRequester;
10+
import org.iot.dsa.dslink.requester.AbstractSubscribeHandler;
1011
import org.iot.dsa.dslink.requester.ErrorType;
1112
import org.iot.dsa.dslink.requester.OutboundStream;
12-
import org.iot.dsa.dslink.requester.OutboundSubscribeHandler;
13-
import org.iot.dsa.logging.DSLogger;
1413
import org.iot.dsa.node.DSElement;
1514
import org.iot.dsa.node.DSIValue;
15+
import org.iot.dsa.node.DSLong;
1616
import org.iot.dsa.node.DSMap;
1717
import org.iot.dsa.node.DSMap.Entry;
1818
import org.iot.dsa.node.DSStatus;
1919
import org.iot.dsa.time.DSDateTime;
20-
import org.iot.dsa.util.DSException;
2120
import okhttp3.Response;
2221

23-
public class SubscriptionRule extends DSLogger implements OutboundSubscribeHandler, UpdateSender {
22+
public class SubscriptionRule extends AbstractSubscribeHandler implements UpdateSender {
2423

2524
private AbstractRuleNode node;
26-
private OutboundStream stream;
25+
//private OutboundStream stream;
2726
private long lastUpdateTime = -1;
2827
private Timer future = null;
2928
private SubUpdate storedUpdate;
@@ -41,7 +40,7 @@ public class SubscriptionRule extends DSLogger implements OutboundSubscribeHandl
4140
private long minRefreshRate;
4241
private long maxRefreshRate;
4342

44-
private int rowNum;
43+
protected int rowNum;
4544

4645
public SubscriptionRule(AbstractRuleNode node, String subPath, String restUrl, String method, DSMap urlParameters, String body, double minRefreshRate, double maxRefreshRate, int rowNum) {
4746
this.node = node;
@@ -64,9 +63,9 @@ public void run() {
6463
}
6564

6665
private void init() {
67-
DSIRequester requester = MainNode.getRequester();
66+
DSIRequester requester = node.getRequester();
6867
int qos = 0;
69-
requester.subscribe(this.subPath, qos, this);
68+
requester.subscribe(this.subPath, DSLong.valueOf(qos), this);
7069
}
7170

7271
private void learnPattern() {
@@ -103,25 +102,28 @@ private void learnPattern() {
103102

104103
@Override
105104
public void onClose() {
106-
info("Rule with sub path " + subPath + ": onClose called");
107-
close();
105+
super.onClose();
106+
node.info("Rule with sub path " + subPath + ": onClose called");
107+
// close();
108108
}
109109

110110
@Override
111111
public void onError(ErrorType type, String msg) {
112-
info("Rule with sub path " + subPath + ": onError called with msg " + msg);
113-
DSException.throwRuntime(new RuntimeException(msg));
112+
super.onError(type, msg);
113+
node.info("Rule with sub path " + subPath + ": onError called with msg " + msg);
114+
// DSException.throwRuntime(new RuntimeException(msg));
114115
}
115116

116117
@Override
117118
public void onInit(String path, DSIValue qos, OutboundStream stream) {
118-
info("Rule with sub path " + subPath + ": onInit called");
119-
this.stream = stream;
119+
super.onInit(path, qos, stream);
120+
node.info("Rule with sub path " + subPath + ": onInit called");
121+
//this.stream = stream;
120122
}
121123

122124
@Override
123125
public void onUpdate(DSDateTime dateTime, DSElement value, DSStatus status) {
124-
info("Rule with sub path " + subPath + ": onUpdate called with value " + (value!=null ? value : "Null"));
126+
node.info("Rule with sub path " + subPath + ": onUpdate called with value " + (value!=null ? value : "Null"));
125127
storedUpdate = new SubUpdate(dateTime.toString(), value.toString(), status.toString(), dateTime.timeInMillis());
126128
if (lastUpdateTime < 0 || System.currentTimeMillis() - lastUpdateTime >= minRefreshRate) {
127129
if (future != null) {
@@ -164,7 +166,7 @@ public void run() {
164166
}
165167
}
166168

167-
private boolean sendUpdate(final SubUpdate update) {
169+
public boolean sendUpdate(final SubUpdate update) {
168170

169171
DSMap urlParams = urlParameters.copy();
170172
String body = this.body;
@@ -188,12 +190,13 @@ private boolean sendUpdate(final SubUpdate update) {
188190
body = body.replaceAll(Constants.PLACEHOLDER_BLOCK_END, "");
189191
}
190192

191-
info("Rule with sub path " + subPath + ": sending Update with value " + (update.value!=null ? update.value : "Null"));
193+
node.info("Rule with sub path " + subPath + ": sending Update with value " + (update.value!=null ? update.value : "Null"));
192194

193-
Response resp = restInvoke(urlParams, body);
194-
return resp != null && resp.code() == 200;
195+
ResponseWrapper resp = doSend(urlParams, body);
196+
return resp != null && resp.getCode() / 100 == 2;
195197
}
196198

199+
@Override
197200
public Queue<SubUpdate> sendBatchUpdate(Queue<SubUpdate> updates) {
198201
if (!batchable) {
199202
Queue<SubUpdate> failed = new LinkedList<SubUpdate>();
@@ -227,32 +230,33 @@ public Queue<SubUpdate> sendBatchUpdate(Queue<SubUpdate> updates) {
227230
}
228231
sb.append(suffix);
229232
String body = sb.toString();
230-
info("Rule with sub path " + subPath + ": sending batch update");
233+
node.info("Rule with sub path " + subPath + ": sending batch update");
231234

232-
Response resp = restInvoke(urlParams, body);
233-
if (resp != null && resp.code() == 200) {
235+
ResponseWrapper resp = doSend(urlParams, body);
236+
if (resp != null && resp.getCode() / 100 == 2) {
234237
return null;
235238
} else {
236239
return updatesCopy;
237240
}
238241

239242
}
240243

241-
private Response restInvoke(DSMap urlParams, String body) {
244+
protected ResponseWrapper doSend(DSMap urlParams, String body) {
242245
Response resp = null;
243246
try {
244247
resp = getWebClientProxy().invoke(method, restUrl, urlParams, body);
245248
} catch (Exception e) {
246-
warn("", e);
249+
node.warn("", e);
247250
}
248-
node.responseRecieved(resp, rowNum);
249-
return resp;
251+
ResponseWrapper respWrap = new OkHttpResponseWrapper(resp);
252+
node.responseRecieved(respWrap, rowNum);
253+
return respWrap;
250254
}
251255

252256
public void close() {
253-
if (stream != null && stream.isStreamOpen()) {
254-
info("Rule with sub path " + subPath + ": closing Stream");
255-
stream.closeStream();
257+
if (!isClosed() && getStream() != null) {
258+
node.info("Rule with sub path " + subPath + ": closing Stream");
259+
getStream().closeStream();
256260
}
257261
}
258262

@@ -263,5 +267,9 @@ public int getMaxBatchSize() {
263267
public WebClientProxy getWebClientProxy() {
264268
return node.getWebClientProxy();
265269
}
270+
271+
public AbstractRuleNode getNode() {
272+
return node;
273+
}
266274

267275
}

0 commit comments

Comments
 (0)