autorenew

使用 Easy-Es 对数据做多重聚合

一、引入依赖及配置

<!-- Easy-Es 2.x is published under the org.dromara.easy-es groupId -->
<dependency>
    <groupId>org.dromara.easy-es</groupId>
    <artifactId>easy-es-boot-starter</artifactId>
    <version>2.0.0</version>
</dependency>
easy-es:
  # Connection settings. Real credentials should come from the environment rather than be
  # committed; the values after the first ':' inside ${...} are the local-dev defaults.
  address: ${ES_ADDRESS:127.0.0.1:9200}
  username: ${ES_USERNAME:elastic}
  password: ${ES_PASSWORD:elastic}
  enable: true
  global-config:
    print-dsl: false
    db-config:
      # Index field names are given explicitly through @IndexField on the entity.
      map-underscore-to-camel-case: false
      smart-add-keyword-suffix: false
  async:
    concurrent: 10

二、类构建(pojo、mapper、dao)

2.1 实体类

/**
 * Document stored in the {@code my_stats} index: one communication-duration sample of a device.
 *
 * <p>Explicit getters/setters are required: the DAO builds queries from method references such as
 * {@code MyStatistics::getCountTime}, and Easy-Es reads/writes documents through accessors.
 */
@IndexName(value = "my_stats", refreshPolicy = RefreshPolicy.IMMEDIATE)
public class MyStatistics {
    /** Document id; Easy-Es id strategy {@code IdType.TIME_ID}. */
    @IndexId(type = IdType.TIME_ID)
    @IndexField("id")
    private String id;

    /** Communication duration, in seconds. */
    @IndexField("communication_duration")
    private Integer communicationDuration;

    /**
     * Statistics timestamp. Queries range-filter it with millisecond bounds, so presumably epoch
     * millis — TODO confirm.
     */
    @IndexField("count_time")
    private Long countTime;

    /** Record time; the statistics date histogram buckets on this field. */
    @IndexField("record_time")
    @JsonFormat(pattern = DatePattern.NORM_DATETIME_PATTERN, timezone = "GMT+8")
    private Date recordTime;

    /**
     * Device number. NOTE(review): declared Integer here while query methods take a String
     * device number — confirm the index mapping and align the types.
     */
    @IndexField("device_no")
    private Integer deviceNo;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Integer getCommunicationDuration() {
        return communicationDuration;
    }

    public void setCommunicationDuration(Integer communicationDuration) {
        this.communicationDuration = communicationDuration;
    }

    public Long getCountTime() {
        return countTime;
    }

    public void setCountTime(Long countTime) {
        this.countTime = countTime;
    }

    public Date getRecordTime() {
        return recordTime;
    }

    public void setRecordTime(Date recordTime) {
        this.recordTime = recordTime;
    }

    public Integer getDeviceNo() {
        return deviceNo;
    }

    public void setDeviceNo(Integer deviceNo) {
        this.deviceNo = deviceNo;
    }
}

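实体类的注解用法与 MyBatis-Plus 类似(`@IndexName` 对应 `@TableName`,`@IndexId` 对应 `@TableId`,`@IndexField` 对应 `@TableField`)。注意:下文 dao 层通过 `MyStatistics::getCountTime` 等方法引用构造查询条件,因此实体类必须提供 getter/setter(例如加上 Lombok 的 `@Data`)。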

2.2 mapper 层

/**
 * Easy-Es mapper for {@link MyStatistics} documents; all CRUD and search methods are inherited
 * from {@code BaseEsMapper}.
 */
public interface MyStatisticsMapper extends BaseEsMapper<MyStatistics> {}

Mapper 层写法也与 MyBatis-Plus 类似:继承 `BaseEsMapper` 即可获得基础的增删改查方法。

2.3 dao 层

/**
 * Data access for {@link MyStatistics} documents, built on Easy-Es lambda chain queries.
 */
@RequiredArgsConstructor
@Service
public class MyStatisticsDao {

    private final MyStatisticsMapper myStatisticsMapper;

    /** Inserts a single statistics document. */
    public void save(MyStatistics myStatistics) {
        myStatisticsMapper.insert(myStatistics);
    }

    /**
     * Runs a query without conditions and returns the matched documents (result size follows
     * Easy-Es defaults — confirm if a full scan is expected).
     */
    public List<MyStatistics> list() {
        return EsWrappers.lambdaChainQuery(myStatisticsMapper).list();
    }

    /**
     * Tells whether a document exists for both the given device number and count time.
     *
     * @param countTime value matched against {@code count_time}
     * @param deviceNo  value matched against {@code device_no}
     * @return whether at least one document matches both conditions
     */
    public Boolean recordExist(Long countTime, String deviceNo) {
        return EsWrappers.lambdaChainQuery(myStatisticsMapper)
                .eq(MyStatistics::getDeviceNo, deviceNo)
                .eq(MyStatistics::getCountTime, countTime)
                .exists();
    }
}

dao 层的基本增删改查写法也与 MyBatis-Plus 类似。

2.4 在启动类上添加 EsMapper 扫描

/**
 * Spring Boot entry point. {@code @EsMapperScan} enables scanning of the Easy-Es mapper
 * interfaces; replace the {@code xxx} placeholder with the project's base package.
 */
@SpringBootApplication
@EsMapperScan("xxx.**.esmapper.**")
public class Application {

    public static void main(final String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

三、聚合函数

Easy-Es 虽然对一些简单聚合做了封装,但复杂的聚合仍需自己编写。好在 Easy-Es 还提供了原生查询接口:

// Semi-native query (can also be seen as a hybrid query; typically used with a custom Aggregation)
SearchResponse search(LambdaEsQueryWrapper<T> wrapper) throws IOException;

// Standard native query; the caller supplies the RequestOptions
SearchResponse search(SearchRequest searchRequest, RequestOptions requestOptions) throws IOException;

我使用的是标准原生查询。下面的代码按时间统计通信时长:查询时间跨度不超过一天时按小时统计,超过一天时按天统计。每个时间桶内的通信时长取 communication_duration 的最大值与最小值之差,并由秒换算为分钟。

/**
 * Aggregates a device's communication duration over a time range.
 *
 * <p>If the range spans at most one day ({@code endTime - beginTime <= 86400000} ms) the result is
 * bucketed per hour (labels formatted {@code HH}); otherwise per day (labels {@code MM-dd}).
 * Buckets are computed in the Asia/Shanghai time zone. Each bucket's value is
 * {@code max(communication_duration) - min(communication_duration)} inside the bucket, converted
 * from seconds to minutes; buckets without data yield 0.
 *
 * @param deviceNo  device number, matched exactly against {@code device_no}
 * @param beginTime inclusive lower bound on {@code count_time} (presumably epoch millis — confirm)
 * @param endTime   inclusive upper bound on {@code count_time}
 * @return parallel lists of bucket labels and per-bucket durations in minutes; both lists are
 *         empty when the search fails
 */
public AccessUseTimeStatsVO getStats(String deviceNo, Long beginTime, Long endTime) {
    // 86,400,000 ms = 24 h: ranges of at most one day are bucketed per hour, longer ones per day.
    boolean groupByHour = (endTime - beginTime) <= 86400000L;
    SearchRequest request = new SearchRequest("my_stats");
    // Exact device + time-range match; filter context because relevance scores are never used.
    // NOTE(review): the filter uses count_time while buckets use record_time — confirm both
    // fields describe the same instant.
    QueryBuilder queryBuilder = QueryBuilders.boolQuery()
            .filter(QueryBuilders.termQuery("device_no", deviceNo))
            .filter(QueryBuilders.rangeQuery("count_time")
                    .gte(beginTime)
                    .lte(endTime));
    String durationMaxAggrName = "durationMax";
    String durationMinAggrName = "durationMin";
    String dateHistogramAggName = "groupDate";
    DateHistogramAggregationBuilder dateHistogramAgg = AggregationBuilders.dateHistogram(dateHistogramAggName)
            .field("record_time")
            .timeZone(ZoneId.of("Asia/Shanghai"))
            .subAggregation(AggregationBuilders.max(durationMaxAggrName).field("communication_duration"))
            .subAggregation(AggregationBuilders.min(durationMinAggrName).field("communication_duration"));
    if (groupByHour) {
        dateHistogramAgg.calendarInterval(DateHistogramInterval.HOUR).format("HH");
    } else {
        dateHistogramAgg.calendarInterval(DateHistogramInterval.DAY).format("MM-dd");
    }
    // size(0): only aggregations are read, so do not fetch any hit documents.
    request.source().query(queryBuilder).size(0).aggregation(dateHistogramAgg);

    List<String> dateTimeArray = new ArrayList<>();
    List<Integer> accessDurationArray = new ArrayList<>();
    SearchResponse response = search(request);
    // search(...) logs failures and returns null; answer with empty series instead of an NPE.
    Aggregations aggregations = response == null ? null : response.getAggregations();
    Histogram groupDateAggr = aggregations == null ? null : aggregations.get(dateHistogramAggName);
    if (groupDateAggr != null) {
        for (Histogram.Bucket bucket : groupDateAggr.getBuckets()) {
            dateTimeArray.add(bucket.getKeyAsString());
            ParsedMax durationMax = bucket.getAggregations().get(durationMaxAggrName);
            ParsedMin durationMin = bucket.getAggregations().get(durationMinAggrName);
            accessDurationArray.add(toDurationMinutes(durationMax.getValue(), durationMin.getValue()));
        }
    }
    return AccessUseTimeStatsVO.builder()
            .dateTimeArray(dateTimeArray)
            .accessDurationArray(accessDurationArray)
            .build();
}

/**
 * Turns a bucket's max/min pair of {@code communication_duration} (seconds) into whole minutes.
 *
 * <p>A bucket without documents surfaces infinite max/min values; it counts as 0. Otherwise the
 * difference is divided by 60, rounded half-up to two decimals, then truncated to an int.
 */
private static int toDurationMinutes(double maxValue, double minValue) {
    if (Double.isInfinite(maxValue) || Double.isInfinite(minValue)) {
        return 0;
    }
    return BigDecimal.valueOf(maxValue - minValue)
            .divide(BigDecimal.valueOf(60), 2, java.math.RoundingMode.HALF_UP)
            .intValue();
}
  
/**
 * Executes a native search through the Easy-Es mapper with default request options.
 *
 * @param request the search request to run
 * @return the response, or {@code null} if the search threw (the error is logged, not rethrown)
 */
private SearchResponse search(SearchRequest request) {
    try {
        SearchResponse searchResponse = myStatisticsMapper.search(request, RequestOptions.DEFAULT);
        log.debug("response:{}", searchResponse);
        return searchResponse;
    } catch (Exception ex) {
        log.error("search error", ex);
        return null;
    }
}
/**
 * Result of the communication-duration statistics as two parallel lists: the value at index i of
 * {@code accessDurationArray} belongs to the bucket labelled at index i of {@code dateTimeArray}.
 */
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AccessUseTimeStatsVO {

    /** Bucket labels: hour of day or month-day. */
    private List<String> dateTimeArray;

    /** Duration per bucket, in minutes. */
    private List<Integer> accessDurationArray;
}

四、参考