本文介绍: 在OLAP数据库中,可变数据(Mutable data)通常是不被欢迎的,Clickhouse也是如此,早期版本支持UPDATE和DELTE操作。在Clickhouse 1.1.54388版本之后才支持UPDATE和DELETE操作,适用于MergeTree引擎,并且这种操作方式异步的(asynchronous),但是在一些交互场景下很难使用。Clickhouse提供了deleteupdate操作,这类操作被称之为Mutation查询,是ALTER语句的变种。

历史:

在OLAP数据库中,可变数据(Mutable data)通常是不被欢迎的,Clickhouse也是如此,早期版本支持UPDATE和DELTE操作。在Clickhouse 1.1.54388版本之后才支持UPDATE和DELETE操作,适用于MergeTree引擎,并且这种操作方式是异步的(asynchronous),但是在一些交互场景下很难使用。在一些场景用户需要修改数据即刻可以看到。这种实时更新real-tiime update)在2020年4月可以clickhouse实现了。

clickhouse 开源的2016年ClickHouse当时不支持数据修改。为了模拟更新,只能使用特殊插入结构,并且数据必须由分区删除

在GDPR要求的压力下,ClickHouse团队在2018年发布了UPDATE和DELETE。这些异步的,非原子更新被实现为ALTER TABLE UPDATE语句,并且有可能shuffle 很多数据。当不需要立即知道结果时,这对于批量操作和不频繁更新很有用。

常规意义(Normal SQL)上的SQL语句update支持依旧在clickhouse缺失,尽管它们每年确实出现在路线图中。如果需要实时更新行为我们必须使用其他方法。让我们考虑一个实际的用例,并比较ClickHouse中使用它的不同方法

概述

Clickhouse提供了delete和update操作,这类操作被称之为Mutation查询,是ALTER语句的变种。虽然Mutation能最终实现修改和删除,但是不能完全以通常意义上的updatedelete操作来理解

1.Mutation操作适用于批量数据的修改和删除

2.不支持事务 一旦语句被提交执行就会立刻对现有的数据产生影响,无法回滚

3.Mutation操作执行一个异步的过程,语句提交会立即返回,但是不代表具体逻辑已经执行完毕,具体的执行记录需要system.mutations系统查询

1.数据UPDATE和DELETE操作示例

