深度解析Apache Pulsar内存使用原理

导语

Apache Pulsar 中大量使用了 Java 的堆内内存、直接内存和系统的 Page Cache 等内存概念。为了帮助大家在性能调优和问题定位中更清晰地理解这些概念和使用各种内存,本文将为大家详细介绍 Apache Pulsar 中的内存使用原理、推荐配置并进行简单的验证测试。

基本概念

Pulsar 中的 Broker,BookKeeper 和 ZooKeeper 三大组件都是基于 JVM(Java 虚拟机)运行的。JVM 在执行 Java 程序的过程中会把管理的内存划分成多个区域,在每个区域存放不同类型的数据。下图所示为 JVM 运行时主要的内存分区:

图片[1]-深度解析Apache Pulsar内存使用原理-不念博客

堆内内存(Heap Memory)

在上图这些分区中,占用内存空间最大的一部分叫做“堆(Heap)”,也就是我们所说的堆内内存(on-heap memory)。JVM 中的“堆”主要存放所有对象的实例。这一区域在 JVM 启动的时创建,由所有的线程共享,同时也是垃圾收集器的主要工作区域。

JVM 自带的垃圾回收器给开发者带来极大的便利,但同时也会产生 GC 停顿,对 Java 应用程序带来一定的影响。

堆外内存(Off-heap Memory)

为了解决堆内内存过大带来的长时间的 GC 停顿,以及操作系统对堆内内存不可知的问题,JVM 开辟出了堆外内存(Off-heap Memory)。堆外内存意味着把一些对象的实例分配在 JVM 堆内内存以外的内存区域,这些内存直接受操作系统(而不是 JVM)管理。这样能保持一个较小的堆,以减少垃圾收集对应用的影响。同时因为这部分区域直接受操作系统的管理,别的进程和设备可以直接通过操作系统对其进行访问,减少了从 JVM 中复制内存数据的过程。使用堆外内存的优缺点如下:

优点:

  • • 方便自主开辟很大的内存空间,对大内存伸缩性友好;
  • • 减少垃圾回收带来的 GC 停顿时间;
  • • 直接受操作系统控制,可以直接被其他进程和设备访问,减少了从 JVM 复制数据的过程;
  • • 适合分配次数少、读写操作频繁的场景。

缺点:

  • • 需要开发者维护内存,容易出现内存泄漏并且难排查;
  • • 如果对象的存储结构复杂,堆外内存的数据结构不直观,进行串行化(Serialization)会损耗一定的性能。

直接内存(Direct Memory)

直接内存(Direct Memory)属于堆外内存的一种,既不是 JVM 运行时数据区的一部分,也不是 JVM 规范中定义的内存区域,它直接由操作系统分配,因此不受 Java 堆大小的限制,但是会受到所在机器总内存的大小及处理器寻址空间的限制。在 JDK1.4 中新引入的 NIO(Non-blocking IO,即同步非阻塞 IO)机制是一种基于通道与缓冲区的新 I/O 方式,可以直接从操作系统中分配直接内存,即在堆外分配内存。这样的机制在一些场景中既可以避免在 Java 堆和 Native 堆中来回复制数据,也能避免 GC 停顿对业务的影响,从而提高性能。

Broker 中的内存

Broker 既使用了堆内内存,也使用了直接内存。由于 Broker 不存储数据,所以对于 OS Cache 使用较少,内存分配如下图所示:

图片[2]-深度解析Apache Pulsar内存使用原理-不念博客

堆内内存(Heap Memory)

在 Pulsar Broker 中大量使用了堆内内存,用于记录一些处理过程中的缓存信息,如 Ledger、Cursor、延迟消息的时间和元数据等信息,下表是在某测试环境中统计出 Broker 的堆内内存使用情况。

图片[3]-深度解析Apache Pulsar内存使用原理-不念博客

