485协议自动做面条,通过海为的CBOX
package com.ruoyi.device.mqtt.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;
@Configuration
@Service
public class MqttConfig {
@Value("${mqtt.broker-url}")
private String brokerUrl;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.connection-timeout:10}")
private int connectionTimeout; // 默认连接超时为10秒
@Value("${mqtt.keep-alive-interval:30}")
private int keepAliveInterval; // 默认保持连接时间间隔为20秒
@Value("${mqtt.clean-session:false}")
private boolean cleanSession; // 默认启用清除会话
@Value("${mqtt.automatic-reconnect:true}")
private boolean automaticReconnect; // 默认启用自动重连
@Bean
public MqttClient mqttClient() throws MqttException {
// 创建 MQTT 客户端实例
//随机客户端id
String clientId = this.clientId;
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("win")) {
System.out.println("当前操作系统:Windows");
clientId= clientId + "win-mqtt-niuma";
} else if (os.contains("nix") || os.contains("nux")) {
clientId= clientId + "nix-mqtt-niuma";
} else {
clientId= clientId + "default-mqtt-niuma";
}
MqttClient mqttClient = new MqttClient(brokerUrl, clientId);
// 创建连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(username);
// 设置密码(如果有的话)
if (password != null && !password.isEmpty()) {
connOpts.setPassword(password.toCharArray());
}
// 设置清除会话
connOpts.setCleanSession(cleanSession);
// 设置自动重连
connOpts.setAutomaticReconnect(automaticReconnect);
// 设置连接超时
connOpts.setConnectionTimeout(connectionTimeout);
// 设置心跳间隔
connOpts.setKeepAliveInterval(keepAliveInterval);
try {
// 连接到 MQTT Broker
mqttClient.connect(connOpts);
System.out.println("Connected to broker: " + brokerUrl);
} catch (MqttException e) {
// 异常处理,确保连接异常时打印相关信息
System.err.println("Failed to connect to broker: " + brokerUrl);
throw e; // 将异常抛出,方便外部处理
}
return mqttClient;
}
}
package com.ruoyi.device.mqtt.service;
import cn.hutool.core.util.RandomUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.constant.UserRoleConstants;
import com.ruoyi.common.core.domain.entity.SysUser;
import com.ruoyi.common.exception.ServiceException;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.device.dto.MqttPublishVo;
import com.ruoyi.device.mqtt.config.RedisConstant;
import com.ruoyi.device.util.RedisUtil;
import com.ruoyi.device.websoket.CallNumberScreeWebSocketHandler;
import com.ruoyi.device.websoket.DeviceParamWebSocketHandler;
import com.ruoyi.device.websoket.OrderStatusWebSocketHandler;
import com.ruoyi.small_device_api.deviceException.bo.SmDeviceExceptionBo;
import com.ruoyi.small_device_api.deviceException.service.ISmDeviceExceptionService;
import com.ruoyi.small_device_api.equipment.domain.SmEquipment;
import com.ruoyi.small_device_api.equipment.mapper.SmEquipmentMapper;
import com.ruoyi.small_device_api.equipment.service.ISmEquipmentService;
import com.ruoyi.small_device_api.noodleLog.bo.SmNoodleLogBo;
import com.ruoyi.small_device_api.noodleLog.service.impl.SmNoodleLogServiceImpl;
import com.ruoyi.small_device_api.noodleStatus.bo.SmNoodleStatusBo;
import com.ruoyi.small_device_api.noodleStatus.domain.SmNoodleStatus;
import com.ruoyi.small_device_api.noodleStatus.service.ISmNoodleStatusService;
import com.ruoyi.small_device_api.noodleStatus.service.impl.SmNoodleStatusServiceImpl;
import com.ruoyi.small_device_api.orderDetails.bo.SmOrderDetailsBo;
import com.ruoyi.small_device_api.orderDetails.domain.SmOrderDetails;
import com.ruoyi.small_device_api.orderDetails.service.ISmOrderDetailsService;
import com.ruoyi.small_device_api.orders.domain.SmCallNumber;
import com.ruoyi.small_device_api.orders.domain.SmOrders;
import com.ruoyi.small_device_api.orders.mapper.SmOrdersMapper;
import com.ruoyi.small_device_api.orders.service.ISmCallNumberService;
import com.ruoyi.small_device_api.orders.service.ISmOrdersService;
import com.ruoyi.small_device_api.products.domain.SmProducts;
import com.ruoyi.small_device_api.products.service.ISmProductsService;
import com.ruoyi.small_device_api.specItem.domain.SmSpecItem;
import com.ruoyi.small_device_api.specItem.service.ISmSpecItemService;
import com.ruoyi.small_device_api.tableConfig.domain.SmTableConfig;
import com.ruoyi.small_device_api.tableConfig.service.ISmTableConfigService;
import com.ruoyi.small_device_api.util.DeviceStatus;
import com.ruoyi.small_device_api.util.OrderStatus;
import com.ruoyi.system.service.ISysUserService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import static com.ruoyi.device.mqtt.config.RedisConstant.*;
@Service
@Slf4j
public class MqttService {
@Value("${mqtt.device-test:true}")
private Boolean deviceTest;
@Autowired
private ObjectMapper objectMapper; // For converting objects to JSON
//发送指令
private static final String control = "yc/%s/control";
//做面装套
private static final String makeNoodle = "yc/1023D00J0041/makeNoodle";
//报警信息
private static final String alarm = "yc/1023D00J0041/alarm";
//可以做面
private static final String yesOrder = "yc/1023D00J0041/yesOrder";
//设置参数
private static final String setParam = "yc/1023D00J0041/canshu";
//可以做下一份
// private static final String pubNext = "niuma/miantiao/%s/pub1";
@Autowired
private MqttClient mqttClient;
private static final int MAX_RETRY_COUNT = 3;
@Autowired
private RedisUtil redisUtil;
@Autowired
private SmNoodleStatusServiceImpl smNoodleStatusServiceImpl;
@Autowired
private ISmNoodleStatusService smNoodleStatusService;
@Autowired
private SmNoodleLogServiceImpl smNoodleLogServiceImpl;
@Autowired
private SmEquipmentMapper smEquipmentMapper;
// 添加订阅状态标记和重连调度器
private volatile boolean isSubscribed = false;
private ScheduledExecutorService reconnectExecutor;
//初始化
@PostConstruct
@Order(-9999)
public void init() throws Exception {
try {
// 检查MQTT客户端配置
logMqttClientConfig();
// 尝试连接
connectAndRetry();
} catch (Exception e) {
log.error("MQTT初始化失败,将在后台尝试重连: {}", e.getMessage(), e);
// 记录错误但不中断启动流程
scheduleReconnect();
}
}
/**
* 记录MQTT客户端配置
*/
private void logMqttClientConfig() {
try {
if (mqttClient != null) {
log.info("=== MQTT客户端配置信息 ===");
log.info("客户端ID: {}", mqttClient.getClientId());
log.info("服务器URI: {}", mqttClient.getServerURI());
log.info("连接状态: {}", mqttClient.isConnected() ? "已连接" : "未连接");
log.info("==========================");
} else {
log.error("MQTT客户端未注入,请检查Spring配置");
}
} catch (Exception e) {
log.error("获取MQTT客户端配置失败: {}", e.getMessage(), e);
}
}
/**
* 连接MQTT并重试
*/
private void connectAndRetry() throws ServiceException {
int attempt = 0;
while (attempt < MAX_RETRY_COUNT) {
try {
if (!mqttClient.isConnected()) {
log.info("正在连接MQTT代理服务器...");
// 确保MqttClient配置正确
if (mqttClient.getClientId() == null || mqttClient.getClientId().isEmpty()) {
log.error("MQTT客户端ID为空,无法连接");
throw new ServiceException("MQTT客户端ID为空");
}
// 确保连接选项设置正确
org.eclipse.paho.client.mqttv3.MqttConnectOptions options = new org.eclipse.paho.client.mqttv3.MqttConnectOptions();
options.setCleanSession(true);
options.setKeepAliveInterval(60);
options.setConnectionTimeout(30);
options.setAutomaticReconnect(true);
log.info("MQTT连接选项: 清除会话:{}, 保活间隔:{}秒, 连接超时:{}秒, 自动重连:{}",
options.isCleanSession(), options.getKeepAliveInterval(),
options.getConnectionTimeout(), options.isAutomaticReconnect());
// 连接
mqttClient.connect(options);
log.info("成功连接到MQTT代理服务器");
}
// 无论之前是否订阅,都强制执行订阅流程
log.info("开始执行MQTT主题订阅");
initSubscriptions();
log.info("完成MQTT主题订阅流程");
// 确认一下订阅状态
verifySubscriptions();
return; // 如果连接成功,直接返回
} catch (MqttException e) {
attempt++;
log.error("MQTT连接失败,尝试 {}/{}:{}, 错误码: {}",
attempt, MAX_RETRY_COUNT, e.getMessage(), e.getReasonCode());
if (attempt == MAX_RETRY_COUNT) {
throw new ServiceException("MQTT连接失败,已达最大重试次数,将在后台继续尝试");
}
// 如果需要,可以在这里增加延时
try {
Thread.sleep(2000); // 延时2秒重试
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
}
}
}
}
/**
* 验证订阅状态
*/
private void verifySubscriptions() {
try {
// 我们无法直接从Paho客户端获取已订阅的主题列表
// 但我们可以确认当前连接状态
if (mqttClient.isConnected()) {
log.info("MQTT连接正常,订阅应该已生效");
// 添加测试消息发布
try {
String testTopic = "test/mqtt/verification";
log.info("发布测试消息到主题: {}", testTopic);
MqttMessage testMessage = new MqttMessage("MQTT订阅验证测试".getBytes());
testMessage.setQos(1);
mqttClient.publish(testTopic, testMessage);
log.info("测试消息发布成功");
} catch (Exception e) {
log.error("发布测试消息失败: {}", e.getMessage(), e);
}
} else {
log.warn("MQTT未连接,订阅可能未生效");
}
} catch (Exception e) {
log.error("验证订阅状态出错: {}", e.getMessage(), e);
}
}
/**
* 安排定期重连任务
*/
private synchronized void scheduleReconnect() {
// 如果已存在重连任务,不再创建新的
if (reconnectExecutor != null && !reconnectExecutor.isShutdown()) {
log.info("重连任务已存在,不再创建新任务");
return;
}
log.info("创建MQTT重连定时任务");
reconnectExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = new Thread(r, "mqtt-reconnect-thread");
thread.setDaemon(true); // 设为守护线程,不阻止JVM退出
return thread;
});
reconnectExecutor.scheduleAtFixedRate(() -> {
try {
if (!mqttClient.isConnected()) {
log.info("尝试重新连接MQTT...");
// 重连前检查配置
logMqttClientConfig();
// 配置连接选项
org.eclipse.paho.client.mqttv3.MqttConnectOptions options = new org.eclipse.paho.client.mqttv3.MqttConnectOptions();
options.setCleanSession(true);
options.setKeepAliveInterval(60);
options.setConnectionTimeout(30);
options.setAutomaticReconnect(true);
mqttClient.connect(options);
log.info("MQTT重连成功");
// 重连后强制执行订阅
log.info("重连后开始执行MQTT主题订阅");
initSubscriptions();
log.info("重连后完成MQTT主题订阅");
// 验证订阅状态
verifySubscriptions();
} else if (!isSubscribed) {
// 如果已连接但未订阅,执行订阅
log.info("MQTT已连接但未订阅,开始执行订阅");
initSubscriptions();
log.info("补充订阅完成");
// 验证订阅状态
verifySubscriptions();
} else {
// 检查连接和订阅状态
log.info("MQTT连接和订阅状态检查 - 连接: {}, 订阅: {}",
mqttClient.isConnected() ? "已连接" : "未连接",
isSubscribed ? "已订阅" : "未订阅");
}
} catch (MqttException e) {
log.error("MQTT重连失败: {},错误码: {}", e.getMessage(), e.getReasonCode());
} catch (Exception e) {
log.error("MQTT重连或订阅过程发生错误: {}", e.getMessage(), e);
}
}, 10, 30, TimeUnit.SECONDS); // 10秒后开始,每30秒尝试一次
log.info("MQTT重连定时任务已创建");
}
/**
* 初始化所有订阅
*/
private synchronized void initSubscriptions() {
try {
log.info("开始执行MQTT主题订阅流程,当前订阅状态: {}", isSubscribed ? "已订阅" : "未订阅");
// 检查MQTT连接状态
if (!mqttClient.isConnected()) {
log.error("MQTT未连接,无法执行订阅");
throw new RuntimeException("MQTT未连接,无法执行订阅");
}
// 检查订阅的主题模式
log.info("订阅主题模式:makeNoodle: {}, yesOrder: {}, alarm: {}, setParam: {}",
makeNoodle, yesOrder, alarm, setParam);
// 执行所有订阅操作,即使之前已订阅
log.info("开始订阅设备状态主题: {}", makeNoodle);
subDeviceStatus();
log.info("完成订阅设备状态主题");
log.info("开始订阅订单主题: {}", yesOrder);
sendOrder();
log.info("完成订阅订单主题");
log.info("开始订阅报警主题: {}", alarm);
subAlarm();
log.info("完成订阅报警主题");
log.info("开始订阅参数主题: {}", setParam);
subParams();
log.info("完成订阅参数主题");
isSubscribed = true;
log.info("所有MQTT主题订阅完成,订阅标志设置为: {}", isSubscribed);
// 检查已订阅的主题
try {
if (mqttClient.isConnected()) {
log.info("MQTT客户端当前已连接,客户端ID: {}", mqttClient.getClientId());
} else {
log.warn("MQTT客户端当前未连接!");
}
} catch (Exception e) {
log.error("检查MQTT客户端状态时出错: {}", e.getMessage(), e);
}
} catch (Exception e) {
log.error("初始化MQTT订阅失败: {}", e.getMessage(), e);
isSubscribed = false; // 重置标志,允许下次重试
log.info("由于错误,订阅标志重置为: {}", isSubscribed);
throw new RuntimeException("MQTT订阅失败", e);
}
}
// 添加资源清理方法
@PreDestroy
public void cleanup() {
log.info("正在清理MQTT资源...");
if (reconnectExecutor != null && !reconnectExecutor.isShutdown()) {
reconnectExecutor.shutdown();
try {
if (!reconnectExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
reconnectExecutor.shutdownNow();
}
} catch (InterruptedException e) {
reconnectExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
}
try {
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.disconnect();
log.info("MQTT客户端已断开连接");
}
} catch (MqttException e) {
log.error("断开MQTT连接时发生错误: {}", e.getMessage());
}
}
// 发布消息
public void publishMessage(String topic, Object payload) {
String message = null;
try {
message = objectMapper.writeValueAsString(payload);
} catch (JsonProcessingException e) {
throw new RuntimeException("消息序列化失败: " + e.getMessage(), e);
}
MqttMessage mqttMessage = new MqttMessage(message.getBytes());
mqttMessage.setQos(1); // 至少一次
mqttMessage.setRetained(false);
final String finalMessage = message;
try {
// 确保连接
if (!mqttClient.isConnected()) {
log.info("MQTT未连接,正在重新连接...");
try {
mqttClient.connect();
log.info("重新连接MQTT成功");
// 确保订阅存在
if (!isSubscribed) {
initSubscriptions();
}
} catch (MqttException e) {
log.error("重连MQTT失败: {}", e.getMessage());
scheduleReconnect();
throw new RuntimeException("MQTT连接失败,消息发送失败", e);
}
}
mqttClient.publish(topic, mqttMessage);
log.info("发布指令{} ,设备:{} ", finalMessage, topic);
// 记录到数据库
try {
// 从主题中提取设备ID
String deviceId = topic.split("/")[1];
// 创建日志记录
SmNoodleLogBo smNoodleLogBo = new SmNoodleLogBo();
smNoodleLogBo.setMachineId(deviceId);
smNoodleLogBo.setOperationType("MQTT指令发送");
smNoodleLogBo.setPackageInfo(finalMessage);
smNoodleLogBo.setRemark("主题: " + topic);
// 保存到数据库
smNoodleLogServiceImpl.add(smNoodleLogBo);
log.info("MQTT指令记录已保存到数据库 - 设备: {}, 主题: {}", deviceId, topic);
} catch (Exception e) {
log.error("保存MQTT指令到数据库失败: {}", e.getMessage(), e);
// 不抛出异常,避免中断消息发送流程
}
} catch (MqttException e) {
log.error("发布消息失败: {}", e.getMessage());
scheduleReconnect();
throw new RuntimeException("发布消息失败: " + e.getMessage(), e);
}
}
public void subscribe(String topic) throws Exception {
try {
mqttClient.subscribe(topic, (t, m) -> {
String receivedMessage = new String(m.getPayload());
// 处理接收到的消息
});
} catch (MqttException e) {
log.error("订阅主题 {} 失败: {}", topic, e.getMessage());
isSubscribed = false; // 重置订阅状态
throw e; // 保持原有异常抛出行为
}
}
//锁
private static Map<String, CountDownLatch> latchMap = new ConcurrentHashMap<>();
private static Map<String, CountDownLatch> orderLatchMap = new ConcurrentHashMap<>();
//是否下发桌号
private static Map<String, Boolean> pubTableId = new ConcurrentHashMap<>();
//是否可以做下份面
private static Map<String, Boolean> startNext = new ConcurrentHashMap<>();
@Autowired
private ISmOrdersService smOrdersService;
@Autowired
@Lazy
private ISmEquipmentService smEquipmentService;
@Autowired
private ISmOrderDetailsService smOrderDetailsService;
@Autowired
private ISmProductsService smProductsService;
@Autowired
private ISmSpecItemService smSpecItemService;
@Autowired
private ISmTableConfigService smTableConfigService;
@Autowired
private ISysUserService userService;
@Autowired
private StringRedisTemplate redisTemplate;
//可以做面的时候发送订单
public void sendOrder() {
try {
log.info("开始订阅yesOrder主题: {}", yesOrder);
mqttClient.subscribe(yesOrder, (t, m) -> {
try {
// 记录收到的每条消息
String rawMessage = new String(m.getPayload());
log.info("接收到yesOrder消息 - 主题: {}, 原始消息: {}", t, rawMessage);
// 获取设备号
String deviceId = t.split("/")[1];
log.info("解析设备ID: {}", deviceId);
// 获取状态{"alowOrder":1}
JSONObject message = null;
try {
message = JSON.parseObject(rawMessage);
log.info("消息解析结果: {}", message);
} catch (Exception e) {
log.error("解析JSON消息失败: {}", e.getMessage());
return;
}
// 检查alowOrder字段
if (message.get("alowOrder") != null) {
log.info("发现alowOrder字段, 值为: {}", message.get("alowOrder"));
// 1的时候发送订单
if ((Integer) message.get("alowOrder") == 1) {
log.info("{}可以做面,准备获取队列数据", deviceId);
// 从队列获取数据
String deviceLockKey = getDeviceLockKey(deviceId);
log.info("获取队列数据的key: {}", deviceLockKey);
String orderData = redisUtil.popFromQueue(deviceLockKey);
if (orderData != null) {
log.info("从队列获取到订单数据: {}", orderData);
JSONObject jsonObject = JSON.parseObject(orderData);
log.info("解析订单数据为JSON: {}", jsonObject);
// 发送消息
String controlTopic = String.format(control, deviceId);
log.info("准备向主题 {} 发送订单数据", controlTopic);
publishMessage(controlTopic, jsonObject);
log.info("订单数据发送完成");
} else {
log.info("队列 {} 中没有可用的订单数据", deviceLockKey);
}
} else {
log.info("{}不可做面,alowOrder值为: {}", deviceId, message.get("alowOrder"));
}
} else {
log.info("消息中未包含alowOrder字段: {}", message);
}
} catch (Exception e) {
log.error("处理yesOrder消息时出错: {}", e.getMessage(), e);
}
});
log.info("成功订阅yesOrder主题,已设置回调处理器");
} catch (MqttException e) {
log.error("订阅yesOrder主题失败: {}", e.getMessage(), e);
isSubscribed = false;
throw new RuntimeException("订阅yesOrder主题失败", e);
}
}
//报警信息
public void subAlarm() {
try {
log.info("开始订阅alarm主题: {}", alarm);
mqttClient.subscribe(alarm, (t, m) -> {
//获取设备号
String deviceId = t.split("/")[1];
//获取报警信息
String alarmMessage = new String(m.getPayload());
log.info("{}报警信息{}", deviceId, alarmMessage);
processAlarm(deviceId, JSON.parseObject(alarmMessage));
});
log.info("成功订阅alarm主题");
} catch (MqttException e) {
log.error("订阅alarm主题失败: {}", e.getMessage(), e);
isSubscribed = false;
throw new RuntimeException("订阅alarm主题失败", e);
}
}
public static String parseFoodProcessStatus(Map<String, Integer> data) {
StringBuilder sb = new StringBuilder();
sb.append("桌号: ").append(data.getOrDefault("a150", 0)).append("\n");
sb.append("序号: ").append(data.getOrDefault("a151", 0)).append("\n");
// JDK8 不能使用 switch 表达式,改用 Map
Map<Integer, String> bowlSizeMap = new HashMap<>();
bowlSizeMap.put(1, "小碗");
bowlSizeMap.put(2, "中碗");
bowlSizeMap.put(3, "大碗");
sb.append("碗大小: ").append(bowlSizeMap.getOrDefault(data.getOrDefault("a152", 0), "未知")).append("\n");
sb.append("面篓位置: ").append(data.getOrDefault("a153", 0)).append("\n");
sb.append("剩余时间: ").append(data.getOrDefault("a154", 0)).append("秒\n");
// 处理状态值映射,0 是 "进行中",1 是 "未进行"
Map<String, String> statusMap = new HashMap<>();
statusMap.put("a155", "加汤");
statusMap.put("a156", "加浇头");
statusMap.put("a157", "夹肉");
statusMap.put("a158", "冲凉");
statusMap.put("a159", "完成");
statusMap.forEach((key, value) ->
sb.append(value).append("状态: ").append(data.getOrDefault(key, 0) == 0 ? value + "中" : "未" + value).append("\n")
);
return sb.toString();
}
//订阅设备状态
public void subDeviceStatus() {
try {
log.info("开始订阅makeNoodle主题: {}", makeNoodle);
mqttClient.subscribe(makeNoodle, (t, m) -> {
Map<String, Integer> message = JSON.parseObject(m.getPayload(), Map.class);
try {
if (message != null) {
String deviceId = t.split("/")[1];
log.info("收到设备:{},发来的信息:{}", deviceId, message);
MatrixQuery query = new MatrixQuery(message);
// 获取全部分组标识值
Map<Character, Integer> markers = query.getAllGroupMarkers();
log.info("处理过后的全部分组标识值: {}", markers);
for (Integer value : markers.values()) {
//第二列不为0的话,进行循环
if (value != null && value > 0) {
//查询面条id
log.info("zzw这是第二列的数据: {}", value);
Object noodleId = redisUtil.get(getOrderNo(value.longValue(), deviceId));
log.info("这是sm_noodle_status表的Id: {},zzw这是第二列的数据:{}", noodleId, value);
if (noodleId != null) {
//解析数据 value=桌号
Map<String, Integer> stringIntegerMap = query.queryByValue(value);
//stringIntegerMap {a150=4 , a151=7237, a152=1, a153=4, a154=0, a150=1, a159=0, a155=0, a156=0, a157=0, a158=0}
//第一组进来的是a组 a151
log.info("订单号: {},对应的当前行数据:{}", value, stringIntegerMap);
List<Map.Entry<String, Integer>> list = new ArrayList<>(stringIntegerMap.entrySet());
log.info("当前组的List: {}", list);
Boolean eatDataFlag = redisUtil.get(String.valueOf(value)) != null;
//缓存中已经存在,说明已经取过并且吃完了,所以不需要循环
log.info("缓存中已经存在,说明已经取过并且吃完了,所以不需要循环:{}", eatDataFlag);
if (eatDataFlag) {
//这里a已经去走餐了,所以a不走了,但是不能影响b
continue;
}
for (Map.Entry<String, Integer> stringIntegerEntry : stringIntegerMap.entrySet()) {
String key = stringIntegerEntry.getKey();//a151 a152
char type = key.charAt(key.length() - 1);//末尾数字 1 2
int state = stringIntegerEntry.getValue();//状态值 0 1 等等
log.info("这是名字:{},这是第【{}+1】列,这是对应的值:{}", key, type, stringIntegerEntry.getValue());
//更新信息
SmNoodleStatus smNoodleStatus = smNoodleStatusServiceImpl.queryById(Long.valueOf(noodleId + ""));
log.info("这是smNoodleStatus:{}", smNoodleStatus);
if (smNoodleStatus != null) {
switch (type) {
case '0': // 桌号:a150
smNoodleStatus.setTablePosition(state + "");
break;
case '1': // 序号:a151
log.info("序号{}状态{}", noodleId, state);
break;
case '2': // 碗类型:a152,0未选,1小碗,2中碗,3大碗
String bowlType;
switch (state) {
case 1:
bowlType = "小碗";
break;
case 2:
bowlType = "中碗";
break;
case 3:
bowlType = "大碗";
break;
default:
bowlType = "未选择";
}
smNoodleStatus.setBowlType(bowlType);
break;
case '3': // 面篓位置:a153
smNoodleStatus.setNoodleBasketPosition(state + "");
break;
case '4': // 剩余时间/餐品状态:a154
if (state == 0) {
Integer sta = (Integer) redisUtil.get("取餐号状态码:" + value);
log.info("这是获取缓存中桌号:{},对应的的状态码:{}======【如果为null则还未到待取餐】", value, sta);
if (sta != null && sta == 1) {
// TODO 上次是1,这次是0,说明上次是正在取餐,这次是取餐完成
redisUtil.set("取餐号状态码:" + value, 0, 24, TimeUnit.HOURS);
SmOrders smOrders = smOrdersService.queryById(Long.valueOf(smNoodleStatus.getOrderId()));
smNoodleStatus.setEndTime(new Date());
smNoodleStatus.setStatus("制作完成");
if (smOrders != null) {
// TODO 把桌号放缓存---标记这一单结束了,再次循环就不在进来了
redisUtil.set(value.toString(), "1", 24, TimeUnit.HOURS);
log.info("增加前zzw这是完成的数量:{},这是面的数量:{}", smOrders.getCompletedCount(), smOrders.getNoodleCount());
smOrders.setCompletedCount(smOrders.getCompletedCount() + 1);
if (Objects.equals(smOrders.getCompletedCount(), smOrders.getNoodleCount())) {
smOrders.setDeviceStatus(DeviceStatus.COMPLETED.getCode());
//加入到叫号
redisUtil.addToQueue(RedisConstant.callNumberKey(deviceId), smOrders.getTakeNumber() + "", 24, TimeUnit.HOURS);
}
redisUtil.set(value.toString(), "1", 24, TimeUnit.HOURS);
log.info("增加后zzw这是完成的数量:{},这是面的数量:{}", smOrders.getCompletedCount(), smOrders.getNoodleCount());
smOrders.setMealStatus(0);
smOrdersService.updateById(smOrders);
}
redisUtil.increment(WAITING_COUNT_KEY + ":" + deviceId, -1);
// 餐已经到过客位,现在状态为0表示餐被取走了
smNoodleStatus.setRemainingTime("已取走");
smNoodleStatus.setStatus("已取餐");
// 获取订单信息并更新状态
if (smNoodleStatus.getOrderId() != null) {
log.info("设备编号:{}订单编号:{}详情id:{}已完成", deviceId, smNoodleStatus.getOrderId(), smNoodleStatus.getDetailId());
try {
SmOrders order = smOrdersService.queryById(Long.valueOf(smNoodleStatus.getOrderId()));
// 更新取餐号状态为已取餐 isPicked=0 callStatus=2 isPicked=1 callStatus=2
log.info("将02改为状态改为12");
smCallNumberService.markAsPicked(order.getStoreId(), order.getTakeNumber());
//根据门店找到大屏推送消息
log.info("推送消息给门店:" + order.getStoreId());
QueryWrapper<SmEquipment> smEquipmentQueryWrapper = new QueryWrapper<>();
smEquipmentQueryWrapper.lambda().eq(SmEquipment::getMerchantId, order.getStoreId()).eq(SmEquipment::getMachineType, 2)
.eq(SmEquipment::getMachineStatus, 1);
List<SmEquipment> smEquipments = smEquipmentMapper.selectListBytoreId(smEquipmentQueryWrapper);
if (!smEquipments.isEmpty()) {
log.info("推送消息给大屏ID:{}", smEquipments.get(0).getScreenId());
callNumberScreeWebSocketHandler.pushToStore(smEquipments.get(0).getScreenId(), "0000");
}
orderStatusWebSocketHandler.pushToStore(order.getStoreId(), "1111");
// 更新订单为已取餐状态
order.setMealStatus(1); // 1表示已取餐
smOrdersService.updateById(order);
} catch (Exception e) {
log.error("更新订单状态失败:{}", e.getMessage());
}
}
} else {
// 已完成的餐就不再制作中了
if (!"制作完成".equals(smNoodleStatus.getStatus())) {
smNoodleStatus.setRemainingTime("餐品制作中");
smNoodleStatus.setStatus("制作中");
}
}
} else if (state == 1) {
// TODO 1==待取餐
SmOrders order = smOrdersService.queryById(Long.valueOf(smNoodleStatus.getOrderId()));
log.info("订单{}餐品面条{}已到达客位,等待取餐", order.getOrderNumber(), smNoodleStatus.getId());
Boolean isExit = redisUtil.get("smNoodleStatusId:" + smNoodleStatus.getId()) == null;
log.info("拨号大屏已经在叫号=是否已经存在面条编号{}数据,True是不存在,False是存在:{}", smNoodleStatus.getId(), isExit);
if (isExit) {
// 餐已到位,可以取餐
smNoodleStatus.setRemainingTime("已到达客位");
smNoodleStatus.setStatus("可取餐");
// 获取订单信息并更新状态
if (smNoodleStatus.getOrderId() != null) {
try {
// 更新订单为可取餐状态
//order.setDeviceStatus(DeviceStatus.COMPLETED.getCode());//制作一个面并不是完成的
order.setMealStatus(0); // 0表示可取餐
smOrdersService.updateById(order);
//根据门店找到大屏推送消息
log.info("推送消息给门店:" + order.getStoreId());
QueryWrapper<SmEquipment> smEquipmentQueryWrapper = new QueryWrapper<>();
smEquipmentQueryWrapper.lambda().eq(SmEquipment::getMerchantId, order.getStoreId()).eq(SmEquipment::getMachineType, 2)
.eq(SmEquipment::getMachineStatus, 1);
List<SmEquipment> smEquipments = smEquipmentMapper.selectListBytoreId(smEquipmentQueryWrapper);
if (!smEquipments.isEmpty()) {
log.info("推送消息给大屏ID:{}", smEquipments.get(0).getScreenId());
callNumberScreeWebSocketHandler.pushToStore(smEquipments.get(0).getScreenId(), "0000");
}
orderStatusWebSocketHandler.pushToStore(order.getStoreId(), "1111");
log.info("这是修改叫号状态:00改为:02");
//这里 callStatus=0 pick=0 变成 pick=0 callStatus=2
smCallNumberService.markAsCalled(order.getStoreId(), order.getTakeNumber());
//判断下当前号码是否是当前门店的
redisUtil.set("smNoodleStatusId:" + smNoodleStatus.getId(), "已经存在报号队列中", 24, TimeUnit.HOURS);
redisUtil.set("取餐号状态码:" + value, 1, 24, TimeUnit.HOURS);
} catch (Exception e) {
log.error("更新订单状态失败:{}", e.getMessage());
}
}
}
} else {
// 其他数值状态(可能是倒计时)
smNoodleStatus.setRemainingTime(state + "秒");
}
break;
case '5': // 加汤中:a155
smNoodleStatus.setAddSoup(state == 0 ? "加汤中" : "未加汤");
break;
case '6': // 加浇头中:a156
smNoodleStatus.setAddTopping(state == 0 ? "加浇头中" : "未加浇头");
break;
case '7': // 夹肉:a157
smNoodleStatus.setMeatInsertion(state == 0 ? "夹肉中" : "未夹肉");
break;
case '8': // 冲凉:a158
smNoodleStatus.setWarming(state == 0 ? "冲凉中" : "未冲凉");
break;
case '9': // 完成:a159
smNoodleStatus.setIsCompleted(state);
smNoodleStatus.setCompletionStatus(state == 1 ? "完成" : "未完成");
break;
default:
System.out.println("字段 " + key + " 状态未定义");
break;
}
}
log.info("这是for 循环每次更新的面条状态:");
smNoodleStatusServiceImpl.updateById(smNoodleStatus);
}
SmNoodleLogBo smNoodleLogBo = new SmNoodleLogBo();
smNoodleLogBo.setMachineId(deviceId);
smNoodleLogBo.setOperationType("做面");
smNoodleLogBo.setNoodleId(noodleId + "");
smNoodleLogBo.setPackageInfo(JSONObject.toJSONString(message));
smNoodleLogServiceImpl.add(smNoodleLogBo);
}
}
}
}
} catch (Exception e) {
log.error("处理设备状态消息时出错: {}", e.getMessage(), e);
}
});
log.info("成功订阅makeNoodle主题");
} catch (MqttException e) {
log.error("订阅makeNoodle主题失败: {}", e.getMessage(), e);
isSubscribed = false;
throw new RuntimeException("订阅makeNoodle主题失败", e);
}
}
// 获取某个订单或设备当前做面的数量
public int getNoodleCount(String deviceId) {
String key = NOODLE_COUNT_KEY + ":" + deviceId;
Object countStr = redisTemplate.opsForHash().get(NOODLE_COUNT_KEY, key);
return countStr == null ? 0 : Integer.parseInt(countStr + "");
}
// 增加面数量
public void incrementNoodleCount(String deviceId) {
String key = NOODLE_COUNT_KEY + ":" + deviceId;
redisTemplate.opsForHash().increment(NOODLE_COUNT_KEY, key, 1);
}
// 减少面数量
public void decrementNoodleCount(String deviceId) {
String key = NOODLE_COUNT_KEY + ":" + deviceId;
redisTemplate.opsForHash().increment(NOODLE_COUNT_KEY, key, -1); // 使用负值来减少数量
}
// 获取所有机器的面数量
public Map<Object, Object> getAllNoodleCounts() {
return redisTemplate.opsForHash().entries(NOODLE_COUNT_KEY);
}
private static final long DELAY = 0; // 初始延迟
private static final long PERIOD = 3; // 每 10 秒检查一次
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final ExecutorService orderExecutor = new ThreadPoolExecutor(10, // 核心线程数
20, // 最大线程数
60L, // 空闲线程的存活时间
TimeUnit.SECONDS, // 空闲线程的存活时间单位
new LinkedBlockingQueue<>(100), // 阻塞队列,最大任务数为 100
new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略); // 线程池,处理每个新订单
public void startQueueProcessing() {
scheduler.scheduleAtFixedRate(() -> {
}, DELAY, PERIOD, TimeUnit.SECONDS); // 定时执行
}
private final ConcurrentHashMap<String, CountDownLatch> deviceLatches = new ConcurrentHashMap<>();
public void processQueue(String deviceId) {
Long size = redisTemplate.opsForList().size(deviceId);
if (size != null && size > 0) {
log.info("队列还有{}个订单需要处理", size);
} else {
// log.info("队列为空");
return;
}
//判斷機器是否自動
// Object m0 = redisUtil.get(getM0AutoTopic(deviceId));
// if (m0 != null) {
// if (!(Boolean) m0) {
// log.info("{}机器自动模式未开启", deviceId);
// return;
// }
// }
String code = deviceId.split(":")[1];
boolean lock = redisUtil.acquireLock(getDeviceLockKey(code), 60);
if (lock) {
String orderId = redisUtil.popFromQueue(deviceId);
if (orderId == null) {
return;
}
SmOrders smOrders = smOrdersService.queryById(Long.parseLong(orderId));
orderExecutor.submit(() -> {
try {
subscribeToPaymentStatus(smOrders);
} catch (Exception e) {
redisUtil.releaseLock(getDeviceLockKey(code));
log.error("处理订单 {} 时发生异常", smOrders.getOrderNumber(), e);
}
});
} else {
log.info("机器正在做面");
}
}
@PostConstruct
@Order(Integer.MIN_VALUE)
public void initRedisQueue() {
log.info("初始化Redis队列");
}
@Autowired
private DeviceParamWebSocketHandler deviceParamWebSocketHandler;
@Autowired
private CallNumberScreeWebSocketHandler callNumberScreeWebSocketHandler;
@Autowired
private OrderStatusWebSocketHandler orderStatusWebSocketHandler;
private void subParams() {
try {
log.info("开始订阅setParam主题: {}", setParam);
mqttClient.subscribe(setParam, (t, m) -> {
//获取设备号
String deviceId = t.split("/")[1];
//参数
MachineParameter message = JSON.parseObject(m.getPayload(), MachineParameter.class);
log.info("{}设置参数{}", deviceId, message.toString());
//发送到websocket
message.processReceivedParams();
deviceParamWebSocketHandler.pushToStore(deviceId, message);
});
log.info("成功订阅setParam主题");
} catch (MqttException e) {
log.error("订阅setParam主题失败: {}", e.getMessage(), e);
isSubscribed = false;
throw new RuntimeException("订阅setParam主题失败", e);
}
}
//接收到订单 发送到上位机
public void sendOrderToMachine(SmOrders smOrders) {
//订单拆分成碗
SmOrderDetailsBo smOrderDetailsBo = new SmOrderDetailsBo();
smOrderDetailsBo.setOrderId(smOrders.getId());
List<SmOrderDetails> smOrderDetails = smOrderDetailsService.queryList(smOrderDetailsBo);
smOrderDetails.forEach(smOrderDetail -> {
//获取数量
Long quantity = smOrderDetail.getQuantity();
//规格
String specificationDetail = smOrderDetail.getSpecificationDetailId();
//桌号
Long mealTableId = smOrders.getMealTableId();
//根据数量添加到smNoodleStatus
for (int i = 0; i < quantity; i++) {
SmNoodleStatus smNoodleStatus = new SmNoodleStatus();
smNoodleStatus.setOrderId(smOrderDetail.getId() + "");
smNoodleStatus.setOrderNumber(smOrderDetail.getOrderNumber());
smNoodleStatus.setNoodlePosition(specificationDetail);
smNoodleStatus.setMachineId(smOrders.getDeviceId());
smNoodleStatus.setNoodlePosition(smOrderDetail.getSpecificationDetailId());
}
});
}
public Long incrementOrInitializeKey(String key) {
// 判断 key 是否存在
Boolean exists = redisTemplate.hasKey(key);
if (Boolean.TRUE.equals(exists)) {
// 如果 key 存在,递增 1 并返回新值
return redisTemplate.opsForValue().increment(key);
} else {
// 如果 key 不存在,设置初始值为 1
redisTemplate.opsForValue().set(key, "1000", 1, TimeUnit.DAYS);
return 1000L;
}
}
@Autowired
private ISmCallNumberService smCallNumberService;
public int subscribeToPaymentStatus(SmOrders smOrders) {
// 获取与设备ID关联的 CountDownLatch
String deviceId = smOrders.getDeviceId();
log.info("开始订阅支付状态 {}", deviceId);
if (StringUtils.isEmpty(deviceId)) {
throw new ServiceException("设备号不能为空");
}
// 添加幂等性检查,防止重复处理同一订单
String processKey = "order:processed:" + smOrders.getOrderNumber();
if (redisUtil.exists(processKey)) {
log.info("订单{}已被处理过,跳过重复处理", smOrders.getOrderNumber());
return 0;
}
// 记录订单已处理,设置10分钟过期时间
redisUtil.set(processKey, "1", 1, TimeUnit.DAYS);
//判断订单状态是已支付设备状态是未做面
if (!smOrders.getOrderStatus().equals(OrderStatus.PAID.getCode()) || !smOrders.getDeviceStatus().equals(DeviceStatus.PENDING.getCode())) {
log.info("订单状态错误");
return 0;
}
// 检查是否已设置取餐号,如果未设置则生成新的取餐号
if (StringUtils.isEmpty(smOrders.getTakeNumber())) {
String storeId = smOrders.getStoreId();
if (StringUtils.isEmpty(storeId)) {
log.error("店铺ID不能为空");
return 0;
}
try {
String takeNumber = getTakeNumber(storeId);
smOrders.setTakeNumber(takeNumber);
// 更新订单取餐号
smOrdersService.updateById(smOrders);
log.info("订单{}已更新取餐号:{}", smOrders.getOrderNumber(), takeNumber);
} catch (Exception e) {
log.error("生成取餐号失败: {}", e.getMessage(), e);
return 0;
}
}
Long id = smOrders.getId();
Long mealTableId = smOrders.getMealTableId();
if (mealTableId == null) {
log.info("桌号不能为空");
return 0;
}
SmTableConfig smTableConfig = smTableConfigService.queryById(mealTableId);
if (smTableConfig == null) {
log.info("桌号不能为空");
return 0;
}
SmOrderDetailsBo smOrderDetailsBo = new SmOrderDetailsBo();
smOrderDetailsBo.setOrderId(id);
List<SmOrderDetails> smOrderDetails = smOrderDetailsService.queryList(smOrderDetailsBo);
//数量累加
int sumQuantity = smOrderDetails.stream().filter(smOrderDetail -> smOrderDetail.getQuantity() != null).mapToInt(smOrderDetail -> smOrderDetail.getQuantity().intValue()).sum();
smOrders.setInProgressCount(sumQuantity);
smOrders.setNoodleCount(sumQuantity);
smOrders.setCompletedCount(0);
redisUtil.increment(WAITING_COUNT_KEY + ":" + deviceId, sumQuantity);
for (SmOrderDetails smOrderDetail : smOrderDetails) {
Long productId = smOrderDetail.getProductId();
if (productId == null) {
continue;
}
SmProducts smProducts = smProductsService.queryById(productId);
String operType = smProducts.getProductCommand();
JSONObject m520 = new JSONObject();
m520.put("method", "order");
Long quantity = smOrderDetail.getQuantity();
log.info("这是做面的份数:{}", quantity);
//做面完成
for (Long i = 0L; i < quantity; i++) {
JSONObject upMessage = new JSONObject();
//修改判断 不再关联规格 根据牛肉面炸酱面来进行穿餐
if ("炸酱面".equals(smOrderDetail.getProductName())) {
//{"wTopping":2}
upMessage.put("wTopping", 2);
//{"wSoup":2}
upMessage.put("wSoup", 2);
}
if ("红烧牛肉面".equals(smOrderDetail.getProductName())) {
//{"wTopping":1}
upMessage.put("wTopping", 1);
//{"wSoup":2}
upMessage.put("wSoup", 1);
}
String specificationDetail = smOrderDetail.getSpecificationDetail();
log.info("获取做面的详情,拼接发送命令大中小:{}", specificationDetail);
if (specificationDetail.contains("小份")) {
//{"wSize":1}
upMessage.put("wSize", 1);
}
if (specificationDetail.contains("中份")) {
//{"wSize":2}
upMessage.put("wSize", 2);
}
if (specificationDetail.contains("大份")) {
//{"wSize":3}
upMessage.put("wSize", 3);
}
try {
SmNoodleStatusBo smNoodleStatusBo = new SmNoodleStatusBo();
if (smTableConfig.getMachineLocation() != null) {
m520.put(smTableConfig.getMachineLocation().trim(), true);
smNoodleStatusBo.setTablePosition(smTableConfig.getMachineLocation());
smNoodleStatusBo.setPickupPosition(smTableConfig.getLocationDesc());
}
smNoodleStatusBo.setIsCompleted(0);
smNoodleStatusBo.setOrderId(smOrders.getId() + "");
smNoodleStatusBo.setNoodlePosition(operType);
smNoodleStatusBo.setDetailId(smOrderDetail.getId());
smNoodleStatusBo.setSpecification(smOrderDetail.getSpecificationDetail());
smNoodleStatusBo.setCompletionPosition(smProducts.getProductFinishCommand());
smNoodleStatusBo.setOrderNumber(smOrders.getOrderNumber());
smNoodleStatusBo.setNoodleName(smOrderDetail.getProductName());
smNoodleStatusBo.setMachineId(deviceId);
m520.put("noodleId", smNoodleStatusBo.getId());
// mqttPublishVo.setDeviceId(deviceId);
// mqttPublishVo.setNoodleName(smProducts.getProductName());
smOrders.setMealTableId(smTableConfig.getId());
smOrdersService.updateById(smOrders);
// mqttPublishVo.setTableNumber(smTableConfig.getTableNumber());
// mqttPublishVo.setQuantity(1);
smNoodleStatusBo.setPackageData(JSON.toJSONString(smNoodleStatusBo));
smNoodleStatusBo.setStartTime(new Date());
smNoodleStatusBo.setStatus("发送报数据");
smNoodleStatusServiceImpl.add(smNoodleStatusBo);
// mqttPublishVo.setNoodleId(smNoodleStatusBo.getId());
// JSONObject publishMessage = JSON.parseObject(JSON.toJSONString(mqttPublishVo));
// publishMessage.putAll(mapItem);
//生成上位机数据
Long currentValue = incrementOrInitializeKey("NiuMaBang");
upMessage.put("wNo", currentValue);
log.info("这是传来的桌号:{}", smTableConfig.getTableNumber());
upMessage.put("wNumberOfTable", Integer.parseInt(smTableConfig.getTableNumber()));
// 添加到Redis队列,设置1小时过期时间
redisUtil.addToQueue(getDeviceLockKey(deviceId), JSON.toJSONString(upMessage), 1, TimeUnit.HOURS);
log.info("添加订单数据到Redis队列,设置1小时过期 - 设备: {}, 订单: {}", deviceId, smOrders.getOrderNumber());
//redis-key生成四位随机字符 过期一个小时
log.info("这是存入的订单编号:{},还有设备ID:{}", currentValue, deviceId);
redisUtil.set(getOrderNo(currentValue, deviceId), smNoodleStatusBo.getId() + "", 60, TimeUnit.MINUTES);
// log.info("发送给上位机数据:{}", JSON.toJSONString(publishMessage));
//机器需要做的面的数量
incrementNoodleCount(deviceId);
} catch (Exception e) {
log.error("处理设备 {} 的消息时发生异常: {}", deviceId, e.getMessage(), e);
return 0;
}
}
}
//循环添加以后再进行生成叫号
List<SmNoodleStatus> smNoodleStatuses = smNoodleStatusService.queryListBug(smOrders);
log.info("这是查询的数据长度: {}", smNoodleStatuses.size());
if (!smNoodleStatuses.isEmpty()) {
//生成多个取餐号
for (int i = 0; i < smNoodleStatuses.size(); i++) {
log.info("这是第{}编循环插入数据", i + 1);
// 在订单支付成功生成取餐号后
SmCallNumber callNumber = new SmCallNumber();
callNumber.setStoreId(smOrders.getStoreId());
callNumber.setDeviceId(smOrders.getDeviceId());
callNumber.setOrderId(smOrders.getId());
callNumber.setOrderNumber(smOrders.getOrderNumber());
callNumber.setTakeNumber(smOrders.getTakeNumber());
callNumber.setCallStatus(0); // 等待叫号
callNumber.setIsPicked(0); // 未取餐
callNumber.setTenantCode(UserRoleConstants.DEFAULT_TENANT_ID); // 租户编码
callNumber.setDisplayPriority(0); // 默认优先级
callNumber.setCreateTime(new Date());
callNumber.setUpdateTime(new Date());
//保存面的名称 大小完 种类
callNumber.setNoodleName(smNoodleStatuses.get(i).getNoodleName());
//大小份从详情表查询
String specificationDetail = smNoodleStatuses.get(i).getSpecification();
if (specificationDetail.contains("小份")) {
callNumber.setBowlType("小份");
}
if (specificationDetail.contains("中份")) {
callNumber.setBowlType("中份");
}
if (specificationDetail.contains("大份")) {
callNumber.setBowlType("大份");
}
callNumber.setSpecification(removeSpecificWords(smNoodleStatuses.get(i).getSpecification()));
log.info("这是保存成功的数据{}", callNumber);
boolean save = smCallNumberService.save(callNumber);
log.info("{}这是保存成功失败的标识", save);
}
log.info("点餐机00推送消息给门店:{}", smOrders.getStoreId());
QueryWrapper<SmEquipment> smEquipmentQueryWrapper = new QueryWrapper<>();
smEquipmentQueryWrapper.lambda().eq(SmEquipment::getMerchantId, smOrders.getStoreId()).eq(SmEquipment::getMachineType, 2)
.eq(SmEquipment::getMachineStatus, 1);
List<SmEquipment> smEquipments = smEquipmentMapper.selectListBytoreId(smEquipmentQueryWrapper);
if (!smEquipments.isEmpty()) {
log.info("点餐机00推送消息给大屏ID:{}", smEquipments.get(0).getScreenId());
callNumberScreeWebSocketHandler.pushToStore(smEquipments.get(0).getScreenId(), "0000");
}
orderStatusWebSocketHandler.pushToStore(smOrders.getStoreId(), "1111");
}
smOrders.setDeviceStatus(DeviceStatus.IN_PROGRESS.getCode());
smOrdersService.updateById(smOrders);
return 1;
}
public String removeSpecificWords(String input) {
if (input == null || input.isEmpty()) {
return input;
}
// 定义需要去除的词汇列表
String[] wordsToRemove = {"大份,", "小份,", "中份,", "汤面,", "干面,", "牛肉,", "炸酱,"};
String result = input;
for (String word : wordsToRemove) {
result = result.replace(word, "");
}
return result.trim();
}
public void queueDeviceOrder(SmOrders smOrders) {
//订单加入队列,设置1小时过期时间
redisUtil.addToQueue(getOrderQueueKey(smOrders.getDeviceId()), smOrders.getId() + "", 1, TimeUnit.HOURS);
log.info("订单加入设备队列,设置1小时过期 - 设备: {}, 订单: {}", smOrders.getDeviceId(), smOrders.getOrderNumber());
// 使用线程池来处理当前订单
// scheduler.scheduleAtFixedRate(() -> processQueue(smOrders.getDeviceId()), DELAY, PERIOD, TimeUnit.SECONDS);
}
// 报警代码映射表(可配置化)
public static final Map<String, String> ALARM_DESCRIPTIONS = new LinkedHashMap() {{
put("alert00", "面机缺水");
put("alert01", "面机缺面");
put("alert02", "落碗机缺碗");
put("alert03", "浇头缺卤");
put("alert04", "加汤机缺汤");
put("alert05", "切肉机缺肉");
put("alert06", "可机出面异常");
put("alert07", "面篓1翻转异常");
put("alert08", "面篓2翻转异常");
put("alert09", "面篓3翻转异常");
put("alert10", "面篓4翻转异常");
put("alert11", "面篓5翻转异常");
put("alert12", "面篓6翻转异常");
put("alert13", "小料1缺料");
put("alert14", "小料2缺料");
put("alert15", "小料3缺料");
put("alert16", "小料4缺料");
put("alert17", "小料5缺料");
put("alert18", "小料6缺料");
put("alert19", "小料7缺料");
put("alert20", "小料8缺料");
put("alert21", "落碗异常");
put("alert22", "煮面炉1温度报警");
put("alert23", "煮面炉2温度报警");
put("alert24", "面机加水驱动报警1");
put("alert25", "浇头阀门驱动报警2");
put("alert26", "浇头伺服报警3");
put("alert27", "和面伺服报警4");
put("alert28", "到加汤位超时");
put("alert29", "到浇头位超时");
put("alert30", "到切肉位超时");
put("alert31", "到1号桌超时");
put("alert32", "到2号桌超时");
put("alert33", "到3号桌超时");
put("alert34", "到4号桌超时");
put("alert35", "面篓1回原异常");
put("alert36", "面篓2回原异常");
put("alert37", "面篓3回原异常");
put("alert38", "面篓4回原异常");
put("alert39", "面篓5回原异常");
put("alert40", "面篓6回原异常");
put("alert41", "煮面炉1低水位报警");
put("alert42", "煮面炉2低水位报警");
put("alert43", "煮面炉1干烧报警");
put("alert44", "煮面炉2干烧报警");
}};
@Autowired
private ISmDeviceExceptionService smDeviceExceptionService;
// 报警处理增强方法示例
private void processAlarm(String deviceSN, JSONObject payload) {
List<String> activeAlarms = ALARM_DESCRIPTIONS.keySet().stream()
.filter(field -> payload.getInteger(field) != null && payload.getInteger(field) == 1)
.map(field -> String.format("%s(%s)", ALARM_DESCRIPTIONS.get(field), field))
.collect(Collectors.toList());
if (!activeAlarms.isEmpty()) {
SmDeviceExceptionBo smDeviceException = new SmDeviceExceptionBo();
smDeviceException.setDeviceId(deviceSN);
smDeviceException.setExceptionTime(new Date());
smDeviceException.setExceptionType("机器报警");
smDeviceException.setExceptionMessage(String.join(" | ", activeAlarms));
smDeviceExceptionService.add(smDeviceException);
String alarmDetails = String.join(" | ", activeAlarms);
log.warn("设备报警 - 设备SN: {} | 报警详情: {}", deviceSN, alarmDetails);
// 发送到监控系统
// alarmService.notify(deviceSN, activeAlarms);
}
}
// 在文件中找到获取设备锁key的方法,添加日志输出
// 如果找不到具体方法,可能是来自RedisConstant类,需要在使用处添加日志
// 在文件底部添加方法获取RedisConstant中的key生成,并增加日志
public String getDeviceLockKey(String deviceId) {
String key = RedisConstant.getDeviceLockKey(deviceId);
log.info("生成设备锁key - 设备ID: {}, 生成的key: {}", deviceId, key);
return key;
}
// 同样为getOrderNo方法添加日志
public String getOrderNo(Long value, String deviceId) {
String key = RedisConstant.getOrderNo(value, deviceId);
log.info("生成订单号key - 值: {}, 设备ID: {}, 生成的key: {}", value, deviceId, key);
return key;
}
@Resource
private SmOrdersMapper smOrdersMapper;
// 添加专门用于诊断订阅问题的方法
public void diagnoseSubscription(String specificTopic) {
try {
log.info("开始诊断特定主题的订阅: {}", specificTopic);
// 检查当前连接状态
boolean connected = mqttClient.isConnected();
log.info("MQTT连接状态: {}", connected ? "已连接" : "未连接");
if (!connected) {
log.warn("MQTT未连接,无法继续诊断");
return;
}
// 检查特定主题是否匹配我们的订阅模式
boolean matchesYesOrder = specificTopic.matches("yc/[^/]+/yesOrder");
log.info("特定主题 {} 是否匹配yesOrder模式: {}", specificTopic, matchesYesOrder);
// 尝试特定订阅
log.info("尝试直接订阅特定主题: {}", specificTopic);
mqttClient.subscribe(specificTopic, (topic, message) -> {
log.info("收到特定主题消息 - 主题: {}, 消息: {}", topic, new String(message.getPayload()));
});
log.info("成功订阅特定主题: {}", specificTopic);
} catch (Exception e) {
log.error("诊断订阅时出错: {}", e.getMessage(), e);
}
}
// 添加一个强制重新连接和订阅的公共方法,可以通过API调用
public void forceReconnectAndSubscribe() {
log.info("强制重新连接和订阅MQTT");
try {
// 先断开连接
if (mqttClient.isConnected()) {
log.info("断开当前MQTT连接");
mqttClient.disconnect();
}
// 重置订阅状态
isSubscribed = false;
// 重新连接
log.info("重新连接MQTT");
connectAndRetry();
log.info("强制重新连接和订阅完成");
} catch (Exception e) {
log.error("强制重新连接和订阅失败: {}", e.getMessage(), e);
// 启动后台重连机制
scheduleReconnect();
}
}
// 提供检查MQTT状态的方法
public Map<String, Object> checkMqttStatus() {
Map<String, Object> status = new HashMap<>();
try {
boolean connected = mqttClient != null && mqttClient.isConnected();
status.put("connected", connected);
status.put("subscribed", isSubscribed);
if (mqttClient != null) {
status.put("clientId", mqttClient.getClientId());
status.put("serverUri", mqttClient.getServerURI());
}
// 替换Map.of(),使用Java 8兼容的方式
Map<String, String> topicPatterns = new HashMap<>();
topicPatterns.put("makeNoodle", makeNoodle);
topicPatterns.put("yesOrder", yesOrder);
topicPatterns.put("alarm", alarm);
topicPatterns.put("setParam", setParam);
status.put("topicPatterns", topicPatterns);
} catch (Exception e) {
log.error("检查MQTT状态时出错: {}", e.getMessage(), e);
status.put("error", e.getMessage());
}
return status;
}
// 取餐号生成逻辑,从ApiSmOrdersController复制过来并稍作修改
private String getTakeNumber(String storeId) {
String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
String key = "ticket:number:" + today + ":" + storeId;
String lockKey = "ticket:lock:" + today + ":" + storeId;
try {
// 使用分布式锁确保序号分配的原子性
boolean lockAcquired = redisUtil.acquireLock(lockKey, 1); // 1分钟锁超时
if (!lockAcquired) {
// 如果无法获取锁,等待短暂时间后重试
Thread.sleep(100);
return getTakeNumber(storeId);
}
// 检查当前值
Long currentNumber = 0L;
if (redisUtil.hasKey(key)) {
Object currentValue = redisUtil.get(key);
if (currentValue != null) {
try {
currentNumber = Long.parseLong(currentValue.toString());
} catch (NumberFormatException e) {
log.error("取餐号格式错误,重置为0: {}", e.getMessage());
}
}
} else {
// 如果键不存在,检查数据库中今天的最大取餐号
try {
String startOfDay = today + " 00:00:00";
String endOfDay = today + " 23:59:59";
// 查询当天最大的取餐号
List<SmOrders> todayOrders = smOrdersMapper.selectList(
new LambdaQueryWrapper<SmOrders>()
.eq(SmOrders::getStoreId, storeId)
.between(SmOrders::getCreateTime, startOfDay, endOfDay)
.isNotNull(SmOrders::getTakeNumber)
.ne(SmOrders::getTakeNumber, "")
.orderByDesc(SmOrders::getTakeNumber)
.last("LIMIT 1")
);
if (!todayOrders.isEmpty()) {
SmOrders latestOrder = todayOrders.get(0);
String latestTakeNumber = latestOrder.getTakeNumber();
if (latestTakeNumber != null && latestTakeNumber.startsWith("#")) {
try {
currentNumber = Long.parseLong(latestTakeNumber.substring(1));
} catch (NumberFormatException e) {
log.error("数据库取餐号格式错误: {}", e.getMessage());
}
}
}
} catch (Exception e) {
log.error("查询数据库最大取餐号失败: {}", e.getMessage());
}
}
// 增加计数器
Long number = currentNumber + 1;
// 设置或更新Redis
redisUtil.set(key, number);
// 设置当天过期时间(如果是第一次设置)
if (number == 1) {
// 设置过期时间到明天凌晨
LocalDateTime midnight = LocalDate.now().plusDays(1).atStartOfDay();
long secondsUntilMidnight = ChronoUnit.SECONDS.between(LocalDateTime.now(), midnight);
redisUtil.setExpire(key, secondsUntilMidnight, TimeUnit.SECONDS);
}
// 格式化为3位数,如001
String tno = String.format("#%03d", number);
log.info("生成取餐号 - 店铺ID: {}, 日期: {}, 序号: {}", storeId, today, tno);
return tno;
} catch (Exception e) {
log.error("生成取餐号失败: {}", e.getMessage(), e);
// 出现异常时,生成基于当前时间的备用号码,确保服务不中断
String fallbackNumber = "#" + System.currentTimeMillis() % 1000;
return fallbackNumber;
} finally {
// 释放锁
redisUtil.releaseLock(lockKey);
}
}
}
评论