02-Elasticsearch的Java调用

论坛 期权论坛 脚本     
匿名网站用户   2020-12-20 06:05   23   0

1. Elasticsearch依赖包

<!-- Elasticsearch core library plus the REST high-level client (RestHighLevelClient); both are pinned to the same version, 6.8.0. -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>6.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>6.8.0</version>
    </dependency>

2. Elasticsearch服务类



import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.search.MatchQuery.ZeroTermsQuery;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSON;
import com.hengtiansoft.eis.beans.dto.EntNameDto;
import com.hengtiansoft.eis.commons.constant.NumConstant;
import com.hengtiansoft.eis.commons.enums.EErrorCode;
import com.hengtiansoft.eis.commons.exception.BizServiceException;
import com.hengtiansoft.eis.commons.util.MessageUtil;
import com.hengtiansoft.eis.elastic.service.EsClientService;

/**
 * Elasticsearch-backed implementation of {@link EsClientService} that stores and searches
 * enterprise names (v1.0.0, 2019-06-18).
 *
 * <p>Documents are written to index {@code ent_name_01}, type {@code doc}, with two fields:
 * {@code name} (the enterprise name) and {@code name_once} (the former name). The document id is
 * the MD5 hex digest of the enterprise name, so the same name always maps to the same document.
 */
@Service
@SuppressWarnings("all")
public class EsClientServiceImpl implements EsClientService {

    private static final Logger LOGGER = LoggerFactory.getLogger(EsClientServiceImpl.class);

    /** Default index that holds the enterprise-name documents. */
    private static final String INDEX_NAME = "ent_name_01";
    /** Document field holding the enterprise name. */
    private static final String FIELD_NAME = "name";
    /** Document field holding the former enterprise name. */
    private static final String FIELD_NAME_ONCE = "name_once";
    /** Default mapping type of the documents. */
    private static final String TYPE = "doc";

    @Autowired
    private RestHighLevelClient client;

    /**
     * Stores one enterprise name in Elasticsearch, creating the default index first if needed.
     *
     * @param entName     enterprise name; blank values are rejected
     * @param entNameOnce former enterprise name
     * @return the JSON-serialized index response, or {@code null} when the name is blank or a
     *         document for this name already exists
     * @throws IOException if a request to Elasticsearch fails
     */
    @Override
    public String storeEntName(String entName, String entNameOnce) throws IOException {
        // Blank names are skipped here just like in the bulk path; a null name would otherwise
        // fail inside the MD5 id computation.
        if (StringUtils.isBlank(entName)) {
            LOGGER.warn("=====>【单条企业名称添加Elastic失败! 原因: 企业名称为空!】");
            return null;
        }
        if (!this.existsIndex(INDEX_NAME)) {
            this.createIndex(INDEX_NAME);
            LOGGER.info("=====>【Elastic中不存在索引:{}, 则创建索引成功!】", INDEX_NAME);
        }
        if (exists(INDEX_NAME, TYPE, entName)) {
            return null;
        }
        IndexResponse indexResponse = add(INDEX_NAME, TYPE, entName, entNameOnce);
        String result = JSON.toJSONString(indexResponse);
        LOGGER.info("=====>【单条企业名称添加Elastic】 index:{}, type:{}, entName:{}, entNameOnce:{}, Elastic返回结果:{}",
                INDEX_NAME, TYPE, entName, entNameOnce, result);
        return result;
    }

    /**
     * Indexes an enterprise name into the default index and type.
     *
     * @param entName     enterprise name
     * @param entNameOnce former enterprise name
     * @return the index response returned by the client
     * @throws IOException if the request to Elasticsearch fails
     */
    public IndexResponse add(String entName, String entNameOnce) throws IOException {
        return add(INDEX_NAME, TYPE, entName, entNameOnce);
    }

    /**
     * Indexes an enterprise name into the given index and type with an immediate refresh.
     *
     * @param index       target index
     * @param type        target mapping type
     * @param entName     enterprise name; its MD5 hex digest becomes the document id
     * @param entNameOnce former enterprise name
     * @return the index response returned by the client
     * @throws IOException if the request to Elasticsearch fails
     */
    public IndexResponse add(String index, String type, String entName, String entNameOnce) throws IOException {
        IndexRequest indexRequest = new IndexRequest(index, type, DigestUtils.md5Hex(entName));
        indexRequest.source(XContentFactory.jsonBuilder()
                .startObject()
                .field(FIELD_NAME, entName)
                // Fixed: the former name used to be overwritten with the current name.
                .field(FIELD_NAME_ONCE, entNameOnce)
                .endObject());
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        return client.index(indexRequest, RequestOptions.DEFAULT);
    }