从上表可以看出,ManagedLedgerImpl 和 ManagedCursorImp 占用内存比较大。ManagedLedgerImpl 主要存储了 Topic 的 Ledger 信息,如果 Topic 数量很多或者 Topic 的保留数据非常长(默认最短 10 分钟并且达到 5 万条,最长 4 小时进行切换),就会占用大量内存;ManagedCursorImp 主要存储了消费者的偏移量信息,Pulsar 可以支持单条消息确认和批量确认,这意味着需要记录哪些消息被确认,以及连续确认的消息 ID 区间,如果消息 ID 区间中有大量的空洞信息,在内存中记录的消息 ID 区间相应增多(当然会实时存储到 BookKeeper 中,以便中断后可以继续消费),因此消费者数量大意味着高内存开销。

另外大量延迟消息也会占用较大内存,维护了 Timestamp、LedgerId、EntryId 等 3 个 Long 类型的优先级队列,关于延迟消息的最新改进可以参考 PIP-195[1]。占用内存较大的还有元数据缓存信息以及必要的数据。

直接内存(Direct Memory)

直接内存存储对象的操作相对堆内内存中的操作更加频繁,比如消息的发送和消费需要频繁创建大量对象,非常合适采用堆外内存。Netty 提供了堆外内存的创建和回收能力能降低使用门槛。Pulsar Broker 的堆外内存主要用于发送和消费时的 Message 对象构造,以及追尾读的实时缓存消息。

发送对象构造 Cache

发送时的 Message 对象构造使用了一部分直接内存,主要由 OpAddEntry 提供,使用了 Netty 的 RECYCLER。这部分直接内存主要缓存生产者发送来的消息,等待这些消息写入 Bookie 中。如果写入速率低于生产者发送过来的速率,等待写入的消息超过 Cache 大小,Broker 将停止读取生产者发送过来的数据。

在极端情况下,当生产者有较大的突增流量冲击时,Broker 会出现堆外内存不足导致异常的情况,通常是因为 OpAddEntry 对象缓存了大量待写入的消息而 Bookie 端写入速度跟不上,导致 Broker 端在写入 Bookie 超时前(目前为 30s)积压大量消息,而客户端继续写入新消息最终导致无法再分配更多的堆外内存。

这部分 Cache 默认占用到 MaxDirectMemorySize 的 1/2,代码如下:

private int maxMessagePublishBufferSizeInMB = Math.max(64,
   (int) (DirectMemoryUtils.jvmMaxDirectMemory() / 2 / (1024 * 1024)));

在 broker.conf 中的配置如下:

# Max memory size for broker handling messages sending from producers.
# If the processing message size exceed this value, broker will stop read data
# from the connection. The processing messages means messages are sends to broker
# but broker have not send response to client, usually waiting to write to bookies.
# It's shared across all the topics running in the same broker.
# Use -1 to disable the memory limitation. Default is 1/2 of direct memory.
maxMessagePublishBufferSizeInMB=

尾部数据 Cache

Pulsar Broker 的堆外内存的另外一种用处是 Topic 的尾部数据缓存,在追尾读消费场景中可以避免网络往返的时延和在 Bookie 上读取磁盘的时延,从而提高 Message 的读取性能。在尾部读取消息时,消费者从服务 Broker 中读取数据,由于 Broker 已经将数据缓存在内存中,因此无需从磁盘读取数据。 在追赶读(Catch-Up Read)读取消息时, Broker Cache 大概率会无法命中,所有的读请求都会落到 Bookie 上。

具体过程如下图所示,首先 Broker 会判断该 Topic 是否有 Active Cursor,如果有,则将收到的消息写入该 Topic 对应的 Cache 中;否则不写入 Cache。

图片[4]-深度解析Apache Pulsar内存使用原理-不念博客

判断存在 Active Cursor 需要同时满足以下两个条件:

  1. 1. 有 Durable Cursor;
  2. 2. Cursor 的延迟在 managedLedgerCursorBackloggedThreshold 范围内,该参数在broker.conf中配置,默认为 1000 条 entry。

这部分 Cache 默认占用MaxDirectMemorySize 的 1/5,代码如下:

private int managedLedgerCacheSizeMB = Math.max(64,
       (int) (DirectMemoryUtils.jvmMaxDirectMemory() / 5 / (1024 * 1024)));

在broker.conf 中配置如下:

# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the topics
# running  in the same broker. By default, uses 1/5th of available direct memory
managedLedgerCacheSizeMB=

其他

其余的直接内存由 Netty,以及网络进来的包使用。此外,消费时的 Message 对象构造也使用了一部分 Direct Memory,主要由 OpReadEntry 提供。 ##推荐配置 在长期实践经验中,推荐堆内内存和直接内存的配置比例是 1:2。 在 conf/pulsar_env.sh 文件中配置样例如下:

# Extra options to be passed to the jvm
PULSAR_MEM=${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}

Bookie 中的内存

Bookie 由于存储数据,除使用了比较多的直接内存,对 OS Cache 也有比较大的需求。

图片[5]-深度解析Apache Pulsar内存使用原理-不念博客

堆内内存(Heap Memory)

Bookie 使用的堆内内存比较少,主要用于元数据信息和必要数据的存储,例如 Journal 组提交的队列。下表是在某测试环境中统计 BookKeeper 的堆内内存使用情况。

图片[6]-深度解析Apache Pulsar内存使用原理-不念博客

直接内存(Direct Memory)

Bookie 使用了大量的堆外内存,主要用于 Read Cache、Write Cache、Index Cache 和 Netty 的出入队列等,在 Bookie 的数据读写过程中的流程如下图所示:

图片[7]-深度解析Apache Pulsar内存使用原理-不念博客

从图中可以看出 Bookie 通常采用不同的磁盘将读写 IO 进行了物理隔离,同时使用了 Read Cache 和 Write Cache,这两个 Cache 在数据读写过程中有非常重要的作用,下面将会详细介绍。

写缓存(Write Cache)

Bookie 收到 Entry 写入请求后将其 Append 到 Journal 文件的同时(异常情况下数据可以恢复,保证数据的可靠性),也会保存到 Write Cache 中,如上图中深蓝色的两个箭头所示。Write Cache 分为两部分,一部分是正在写入的 Write Cache, 另一部分是正在正在刷盘的部分,两部分交替使用。

Write Cache 中有索引数据结构,可以通过索引查找到对应的 Entry。该索引是内存级别的,基于 Bookie 定义的ConcurrentLongLongPairHashMap 结构实现。消费者优先从正在写的 Write Cache 中读取消息,没有命中时,会去正在刷盘的 Write Cache 中读取,如果在 Write Cache 中没有命中时,则会请求从 Bookie 中读取。

每个 Write Cache 在增加 Entry 的时候会进行排序和去重处理,在同一个 Write Cache 中同一个 Ledger 下的数据是相邻有序的。在 Write Cache 中的数据刷盘到 Entrylog 文件时,写入的数据局部有序。这样的设计能够极大提高后续的读取效率。

Write Cache 大小默认为MaxDirectMemorySize 大小的 1/4,如MaxDirectMemorySize=4GB,则WriteCacheSize=1G,在多盘的情况下 Bookie 能够配置多个目录充分利用磁盘 IO,最终的 Cache size 需要按照目录数均分,详细代码如下:

private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = (long) (0.25 * PlatformDependent.maxDirectMemory())/ MB;
long perDirectoryWriteCacheSize = writeCacheMaxSize / numberOfDirs;

bookkeeper.conf 中配置如下:

# Size of Write Cache. Memory is allocated from JVM direct memory.
# Write cache is used to buffer entries before flushing into the entry log
# For good performance, it should be big enough to hold a substantial amount
# of entries in the flush interval
#  By default it will be allocated to 1/4th of the available direct memory
dbStorage_writeCacheMaxSizeMb=

读缓存(Read Cache)

当追赶读场景时,如果消息不在 Write Cache 中, Broker 会从 Bookie 中读取一部分磁盘上的数据写入到 Read Cache 中。因为存储的时候会做局部有序处理,获取相邻数据的概率会非常大,这种处理会极大提高后续读取数据的效率。

下图展示了在消费者消费数据时,Cache 是如何加速处理的:

图片[8]-深度解析Apache Pulsar内存使用原理-不念博客

