From c09a1588cf9fbefc5eab3df41b5082e1aa4d4251 Mon Sep 17 00:00:00 2001 From: Jast <745925668@qq.com> Date: Tue, 7 Jan 2025 13:50:06 +0800 Subject: [PATCH] [Feature]Support ES query index parameters (#254) --- .../ElasticSearchDataSourceChannel.java | 3 +- .../elasticsearch/client/EsRestClient.java | 60 ++++++++++++------- 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java index 77c12f292..29c847202 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/ElasticSearchDataSourceChannel.java @@ -60,9 +60,10 @@ public List getTables( String database, Map option) { databaseCheck(database); + try (EsRestClient client = EsRestClient.createInstance(ConfigFactory.parseMap(requestParams))) { - return client.listIndex(); + return client.listIndex(option.get("filterName")); } } diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java index 01d20bb28..64cb10027 100644 --- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java +++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-elasticsearch/src/main/java/org/apache/seatunnel/datasource/plugin/elasticsearch/client/EsRestClient.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.datasource.plugin.elasticsearch.ElasticSearchOptionRule; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.http.HttpStatus; import org.apache.http.auth.AuthScope; @@ -252,27 +253,7 @@ public void close() { } public List listIndex() { - String endpoint = "/_cat/indices?format=json"; - Request request = new Request("GET", endpoint); - try { - Response response = restClient.performRequest(request); - if (response == null) { - throw new ResponseException("GET " + endpoint + " response null"); - } - if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { - String entity = EntityUtils.toString(response.getEntity()); - return JsonUtils.toList(entity, Map.class).stream() - .map(map -> map.get("index").toString()) - .collect(Collectors.toList()); - } else { - throw new ResponseException( - String.format( - "GET %s response status code=%d", - endpoint, response.getStatusLine().getStatusCode())); - } - } catch (IOException ex) { - throw new ResponseException(ex); - } + return this.listIndex(null); } public void dropIndex(String tableName) { @@ -365,4 +346,41 @@ private static Map getFieldTypeMappingFromProperties(JsonNode pr } return mapping; } + + public List listIndex(String filterName) { + String endpoint = "/_cat/indices?format=json"; + Request request = new Request("GET", endpoint); + try { + Response response = restClient.performRequest(request); + if (response == null) { + throw new ResponseException("GET " + endpoint + " response null"); + } + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + String entity = EntityUtils.toString(response.getEntity()); + List indices = + JsonUtils.toList(entity, Map.class).stream() + .map(map -> map.get("index").toString()) + .collect(Collectors.toList()); + + if (StringUtils.isNotEmpty(filterName)) { + indices = + indices.stream() + .filter( + index -> + index.toLowerCase() + .contains(filterName.toLowerCase())) + .collect(Collectors.toList()); + } + + return indices; + } else { + throw new ResponseException( + String.format( + "GET %s response status code=%d", + endpoint, response.getStatusLine().getStatusCode())); + } + } catch (IOException ex) { + throw new ResponseException(ex); + } + } }