使用 Easy-Es 对数据做多重聚合
一、引入依赖及配置
<dependency>
<groupId>org.dromara.easy-es</groupId>
<artifactId>easy-es-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
easy-es:
address: 127.0.0.1:9200
username: elastic
password: elastic
enable: true
global-config:
print-dsl: false
db-config:
map-underscore-to-camel-case: false
smartAddKeywordSuffix: false
async:
concurrent: 10
二、类构建(pojo、mapper、dao)
2.1 实体类
/**
 * Document stored in the {@code my_stats} index.
 *
 * <p>{@code @Data} generates the accessors; the DAO layer builds its query conditions from
 * method references such as {@code MyStatistics::getCountTime} and
 * {@code MyStatistics::getDeviceNo}, so the getters must exist.
 */
@Data
@IndexName(value = "my_stats", refreshPolicy = RefreshPolicy.IMMEDIATE)
public class MyStatistics {

    /** Document id, stored in the {@code id} field. */
    @IndexId(type = IdType.TIME_ID)
    @IndexField("id")
    private String id;

    /** Communication duration, in seconds. */
    @IndexField("communication_duration")
    private Integer communicationDuration;

    /**
     * Statistics timestamp stored as a number in {@code count_time}.
     * NOTE(review): presumably epoch milliseconds, since range queries compare it with
     * millisecond bounds — confirm.
     */
    @IndexField("count_time")
    private Long countTime;

    /** Record time, serialized with the normalized date-time pattern in the GMT+8 time zone. */
    @IndexField("record_time")
    @JsonFormat(pattern = DatePattern.NORM_DATETIME_PATTERN, timezone = "GMT+8")
    private Date recordTime;

    /**
     * Device number.
     * NOTE(review): query code elsewhere passes the device number as a String — confirm the
     * intended type.
     */
    @IndexField("device_no")
    private Integer deviceNo;
}
实体类的注解类似于 Mybatis-plus。
2.2 mapper 层
/**
 * Easy-Es mapper for {@link MyStatistics} documents. It declares no methods of its own;
 * everything used by callers comes from {@code BaseEsMapper}.
 */
public interface MyStatisticsMapper extends BaseEsMapper<MyStatistics> {
}
Mapper 层写法也与 Mybatis-plus 类似。
2.3 dao 层
/**
 * Thin data-access service that delegates to {@link MyStatisticsMapper}.
 */
@Service
@RequiredArgsConstructor
public class MyStatisticsDao {

    private final MyStatisticsMapper myStatisticsMapper;

    /**
     * Inserts a single statistics document.
     *
     * @param myStatistics the document to insert
     */
    public void save(MyStatistics myStatistics) {
        myStatisticsMapper.insert(myStatistics);
    }

    /**
     * Lists documents using a chain query with no conditions.
     *
     * @return the documents returned by the query
     */
    public List<MyStatistics> list() {
        return EsWrappers.lambdaChainQuery(myStatisticsMapper).list();
    }

    /**
     * Checks whether a document with the given statistics time and device number exists.
     *
     * @param countTime value matched against {@code countTime}
     * @param deviceNo  value matched against {@code deviceNo}
     * @return the result of the chain query's existence check
     */
    public Boolean recordExist(Long countTime, String deviceNo) {
        return EsWrappers.lambdaChainQuery(myStatisticsMapper)
                .eq(MyStatistics::getCountTime, countTime)
                .eq(MyStatistics::getDeviceNo, deviceNo)
                .exists();
    }
}
dao 层基本的增删改查写法也与 MyBatis-Plus 类似。
2.4 启动程序加入 EsMapper 扫描
/**
 * Spring Boot entry point. {@code @EsMapperScan} points Easy-Es at the packages that
 * contain the mapper interfaces.
 */