1.创建表:
CREATE TABLE city
(
    `id` UInt8, 
    `country` String, 
	area String,
    `province` String, 
    `city` String, 
    `create_time` datetime DEFAULT now()
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(create_time)
ORDER BY id;
 
2.插入数据:
insert into city(id,country,area,province,city) VALUES
(1,'China','North','Hubei','wuhan'),
(2,'China','South','Guangdong','guangzhou'),
(3,'China','South','Guangdong','shenzhen'),
(4,'China','North','Beijing','Beijing'),
(5,'China','South','Shanghai','Shanghai');
 
数据查询:
┌─id─┬─country─┬─area──┬─province──┬─city──────┬─────────create_time─┐
│  1 │ China   │ North │ Hubei     │ wuhan     │ 2020-06-26 16:11:21 │
│  2 │ China   │ South │ Guangdong │ guangzhou │ 2020-06-26 16:11:21 │
│  3 │ China   │ South │ Guangdong │ shenzhen  │ 2020-06-26 16:11:21 │
│  4 │ China   │ North │ Beijing   │ Beijing   │ 2020-06-26 16:11:21 │
│  5 │ China   │ South │ Shanghai  │ Shanghai2020-06-26 16:11:21 │
└────┴─────────┴───────┴───────────┴───────────┴─────────────────────┘
 
5 rows in set. Elapsed: 0.003 sec. 
 
3.updatedelete操作:
-- UPDATE 操作:
 
ALTER TABLE city UPDATE area='South' WHERE city='wuhan';
 
 
Clickhouse>  select database,table,mutation_id,command,create_time,block_numbers.number no,is_done from system.mutations where table='city';
 
 
┌─database─┬─table─┬─mutation_id────┬─command────────────────────────────────────┬─────────create_time─┬─no──┬─is_done─┐
│ datasetscitymutation_5.txtUPDATE area = 'South' WHERE city = 'wuhan'2020-06-26 16:12:50[5]1 │
└──────────┴───────┴────────────────┴────────────────────────────────────────────┴─────────────────────┴─────┴─────────┘
 
1 rows in set. Elapsed: 0.004 sec.  
 
--DELETE 操作:
ALTER TABLE city DELETE WHERE city='guangzhou';
 
┌─database─┬─table─┬─mutation_id────┬─command────────────────────────────────────┬─────────create_time─┬─no──┬─is_done─┐
│ datasets │ city  │ mutation_5.txtUPDATE area = 'South' WHERE city = 'wuhan'2020-06-26 16:12:50[5]1 │
│ datasets │ city  │ mutation_6.txt │ DELETE WHERE city = 'guangzhou'2020-06-26 16:44:58[6]1 │
└──────────┴───────┴────────────────┴────────────────────────────────────────────┴─────────────────────┴─────┴─────────┘
 
2 rows in set. Elapsed: 0.004 sec. 
 
4.查看目录# ls -l /var/lib/clickhouse/data/datasets/city/
total 20
drwxr-x---. 2 clickhouse clickhouse 4096 Jun 26 16:12 202006_4_4_0_5
drwxr-x---. 2 clickhouse clickhouse 4096 Jun 26 16:44 202006_4_4_0_6
drwxr-x---. 2 clickhouse clickhouse    6 Jun 26 16:02 detached
-rw-r-----. 1 clickhouse clickhouse    1 Jun 26 16:02 format_version.txt
-rw-r-----. 1 clickhouse clickhouse  109 Jun 26 16:12 mutation_5.txt
-rw-r-----. 1 clickhouse clickhouse   96 Jun 26 16:44 mutation_6.txt
 
 
 # cat /var/lib/clickhouse/data/datasets/city/mutation_5.txt 
format version: 1
create time: 2020-06-26 16:12:50
commands: UPDATE area = 'South' WHERE city = 'wuhan' 
 # cat /var/lib/clickhouse/data/datasets/city/mutation_6.txt  
format version: 1
create time: 2020-06-26 16:44:58
commands: DELETE WHERE city = 'guangzhou' 
 
可以发现在执行了updatedelete操作之后数据目录生成文件mutation_5.txt,mutation_6.txt。
此外还有在同名的
目录下在末尾增加了_5 ,_6的后缀可以看到mutation_5.txt和mutation_6.txt 是日志文件完整记录updatedelete操作语句和时间。
mutation_id生成对应日志文件用于记录相关信息。
数据删除的过程是以数据表每个分区目录为单位,将所有目录重写为新的目录,在目录的命名规则是在原有的名称上加上 block_numbers.number
数据的在重写过程中会讲所需要删除的数据去掉。旧的数据并不会立即删除,而是被标记为非激活状态(active =0),
等到MergeTree引擎的下一次合并动作触发时候,
这些非活动目录才会被真正的从物理上删除。
 
 
UPDATE语句不能修改分区键和主键

2.数据的实时更新操作(Real-time UPDATE):

应用场景假设一个收集各种信息告警系统用户或者机器学习算法不时的查询数据库库,以确认最新告警信息。
若确认了
最新消息需要修改数据库告警记录然后警报消息从用户视图消除。这看起来像是Clickhouse的OLTP操作。
 
 
 
由于我们无法使用更新,因此我们将不得不插入修改后的记录。 一旦数据库中有两条记录,我们需要一种有效的方法获取最新一条。 为此,我们尝试3种不同的方法:
ReplacingMergeTree
Aggregate functions
AggregatingMergeTree 
 
1.创建表:
CREATE TABLE alerts(
  tenant_id     UInt32,
  alert_id      String,
  timestamp     DateTime Codec(Delta, LZ4),
  alert_data    String,
  acked         UInt8 DEFAULT 0,
  ack_time      DateTime DEFAULT toDateTime(0),
  ack_user      LowCardinality(String) DEFAULT ''
)
ENGINE = ReplacingMergeTree(ack_time)
PARTITION BY tuple()
ORDER BY (tenant_id, timestamp, alert_id);
 
为了简单便于演示,alert_id使用一个随机数告警信息写入字段alert_data。实际上告警信息表可能有更多的字段信息。
 
ReplacecingMergeTee是一种特殊的表引擎,它用主键ORDER BY替换数据-具有相同键值的行的较新版本将替换较旧的行。
在最新(Newness)的数据信息由列ack_time确定,数据替换后台合并操作期间执行。 它不会立即发生,也无法完全保证会发生,因此查询结果一致性一个问题。 不过,ClickHouse具有处理此类表的特殊语法2.模拟数据:模拟1000租户,产生1000万条记录。
INSERT INTO alerts(tenant_id, alert_id, timestamp, alert_data)
SELECT
  toUInt32(rand(1)%1000+1) AS tenant_id,
  randomPrintableASCII(32) as alert_id,
  toDateTime('2020-01-01 00:00:00') + rand(2)%(3600*24*30) as timestamp,
  randomPrintableASCII(256) as alert_data
FROM numbers(10000000);
 
 
3.我们99%的数据标记acked,由字段ack_user ack_time 确认。我们插入新的行,用以替代update操作。
INSERT INTO alerts (tenant_id, alert_id, timestamp, alert_data, acked, ack_user, ack_time)
SELECT tenant_id, alert_id, timestamp, alert_data, 
  1 as acked, 
  concat('user', toString(rand()%1000)) as ack_user,  now() as ack_time
FROM alerts WHERE cityHash64(alert_id) % 99 != 0;
 
 
若立即查询有多少记录则反馈如下SELECT count() FROM alerts;
┌──count()─┐
│ 19898060 │
└──────────┘
1 rows in set. Elapsed: 0.008 sec
 
此时可以确认 确认的数据和未确认的数据都包含在其中,数据的替换操作并未发生。为了查看真实的数据,我们使用关键字FINAL。
 
 
Clickhouse> select count(1) from alerts FINAL;
 
SELECT count(1)
FROM alerts
FINAL
 
┌─count(1)─┐
│ 10000000 │
└──────────┘
 
1 rows in set. Elapsed: 1.293 sec. Processed 10.00 million rows, 530.00 MB 
(7.74 million rows/s., 409.97 MB/s.) 
 
现在该计数正确的,但查询时间增加了很多! 使用FINAL,ClickHouse必须扫描所有行并在查询时间通过主键合并它们。 
这会产生正确答案,但会带来很多开销。 让我们看看是否可以通过仅过滤未确认的行来做得更好。
 
Clickhouse> SELECT count() FROM alerts FINAL WHERE NOT acked;
 
SELECT count()
FROM alerts
FINAL
WHERE NOT acked
 
┌─count()─┐
│  100696 │
└─────────┘
 
1 rows in set. Elapsed: 0.740 sec. Processed 10.00 million rows, 540.00 MB 
(13.51 million rows/s., 729.55 MB/s.) 
 
 
查询时间和数据总量是一样的,尽管计数更小一些。过滤了反而没有速查询,随着数据规模的增加,查询的成本会增加而不是缩小。
 
 
ok 查询整个表 并没有很大的帮助,在我们的场景下还要继续使用ReplacingMergeTree 引起吗?
现在让我们随机查询租户ID(tenant_id)并且查询的所有的数据尚未确认。类似于有一个用户正在查看的仪表板。
由于“ alert_data”只是随机垃圾,因此我们将计算校验和,并使用它来确认结果在多种方法中是相同的:
 
Clickhouse> SELECT count(),sum(cityHash64(*)) AS data FROM alerts FINAL WHERE (tenant_id = 451) AND (NOT acked);
 
SELECT 
    count(), 
    sum(cityHash64(*)) AS data
FROM alerts
FINAL
WHERE (tenant_id = 451) AND (NOT acked)
 
┌─count()─┬────────────────data─┐
│      988095029702435009750 │
└─────────┴─────────────────────┘
 
1 rows in set. Elapsed: 0.025 sec. Processed 16.38 thousand rows, 5.29 MB (650.31 thousand rows/s., 210.01 MB/s.) 
 
查询及其快,只有25ms可以查询出非确认的数据。为啥如此快呢?
在这个过滤条件中不同的是tenant_id 是主键的一部分,所以clickhouse在FINAL之前就可以过滤数据,ReplacingMergeTree 变的更加有效。
 
 
下面我们尝试过滤一个用户并且查询确认的数据。列的选择性cardinality )是一样的,有1000个用户使用用户user451:
Clickhouse> SELECT count() FROM alerts FINAL WHERE (ack_user = 'user451') AND acked;
 