    /**
     * Bulk-stores enterprise names in the default index and type.
     *
     * @param entNameMapList entries carrying the enterprise name and its former name
     * @throws IOException if a request to Elasticsearch fails
     */
    @Override
    public void bulkStoreEntName(List<EntNameDto> entNameMapList) throws IOException {
        bulkAdd(INDEX_NAME, TYPE, entNameMapList);
    }

    /**
     * Indexes every entry that is not yet present, sending the requests in batches of
     * {@code NumConstant.FIVE_HUNDRED}. Null entries and entries with a blank name are skipped.
     */
    private void bulkAdd(String index, String type, List<EntNameDto> entNameList) throws IOException {
        if (null == entNameList || entNameList.isEmpty()) {
            LOGGER.warn("=====>【批量将企业信息加入Elastic失败! 原因: 待加入的数据为空!】");
            return;
        }
        int count = 0;
        BulkRequest bulkAddRequest = newBulkRequest();
        for (EntNameDto dto : entNameList) {
            if (null == dto || StringUtils.isBlank(dto.getEntName())) {
                continue;
            }
            // The document id is derived from the trimmed name, so the existence check must use
            // the same trimmed value (and the same index/type) or it looks up a different id.
            String idSource = dto.getEntName().trim();
            if (exists(index, type, idSource)) {
                continue;
            }
            IndexRequest indexRequest = new IndexRequest(index, type, DigestUtils.md5Hex(idSource));
            indexRequest.source(XContentFactory.jsonBuilder()
                    .startObject()
                    .field(FIELD_NAME, dto.getEntName())
                    .field(FIELD_NAME_ONCE, dto.getEntNameOld())
                    .endObject());
            bulkAddRequest.add(indexRequest);
            count++;
            if (count >= NumConstant.FIVE_HUNDRED) {
                sendBulk(index, type, bulkAddRequest, count);
                count = 0;
                bulkAddRequest = newBulkRequest();
            }
        }
        if (count > 0) {
            sendBulk(index, type, bulkAddRequest, count);
        }
    }

    /** Creates an empty bulk request that refreshes the index immediately after it is applied. */
    private BulkRequest newBulkRequest() {
        BulkRequest request = new BulkRequest();
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        return request;
    }

    /** Sends one batch and logs a warning when Elasticsearch reports item-level failures. */
    private void sendBulk(String index, String type, BulkRequest request, int size) throws IOException {
        if (client.bulk(request, RequestOptions.DEFAULT).hasFailures()) {
            LOGGER.warn("=====>[ElasticServiceImpl.bulkAdd] bulk request reported failures, index:{}, type:{}, size:{}",
                    index, type, size);
        }
    }

    /**
     * Checks whether an index exists.
     *
     * @param index index name
     * @return {@code true} if the index exists
     * @throws IOException if the request to Elasticsearch fails
     */
    @Override
    public boolean existsIndex(String index) throws IOException {
        GetIndexRequest request = new GetIndexRequest();
        request.indices(index);
        boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
        LOGGER.info("=====>【验证Elastic中是否存在索引】 INDEX:{}, isExist:{}", index, exists);
        return exists;
    }

