发布时间:2025-12-09 20:24:22 浏览次数:4
pulsar,消息中间件,是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。
pulsar采用发布-订阅的设计模式,producer发布消息到topic,consumer订阅这些topic处理流入的消息,并当处理完成之后发送一个确认。
一旦创建订阅,即使consumer断开连接,pulsar仍然可以保存所有消息,在consumer确认消息已处理成功之后才会删除消息。
Pulsar 的关键特性如下:
Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
极低的发布延迟和端到端延迟。
可无缝扩展到超过一百万个 topic。
简单的客户端 API,支持 Java、Go、Python 和 C++。
Topic 支持多种订阅模式(独占订阅、共享订阅、故障转移订阅)。
通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如S3、GCS)中。
pulsar支持在多种环境下以多种安装模式进行安装。
环境:裸金属、Docker、kubernetes、AWS、DC/OS
模式:单机模式、单集群、多集群
Pulsar 二进制包目录:
| bin | Pulsar的命令行工具,比如 pulsar 和 pulsar-admin. |
| conf | Pulsar 的配置文件,包含 broker 配置、ZooKeeper 配置等。 |
| examples | Java JAR 包,包含 Pulsar Functions 的示例。 |
| lib | Pulsar使用到的 JAR 文件 |
| licenses | 开源许可文件,.txt 格式,用于规范 Pulsar 代码库的各个组件。 |
运行 Pulsar 会立即生成的目录:
| data | ZooKeeper和BookKeeper使用的数据存储目录 |
| instances | 为 Pulsar Functions 创建的组件。 |
| logs | 安装时生成的日志文件 |
消息是pulsar的基础单元。从生产者、到消费者、在pulsar中被传递保存的东西。由以下成分组成:
| Value / data payload | 消息携带的数据。 |
| Key | 消息可选择用key标记,有利于topic压缩等。 |
| Properties | 用户定义属性的可选键/值映射。 |
| Producer name | 产生消息的生产者的名称。如果不指定生产者名称,则使用默认名称。 |
| Topic name | 消息发布到的topic的名称。 |
| Schema version | 用于生成消息的模式的版本号。 |
| Sequence ID | 每条消息都属于其topic的有序序列。消息的序列ID由生产者分配,表示其在该序列ID中的顺序,也可自定义。 可用于重复消息删除等。 |
| Message ID | 消息ID表示消息在ledger中指定位置,在pulsar是唯一的。在持久层存储时由bookies分配。 |
| Publish time | 发布消息的时间戳。由生产者自动应用。 |
| Event time | 应用程序附加到消息的可选时间戳。例如,当消息被处理时应用程序为其附加一个时间戳。如果没有事件事件,则为0. |
消息默认大小时5M。可在broker.conf和bookeeper.conf配置文件中配置:
producer是一个进程,它依附于一个topic把消息发布给brokers。pulsar borker处理消息。
生产者可以以同步(sync)或异步(async)的方式向brokers发送消息。
| Sync send | producer发布每条消息后都等待broker的确认。如果没有收到确认,生产者将认为发送失败。 |
| Async send | producer将消息放入阻塞队列并立即返回,客户端在后端将消息发送到broker。如果队列已满,则在调用API时,producer将阻塞或立即失败(根据producer的配置参数决定)。 |
producer访问topic时,有不同类型的访问模式。
| Shared | 多着producer能向一个topic发布消息。(默认的模式) | ||||||||||||||||||||||||||||||
| Exclusive | 只有一个producer能向一个topic发布消息
consumer为每个大型消息保留一个单独的缓冲区,以便将其所有块聚合到一条消息中。 可以通过配置maxPendingChunkedMessage参数来限制消费者同时维护的分块消息的最大数量。 当达到阈值时,使用者通过静默确认或请求代理稍后重新交付来丢弃挂起的消息,从而优化内存利用率。 如果使用者在指定的时间内未能收到消息的所有块,则消息块将不完整地过期。缺省值为1分钟(由expireTimeOfIncompleteChunkedMessage 配置)。 3、消费者consumer是一个进程,它通过订阅附加到主题,然后接收消息。 consumer向broker发送 flow permit request去获取消息。在consumer端有一个队列(通过receiverQueueSize参数配置,默认是1000)用于接收从broker推送的消息。每次调用consumer.receive()时,都会从缓冲区中取出一条消息。 (1)接收模式消息以同步(sync)或异步(async)的方式从broker接收。
(2)监听客户端库为consumer提供侦听器实现。例如,Java客户机提供了MesssageListener接口。在此接口中,每当接收到新消息时,都会调用received方法。 (3)Acknowledgementconsumer在成功消费消息后向broker发送确认请求。 然后,这个消费的消息将被永久存储,只有在所有的订阅都确认后才会删除。 可配置 message retention policy使被consumer确认的消息继续保存。 消息可以通过以下两种方式确认:
注意:Cumulative acknowledgement不能用在Shared订阅类型中,因为Shared订阅类型涉及多个可以访问同一订阅的consumer。在Shared订阅类型中,消息是单独确认的。 (4)Negative acknowledgement否定确认,允许程序向broker发送通知,指示consumer没有处理消息。 当consumer没有使用消息,并需要重新使用它时,consumer可以向broker发送否定确认(nack),触发broker将此消息重新发送给consumer。 可单独或累积的否定确认消息。 注意:
(5)Acknowledgement timeout(6)Retry letter topic(7)Dead letter topic4、topic从生产者到消费者传输信息的通道。 结构: {persistent|non-persistent}://tenant/namespace/topic
提示: 不需要显式的提前的创建主题。 如果客户端向一个不存在的topic写入或接收消息,pulsar自动在主题名称中提供的namespace创建该主题; 如果客户端创建主题时没有指定租户和命名空间,则pulsar在默认租户和命名空间中创建主题。 5、名称空间(namespace)命名空间是topic的管理单元,主要是提供了一个相关主题的分组机制,将不同类型的topic放到同一个命名空间管理。 不同租户可创建多个命名空间,一个命名空间中可创建多个topic。 6、订阅(subscription)订阅是一个配置规则,决定如何将消息发送给consumer。 pulsar中有四种订阅类型:
(1)订阅类型订阅类型是在consumer连接到订阅时确定的; 可以通过重启consumer来更改类型; 当订阅没有连接consumer时,其订阅类型是hi未定义的。 Ⅰ、Exclusive(独占类型) 在独占类型中,只允许将单个consumer附加到订阅。默认的订阅模式。 如果多个consumer使用同一个订阅订阅了一个topic,就会发生错误。 Ⅱ、Failover(灾备类型) 在灾备类型中,多个consumer可以附加到同一个订阅; 非分区主题或分区主题的每个分区都选择一个主consumer去接收消息; 当主consumer断开连接时,未确认的和后续的消息都被传递到consumer等待队列中的下一个consumer中。 对于分区topic:broker根据优先级和消费者名称字典序对消费者排序。 对于非分区topic:broker将根据它们订阅非分区topic的顺序选择消费者。 Ⅲ、Shared(共享类型,round robin) 在共享类型中,多个consumer可以附加到同一个订阅。 消息循环分发订阅的consumer,每个消息都只传递给一个consumer。 当某个consumer断开连接时,所有发送给它但没有被确认的消息将被重新发送个剩余的某个消费者 。 注意:
Ⅳ、 Key_Shared 在key_shared类型中,多个consumer可以附加到同一个订阅; 具有相同key的消息仅传递给一个消费者,无论消息被重传多少次都会只传递给同一个消费者; 当有消费者连接或者断开连接时, 注意:
基于key的批处理: 确保生产者将具有相同key的消息打包到同一批中,没有key的消息被打包到同一批中(broker从这个批分发消息时以NON_KEY作为key)。 //开启基于key的批处理的示例Producer<byte[]> producer = client.newProducer().topic("my-topic").batcherBuilder(BatcherBuilder.KEY_BASED).create();注意:
(2)订阅模式创建订阅时会创建一个关联游标来记录消息最后消耗的位置,当订阅的consumer重新启动时可以继续从它消费的最后一条消息开始消费,订阅模式主要为了指定游标类型。
注意:
7、多主题订阅从pulsar 1.23.0-incubating版本开始,支持pulsar消费者可同时订阅多个topic。 两种订阅方式:
当订阅多个主题时,pulsar客户端自动调用pulsar api以发现与正则表达式/topic列表匹配的topic并订阅,如果任何主题不存在,则在主题创建后,consumer自动订阅它们。 多个topic之间没有顺序保证,当生产者向多个topic发送消息时,不能保证从这些主题读取消息的顺序相同。 8、分区topic分区topic是由多个broker处理的一种特殊类型的topic,因此可以实现更高的吞吐量(普通topic由一个broker服务,限制了topic的最大吞吐量)。 分区topic实现:为N个内部主题(其中N是分区的数量),当向分区topic发布消息时,每条消息被路由到多个broker中的一个。 pulsar自动维护分区的分布在哪个broker。 分区主题需要通过admin API显式创建。分区的数量可以在创建主题时指定。 路由模式决定了每个消息应该发布到哪个分区,而订阅类型决定了哪些消息将发送到哪个consumer。 通常,吞吐量问题应该指导分区/路由决策,而订阅决策应该由应用程序语义指导。 (1)路由模式路由模式决定每条消息应该发布到哪个分区(内部topic)。
(2)保证顺序消息的顺序与路由模式和key相关,通常用户希望每个key分区的消息是有序的。
(3)Hashing schemeHashingScheme是一个枚举,表示在为特定消息选择要使用的分区时可用的标准哈希函数集。 有两种类型的标准哈希函数可用:JavaStringHash和Murmur3_32Hash。 producer 的默认哈希函数是JavaStringHash。 注意,当生产者可以来自不同的多语言客户端时,JavaStringHash是没有用的,在这种用例下,建议使用Murmur3_32Hash。 9、非持久topic消息永远不会持久化到磁盘,而只存在于内存中。 在使用非持久传递时,杀死Pulsar代理或断开与topic的订阅者的连接意味着在该非持久topic上所有正在传输的消息都会丢失。 在非持久topic中,broker立即将消息传递给所有已连接的订阅,而不会持久化到BookKeeper。 如果订阅断开连接,broker无法传递正在传输的消息,订阅将无法再次收到这些消息。 非持久topic的消息传递会稍微比持久topic的快。 性能 连接方式 生产者和消费者与以持久性topic相同的方式连接到非持久性topic,但需要以non-persistent开头。 支持三种订阅类型: exclusive、shared、failover 10、消息重传11、消息保留和过期12、消息去重13、消息延迟传递四、架构在最高级别上,一个pulsar实例可以由一个或多个集群组成,实例中的集群之间可以复制数据。 需要做网站?需要网络推广?欢迎咨询客户经理 13272073477
|