300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > ElasticSearch工具类封装

ElasticSearch工具类封装

时间:2024-08-12 21:29:02

相关推荐

ElasticSearch工具类封装

最近在项目中看到一种比较实用的 ElasticSearch 工具类封装方式,特此记录,便于日后查阅。

1、controller层

/**
 * Returns one page of top-URI statistics, optionally narrowed by uri, api and method.
 *
 * <p>A filter is only added for a parameter whose value is not blank. The {@code api} and
 * {@code method} values are lower-cased before being used in an EQUAL filter; {@code uri} is
 * passed through unchanged with the INCLUDE_NO_SPLIT filter type.
 *
 * @param pageIndex page number forwarded to the service
 * @param pageSize  page size forwarded to the service
 * @param uri       value for the "uri" filter; ignored when blank
 * @param api       value for the "api" filter; ignored when blank
 * @param method    value for the "method" filter; ignored when blank
 * @return a NORMAL/"ok" result wrapping the page, or an ERROR/"fail" result carrying the
 *         exception message when anything in the lookup throws
 */
@RequestMapping(value = "/uri/page", method = RequestMethod.GET)
public DataResult page(@RequestParam(name = "pageIndex") Integer pageIndex,
                       @RequestParam(name = "pageSize") Integer pageSize,
                       @RequestParam(name = "uri") String uri,
                       @RequestParam(name = "api") String api,
                       @RequestParam(name = "method") String method) {
    try {
        // Typed list (the original used a raw ArrayList, which produced an unchecked warning).
        List<Filter> filters = new ArrayList<>();
        if (StringUtils.isNotBlank(uri)) {
            filters.add(new Filter("uri", FilterEnum.INCLUDE_NO_SPLIT.getType(), uri));
        }
        if (StringUtils.isNotBlank(api)) {
            filters.add(new Filter("api", FilterEnum.EQUAL.getType(), api.toLowerCase()));
        }
        if (StringUtils.isNotBlank(method)) {
            filters.add(new Filter("method", FilterEnum.EQUAL.getType(), method.toLowerCase()));
        }
        return new DataResult(SystemStatusCode.NORMAL.getValue(), "ok",
                topStatisticsService.findTopUriByCondition(filters, pageIndex, pageSize));
    } catch (Exception e) {
        LOGGER.error("获取uri top分页数据失败", e);
        // NOTE(review): the raw exception message is returned to the caller — confirm this is
        // acceptable for this endpoint.
        return new DataResult(SystemStatusCode.ERROR.getValue(), "fail", e.getMessage());
    }
}

2、FilterEnum

package com.fenqile.mon;/*** @author sherrycao* @version /3/12*/public enum FilterEnum {INCLUDE(1, "包含"),EXCLUDE(2, "不包含"),EQUAL(3, "等于"),UNEQUAL(4, "不等于"),INCLUDE_NO_SPLIT(1, "包含,但不分词"),EQUAL_NO_SPLIT(5, "等于,但不分词");private int type;private String desc;FilterEnum(int type, String desc) {this.type = type;this.desc = desc;}public int getType() {return type;}public void setType(int type) {this.type = type;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}}

3、Service层