消费者获取数据的顺序如下:

  1. 1. 从 Broker 端的 Entry Cache 中获取,如果没有则继续;
  2. 2. 从 Bookie 的 Write Cache 正在写的这部分中获取,如果没有则继续;
  3. 3. 从 Bookie 的 Write Cache 正在刷盘的这部分中获取,如果没有则继续;
  4. 4. 从 Bookie 的 Read Cache 中获取,如果没有则继续;
  5. 5. 通过索引 (RocksDB) 读取磁盘上的 Entry 日志文件获取。索引返回一个带有 Entry 日志文件和文件中偏移量的位置信息,格式如: (EntryLog, Offset);
  6. 6. 在指示的偏移处读取指示的 Entry 日志文件;
  7. 7. 执行预读;
  8. 8. 将所有预读条目加载到 Read Cache 中;
  9. 9. 返回 Entry。

在上面任意一步获取到数据,消费者都会直接返回并跳过后面的步骤。如果是从磁盘文件中获取的数据,返回的时候数据将被存储到 Read Cache 中。

我们在使用的过程中,应尽量避免或减少追赶读场景,即消费老数据触发读取磁盘文件中的消息,以免对整体系统的性能造成影响。 Read Cache 大小默认为MaxDirectMemorySize 大小的 1/4,例如MaxDirectMemorySize=4GB,则ReadCacheSize=1G。在多盘的情况下 Bookie 能够配置多个目录充分利用磁盘 IO,最终的 Cache 大小需要按照目录数均分,详细代码如下:

private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = (long) (0.25 * PlatformDependent.maxDirectMemory())/ MB;
long perDirectoryReadCacheSize = readCacheMaxSize / numberOfDirs;

bookkeeper.conf 中配置如下:

# Size of Read cache. Memory is allocated from JVM direct memory.
# This read cache is pre-filled doing read-ahead whenever a cache miss happens
#  By default it will be allocated to 1/4th of the available direct memory
dbStorage_readAheadCacheMaxSizeMb=

索引缓存(Index Cache)

Bookie 的 IO 模式避免了随机写的情况,意味着不同 Topic 的数据会混在一起,在读取时需借助 index 来提升查询效率,同时对 Index 文件进行缓存,默认为MaxDirectMemorySize 大小的 1/10。

bookkeeper.conf 中配置如下:

## RocksDB specific configurations
## DbLedgerStorage uses RocksDB to store the indexes from
## (ledgerId, entryId) -> (entryLog, offset)

# Size of RocksDB block-cache. For best performance, this cache
# should be big enough to hold a significant portion of the index
# database which can reach ~2GB in some cases
# Default is to use 10% of the direct memory size
dbStorage_rocksDB_blockCacheSize=

其他

其他一部分直接内存用于分配对应操作对象,例如接收 Broker 的 Entry 对象的创建,以及 Netty 的出入队列等。

Page Cache

Page Cache 对本地文件的读写非常高效。Bookie 同样采用 Page Cache 对文件进行直接操作。Pulsar 与其他中间件不同的地方在于,Pulsar 的 Bookie 只在操作时使用,当不再需要时会及时关闭,因此 Pulsar 不会借助 Page Cache 来做大量缓存操作,下面配置可以看到 BookKeeper 关于 Page Cache 的默认行为。对于 Page Cache 大小的设置,由于 Bookie 的 Journal 和 Ledger 都会用到 Page Cache,所以建议 OS 预留总内存一半的资源。

在conf/bookkeeper.conf 中配置:

# Should we remove pages from page cache after force write
 journalRemoveFromPageCache=true

推荐配置

在长期实践经验中,推荐堆内内存,直接内存和 OS PageCache 的配置比例是 1:2:3。 在conf/bkenv.sh 文件中配置示例如下:

# Memory size options
BOOKIE_MEM=${BOOKIE_MEM:-${PULSAR_MEM:-"-Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g"}}

测试

从以上理论分析可以看出,在最新消息的追尾读场景中,Broker Cache 或者 Bookie Write Cache 的命中概率更大;而在比较早的消息的追赶读场景中,Bookie 的磁盘和 Bookie Read Cache 命中概率更大。下面我们对追尾读和追赶读读两种场景分别进行测试,该测试仅提供一个基本思路以供参考。