    /**
     * Creates an index with default settings.
     *
     * @param index index name
     * @return the JSON-serialized create-index response
     * @throws IOException if the request to Elasticsearch fails
     */
    @Override
    public String createIndex(String index) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(index);
        CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
        String result = JSON.toJSONString(createIndexResponse);
        LOGGER.info("=====>【ElasticServiceImpl.createIndex success】 index:{}, response : {}", index, result);
        return result;
    }

    /**
     * Returns the score of the best match for an enterprise name. Only the {@code name} field is
     * queried; former names are not matched.
     *
     * @param entName enterprise name to look up
     * @return the score of the first hit, or {@code 0.0} when the name is blank or nothing matches
     * @throws IOException if the request to Elasticsearch fails
     */
    @Override
    public float searchEntName(String entName) throws IOException {
        float score = 0.00F;
        if (StringUtils.isBlank(entName)) {
            LOGGER.info("=====>【查询企业名称: null , 得分: 0.0F】");
            return score;
        }
        BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
        boolBuilder.must(QueryBuilders.matchQuery(FIELD_NAME, entName)
                .operator(Operator.AND)
                .zeroTermsQuery(ZeroTermsQuery.ALL));
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(boolBuilder);
        sourceBuilder.fetchSource(new String[] {FIELD_NAME}, new String[] {});
        SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
        searchRequest.types(TYPE);
        searchRequest.source(sourceBuilder);
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        SearchHit[] searchHits = response.getHits().getHits();
        if (searchHits.length > 0) {
            SearchHit hit = searchHits[0];
            LOGGER.info("=====>【查询企业名称: {} , Elastic匹配分数最高企业名称:{} , 得分: {}】",
                    entName, hit.getSourceAsMap().get(FIELD_NAME), hit.getScore());
            score = hit.getScore();
        } else {
            LOGGER.info("=====>【查询企业名称: {} ,Elastic未能匹配到该名称 , 得分: 0.0F】", entName);
        }
        return score;
    }

    /**
     * Searches enterprise names and former names in the default index, one page at a time.
     *
     * @param entName  text to match against the name and former-name fields
     * @param page     1-based page number
     * @param pagesize number of hits per page
     * @return the matching entries of the requested page
     * @throws BizServiceException if the paging parameters are missing or below 1, or the name is blank
     * @throws IOException         if the request to Elasticsearch fails
     */
    @Override
    public List<EntNameDto> search(String entName, Integer page, Integer pagesize) throws IOException {
        if (page == null || page < 1 || pagesize == null || pagesize < 1) {
            throw new BizServiceException(MessageUtil.getMessage(EErrorCode.PAGE_PARAMS_ERROR));
        }
        if (StringUtils.isBlank(entName)) {
            throw new BizServiceException(MessageUtil.getMessage(EErrorCode.INTERFACE_BIDATA_010_ERROR));
        }
        return search(INDEX_NAME, TYPE, entName, page, pagesize);
    }

    /**
     * Runs the paged search: a hit must match the text in {@code name} or in {@code name_once}.
     */
    private List<EntNameDto> search(String index, String type, String entName, Integer page, Integer pagesize) throws IOException {
        BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
        boolBuilder.should(QueryBuilders.matchQuery(FIELD_NAME, entName)
                .operator(Operator.AND)
                .zeroTermsQuery(ZeroTermsQuery.ALL));
        boolBuilder.should(QueryBuilders.matchQuery(FIELD_NAME_ONCE, entName)
                .operator(Operator.AND)
                .zeroTermsQuery(ZeroTermsQuery.ALL));
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(boolBuilder);
        sourceBuilder.from((page - 1) * pagesize);
        sourceBuilder.size(pagesize);
        sourceBuilder.fetchSource(new String[] {FIELD_NAME, FIELD_NAME_ONCE}, new String[] {});
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.types(type);
        searchRequest.source(sourceBuilder);
        LOGGER.info("=====>【企业名称和曾用名匹配!  企业名称或曾用名:{}, 请求报文 : {} 】", entName, JSON.toJSONString(searchRequest));
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        // Fixed: this line used to log the request a second time instead of the response.
        LOGGER.info("=====>【企业名称和曾用名匹配!  企业名称或曾用名:{}, 返回报文 : {} 】", entName, response);
        List<EntNameDto> result = new ArrayList<>();
        for (SearchHit hit : response.getHits().getHits()) {
            Map<String, Object> source = hit.getSourceAsMap();
            String localEntName = sourceValue(source, FIELD_NAME);
            // Fixed: the former name used to be read from the "name" field.
            String localEntNameOld = sourceValue(source, FIELD_NAME_ONCE);
            result.add(new EntNameDto(localEntName, localEntNameOld));
            LOGGER.info("=====>[企业名称和曾用名匹配!  企业名称或曾用名:{}, 返回结果 :{} ]", entName, hit.getSourceAsString());
        }
        return result;
    }

    /** Reads a field from a hit's source as text; a missing source or field yields an empty string. */
    private static String sourceValue(Map<String, Object> source, String field) {
        Object value = source == null ? null : source.get(field);
        return value == null ? "" : value.toString();
    }

    /**
     * Checks whether a document for the given enterprise name exists.
     *
     * @param index   index name
     * @param type    mapping type
     * @param entName enterprise name; its MD5 hex digest is the document id looked up
     * @return {@code true} if the document exists, {@code false} otherwise
     * @throws IOException if the request to Elasticsearch fails
     */
    private boolean exists(String index, String type, String entName) throws IOException {
        GetRequest getRequest = new GetRequest(index, type, DigestUtils.md5Hex(entName));
        // Only existence matters, so neither the source nor stored fields are fetched.
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
        LOGGER.info("=====>[验证Elastic中是否存在该企业信息] index:{}, type:{}, entName:{}, exists:{}", index, type, entName, exists);
        return exists;
    }

    /**
     * Fetches the document for the given enterprise name.
     *
     * @param index   index name
     * @param type    mapping type
     * @param entName enterprise name; its MD5 hex digest is the document id looked up
     * @return the JSON-serialized get response
     * @throws IOException if the request to Elasticsearch fails
     */
    private String get(String index, String type, String entName) throws IOException {
        GetRequest getRequest = new GetRequest(index, type, DigestUtils.md5Hex(entName));
        GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
        String responseJson = JSON.toJSONString(getResponse);
        LOGGER.info("=====>[ElasticServiceImpl.get] index:{}, type:{}, entName:{}, responseJson:{}", index, type, entName, responseJson);
        return responseJson;
    }

    /**
     * Updates the {@code name} field of an existing document with an immediate refresh.
     *
     * @param index   index name
     * @param type    mapping type
     * @param id      document id
     * @param entName new enterprise name
     * @throws IOException if the request to Elasticsearch fails
     */
    @SuppressWarnings("unused")
    private void update(String index, String type, String id, String entName) throws IOException {
        UpdateRequest request = new UpdateRequest(index, type, id);
        request.doc(XContentFactory.jsonBuilder()
                .startObject()
                .field(FIELD_NAME, entName)
                .endObject());
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
        String responseJson = JSON.toJSONString(updateResponse);
        LOGGER.info("=====>[ElasticServiceImpl.update] index:{}, type:{}, entName:{}, responseJson:{}", index, type, entName, responseJson);
    }

    /**
     * Deletes the document for the given enterprise name with an immediate refresh.
     *
     * @param index   index name
     * @param type    mapping type
     * @param entName enterprise name; its MD5 hex digest is the document id deleted
     * @throws IOException if the request to Elasticsearch fails
     */
    private void delete(String index, String type, String entName) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, type, DigestUtils.md5Hex(entName));
        deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        DeleteResponse response = client.delete(deleteRequest, RequestOptions.DEFAULT);
        String responseJson = JSON.toJSONString(response);
        LOGGER.info("=====>[ElasticServiceImpl.delete] index:{}, type:{}, entName:{}, responseJson:{}", index, type, entName, responseJson);
    }
}

