springcloudsteam整合kafka进行消息发送与接收-创新互联

spring cloud steam :
Binder和Binding
Binder是SpringCloud Stream的一个抽象概念,是应用与消息中间件之间的粘合剂,目前SpringCloud Stream实现了Kafka和RabbitMQ的binder
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。
整合配置yml文件:

成都创新互联专注于凉州企业网站建设,自适应网站建设,购物商城网站建设。凉州网站建设公司,为凉州等地区提供建站服务。全流程按需求定制制作,专业设计,全程项目跟踪,成都创新互联专业和态度为您提供的服务
spring:
  cloud:
    #    function:
    #      definition: testChannel
    stream:
      default-binder: kafka #默认的binder是kafka(粘合剂粘合的类型为kafka)

      #可以动态绑定的目标列表(如:动态路由),如果设置,则只能绑定列出的目的地
      #     dynamic-destinations:
      #绑定信息
      bindings:
        #消息报错后,数据保存的topic
        error:
          destination: myError
        testChannel-in-0:
          #消费者
          consumer:
            #            bindingName: myInConsumer
            #消费者并发 默认为1
            concurrency: 1
            #是否分区接收数据 默认false
            partitioned: false
            #头信息模式,设置为raw时,禁用输入头文件解析。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。入站数据来自外部Spring Cloud Stream应用程序时很有用。
            header-mode: headers
            #重试次数
            max-attempts: 3
            #初始回退间隔时间
            back-off-initial-interval: 1000
            #大回退间隔时间
            back-off-max-interval: 10000
            #回退倍数
            back-off-multiplier: 2.0
            #大于0时,表示允许自定义该消费者的实例索引,-1时使用spring.cloud.stream.instance-index
            instance-index: -1
            #大于0时表示自定义消费者实例技术,-1时默认使用spring.cloud.stream.instanceCount
            instance-count: -1
            content-type: application/json

          #生产者
          producer:
            #一个确定如何分配出站数据的SpEL表达式
            partition-key-expression: headers.cs
            #一个PartitionKeyExtractorStrategy实现。如果设置,或者如果设置了partitionKeyExpression,则该通道上的出站数据将被分区,并且partitionCount必须设置为大于1的值才能生效。这两个选项是相互排斥的。
            #partition-key-extractor-class:
            #一个PartitionSelectorStrategy实现。与partitionSelectorExpression相互排斥。如果没有设置,则分区将被选为hashCode(key) % partitionCount,其中key通过partitionKeyExpression或partitionKeyExtractorClass计算。
            #partition-selector-class:
            #partition-selector-expression:
            #如果启用分区,则数据的目标分区数。如果生产者被分区,则必须设置为大于1的值。在Kafka,解释为提示; 而是使用更大的和目标主题的分区计数。
            partition-count: 3
            #消息发送失败的处理逻辑默认是关闭的   test.errors
            error-channel-enabled: true
          destination: test #目标主题 相当于kafka的topic
          binder: kafka  #粘合器
          content-type: application/json
          #          content-type: text/html
          group: group2
        testChannel-out-0:
          #消费者
          consumer:
            #            bindingName: myOutConsumer
            #消费者并发 默认为1
            concurrency: 1
            #是否分区接收数据 默认false
            partitioned: false
            #头信息模式,设置为raw时,禁用输入头文件解析。仅适用于不支持消息头的消息中间件,并且需要头部嵌入。入站数据来自外部Spring Cloud Stream应用程序时很有用。
            header-mode: headers
            #重试次数
            max-attempts: 3
            #初始回退间隔时间
            back-off-initial-interval: 1000
            #大回退间隔时间
            back-off-max-interval: 10000
            #回退倍数
            back-off-multiplier: 2.0
            #大于0时,表示允许自定义该消费者的实例索引,-1时使用spring.cloud.stream.instance-index
            instance-index: -1
            #大于0时表示自定义消费者实例技术,-1时默认使用spring.cloud.stream.instanceCount
            instance-count: -1

          #          #生产者
          #          producer:
          #            #一个确定如何分配出站数据的SpEL表达式
          #            partition-key-expression: headers.cs
          #            #一个PartitionKeyExtractorStrategy实现。如果设置,或者如果设置了partitionKeyExpression,则该通道上的出站数据将被分区,并且partitionCount必须设置为大于1的值才能生效。这两个选项是相互排斥的。
          #            #partition-key-extractor-class:
          #            #一个PartitionSelectorStrategy实现。与partitionSelectorExpression相互排斥。如果没有设置,则分区将被选为hashCode(key) % partitionCount,其中key通过partitionKeyExpression或partitionKeyExtractorClass计算。
          #            #partition-selector-class:
          #            #partition-selector-expression:
          #            #如果启用分区,则数据的目标分区数。如果生产者被分区,则必须设置为大于1的值。在Kafka,解释为提示; 而是使用更大的和目标主题的分区计数。
          #            partition-count: 3
          #            #消息发送失败的处理逻辑默认是关闭的   test.errors
          #            error-channel-enabled: true
          destination: test #本例子创建了另外一个topic (test1)用于区分不同的功能区分。
          binder: kafka
          content-type: application/json
          group: group1

      #          producer:
      #            error-channel-enabled: true
      ##            partitionSelectorName: customPartitionSelector
      ##            partitionKeyExtractorName: customPartitionKeyExtractor
      #            partitionCount: 3
      #            partitionKeyExpression: headers.cs
      ##            partition-key-extractor-name: customPartitionKeyExtractor
      ##            partition-selector-name: customPartitionSelector
      binders:
        kafka:
          binder:
            #kafka brokers,默认localhost
            brokers: localhost
            #kafka 端口号,默认9092
            default-broker-port: 9092
            #kafka zk节点,默认localhost
            zk-nodes: localhost
            #zookeeper 端口
            default-zk-port: 2181
            #配置,map
            #configuration:
            #自定义标题列表
            headers:  cs
            #偏移量保存时间(ms)窗口,0:忽略,默认10000(ms)
            offset-update-time-window: 10000
            #偏移量保存次数,与时间窗口互斥
            offset-update-count: 0
            #broker 需要的ack数量
            required-acks: 1
            #只有设置autoCreateTopics或autoAddPartitions才有效
            min-partition-count: 1
            #自动创建topic时 生成的副本数量
            replication-factor: 1
            #自动创建主题
            auto-create-topics: true
            #如果设置为true,则绑定器将根据需要创建新的分区。如果设置为false,则绑定器将依赖于已配置的主题的分区大小。如果目标主题的分区计数小于预期值,则绑定器将无法启动。
            auto-add-partitions: true
            #socket 缓冲区大小
            socket-buffer-size: 2097152

            bootstrap-servers: 127.0.0.1:9092 #kafka服务地址,集群部署的时候需要配置多个

            #配置,map
            configuration:
              acks: -1
              key:
                serializer: org.apache.kafka.common.serialization.StringSerializer
              #            value:
              #              serializer: org.apache.kafka.common.serialization.StringSerializer
              max:
                poll:
                  records: 200
              retries: 3
              session:
                timeout:
                  ms: 40000   # 每次消费的处理时间
          #绑定
          bindings:
            testChannel-out-put:
              #消费者
              consumer:
                #主题分区消费者组成员之间自动平衡
                auto-rebalance-enabled: true
                #自动提交偏移量
                auto-commit-offset: true