配置

  • • 环境基本信息:K8s 部署,其中,1 个 Broker 节点,1 个 Bookie 节点,3 个 ZooKeeper 节点。
  • • Broker JVM 配置:-Xms128m -Xmx256m -XX:MaxDirectMemorySize=256m
  • • Bookie JVM 配置:-Xms128m -Xmx256m -XX:MaxDirectMemorySize=256m
  • • 使用 pulsar-perf 工具进行测试,使用 Grafana 进行指标观察。

场景 1:追尾读

在两个客户端上同时启动生产者和消费者,从客户端看到生产速率和消费速率基本持平,生产速率如下:

图片[9]-深度解析Apache Pulsar内存使用原理-不念博客

消费速率如下:

图片[10]-深度解析Apache Pulsar内存使用原理-不念博客

Broker 和 Bookie 的堆内内存和直接内存的使用如下图所示:

图片[11]-深度解析Apache Pulsar内存使用原理-不念博客

BookKeeper 的监控中,Write Cache 和Read Cache 使用情况如下,其中Read entry from read cache 为 0 表示没有从Read Cache 中读取数据:

图片[12]-深度解析Apache Pulsar内存使用原理-不念博客

在 BookKeeper 的监控中,Bookie 的写吞吐率与客户端看到的一致,均为 31MB/s 左右;Bookie 的读吞吐率read entry 和Entry Rate 图中的read entry 均为 0,表示数据没有从 Bookie 中读取,如下图所示:

图片[13]-深度解析Apache Pulsar内存使用原理-不念博客
图片[14]-深度解析Apache Pulsar内存使用原理-不念博客

Broker 的读吞吐率与客户端看到的消费吞吐率一致,均为 31MB/s 左右:

图片[15]-深度解析Apache Pulsar内存使用原理-不念博客

在以上测试中,从客户端看到的生产速率和消费速率基本持平,从Read Cache 中可以看到,Read entry from read cache 为 0,也就是说没有数据是从Read Cache 中命中的,从 Bookie 的吞吐率输出(read entry)为 0 可以说明数据不是从Write Cache 中命中的,所以推断出数据是在 Broker 的 Cache 命中的。

场景 2:追赶读(Catch-Up Read)

在两个客户端上分别启动生产者和消费者,暂停消费者,等待生产者生产 10 分钟后再开始消费,从下图可以看到生产速率为 33000 msg/s:

图片[16]-深度解析Apache Pulsar内存使用原理-不念博客

将消费速率配置比生产速率低一些,本测试固定在 5000 msg/s:

图片[17]-深度解析Apache Pulsar内存使用原理-不念博客

Broker 和 Bookie 的堆内内存和直接内存使用情况如下:

图片[18]-深度解析Apache Pulsar内存使用原理-不念博客

在 BookKeeper 的监控中,Write Cache 和 Read Cache 中,可以看到Read entry rate from read cache 有速率,说明有数据从Read Cache 中读取,如下图所示:

图片[19]-深度解析Apache Pulsar内存使用原理-不念博客

Bookie 读写吞吐率与客户端看到的生产消费吞吐率一致,分别为 5.2MB/s 和 34.5MB/s:

图片[20]-深度解析Apache Pulsar内存使用原理-不念博客
图片[21]-深度解析Apache Pulsar内存使用原理-不念博客

Broker 读吞吐率(Out)跟客户端看到的消费吞吐量一致,均为 5.2MB/s:

图片[22]-深度解析Apache Pulsar内存使用原理-不念博客

从以上测试中可以看出,在有一定数量的 Backlog 情况下,生产速率为 260Mbit/s(约 32.5MB/s),消费速率为 39Mbit/s(约 4.87MB/s),理论上应该会有大量数据是从磁盘读取的。从 BookKeeper 的监控中 Read rate from read cache 和Read Cache Size 可以看出,有一部分数据是从 Read Cache 命中的,从 Bookie 的吞吐量输出为 5.2MB/s 与客户端消费速率大致相同可以看出,数据是从 Bookie 中命中的。

© 版权声明
THE END