1.Elasticsearch依赖包
<!-- Elastic -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.8.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.8.0</version>
</dependency>
2. Elasticsearch服务类
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.Operator;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.search.MatchQuery.ZeroTermsQuery;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.hengtiansoft.eis.beans.dto.EntNameDto;
import com.hengtiansoft.eis.commons.constant.NumConstant;
import com.hengtiansoft.eis.commons.enums.EErrorCode;
import com.hengtiansoft.eis.commons.exception.BizServiceException;
import com.hengtiansoft.eis.commons.util.MessageUtil;
import com.hengtiansoft.eis.elastic.service.EsClientService;
@Service
@SuppressWarnings("all")
public class EsClientServiceImpl implements EsClientService {
private static final Logger LOGGER = LoggerFactory.getLogger(EsClientServiceImpl.class);
private static final String INDEX_NAME = "ent_name_01";
private static final String FIELD_NAME = "name";
private static final String FIELD_NAME_ONCE = "name_once";
private static final String TYPE = "doc";
@Autowired
private RestHighLevelClient client;
@Override
public String storeEntName(String entName, String entNameOnce) throws IOException {
if (!this.existsIndex(INDEX_NAME)) {
this.createIndex(INDEX_NAME);
LOGGER.info("=====>【Elastic中不存在索引:{}, 则创建索引成功!】", INDEX_NAME);
}
if (!exists(INDEX_NAME, TYPE, entName)) {
IndexResponse indexResponse = add(INDEX_NAME,TYPE, entName, entNameOnce);
String result = JSON.toJSONString(indexResponse);
LOGGER.info("=====>【单条企业名称添加Elastic】 index:{}, type:{}, entName:{}, entNameOnce:{}, Elastic返回结果:{}",
INDEX_NAME, TYPE, entName, entNameOnce, result);
return result;
}
return null;
}
public IndexResponse add(String entName, String entNameOnce) throws IOException {
return add(INDEX_NAME, TYPE, entName, entNameOnce);
}
public IndexResponse add(String index, String type, String entName, String entNameOnce) throws IOException {
IndexRequest indexRequest = new IndexRequest(index, type, DigestUtils.md5Hex(entName));
indexRequest.source(XContentFactory.jsonBuilder()
.startObject()
.field(FIELD_NAME, entName)
.field(FIELD_NAME_ONCE, entName)
.endObject());
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
return client.index(indexRequest, RequestOptions.DEFAULT);
}
@Override
public void bulkStoreEntName(List<EntNameDto> entNameMapList) throws IOException {
bulkAdd(INDEX_NAME, TYPE, entNameMapList);
}
private void bulkAdd(String index, String type, List<EntNameDto> entNameList) throws IOException {
if (null == entNameList || entNameList.size() <= 0) {
LOGGER.warn("=====>【批量将企业信息加入Elastic失败! 原因: 待加入的数据为空!】");
return ;
}
int count = 0;
BulkRequest bulkAddRequest = new BulkRequest();
bulkAddRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (EntNameDto dto : entNameList) {
if (null != dto) {
if (StringUtils.isBlank(dto.getEntName())) {
continue;
}
if(!exists(INDEX_NAME, TYPE, dto.getEntName())){
IndexRequest indexRequest = new IndexRequest(index, type, DigestUtils.md5Hex(dto.getEntName().trim()));
indexRequest.source(XContentFactory.jsonBuilder()
.startObject()
.field(FIELD_NAME, dto.getEntName())
.field(FIELD_NAME_ONCE, dto.getEntNameOld())
.endObject());
bulkAddRequest.add(indexRequest);
count++ ;
if ( count >= NumConstant.FIVE_HUNDRED ) {
client.bulk(bulkAddRequest, RequestOptions.DEFAULT);
count = 0;
bulkAddRequest = new BulkRequest();
bulkAddRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
}
}
}
if (count > 0) {
client.bulk(bulkAddRequest, RequestOptions.DEFAULT);
}
}
@Override
public boolean existsIndex(String index) throws IOException {
GetIndexRequest request = new GetIndexRequest();
request.indices(index);
boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
LOGGER.info("=====>【验证Elastic中是否存在索引】 INDEX:{}, isExist:{}", index, exists);
return exists;
}
@Override
public String createIndex(String index) throws IOException {
CreateIndexRequest request = new CreateIndexRequest(index);
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
String result = JSON.toJSONString(createIndexResponse);
LOGGER.info("=====>【ElasticServiceImpl.createIndex success】 index:{}, response : {}", index, result);
return result;
}
@Override
public float searchEntName( String entName) throws IOException {
float score = 0.00F;
if (StringUtils.isBlank(entName)) {
LOGGER.info("=====>【查询企业名称: null , 得分: 0.0F】");
return score;
}
BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
boolBuilder.must(QueryBuilders.matchQuery(FIELD_NAME, entName).operator(Operator.AND).zeroTermsQuery(ZeroTermsQuery.ALL));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(boolBuilder);
sourceBuilder.fetchSource(new String[] {FIELD_NAME }, new String[] {});
SearchRequest searchRequest = new SearchRequest(INDEX_NAME);
searchRequest.types(TYPE);
searchRequest.source(sourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
if (response.getHits().getHits().length > 0) {
SearchHit hit = response.getHits().getHits()[0];
LOGGER.info("=====>【查询企业名称: {} , Elastic匹配分数最高企业名称:{} , 得分: {}】",
entName, hit.getSourceAsMap().get(FIELD_NAME), hit.getScore());
score = hit.getScore();
} else {
LOGGER.info("=====>【查询企业名称: {} ,Elastic未能匹配到该名称 , 得分: 0.0F】", entName);
}
return score;
}
@Override
public List<EntNameDto> search(String entName, Integer page, Integer pagesize) throws IOException {
if (page == null || page < 1 || pagesize == null || pagesize < 1) {
throw new BizServiceException(MessageUtil.getMessage(EErrorCode.PAGE_PARAMS_ERROR));
}
if (StringUtils.isBlank(entName)) {
throw new BizServiceException(MessageUtil.getMessage(EErrorCode.INTERFACE_BIDATA_010_ERROR));
}
return search(INDEX_NAME, TYPE, entName, page, pagesize);
}
private List<EntNameDto> search(String index, String type, String entName, Integer page, Integer pagesize) throws IOException {
BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();
boolBuilder.should(QueryBuilders.matchQuery(FIELD_NAME, entName).operator(Operator.AND).zeroTermsQuery(ZeroTermsQuery.ALL));
boolBuilder.should(QueryBuilders.matchQuery(FIELD_NAME_ONCE, entName).operator(Operator.AND).zeroTermsQuery(ZeroTermsQuery.ALL));
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(boolBuilder);
sourceBuilder.from((page-1)*pagesize);
sourceBuilder.size(pagesize);
sourceBuilder.fetchSource(new String[] {FIELD_NAME, FIELD_NAME_ONCE}, new String[] {});
SearchRequest searchRequest = new SearchRequest(index);
searchRequest.types(type);
searchRequest.source(sourceBuilder);
LOGGER.info("=====>【企业名称和曾用名匹配! 企业名称或曾用名:{}, 请求报文 : {} 】", entName, JSON.toJSONString(searchRequest));
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
LOGGER.info("=====>【企业名称和曾用名匹配! 企业名称或曾用名:{}, 返回报文 : {} 】", entName, JSON.toJSONString(searchRequest));
SearchHits hits = response.getHits();
SearchHit[] searchHits = hits.getHits();
List<EntNameDto> result = new ArrayList<>();
EntNameDto dto = null;
for (SearchHit hit : searchHits) {
String localEntName = hit.getSourceAsMap().get(FIELD_NAME).toString();
String localEntNameOld = hit.getSourceAsMap().get(FIELD_NAME_ONCE) == null ? "": hit.getSourceAsMap().get(FIELD_NAME).toString();
result.add(new EntNameDto(localEntName, localEntNameOld));
LOGGER.info("=====>[企业名称和曾用名匹配! 企业名称或曾用名:{}, 返回结果 :{} ]", entName, hit.getSourceAsString());
}
return result;
}
private boolean exists(String index, String type, String entName) throws IOException {
GetRequest getRequest = new GetRequest(index, type, DigestUtils.md5Hex(entName));
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
LOGGER.info("=====>[验证Elastic中是否存在该企业信息] index:{}, type:{}, entName:{}, exists:{}", index, type, entName, exists);
return exists;
}
private String get(String index, String type, String entName) throws IOException {
GetRequest getRequest = new GetRequest(index, type, DigestUtils.md5Hex(entName));
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
String responseJson = JSON.toJSONString(getResponse);
LOGGER.info("=====>[ElasticServiceImpl.exists] index:{}, type:{}, entName:{}, responseJson:{}", index, type, entName, responseJson);
return responseJson;
}
@SuppressWarnings("unused")
private void update(String index, String type, String id, String entName) throws IOException {
UpdateRequest request = new UpdateRequest(index, type, id);
request.doc(XContentFactory.jsonBuilder()
.startObject()
.field(FIELD_NAME, entName)
.endObject());
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
String responseJson = JSON.toJSONString(updateResponse);
LOGGER.info("=====>[ElasticServiceImpl.update] index:{}, type:{}, entName:{}, responseJson:{}", index, type, entName, responseJson);
}
private void delete(String index, String type, String entName) throws IOException {
DeleteRequest deleteRequest = new DeleteRequest(index, type, DigestUtils.md5Hex(entName));
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
DeleteResponse response = client.delete(deleteRequest, RequestOptions.DEFAULT);
String responseJson = JSON.toJSONString(response);
LOGGER.info("=====>[ElasticServiceImpl.update] delete:{}, type:{}, entName:{}, responseJson:{}", index, type, entName, responseJson);
}
}
##3. 创建工厂注入Bean
在XML配置
<bean id="esClientFactory" class="com.hengtiansoft.eis.elastic.factory.EsClientFactory"></bean>
<bean id="restHighLevelClient" factory-bean="esClientFactory" factory-method="getClient"></bean>
import java.util.ArrayList;
import java.util.List;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
public class EsClientFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(EsUtil.class);
private static final int PORT = 9200;
private static final String SCHEMA = "http";
private static final int CONNECTTIMEOUT = 1000;
private static final int SOCKETTIMEOUT = 30000;
private static final int CONNECTIONREQUESTTIMEOUT = 500;
private static final int MAXCONNECTNUM = 100;
private static final int MAXCONNECTPERROUTE = 100;
@Value("#{'${es.services.ip}'.split(',')}")
private List<String> serversList;
public RestHighLevelClient getClient() {
ArrayList<HttpHost> hostList = new ArrayList<>();
for (String host : serversList) {
hostList.add(new HttpHost(host, PORT, SCHEMA));
}
RestClientBuilder builder = RestClient.builder(hostList.toArray(new HttpHost[0]));
builder.setRequestConfigCallback(new RequestConfigCallback() {
@Override
public Builder customizeRequestConfig(Builder requestConfigBuilder) {
requestConfigBuilder.setConnectTimeout(CONNECTTIMEOUT);
requestConfigBuilder.setSocketTimeout(SOCKETTIMEOUT);
requestConfigBuilder.setConnectionRequestTimeout(CONNECTIONREQUESTTIMEOUT);
return requestConfigBuilder;
}
});
builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setMaxConnTotal(MAXCONNECTNUM);
httpClientBuilder.setMaxConnPerRoute(MAXCONNECTPERROUTE);
return httpClientBuilder;
}
});
LOGGER.info("【estHighLevelClient init Success】");
return new RestHighLevelClient(builder);
}
}
##4. 测试
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.annotation.Rollback;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.annotation.Transactional;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Random;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath*:applicationContext.xml"})
@Rollback(value=true)
@Transactional(transactionManager = "transactionManager")
public class EsTest {
private static final Logger LOGGER = LoggerFactory.getLogger(EsTest.class);
@Autowired
private EsClientService esClientService;
@Test
public void testSaveEntName() throws IOException {
esClientService.searchEntName("两湖绿大名");
}
@Test
public void testGetScoreByEntName(){
Random random = new Random();
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("D:\\测试数据\\test.txt"), Charset.forName("GBK")))) {
String line;
while ((line = bufferedReader.readLine()) != null) {
String name;
if (line.length() > 2) {
name = line.substring(1, random.nextInt(line.length() - 2) + 1);
} else {
name = line;
}
esClientService.searchEntName(name);
Thread.sleep(NumConstant.FIVE_HUNDRED);
}
} catch (Exception e) {
LOGGER.error(e.getLocalizedMessage());
}
}
@Test
public void testEntName() {
try {
esClientService.search("公司");
} catch (Exception e) {
LOGGER.error(e.getLocalizedMessage());
}
}
}
源码地址: https://download.csdn.net/download/yk10010/11252439 |