public PageInfo<TreeComplexVo> findTopUriByCondition(List<Filter> filters, int pageIndex, int pageSize) throws Exception {PageInfo<TreeComplexVo> pageInfo = new PageInfo<>();// 获取所有indexList<String> indices = Arrays.asList(EsConstants.USER_TOP_URI_INDEX_PREFIX);for (String index : indices) {elasticSearchDao.createIndex(EsClient.getClient(), index);}SearchSourceBuilder searchSourceBuilder = structSearchSourceBuilder(filters, pageIndex, pageSize);searchSourceBuilder.sort("complexScore", SortOrder.DESC);List<TreeComplexVo> treeComplexVo = new ArrayList();String tableName = topProviderService.getEsTableName(EsConstants.TOP_URI, topProviderService.getEsTableIndex(EsConstants.TOP_URI));SearchResult searchResult = elasticSearchDao.search(EsClient.getClient(), indices, tableName, searchSourceBuilder.toString());List<SearchResult.Hit<JSONObject, Void>> hits = searchResult.getHits(JSONObject.class);for (SearchResult.Hit<JSONObject, Void> hit : hits) {treeComplexVo.add(JSON.parseObject(hit.source.toString(), TreeComplexVo.class));}Long total = searchResult.getTotal();pageInfo.setTotal(total);pageInfo.setPageNum(pageIndex);pageInfo.setPageSize(pageSize);pageInfo.setList(treeComplexVo);Long num = total % pageSize;int page;if (num == 0) {page = (int) (total / pageSize);} else {page = (int) (total / pageSize) + 1;}pageInfo.setPages(page);// 生成queryreturn pageInfo;}@Overridepublic Boolean loadServiceCount( int pageSize, Map<String, Integer> appMethodCountMap) {//计算新的表索引,这样轮流覆盖,暂时做0,1轮训Integer newIndex = getEsTableIndex(EsConstants.TOP_SERVICE)+1;String tableName = getEsTableName(EsConstants.TOP_SERVICE, newIndex);try{//清理旧数据elasticSearchDao.deleteDocAll(EsClient.getClient(), EsConstants.USER_TOP_PROVIDER_INDEX_PREFIX, tableName);}catch (Exception e){LOGGER.error("清理es数据失败", e);}Map<String, ServiceCountVo> maps = new HashMap();for(EnvironmentType environmentType: EnvironmentType.values()){try{//切换当前的zkServiceService serviceService = 
(ServiceService)GovernanceConfig.getEnvServiceMap().get(environmentType.getType()).get(SERVICE_PROVIDER_NAME);int pageIndex = 0;int resultSize = 0;do {//查询分页数据pageIndex +=1;PageInfo pageInfo = serviceService.getServiceCount(pageIndex, pageSize);resultSize = appendSingleZkServiceCount(maps, environmentType, pageInfo, appMethodCountMap);//分页数据刚好是指定分页大小就继续查询}while(resultSize==pageSize);}catch(Exception e){LOGGER.error("分页获取zk数据失败", e);}}/*** 分页插入数据到es*/List results = new ArrayList(maps.values());int batchSize = results.size()%ES_STORE_BATCH_SIZE==0?results.size()/ES_STORE_BATCH_SIZE:1+results.size()/ES_STORE_BATCH_SIZE;for(int i=1;i<=batchSize;i++){int start = Math.max(0, i-1)*ES_STORE_BATCH_SIZE;int end = Math.min(results.size(), start+ES_STORE_BATCH_SIZE);elasticSearchDao.insertDocuments(EsClient.getClient(), results.subList(start, end), EsConstants.USER_TOP_PROVIDER_INDEX_PREFIX, tableName);}//成功后更新表索引,方便查询使用setEsTableIndex(EsConstants.TOP_SERVICE, newIndex);return true;}

4、EsClient层

package com.fenqile.sgp.web.config;import com.google.gson.GsonBuilder;import io.searchbox.client.JestClient;import io.searchbox.client.JestClientFactory;import io.searchbox.client.config.HttpClientConfig;import com.fenqile.sgp.business.helper.HippoHelper;/*** @author sherrycao* @version /3/6*/public class EsClient {private static JestClient client;private static JestClient accessLogClient;private static JestClient businessLogClient;private EsClient() {}private static void build() {JestClientFactory factory = new JestClientFactory();factory.setHttpClientConfig(new HttpClientConfig.Builder(HippoHelper.getEsUrls()).multiThreaded(true)//一个route 默认不超过2个连接 路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute.defaultMaxTotalConnectionPerRoute(2)//所有route连接总数.maxTotalConnection(2).connTimeout(10000).readTimeout(10000).gson(new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create()).build());client = factory.getObject();}private static void buildAccessLogClient() {JestClientFactory factory = new JestClientFactory();factory.setHttpClientConfig(new HttpClientConfig.Builder(HippoHelper.getEsAccessLogUrls()).defaultCredentials("elastic","6018C23DD614E02D").multiThreaded(true)//一个route 默认不超过2个连接 路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute.defaultMaxTotalConnectionPerRoute(2)//所有route连接总数.maxTotalConnection(2).connTimeout(20000).readTimeout(20000).gson(new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create()).build());accessLogClient = factory.getObject();}private static void buildBusinessLogClient() {JestClientFactory factory = new JestClientFactory();factory.setHttpClientConfig(new HttpClientConfig.Builder(HippoHelper.getEsBusinessLogUrls()).defaultCredentials("elastic","6018C23DD614E02D").multiThreaded(true)//一个route 默认不超过2个连接 路由是指连接到某个远程注解的个数。总连接数=route个数 * defaultMaxTotalConnectionPerRoute.defaultMaxTotalConnectionPerRoute(2)//所有route连接总数.maxTotalConnection(2).connTimeout(20000).readTimeout(20000).gson(new 
GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create()).build());businessLogClient = factory.getObject();}public static synchronized JestClient getClient() {if (client == null) {build();}return client;}public static synchronized JestClient getAccessLogClient() {if (accessLogClient == null) {buildAccessLogClient();}return accessLogClient;}public static synchronized JestClient getBusinessLogClient() {if (businessLogClient == null) {buildBusinessLogClient();}return businessLogClient;}}

5、ElasticSearchDao层

package com.fenqile.sgp.business.dao;import io.searchbox.client.JestClient;import io.searchbox.client.JestResult;import io.searchbox.core.*;import io.searchbox.indices.CreateIndex;import io.searchbox.indices.DeleteIndex;import org.elasticsearch.index.query.BoolQueryBuilder;import org.elasticsearch.index.query.QueryBuilders;import org.elasticsearch.search.builder.SearchSourceBuilder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;import java.io.IOException;import java.util.Iterator;import java.util.List;import java.util.Map;/*** @author sherrycao* @version /3/11*/@Servicepublic class ElasticSearchDao {private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchDao.class);/*** 插入文档* @param document* @param indexName* @param typeName*/public void insertDocument(JestClient client, Object document, String indexName, String typeName) {try {Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);Index index = new Index.Builder(document).build();bulk.addAction(index);BulkResult bulkResult = client.execute(bulk.build());bulkResult.isSucceeded();} catch (IOException e) {LOGGER.error("写入es异常, indexName {}, typeName {}", indexName, typeName);LOGGER.error("", e);}}/*** 批量插入文档* @param documents* @param indexName* @param typeName*/public void insertDocuments(JestClient client, List<Object> documents, String indexName, String typeName) {Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);for (Object document : documents) {Index index = new Index.Builder(document).build();bulk.addAction(index);}try {BulkResult bulkResult = client.execute(bulk.build());bulkResult.isSucceeded();} catch (IOException e) {LOGGER.error("批量写入es异常, indexName {}, typeName {}", indexName, typeName);LOGGER.error("", e);}}/*** 指定id* @param client* @param documentMap* @param indexName* @param typeName*/public void insertDocuments(JestClient client, Map<String, Object> documentMap, 
String indexName, String typeName) {Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);Iterator documentEntry = documentMap.entrySet().iterator();while(documentEntry.hasNext()){Map.Entry<String, Object> entry = (Map.Entry)documentEntry.next();Index index = new Index.Builder(entry.getValue()).id(entry.getKey()).build();bulk.addAction(index);}try {BulkResult bulkResult = client.execute(bulk.build());bulkResult.isSucceeded();} catch (IOException e) {LOGGER.error("批量写入es异常, indexName {}, typeName {}", indexName, typeName);LOGGER.error("", e);}}public SearchResult search(JestClient client, String indexName, String typeName, String query) throws Exception {Search search = new Search.Builder(query).addIndex(indexName).addType(typeName).build();return client.execute(search);}/*** 使用type查询* @param client* @param indexName* @param typeName* @param query* @return* @throws Exception*/public SearchResult search(JestClient client, List<String> indexName, String typeName, String query) throws Exception {LOGGER.info(query);Search search = new Search.Builder(query).addIndices(indexName).addType(typeName).build();LOGGER.info(search.toString());LOGGER.info(search.getPathToResult());return client.execute(search);}/*** 不使用type查询* @param client* @param indexName* @param query* @return* @throws Exception*/public SearchResult search(JestClient client, List<String> indexName, String query) throws Exception {LOGGER.info(query);Search search = new Search.Builder(query).addIndices(indexName).build();LOGGER.info(search.toString());LOGGER.info(search.getPathToResult());return client.execute(search);}public Boolean createIndex(JestClient client, String indexName) {try {JestResult jr = client.execute(new CreateIndex.Builder(indexName).build());return jr.isSucceeded();} catch (IOException e) {LOGGER.error("", e);}return false;}public boolean deleteDoc(JestClient client,String indexId, String indexName, String indexType) {Delete.Builder builder = new 
Delete.Builder(indexId);builder.refresh(true);Delete delete = builder.index(indexName).type(indexType).build();try {JestResult result = client.execute(delete);if (result != null && !result.isSucceeded()) {throw new RuntimeException(result.getErrorMessage()+"删除文档失败!");}} catch (Exception e) {LOGGER.error("",e);return false;}return true;}public boolean deleteDocAll(JestClient client,String indexName, String indexType) {SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();searchSourceBuilder.query(boolQueryBuilder);DeleteByQuery.Builder builder = new DeleteByQuery.Builder(searchSourceBuilder.toString());builder.refresh(true);DeleteByQuery deleteByQuery = builder.addIndex(indexName).addType(indexType).build();try {JestResult result = client.execute(deleteByQuery);if (result != null && !result.isSucceeded()) {throw new RuntimeException(result.getErrorMessage()+"删除文档失败!");}} catch (Exception e) {LOGGER.error("",e);return false;}return true;}/*** 删除类型* @param indexName* @param indexType*/public boolean deleteType(JestClient client, String indexName, String indexType) {DeleteIndex deleteIndex = new DeleteIndex.Builder(indexName).type(indexType).build();try {JestResult result = client.execute(deleteIndex);if (result != null && result.isSucceeded()) {throw new RuntimeException(result.getErrorMessage()+"删除类型失败!");}} catch (Exception e) {LOGGER.error("",e);return false;}return true;}/*** 删除索引* @param indexName*/public boolean deleteIndex(JestClient client, String indexName) {DeleteIndex deleteIndex = new DeleteIndex.Builder(indexName).build();try {JestResult result = client.execute(deleteIndex);if (result != null && result.isSucceeded()) {throw new RuntimeException(result.getErrorMessage()+"删除索引失败!");}} catch (Exception e) {LOGGER.error("",e);return false;}return true;}/*** 插入或更新文档* @param id* @param indexObject* @param indexName* @param indexType* @return*/public boolean insertOrUpdateDoc(JestClient 
client,String id, Object indexObject, String indexName, String indexType) {Index.Builder builder = new Index.Builder(indexObject);builder.id(id);builder.refresh(true);Index index = builder.index(indexName).type(indexType).build();try {JestResult result = client.execute(index);if (result != null && !result.isSucceeded()) {throw new RuntimeException(result.getErrorMessage()+"插入更新索引失败!");}} catch (Exception e) {LOGGER.error("",e);return false;}return true;}}

到此ElasticSearch工具类封装介绍完成。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。