Apache Pulsar的特性

apache pulsarapache pulsar是apache软件基金会顶级项目,是下一代云原生分布式消息流平台。
pulsar 作为下一代云原生分布式消息流平台,支持 多租户、持久化存储、多机房跨区域数据复制 ,具有强一致性、高吞吐以及低延时的高可扩展流数据存储特性, 内置诸多其他系统商业版本才有的特性,是云原生时代解决实时消息流数据传输、存储和计算的最佳解决方案。
pulsar简介系统架构
功能特色
租户和命名空间(namespace)是 pulsar 支持多租户的两个核心概念。在租户级别,pulsar 为特定的租户预留合适的存储空间、应用授权与认证机制。在命名空间级别,pulsar 有一系列的配置策略(policy),包括存储配额、流控、消息过期策略和命名空间之间的隔离策略。
pulsar 做了队列模型和流模型的统一,在 topic 级别只需保存一份数据,同一份数据可多次消费。以流式、队列等方式计算不同的订阅模型大大提升了灵活度。
pulsar 使用计算与存储分离的云原生架构,数据从 broker 搬离,存在共享存储内部。上层是无状态 broker,复制消息分发和服务;下层是持久化的存储层 bookie 集群。pulsar 存储是分片的,这种构架可以避免扩容时受限制,实现数据的独立扩展和快速恢复。
pulsar 原生支持跨地域复制,因此 pulsar 可以跨不同地理位置的数据中心复制数据。当数据中心中断或网络分区时,在多个数据中心存有消息副本尤为重要,提高可用性。
pulsar functions 是基于 pulsar 的轻量级流处理方式。pulsar functions 直接部署在 broker 节点上(或作为 kubernetes 集群中的容器)。通过 pulsar functions,pulsar 可以直接解决许多流处理任务,简化操作。支持客户端 java 客户端c++ 客户端.net/c# 客户端go 客户端nodejs 客户端ruby 客户端pulsar安装与部署目前pulsar不支持window,下面通过docker进行安装,可以参考官网https://pulsar.apache.org/docs/next/getting-started-docker/
同时可以安装pulsar manager,具体操作可以参考官方文档 https://pulsar.apache.org/docs/next/administration-pulsar-manager/
其中pulsar manager 是一个网页式可视化管理与监测工具,支持多环境下的动态配置。可用于管理和监测租户、命名空间、topic、订阅、broker、集群等。
window环境使用docker推荐使用docker desktop,和linux一样可以通过docker命令管理镜像、部署容器等操作。打开并启动docker desktop后,在终端执行命令执行
_> docker search pulsar
可以查询到pulsar相关的镜像
镜像下载这里我们选择分别下载红框的两个镜像,执行命令
_> docker pull apachepulsar/pulsar _> docker pull apachepulsar/pulsar-manager
启动启动pulsardocker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar bin/pulsar standalone启动pulsar managerdocker run --name pulsar-manager -dit -p 9527:9527 -p 7750:7750 -e spring_configuration_file=/pulsar-manager/pulsar-manager/application.properties apachepulsar/pulsar-manager添加用户:
for /f tokens=1 %a in ('curl http://localhost:7750/pulsar-manager/csrf-token') do set csrf_token=%acurl -x put x-xsrf-token: %csrf_token% -h cookie: xsrf-token=%csrf_token%; -h content-type: application/json -d {name: admin, password: 123456, description: super user admin, email: admin@test.com} http://localhost:7750/pulsar-manager/users/superuser访问:
http://localhost:9527/ 用户名密码:admin/123456配置environments:
这里需要保证pulsar manager应用服务能够访问到pulsar应用,由于都是通过docker部署,配置service url需要使用网络ip,不要用localhost。
管理界面:
pulsar与springboot集成springboot version : 2.3.7.releasepulsar client: 2.10.2通过properties简单定义一些broker相关的属性@data@configurationproperties(prefix = pulsar)public class pulsarproperties { private string cluster; private string namespace; private string serverurl; private string token;}通过配置定义了一些常用的组件,比如生产、消费工厂@configuration@enableconfigurationproperties({pulsarproperties.class})public class pulsarbootstrapconfiguration { private final pulsarproperties properties; public pulsarbootstrapconfiguration(pulsarproperties properties) { this.properties = properties; } @bean(destroymethod = close) public pulsarclient pulsarclient() throws pulsarclientexception { clientbuilder clientbuilder = pulsarclient.builder().serviceurl(properties.getserverurl()); return clientbuilder.build(); } @bean public pulsarproducerfactory pulsarproducerfactory() throws pulsarclientexception { return new pulsarproducerfactory(pulsarclient(), properties); } @bean public pulsarconsumerfactory pulsarconsumerfactory() throws pulsarclientexception { return new pulsarconsumerfactory(pulsarclient(), properties); }}启动服务,在服务启动后,通过实现smartinitializingsingleton接口,完成容器基本启动(不包含lazy的bean)后,开始对消费者consumer监听@slf4j@springbootapplicationpublic class pulsarapplication implements smartinitializingsingleton { @autowired private pulsarconsumerfactory consumerfactory; public static void main(string[] args) { springapplication.run(pulsarapplication.class,args); } @override public void aftersingletonsinstantiated() { startconsumerlistener(); } private void startconsumerlistener(){ consumer consumer = createconsumer(); if( consumer != null ){ while (!thread.currentthread().isinterrupted()){ completablefuture completablefuture = consumer.receiveasync(); message message = null; try { message = completablefuture.get(); } catch (interruptedexception e) { thread.currentthread().interrupt(); log.error(错误,e); } catch (executionexception e) { log.error(错误,e); } if( message!=null ){ try { log.info( 接收消息:{} , message.getvalue() ); consumer.acknowledge(message); } catch (pulsarclientexception e) { consumer.negativeacknowledge(message); throw new runtimeexception(e); } } } } } private consumer createconsumer() { try { return consumerfactory.getconsumer(constants.topic_demo); } catch (pulsarclientexception e) { log.error(创建consumer出错:{}, e.getmessage(),e); } return null; }}消息发送测试@slf4j@runwith(springrunner.class)@springboottestpublic class pulsarboottests { @autowired private pulsarproducerfactory producerfactory; @test public void sendmessage() throws pulsarclientexception { producer producer = producerfactory.getproducer(constants.topic_demo); producer.send( 测试消息: + new date()); producer.close(); }}检查消息接收情况2023-02-05 12:05:14.043 info 23472 --- [ulsar-timer-6-1] o.a.p.c.impl.consumerstatsrecorderimpl : [topic_demo] [sub-topic_demo] [7c2b2] prefetched messages: 0 --- consume throughput received: 0.02 msgs/s --- 0.00 mbit/s --- ack sent rate: 0.02 ack/s --- failed messages: 0 --- batch messages: 0 ---failed acks: 02023-02-05 12:06:16.425 info 23472 --- [ main] com.sucl.pulsar.pulsarapplication : 接收消息: 测试消息: sun feb 05 12:06:16 cst 2023结束语该篇主要通过官网对apache pulsar做了简单的了解与尝试,同时基于springboot,以简单的示例代码实现了消息的发送与接收,其中各个组件仅仅使用了默认的配置,在生产环境需要根据pulsar的特性以及官方api使其具有扩展性与易用性。

飞行时间系统对于机器视觉应用传统3D成像解决方案的优势
如何面向5G开启eMBB大航海时代
家庭电路四个重要注意事项
如何使用测试套件解决JEDEC-UFS堆栈验证的7大挑战
瑞声科技携手Dispelix向行业提供从设计到生产完整的刻蚀光栅方案
Apache Pulsar的特性
一些值得玩味的python代码
华为P11什么时候上市?华为Mate10还没来,华为P11就强势曝光了:走小轻薄路线
随着OLED电视销量的迅猛增长 OLED阵营的不断扩容
东莞华为扩展坞厂家六口扩展更便捷
开着特斯拉环球旅行是什么感受
微流控芯片检测技术_微流控芯片是否有前景
Moku 云编译介绍
吉利商用车湖州基地项目落户南太湖新区
谷歌持续精简,但核心业务持续增长
中国电信基于天翼医疗云专区的整体信息化解决方案
二叉树的前序遍历、中序遍历、后续遍历的非递归实现
数字化的深耕与构建:华为数字能源的立体版图
魅族MX首张官方图曝光 M9降价为其开道
强油循环风冷变压器发生“备用冷却器投入”信号时,怎么办?