這篇文章主要講解了“Elasticsearch reindex及Java使用sliceScorll優化查詢的方法”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Elasticsearch reindex及Java使用sliceScorll優化查詢的方法”吧!
Reindex會將一個索引的數據復制到另一個已存在的索引,但是并不會復制原索引的mapping(映射)、shard(分片)、replicas(副本)等配置信息。
簡單實例如下
POST _reindex
{
"source": {
"remote": {
"host": "http://otherhost:9200", // 遠程es的ip和port列表
"socket_timeout": "1m",
"connect_timeout": "10s" // 超時時間設置
},
"index": "my_index_name", // 源索引名稱
"query": { // 滿足條件的數據
"match": {
"test": "data"
}
}
},
"dest": {
"index": "dest_index_name" // 目標索引名稱
}
}具體詳細的使用參考
ElasticSearch 6.3版本 Document APIs之Reindex API
elasticsearch 基礎 —— ReIndex
在java中對于reindexapi沒有找到,于是作者采用了別名轉換和全Index查詢加上bulk插入的方式對于索引進行遷移。
但是轉移數據實在太慢,所以使用了slice對scorll查詢進行優化
多線程reindex
具體開啟線程數根據Index分片數進行調整,最好和主分片數相同,本例子為五個分片,同時還使用了別名轉換對索引進行無縫銜接避免數據正常插入讀取
//建新索引
createUserRecordIndex(newIndexName, typeName);
//篩選時間
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
RangeQueryBuilder rangeQueryBuilder;
rangeQueryBuilder = QueryBuilders.rangeQuery("createTime")
.gte(DateUtil.format(DateUtil.parse(createBeginDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT))
.lte(DateUtil.format(DateUtil.parse(createEndDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT));
boolQueryBuilder.must(rangeQueryBuilder);
try {
//多線程處理查詢請求
List<Future> list = new ArrayList<>();
for (int i = 0; i < 5; i++) {
SliceBuilder sliceBuilder = new SliceBuilder(i, 5);
SearchResponse response = EsBuildersServiceUtil.getESClient()
.prepareSearch(userRecordAlias)
.setTypes(userRecordType)
.setQuery(boolQueryBuilder)
.setSize(1000).setScroll(new TimeValue(10000))
.slice(sliceBuilder)
.execute()
.actionGet();
SliceQuery sliceQuery = new SliceQuery(newIndexName, typeName, response);
Future submit = threadPoolTaskExecutor.submit(sliceQuery);
list.add(submit);
}
for (Future future : list) {
future.get();
}
} catch (Exception e) {
log.error("reindex error =", e);
throw new MembershipDataException(MembershipDataErrorCode.ES_INDEX_CONVERT_ERROR);
}
try {
//別名轉換
EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().removeAlias(oldIndexName, userRecordAlias).execute().actionGet();
EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().addAlias(newIndexName, userRecordAlias).execute().actionGet();
} catch (Exception e) {
log.error(" convertAlias error =", e);
throw new MembershipDataException(MembershipDataErrorCode.ES_ALIASES_CONVERT_ERROR);
}slice線程
class SliceQuery implements Callable {
private String newIndexName;
private String typeName;
private SearchResponse response;
private SliceQuery(String newIndexName, String typeName, SearchResponse response) {
this.newIndexName = newIndexName;
this.typeName = typeName;
this.response = response;
}
@Override
public Void call() {
//獲取總數量
long totalCount = response.getHits().getTotalHits();
//計算總次數,每次搜索數量為分片數*設置的size大小
int page = (int) totalCount / 1000;
operateRecordList(response, newIndexName, typeName);
for (int i = 0; i < page; i++) {
//再次發送請求,并使用上次搜索結果的ScrollId
response = EsBuildersServiceUtil.getESClient().prepareSearchScroll(response.getScrollId())
.setScroll(new TimeValue(10000)).execute()
.actionGet();
operateRecordList(response, newIndexName, typeName);
}
return null;
}
}批量插入
/**
* 從查詢數據中獲取并批量插入Index
*
* @param response
* @param indexName
* @param typeName
*/
private void operateRecordList(SearchResponse response, String indexName, String typeName) {
try {
SearchHits hits = response.getHits();
List<AddUserRecordRequest> list = new ArrayList<>();
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
list.add(JSON.parseObject(sourceAsString, AddUserRecordRequest.class));
}
//批量插入
saveBulkRecord(list, indexName, typeName);
} catch (Exception e) {
log.error("operateRecordList error =", e);
throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);
}
}
/**
* 批量插入
*
* @param list
* @param indexName
* @param typeName
*/
private void saveBulkRecord(List<AddUserRecordRequest> list, String indexName, String typeName) {
try {
BulkRequestBuilder bulkRequest = EsBuildersServiceUtil.getESClient().prepareBulk();
for (AddUserRecordRequest recordRequest : list) {
JSONObject json = JSONObject.fromObject(recordRequest);
bulkRequest.add(EsBuildersServiceUtil.getESClient()
.prepareIndex(indexName, typeName)
.setSource(json));
}
if (list.size() > 0) {
bulkRequest.execute().actionGet();
}
} catch (Exception e) {
log.error("saveBulkRecord error =", e);
throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);
}
}感謝各位的閱讀,以上就是“Elasticsearch reindex及Java使用sliceScorll優化查詢的方法”的內容了,經過本文的學習后,相信大家對Elasticsearch reindex及Java使用sliceScorll優化查詢的方法這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。