@EsMapperScan("xxx.**.esmapper.**")
@SpringBootApplication
public class Application {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
三、聚合函数
Easy-Es 虽然也做了一些简单聚合的封装,但是复杂的聚合,还是得自己写。好在 Easy-Es 还提供了原生查询。
// Semi-native query (can also be regarded as a hybrid query; typically used with a custom Aggregation)
SearchResponse search(LambdaEsQueryWrapper<T> wrapper) throws IOException;
// Standard native query; RequestOptions can be specified
SearchResponse search(SearchRequest searchRequest, RequestOptions requestOptions) throws IOException;
我使用的是标准原生查询。下面的代码按时间统计通信时长:查询跨度超过一天时按天统计,不超过一天(含恰好一天)时按小时统计。
/**
 * Builds per-bucket duration statistics for one device from the {@code my_stats} index.
 *
 * <p>Documents are filtered by {@code device_no} and by {@code count_time} within
 * {@code [beginTime, endTime]}, then grouped by {@code record_time} with a date histogram
 * (time zone Asia/Shanghai). Each bucket reports {@code max - min} of
 * {@code communication_duration}, converted from seconds to whole minutes.
 *
 * <p>If the search fails (the {@code search} helper returns {@code null}) or the response
 * carries no aggregation, both result lists are empty instead of a
 * {@code NullPointerException} being thrown.
 *
 * @param deviceNo  device number to match
 * @param beginTime inclusive lower bound in epoch milliseconds; must not be null
 * @param endTime   inclusive upper bound in epoch milliseconds; must not be null
 * @return bucket labels ("HH" or "MM-dd") and the matching durations in minutes
 */
public AccessUseTimeStatsVO getStats(String deviceNo, Long beginTime, Long endTime) {
    // Windows of at most one day are bucketed per hour, longer ones per day.
    final long oneDayMillis = 86400000L;
    boolean bucketByHour = (endTime - beginTime) <= oneDayMillis;

    SearchRequest request = new SearchRequest("my_stats");

    // Filter by device number and by the requested time window.
    QueryBuilder queryBuilder = QueryBuilders.boolQuery()
            .must(QueryBuilders.matchQuery("device_no", deviceNo))
            .must(QueryBuilders.rangeQuery("count_time")
                    .gte(beginTime)
                    .lte(endTime));

    // Max and min of the communication duration inside each time bucket.
    String durationMaxAggrName = "durationMax";
    AggregationBuilder durationMaxAggr = AggregationBuilders.max(durationMaxAggrName)
            .field("communication_duration");
    String durationMinAggrName = "durationMin";
    AggregationBuilder durationMinAggr = AggregationBuilders.min(durationMinAggrName)
            .field("communication_duration");

    // Group by record time.
    // NOTE(review): the filter uses count_time while the histogram uses record_time — confirm this is intended.
    String dateHistogramAggName = "groupDate";
    DateHistogramAggregationBuilder dateHistogramAgg = AggregationBuilders.dateHistogram(dateHistogramAggName)
            .field("record_time")
            .timeZone(ZoneId.of("Asia/Shanghai"))
            .subAggregation(durationMaxAggr)
            .subAggregation(durationMinAggr);
    if (bucketByHour) {
        dateHistogramAgg.calendarInterval(DateHistogramInterval.HOUR)
                .format("HH");
    } else {
        dateHistogramAgg.calendarInterval(DateHistogramInterval.DAY)
                .format("MM-dd");
    }

    request.source().query(queryBuilder).aggregation(dateHistogramAgg);

    // Bucket labels and the duration (minutes) of each bucket, index-aligned.
    List<String> dateTimeArray = new ArrayList<>();
    List<Integer> accessDurationArray = new ArrayList<>();

    // search(...) logs failures and returns null, so every step is null-checked.
    SearchResponse response = search(request);
    Aggregations aggregations = (response == null) ? null : response.getAggregations();
    if (aggregations != null) {
        Histogram groupDateAggr = aggregations.get(dateHistogramAggName);
        if (groupDateAggr != null) {
            for (Histogram.Bucket bucket : groupDateAggr.getBuckets()) {
                dateTimeArray.add(bucket.getKeyAsString());

                Aggregation durationMax = bucket.getAggregations().get(durationMaxAggrName);
                double maxValue = ((ParsedMax) durationMax).getValue();
                Aggregation durationMin = bucket.getAggregations().get(durationMinAggrName);
                double minValue = ((ParsedMin) durationMin).getValue();

                accessDurationArray.add(secondsSpanToMinutes(maxValue, minValue));
            }
        }
    }

    return AccessUseTimeStatsVO.builder()
            .dateTimeArray(dateTimeArray)
            .accessDurationArray(accessDurationArray)
            .build();
}

/**
 * Converts {@code maxSeconds - minSeconds} to minutes: divides by 60 at scale 2 (half-up)
 * and keeps the integer part. Non-finite inputs yield 0.
 */
private static int secondsSpanToMinutes(double maxSeconds, double minSeconds) {
    // Infinite values were already mapped to 0 (presumably what empty buckets report);
    // NaN is rejected too because new BigDecimal(double) throws on it.
    if (!Double.isFinite(maxSeconds) || !Double.isFinite(minSeconds)) {
        return 0;
    }
    // Fully qualified so the snippet needs no additional import.
    return new BigDecimal(maxSeconds - minSeconds)
            .divide(new BigDecimal(60), 2, java.math.RoundingMode.HALF_UP)
            .intValue();
}
/**
 * Runs a native search through the mapper with default request options.
 *
 * @param request the search request to execute
 * @return the search response, or {@code null} if the call threw (the error is logged)
 */
private SearchResponse search(SearchRequest request) {
    try {
        SearchResponse result = myStatisticsMapper.search(request, RequestOptions.DEFAULT);
        log.debug("response:{}", result);
        return result;
    } catch (Exception ex) {
        // Best effort: report the failure and let the caller deal with the missing response.
        log.error("search error", ex);
        return null;
    }
}
/**
 * View object carrying the two series of a duration chart.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class AccessUseTimeStatsVO {

    /** Bucket labels: hour or day. */
    private List<String> dateTimeArray;

    /** Access duration of each bucket, in minutes. */
    private List<Integer> accessDurationArray;
}
四、参考
- Easy-Es 官网:https://www.easy-es.cn/
- ElasticSearch 官网:https://www.elastic.co/docs