一、引入依赖及配置

1
2
3
4
<!-- Easy-Es Spring Boot starter: the configuration and code in the rest of this article depend on it. -->
<!-- Coordinates are those of Easy-Es 2.x; 1.x releases were published under groupId cn.easy-es. -->
<!-- As in the original snippet, no <version> is declared here; it is assumed to be managed by the
     parent POM / dependencyManagement - add one explicitly otherwise. -->
<dependency>
    <groupId>org.dromara.easy-es</groupId>
    <artifactId>easy-es-boot-starter</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
# Easy-Es settings. The nested keys must be indented under "easy-es" to be bound.
easy-es:
  # Elasticsearch node address in host:port form
  address: 127.0.0.1:9200
  username: elastic
  password: elastic
  enable: true
  global-config:
    # Do not print the generated DSL
    print-dsl: false
    db-config:
      # Index field names are declared explicitly with @IndexField on the entity,
      # so automatic underscore-to-camel-case mapping is switched off
      map-underscore-to-camel-case: false
      smartAddKeywordSuffix: false
# NOTE(review): "async" does not look like an Easy-Es option; it is kept as a top-level
# application setting here - confirm its intended parent key.
async:
  concurrent: 10

二、类构建(pojo、mapper、dao)

2.1 实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Statistics document mapped to the {@code my_stats} index, declared with
 * {@code RefreshPolicy.IMMEDIATE}.
 *
 * <p>Every field is bound to its snake_case index field name explicitly via {@code @IndexField}.
 *
 * <p>NOTE(review): no accessors are declared in this snippet; code that uses getter method
 * references on this class presumably relies on generated accessors (e.g. Lombok) - confirm.
 */
@IndexName(value = "my_stats", refreshPolicy = RefreshPolicy.IMMEDIATE)
public class MyStatistics {
    /** Document id, declared with the {@code IdType.TIME_ID} id strategy. */
    @IndexId(type = IdType.TIME_ID)
    @IndexField("id")
    private String id;

    /**
     * Communication duration, in seconds.
     */
    @IndexField("communication_duration")
    private Integer communicationDuration;

    /**
     * Record time as a numeric value.
     * NOTE(review): presumably an epoch timestamp - confirm the unit.
     */
    @IndexField("count_time")
    private Long countTime;

    /**
     * Record time as a date, formatted with {@code DatePattern.NORM_DATETIME_PATTERN}
     * in the GMT+8 time zone.
     */
    @IndexField("record_time")
    @JsonFormat(pattern = DatePattern.NORM_DATETIME_PATTERN, timezone = "GMT+8")
    private Date recordTime;

    /**
     * Device number.
     */
    @IndexField("device_no")
    private Integer deviceNo;
}

实体类的注解用法与 MyBatis-Plus 类似。

2.2 mapper层

1
2
/**
 * Easy-Es mapper for {@link MyStatistics} documents.
 *
 * <p>Declares no methods of its own; everything it offers is inherited from {@code BaseEsMapper}.
 */
public interface MyStatisticsMapper extends BaseEsMapper<MyStatistics> {
}

Mapper 层的写法也与 MyBatis-Plus 类似。

2.3 dao层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Data-access component for {@link MyStatistics} documents, delegating to
 * {@link MyStatisticsMapper}.
 */
@Service
@RequiredArgsConstructor
public class MyStatisticsDao {

    /** Mapper used for all index operations; injected through the generated constructor. */
    private final MyStatisticsMapper myStatisticsMapper;

    /**
     * Inserts one statistics document.
     *
     * @param myStatistics the document to insert
     */
    public void save(MyStatistics myStatistics) {
        myStatisticsMapper.insert(myStatistics);
    }

    /**
     * Runs a chain query without any conditions and returns its result list.
     *
     * @return the documents returned by the unconditioned query
     */
    public List<MyStatistics> list() {
        return EsWrappers.lambdaChainQuery(myStatisticsMapper)
                .list();
    }

    /**
     * Checks whether a document with the given count time and device number exists.
     *
     * <p>NOTE(review): {@code MyStatistics.deviceNo} is declared as {@code Integer} while this
     * parameter is a {@code String} - confirm which type is intended.
     *
     * @param countTime value to match against the count time field
     * @param deviceNo  value to match against the device number field
     * @return the result of the chain query's {@code exists()} call
     */
    public Boolean recordExist(Long countTime, String deviceNo) {
        return EsWrappers.lambdaChainQuery(myStatisticsMapper)
                .eq(MyStatistics::getCountTime, countTime)
                .eq(MyStatistics::getDeviceNo, deviceNo)
                .exists();
    }
}

dao 层基本的增删改查写法也与 MyBatis-Plus 类似。

2.4 启动程序加入EsMapper 扫描

1
2
3
4
5
6
7
/**
 * Spring Boot entry point.
 *
 * <p>{@code @EsMapperScan} names the package pattern scanned for Easy-Es mapper interfaces.
 * NOTE(review): {@code xxx} looks like a placeholder for the project's base package - confirm.
 */
