PulsarFunction例子-创新互联
在单机环境下实现字符串追加函数(Pulsar 2.4.2版本)
创新互联公司主营云南网站建设的网络公司,主营网站建设方案,成都app开发,云南h5微信小程序开发搭建,云南网站营销推广欢迎云南等地区企业咨询1 启动单机Pulsar
$ bin/pulsar-daemon start standalone
2 创建函数
1) 准备环境
项目引用 compile 'org.apache.pulsar:pulsar-functions-api:2.4.2'
2) 创建JAVA函数(此函数用于数据源来的topic schema是string,输出的tiopic schema是string)
导出jar包,放到pulsar服务器目录下,本例子放在 /data/jar/下
3)使用命令行工具加载函数到Pulsar,
bin/pulsar-admin functions create \
--classname test.AppStrFunction \
--jar /data/jar/pf.jar \
--inputs persistent://public/default/tlstest \
--output persistent://public/default/teststr \
--tenant public \
--namespace default \
--name appStrFunction
参数说明:
参数 | 说明 |
functions | 通知 pulsar broker,函数操作 |
create | 创建函数,默认创建成功后启动 |
classname | 函数类名称,需要加上包名 |
jar | 指定 jar 包的运行路径 |
inputs | 指定 函数 数据的来源在哪里,支持多个 topics 作为输入 |
output | 如果该 函数 有输出(有些情况下,function 没有输出),指定 function 输出的 topic,只能有一个输出 |
tenant | 指定该 函数 运行的租户名 |
namespace | 指定该 函数 运行的命名空间 |
name | 指定该 函数 运行的名称 |
停止函数
bin/pulsar-admin functions stop \
--tenant public \
--namespace default \
--name appStrFunction
启动函数
bin/pulsar-admin functions start \
--tenant public \
--namespace default \
--name appStrFunction
删除函数
bin/pulsar-admin functions delete \
--tenant public \
--namespace default \
--name appStrFunction
函数的日志在 pulsar安装目录 /logs/functions下
3 测试函数
根据前边函数已成功加载启动
1)向tlstest主题发送消息
import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; public class SendMsgTest{ public static void main(String[] args){ String url="pulsar://192.168.1.48:6650"; try{ PulsarClient client =PulsarClient.builder() .serviceUrl(url) .connectionTimeout(10,TimeUnit.SECONDS) .build(); Producer2)读取teststr主题消息
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.schema.JSONSchema; import schema.OrderModel; import com.alibaba.fastjson.JSON; public class RecFunTest { public static void main(String[] args) { String url = "http://192.168.1.48:8080"; try{ PulsarClient client =PulsarClient.builder() .serviceUrl(url) .build(); Consumer另外有需要云服务器可以了解下创新互联cdcxhl.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。
当前名称:PulsarFunction例子-创新互联
链接URL:http://pwwzsj.com/article/cddggg.html