22
22
import org .apache .jmeter .visualizers .backend .AbstractBackendListenerClient ;
23
23
import org .apache .jmeter .visualizers .backend .BackendListenerContext ;
24
24
import org .elasticsearch .action .bulk .BulkRequestBuilder ;
25
- import org .elasticsearch .client . Client ;
25
+ import org .elasticsearch .action . bulk . BulkResponse ;
26
26
import org .elasticsearch .common .settings .Settings ;
27
27
import org .elasticsearch .common .transport .InetSocketTransportAddress ;
28
28
import org .elasticsearch .common .unit .TimeValue ;
@@ -51,7 +51,7 @@ public class ElasticsearchBackend extends AbstractBackendListenerClient {
51
51
private static final long DEFAULT_TIMEOUT_MS = 200L ;
52
52
private static final Logger logger = LoggerFactory .getLogger (ElasticsearchBackend .class );
53
53
54
- private Client client ;
54
+ private PreBuiltTransportClient client ;
55
55
private String index ;
56
56
private int buildNumber ;
57
57
private int bulkSize ;
@@ -84,7 +84,8 @@ public void setupTest(BackendListenerContext context) throws Exception {
84
84
Settings settings = Settings .builder ().put ("cluster.name" , context .getParameter (ES_CLUSTER )).build ();
85
85
String host = context .getParameter (ES_HOST );
86
86
int port = Integer .parseInt (context .getParameter (ES_PORT ));
87
- this .client = new PreBuiltTransportClient (settings ).addTransportAddress (
87
+ this .client = new PreBuiltTransportClient (settings );
88
+ this .client .addTransportAddress (
88
89
new InetSocketTransportAddress (InetAddress .getByName (host ), port ));
89
90
this .bulkRequest = this .client .prepareBulk ();
90
91
super .setupTest (context );
@@ -101,7 +102,16 @@ public void handleSampleResults(List<SampleResult> results, BackendListenerConte
101
102
102
103
if (this .bulkRequest .numberOfActions () >= this .bulkSize ) {
103
104
try {
104
- this .bulkRequest .get (TimeValue .timeValueMillis (timeoutMs ));
105
+ BulkResponse bulkResponse = this .bulkRequest .get (TimeValue .timeValueMillis (timeoutMs ));
106
+ if (bulkResponse .hasFailures ()) {
107
+ if (logger .isErrorEnabled ()) {
108
+ logger .error ("Failed to write a result on {}: {}" ,
109
+ index , bulkResponse .buildFailureMessage ());
110
+ }
111
+ } else {
112
+ logger .debug ("Wrote {} results in {}." ,
113
+ index );
114
+ }
105
115
} catch (Exception e ) {
106
116
logger .error ("Error sending data to ES, data will be lost" , e );
107
117
} finally {
0 commit comments