SELECT count()
FROM alerts
FINAL
WHERE (ack_user = 'user451') AND acked
 
┌─count()─┐
│    9924 │
└─────────┘
 
1 rows in set. Elapsed: 1.397 sec. Processed 10.00 million rows, 569.71 MB (7.16 million rows/s., 407.76 MB/s.) 
可以看到查询非常慢并且没有使用索引扫描1000万条记录。
我们不能在 ack_user列添加索引,因为它会破坏ReplacingMergeTree的语义。 不过,我们可以使用PREWHERE技巧:
 
Clickhouse> SELECT count() FROM alerts FINAL PREWHERE (ack_user = 'user451') AND acked;
 
SELECT count()
FROM alerts
FINAL
PREWHERE (ack_user = 'user451') AND acked
 
┌─count()─┐
│    9924 │
└─────────┘
 
1 rows in set. Elapsed: 0.277 sec. Processed 10.00 million rows, 567.78 MB (36.04 million rows/s., 2.05 GB/s.) 
 
PREWHERE是ClickHouse的特殊提示去应用于不同的过滤器。 通常,ClickHouse足够聪明,可以自动条件移至PREWHERE,
因此用户无需理会。 此次没有发生,便于我们检查核对。
 
 
clickhouse以大量的聚合函数功能而闻名,在最新的版本中已经支持超过100聚合函数,这位经验丰富的用户提供了极大的灵活性。
本案例中我们无需使用任何高级函数,只需要使用argMax,maxany三个函数。
查询租户451 我们可以使用argMax聚合函数SELECT count(), sum(cityHash64(*)) data FROM (
  SELECT tenant_id, alert_id, timestamp, 
         argMax(alert_data, ack_time) alert_data, 
         argMax(acked, ack_time) acked,
         max(ack_time) ack_time_,
         argMax(ack_user, ack_time) ack_user
  FROM alerts 
  GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;
 
结果和查询的行数相同,查询的时间极短。可以看到clickhouse的聚合效率不错,但是缺点是查询变的更加复杂。我们可以让他更加简单:
 
注意我们确认告警信息的时候只需更新三个列:
acked: 0 => 1
ack_time: 0 => now()
ack_user: '' => ‘user1’
 
在所有3种情况下,列值都会增加! 因此,我们可以使用 max代替笨重的argMax。
由于我们不更改alert_data,因此在此列上不需要任何实际的汇总。 ClickHouse为此
提供了
友好的any聚合功能。 它可以选择任何值而不会产生额外开销:
 
SELECT count(), sum(cityHash64(*)) data FROM (
  SELECT tenant_id, alert_id, timestamp, 
    any(alert_data) alert_data, 
    max(acked) acked, 
    max(ack_time) ack_time,
    max(ack_user) ack_user
  FROM alerts
  GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;
 
查询变的更加简单快速原因是使用any函数,在alert_data列上无需计算max值
 
AggregatingMergeTree
AggregatingMergeTree 是clickhouse中最强大的功能之一。当和物化视图结合时,可以实现实时数据聚合的功能。由于我们可以通过AggregatingMergeTree
让查询变的更加高效快速吗?实际上并没有太大的改进。
 
我们一次更新行数据,所以一组只需要聚合两行。对于这种情况,AggregatingMergeTree引擎并不是最佳选择。但是我们可以使用小技巧。
我们知道告警信息始终智慧树先插入未确认的信息再被确认。用户确认告警信息之后只需要修改3列。若我们不重复其他列的
数据,是否可以节省磁盘空间并提升性能呢?
 
我们使用max聚合函数来实现一个聚合功能的表。我们使用any函数替代max,但是需要一个列可以设置为Nullable,any函数可以选择non-null的值。
 
DROP TABLE alerts_amt_max;
CREATE TABLE alerts_amt_max (
  tenant_id     UInt32,
  alert_id      String,
  timestamp     DateTime Codec(Delta, LZ4),
  alert_data    SimpleAggregateFunction(max, String),
  acked         SimpleAggregateFunction(max, UInt8),
  ack_time      SimpleAggregateFunction(max, DateTime),
  ack_user      SimpleAggregateFunction(max, LowCardinality(String))
)
Engine = AggregatingMergeTree()
ORDER BY (tenant_id, timestamp, alert_id);
 
 
由于原始数据是随机的,我们将从一个alerts表生成一个新的表。
我们使用两个insert语句,一个是确认的告警信息一个是非确认的告警信息。
INSERT INTO alerts_amt_max SELECT * FROM alerts WHERE NOT acked;
INSERT INTO alerts_amt_max 
SELECT tenant_id, alert_id, timestamp,
  '' as alert_data, 
  acked, ack_time, ack_user 
FROM alerts WHERE acked;
 
 
对于已确认的事件,我们插入一个空字符串而不是 alert_data 。 
我们知道数据不会改变,并且只能存储一次! 聚合函数将填补空白。 
在实际的应用程序中,我们可以跳过所有未更改的列,并让它们获取默认值-- 数据插入之后确认数据:
SELECT  table, sum(rows) AS r,sum(data_compressed_bytes) AS c, sum(data_uncompressed_bytes) AS uc, uc /c AS ratio FROM system.parts WHERE active AND (database = 'datasets') GROUP BY table;
 
由于是随机字符串我们几乎么有适用压缩,但是聚合小了两倍,因为我们不必存储两次alert_data
 
SELECT count(), sum(cityHash64(*)) data FROM (
   SELECT tenant_id, alert_id, timestamp, 
          max(alert_data) alert_data, 
          max(acked) acked, 
          max(ack_time) ack_time,
          max(ack_user) ack_user
     FROM alerts_amt_max
   GROUP BY tenant_id, alert_id, timestamp
) 
WHERE tenant_id=451 AND NOT acked;
由于AggregatingMergeTree引擎,我们处理了更小的数据,也更加高效。
 
Materializing The Update
Clickhouse 将尽其所能的在后台合并数据,删除重复的行并执行聚合。但是有时候强制合并是有意义的比如释放磁盘空间。
我们可以使用 OPTIMIZE FINAL语句,OPTIMIZE操作是阻塞性且昂贵的,因此不能频繁的执行。我们来看下optimize对查询性能影响OPTIMIZE TABLE alerts FINAL;
OPTIMIZE TABLE alerts_amt_max FINAL;
 
应该optimize操作之后两个表具有相同的数据行数和其他相同的数据:
Clickhouse> SELECT  table, sum(rows) AS r,sum(data_compressed_bytes) AS c, sum(data_uncompressed_bytes) AS uc, uc /c AS ratio FROM system.parts WHERE active AND (database = 'datasets') GROUP BY table;
 
SELECT 
    table, 
    sum(rows) AS r, 
    sum(data_compressed_bytes) AS c, 
    sum(data_uncompressed_bytes) AS uc, 
    uc / c AS ratio
FROM system.parts
WHERE active AND (database = 'datasets')
GROUP BY table
 
┌─table──────────┬────────r─┬──────────c─┬─────────uc─┬──────────────ratio─┐
│ alerts         │ 10000000296640251530600274431.0315617747512595 │
│ alerts_amt_max │ 10000000296640251530600274431.0315617747512595 │
└────────────────┴──────────┴────────────┴────────────┴────────────────────┘
 
2 rows in set. Elapsed: 0.003 sec. 
 
 
结论:
clickhouse提供了丰富的工具处理实时更新比如:
 ReplacingMergeTree, CollapsingMergeTree (not reviewed here), AggregatingMergeTree
 和聚合函数。
 这些方法有三个共同点:
 1.被修改的数据(Modified data)通过插入新版本的数据。clickhouse的INSERT操作极快。
 2.这些方式可以高效的模拟OLTP数据库中的update语义
 3.实际的修改并不是直接发生的,即异步
 特定方法的选取取决于使用场景。
 ReplacecingMergeTree简单明了,对用户来说最方便,但是仅可用于中小型表,或者始终由主键查询数据时使用。
 aggregate functions提供了更大的灵活性和性能,但是需要大量的重写。
 AggregatingMergeTree允许保存存储,仅保留修改的列。
 
这些是clickhouse DB 设计人员的必备的工具,可以在需要的时候选用。
 
 
 
 

参考

https://www.altinity.com/blog/2020/4/14/handling-real-time-updates-in-clickhouse

https://www.altinity.com/blog/2018/10/16/updates-in-clickhouse

https://clickhouse.tech/blog/en/2016/how-to-update-data-in-clickhouse/

https://www.altinity.com/blog/2018/10/16/updates-in-clickhouse
https://www.altinity.com/blog/2018/1/23/updatingdeleting-rows-with-clickhouse-part-1
https://www.altinity.com/blog/2018/1/23/updatingdeleting-rows-from-clickhouse-part-2

转至:https://blog.csdn.net/vkingnew/article/details/106913907

原文地址:https://blog.csdn.net/asd54090/article/details/134739270

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_25210.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注