## 3. 创建工厂注入Bean

在Spring的XML配置文件中声明工厂Bean, 并通过其 getClient 方法生成 RestHighLevelClient:

   <!-- Registers EsClientFactory as a bean, then exposes the object returned by its getClient() factory method as the "restHighLevelClient" bean. -->
   <bean id="esClientFactory" class="com.hengtiansoft.eis.elastic.factory.EsClientFactory"></bean>
   <bean id="restHighLevelClient" factory-bean="esClientFactory" factory-method="getClient"></bean>
import java.util.ArrayList;
import java.util.List;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
/**
 * Factory that builds the {@link RestHighLevelClient} from the comma-separated host list in the
 * {@code es.services.ip} property. Every host is contacted over HTTP on port 9200.
 */
public class EsClientFactory {
    // Fixed: the logger was bound to EsUtil, a class this file neither declares nor imports.
    private static final Logger LOGGER = LoggerFactory.getLogger(EsClientFactory.class);

    /** Port used for every configured host. */
    private static final int PORT = 9200;
    /** URI scheme used for every configured host. */
    private static final String SCHEMA = "http";
    /** Connect timeout passed to the request config, in milliseconds. */
    private static final int CONNECTTIMEOUT = 1000;
    /** Socket timeout passed to the request config, in milliseconds. */
    private static final int SOCKETTIMEOUT = 30000;
    /** Timeout for obtaining a connection from the pool, in milliseconds. */
    private static final int CONNECTIONREQUESTTIMEOUT = 500;
    /** Maximum number of connections in total. */
    private static final int MAXCONNECTNUM = 100;
    /** Maximum number of connections per route. */
    private static final int MAXCONNECTPERROUTE = 100;

