发布时间:2025-12-09 11:45:28 浏览次数:12
import org.springframework.integration.annotation.ServiceActivator; //导入依赖的package包/类@Bean@ServiceActivator(inputChannel = "pubSubOutputChannel")public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {PubSubMessageHandler adapter =new PubSubMessageHandler(pubsubTemplate, "exampleTopic");adapter.setPublishCallback(new ListenableFutureCallback<String>() {@Overridepublic void onFailure(Throwable ex) {LOGGER.info("There was an error sending the message.");}@Overridepublic void onSuccess(String result) {LOGGER.info("Message was sent successfully.");}});return adapter;} import org.springframework.integration.annotation.ServiceActivator; //导入依赖的package包/类@ServiceActivator(inputChannel = "outbound-rest", outputChannel = "mark-outboud")public String restOutbound(Message<?> msg) {try {ExportString exportString = (ExportString) msg.getPayload();logger.debug("message arrived at REST outbound sender: " + exportString.getEventId());Addressable addressable = exportString.getRegistration().getAddressable();String uri = addressable.getAddress() + ":" + addressable.getPort() + addressable.getPath();HttpRequestExecutingMessageHandler handler = new HttpRequestExecutingMessageHandler(uri);handler.setHttpMethod(HttpMethod.POST);handler.setExpectReply(false);Message<String> message = MessageBuilder.withPayload(exportString.getEventString()).build();handler.handleMessage(message);logger.info("message sent to REST address: " + uri + " : " + exportString.getEventId());return exportString.getEventId();} catch (Exception e) {logger.error("Problem with sending message via REST: " + e.getMessage());return null;}} import org.springframework.integration.annotation.ServiceActivator; //导入依赖的package包/类@ServiceActivator(inputChannel = "outbound-zmq", outputChannel = "mark-outboud")public String zmqOutbound(Message<?> msg) {try {ExportString exportString = (ExportString) msg.getPayload();logger.debug("message arrived at 0MQ outbound sender: " + exportString.getEventId());// today, for ZMQ, subscribers will listen to pre-subscribed EdgeX// ZMQ outbound port.// TODO - someday, reverse the direction and allow clients to set up// inpidual ZMQ port to publish out oflogger.debug(".....sending: " + exportString.getEventString());sendor.sendEventMessage(exportString.getEventString());// logger.error("--->" + exportString.getEventId() + " [email protected]// " + System.currentTimeMillis());logger.info("message sent via 0MQ " + exportString.getEventId());return exportString.getEventId();} catch (Exception e) {logger.error("Problem with sending message via 0MQ: " + e.getMessage());}return null;} import org.springframework.integration.annotation.ServiceActivator; //导入依赖的package包/类@ServiceActivator(inputChannel = "outbound-iotcore", outputChannel = "mark-outboud")public String mqttOutbound(Message<?> msg) {try {ExportString payload = (ExportString) msg.getPayload();logger.debug("message arrived at IoT Core MQTT outbound sender: " + payload.getEventId());Addressable addressable = payload.getRegistration().getAddressable();if (addressable != null) {// TODO - cache and reuse clients per clientId// String clientId = addressable.getPublisher();if (sender == null) {sender = new IotCoreMQTTSender(addressable, privateKeyFile, algorithm, 0, 600);}boolean ok = sender.sendMessage(payload.getEventString().getBytes());if (!ok) throw new Exception("error while sending message");logger.info("message sent to IoT Core MQTT broker: " + payload.getRegistration().getAddressable() + " : " + payload.getEventId());return payload.getEventId();} else {logger.error("No MQTT address information provided with registration. Event message not sent for client.");}} catch (Exception e) {logger.error("Problem when sending message via MQTT: " + e.getMessage());}return null;} import org.springframework.integration.annotation.ServiceActivator; //导入依赖的package包/类@ServiceActivator(inputChannel = "outbound-azure", outputChannel = "mark-outboud")public String mqttOutbound(Message<?> msg) {try {ExportString exportString = (ExportString) msg.getPayload();logger.debug("message arrived at Azure MQTT outbound sender: " + exportString.getEventId());Addressable addressable = exportString.getRegistration().getAddressable();if (addressable != null) {// TODO - someday cache and reuse clientsAzureMQTTSender sender = new AzureMQTTSender(exportString.getRegistration().getAddressable(),exportString.getDeviceId());sender.sendMessage(exportString.getEventString().getBytes());logger.info("message sent to Azure MQTT broker: " + exportString.getRegistration().getAddressable()+ " : " + exportString.getEventId());return exportString.getEventId();} elselogger.error("No MQTT address information provided with registration. Event message not sent for client.");} catch (Exception e) {logger.error("Problem with sending message via MQTT: " + e.getMessage());}return null;}