Using Easy-Es for Multiple Data Aggregations

1. Adding Dependencies and Configuration

<dependency>
    <groupId>org.dromara.easy-es</groupId>
    <artifactId>easy-es-boot-starter</artifactId>
    <version>2.0.0</version>
</dependency>

Then add the Easy-Es settings to application.yml:

easy-es:
  address: 127.0.0.1:9200
  username: elastic
  password: elastic
  enable: true
  global-config:
    print-dsl: false
    db-config:
      map-underscore-to-camel-case: false
      smartAddKeywordSuffix: false
  async:
    concurrent: 10

2. Class Construction (POJO, Mapper, DAO)

2.1 Entity Class

/**
 * Elasticsearch document stored in the {@code my_stats} index.
 *
 * <p>Lombok's {@code @Data} generates the getters and setters; the DAO's method references
 * (for example {@code MyStatistics::getCountTime}) need those getters to exist.
 */
@Data
@IndexName(value = "my_stats", refreshPolicy = RefreshPolicy.IMMEDIATE)
public class MyStatistics {
    /**
     * Document id, mapped to the {@code id} field.
     */
    @IndexId(type = IdType.TIME_ID)
    @IndexField("id")
    private String id;

    /**
     * Communication duration in seconds
     */
    @IndexField("communication_duration")
    private Integer communicationDuration;

    /**
     * Count time.
     * NOTE(review): presumably epoch milliseconds, since the statistics query compares ranges of
     * this field against 86400000 — confirm.
     */
    @IndexField("count_time")
    private Long countTime;

    /**
     * Record time, formatted with {@code DatePattern.NORM_DATETIME_PATTERN} in the GMT+8 zone.
     */
    @IndexField("record_time")
    @JsonFormat(pattern = DatePattern.NORM_DATETIME_PATTERN, timezone = "GMT+8")
    private Date recordTime;

    /**
     * Device number.
     * NOTE(review): the DAO and the statistics query pass the device number as a String while
     * this field is an Integer — confirm the intended type.
     */
    @IndexField("device_no")
    private Integer deviceNo;
}

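The entity-class annotations work much like their MyBatis-Plus counterparts (for example, @IndexName plays the role of @TableName, @IndexId of @TableId, and @IndexField of @TableField).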

2.2 Mapper Layer

/**
 * Easy-Es mapper for {@link MyStatistics} documents. It declares no methods of its own;
 * everything callers use comes from {@code BaseEsMapper}.
 */
public interface MyStatisticsMapper extends BaseEsMapper<MyStatistics> {
}

The Mapper layer also follows the MyBatis-Plus pattern: the interface only needs to extend BaseEsMapper.

2.3 DAO Layer

/**
 * Data-access wrapper around {@link MyStatisticsMapper} offering the basic operations on
 * {@link MyStatistics} documents.
 */
@Service
@RequiredArgsConstructor
public class MyStatisticsDao {

    /** Mapper supplied through the Lombok-generated constructor. */
    private final MyStatisticsMapper myStatisticsMapper;

    /**
     * Inserts the given document through the mapper.
     *
     * @param myStatistics document to insert
     */
    public void save(MyStatistics myStatistics) {
        myStatisticsMapper.insert(myStatistics);
    }

    /**
     * Runs a chain query without any condition.
     *
     * @return the documents returned by the query
     */
    public List<MyStatistics> list() {
        return EsWrappers.lambdaChainQuery(myStatisticsMapper).list();
    }

    /**
     * Checks for a record with the given count time and device number.
     *
     * @param countTime value the count-time field must equal
     * @param deviceNo  value the device-number field must equal
     * @return the result of the chain query's {@code exists()} call
     */
    public Boolean recordExist(Long countTime, String deviceNo) {
        return EsWrappers.lambdaChainQuery(myStatisticsMapper)
                .eq(MyStatistics::getCountTime, countTime)
                .eq(MyStatistics::getDeviceNo, deviceNo)
                .exists();
    }
}

Basic CRUD operations in the DAO layer are likewise written the same way as with MyBatis-Plus.

2.4 Add EsMapper Scan to Application Startup

/**
 * Spring Boot entry point.
 *
 * <p>{@code @EsMapperScan} receives the package pattern of the Easy-Es mapper interfaces.
 * NOTE(review): {@code "xxx.**.esmapper.**"} looks like a placeholder — replace {@code xxx}
 * with the project's real base package.
 */
@EsMapperScan("xxx.**.esmapper.**")
@SpringBootApplication
public class Application {
    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

3. Aggregation Functions

Although Easy-Es wraps some simple aggregations, complex aggregations still have to be implemented by hand. Fortunately, Easy-Es also exposes native queries through the mapper:

// Semi-native query (can also be classified as hybrid query): takes an Easy-Es
// LambdaEsQueryWrapper; typically used when customizing the Aggregation
SearchResponse search(LambdaEsQueryWrapper<T> wrapper) throws IOException;

// Standard native query: takes a native SearchRequest together with the RequestOptions
// to use (both are required parameters of this overload)
SearchResponse search(SearchRequest searchRequest, RequestOptions requestOptions) throws IOException;

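I used the standard native query approach. The following code aggregates communication duration over time: results are bucketed hourly when the requested range is at most one day, and daily otherwise. For each bucket it reports the difference between the maximum and minimum communication duration, converted to minutes.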

/**
 * Builds the per-bucket access-duration statistics of one device for a time range.
 *
 * <p>Documents are filtered by {@code device_no} and by {@code count_time} within
 * {@code [beginTime, endTime]}, then grouped on {@code record_time} with a date histogram in the
 * Asia/Shanghai zone: hourly buckets labelled {@code HH} when the range is at most 86400000
 * (one day in milliseconds), otherwise daily buckets labelled {@code MM-dd}. The value reported
 * for a bucket is {@code max(communication_duration) - min(communication_duration)} converted
 * from seconds to whole minutes.
 *
 * @param deviceNo  device number to match
 * @param beginTime inclusive lower bound for {@code count_time}; must not be null
 * @param endTime   inclusive upper bound for {@code count_time}; must not be null
 * @return bucket labels and durations in matching order; both lists are empty when the search
 *         failed (the search helper returns null in that case) or produced no aggregation
 */
public AccessUseTimeStatsVO getStats(String deviceNo, Long beginTime, Long endTime) {
    final long oneDayMillis = 86400000L;
    boolean sortByHour = (endTime - beginTime) <= oneDayMillis;
    SearchRequest request = new SearchRequest("my_stats");
    // Query by device number and time range
    QueryBuilder queryBuilder = QueryBuilders.boolQuery()
            .must(QueryBuilders.matchQuery("device_no", deviceNo))
            .must(QueryBuilders.rangeQuery("count_time")
                    .gte(beginTime)
                    .lte(endTime));
    // Max / min communication duration inside each time bucket
    String durationMaxAggrName = "durationMax";
    AggregationBuilder durationMaxAggr = AggregationBuilders.max(durationMaxAggrName)
            .field("communication_duration");
    String durationMinAggrName = "durationMin";
    AggregationBuilder durationMinAggr = AggregationBuilders.min(durationMinAggrName)
            .field("communication_duration");
    // Aggregate by time
    String dateHistogramAggName = "groupDate";
    DateHistogramAggregationBuilder dateHistogramAgg = AggregationBuilders.dateHistogram(dateHistogramAggName)
            .field("record_time")
            .timeZone(ZoneId.of("Asia/Shanghai"))
            .subAggregation(durationMaxAggr)
            .subAggregation(durationMinAggr);
    // By hour or by day
    if (sortByHour) {
        dateHistogramAgg.calendarInterval(DateHistogramInterval.HOUR)
                .format("HH");
    } else {
        dateHistogramAgg.calendarInterval(DateHistogramInterval.DAY)
                .format("MM-dd");
    }
    request.source().query(queryBuilder).aggregation(dateHistogramAgg);

    // Time period labels and the access duration of each period
    List<String> dateTimeArray = new ArrayList<>();
    List<Integer> accessDurationArray = new ArrayList<>();

    // search(...) logs failures and returns null, so a failed search yields an empty
    // result here instead of a NullPointerException.
    SearchResponse response = search(request);
    Aggregations aggregations = response == null ? null : response.getAggregations();
    if (aggregations != null) {
        Histogram groupDateAggr = aggregations.get(dateHistogramAggName);
        if (groupDateAggr != null) {
            for (Histogram.Bucket bucket : groupDateAggr.getBuckets()) {
                dateTimeArray.add(bucket.getKeyAsString());
                Aggregations bucketAggregations = bucket.getAggregations();
                Aggregation durationMax = bucketAggregations.get(durationMaxAggrName);
                Aggregation durationMin = bucketAggregations.get(durationMinAggrName);
                accessDurationArray.add(toWholeMinutes(
                        ((ParsedMax) durationMax).getValue(),
                        ((ParsedMin) durationMin).getValue()));
            }
        }
    }
    return AccessUseTimeStatsVO.builder()
            .dateTimeArray(dateTimeArray)
            .accessDurationArray(accessDurationArray)
            .build();
}

/**
 * Converts the spread between two second values into whole minutes.
 *
 * @param maxSeconds largest value of the bucket
 * @param minSeconds smallest value of the bucket
 * @return {@code (maxSeconds - minSeconds) / 60} rounded half-up to two decimals and then
 *         truncated to an int; 0 when either input is infinite or NaN
 */
private static int toWholeMinutes(double maxSeconds, double minSeconds) {
    // Non-finite metric values cannot be converted to BigDecimal; report 0 for them.
    if (!Double.isFinite(maxSeconds) || !Double.isFinite(minSeconds)) {
        return 0;
    }
    return new BigDecimal(maxSeconds - minSeconds)
            .divide(new BigDecimal(60), 2, java.math.RoundingMode.HALF_UP)
            .intValue();
}

/**
 * Executes a native search through the mapper with {@code RequestOptions.DEFAULT}.
 *
 * @param request native search request to execute
 * @return the search response, or {@code null} when the call throws (the exception is logged)
 */
private SearchResponse search(SearchRequest request) {
    try {
        SearchResponse result = myStatisticsMapper.search(request, RequestOptions.DEFAULT);
        log.debug("response:{}", result);
        return result;
    } catch (Exception ex) {
        // Best effort: report the failure and signal it to the caller with null.
        log.error("search error", ex);
        return null;
    }
}
/**
 * View object returned by the statistics query: one label and one duration per time bucket.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class AccessUseTimeStatsVO {

    /**
     * Bucket labels: the hour ({@code HH}) for hourly statistics or the day ({@code MM-dd})
     * for daily statistics
     */
    private List<String> dateTimeArray;

    /**
     * Access duration in minutes, one entry per bucket label
     */
    private List<Integer> accessDurationArray;
}

4. References