本文介绍: 聚合( aggregations)可以实现文档数据统计分析运算。在分类的同时,进行了分数的计算,并且按照平均分做降序”size“: 0,”aggs“: {“terms“: {“order“: {},”aggs“: {“stats“: {中分词器analyzerl:在tokenizer之前对文本进行处理例如删除字符替换字符ltokenizer:将文本按照一定的规则切割成词条(term例如keyword,就是不分词;还有ik_smartl:将tokenizer

目录

一、定义

二、分类

1、桶(Bucket)聚合:

2、度量(Metric)聚合:

3、管道聚合(Pipeline Aggregation):


聚合( aggregations)可以实现文档数据统计分析运算

TermAggregation:按照文档字段分组

Date Histogram:按照日期阶梯分组例如一周为一组,或者一月为一组

  1. 计数(COUNT):计算指定列中非空值的数量。
  2. 求和(SUM):计算指定列中所有数值总和
  3. 平均值(AVG):计算指定列中所有数值的平均值
  4. 最大值(MAX):查找指定列中的最大值
  5. 最小值(MIN):查找指定列中的最小值。
  1. 管道聚合是MongoDB中一种强大的数据聚合工具,它可用于通过多个聚合操作连接在一起来对文档进行处理
  2. 通过管道聚合,MongoDB用户可以使用多个聚合操作顺序执行,以生成更为复杂、细致和灵活的数据查询汇总结果
  3. 管道聚合可以处理来自单个或多个集合数据

一般而言,MongoDB 的聚合管道通过 $ 开头的操作符实现数据聚合操作。以下是一些常见的聚合管道操作:

1. $match用于选择满足条件文档可以通过使用查询条件过滤文档
2. $group用于文档分组通过指定一个多个字段进行分组,对每个分组执行聚合操作,最终返回每个组的统计结果
3. $project用于选择文档的特定字段,并输出指定的字段。
4. $sort用于对文档进行排序,可以根据指定字段进行升序或降序排列
5. $limit用于限制输出文档的数量。
6. $skip用于跳过指定数量的文档,并返回剩余的文档。
7. $unwind:用于展开数组属性,将数组属性每个元素转换一个单独的文档。

  1. keyword
  2. 数值
  3. 日期
  4. 布尔
GET /hotel/_search
{
  "size": 0,    // 设置size为0,结果中不包含文档,只包含聚合结果
  "aggs": {    // 定义聚合
    "brandagg": {    //给聚合起个名字
      "terms": {    // 聚合的类型,按照品牌值聚合,所以选择term
        "field": "brand",    // 参与聚合的字段
        "size": 20    // 希望获取的聚合结果数量
      }
    }
  }
}

运行后,数据被按照品牌(brand划分

分类的同时,进行了分数的计算,并且按照平均分做降序

GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "brandagg": {
      "terms": {
        "field": "brand", 
        "size": 20,
        "order": {
          "scoreAgg.avg": "desc"
        }
      },
      "aggs": {
        "scoreAgg": {
          "stats": {
            "field": "score"
          }
        }
      }
    }
  }
}

   @Test
    void testAggregation() throws IOException {
        // 准备请求对象
        SearchRequest request = new SearchRequest();

        // 初始化 SearchSourceBuilder
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

        // 设置 size
        sourceBuilder.size(0);

        // 聚合
        sourceBuilder.aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(10)
        );

        // 将 SearchSourceBuilder 设置到 SearchRequest 中
        request.source(sourceBuilder);

        // 发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);

        // 解析结果
        Aggregations aggregations = response.getAggregations();

        // 根据聚合名称获取聚合结果
        Terms brandTerms = aggregations.get("brandAgg");

        // 获取桶
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();

        // 遍历
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            System.out.println(key);
        }
    }

成功提取出品牌名

    Map<String , List<String>> filters() throws IOException;
    @Override
    public Map<String, List<String>> filters() throws IOException {
        // 准备请求对象
        SearchRequest request = new SearchRequest();
        // 初始化 SearchSourceBuilder
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // 设置 size
        sourceBuilder.size(0);
        // 聚合
        buildAggs(sourceBuilder);
        // 将 SearchSourceBuilder 设置到 SearchRequest 中
        request.source(sourceBuilder);
        // 发出请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 解析结果
        HashMap<String, List<String>> result = new HashMap<>();
        Aggregations aggregations = response.getAggregations();
        // 根据名称,获得结果
        List<String> brandList = getAggByName(aggregations,"brandAgg");
        result.put("品牌",brandList);
        List<String> cityList = getAggByName(aggregations,"cityAgg");
        result.put("城市",cityList);
        List<String> starList = getAggByName(aggregations,"starAgg");
        result.put("星级",starList);
        return result;
    }

    private List<String> getAggByName(Aggregations aggregations,String aggName) {
        // 根据聚合名称获取聚合结果
        Terms brandTerms = aggregations.get(aggName);
        // 获取桶
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        // 遍历
        List<String > brandList = new ArrayList<>();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }
        return brandList;
    }

    private void buildAggs(SearchSourceBuilder sourceBuilder) {
        sourceBuilder.aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(100)
        );
        sourceBuilder.aggregation(AggregationBuilders
                .terms("cityAgg")
                .field("city")
                .size(100)
        );
        sourceBuilder.aggregation(AggregationBuilders
                .terms("starAgg")
                .field("starName")
                .size(100)
        );
    }

    @PostMapping("filters")
    public Map<String , List<String >> getFilters(@RequestBody RequestParams params){
        return service.filters(params);
    }
    Map<String , List<String>> filters(RequestParams params);

    @Override
    public Map<String, List<String>> filters(RequestParams params) {
        try {
            // 1.准备Request
            SearchRequest request = new SearchRequest("hotel");
            // 2.准备DSL
            // 2.1.query
            buildBasicQuery(params, request);
            // 2.2.设置size
            request.source().size(0);
            // 2.3.聚合
            buildAggs(request);
            // 3.发出请求
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            // 4.解析结果
            Map<String, List<String>> result = new HashMap<>();
            Aggregations aggregations = response.getAggregations();
            // 4.1.根据品牌名称获取品牌结果
            List<String> brandList = getAggByName(aggregations, "brandAgg");
            result.put("brand", brandList);
            // 4.2.根据品牌名称获取品牌结果
            List<String> cityList = getAggByName(aggregations, "cityAgg");
            result.put("city", cityList);
            // 4.3.根据品牌名称获取品牌结果
            List<String> starList = getAggByName(aggregations, "starAgg");
            result.put("starName", starList);

            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

/var/lib/docker/volumes/es-plugins/_data/

docker restart es

POST /_analyze
{
  "text": ["西巴仙人爱桃树"],
  "analyzer": "pinyin"
}

每个字都拆成了拼音

elasticsearch分词器analyzer)的组成包含部分

lcharacter filters:在tokenizer之前对文本进行处理例如删除字符、替换字符

ltokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart

ltokenizer filter:将tokenizer输出的词条做进一步处理例如大小写转换、同义词处理、拼音处理

// 自定义拼音分词器
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { 
        "my_analyzer": { 
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": {
        "py": { 
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  }
}

为了提高补全查询的效率,对于文档中字段的类型有一些约束

语法

// 自动补全查询
GET /test/_search
{
  "suggest": {
    "title_suggest": {
      "text": "s", // 关键字
      "completion": {
        "field": "title", // 补全查询的字段
        "skip_duplicates": true, // 跳过重复的
        "size": 10 // 获取前10条结果
      }
    }
  }
}
DELETE /hotel
PUT /hotel
{
  "settings": {
    "analysis": {
      "analyzer": {
        "text_anlyzer": {
          "tokenizer": "ik_max_word",
          "filter": "py"
        },
        "completion_analyzer": {
          "tokenizer": "keyword",
          "filter": "py"
        }
      },
      "filter": {
        "py": {
          "type": "pinyin",
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id":{
        "type": "keyword"
      },
      "name":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart",
        "copy_to": "all"
      },
      "address":{
        "type": "keyword",
        "index": false
      },
      "price":{
        "type": "integer"
      },
      "score":{
        "type": "integer"
      },
      "brand":{
        "type": "keyword",
        "copy_to": "all"
      },
      "city":{
        "type": "keyword"
      },
      "starName":{
        "type": "keyword"
      },
      "business":{
        "type": "keyword",
        "copy_to": "all"
      },
      "location":{
        "type": "geo_point"
      },
      "pic":{
        "type": "keyword",
        "index": false
      },
      "all":{
        "type": "text",
        "analyzer": "text_anlyzer",
        "search_analyzer": "ik_smart"
      },
      "suggestion":{
          "type": "completion",
          "analyzer": "completion_analyzer"
      }
    }
  }
}
private List<String> suggestion;
this.suggestion = Arrays.asList(this.brand,this.business);

    @Test
    void testBulkRequest() throws IOException{
//        批量查询酒店数据
        List<Hotel> hotelList = iHotelService.list();
//        创建Request
        BulkRequest request = new BulkRequest();
//        准备参数
        for (Hotel hotel : hotelList){
//        转换为文档类型HotelDoc
            HotelDoc hotelDoc = new HotelDoc(hotel);
//        创建新增文档的Request对象
            request.add(new IndexRequest("hotel")
                    .id(hotelDoc.getId().toString())
                    .source(JSON.toJSONString(hotelDoc),XContentType.JSON));
        }
//        发送请求
        client.bulk(request,RequestOptions.DEFAULT);
    }

//            有多个值,做切割
            String[] arr = this.business.split("/");
            this.suggestion = new ArrayList<>();
            this.suggestion.add(this.brand);
            Collections.addAll(this.suggestion,arr);
        }else {
            this.suggestion = Arrays.asList(this.brand,this.business);
        }

@Test
    void testSuggest() throws IOException {
        SearchRequest request = new SearchRequest("hotel");
        request.source().suggest(new SuggestBuilder().addSuggestion(
                "suggestions",
                SuggestBuilders.completionSuggestion("suggestion")
                        .prefix("h")
                        .skipDuplicates(true)
                        .size(10)
        ));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        Suggest suggest = response.getSuggest();
        CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
        List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
        for (CompletionSuggestion.Entry.Option option : options) {
            String string = option.getText().toString();
            System.out.println(string);
        }
    }

    @PostMapping("suggestion")
    public List<String> getSuggestion(@RequestParam("key") String prefix){
        return service.getSuggestions(prefix);
    }
    List<String> getSuggestions(String prefix);
    @Override
    public List<String> getSuggestions(String prefix) {
        try {
//            准备Request
            SearchRequest request = new SearchRequest("hotel");
            request.source().suggest(new SuggestBuilder().addSuggestion(
                    "suggestions",
                    SuggestBuilders.completionSuggestion("suggestion")
                            .prefix(prefix)
                            .skipDuplicates(true)
                            .size(10)
            ));
//            发送请求
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
//            得到响应
            Suggest suggest = response.getSuggest();
            CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
            List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
            ArrayList<String> list = new ArrayList<>(options.size());
//            遍历
            for (CompletionSuggestion.Entry.Option option : options) {
                String string = option.getText().toString();
                list.add(string);
            }
            return list;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

  1. 数据同步是一个周期性的过程。数据同步通常需要从源系统读取数据然后将数据传递到目标系统。这个过程可能需要经过多个步骤,如数据转换、数据清洗、数据映射等。一般情况下,数据同步是一个周期性的过程,定期将目标系统中的数据更新最新状态

  2. 数据同步目标是确保数据的一致性。在数据同步的过程中,目标是确保源系统中的数据与目标系统中的数据保持一致。这样可以保证不同应用之间使用相同的数据。

  3. 数据同步需要考虑数据的安全性完整性。在数据同步的过程中,数据的安全性完整性必须得到保障。例如,在数据传输过程中,需要使用加密技术保护敏感数据的机密性。

  4. 数据同步通常需要使用专业工具平台。数据同步的过程需要使用专业工具或平台来完成。这些工具或平台通常提供了丰富的功能技术,如数据清洗、数据转换、数据映射等,以确保数据同步的质量和效率。

  5. 数据同步可以提高企业效率和降低成本。通过数据同步,企业可以在不同部门团队之间共享数据,从而更好理解业务趋势和市场需求,进一步提高效率和降低成本。

实现简单

粗暴业务耦合度高

优点:

低耦合,实现难度一般依赖

缺点:

mq可靠性

优点:

完全解除服务间耦合

缺点:

开启binlog增加数据库负担、实现复杂度

数据同步案例实现方案

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注