[SpringBoot]使用RabbitMQ

引入依赖(在 pom.xml 中添加):

<!-- Spring Boot starter for AMQP. No <version> is given here; presumably it is managed by the Spring Boot parent/BOM (confirm). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- NOTE(review): the three dependencies below pin explicit versions and are not referenced by the Java code shown in this article; confirm they are required and mutually compatible before copying. -->
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.10.RELEASE</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.38.Final</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.projectreactor.netty/reactor-netty -->
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.8.9.RELEASE</version>
</dependency>

新增配置类 RabbitMQConfig.java:

package com.karoy.springboot.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that declares the RabbitMQ objects used by this example:
 * one queue, one topic exchange, and the binding that connects them.
 */
@Configuration
public class RabbitMQConfig {
    /** Name of the queue declared by {@link #queue()}; public so other classes can refer to it. */
    public static final String queueName = "message";

    /** Name of the topic exchange declared by {@link #exchange()}. */
    private static final String EXCHANGE_NAME = "karoy.topic";

    /** Routing pattern used when binding the queue to the exchange. */
    private static final String ROUTING_PATTERN = "karoy.#";

    /** Value passed as the second (durable) argument of the {@code Queue} constructor. */
    private static final boolean DURABLE = true;

    /**
     * Declares the queue named {@link #queueName}.
     *
     * @return the queue definition
     */
    @Bean
    Queue queue() {
        final Queue messageQueue = new Queue(queueName, DURABLE);
        return messageQueue;
    }

    /**
     * Declares the topic exchange.
     *
     * @return the exchange definition
     */
    @Bean
    TopicExchange exchange() {
        final TopicExchange topicExchange = new TopicExchange(EXCHANGE_NAME);
        return topicExchange;
    }

    /**
     * Binds the queue to the topic exchange using the routing pattern.
     *
     * @param queue    the queue to bind
     * @param exchange the topic exchange to bind it to
     * @return the binding definition
     */
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_PATTERN);
    }
}

发送方法(HelloController.java):

package com.karoy.springboot.action;

import com.karoy.springboot.common.ApiResult;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.karoy.springboot.config.RabbitMQConfig;

/**
 * REST controller exposing the {@code /hello} endpoint, which publishes the
 * received name through the AMQP template and answers with a greeting.
 *
 * <p>NOTE(review): {@code Autowired} and {@code AmqpTemplate} are used below but are
 * not among the imports shown for this file; confirm they are imported.
 */
@Api(description = "HELLO接口")
@RestController
public class HelloController {

    /** Template used to publish messages; injected by Spring. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Publishes {@code name} with {@link RabbitMQConfig#queueName} as the routing key,
     * then returns a greeting for it.
     *
     * @param name value of the {@code name} request parameter
     * @return the text {@code "Hello "} followed by {@code name}
     */
    @ApiOperation(value = "欢迎", notes = "欢迎")
    @ApiResponses({
            @ApiResponse(code = 200, message = "OK", response = ApiResult.class),
            @ApiResponse(code = 204, message = "No Content", response = ApiResult.class),
            @ApiResponse(code = 401, message = "Unauthorized", response = ApiResult.class),
            @ApiResponse(code = 403, message = "Forbidden", response = ApiResult.class),
            @ApiResponse(code = 404, message = "Not Found", response = ApiResult.class)
    })
    @RequestMapping("/hello")
    public String hello(@RequestParam("name") String name) {
        // Publish first; the greeting is only built once the send call has returned.
        amqpTemplate.convertAndSend(RabbitMQConfig.queueName, name);

        final String greeting = "Hello " + name;
        return greeting;
    }
}

接收方法(TopicConsumer.java):

package com.karoy.springboot.action;

import com.karoy.springboot.common.ApiResult;
import com.karoy.springboot.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.io.IOException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * Listener for the queue named {@link RabbitMQConfig#queueName}. Incoming
 * {@code String} payloads are printed and then acknowledged explicitly on the channel.
 *
 * <p>NOTE(review): calling {@code basicAck} by hand presumably requires the listener
 * container to run in manual acknowledgement mode (for example
 * {@code spring.rabbitmq.listener.simple.acknowledge-mode=manual}); confirm the
 * application configuration.
 */
@Component
@RabbitListener(queues = RabbitMQConfig.queueName)
public class TopicConsumer {

    /** Template used by {@link #send(String)} to publish messages; injected by Spring. */
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Publishes {@code message} with {@link RabbitMQConfig#queueName} as the routing key.
     *
     * @param message the payload to publish
     * @return the same {@code message} that was handed to the template
     */
    public String send(String message) {
        amqpTemplate.convertAndSend(RabbitMQConfig.queueName, message);
        // The method is declared to return String; hand back what was sent.
        return message;
    }

    /**
     * Handles one delivered message: prints it, then acknowledges it.
     *
     * @param message the message payload
     * @param channel the channel the message was delivered on
     * @param tag     the delivery tag taken from the {@code AmqpHeaders.DELIVERY_TAG} header
     * @throws IOException if acknowledging the delivery on the channel fails
     */
    @RabbitHandler
    public void recieved(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
            throws IOException {
        System.out.println("[message] recieved message:" + message);

        // Acknowledge this single delivery (multiple = false).
        channel.basicAck(tag, false);

        // To reject the message instead of acknowledging it, call
        // channel.basicReject(tag, false) in place of basicAck.
    }
}

结束