Skip to content

Commit 0b89803

Browse files
Merge pull request #59 from ShyamKunda/master
Added type for bulk requests for Elastic search versions less than 7
2 parents 6035ee1 + e52aaa2 commit 0b89803

File tree

2 files changed

+48
-7
lines changed

2 files changed

+48
-7
lines changed

src/main/java/io/github/delirius325/jmeter/backendlistener/elasticsearch/ElasticSearchMetricSender.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@
1010
import org.apache.http.HttpStatus;
1111
import org.apache.http.entity.ContentType;
1212
import org.apache.http.nio.entity.NStringEntity;
13+
import org.apache.http.util.EntityUtils;
1314
import org.elasticsearch.client.Request;
1415
import org.elasticsearch.client.RequestOptions;
1516
import org.elasticsearch.client.Response;
1617
import org.elasticsearch.client.RestClient;
18+
import org.json.simple.JSONObject;
19+
import org.json.simple.parser.JSONParser;
1720
import org.slf4j.Logger;
1821
import org.slf4j.LoggerFactory;
1922

@@ -95,16 +98,51 @@ public void createIndex() throws IOException {
9598
logger.info("Index already exists!");
9699
}
97100
}
101+
102+
public int getElasticSearchVersion() {
103+
Request request = new Request("GET", "/" );
104+
int elasticSearchVersion = -1;
105+
try {
106+
Response response = this.client.performRequest(setAuthorizationHeader(request));
107+
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK && logger.isErrorEnabled()) {
108+
logger.error("Unable to perform request to ElasticSearch engine", this.esIndex);
109+
}else {
110+
String responseBody = EntityUtils.toString(response.getEntity());
111+
JSONParser parser = new JSONParser();
112+
JSONObject elasticSearchConfig = (JSONObject) parser.parse(responseBody);
113+
JSONObject version = (JSONObject)elasticSearchConfig.get("version");
114+
String elasticVersion = version.get("number").toString();
115+
elasticSearchVersion = Integer.parseInt(elasticVersion.split("\\.")[0]);
116+
117+
}
118+
} catch (Exception e) {
119+
if (logger.isErrorEnabled()) {
120+
logger.error("Exception" + e);
121+
logger.error("ElasticSearch Backend Listener was unable to perform request to the ElasticSearch engine. Check your JMeter console for more info.");
122+
}
123+
}
124+
return elasticSearchVersion;
125+
}
126+
98127

99128
/**
100129
* This method sends the ElasticSearch documents for each document present in the list (metricList). All is being
101130
* sent through the low-level ElasticSearch REST Client.
102131
*/
103-
public void sendRequest() {
104-
Request request = new Request("POST", "/" + this.esIndex + "/_bulk");
105-
StringBuilder bulkRequestBody = new StringBuilder();
106-
String actionMetaData = String.format(SEND_BULK_REQUEST, this.esIndex);
107-
132+
public void sendRequest(int elasticSearchVersionPrefix) {
133+
logger.info("Elastic Search version : " + Integer.toString(elasticSearchVersionPrefix));
134+
Request request;
135+
StringBuilder bulkRequestBody = new StringBuilder();
136+
String actionMetaData;
137+
if(elasticSearchVersionPrefix < 7) {
138+
request = new Request("POST", "/" + this.esIndex + "/SampleResult/_bulk");
139+
actionMetaData = String.format(SEND_BULK_REQUEST, this.esIndex, "SampleResult");
140+
}
141+
else {
142+
request = new Request("POST", "/" + this.esIndex + "/_bulk");
143+
actionMetaData = String.format(SEND_BULK_REQUEST, this.esIndex);
144+
}
145+
108146
for (String metric : this.metricList) {
109147
bulkRequestBody.append(actionMetaData);
110148
bulkRequestBody.append(metric);
@@ -123,6 +161,7 @@ public void sendRequest() {
123161
} catch (Exception e) {
124162
if (logger.isErrorEnabled()) {
125163
logger.error("Exception" + e);
164+
logger.error("Elastic Search Request End Point: " + request.getEndpoint());
126165
logger.error("ElasticSearch Backend Listener was unable to perform request to the ElasticSearch engine. Check your JMeter console for more info.");
127166
}
128167
}

src/main/java/io/github/delirius325/jmeter/backendlistener/elasticsearch/ElasticsearchBackendClient.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ public class ElasticsearchBackendClient extends AbstractBackendListenerClient {
8080
private Set<String> fields;
8181
private int buildNumber;
8282
private int bulkSize;
83+
private int esVersion;
8384
private long timeoutMs;
8485

8586
public ElasticsearchBackendClient() {
@@ -136,6 +137,7 @@ public void onFailure(Node node) {
136137
context.getParameter(ES_AUTH_USER), context.getParameter(ES_AUTH_PWD),
137138
context.getParameter(ES_AWS_ENDPOINT));
138139
this.sender.createIndex();
140+
this.esVersion = sender.getElasticSearchVersion();
139141

140142
checkTestMode(context.getParameter(ES_TEST_MODE));
141143
super.setupTest(context);
@@ -202,7 +204,7 @@ public void handleSampleResults(List<SampleResult> results, BackendListenerConte
202204

203205
if (this.sender.getListSize() >= this.bulkSize) {
204206
try {
205-
this.sender.sendRequest();
207+
this.sender.sendRequest(this.esVersion);
206208
} catch (Exception e) {
207209
logger.error("Error occured while sending bulk request.", e);
208210
} finally {
@@ -214,7 +216,7 @@ public void handleSampleResults(List<SampleResult> results, BackendListenerConte
214216
@Override
215217
public void teardownTest(BackendListenerContext context) throws Exception {
216218
if (this.sender.getListSize() > 0) {
217-
this.sender.sendRequest();
219+
this.sender.sendRequest(this.esVersion);
218220
}
219221
this.sender.closeConnection();
220222
super.teardownTest(context);

0 commit comments

Comments
 (0)