php向kafka写数据 php rdkafka

3、Kafka生产者-向Kafka写入数据

发送消息的主要步骤

网站建设公司,为您提供网站建设,网站制作,网页设计及定制网站建设服务,专注于企业网站设计,高端网页制作,对成都广告设计等多个行业拥有丰富的网站建设经验的网站建设公司。专业网站设计,网站优化推广哪家好,专业成都网站推广优化,H5建站,响应式网站。

格式:每个消息是一个 ProducerRecord 对象, 必须指定 所属的 Topic和Value , 还可以指定Partition及Key

1:序列化 ProducerRecord

2:分区: 如指定Partition,不做任何事情;否则,Partitioner 根据key得到Partition 。生产者向哪个Partition发送

3:消息添加到相应 bach中 ,独立线程将batch 发到Broker上

4:broker收到消息响应 。 成功回RecordMetaData对象 ,包含了Topic信息、Patition信息、消息在Partition中的Offset信息; 失败返回错误

有序场景:不建议retries  0。可max.in.flight.requests.per.connection  1, 影响生产者吞吐量,但保证有序          ps: 同partition消息有序

三个 必选 的属性:

(1) bootstrap.servers ,broker地址清单

(2) key.serializer: 实现org.apache.kafka.common.serialization.Serializer接口的类,key序列化成字节数组。注意: 必须被设置,即使没指定key

(3)value.serializer, value序列化成字节数组

同步发送消息

异步发送消息

(1)acks: 指定多少partition副本收到消息,生产者才会认为写成功

0,不需等待服务器的响应,吞吐量高,如broker没有收到,生产者不知道

1,leader partition收到消息,一个即成功

all,所有partition都收到,才成功,leader和follower共同应答

(2)buffer.memory, 生产者内 缓存区域大小

(3)compression.type ,默认不压缩,设置成snappy、gzip或lz4对发送给broker压缩

(4)retries, 重发消息的次数

(5)batch.size, 发送同一partition消息会先存储在batch中,该参数指定一个batch内存大小,单位byte。不一定填满才发送

(6)linger.ms ,批次时间,batch被填满或者linger.ms达到上限,就把batch中的消息发送出去

(7)max.in.flight.requests.per.connection, 生产者在收到服务器响应之前可以发送的消息个数

创建ProducerRecord时,必须 指定序列化器 ,推荐序列化框架Avro、Thrift、ProtoBuf等

用 Avro 之前,先定义schema(通常用 JSON 写)

(1)创建一个类代表客户,作为消息的value

(2)定义schema

(3)生成Avro对象发送到Kafka

ProducerRecord包含Topic、value,key默认null,ey的两个作用:1)附加信息    2)被写到Topic的哪个partition

key  null ,默认partitioner, RoundRobin均衡分布

key不空,hash进行散列 ,不改变partition数量(永远不加),key和partition映射不变。

自定义paritioner 需实现Partitioner接口

消息中间件Kafka - PHP操作使用Kafka

cd librdkafka/

./configure make make install

安装成功界面 没有报错就是安装成功

thinkphp,kafka,hbase,spark之间的通讯机制怎么来实现

Spark 有自己的 Kafka connector 用于从Kafka读出读入数据。

Spark 到 Hbase 很多人就用一个foreach operator来写数据。


网页标题:php向kafka写数据 php rdkafka
网页网址:http://pwwzsj.com/article/dodohgh.html