##使用多字段聚合
SearchResponse agg2 = client.prepareSearch("t_mdc_drg")
.setTypes("cz", "cx")
.addAggregation(
AggregationBuilders.terms("id_mdc").field("id_mdc")//设置聚合条件 group by id_mdc,id_drg
.subAggregation(
AggregationBuilders.terms("name_mdc").field("id_drg")
.subAggregation(AggregationBuilders.avg("avg").field("date"))// 聚合结果 avg(date)
.subAggregation(AggregationBuilders.max("max").field("date"))//聚合结果 max(date)
)
)
.execute().actionGet();
Map<String, Aggregation> maps = agg2.getAggregations().asMap();
StringTerms id_mdc=(StringTerms)maps.get("id_mdc");
for (StringTerms.Bucket bucket:id_mdc.getBuckets()
) {
StringTerms id_drg=(StringTerms)bucket.getAggregations().asMap().get("name_mdc");
for (StringTerms.Bucket bucket2: id_drg.getBuckets()
) {
InternalAvg avg = (InternalAvg)bucket2.getAggregations().asMap().get("avg");
InternalMax max = (InternalMax)bucket2.getAggregations().asMap().get("max");
System.out.println(bucket.getKeyAsString()+"|"+bucket2.getKeyAsString()+"="+avg+"|"+max);
}
}
QueryBuilders:
QueryBuilders:
boolQuery:
must:相当于sql的and
must not:相当于sql的not
should:相当于sql的or
mathcquery:单个匹配
mathcAllQuery:匹配所有
termQuery:termQuery("key", obj) 完全匹配 ;termsQuery("key", obj1, obj2..) 一次匹配多个值
multiMatchQuery:multiMatchQuery("text", "field1", "field2"..); 匹配多个字段, field有通配符忒行
idsQuery:构造一个只会匹配的特定数据 id 的查询
constantScoreQuery:看了一下这个类的构造函数ConstantScoreQuery(Filter filter) ,我的理解就是通过构造filter来完成文档的过滤,并且返回一个复合当前过滤条件的文档的常量分数,这个分数等于为查询条件设置的boost
fuzzyQuery:模糊查询
moreLikeThisQuery:文档中的文本查询
prefixQuery:前缀查询
rangeQuery:在一个范围内查询相匹配的文档
termQuery:一个查询相匹配的文件包含一个术语
termsQuery:一个查询相匹配的多个value---minimumMatch(1); // 设置最小数量的匹配提供了条件。默认为1。
wildcardQuery:通配符查询
nestedQuery:嵌套查询---scoreMode("total");// max, total, avg or none
disMaxQuery:对子查询的结果做union, score沿用子查询score的最大值,
spanFirstQuery:跨度查询,还包括(spanNearQuery,spanNotQuery,spanOrQuery,spanTermQuery)
其他字段解释:
percent_terms_to_match:匹配项(term)的百分比,默认是0.3
min_term_freq:一篇文档中一个词语至少出现次数,小于这个值的词将被忽略,默认是2
max_query_terms:一条查询语句中允许最多查询词语的个数,默认是25
stop_words:设置停止词,匹配时会忽略停止词
min_doc_freq:一个词语最少在多少篇文档中出现,小于这个值的词会将被忽略,默认是无限制
max_doc_freq:一个词语最多在多少篇文档中出现,大于这个值的词会将被忽略,默认是无限制
min_word_len:最小的词语长度,默认是0
max_word_len:最多的词语长度,默认无限制
boost_terms:设置词语权重,默认是1
boost:设置查询权重,默认是1
analyzer:设置使用的分词器,默认是使用该字段指定的分词器
操作例子(es版本还是1.7.2):
//查询某个医生的明细-- boolean query and 条件组合查询(时间+护士名称)
BoolQueryBuilder must = QueryBuilders.boolQuery().must(
QueryBuilders.matchQuery("name_doc", "681_护士206"))
.must(QueryBuilders.rangeQuery("yke123").gte(startTime + START_DATE_YUE)
.lte(endTime + END_DATE_YUE));
SearchResponse searchResponse = mSRB.setQuery(must).execute().actionGet();
/**
* 在es中所有的查询结果都会保存在SearchResponse中,在从SearchResponse中读取数据的时候,有两种方式:第一种是对Query的结果进行读取,
* 使用的是hit,每一条查询到的doc都是一个hit,可以将每个hit转换为map形式的数据,map的具体形式为<"field","value">的形式
*/
for(SearchHit hit:searchResponse.getHits()){
Map<String, Object> source = hit.getSource();//每条数据
if (!source.isEmpty()) {
String name_depa=(String) source.get("name_depa");
String name_doc=(String)source.get("name_doc");
System.out.println(name_depa+"=="+name_doc);
}
}
//查询该人的工时
SearchResponse searchResponse1 = mSRB.setQuery(must).addAggregation(AggregationBuilders.sum("hushi").field("hushi")).get();
InternalSum hushi = searchResponse1.getAggregations().get("hushi");
System.out.println("总工的工时"+hushi.getValue());
/**
* 第二种方式是针对查询中的聚合问题(aggregation),聚合完成后的每条doc都是一个bucket(桶),他的访问只能通过bucket来进行,而不能使用hit
*/
//设置聚合查询条件---使用SearchResponse封装结果
SearchResponse searchResponse3 = mSRB.addAggregation(AggregationBuilders.terms("name_doc").field("name_doc").size(0))//返回所有数据用0
.execute().actionGet();
System.out.println(searchResponse);
Terms depa_count = searchResponse.getAggregations().get("name_doc");
for (Terms.Bucket bucket : depa_count.getBuckets()) {
String name_depa = bucket.getKey();//科室名称
long docCount = bucket.getDocCount();//科室出现的次数
System.out.println(name_depa + "==" + docCount);
}
//该医生在该月的总条数
long l = searchResponse.getHits().totalHits();
System.out.println(l+"===============");
使用游标sroll进行分页查询遍历
client = (Client) Pools.getPool().borrowObject();//从链接池中获取客户端
//构建查询器
SearchRequestBuilder mSRB = client
.prepareSearch(index1)//索引--查询工时表
.setSearchType(SearchType.SCAN)//设置查询类型--DFS_QUERY_THEN_FETCH
.setSize(100).setScroll(TimeValue.timeValueMinutes(8));
String startTime = "2013-08";
String endTime = "2013-09";
String timeType = "0";
BoolQueryBuilder must = QueryBuilders.boolQuery()
//.must(QueryBuilders.matchQuery("name_doc", "681_护士206"))
.must(QueryBuilders.rangeQuery("yke123").gte(startTime + START_DATE_YUE)
.lte(endTime + END_DATE_YUE));
SearchResponse searchResponse = mSRB.setQuery(must).execute().actionGet();
//使用scroll遍历查询的数据
Date begin = new Date();
long count = searchResponse.getHits().getTotalHits();//第一次不返回数据
for(int i=0,sum=0; sum<count; i++){
searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
.setScroll(TimeValue.timeValueMinutes(8))
.execute().actionGet();
sum += searchResponse.getHits().hits().length;
for(SearchHit hit:searchResponse.getHits()){
Map<String, Object> source = hit.getSource();
String name_depa=(String) source.get("name_depa");
String name_doc=(String)source.get("name_doc");
String yke123=(String)source.get("yke123");
System.out.println("科室:"+name_depa+"医生:"+name_doc+"开单时间:"+yke123);
};
System.out.println("总量"+count+" 已经查到"+sum);
}
Date end = new Date();
System.out.println("耗时: "+(end.getTime()-begin.getTime()));
引用他人的http://blog.csdn.net/xr568897472/article/details/73826255:
- (1)统计某个字段的数量
- ValueCountBuilder vcb= AggregationBuilders.count("count_uid").field("uid");
- (2)去重统计某个字段的数量(有少量误差)
- CardinalityBuilder cb= AggregationBuilders.cardinality("distinct_count_uid").field("uid");
- (3)聚合过滤
- FilterAggregationBuilder fab= AggregationBuilders.filter("uid_filter").filter(QueryBuilders.queryStringQuery("uid:001"));
- (4)按某个字段分组
- TermsBuilder tb= AggregationBuilders.terms("group_name").field("name");
- (5)求和
- SumBuilder sumBuilder= AggregationBuilders.sum("sum_price").field("price");
- (6)求平均
- AvgBuilder ab= AggregationBuilders.avg("avg_price").field("price");
- (7)求最大值
- MaxBuilder mb= AggregationBuilders.max("max_price").field("price");
- (8)求最小值
- MinBuilder min= AggregationBuilders.min("min_price").field("price");
- (9)按日期间隔分组
- DateHistogramBuilder dhb= AggregationBuilders.dateHistogram("dh").field("date");
- (10)获取聚合里面的结果
- TopHitsBuilder thb= AggregationBuilders.topHits("top_result");
- (11)嵌套的聚合
- NestedBuilder nb= AggregationBuilders.nested("negsted_path").path("quests");
- (12)反转嵌套
- AggregationBuilders.reverseNested("res_negsted").path("kps ");
Elasticsearch的补充:
//创建索引
client.admin().indices().prepareCreate("").execute().actionGet();
//删除索引
client.admin().indices().prepareDelete("").execute().actionGet();
//获取查询的数据
client.prepareGet("index","type","id").execute().actionGet();
//更新数据
UpdateRequest up = new UpdateRequest();
up.index("index").type("type").id("id").doc("data");
//更新后获取响应信息
client.update(up).actionGet().status().getStatus();
//添加数据
HashMap<String, String> map = new HashMap<>();
map.put("aa","bb");
client.prepareIndex("index","type").setSource(map).get();
ArrayList<String> fields = new ArrayList<>();
//创建索引+mapping
CreateIndexRequestBuilder index = client.admin().indices().prepareCreate("index");
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject("properties");
//都统一映射成一种类型,需要特殊映射可以做判断
for (String field: fields
) {
mapping.startObject(field)
.field("type","text")
.field("index","not_analyzed")
.endObject();
}
mapping.endObject()
.endObject();
index.addMapping("type",mapping);
index.execute().actionGet();
//查询
//1.构建查询条件
//构建查询器
SearchRequestBuilder srg = client
.prepareSearch("index")
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
//封装查询条件
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery("field","value"));//指定字段条件
boolQueryBuilder.mustNot(QueryBuilders.termQuery("field","value"));//过滤
boolQueryBuilder.must(QueryBuilders.rangeQuery("field").gte("").lte(""));//时间范围条件
srg.setQuery(boolQueryBuilder);
//对查询结果进聚合
srg.addAggregation(
AggregationBuilders.terms("alias").field("field")
.subAggregation(AggregationBuilders.stats("alias2").field("field2"))
).execute().actionGet();
//排序构建
SortBuilder sortBuilder= SortBuilders.fieldSort("age");
//单个查询条件构建
QueryBuilder queryBuilder = QueryBuilders.termQuery("field", "value");
//通配符查询
QueryBuilders.wildcardQuery("user", "k*hy17*");
//指定取哪些字段---setFetchSource
client.prepareSearch("index")
.setSearchType("")
.setFetchSource("age","")// setFetchSource有两个参数 第一个参数是包含哪些参数 第二个参数是排除哪些参数
.setFrom(0).setSize(10);
//聚合函数的统计api
AggregationBuilders.count("ageCount").field("age");
AggregationBuilders.max("max").field("age");
AggregationBuilders.sum("sum").field("age");
AggregationBuilders.avg("avg").field("age");
//统计样本基本指标
AggregationBuilders.stats("stats").field("age");//包含了max,min, avg,count,sum
SearchResponse response=srg.get();
Stats stats= response.getAggregations().get("stats");
stats.getAvgAsString();
stats.getMaxAsString();
stats.getMinAsString();
stats.getSumAsString();
stats.getCount();
//数据的遍历:非聚合+聚合,非聚合使用的是hits遍历,聚合则使用的是buckets
Terms terms = response.getAggregations().get("");
for (Terms.Bucket bucket:terms.getBuckets()
) {
bucket.getKey();//聚合字段的名称---一个字段一个桶
bucket.getDocCount();//该聚合字段出现的次数
}
//解析例子: select count(age),sum(age) from t_user group by age
AggregationBuilder by_age=AggregationBuilders.terms("by_age").field("age");
AggregationBuilder ageSum=AggregationBuilders.sum("ageSum").field("age");
SearchResponse sr=srg.addAggregation(by_age.subAggregation(ageSum)).get();
Aggregations aggregations = sr.getAggregations();
for (Aggregation agg: aggregations
) {
Terms term = (Terms)agg;
for(Terms.Bucket bucket:term.getBuckets()){
String age = bucket.getKeyAsString();//by_age字段-----若只有一个分组的话可以直接这样获取,多个的话 需要使用
//Aggregation alias = bucket.getAggregations().asMap().get("alias");//多个聚合的话需要这样获取每个聚合分组的桶
long count = bucket.getDocCount();//count(age)---age出现的次数
//获取分组后,再通过聚合函数取得的值
Aggregation by_age1 = bucket.getAggregations().getAsMap().get("by_age");
Terms terms_by_age = (Terms)by_age1;
for (Terms.Bucket bucket_by_age: terms_by_age.getBuckets()
) {
String filed_ageSum2= bucket_by_age.getKeyAsString();//聚合字段
double value_ageSum=((Sum)bucket_by_age.getAggregations().asMap().get("ageSum")).getValue();//聚合的值
}
}
}
}
操作实例:
//求其他统计值
SearchResponse sr = srb.setQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery(FieldConstant.HOSPITAL_GRADE_FIELD, hospitalDegree))//医院等级
.must(QueryBuilders.matchQuery(FieldConstant.ID_MDC, mdcID))
.must(QueryBuilders.matchQuery(FieldConstant.ID_DRG, drgID))
.must(QueryBuilders.rangeQuery(FieldConstant.HOSPITAL_LEAVE_TIME_FIELD).gte(startTime + FieldConstant.START_DATE).lte(endTime + FieldConstant.END_DATE)) //出院时间
.mustNot(QueryBuilders.termQuery(FieldConstant.DIAGNOSE_TYPE_FIELD, "0202")) //过滤门诊
)
.addAggregation(AggregationBuilders.terms("doctor")
.field(FieldConstant.DOCTOR_ID_FIELD)//医生ID
.size((int) DOCTOR_NUM)
.subAggregation(AggregationBuilders.terms("addrg")
.field(FieldConstant.NAME_AD_DRG)
.size((int) ADDRG_NUM)
.subAggregation(AggregationBuilders.sum("numWeight")
.field(FieldConstant.WEIGHT)//权重
)
.subAggregation(AggregationBuilders.sum("moneySumWeight")
.field(FieldConstant.MONEY_SUM_WEIGHT)//医保总费用*权重
)
.subAggregation(AggregationBuilders.sum("zyscWeight")
.field(FieldConstant.ZYSC_WEIGHT)//住院时长*权重
)
.subAggregation(AggregationBuilders.extendedStats("zyscStd")
.field(FieldConstant.ZHUYUAN_TIME)//住院时长标准差
)
.subAggregation(AggregationBuilders.sum("zzyWeight")
.field(FieldConstant.ZZY_WEIGHT)//再住院标识*权重
)
.subAggregation(AggregationBuilders.sum("tskssWeight")
.field(FieldConstant.TSKSS_FLAG_WEIGHT)//特殊抗生素使用标识*权重
)
)
).execute().actionGet();
Terms termsDoc = sr.getAggregations().get("doctor");
for (Bucket bk : termsDoc.getBuckets()) {
//医生基本信息
String doctorId = bk.getKey();//医生ID
//可信度等级判断
DoctorModel dm = new DoctorModel();
if(doctorLevelMap.containsKey(doctorId)){
dm = doctorLevelMap.get(doctorId);
}
ArrayList<AdDrgDocModel> adDrgDocList = new ArrayList<AdDrgDocModel>();
Terms termsAdDrg = bk.getAggregations().get("addrg");
for (Bucket bkAdDrg : termsAdDrg.getBuckets()) {
//该医生该AD_DRG的就诊次数
long treatmentNum = bkAdDrg.getDocCount();
if(treatmentNum >= 3){
String adDrgName = bkAdDrg.getKey();//AD_DRG名称
//就诊人次*权重
Sum num_agg = bkAdDrg.getAggregations().get("numWeight");
double num = num_agg.getValue();
//医保总费用*权重
Sum money_sum_agg = bkAdDrg.getAggregations().get("moneySumWeight");
double moneySum = money_sum_agg.getValue();
//住院时长*权重
Sum zysc_agg = bkAdDrg.getAggregations().get("zyscWeight");
double zysc = zysc_agg.getValue();
//住院时长标准差
ExtendedStats zysc_std_agg = bkAdDrg.getAggregations().get("zyscStd");
double zyscStd = zysc_std_agg.getStdDeviation();
if(Double.isNaN(zyscStd)){
zyscStd = 0.0;
}
//再住院标识*权重
Sum zzy_agg = bkAdDrg.getAggregations().get("zzyWeight");
double zzy = zzy_agg.getValue();
//特殊抗生素使用标识*权重
Sum tskss_agg = bkAdDrg.getAggregations().get("tskssWeight");
double tskss = tskss_agg.getValue();
AdDrgDocModel ad = new AdDrgDocModel(num, moneySum, zysc, zyscStd, zzy,
tskss, drgID, adDrgName);
//设置ADDRG的可信度
if(dm.getAdDrgDocList() != null){
for(AdDrgDocModel addm:dm.getAdDrgDocList()){
if(addm.getAdDrgName().equals(adDrgName)){
ad.setLevel(addm.getLevel());
break;
}
}
}
ad.setTreatmentNum(treatmentNum);
adDrgDocList.add(ad);
}
}
if(adDrgDocList.size() > 0){
DoctorModel doctorModel = new DoctorModel();
doctorModel.setDoctorId(doctorId);//医生ID
doctorModel.setDrgID(drgID);//DRGID
doctorModel.setAdDrgDocList(adDrgDocList);//ADDRG
doctorModel.setLevel(dm.getLevel());//可信度等级
doctorList.add(doctorModel);
}
}





