1
1
package net .delirius .jmeter .backendlistener .elasticsearch ;
2
2
3
- import java .net . InetAddress ;
3
+ import java .io . IOException ;
4
4
import java .text .ParseException ;
5
5
import java .text .SimpleDateFormat ;
6
6
import java .time .LocalDateTime ;
13
13
14
14
import org .apache .commons .io .IOUtils ;
15
15
import org .apache .commons .lang .StringUtils ;
16
+ import org .apache .http .HttpHost ;
16
17
import org .apache .jmeter .assertions .AssertionResult ;
17
18
import org .apache .jmeter .config .Arguments ;
18
19
import org .apache .jmeter .samplers .SampleResult ;
19
20
import org .apache .jmeter .threads .JMeterContextService ;
20
21
import org .apache .jmeter .util .JMeterUtils ;
21
22
import org .apache .jmeter .visualizers .backend .AbstractBackendListenerClient ;
22
23
import org .apache .jmeter .visualizers .backend .BackendListenerContext ;
23
- import org .elasticsearch .action .bulk .BulkRequestBuilder ;
24
+ import org .elasticsearch .action .bulk .BulkRequest ;
24
25
import org .elasticsearch .action .bulk .BulkResponse ;
25
- import org .elasticsearch .common .settings .Settings ;
26
- import org .elasticsearch .common .transport .InetSocketTransportAddress ;
26
+ import org .elasticsearch .action .index .IndexRequest ;
27
+ import org .elasticsearch .client .RestClient ;
28
+ import org .elasticsearch .client .RestHighLevelClient ;
27
29
import org .elasticsearch .common .unit .TimeValue ;
28
30
import org .elasticsearch .common .xcontent .XContentType ;
29
- import org .elasticsearch .transport .client .PreBuiltTransportClient ;
30
31
import org .slf4j .Logger ;
31
32
import org .slf4j .LoggerFactory ;
32
33
38
39
* @source_2: https://github.com/zumo64/ELK_POC
39
40
*/
40
41
public class ElasticsearchBackend extends AbstractBackendListenerClient {
41
- private static final String BUILD_NUMBER = "BuildNumber" ;
42
+ private static final String BUILD_NUMBER = "BuildNumber" ;
43
+ private static final String ES_SCHEME = "es.scheme" ;
42
44
private static final String ES_HOST = "es.host" ;
43
45
private static final String ES_PORT = "es.transport.port" ;
44
46
private static final String ES_INDEX = "es.index" ;
45
47
private static final String ES_TIMESTAMP = "es.timestamp" ;
46
48
private static final String ES_STATUS_CODE = "es.status.code" ;
47
- private static final String ES_CLUSTER = "es.cluster" ;
48
49
private static final String ES_BULK_SIZE = "es.bulk.size" ;
49
50
private static final String ES_TIMEOUT_MS = "es.timout.ms" ;
50
51
private static final long DEFAULT_TIMEOUT_MS = 200L ;
51
52
private static final Logger logger = LoggerFactory .getLogger (ElasticsearchBackend .class );
52
53
53
- private PreBuiltTransportClient client ;
54
+ private RestHighLevelClient client ;
54
55
private String index ;
55
56
private int buildNumber ;
56
57
private int bulkSize ;
57
- private BulkRequestBuilder bulkRequest ;
58
+ private BulkRequest bulkRequest ;
58
59
private long timeoutMs ;
59
60
60
61
@ Override
61
62
public Arguments getDefaultParameters () {
62
63
Arguments parameters = new Arguments ();
64
+ parameters .addArgument (ES_SCHEME , "http" );
63
65
parameters .addArgument (ES_HOST , null );
64
- parameters .addArgument (ES_PORT , "9300 " );
66
+ parameters .addArgument (ES_PORT , "9200 " );
65
67
parameters .addArgument (ES_INDEX , null );
66
68
parameters .addArgument (ES_TIMESTAMP , "yyyy-MM-dd'T'HH:mm:ss.SSSZZ" );
67
69
parameters .addArgument (ES_STATUS_CODE , "531" );
68
- parameters .addArgument (ES_CLUSTER , "elasticsearch" );
69
70
parameters .addArgument (ES_BULK_SIZE , "100" );
70
71
parameters .addArgument (ES_TIMEOUT_MS , Long .toString (DEFAULT_TIMEOUT_MS ));
71
72
return parameters ;
@@ -80,13 +81,17 @@ public void setupTest(BackendListenerContext context) throws Exception {
80
81
this .buildNumber = (JMeterUtils .getProperty (ElasticsearchBackend .BUILD_NUMBER ) != null
81
82
&& JMeterUtils .getProperty (ElasticsearchBackend .BUILD_NUMBER ).trim () != "" )
82
83
? Integer .parseInt (JMeterUtils .getProperty (ElasticsearchBackend .BUILD_NUMBER )) : 0 ;
83
- Settings settings = Settings .builder ().put ("cluster.name" , context .getParameter (ES_CLUSTER )).build ();
84
84
String host = context .getParameter (ES_HOST );
85
85
int port = Integer .parseInt (context .getParameter (ES_PORT ));
86
- this .client = new PreBuiltTransportClient (settings );
87
- this .client .addTransportAddress (
88
- new InetSocketTransportAddress (InetAddress .getByName (host ), port ));
89
- this .bulkRequest = this .client .prepareBulk ();
86
+ this .client = new RestHighLevelClient (
87
+ RestClient .builder (
88
+ new HttpHost (host , port , context .getParameter (ES_SCHEME , "http" )))
89
+ .setRequestConfigCallback (requestConfigBuilder ->
90
+ requestConfigBuilder
91
+ .setConnectTimeout (5000 )
92
+ .setSocketTimeout ((int )timeoutMs ))
93
+ .setMaxRetryTimeoutMillis (60000 ));
94
+ this .bulkRequest = new BulkRequest ().timeout (TimeValue .timeValueMillis (timeoutMs ));
90
95
super .setupTest (context );
91
96
} catch (Exception e ) {
92
97
throw new IllegalStateException ("Unable to setup connectivity to ES" , e );
@@ -96,37 +101,43 @@ public void setupTest(BackendListenerContext context) throws Exception {
96
101
@ Override
97
102
public void handleSampleResults (List <SampleResult > results , BackendListenerContext context ) {
98
103
for (SampleResult sr : results ) {
99
- this .bulkRequest .add (this .client .prepareIndex (this .index , "SampleResult" ).setSource (this .getElasticData (sr , context ), XContentType .JSON ));
104
+ this .bulkRequest .add (
105
+ new IndexRequest (this .index , "SampleResult" ).source (this .getElasticData (sr , context ),
106
+ XContentType .JSON ));
100
107
}
101
108
102
109
if (this .bulkRequest .numberOfActions () >= this .bulkSize ) {
103
110
try {
104
- BulkResponse bulkResponse = this .bulkRequest .get (TimeValue .timeValueMillis (timeoutMs ));
105
- if (bulkResponse .hasFailures ()) {
106
- if (logger .isErrorEnabled ()) {
107
- logger .error ("Failed to write a result on {}: {}" ,
108
- index , bulkResponse .buildFailureMessage ());
109
- }
110
- } else {
111
- logger .debug ("Wrote {} results in {}." ,
112
- index );
113
- }
111
+ sendRequest (bulkRequest );
114
112
} catch (Exception e ) {
115
113
logger .error ("Error sending data to ES, data will be lost" , e );
116
114
} finally {
117
- this .bulkRequest = this . client . prepareBulk ( );
115
+ this .bulkRequest = new BulkRequest (). timeout ( TimeValue . timeValueMillis ( timeoutMs ) );
118
116
}
119
117
}
120
118
}
121
119
122
120
@ Override
123
121
public void teardownTest (BackendListenerContext context ) throws Exception {
124
122
if (this .bulkRequest .numberOfActions () > 0 ) {
125
- this . bulkRequest . get ( );
123
+ sendRequest ( bulkRequest );
126
124
}
127
125
IOUtils .closeQuietly (client );
128
126
super .teardownTest (context );
129
127
}
128
+
129
+ private void sendRequest (BulkRequest bulkRequest ) throws IOException {
130
+ BulkResponse bulkResponse = this .client .bulk (bulkRequest );
131
+ if (bulkResponse .hasFailures ()) {
132
+ if (logger .isErrorEnabled ()) {
133
+ logger .error ("Failed to write a result on {}: {}" ,
134
+ index , bulkResponse .buildFailureMessage ());
135
+ }
136
+ } else {
137
+ logger .debug ("Wrote {} results in {}." ,
138
+ index );
139
+ }
140
+ }
130
141
131
142
public Map <String , Object > getElasticData (SampleResult sr , BackendListenerContext context ) {
132
143
HashMap <String , Object > jsonObject = new HashMap <>();
@@ -172,7 +183,7 @@ public Map<String, Object> getElasticData(SampleResult sr, BackendListenerContex
172
183
//all assertions
173
184
AssertionResult [] assertionResults = sr .getAssertionResults ();
174
185
if (assertionResults != null ) {
175
- HashMap <String , Object > [] assertionArray = new HashMap [assertionResults .length ];
186
+ Map <String , Object >[] assertionArray = new HashMap [assertionResults .length ];
176
187
Integer i = 0 ;
177
188
for (AssertionResult assertionResult : assertionResults ) {
178
189
HashMap <String , Object > assertionMap = new HashMap <>();
0 commit comments