(1). 概述
在这一小节,主要学习Spring Cloud Stream与Kafka的整合,后面会去剖析业务模型和部份源码.
(2). 项目结构如下
生产者负责生产消息,消费者负责消费消息.
lixin-macbook:Workspace lixin$ tree spring-cloud-stream-kafka-parent
spring-cloud-stream-kafka-parent
├── pom.xml
├── spring-cloud-stream-kafka-consumer # 消费者消费者
│ ├── pom.xml
│ ├── src
│ │ ├── main
│ │ │ ├── java
│ │ │ │ └── help
│ │ │ │ └── lixin
│ │ │ │ └── example
│ │ │ │ └── stream
│ │ │ │ ├── ConsumerApplication.java
│ │ │ │ └── service
│ │ │ │ └── RecieveService.java
│ │ │ └── resources
│ │ │ └── application.properties
│ │ └── test
│ │ └── java
│ └── target
├── spring-cloud-stream-kafka-provider # 消息生产者
│ ├── pom.xml
│ ├── src
│ │ ├── main
│ │ │ ├── java
│ │ │ │ └── help
│ │ │ │ └── lixin
│ │ │ │ └── example
│ │ │ │ └── stream
│ │ │ │ ├── ProviderApplication.java
│ │ │ │ ├── controller
│ │ │ │ │ └── ProducerController.java
│ │ │ │ └── service
│ │ │ │ └── SendService.java
│ │ │ └── resources
│ │ │ └── application.properties
│ │ └── test
│ │ └── java
│ └── target
└── src
├── main
│ ├── java
│ └── resources
└── test
└── java
(3). 引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- stream -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
(4). SendService
package help.lixin.example.stream.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding(Source.class)
public class SendService {
@Autowired
private Source source;
public void sendMsg(String msg) {
source.output().send(MessageBuilder.withPayload(msg).build());
}
}
(5). ProducerController
package help.lixin.example.stream.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import help.lixin.example.stream.service.SendService;
@RestController
public class ProducerController {
@Autowired
private SendService sendService;
@RequestMapping("/send/{msg}")
public void send(@PathVariable("msg") String msg) {
sendService.sendMsg(msg);
}
}
(6). application.properties
server.port=6060
spring.application.name=stream-provider
# kafka定义
spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
spring.cloud.stream.kafka.binder.zk-nodes=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
spring.cloud.stream.kafka.binder.auto-create-topics=true
spring.cloud.stream.kafka.binder.required-acks=-1
# 定义channel(output)与topic的关系.
spring.cloud.stream.bindings.output.destination=hello-world
spring.cloud.stream.bindings.output.content-type=text/plain
(7). ProviderApplication
package help.lixin.example.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProviderApplication {
public static void main(String[] args) {
SpringApplication.run(ProviderApplication.class, args);
}
}
(7). RecieveService
package help.lixin.example.stream.service;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
public class RecieveService {
@StreamListener(Sink.INPUT)
public void recieve(String payload) {
System.out.println(payload);
}
}
(8). application.properties
server.port=6061
spring.application.name=stream-consumer
spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
spring.cloud.stream.kafka.binder.auto-create-topics=true
spring.cloud.stream.bindings.input.destination=hello-world
#spring.cloud.stream.kafka.bindings.input.group=${spring.application.name}
(9). ConsumerApplication
package help.lixin.example.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
(10). 查看kafka所有topic
lixin-macbook:bin lixin$ ./kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
__consumer_offsets
hello-world
(11). 测试运行
lixin-macbook:~ lixin$ curl http://localhost:6060/send/hello
lixin-macbook:~ lixin$ curl http://localhost:6060/send/world
(12). 总结
通过Spring Cloud Stream之后,我们可以做到在代码级别无缝的切换MQ厂商.