#                auto-commit-on-error:
                #连接恢复尝试之间的间隔,以毫秒为单位。
                recovery-interval: 5000
                #是否将消费者偏移量重置为start-offset提供的值
                reset-offsets: false
                #新组的起始偏移量,或resetOffsets为true时的起始偏移量。允许的值:earliest,latest,默认值:null(相当于earliest)
                start-offset: earliest
                enable-dlq: false
                #              configuration:
                #接收错误消息的DLQ主题的名称。默认值:null(如果未指定,将导致错误的消息将转发到名为error::的主题)。:
              #              dlq-name:
              #生产者
              producer:
                #              configuration:
                buffer-size: 16348
                #生产者是否是同步的
                sync: true
                #生产者在发送之前等待多长时间,以便允许更多消息在同一批次中累积。(通常,生产者根本不等待,并且简单地发送在先前发送进行中累积的所有消息。)非零值可能会以延迟为代价增加吞吐量。
                batch-timeout: 0
                #                key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
                #                value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
                client-id: producer1
              health-timeout: 60

消息生产者:

@Component
public class MessageProducer {private final StreamBridge streamBridge;

    @Autowired(required = false)
    private BinderAwareChannelResolver resolver;


    public MessageProducer(StreamBridge streamBridge) {this.streamBridge = streamBridge;
    }

    public String resolverSendMessage(String messages) {SendMessageDto sendMessageDto = new SendMessageDto();
        sendMessageDto.setIp(UUID.randomUUID().toString());
        sendMessageDto.setMessage(messages);
        sendMessageDto.setTiem(new Date().toString());
        MessageBuilderstringMessageBuilder = MessageBuilder.withPayload(sendMessageDto);
        stringMessageBuilder.setHeader("cs","1");
//        stringMessageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY,"1233322");
//        Messagebuild = ;
        GenericMessage stringMessage = (GenericMessage) stringMessageBuilder.build();
        resolver.resolveDestination("testChannel-in-0").send(stringMessage);
        return "yes!";

    }

    public String send(String messages) {SendMessageDto sendMessageDto = new SendMessageDto();
        sendMessageDto.setIp(UUID.randomUUID().toString());
        sendMessageDto.setMessage(messages);
        sendMessageDto.setTiem(new Date().toString());
//        String s = JSON.toJSONString(sendMessageDto);
        MessageBuilderstringMessageBuilder = MessageBuilder.withPayload(sendMessageDto);
        stringMessageBuilder.setHeader("cs","1");
//        stringMessageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY,"1233322");
//        Messagebuild = ;
        GenericMessage stringMessage = (GenericMessage) stringMessageBuilder.build();
        streamBridge.send("testChannel-in-0", stringMessage);
        return "发送消息: " + messages;
    }
}

消息消费者

//注意这里采用的是函数式编程,向spring 容器中注入名为testChannel 的bean,  应为高版本的spring cloud steam 弃用了
//@StreamListener ,@Input等注解,而是提倡函数式接口
//testChannel 与生产者写入消息的通道名“testChannel-in-0”  有所差异,-in-0是spring cloud steam存在的默认规则
 @Bean(name = "testChannel")
    ConsumertestChannel( ) {return str ->{System.out.println("消费者处理消息:" +str );
        };
    }

如果需要指定消息的分区,需要在配置文件中自定义分区的计算逻辑属性为:

partition-key-expression: headers.cs

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


网站栏目:springcloudsteam整合kafka进行消息发送与接收-创新互联
文章路径:http://pwwzsj.com/article/jscoi.html