    @Value("#{'${es.services.ip}'.split(',')}")
    private List<String> serversList;

    /**
     * Builds a client for all configured hosts with the timeouts and pool limits defined above.
     *
     * @return a new {@link RestHighLevelClient}
     * @throws IllegalStateException if no non-blank host is configured
     */
    public RestHighLevelClient getClient() {
        List<HttpHost> hostList = new ArrayList<>();
        if (serversList != null) {
            for (String host : serversList) {
                if (host == null) {
                    continue;
                }
                // "a, b" splits into "a" and " b"; HttpHost rejects host names containing blanks.
                String trimmedHost = host.trim();
                if (!trimmedHost.isEmpty()) {
                    hostList.add(new HttpHost(trimmedHost, PORT, SCHEMA));
                }
            }
        }
        if (hostList.isEmpty()) {
            throw new IllegalStateException("No Elasticsearch host configured in property es.services.ip");
        }

        RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]));
        builder.setRequestConfigCallback(new RequestConfigCallback() {
            @Override
            public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                requestConfigBuilder.setConnectTimeout(CONNECTTIMEOUT);
                requestConfigBuilder.setSocketTimeout(SOCKETTIMEOUT);
                requestConfigBuilder.setConnectionRequestTimeout(CONNECTIONREQUESTTIMEOUT);
                return requestConfigBuilder;
            }
        });
        builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                httpClientBuilder.setMaxConnTotal(MAXCONNECTNUM);
                httpClientBuilder.setMaxConnPerRoute(MAXCONNECTPERROUTE);
                return httpClientBuilder;
            }
        });

        // Report success only after the client has actually been constructed.
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
        LOGGER.info("【RestHighLevelClient init Success】");
        return restHighLevelClient;
    }
}

## 4. 测试

import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.annotation.Transactional;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Random;

/**
 * Spring-wired integration tests for {@link EsClientService} (v1.0.0, 2019-06-17).
 *
 * <p>The tests talk to the Elasticsearch client configured in {@code applicationContext.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath*:applicationContext.xml"})
@Rollback(value=true)
@Transactional(transactionManager = "transactionManager")
public class EsTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(EsTest.class);

    @Autowired
    private EsClientService esClientService;

    /**
     * 1. Stores one enterprise name in Elasticsearch.
     *
     * <p>Fixed: this test used to call {@code searchEntName}, so nothing was ever stored. The
     * sample has no former name, hence the empty second argument.
     */
    @Test
    public void testSaveEntName() throws IOException {
        esClientService.storeEntName("两湖绿大名", "");
    }

    /**
     * 2. Looks up the match score for a random fragment of every line in a local GBK-encoded file.
     */
    @Test
    public void testGetScoreByEntName() {
        Random random = new Random();
        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
                new FileInputStream("D:\\测试数据\\test.txt"), Charset.forName("GBK")))) {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                String name;
                if (line.length() > 2) {
                    // Drop the first character and cut at a random position before the last one.
                    name = line.substring(1, random.nextInt(line.length() - 2) + 1);
                } else {
                    name = line;
                }
                esClientService.searchEntName(name);
                // Pause between lookups.
                Thread.sleep(NumConstant.FIVE_HUNDRED);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error(e.getLocalizedMessage(), e);
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            LOGGER.error(e.getLocalizedMessage(), e);
        }
    }

    /**
     * 3. Searches by enterprise name and retrieves the first 10 matches.
     */
    @Test
    public void testEntName() {
        try {
            // Fixed: search requires explicit paging arguments; page 1 with 10 hits per page.
            esClientService.search("公司", 1, 10);
        } catch (Exception e) {
            LOGGER.error(e.getLocalizedMessage(), e);
        }
    }

}

源码地址: https://download.csdn.net/download/yk10010/11252439

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1136255
帖子:227251
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP