本地启动步骤 1、项目导入 IDEA,以及后续一些列 IDEA 内的配置,具体见这个文档:
文档:https://pulsar.apache.org/contribute/setup-ide/
2、打包二进制包,并本地启动单机版服务
需要先将本地 jdk 设为 17,并把 maven 的 jdk 设为 17
setting.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 < profile> < id> jdk-17 < / id> < activation> < activeByDefault> true < / activeByDefault> < jdk> 17 < / jdk> < / activation> < properties> < maven.compiler.source> 17 < / maven.compiler.source> < maven.compiler.target> 17 < / maven.compiler.target> < maven.compiler.compilerVersion> 17 < / maven.compiler.compilerVersion> < project.build.sourceEncoding> UTF-8 < / project.build.sourceEncoding> < / properties> < / profile>
然后执行打包:
1 ./mvnw -Pcore-modules,-main -T 1C install -DskipTests -Dspotbugs.skip=true -Dcheckstyle.skip=true
-Dcheckstyle.skip=true 是为了当修改了代码,或者加了注释时,可以成功打包。
包路径:
1 ./distribution/server/target/apache-pulsar-3.3.0-SNAPSHOT-bin.tar.gz
然后本地运行:
1 OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005" ./bin/pulsar standalone -nss -nfw
如果想要启动之前先等待远程 debug 开启,比如想 debug 启动的时候的初始化逻辑,可以将 suspend 设为 y,此时启动会先等待 debug 开启。
1 2 3 % OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=*:5005" ./bin/pulsar standalone -nss -nfw Listening for transport dt_socket at address: 5005 // 在这个地方block
如果想要启用 3.0 新版的延迟消息,需要修改 conf/broker.conf :
1 2 3 4 # Class name of the factory that implements the delayed deliver tracker. # If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory", # will create bucket based delayed message index tracker. delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory
3、在 IDEA 中开启远程 debug
文档:https://pulsar.apache.org/contribute/setup-debugging/
下载地址:https://pulsar.apache.org/download/
4、启动客户端,发送和接收消息。可以使用官方的工具,但是可能会碰到点问题,更直接的是直接自己写收发的 demo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public static void main (String[] args) throws Exception { PulsarClient client = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650" ) .build(); Producer<byte []> producer = client.newProducer() .topic("test-topic" ) .create(); producer.send("My message 无延时" .getBytes()); producer.newMessage().value("My message 延时" .getBytes()).deliverAt(tmp) .send(); Consumer consumer = client.newConsumer() .topic("test-topic" ) .subscriptionType(SubscriptionType.Shared) .subscriptionName("my-subscription" ) .subscribe(); while (true ) { Message msg = consumer.receive(); try { System.out.println("Message received: " + new String (msg.getData())); consumer.acknowledge(msg); } catch (Exception e) { consumer.negativeAcknowledge(msg); } } }
执行 main 以后,idea 就能 debug 到收消息方法,比如上面代码发的延时消息,在 BucketDelayedDeliveryTracker 的 addMessage 方法:
资料 https://pulsar.apache.org/contribute/setup-debugging/ https://github.com/apache/pulsar