发布时间:2025-12-10 11:36:30 浏览次数:5
wj振藩
分类专栏: 消息中间件
分布式消息中间件实践
在消息队列的完整使用场景中至少包含三个角色:
先看消息处理中心的代码:
处理中心类 Broker.java
把处理中心暴露给外部访问的服务类 BrokerServer.java
import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.io.PrintWriter;import java.net.Socket;public class BrokerServer implements Runnable {public static int SERVICE_PORT = 9999;private final Socket socket;public BrokerServer(Socket socket){this.socket = socket;}@Overridepublic void run() {try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())){while(true){String str = in.readLine();if(str==null){continue;}System.out.println("接收到数据:"+str);if(str.equals("CONSUME")){String msg = Broker.consume();out.println(msg);out.flush();}else{Broker.produce(str);}}}catch (IOException e) {e.printStackTrace();}}}Main.java 启动服务
import java.io.IOException;import java.net.ServerSocket;public class Main {public static void main(String[] args) throws IOException {ServerSocket socket = new ServerSocket(BrokerServer.SERVICE_PORT);while(true){BrokerServer server = new BrokerServer(socket.accept());new Thread(server).start();}}}
然后看与之通信进行发送和就收消息的客户端:
MqClient.java 与消息处理中心之间通信的客户端类
Main.java 启动客户端,进行发送和消费操作
import java.io.IOException;import java.util.Scanner;public class Main {public static void main(String[] args) throws IOException {System.out.println("请输入1或2选择写入还是取出:");System.out.println("1.写入消息 2.消费消息");int in;int i = 1;while(( in = new Scanner(System.in).nextInt())!=-1){if(in==1){//写入消息MqClient mqClient = new MqClient();mqClient.produce("Hello world-"+i);i++;}else if(in==2){MqClient mqClient = new MqClient();String msg = mqClient.consume();System.out.println("获取的消息是:"+msg);}else{System.out.println("请输入正确的选项");}System.out.println("请输入1或2选择写入还是取出:");System.out.println("1.写入消息 2.消费消息");}}}