Using Easy-Es for Nested (Multi-Metric) Aggregations
1. Adding Dependencies and Configuration
<!-- Easy-Es 2.x is published under the Dromara groupId (older releases used cn.easy-es) -->
<dependency>
    <groupId>org.dromara.easy-es</groupId>
    <artifactId>easy-es-boot-starter</artifactId>
    <version>2.0.0</version>
</dependency>
easy-es:
  address: 127.0.0.1:9200
  # placeholder credentials; keep real passwords out of source control
  username: elastic
  password: elastic
  enable: true
  global-config:
    print-dsl: false
    db-config:
      map-underscore-to-camel-case: false
      smartAddKeywordSuffix: false
# NOTE(review): the original indentation was lost, so the placement of `async` is a guess;
# it is presumably an application-specific setting rather than an Easy-Es key — confirm.
async:
  concurrent: 10
2. Creating the Classes (Entity, Mapper, DAO)
2.1 Entity Class
/**
 * Elasticsearch document holding per-device communication statistics, stored in the
 * {@code my_stats} index.
 *
 * <p>{@code @Data} generates the getters that the DAO's Easy-Es lambda conditions
 * ({@code MyStatistics::getCountTime}, {@code MyStatistics::getDeviceNo}) refer to.
 * {@code RefreshPolicy.IMMEDIATE} makes each write searchable right away, at a cost in
 * write throughput.
 */
@Data
@IndexName(value = "my_stats", refreshPolicy = RefreshPolicy.IMMEDIATE)
public class MyStatistics {
    /**
     * Document id, generated by Easy-Es using {@code IdType.TIME_ID}.
     */
    @IndexId(type = IdType.TIME_ID)
    @IndexField("id")
    private String id;
    /**
     * Communication duration in seconds
     */
    @IndexField("communication_duration")
    private Integer communicationDuration;
    /**
     * Count time. NOTE(review): it is range-filtered with a millisecond-scale window when
     * statistics are queried, so it is presumably epoch milliseconds — confirm.
     */
    @IndexField("count_time")
    private Long countTime;
    /**
     * Record time; serialized by Jackson as {@code DatePattern.NORM_DATETIME_PATTERN} in GMT+8.
     */
    @IndexField("record_time")
    @JsonFormat(pattern = DatePattern.NORM_DATETIME_PATTERN, timezone = "GMT+8")
    private Date recordTime;
    /**
     * Device number. NOTE(review): declared as Integer, while the DAO and statistics queries
     * take a String deviceNo; this presumably relies on Elasticsearch coercing numeric
     * strings — confirm the intended type.
     */
    @IndexField("device_no")
    private Integer deviceNo;
}
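The entity annotations work much like those in MyBatis-Plus. The DAO below calls getters such as getCountTime(), so the entity needs getters (for example, generated by Lombok's @Data).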
2.2 Mapper Layer
/**
 * Easy-Es mapper for {@link MyStatistics} documents.
 *
 * <p>Every CRUD and search operation is inherited from {@code BaseEsMapper}, so the
 * interface declares nothing of its own.
 */
public interface MyStatisticsMapper extends BaseEsMapper<MyStatistics> {
    // intentionally empty: all operations come from BaseEsMapper
}
As in MyBatis-Plus, the mapper is just an interface that extends BaseEsMapper; no implementation class is required.
2.3 DAO Layer
/**
 * Data-access component for {@link MyStatistics} documents, built on Easy-Es lambda queries.
 */
@Service
public class MyStatisticsDao {

    private final MyStatisticsMapper myStatisticsMapper;

    /**
     * Creates the DAO; Spring injects the mapper through this single constructor.
     *
     * @param myStatisticsMapper Easy-Es mapper for {@link MyStatistics}
     */
    public MyStatisticsDao(MyStatisticsMapper myStatisticsMapper) {
        this.myStatisticsMapper = myStatisticsMapper;
    }

    /**
     * Inserts a new statistics document.
     *
     * @param myStatistics the document to index
     */
    public void save(MyStatistics myStatistics) {
        this.myStatisticsMapper.insert(myStatistics);
    }

    /**
     * Lists documents using an unconditioned chain query.
     *
     * @return the matching documents
     */
    public List<MyStatistics> list() {
        return EsWrappers.lambdaChainQuery(this.myStatisticsMapper).list();
    }

    /**
     * Checks whether a document exists for the given count time and device number.
     *
     * @param countTime value matched against {@code countTime}
     * @param deviceNo  value matched against {@code deviceNo}
     * @return whether at least one matching document exists
     */
    public Boolean recordExist(Long countTime, String deviceNo) {
        return EsWrappers.lambdaChainQuery(this.myStatisticsMapper)
                .eq(MyStatistics::getCountTime, countTime)
                .eq(MyStatistics::getDeviceNo, deviceNo)
                .exists();
    }
}
Basic CRUD operations in the DAO layer also follow the MyBatis-Plus style.
2.4 Add @EsMapperScan to the Application Class (replace `xxx` with your base package)
/**
 * Spring Boot entry point.
 *
 * <p>{@code @EsMapperScan} registers the Easy-Es mapper interfaces located in packages
 * matching the given pattern. NOTE(review): {@code xxx} appears to be a placeholder for the
 * project's base package.
 */
@SpringBootApplication
@EsMapperScan("xxx.**.esmapper.**")
public class Application {
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(Application.class);
        application.run(args);
    }
}
3. Aggregation Functions
Easy-Es wraps simple aggregations, but complex (for example, nested) aggregations still have to be built by hand. Fortunately, the Easy-Es mapper also exposes native Elasticsearch search methods:
// Hybrid ("semi-native") search: conditions come from an Easy-Es lambda wrapper, while the
// result is the raw Elasticsearch SearchResponse (handy when adding custom aggregations)
SearchResponse search(LambdaEsQueryWrapper<T> wrapper) throws IOException;
// Fully native search: takes an Elasticsearch SearchRequest plus the RequestOptions to send it with
// (for example RequestOptions.DEFAULT)
SearchResponse search(SearchRequest searchRequest, RequestOptions requestOptions) throws IOException;
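I used the fully native query. The code below aggregates communication duration over time. It uses hourly buckets when the queried range is at most one day and daily buckets otherwise. Each bucket's duration is the difference between the maximum and minimum communication_duration, converted from seconds to minutes. These methods live in the DAO: they use its myStatisticsMapper field and an SLF4J `log` (for example, from Lombok's @Slf4j).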
/**
 * Aggregates one device's communication duration over time, for charting.
 *
 * <p>Documents are filtered by {@code device_no} and by {@code count_time} within
 * {@code [beginTime, endTime]}, then bucketed on {@code record_time} in the Asia/Shanghai
 * time zone. When the range spans at most one day, buckets are hourly and labelled
 * {@code HH}; otherwise they are daily and labelled {@code MM-dd}. A bucket's duration is
 * {@code max - min} of {@code communication_duration} (seconds), truncated to whole
 * minutes; buckets without documents report 0.
 *
 * @param deviceNo  device number matched against {@code device_no}
 * @param beginTime inclusive range start; presumably epoch milliseconds, given the
 *                  86,400,000 one-day threshold — confirm
 * @param endTime   inclusive range end, in the same unit as {@code beginTime}
 * @return index-aligned bucket labels and durations in minutes; both lists are empty when
 *         the search fails
 */
public AccessUseTimeStatsVO getStats(String deviceNo, Long beginTime, Long endTime) {
    // A range of up to one day (86,400,000) is grouped by hour, anything longer by day
    boolean groupByHour = (endTime - beginTime) <= 86400000L;
    SearchRequest request = new SearchRequest("my_stats");
    // Filter context: the clauses only restrict documents, so no relevance scoring is needed
    QueryBuilder queryBuilder = QueryBuilders.boolQuery()
            .filter(QueryBuilders.matchQuery("device_no", deviceNo))
            .filter(QueryBuilders.rangeQuery("count_time")
                    .gte(beginTime)
                    .lte(endTime));
    // Per-bucket max and min of communication_duration
    String durationMaxAggrName = "durationMax";
    AggregationBuilder durationMaxAggr = AggregationBuilders.max(durationMaxAggrName)
            .field("communication_duration");
    String durationMinAggrName = "durationMin";
    AggregationBuilder durationMinAggr = AggregationBuilders.min(durationMinAggrName)
            .field("communication_duration");
    // Time buckets on record_time, each carrying the max/min sub-aggregations
    String dateHistogramAggName = "groupDate";
    DateHistogramAggregationBuilder dateHistogramAgg = AggregationBuilders.dateHistogram(dateHistogramAggName)
            .field("record_time")
            .timeZone(ZoneId.of("Asia/Shanghai"))
            .subAggregation(durationMaxAggr)
            .subAggregation(durationMinAggr);
    if (groupByHour) {
        dateHistogramAgg.calendarInterval(DateHistogramInterval.HOUR).format("HH");
    } else {
        dateHistogramAgg.calendarInterval(DateHistogramInterval.DAY).format("MM-dd");
    }
    // Only the aggregations are read, so don't fetch any hits
    request.source().query(queryBuilder).size(0).aggregation(dateHistogramAgg);

    // Bucket labels and per-bucket durations, kept index-aligned
    List<String> dateTimeArray = new ArrayList<>();
    List<Integer> accessDurationArray = new ArrayList<>();
    SearchResponse response = search(request);
    // search() logs failures and returns null; return empty series instead of throwing an NPE
    if (response != null) {
        Histogram groupDateAggr = response.getAggregations().get(dateHistogramAggName);
        for (Histogram.Bucket bucket : groupDateAggr.getBuckets()) {
            dateTimeArray.add(bucket.getKeyAsString());
            ParsedMax durationMax = bucket.getAggregations().get(durationMaxAggrName);
            ParsedMin durationMin = bucket.getAggregations().get(durationMinAggrName);
            accessDurationArray.add(toDurationMinutes(durationMax.getValue(), durationMin.getValue()));
        }
    }
    return AccessUseTimeStatsVO.builder()
            .dateTimeArray(dateTimeArray)
            .accessDurationArray(accessDurationArray)
            .build();
}

/**
 * Converts a bucket's {@code max - min} duration from seconds to whole minutes (truncated).
 * Empty buckets, whose max/min come back infinite, yield 0.
 */
private static int toDurationMinutes(double maxSeconds, double minSeconds) {
    if (Double.isInfinite(maxSeconds) || Double.isInfinite(minSeconds)) {
        return 0;
    }
    return (int) ((maxSeconds - minSeconds) / 60);
}
/**
 * Executes a native search through the Easy-Es mapper with default request options.
 *
 * <p>Failures are logged rather than propagated.
 *
 * @param request the Elasticsearch search request to execute
 * @return the search response, or {@code null} if the search threw an exception
 */
private SearchResponse search(SearchRequest request) {
    try {
        SearchResponse result = myStatisticsMapper.search(request, RequestOptions.DEFAULT);
        log.debug("response:{}", result);
        return result;
    } catch (Exception ex) {
        log.error("search error", ex);
        return null;
    }
}
/**
 * View object for the access-duration chart: two index-aligned series, where element
 * {@code i} of each list describes the same time bucket.
 */
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AccessUseTimeStatsVO {
    /**
     * Bucket labels: hour of day ({@code HH}) or month-day ({@code MM-dd}).
     */
    private List<String> dateTimeArray;
    /**
     * Access duration per bucket, in minutes.
     */
    private List<Integer> accessDurationArray;
}
4. References
- Easy-Es Official Website: https://www.easy-es.cn/
- Elasticsearch Official Website: https://www.elastic.co/docs