Skip to content

Commit 0eb1d74

Browse files
committed
added reactive messaging demo
1 parent 203c3bb commit 0eb1d74

File tree

6 files changed

+217
-33
lines changed

6 files changed

+217
-33
lines changed

README.md

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ src/main
5858
│ │ ├── Trace.java
5959
│ │ ├── TraceTagHolder.java
6060
│ │ └── TraceTag.java
61+
│   ├── reactive [Reactive Messaging]
62+
│   │   └── ReactiveResource.java
6163
│   ├── jpa [拡張機能 JPA/JTA]
6264
│   │   ├── Country.java
6365
│   │   ├── CountryResource.java
@@ -239,18 +241,6 @@ public List<Country> getCountriesWithError(){
239241
| stackTrace | default = false ; Exception発生時にtrace logにstack traceを出力するか否か |
240242

241243

242-
## 変更履歴
243-
244-
|Date | 内容 |
245-
|----------|--------------------------------------|
246-
|2019.12.10| 初版 |
247-
|2019.12.20| Helidon 1.4.1 ベースに更新 |
248-
|2020.01.20| gRPCのデモを追加 |
249-
|2020.03.02| Helidon 1.4.2 ベースに更新 |
250-
|2020.05.08| Helidon 1.4.4 ベースに更新、tracing用アノテーションを追加、testクラス追加 |
251-
|2020.05.13| OpenTracing用のInterceptorの仕様変更(@TraceConfigを廃止) |
252-
|2020.07.06| Helidon 2.0.1 ベースに更新 (Java 11必須) |
253-
254244
---
255245
_Copyright © 2019-2020, Oracle and/or its affiliates. All rights reserved._
256246

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@
126126
<scope>test</scope>
127127
</dependency>
128128

129+
<!-- support Reactive Messaging / Kafka Connector -->
130+
<dependency>
131+
<groupId>io.helidon.microprofile.messaging</groupId>
132+
<artifactId>helidon-microprofile-messaging</artifactId>
133+
</dependency>
134+
129135
<!-- support gRPC-->
130136
<dependency>
131137
<groupId>io.helidon.microprofile.grpc</groupId>
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package oracle.demo.reactive;
2+
3+
import java.util.Arrays;
4+
import java.util.Arrays;
5+
import java.util.List;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Flow;
8+
import java.util.concurrent.SubmissionPublisher;
9+
import java.util.logging.Logger;
10+
11+
import javax.enterprise.context.ApplicationScoped;
12+
import javax.inject.Inject;
13+
import javax.ws.rs.DELETE;
14+
import javax.ws.rs.FormParam;
15+
import javax.ws.rs.GET;
16+
import javax.ws.rs.POST;
17+
import javax.ws.rs.PUT;
18+
import javax.ws.rs.Path;
19+
import javax.ws.rs.PathParam;
20+
import javax.ws.rs.Produces;
21+
import javax.ws.rs.QueryParam;
22+
import javax.ws.rs.core.MediaType;
23+
import javax.ws.rs.core.Response;
24+
25+
import org.eclipse.microprofile.reactive.messaging.Incoming;
26+
import org.eclipse.microprofile.reactive.messaging.Outgoing;
27+
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
28+
import org.reactivestreams.FlowAdapters;
29+
import org.reactivestreams.Publisher;
30+
31+
import io.helidon.common.configurable.ThreadPoolSupplier;
32+
import oracle.demo.jpa.Country;
33+
import oracle.demo.jpa.CountryDAO;
34+
35+
/**
36+
* Reactive way to update database via JPA
37+
*
38+
*/
39+
@ApplicationScoped
40+
@Path("/reactive/country")
41+
@Produces(MediaType.APPLICATION_JSON)
42+
public class ReactiveResource {
43+
44+
private final Logger logger = Logger.getLogger(ReactiveResource.class.getName());
45+
46+
private final ExecutorService es = ThreadPoolSupplier.builder().threadNamePrefix("messaging-").build().get();
47+
private final SubmissionPublisher<DaoEvent> publisher = new SubmissionPublisher<>(es, Flow.defaultBufferSize());
48+
49+
@Inject private CountryDAO dao;
50+
51+
private enum Operation {INSERT, UPDATE, DELETE}
52+
53+
private class DaoEvent{
54+
public Operation op;
55+
public Country[] countries;
56+
public DaoEvent(Operation op, Country[] countries){
57+
this.op = op;
58+
this.countries = countries;
59+
}
60+
}
61+
62+
@Outgoing("country-dao")
63+
public Publisher<DaoEvent> preparePublisher() {
64+
return ReactiveStreams.fromPublisher(FlowAdapters.toPublisher(publisher)).buildRs();
65+
}
66+
67+
@Incoming("country-dao")
68+
public void consume(DaoEvent event) {
69+
logger.info("DaoEvent: " + event.op.toString());
70+
switch(event.op){
71+
case INSERT:
72+
dao.insertCountries(event.countries);
73+
break;
74+
case UPDATE:
75+
Arrays.stream(event.countries).forEach(c -> dao.updateCountry(c.countryId, c.countryName));
76+
break;
77+
case DELETE:
78+
Arrays.stream(event.countries).forEach(c -> dao.deleteCountry(c.countryId));
79+
break;
80+
}
81+
}
82+
83+
84+
@POST
85+
@Path("/")
86+
public void insertCountries(Country[] countries) {
87+
publisher.submit(new DaoEvent(Operation.INSERT, countries));
88+
}
89+
90+
@PUT
91+
@Path("/{countryId}")
92+
public void updateCountry(@PathParam("countryId") int countryId, @FormParam("name") String countryName) {
93+
publisher.submit(new DaoEvent(Operation.UPDATE, new Country[]{new Country(countryId, countryName)}));
94+
}
95+
96+
@DELETE
97+
@Path("/{countryId}")
98+
public void deleteCountry(@PathParam("countryId") int countryId) {
99+
publisher.submit(new DaoEvent(Operation.DELETE, new Country[]{new Country(countryId, null)}));
100+
}
101+
102+
103+
}
104+

src/main/resources/logging.properties

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,15 @@
1818
# For more information see $JAVA_HOME/jre/lib/logging.properties
1919

2020
# Send messages to the console
21-
handlers=java.util.logging.ConsoleHandler
21+
handlers=io.helidon.common.HelidonConsoleHandler
22+
23+
# HelidonConsoleHandler uses a SimpleFormatter subclass that replaces "!thread!" with the current thread
24+
java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n
2225

2326
# Global default logging level. Can be overriden by specific handlers and loggers
2427
.level=INFO
2528
#.level=FINEST
2629

27-
# Helidon Web Server has a custom log formatter that extends SimpleFormatter.
28-
# It replaces "!thread!" with the current thread name
29-
#java.util.logging.ConsoleHandler.level=INFO
30-
java.util.logging.ConsoleHandler.level=FINEST
31-
java.util.logging.ConsoleHandler.formatter=io.helidon.webserver.WebServerLogFormatter
32-
#java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s !thread!: %5$s%6$s%n
33-
java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS %4$s %3$s: %5$s%6$s%n
34-
3530
#Component specific log levels
3631
#io.helidon.webserver.level=INFO
3732
#io.helidon.config.level=INFO

src/test/java/oracle/demo/jpa/CountryResourceTest.java

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
import javax.inject.Inject;
44
import javax.json.JsonArray;
55
import javax.json.JsonObject;
6+
import javax.ws.rs.client.Entity;
67
import javax.ws.rs.client.WebTarget;
8+
import javax.ws.rs.core.Form;
9+
import javax.ws.rs.core.MediaType;
710
import javax.ws.rs.core.Response;
811

912
import org.junit.jupiter.api.Assertions;
@@ -18,29 +21,49 @@ public class CountryResourceTest{
1821

1922
@Test
2023
public void testGetCountries(){
21-
JsonArray jsonArray = webTarget.path("/jpa/country")
22-
.request()
23-
.get(JsonArray.class);
24+
JsonArray jsonArray = webTarget.path("/jpa/country").request().get(JsonArray.class);
2425
Assertions.assertEquals(2, jsonArray.size()); // USA, Japan
2526
}
2627

2728
@Test
2829
public void testGetCountry(){
29-
JsonObject jsonObject = webTarget.path("/jpa/country/1")
30-
.request()
31-
.get(JsonObject.class);
30+
JsonObject jsonObject = webTarget.path("/jpa/country/1").request().get(JsonObject.class);
3231
Assertions.assertEquals(1, jsonObject.getInt("countryId"));
3332
Assertions.assertEquals("USA", jsonObject.getString("countryName"));
3433

35-
jsonObject = webTarget.path("/jpa/country/81")
36-
.request()
37-
.get(JsonObject.class);
34+
jsonObject = webTarget.path("/jpa/country/81").request().get(JsonObject.class);
3835
Assertions.assertEquals(81, jsonObject.getInt("countryId"));
3936
Assertions.assertEquals("Japan", jsonObject.getString("countryName"));
4037

41-
Response response = webTarget.path("/country/99")
42-
.request()
43-
.get();
38+
Response response = webTarget.path("/country/99").request().get();
39+
Assertions.assertEquals(404, response.getStatus());
40+
}
41+
42+
@Test
43+
public void testCRUDCountry(){
44+
45+
// insert
46+
Country[] countries = new Country[]{ new Country(86, "China") };
47+
Response response = webTarget.path("/jpa/country").request().post(Entity.entity(countries, MediaType.APPLICATION_JSON));
48+
Assertions.assertEquals(200, response.getStatus());
49+
50+
JsonObject jsonObject = webTarget.path("/jpa/country/86").request().get(JsonObject.class);
51+
Assertions.assertEquals(86, jsonObject.getInt("countryId"));
52+
Assertions.assertEquals("China", jsonObject.getString("countryName"));
53+
54+
// update
55+
Form form = new Form().param("name", "People’s Republic of China");
56+
response = webTarget.path("/jpa/country/86").request().put(Entity.form(form));
57+
Assertions.assertEquals(204, response.getStatus());
58+
59+
jsonObject = webTarget.path("/jpa/country/86").request().get(JsonObject.class);
60+
Assertions.assertEquals(86, jsonObject.getInt("countryId"));
61+
Assertions.assertEquals("People’s Republic of China", jsonObject.getString("countryName"));
62+
63+
// delete
64+
response = webTarget.path("/jpa/country/86").request().delete();
65+
Assertions.assertEquals(204, response.getStatus());
66+
response = webTarget.path("/country/86").request().get();
4467
Assertions.assertEquals(404, response.getStatus());
4568

4669
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package oracle.demo.reactive;
2+
3+
import java.util.concurrent.TimeUnit;
4+
5+
import javax.inject.Inject;
6+
import javax.json.JsonArray;
7+
import javax.json.JsonObject;
8+
import javax.ws.rs.client.Entity;
9+
import javax.ws.rs.client.WebTarget;
10+
import javax.ws.rs.core.Form;
11+
import javax.ws.rs.core.MediaType;
12+
import javax.ws.rs.core.Response;
13+
14+
import org.junit.jupiter.api.Assertions;
15+
import org.junit.jupiter.api.Test;
16+
17+
import io.helidon.microprofile.tests.junit5.HelidonTest;
18+
import oracle.demo.jpa.Country;
19+
20+
@HelidonTest
21+
public class ReactiveResourceTest{
22+
23+
@Inject private WebTarget webTarget;
24+
25+
@Test
26+
public void testCRUDCountry(){
27+
28+
// warm-up
29+
JsonObject jsonObject = webTarget.path("/jpa/country/1").request().get(JsonObject.class);
30+
Assertions.assertEquals(1, jsonObject.getInt("countryId"));
31+
Assertions.assertEquals("USA", jsonObject.getString("countryName"));
32+
33+
// insert
34+
Country[] countries = new Country[]{ new Country(86, "China") };
35+
Response response = webTarget.path("/reactive/country").request().post(Entity.entity(countries, MediaType.APPLICATION_JSON));
36+
Assertions.assertEquals(204, response.getStatus());
37+
delay(1000);
38+
jsonObject = webTarget.path("/jpa/country/86").request().get(JsonObject.class);
39+
Assertions.assertEquals(86, jsonObject.getInt("countryId"));
40+
Assertions.assertEquals("China", jsonObject.getString("countryName"));
41+
42+
// update
43+
Form form = new Form().param("name", "People’s Republic of China");
44+
response = webTarget.path("/reactive/country/86").request().put(Entity.form(form));
45+
Assertions.assertEquals(204, response.getStatus());
46+
delay(1000);
47+
jsonObject = webTarget.path("/jpa/country/86").request().get(JsonObject.class);
48+
Assertions.assertEquals(86, jsonObject.getInt("countryId"));
49+
Assertions.assertEquals("People’s Republic of China", jsonObject.getString("countryName"));
50+
51+
// delete
52+
response = webTarget.path("/reactive/country/86").request().delete();
53+
Assertions.assertEquals(204, response.getStatus());
54+
delay(1000);
55+
response = webTarget.path("/country/86").request().get();
56+
Assertions.assertEquals(404, response.getStatus());
57+
58+
}
59+
60+
private void delay(long ms){
61+
try{
62+
TimeUnit.MILLISECONDS.sleep(ms);
63+
}catch(Exception e){}
64+
}
65+
66+
}

0 commit comments

Comments
 (0)