问题陈述
1.1 架构描述
eBay的监控平台Sherlock.io包含着对不同类型的数据的处理,存储以及展示。而基于事件信号的(event)监控平台是已经发展成型,且存储和计算采用的是最近几年OLAP领域热度非常高的ClickHouse集群。事件信号监控目前已经self-service开放给用户,用户在UI上根据提示就可以完成自助注册写入。默认情况下,用户的数据会被写入默认的shared的ClickHouse集群中。但是一些重要的用户,通常他们的数据量也是比较大的,可以根据用户要求,将数据放进dedicated的ClickHouse集群当中。
目前为止,事件平台已经有超过20个ClickHouse集群用于事件存储,包含dedicated给用户的以及shared的部分。每一个集群都有相似的部署架构,但是根据用户数据量以及留存时间的需求,可以设置不同的shard数目;而每一个shard的数据会有三个备份,每一个备份分别位于ebay的3个不同的数据中心。
在这种架构下,事件的存储其实是可以在cluster level进行扩展的。例如当一个用户的数据量急剧增长,原来shared集群不适合存储这个case时,可以根据流量大小再重新spin up一个ClickHouse集群 dedicated给该用户。
1.2 问题初现
eBay应用的OLAP数据是在事件监控平台上线最早的用户案例之一,也是我们最重要的案例。在OLAP数据进入事件平台之前,已经存储在druid中一段时间,在eBay SRE以及domain team有着非常广泛的应用,包括在Grafana上创建可视化图表以及基于OLAP数据的一些告警。在我们将基于OLAP数据的告警规则建立开放给用户self-service之后,由于规则validation以及ratelimiting还未启用,导致大量的bad queries涌入OLAP 集群以至于集群有时处于高负荷状态。ClickHouse本身是连接敏感的应用,读query和写query共用线程池,当读queries占据了几乎所有的线程之后,OLAP数据的写入也会受到影响。
为了避免queries spike或者存在的某些load比较重的queries影响数据的写入,针对ClickHouse 的read/write隔离的方案需要尽快应用。初步的想法是为重要的案例增加一份单独的replica,ingress的流量只会使用写replica来进行数据的写入,通过zk将数据同步到读replica,而egress的queries只会发生在这些读replicas上,从而实现读写分离。
但是在将读写分离方案应用在生产上时,新的问题出现了:
事件监控平台使用的是
ClickHouseReplicatedMergeTree系列的table engine,来进行不同replica之间数据的同步。而ClickHouse server是通过centralized ZooKeeper集群来进行数据元数据的同步以及DDL queries指令的执行。目前的部署架构如下所示:
对于OLAP集群而言,每天有超过30亿条的数据插入, 平均每秒有100多个新写入的parts生成,而ClickHouse后台还会不断的生成merge task,生成更多的新的以及随时“过时”的parts。
并且一共有30个replica(10shards*3replicas)不断访问ZooKeeper拿到parts的元信息以完成数据同步。因此ZooKeeper处于巨大的压力之中,原本在一些数据mutation的时刻,zk已经显示出滞后,在这次enable了更多的replica来进行读写分离的时候,zk的outstanding requests高达1K+,并且在UTC 0点数据rotate的时候出现了数据丢失。
此时我们意识到随着用户流量的增大,集群shard数目不断增加,zk或许/已经成为了ClickHouse集群横向扩展的瓶颈。
解决方案
2.1 read/write separation
为了提高事件数据的响应速度,特别是为了保证基于事件数据的告警规则的有效性和稳定性,事件平台的ClickHouse采用了冷热分层的架构,而且热存储使用的是本地SSD磁盘,因此没有采用ClickHouse的存算分离方案。
为了实现ClickHouse的读写分离,基本思路是将集群中同一个shard的特定部分replica只提供写服务,另外一部分replica只执行读queries,具体设计如下:
在FCHC CRD中引入readWriteMod字段实现读写分离的支持,并且当readWriteMod>1时对该ClickHouse集群启用读写分离。
当{replica_num} % readWriteMod!= 0的节点则都默认为读节点,operator将这些读节点组织并创建一个读virtual集群,而位于query集群中的distributed tables则指向这个virtual集群节点中的具体表,实现这些节点的只读;
当{replica_num} % readWriteMod== 0 时该replica dedicated为写节点,ingress将会发现并倾向返回这批replica,并将数据直接写入,但是当这批写入节点不可用时,会回退到返回一个读节点进行数据写入,保证数据的安全性。这种模式下,同一个shard一般会分配1个写节点和1个或者多个读节点,因为通常都是读的压力比较大。
下图描述了一个shard中的replica是如何分配读写节点的:
2.2 ClickHouse集群的横向扩展
在问题描述的架构图中我们可以看到,ClickHouse集群横向扩展的瓶颈在于整个集群都在用Zookeeper来做数据的同步,而随着ClickHouse横向扩展,shard越来越多,ZK的压力只会有增无减。然而实际上除了分布式DDL指令的执行是整个集群水平的,数据同步实际上只是shard水平进行的,这意味着如果ClickHouse集群的不同shard配置不同的zk,ClickHouse的横向扩展就不再是问题了。
在我们进行调研ClickHouse 集群配置Muiti-zk的方案时,同时关注到社区在v21.3开始,ClickHouse Keeper可以代替ZooKeeper成为数据进行同步所依赖的一致性协调服务,Keeper使用的是和ClickHouse相同的code base,基于RAFT一致性算法的C++实现,可以提供线性读和线性写,算法本身非常成熟,社区也有多种基于不同编程语言的版本实现。默认情况下,ClickHouse Keeper提供了和ZooKeeper相同可见性保证,即线性写和非线性读。并且Keeper实现了一套和ZooKeeper完全兼容的client-server通信协议,这意味着任何的标准ZK client都可以和Keeper server进行通信。
另外非常重要的一点是,Keeper既可以作为一个独立的application进行单独部署,也可以内置在ClickHouse server里面一同启动,因此多种不同的deploy结构可以被支持。
鉴于我们遇到的横向扩展的问题,我们可以使集群中的不同的shard使用单独的一致性协调服务来进行数据的同步。在这种部署模式下,Keeper内置在ClickHouse server中一同启动,减少了外部依赖的维护。
但对于一个新引入的模块,我们必须要有足够的知识能够运维,尤其是在troubleshooting或者是在故障发生时恢复服务时。虽然相对于ZooKeeper来讲,Keeper完全是一个全新的模块,但是Keeper作为Zookeeper的替代品,却是非常友好的。
1.Keeper和zk具有完全相同的内部状态(Znode/ACL/Watch…),完全可以说是基于RAFT算法实现的ZooKeeper。
2.Keeper实现了和ZK完全相同的client-server通信协议,这意味着任何标准的zkClient可以和ClickHouse Keeper 交互以获取其内部状态。
3.和ZK提供了几乎一样的4lw(4 letter word)命令。
因此从定位问题的角度来看,Keeper和Zk一样简单能随时获取运行状态。但是这还不够,我们需要能够通过Keeper的异常状态,设置一些告警能及时收到通知,以做出处理和服务恢复,如果能在Grafana上建立dashboard展示Keeper的状态就更好了。但是当时Keeper并没有任何metrics暴露出来。
于是为了推进Keeper在事件监控平台上的应用,我们做了相关change将需要的内部信息通过metrics的形式和其他的监控信息一同暴露出来并贡献回了社区,并在Sherlock.io Grafana上建立了Keeper的dashboard:
方案实现及改善
3.1 实现细节
为了在创建ClickHouse集群的时候可以支持Keeper,我们在FCHC的CRD里面引入了enableKeeper这个字段,当federated clickhouse operator在reconcile FCHC 时,如果检测到这个字段为true的话,它将会为ClickHouse server生成Keeper server启动需要的config文件,同时还可以在CRD里面override一些Keeper server特定的设置。
enableKeeper: true
keeperConfig:
coordinationSettings:
raft_logs_level: trace
keeperNodesCount: 3
tcpPort: 9181
对于那些Keeper server也需要同时启动的ClickHouse server, 额外的keeper server相关配置需要被添加进配置文件中。
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>190</server_id><
<log_storage_path>/var/lib/keeper/log</log_storage_path>
<snapshot_storage_path>/var/lib/keeper/snapshots</snapshot_storage_path>
<raft_configuration>
<server>
<id>190</id>
<hostname>host-38-0-0</hostname>
<port>9999</port>
</server>
<server>
<id>225</id>
<hostname>host-45-0-0</hostname>
<port>9999</port>
</server>
<server>
<id>470</id>
<hostname>host-94-0-0</hostname>
<port>9999</port>
</server>
</raft_configuration>
</keeper_server>
主要的ClickHouse Keeper相关配置都是位于<keeper_server> section 中,具体的配置参数的意义可以参考ClickHouse Keeper.。Quorum的配置信息位于‘<keeper_server>.<raft_configuration>’中,其中描述了RAFT 集群servers的信息。Keeper根据此处的quorum server的定义彼此通信并创建一个NuRaft的一致性协调服务系统。
而对于某些没有Keeper server启动的ClickHouse replica, 它的配置就和其他任何一个依赖了第三方zk/keeper协调服务系统的server完全一样,只需要在<zookeeper>section下面配置好当前shard所使用的RAFT集群地址即可。
<zookeeper>
<node>
<host>host-38-0-0</host>
<port>9181</port>
</node>
<node>
<host>host-45-0-0</host>
<port>9181</port>
</node>
<node>
<host>host-94-0-0</host>
<port>9181</port>
</node>
</zookeeper>
3.2 测试及改进
在对keeper支持的测试过程中,遇到了许多实际的问题,而且又由于Keeper用来替换生产上已经非常稳定的ZooKeeper,功能测试和线上流量的测试都必不可少。
3.2.1 Ordinary database 创建失败
使用Keeper必须升级使用新版本ClickHouse,但是在测试过程中发现federated clickhouse operator在创建database时一直报错,创建失败。经过排查发现,在之前的版本,Ordinary类型的database是ClickHouse默认支持的数据库类型,operator当中也是显示指定数据库类型的。
而最新版本的ClickHouse,为了支持表的rename,将Atomic类型作为了默认数据库类型,并且默认deprecate掉Ordinary类型的数据库,如果需要继续支持Ordinary,需要显示在配置`
allow_deprecated_database_ordinary: “true”`中指定。
3.2.2 ClickHouse server 启动失败
实际上ClickHouse server有启动的过程,并且log显示Keeper server在不断的去和raft quorum中的其他server交互,但是一直处于timeout的状态。
查看之后发现问题出在我们使用开源的ClickHouse operator 来reconcile 当前kube cluster的CHI对象以创建出ClickHouse 集群,而默认情况下即使我们不指定readiness probe和liveness probe,operator会添加一个对ClichHouse 8123端口http检查的probes。但是ClickHouse server一直在等待连接RAFT cluster才能处于ready状态,RAFT要等server ready了才能够连接上彼此,因此陷入了一个死循环最终导致启动失败。
针对这个问题的解决方案是添加一个对config文件查看的readiness check,这样可以使得pod先处于ready状态,RAFT 集群可以先建立起来,最终启动成功。因为我们pod的rediness状态本身就是通过另外的service确认的,所以这个改动并不会影响我们的服务。
3.2.3 依次重启ClickHouse 集群节点时的
ip重用问题
在Keeper enable的集群启动之后,在集群里面做了一些常规的回归测试和故障恢复测试。依次重启对于ClickHouse升级或者应用某一项配置使其生效是一个非常常见的操作。但是在依次重启了一个Kube cluster里面的集群pod之后,会发现一个shard的pod会加入到另外一个shard的quorum中,而且这个问题几乎每次都可以复现。
上图中我们可以发现,在pod重启之后,zxid呈阶跃型上升,而原本同一个Quorum中的peer的 zxid还在原来的水平。经过debug发现,该pod已经加入了另外一个shard的quorum中。
由此我们发现,是一些原因共同作用导致了这种情况的发生。
在一些Kube cluster上,ip资源非常紧张。在批量删除了一些pod然后重建的过程,势必出现ip重用的问题,即原本shard0的pod的ip,在经过删除重建之后,成为了shard1的ip。
FQDN ip的resolve具有延时性,即quorum里面别的peer还在用老的ip在访问当前peer,然而老得ip已经是shard1的pod了,如果刚好shard1的pod的log_index小于shard0 leader的log_index, shard1就会更新自己的log_index并加入到shard0的quorum中。
目前来讲针对于这种情况并没有根本的解决方案,Tess team也没有固定ip的支持,因此ip重用在资源紧张的集群中是一定会发生的;而ip解析延迟只能通过annnotation尽量降低至分钟级别以内;因此只能通过阻止不同shard的peer加入到不同shard的quorum中解决,目前的方案是不同的shard的peer交互使用不同的端口,因此即使ip被重用了,因为端口不一致,shard1的pod永远加入不到shard0的quorum中。
3.2.4 某些shard的latency比较高
在ClickHouse 集群enable了Keeper之后,整体运行的很稳定。但是在region by region的依次重启集群pod之后,发现总有一到两个shard的latency相对其他的shard比较高。
经过研究发现,latency比较高的是所有的ClickHouse client都连到了同一个Keeper server上。
KeeperDispatcher::requestThread()
/// The code below do a very simple thing: batch all write (quorum) requests into vector until
/// previous write batch is not finished or max_batch size achieved. The main complexity goes from
/// the ability to process read requests without quorum (from local state). So when we are collecting
/// requests into
a batch we must check that the new request is not read request. Otherwise we have to
/// process all already accumulated write requests, wait them synchronously and only after that process
/// read request. So reads are some kind of "separator" for writes.
在ClickHouse Keeper的源代码我们可以看到,Keeper server会batch写请求,并且一次性执行以提高效率。但是当所有的client都连接到同一个server上时,几倍的读请求会将写请求batch分裂开执行,导致执行latency变高。
因此,为了使client连接到Keeper server分布均匀,在配置文件中的部分仅放置本地的Keeper server。
<zookeeper>
<node>
<host>localhost</host>
<port>9181</port>
</node>
</zookeeper>
该配置应用到ClickHouse集群中后,部分shard latency大的问题得到了解决。但是有时ClickHouse server重启会失败。
检查了CickHouse server的启动逻辑如下:
if <keeper_server> exists:
if < zookeeper> exists and connect successfully
start keeper async
else
wait for Keeper synchronously
KeeperDispatcher::initialize
if (!start_async)
{
server->waitInit();
LOG_DEBUG(log, "Quorum initialized");
}
void KeeperServer::waitInit()
{
std::unique_lock lock(initialized_mutex);
int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); }))
throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization");
}
在上面的逻辑我们可以看到,Keeper server有一个等待初始化的过程,并且如果在一定的时间内不能完成初始化的工作,将会启动失败。而Keeper server的初始化工作包含snapshot快照的加载,以及应用log store里面的log commits到内部状态机。在log store的记录数比较大的情况下,将要花费大约2min完成初始化的工作,但默认情况下的等待初始化时间为30s,这是为什么有时候会出现启动失败的状况。
针对这种情况,可以增加startup启动的等待时间,同时在社区有另外一个相关的改善的PR,即在ClickHouse server退出的时候,打一份快照出来,这样在下一次启动的时候,只需要加载快照然后在leader server上同步缺失的log记录即可完成初始化,大大减少了等待时间。
3.3 Keeper在生产上的实践
目前对于ClickHouse支持Keeper的change已经部署到了生产上,并且新创建的集群都将Keeper作为数据同步默认的一致性协调系统。
而最初出现问题的OLAP集群也已经切换到了应用了Keeper的ClickHouse集群上,并且该集群已经成功enable了数据的读写分离。目前集群运行正常,数据写入的延迟和之前相比也更稳定。
总结与展望
为了保证Sherlock.io事件监控平台中ClickHouse集群的数据完整性和可用性,针对其读写分离方案提出被支持。对于任何非常重要的数据,如果有顾虑不想让读请求影响到数据的完整性,可以在ClickHouse集群上实施读写分离。
而对于ClickHouse 集群的横向扩展的集中式Zookeeper瓶颈问题,提出了在集群的shard水平上使用单独的基于Keeper的一致性协调系统来进行数据的同步。
目前Keeper的支持还仅限于replica数目大于等于3的集群,这是由RAFT集群的最小quorum数据决定的,但是Sherlock.io generic log也会使用ClickHouse作为后端存储,而对于log而言,2备份的可用性已经足够了,因此后面的调研方向将是在2备份情况下如何enable shard水平的Keeper。