@EsMapperScan("xxx.**.esmapper.**")
@SpringBootApplication
public class Application {
    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

三、聚合函数

Easy-Es虽然也做了一些简单聚合的封装,但是复杂的聚合,还是得自己写。好在Easy-Es还提供了原生查询。

1
2
3
4
5
// Semi-native query (can also be classed as a hybrid query; typically used with a custom Aggregation)
SearchResponse search(LambdaEsQueryWrapper<T> wrapper) throws IOException;

// Standard native query; RequestOptions can be specified
SearchResponse search(SearchRequest searchRequest, RequestOptions requestOptions) throws IOException;

我使用的是标准原生查询。下面的代码按时间统计通信时长:查询的时间跨度超过一天时按天统计,不超过一天(含恰好一天)时按小时统计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
 * Builds per-bucket communication-duration statistics for one device over a time range.
 *
 * <p>Documents are filtered by {@code device_no} and by {@code count_time} within
 * {@code [beginTime, endTime]}, then bucketed on {@code record_time} in the Asia/Shanghai zone:
 * hourly when {@code endTime - beginTime} is at most 86,400,000 (one day in milliseconds),
 * daily otherwise. Each bucket's value is {@code max - min} of {@code communication_duration},
 * divided by 60 and truncated to a whole number. Buckets without documents yield 0.
 *
 * @param deviceNo  device number to filter on
 * @param beginTime inclusive lower bound for {@code count_time}; must not be null
 * @param endTime   inclusive upper bound for {@code count_time}; must not be null
 * @return bucket labels ({@code HH} or {@code MM-dd}) with the matching durations; both lists
 *         are empty when the search failed or the response carries no histogram
 * @throws NullPointerException if {@code beginTime} or {@code endTime} is null
 */
public AccessUseTimeStatsVO getStats(String deviceNo, Long beginTime, Long endTime) {
    java.util.Objects.requireNonNull(beginTime, "beginTime");
    java.util.Objects.requireNonNull(endTime, "endTime");
    boolean sortByHour = (endTime - beginTime) <= 86400000L;
    SearchRequest request = new SearchRequest("my_stats");
    // Filter by device number and by begin/end time
    QueryBuilder queryBuilder = QueryBuilders.boolQuery()
            .must(QueryBuilders.matchQuery("device_no", deviceNo))
            .must(QueryBuilders.rangeQuery("count_time")
                    .gte(beginTime)
                    .lte(endTime));
    // Communication duration: max and min per bucket
    String durationMaxAggrName = "durationMax";
    AggregationBuilder durationMaxAggr = AggregationBuilders.max(durationMaxAggrName)
            .field("communication_duration");
    String durationMinAggrName = "durationMin";
    AggregationBuilder durationMinAggr = AggregationBuilders.min(durationMinAggrName)
            .field("communication_duration");
    // Group by time
    String dateHistogramAggName = "groupDate";
    DateHistogramAggregationBuilder dateHistogramAgg = AggregationBuilders.dateHistogram(dateHistogramAggName)
            .field("record_time")
            .timeZone(ZoneId.of("Asia/Shanghai"))
            .subAggregation(durationMaxAggr)
            .subAggregation(durationMinAggr);
    // Hourly or daily buckets
    if (sortByHour) {
        dateHistogramAgg.calendarInterval(DateHistogramInterval.HOUR)
                .format("HH");
    } else {
        dateHistogramAgg.calendarInterval(DateHistogramInterval.DAY)
                .format("MM-dd");
    }
    // Only the aggregations are read below, so no hits need to be fetched
    request.source().size(0).query(queryBuilder).aggregation(dateHistogramAgg);

    // Bucket labels (hour or day)
    List<String> dateTimeArray = new ArrayList<>();
    // Duration per bucket
    List<Integer> accessDurationArray = new ArrayList<>();

    // search(...) logs failures and returns null, so guard before dereferencing
    SearchResponse response = search(request);
    Histogram groupDateAggr = null;
    if (response != null && response.getAggregations() != null) {
        groupDateAggr = response.getAggregations().get(dateHistogramAggName);
    }
    if (groupDateAggr != null) {
        for (Histogram.Bucket bucket : groupDateAggr.getBuckets()) {
            dateTimeArray.add(bucket.getKeyAsString());
            ParsedMax durationMax = bucket.getAggregations().get(durationMaxAggrName);
            ParsedMin durationMin = bucket.getAggregations().get(durationMinAggrName);
            accessDurationArray.add(toWholeMinutes(durationMax.getValue(), durationMin.getValue()));
        }
    }
    return AccessUseTimeStatsVO.builder()
            .dateTimeArray(dateTimeArray)
            .accessDurationArray(accessDurationArray)
            .build();
}

/**
 * Converts a max/min pair to {@code (max - min) / 60}, rounded half-up to two decimals and then
 * truncated to an int; returns 0 when either value is not finite (a bucket with no documents
 * reports infinities).
 */
private static int toWholeMinutes(double maxValue, double minValue) {
    if (!Double.isFinite(maxValue) || !Double.isFinite(minValue)) {
        return 0;
    }
    return BigDecimal.valueOf(maxValue - minValue)
            .divide(BigDecimal.valueOf(60), 2, java.math.RoundingMode.HALF_UP)
            .intValue();
}

/**
 * Executes a native search through the mapper with {@code RequestOptions.DEFAULT}.
 *
 * <p>Any exception is logged and not rethrown, so callers must be prepared for a
 * {@code null} result.
 *
 * @param request the native search request to execute
 * @return the search response, or {@code null} if the search threw an exception
 */
private SearchResponse search(SearchRequest request) {
    // Stays null when the mapper call throws
    SearchResponse response = null;
    try {
        response = myStatisticsMapper.search(request, RequestOptions.DEFAULT);
        log.debug("response:{}", response);
    } catch (Exception ex) {
        log.error("search error", ex);
    }
    return response;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * View object carrying two lists: time-bucket labels and a duration value per bucket.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class AccessUseTimeStatsVO {

    /**
     * Bucket labels: hour or day.
     */
    private List<String> dateTimeArray;

    /**
     * Access duration, in minutes.
     */
    private List<Integer> accessDurationArray;
}

四、参考

Easy-Es官网:https://www.easy-es.cn/
ElasticSearch官网:https://www.elastic.co/docs