Java—物联网开发-Mqtt协议+485-自动面条机


485协议自动做面条,通过海为的CBOX

package com.ruoyi.device.mqtt.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Service;

@Configuration
@Service
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.connection-timeout:10}")
    private int connectionTimeout;  // 默认连接超时为10秒

    @Value("${mqtt.keep-alive-interval:30}")
    private int keepAliveInterval;  // 默认保持连接时间间隔为20秒

    @Value("${mqtt.clean-session:false}")
    private boolean cleanSession;  // 默认启用清除会话

    @Value("${mqtt.automatic-reconnect:true}")
    private boolean automaticReconnect;  // 默认启用自动重连

    @Bean
    public MqttClient mqttClient() throws MqttException {
        // 创建 MQTT 客户端实例
        //随机客户端id
        String clientId = this.clientId;
        String os = System.getProperty("os.name").toLowerCase();

        if (os.contains("win")) {
            System.out.println("当前操作系统:Windows");
            clientId= clientId + "win-mqtt-niuma";
        } else if (os.contains("nix") || os.contains("nux")) {
            clientId= clientId + "nix-mqtt-niuma";
        } else {
            clientId= clientId + "default-mqtt-niuma";
        }
        MqttClient mqttClient = new MqttClient(brokerUrl, clientId);

        // 创建连接选项
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName(username);

        // 设置密码(如果有的话)
        if (password != null && !password.isEmpty()) {
            connOpts.setPassword(password.toCharArray());
        }

        // 设置清除会话
        connOpts.setCleanSession(cleanSession);

        // 设置自动重连
        connOpts.setAutomaticReconnect(automaticReconnect);

        // 设置连接超时
        connOpts.setConnectionTimeout(connectionTimeout);

        // 设置心跳间隔
        connOpts.setKeepAliveInterval(keepAliveInterval);

        try {
            // 连接到 MQTT Broker
            mqttClient.connect(connOpts);
            System.out.println("Connected to broker: " + brokerUrl);
        } catch (MqttException e) {
            // 异常处理,确保连接异常时打印相关信息
            System.err.println("Failed to connect to broker: " + brokerUrl);
            throw e;  // 将异常抛出,方便外部处理
        }

        return mqttClient;
    }
}

package com.ruoyi.device.mqtt.service;

import cn.hutool.core.util.RandomUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.constant.UserRoleConstants;
import com.ruoyi.common.core.domain.entity.SysUser;
import com.ruoyi.common.exception.ServiceException;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.device.dto.MqttPublishVo;
import com.ruoyi.device.mqtt.config.RedisConstant;
import com.ruoyi.device.util.RedisUtil;
import com.ruoyi.device.websoket.CallNumberScreeWebSocketHandler;
import com.ruoyi.device.websoket.DeviceParamWebSocketHandler;
import com.ruoyi.device.websoket.OrderStatusWebSocketHandler;
import com.ruoyi.small_device_api.deviceException.bo.SmDeviceExceptionBo;
import com.ruoyi.small_device_api.deviceException.service.ISmDeviceExceptionService;
import com.ruoyi.small_device_api.equipment.domain.SmEquipment;
import com.ruoyi.small_device_api.equipment.mapper.SmEquipmentMapper;
import com.ruoyi.small_device_api.equipment.service.ISmEquipmentService;
import com.ruoyi.small_device_api.noodleLog.bo.SmNoodleLogBo;
import com.ruoyi.small_device_api.noodleLog.service.impl.SmNoodleLogServiceImpl;
import com.ruoyi.small_device_api.noodleStatus.bo.SmNoodleStatusBo;
import com.ruoyi.small_device_api.noodleStatus.domain.SmNoodleStatus;
import com.ruoyi.small_device_api.noodleStatus.service.ISmNoodleStatusService;
import com.ruoyi.small_device_api.noodleStatus.service.impl.SmNoodleStatusServiceImpl;
import com.ruoyi.small_device_api.orderDetails.bo.SmOrderDetailsBo;
import com.ruoyi.small_device_api.orderDetails.domain.SmOrderDetails;
import com.ruoyi.small_device_api.orderDetails.service.ISmOrderDetailsService;
import com.ruoyi.small_device_api.orders.domain.SmCallNumber;
import com.ruoyi.small_device_api.orders.domain.SmOrders;
import com.ruoyi.small_device_api.orders.mapper.SmOrdersMapper;
import com.ruoyi.small_device_api.orders.service.ISmCallNumberService;
import com.ruoyi.small_device_api.orders.service.ISmOrdersService;
import com.ruoyi.small_device_api.products.domain.SmProducts;
import com.ruoyi.small_device_api.products.service.ISmProductsService;
import com.ruoyi.small_device_api.specItem.domain.SmSpecItem;
import com.ruoyi.small_device_api.specItem.service.ISmSpecItemService;
import com.ruoyi.small_device_api.tableConfig.domain.SmTableConfig;
import com.ruoyi.small_device_api.tableConfig.service.ISmTableConfigService;
import com.ruoyi.small_device_api.util.DeviceStatus;
import com.ruoyi.small_device_api.util.OrderStatus;
import com.ruoyi.system.service.ISysUserService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.annotation.Order;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import static com.ruoyi.device.mqtt.config.RedisConstant.*;

@Service
@Slf4j
public class MqttService {

    @Value("${mqtt.device-test:true}")
    private Boolean deviceTest;
    @Autowired
    private ObjectMapper objectMapper;  // For converting objects to JSON

    //发送指令
    private static final String control = "yc/%s/control";
    //做面装套
    private static final String makeNoodle = "yc/1023D00J0041/makeNoodle";
    //报警信息
    private static final String alarm = "yc/1023D00J0041/alarm";
    //可以做面
    private static final String yesOrder = "yc/1023D00J0041/yesOrder";
    //设置参数
    private static final String setParam = "yc/1023D00J0041/canshu";
    //可以做下一份
//    private static final String pubNext = "niuma/miantiao/%s/pub1";

    @Autowired
    private MqttClient mqttClient;
    private static final int MAX_RETRY_COUNT = 3;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private SmNoodleStatusServiceImpl smNoodleStatusServiceImpl;


    @Autowired
    private ISmNoodleStatusService smNoodleStatusService;
    @Autowired
    private SmNoodleLogServiceImpl smNoodleLogServiceImpl;
    @Autowired
    private SmEquipmentMapper smEquipmentMapper;

    // 添加订阅状态标记和重连调度器
    private volatile boolean isSubscribed = false;
    private ScheduledExecutorService reconnectExecutor;

    //初始化
    @PostConstruct
    @Order(-9999)
    public void init() throws Exception {
        try {
            // 检查MQTT客户端配置
            logMqttClientConfig();

            // 尝试连接
            connectAndRetry();
        } catch (Exception e) {
            log.error("MQTT初始化失败,将在后台尝试重连: {}", e.getMessage(), e);
            // 记录错误但不中断启动流程
            scheduleReconnect();
        }
    }

    /**
     * 记录MQTT客户端配置
     */
    private void logMqttClientConfig() {
        try {
            if (mqttClient != null) {
                log.info("=== MQTT客户端配置信息 ===");
                log.info("客户端ID: {}", mqttClient.getClientId());
                log.info("服务器URI: {}", mqttClient.getServerURI());
                log.info("连接状态: {}", mqttClient.isConnected() ? "已连接" : "未连接");
                log.info("==========================");
            } else {
                log.error("MQTT客户端未注入,请检查Spring配置");
            }
        } catch (Exception e) {
            log.error("获取MQTT客户端配置失败: {}", e.getMessage(), e);
        }
    }

    /**
     * 连接MQTT并重试
     */
    private void connectAndRetry() throws ServiceException {
        int attempt = 0;
        while (attempt < MAX_RETRY_COUNT) {
            try {
                if (!mqttClient.isConnected()) {
                    log.info("正在连接MQTT代理服务器...");

                    // 确保MqttClient配置正确
                    if (mqttClient.getClientId() == null || mqttClient.getClientId().isEmpty()) {
                        log.error("MQTT客户端ID为空,无法连接");
                        throw new ServiceException("MQTT客户端ID为空");
                    }

                    // 确保连接选项设置正确
                    org.eclipse.paho.client.mqttv3.MqttConnectOptions options = new org.eclipse.paho.client.mqttv3.MqttConnectOptions();
                    options.setCleanSession(true);
                    options.setKeepAliveInterval(60);
                    options.setConnectionTimeout(30);
                    options.setAutomaticReconnect(true);

                    log.info("MQTT连接选项: 清除会话:{}, 保活间隔:{}秒, 连接超时:{}秒, 自动重连:{}",
                            options.isCleanSession(), options.getKeepAliveInterval(),
                            options.getConnectionTimeout(), options.isAutomaticReconnect());

                    // 连接
                    mqttClient.connect(options);
                    log.info("成功连接到MQTT代理服务器");
                }

                // 无论之前是否订阅,都强制执行订阅流程
                log.info("开始执行MQTT主题订阅");
                initSubscriptions();
                log.info("完成MQTT主题订阅流程");

                // 确认一下订阅状态
                verifySubscriptions();

                return;  // 如果连接成功,直接返回
            } catch (MqttException e) {
                attempt++;
                log.error("MQTT连接失败,尝试 {}/{}:{}, 错误码: {}",
                        attempt, MAX_RETRY_COUNT, e.getMessage(), e.getReasonCode());
                if (attempt == MAX_RETRY_COUNT) {
                    throw new ServiceException("MQTT连接失败,已达最大重试次数,将在后台继续尝试");
                }
                // 如果需要,可以在这里增加延时
                try {
                    Thread.sleep(2000);  // 延时2秒重试
                } catch (InterruptedException interruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * 验证订阅状态
     */
    private void verifySubscriptions() {
        try {
            // 我们无法直接从Paho客户端获取已订阅的主题列表
            // 但我们可以确认当前连接状态
            if (mqttClient.isConnected()) {
                log.info("MQTT连接正常,订阅应该已生效");

                // 添加测试消息发布
                try {
                    String testTopic = "test/mqtt/verification";
                    log.info("发布测试消息到主题: {}", testTopic);
                    MqttMessage testMessage = new MqttMessage("MQTT订阅验证测试".getBytes());
                    testMessage.setQos(1);
                    mqttClient.publish(testTopic, testMessage);
                    log.info("测试消息发布成功");
                } catch (Exception e) {
                    log.error("发布测试消息失败: {}", e.getMessage(), e);
                }
            } else {
                log.warn("MQTT未连接,订阅可能未生效");
            }
        } catch (Exception e) {
            log.error("验证订阅状态出错: {}", e.getMessage(), e);
        }
    }

    /**
     * 安排定期重连任务
     */
    private synchronized void scheduleReconnect() {
        // 如果已存在重连任务,不再创建新的
        if (reconnectExecutor != null && !reconnectExecutor.isShutdown()) {
            log.info("重连任务已存在,不再创建新任务");
            return;
        }

        log.info("创建MQTT重连定时任务");
        reconnectExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread thread = new Thread(r, "mqtt-reconnect-thread");
            thread.setDaemon(true); // 设为守护线程,不阻止JVM退出
            return thread;
        });

        reconnectExecutor.scheduleAtFixedRate(() -> {
            try {
                if (!mqttClient.isConnected()) {
                    log.info("尝试重新连接MQTT...");

                    // 重连前检查配置
                    logMqttClientConfig();

                    // 配置连接选项
                    org.eclipse.paho.client.mqttv3.MqttConnectOptions options = new org.eclipse.paho.client.mqttv3.MqttConnectOptions();
                    options.setCleanSession(true);
                    options.setKeepAliveInterval(60);
                    options.setConnectionTimeout(30);
                    options.setAutomaticReconnect(true);

                    mqttClient.connect(options);
                    log.info("MQTT重连成功");

                    // 重连后强制执行订阅
                    log.info("重连后开始执行MQTT主题订阅");
                    initSubscriptions();
                    log.info("重连后完成MQTT主题订阅");

                    // 验证订阅状态
                    verifySubscriptions();
                } else if (!isSubscribed) {
                    // 如果已连接但未订阅,执行订阅
                    log.info("MQTT已连接但未订阅,开始执行订阅");
                    initSubscriptions();
                    log.info("补充订阅完成");

                    // 验证订阅状态
                    verifySubscriptions();
                } else {
                    // 检查连接和订阅状态
                    log.info("MQTT连接和订阅状态检查 - 连接: {}, 订阅: {}",
                            mqttClient.isConnected() ? "已连接" : "未连接",
                            isSubscribed ? "已订阅" : "未订阅");
                }
            } catch (MqttException e) {
                log.error("MQTT重连失败: {},错误码: {}", e.getMessage(), e.getReasonCode());
            } catch (Exception e) {
                log.error("MQTT重连或订阅过程发生错误: {}", e.getMessage(), e);
            }
        }, 10, 30, TimeUnit.SECONDS); // 10秒后开始,每30秒尝试一次

        log.info("MQTT重连定时任务已创建");
    }

    /**
     * 初始化所有订阅
     */
    private synchronized void initSubscriptions() {
        try {
            log.info("开始执行MQTT主题订阅流程,当前订阅状态: {}", isSubscribed ? "已订阅" : "未订阅");

            // 检查MQTT连接状态
            if (!mqttClient.isConnected()) {
                log.error("MQTT未连接,无法执行订阅");
                throw new RuntimeException("MQTT未连接,无法执行订阅");
            }

            // 检查订阅的主题模式
            log.info("订阅主题模式:makeNoodle: {}, yesOrder: {}, alarm: {}, setParam: {}",
                    makeNoodle, yesOrder, alarm, setParam);

            // 执行所有订阅操作,即使之前已订阅
            log.info("开始订阅设备状态主题: {}", makeNoodle);
            subDeviceStatus();
            log.info("完成订阅设备状态主题");

            log.info("开始订阅订单主题: {}", yesOrder);
            sendOrder();
            log.info("完成订阅订单主题");

            log.info("开始订阅报警主题: {}", alarm);
            subAlarm();
            log.info("完成订阅报警主题");

            log.info("开始订阅参数主题: {}", setParam);
            subParams();
            log.info("完成订阅参数主题");

            isSubscribed = true;
            log.info("所有MQTT主题订阅完成,订阅标志设置为: {}", isSubscribed);

            // 检查已订阅的主题
            try {
                if (mqttClient.isConnected()) {
                    log.info("MQTT客户端当前已连接,客户端ID: {}", mqttClient.getClientId());
                } else {
                    log.warn("MQTT客户端当前未连接!");
                }
            } catch (Exception e) {
                log.error("检查MQTT客户端状态时出错: {}", e.getMessage(), e);
            }
        } catch (Exception e) {
            log.error("初始化MQTT订阅失败: {}", e.getMessage(), e);
            isSubscribed = false; // 重置标志,允许下次重试
            log.info("由于错误,订阅标志重置为: {}", isSubscribed);
            throw new RuntimeException("MQTT订阅失败", e);
        }
    }

    // 添加资源清理方法
    @PreDestroy
    public void cleanup() {
        log.info("正在清理MQTT资源...");
        if (reconnectExecutor != null && !reconnectExecutor.isShutdown()) {
            reconnectExecutor.shutdown();
            try {
                if (!reconnectExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                    reconnectExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                reconnectExecutor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }

        try {
            if (mqttClient != null && mqttClient.isConnected()) {
                mqttClient.disconnect();
                log.info("MQTT客户端已断开连接");
            }
        } catch (MqttException e) {
            log.error("断开MQTT连接时发生错误: {}", e.getMessage());
        }
    }

    // 发布消息
    public void publishMessage(String topic, Object payload) {
        String message = null;
        try {
            message = objectMapper.writeValueAsString(payload);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("消息序列化失败: " + e.getMessage(), e);
        }
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttMessage.setQos(1); // 至少一次
        mqttMessage.setRetained(false);

        final String finalMessage = message;

        try {
            // 确保连接
            if (!mqttClient.isConnected()) {
                log.info("MQTT未连接,正在重新连接...");
                try {
                    mqttClient.connect();
                    log.info("重新连接MQTT成功");

                    // 确保订阅存在
                    if (!isSubscribed) {
                        initSubscriptions();
                    }
                } catch (MqttException e) {
                    log.error("重连MQTT失败: {}", e.getMessage());
                    scheduleReconnect();
                    throw new RuntimeException("MQTT连接失败,消息发送失败", e);
                }
            }

            mqttClient.publish(topic, mqttMessage);
            log.info("发布指令{} ,设备:{} ", finalMessage, topic);

            // 记录到数据库
            try {
                // 从主题中提取设备ID
                String deviceId = topic.split("/")[1];

                // 创建日志记录
                SmNoodleLogBo smNoodleLogBo = new SmNoodleLogBo();
                smNoodleLogBo.setMachineId(deviceId);
                smNoodleLogBo.setOperationType("MQTT指令发送");
                smNoodleLogBo.setPackageInfo(finalMessage);
                smNoodleLogBo.setRemark("主题: " + topic);

                // 保存到数据库
                smNoodleLogServiceImpl.add(smNoodleLogBo);
                log.info("MQTT指令记录已保存到数据库 - 设备: {}, 主题: {}", deviceId, topic);
            } catch (Exception e) {
                log.error("保存MQTT指令到数据库失败: {}", e.getMessage(), e);
                // 不抛出异常,避免中断消息发送流程
            }
        } catch (MqttException e) {
            log.error("发布消息失败: {}", e.getMessage());
            scheduleReconnect();
            throw new RuntimeException("发布消息失败: " + e.getMessage(), e);
        }
    }

    public void subscribe(String topic) throws Exception {
        try {
            mqttClient.subscribe(topic, (t, m) -> {
                String receivedMessage = new String(m.getPayload());
                // 处理接收到的消息
            });
        } catch (MqttException e) {
            log.error("订阅主题 {} 失败: {}", topic, e.getMessage());
            isSubscribed = false; // 重置订阅状态
            throw e; // 保持原有异常抛出行为
        }
    }

    //锁
    private static Map<String, CountDownLatch> latchMap = new ConcurrentHashMap<>();
    private static Map<String, CountDownLatch> orderLatchMap = new ConcurrentHashMap<>();
    //是否下发桌号
    private static Map<String, Boolean> pubTableId = new ConcurrentHashMap<>();
    //是否可以做下份面
    private static Map<String, Boolean> startNext = new ConcurrentHashMap<>();

    @Autowired
    private ISmOrdersService smOrdersService;
    @Autowired
    @Lazy
    private ISmEquipmentService smEquipmentService;
    @Autowired
    private ISmOrderDetailsService smOrderDetailsService;
    @Autowired
    private ISmProductsService smProductsService;
    @Autowired
    private ISmSpecItemService smSpecItemService;
    @Autowired
    private ISmTableConfigService smTableConfigService;


    @Autowired
    private ISysUserService userService;

    @Autowired
    private StringRedisTemplate redisTemplate;

    //可以做面的时候发送订单
    public void sendOrder() {
        try {
            log.info("开始订阅yesOrder主题: {}", yesOrder);
            mqttClient.subscribe(yesOrder, (t, m) -> {
                try {
                    // 记录收到的每条消息
                    String rawMessage = new String(m.getPayload());
                    log.info("接收到yesOrder消息 - 主题: {}, 原始消息: {}", t, rawMessage);

                    // 获取设备号
                    String deviceId = t.split("/")[1];
                    log.info("解析设备ID: {}", deviceId);

                    // 获取状态{"alowOrder":1}
                    JSONObject message = null;
                    try {
                        message = JSON.parseObject(rawMessage);
                        log.info("消息解析结果: {}", message);
                    } catch (Exception e) {
                        log.error("解析JSON消息失败: {}", e.getMessage());
                        return;
                    }

                    // 检查alowOrder字段
                    if (message.get("alowOrder") != null) {
                        log.info("发现alowOrder字段, 值为: {}", message.get("alowOrder"));

                        // 1的时候发送订单
                        if ((Integer) message.get("alowOrder") == 1) {
                            log.info("{}可以做面,准备获取队列数据", deviceId);

                            // 从队列获取数据
                            String deviceLockKey = getDeviceLockKey(deviceId);
                            log.info("获取队列数据的key: {}", deviceLockKey);

                            String orderData = redisUtil.popFromQueue(deviceLockKey);
                            if (orderData != null) {
                                log.info("从队列获取到订单数据: {}", orderData);

                                JSONObject jsonObject = JSON.parseObject(orderData);
                                log.info("解析订单数据为JSON: {}", jsonObject);

                                // 发送消息
                                String controlTopic = String.format(control, deviceId);
                                log.info("准备向主题 {} 发送订单数据", controlTopic);

                                publishMessage(controlTopic, jsonObject);
                                log.info("订单数据发送完成");
                            } else {
                                log.info("队列 {} 中没有可用的订单数据", deviceLockKey);
                            }
                        } else {
                            log.info("{}不可做面,alowOrder值为: {}", deviceId, message.get("alowOrder"));
                        }
                    } else {
                        log.info("消息中未包含alowOrder字段: {}", message);
                    }
                } catch (Exception e) {
                    log.error("处理yesOrder消息时出错: {}", e.getMessage(), e);
                }
            });
            log.info("成功订阅yesOrder主题,已设置回调处理器");
        } catch (MqttException e) {
            log.error("订阅yesOrder主题失败: {}", e.getMessage(), e);
            isSubscribed = false;
            throw new RuntimeException("订阅yesOrder主题失败", e);
        }
    }

    //报警信息
    public void subAlarm() {
        try {
            log.info("开始订阅alarm主题: {}", alarm);
            mqttClient.subscribe(alarm, (t, m) -> {
                //获取设备号
                String deviceId = t.split("/")[1];
                //获取报警信息
                String alarmMessage = new String(m.getPayload());
                log.info("{}报警信息{}", deviceId, alarmMessage);
                processAlarm(deviceId, JSON.parseObject(alarmMessage));
            });
            log.info("成功订阅alarm主题");
        } catch (MqttException e) {
            log.error("订阅alarm主题失败: {}", e.getMessage(), e);
            isSubscribed = false;
            throw new RuntimeException("订阅alarm主题失败", e);
        }
    }

    public static String parseFoodProcessStatus(Map<String, Integer> data) {
        StringBuilder sb = new StringBuilder();

        sb.append("桌号: ").append(data.getOrDefault("a150", 0)).append("\n");
        sb.append("序号: ").append(data.getOrDefault("a151", 0)).append("\n");

        // JDK8 不能使用 switch 表达式,改用 Map
        Map<Integer, String> bowlSizeMap = new HashMap<>();
        bowlSizeMap.put(1, "小碗");
        bowlSizeMap.put(2, "中碗");
        bowlSizeMap.put(3, "大碗");

        sb.append("碗大小: ").append(bowlSizeMap.getOrDefault(data.getOrDefault("a152", 0), "未知")).append("\n");
        sb.append("面篓位置: ").append(data.getOrDefault("a153", 0)).append("\n");
        sb.append("剩余时间: ").append(data.getOrDefault("a154", 0)).append("秒\n");

        // 处理状态值映射,0 是 "进行中",1 是 "未进行"
        Map<String, String> statusMap = new HashMap<>();
        statusMap.put("a155", "加汤");
        statusMap.put("a156", "加浇头");
        statusMap.put("a157", "夹肉");
        statusMap.put("a158", "冲凉");
        statusMap.put("a159", "完成");

        statusMap.forEach((key, value) ->
                sb.append(value).append("状态: ").append(data.getOrDefault(key, 0) == 0 ? value + "中" : "未" + value).append("\n")
        );

        return sb.toString();
    }

    //订阅设备状态
    public void subDeviceStatus() {
        try {
            log.info("开始订阅makeNoodle主题: {}", makeNoodle);
            mqttClient.subscribe(makeNoodle, (t, m) -> {
                Map<String, Integer> message = JSON.parseObject(m.getPayload(), Map.class);
                try {
                    if (message != null) {
                        String deviceId = t.split("/")[1];
                        log.info("收到设备:{},发来的信息:{}", deviceId, message);
                        MatrixQuery query = new MatrixQuery(message);
                        // 获取全部分组标识值
                        Map<Character, Integer> markers = query.getAllGroupMarkers();
                        log.info("处理过后的全部分组标识值: {}", markers);
                        for (Integer value : markers.values()) {
                            //第二列不为0的话,进行循环
                            if (value != null && value > 0) {
                                //查询面条id
                                log.info("zzw这是第二列的数据: {}", value);
                                Object noodleId = redisUtil.get(getOrderNo(value.longValue(), deviceId));
                                log.info("这是sm_noodle_status表的Id: {},zzw这是第二列的数据:{}", noodleId, value);
                                if (noodleId != null) {
                                    //解析数据 value=桌号
                                    Map<String, Integer> stringIntegerMap = query.queryByValue(value);
                                    //stringIntegerMap {a150=4 , a151=7237, a152=1, a153=4, a154=0, a150=1, a159=0, a155=0, a156=0, a157=0, a158=0}
                                    //第一组进来的是a组  a151
                                    log.info("订单号: {},对应的当前行数据:{}", value, stringIntegerMap);
                                    List<Map.Entry<String, Integer>> list = new ArrayList<>(stringIntegerMap.entrySet());
                                    log.info("当前组的List: {}", list);
                                    Boolean eatDataFlag = redisUtil.get(String.valueOf(value)) != null;
                                    //缓存中已经存在,说明已经取过并且吃完了,所以不需要循环
                                    log.info("缓存中已经存在,说明已经取过并且吃完了,所以不需要循环:{}", eatDataFlag);
                                    if (eatDataFlag) {
                                        //这里a已经去走餐了,所以a不走了,但是不能影响b
                                        continue;
                                    }
                                    for (Map.Entry<String, Integer> stringIntegerEntry : stringIntegerMap.entrySet()) {
                                        String key = stringIntegerEntry.getKey();//a151  a152
                                        char type = key.charAt(key.length() - 1);//末尾数字 1  2
                                        int state = stringIntegerEntry.getValue();//状态值 0   1 等等
                                        log.info("这是名字:{},这是第【{}+1】列,这是对应的值:{}", key, type, stringIntegerEntry.getValue());
                                        //更新信息
                                        SmNoodleStatus smNoodleStatus = smNoodleStatusServiceImpl.queryById(Long.valueOf(noodleId + ""));
                                        log.info("这是smNoodleStatus:{}", smNoodleStatus);
                                        if (smNoodleStatus != null) {
                                            switch (type) {
                                                case '0': // 桌号:a150
                                                    smNoodleStatus.setTablePosition(state + "");
                                                    break;
                                                case '1': // 序号:a151
                                                    log.info("序号{}状态{}", noodleId, state);
                                                    break;
                                                case '2': // 碗类型:a152,0未选,1小碗,2中碗,3大碗
                                                    String bowlType;
                                                    switch (state) {
                                                        case 1:
                                                            bowlType = "小碗";
                                                            break;
                                                        case 2:
                                                            bowlType = "中碗";
                                                            break;
                                                        case 3:
                                                            bowlType = "大碗";
                                                            break;
                                                        default:
                                                            bowlType = "未选择";
                                                    }
                                                    smNoodleStatus.setBowlType(bowlType);
                                                    break;
                                                case '3': // 面篓位置:a153
                                                    smNoodleStatus.setNoodleBasketPosition(state + "");
                                                    break;
                                                case '4': // 剩余时间/餐品状态:a154
                                                    if (state == 0) {
                                                        Integer sta = (Integer) redisUtil.get("取餐号状态码:" + value);
                                                        log.info("这是获取缓存中桌号:{},对应的的状态码:{}======【如果为null则还未到待取餐】", value, sta);
                                                        if (sta != null && sta == 1) {
                                                            // TODO 上次是1,这次是0,说明上次是正在取餐,这次是取餐完成
                                                            redisUtil.set("取餐号状态码:" + value, 0, 24, TimeUnit.HOURS);
                                                            SmOrders smOrders = smOrdersService.queryById(Long.valueOf(smNoodleStatus.getOrderId()));
                                                            smNoodleStatus.setEndTime(new Date());
                                                            smNoodleStatus.setStatus("制作完成");
                                                            if (smOrders != null) {
                                                                // TODO 把桌号放缓存---标记这一单结束了,再次循环就不在进来了
                                                                redisUtil.set(value.toString(), "1", 24, TimeUnit.HOURS);
                                                                log.info("增加前zzw这是完成的数量:{},这是面的数量:{}", smOrders.getCompletedCount(), smOrders.getNoodleCount());
                                                                smOrders.setCompletedCount(smOrders.getCompletedCount() + 1);
                                                                if (Objects.equals(smOrders.getCompletedCount(), smOrders.getNoodleCount())) {
                                                                    smOrders.setDeviceStatus(DeviceStatus.COMPLETED.getCode());
                                                                    //加入到叫号
                                                                    redisUtil.addToQueue(RedisConstant.callNumberKey(deviceId), smOrders.getTakeNumber() + "", 24, TimeUnit.HOURS);
                                                                }
                                                                redisUtil.set(value.toString(), "1", 24, TimeUnit.HOURS);
                                                                log.info("增加后zzw这是完成的数量:{},这是面的数量:{}", smOrders.getCompletedCount(), smOrders.getNoodleCount());
                                                                smOrders.setMealStatus(0);
                                                                smOrdersService.updateById(smOrders);
                                                            }
                                                            redisUtil.increment(WAITING_COUNT_KEY + ":" + deviceId, -1);
                                                            // 餐已经到过客位,现在状态为0表示餐被取走了
                                                            smNoodleStatus.setRemainingTime("已取走");
                                                            smNoodleStatus.setStatus("已取餐");
                                                            // 获取订单信息并更新状态
                                                            if (smNoodleStatus.getOrderId() != null) {
                                                                log.info("设备编号:{}订单编号:{}详情id:{}已完成", deviceId, smNoodleStatus.getOrderId(), smNoodleStatus.getDetailId());
                                                                try {
                                                                    SmOrders order = smOrdersService.queryById(Long.valueOf(smNoodleStatus.getOrderId()));
                                                                    // 更新取餐号状态为已取餐 isPicked=0 callStatus=2   isPicked=1 callStatus=2
                                                                    log.info("将02改为状态改为12");
                                                                    smCallNumberService.markAsPicked(order.getStoreId(), order.getTakeNumber());
                                                                    //根据门店找到大屏推送消息
                                                                    log.info("推送消息给门店:" + order.getStoreId());

                                                                    QueryWrapper<SmEquipment> smEquipmentQueryWrapper = new QueryWrapper<>();
                                                                    smEquipmentQueryWrapper.lambda().eq(SmEquipment::getMerchantId, order.getStoreId()).eq(SmEquipment::getMachineType, 2)
                                                                            .eq(SmEquipment::getMachineStatus, 1);
                                                                    List<SmEquipment> smEquipments = smEquipmentMapper.selectListBytoreId(smEquipmentQueryWrapper);
                                                                    if (!smEquipments.isEmpty()) {
                                                                        log.info("推送消息给大屏ID:{}", smEquipments.get(0).getScreenId());
                                                                        callNumberScreeWebSocketHandler.pushToStore(smEquipments.get(0).getScreenId(), "0000");
                                                                    }
                                                                    orderStatusWebSocketHandler.pushToStore(order.getStoreId(), "1111");
                                                                    // 更新订单为已取餐状态
                                                                    order.setMealStatus(1); // 1表示已取餐
                                                                    smOrdersService.updateById(order);
                                                                } catch (Exception e) {
                                                                    log.error("更新订单状态失败:{}", e.getMessage());
                                                                }
                                                            }
                                                        } else {
                                                            //  已完成的餐就不再制作中了
                                                            if (!"制作完成".equals(smNoodleStatus.getStatus())) {
                                                                smNoodleStatus.setRemainingTime("餐品制作中");
                                                                smNoodleStatus.setStatus("制作中");
                                                            }
                                                        }
                                                    } else if (state == 1) {
                                                        // TODO  1==待取餐
                                                        SmOrders order = smOrdersService.queryById(Long.valueOf(smNoodleStatus.getOrderId()));
                                                        log.info("订单{}餐品面条{}已到达客位,等待取餐", order.getOrderNumber(), smNoodleStatus.getId());
                                                        Boolean isExit = redisUtil.get("smNoodleStatusId:" + smNoodleStatus.getId()) == null;
                                                        log.info("拨号大屏已经在叫号=是否已经存在面条编号{}数据,True是不存在,False是存在:{}", smNoodleStatus.getId(), isExit);
                                                        if (isExit) {
                                                            // 餐已到位,可以取餐
                                                            smNoodleStatus.setRemainingTime("已到达客位");
                                                            smNoodleStatus.setStatus("可取餐");
                                                            // 获取订单信息并更新状态
                                                            if (smNoodleStatus.getOrderId() != null) {
                                                                try {
                                                                    // 更新订单为可取餐状态
                                                                    //order.setDeviceStatus(DeviceStatus.COMPLETED.getCode());//制作一个面并不是完成的
                                                                    order.setMealStatus(0); // 0表示可取餐
                                                                    smOrdersService.updateById(order);
                                                                    //根据门店找到大屏推送消息
                                                                    log.info("推送消息给门店:" + order.getStoreId());
                                                                    QueryWrapper<SmEquipment> smEquipmentQueryWrapper = new QueryWrapper<>();
                                                                    smEquipmentQueryWrapper.lambda().eq(SmEquipment::getMerchantId, order.getStoreId()).eq(SmEquipment::getMachineType, 2)
                                                                            .eq(SmEquipment::getMachineStatus, 1);
                                                                    List<SmEquipment> smEquipments = smEquipmentMapper.selectListBytoreId(smEquipmentQueryWrapper);
                                                                    if (!smEquipments.isEmpty()) {
                                                                        log.info("推送消息给大屏ID:{}", smEquipments.get(0).getScreenId());
                                                                        callNumberScreeWebSocketHandler.pushToStore(smEquipments.get(0).getScreenId(), "0000");
                                                                    }
                                                                    orderStatusWebSocketHandler.pushToStore(order.getStoreId(), "1111");
                                                                    log.info("这是修改叫号状态:00改为:02");
                                                                    //这里 callStatus=0   pick=0   变成   pick=0   callStatus=2
                                                                    smCallNumberService.markAsCalled(order.getStoreId(), order.getTakeNumber());
                                                                    //判断下当前号码是否是当前门店的
                                                                    redisUtil.set("smNoodleStatusId:" + smNoodleStatus.getId(), "已经存在报号队列中", 24, TimeUnit.HOURS);
                                                                    redisUtil.set("取餐号状态码:" + value, 1, 24, TimeUnit.HOURS);
                                                                } catch (Exception e) {
                                                                    log.error("更新订单状态失败:{}", e.getMessage());
                                                                }
                                                            }
                                                        }
                                                    } else {
                                                        // 其他数值状态(可能是倒计时)
                                                        smNoodleStatus.setRemainingTime(state + "秒");
                                                    }
                                                    break;
                                                case '5': // 加汤中:a155
                                                    smNoodleStatus.setAddSoup(state == 0 ? "加汤中" : "未加汤");
                                                    break;
                                                case '6': // 加浇头中:a156
                                                    smNoodleStatus.setAddTopping(state == 0 ? "加浇头中" : "未加浇头");
                                                    break;
                                                case '7': // 夹肉:a157
                                                    smNoodleStatus.setMeatInsertion(state == 0 ? "夹肉中" : "未夹肉");
                                                    break;
                                                case '8': // 冲凉:a158
                                                    smNoodleStatus.setWarming(state == 0 ? "冲凉中" : "未冲凉");
                                                    break;
                                                case '9': // 完成:a159
                                                    smNoodleStatus.setIsCompleted(state);
                                                    smNoodleStatus.setCompletionStatus(state == 1 ? "完成" : "未完成");
                                                    break;
                                                default:
                                                    System.out.println("字段 " + key + " 状态未定义");
                                                    break;
                                            }
                                        }
                                        log.info("这是for 循环每次更新的面条状态:");
                                        smNoodleStatusServiceImpl.updateById(smNoodleStatus);
                                    }
                                    SmNoodleLogBo smNoodleLogBo = new SmNoodleLogBo();
                                    smNoodleLogBo.setMachineId(deviceId);
                                    smNoodleLogBo.setOperationType("做面");
                                    smNoodleLogBo.setNoodleId(noodleId + "");
                                    smNoodleLogBo.setPackageInfo(JSONObject.toJSONString(message));
                                    smNoodleLogServiceImpl.add(smNoodleLogBo);
                                }
                            }
                        }

                    }
                } catch (Exception e) {
                    log.error("处理设备状态消息时出错: {}", e.getMessage(), e);
                }
            });
            log.info("成功订阅makeNoodle主题");
        } catch (MqttException e) {
            log.error("订阅makeNoodle主题失败: {}", e.getMessage(), e);
            isSubscribed = false;
            throw new RuntimeException("订阅makeNoodle主题失败", e);
        }
    }


    // 获取某个订单或设备当前做面的数量
    public int getNoodleCount(String deviceId) {
        String key = NOODLE_COUNT_KEY + ":" + deviceId;
        Object countStr = redisTemplate.opsForHash().get(NOODLE_COUNT_KEY, key);
        return countStr == null ? 0 : Integer.parseInt(countStr + "");
    }

    // 增加面数量
    public void incrementNoodleCount(String deviceId) {
        String key = NOODLE_COUNT_KEY + ":" + deviceId;
        redisTemplate.opsForHash().increment(NOODLE_COUNT_KEY, key, 1);
    }

    // 减少面数量
    public void decrementNoodleCount(String deviceId) {
        String key = NOODLE_COUNT_KEY + ":" + deviceId;
        redisTemplate.opsForHash().increment(NOODLE_COUNT_KEY, key, -1);  // 使用负值来减少数量
    }

    // 获取所有机器的面数量
    public Map<Object, Object> getAllNoodleCounts() {
        return redisTemplate.opsForHash().entries(NOODLE_COUNT_KEY);
    }

    private static final long DELAY = 0;  // 初始延迟
    private static final long PERIOD = 3;  // 每 10 秒检查一次

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService orderExecutor = new ThreadPoolExecutor(10, // 核心线程数
            20, // 最大线程数
            60L, // 空闲线程的存活时间
            TimeUnit.SECONDS, // 空闲线程的存活时间单位
            new LinkedBlockingQueue<>(100), // 阻塞队列,最大任务数为 100
            new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略);  // 线程池,处理每个新订单

    public void startQueueProcessing() {
        scheduler.scheduleAtFixedRate(() -> {
        }, DELAY, PERIOD, TimeUnit.SECONDS);  // 定时执行
    }

    private final ConcurrentHashMap<String, CountDownLatch> deviceLatches = new ConcurrentHashMap<>();

    public void processQueue(String deviceId) {
        Long size = redisTemplate.opsForList().size(deviceId);
        if (size != null && size > 0) {
            log.info("队列还有{}个订单需要处理", size);
        } else {
//            log.info("队列为空");
            return;
        }
        //判斷機器是否自動
//        Object m0 = redisUtil.get(getM0AutoTopic(deviceId));
//        if (m0 != null) {
//            if (!(Boolean) m0) {
//                log.info("{}机器自动模式未开启", deviceId);
//                return;
//            }
//        }
        String code = deviceId.split(":")[1];
        boolean lock = redisUtil.acquireLock(getDeviceLockKey(code), 60);
        if (lock) {
            String orderId = redisUtil.popFromQueue(deviceId);
            if (orderId == null) {
                return;
            }
            SmOrders smOrders = smOrdersService.queryById(Long.parseLong(orderId));
            orderExecutor.submit(() -> {
                try {
                    subscribeToPaymentStatus(smOrders);
                } catch (Exception e) {
                    redisUtil.releaseLock(getDeviceLockKey(code));
                    log.error("处理订单 {} 时发生异常", smOrders.getOrderNumber(), e);
                }
            });
        } else {
            log.info("机器正在做面");
        }

    }

    @PostConstruct
    @Order(Integer.MIN_VALUE)
    public void initRedisQueue() {
        log.info("初始化Redis队列");
    }

    @Autowired
    private DeviceParamWebSocketHandler deviceParamWebSocketHandler;


    @Autowired
    private CallNumberScreeWebSocketHandler callNumberScreeWebSocketHandler;

    @Autowired
    private OrderStatusWebSocketHandler orderStatusWebSocketHandler;


    private void subParams() {
        try {
            log.info("开始订阅setParam主题: {}", setParam);
            mqttClient.subscribe(setParam, (t, m) -> {
                //获取设备号
                String deviceId = t.split("/")[1];
                //参数
                MachineParameter message = JSON.parseObject(m.getPayload(), MachineParameter.class);
                log.info("{}设置参数{}", deviceId, message.toString());
                //发送到websocket
                message.processReceivedParams();
                deviceParamWebSocketHandler.pushToStore(deviceId, message);
            });
            log.info("成功订阅setParam主题");
        } catch (MqttException e) {
            log.error("订阅setParam主题失败: {}", e.getMessage(), e);
            isSubscribed = false;
            throw new RuntimeException("订阅setParam主题失败", e);
        }
    }


    //接收到订单 发送到上位机
    public void sendOrderToMachine(SmOrders smOrders) {
        //订单拆分成碗
        SmOrderDetailsBo smOrderDetailsBo = new SmOrderDetailsBo();
        smOrderDetailsBo.setOrderId(smOrders.getId());
        List<SmOrderDetails> smOrderDetails = smOrderDetailsService.queryList(smOrderDetailsBo);
        smOrderDetails.forEach(smOrderDetail -> {
            //获取数量
            Long quantity = smOrderDetail.getQuantity();
            //规格
            String specificationDetail = smOrderDetail.getSpecificationDetailId();
            //桌号
            Long mealTableId = smOrders.getMealTableId();
            //根据数量添加到smNoodleStatus
            for (int i = 0; i < quantity; i++) {
                SmNoodleStatus smNoodleStatus = new SmNoodleStatus();
                smNoodleStatus.setOrderId(smOrderDetail.getId() + "");
                smNoodleStatus.setOrderNumber(smOrderDetail.getOrderNumber());
                smNoodleStatus.setNoodlePosition(specificationDetail);
                smNoodleStatus.setMachineId(smOrders.getDeviceId());
                smNoodleStatus.setNoodlePosition(smOrderDetail.getSpecificationDetailId());
            }
        });
    }


    public Long incrementOrInitializeKey(String key) {
        // 判断 key 是否存在
        Boolean exists = redisTemplate.hasKey(key);

        if (Boolean.TRUE.equals(exists)) {
            // 如果 key 存在,递增 1 并返回新值
            return redisTemplate.opsForValue().increment(key);
        } else {
            // 如果 key 不存在,设置初始值为 1
            redisTemplate.opsForValue().set(key, "1000", 1, TimeUnit.DAYS);
            return 1000L;
        }
    }


    @Autowired
    private ISmCallNumberService smCallNumberService;

    public int subscribeToPaymentStatus(SmOrders smOrders) {
        // 获取与设备ID关联的 CountDownLatch
        String deviceId = smOrders.getDeviceId();
        log.info("开始订阅支付状态 {}", deviceId);
        if (StringUtils.isEmpty(deviceId)) {
            throw new ServiceException("设备号不能为空");
        }

        // 添加幂等性检查,防止重复处理同一订单
        String processKey = "order:processed:" + smOrders.getOrderNumber();
        if (redisUtil.exists(processKey)) {
            log.info("订单{}已被处理过,跳过重复处理", smOrders.getOrderNumber());
            return 0;
        }
        // 记录订单已处理,设置10分钟过期时间
        redisUtil.set(processKey, "1", 1, TimeUnit.DAYS);

        //判断订单状态是已支付设备状态是未做面
        if (!smOrders.getOrderStatus().equals(OrderStatus.PAID.getCode()) || !smOrders.getDeviceStatus().equals(DeviceStatus.PENDING.getCode())) {
            log.info("订单状态错误");
            return 0;
        }

        // 检查是否已设置取餐号,如果未设置则生成新的取餐号
        if (StringUtils.isEmpty(smOrders.getTakeNumber())) {
            String storeId = smOrders.getStoreId();
            if (StringUtils.isEmpty(storeId)) {
                log.error("店铺ID不能为空");
                return 0;
            }
            try {
                String takeNumber = getTakeNumber(storeId);
                smOrders.setTakeNumber(takeNumber);
                // 更新订单取餐号
                smOrdersService.updateById(smOrders);
                log.info("订单{}已更新取餐号:{}", smOrders.getOrderNumber(), takeNumber);
            } catch (Exception e) {
                log.error("生成取餐号失败: {}", e.getMessage(), e);
                return 0;
            }
        }
        Long id = smOrders.getId();
        Long mealTableId = smOrders.getMealTableId();
        if (mealTableId == null) {
            log.info("桌号不能为空");
            return 0;
        }
        SmTableConfig smTableConfig = smTableConfigService.queryById(mealTableId);
        if (smTableConfig == null) {
            log.info("桌号不能为空");
            return 0;
        }
        SmOrderDetailsBo smOrderDetailsBo = new SmOrderDetailsBo();
        smOrderDetailsBo.setOrderId(id);
        List<SmOrderDetails> smOrderDetails = smOrderDetailsService.queryList(smOrderDetailsBo);
        //数量累加
        int sumQuantity = smOrderDetails.stream().filter(smOrderDetail -> smOrderDetail.getQuantity() != null).mapToInt(smOrderDetail -> smOrderDetail.getQuantity().intValue()).sum();
        smOrders.setInProgressCount(sumQuantity);
        smOrders.setNoodleCount(sumQuantity);
        smOrders.setCompletedCount(0);
        redisUtil.increment(WAITING_COUNT_KEY + ":" + deviceId, sumQuantity);
        for (SmOrderDetails smOrderDetail : smOrderDetails) {
            Long productId = smOrderDetail.getProductId();
            if (productId == null) {
                continue;
            }
            SmProducts smProducts = smProductsService.queryById(productId);
            String operType = smProducts.getProductCommand();
            JSONObject m520 = new JSONObject();
            m520.put("method", "order");
            Long quantity = smOrderDetail.getQuantity();
            log.info("这是做面的份数:{}", quantity);
            //做面完成
            for (Long i = 0L; i < quantity; i++) {
                JSONObject upMessage = new JSONObject();
                //修改判断 不再关联规格 根据牛肉面炸酱面来进行穿餐
                if ("炸酱面".equals(smOrderDetail.getProductName())) {
                    //{"wTopping":2}
                    upMessage.put("wTopping", 2);
                    //{"wSoup":2}
                    upMessage.put("wSoup", 2);
                }
                if ("红烧牛肉面".equals(smOrderDetail.getProductName())) {
                    //{"wTopping":1}
                    upMessage.put("wTopping", 1);
                    //{"wSoup":2}
                    upMessage.put("wSoup", 1);
                }
                String specificationDetail = smOrderDetail.getSpecificationDetail();
                log.info("获取做面的详情,拼接发送命令大中小:{}", specificationDetail);
                if (specificationDetail.contains("小份")) {
                    //{"wSize":1}
                    upMessage.put("wSize", 1);
                }
                if (specificationDetail.contains("中份")) {
                    //{"wSize":2}
                    upMessage.put("wSize", 2);
                }
                if (specificationDetail.contains("大份")) {
                    //{"wSize":3}
                    upMessage.put("wSize", 3);
                }
                try {
                    SmNoodleStatusBo smNoodleStatusBo = new SmNoodleStatusBo();
                    if (smTableConfig.getMachineLocation() != null) {
                        m520.put(smTableConfig.getMachineLocation().trim(), true);
                        smNoodleStatusBo.setTablePosition(smTableConfig.getMachineLocation());
                        smNoodleStatusBo.setPickupPosition(smTableConfig.getLocationDesc());
                    }
                    smNoodleStatusBo.setIsCompleted(0);
                    smNoodleStatusBo.setOrderId(smOrders.getId() + "");
                    smNoodleStatusBo.setNoodlePosition(operType);
                    smNoodleStatusBo.setDetailId(smOrderDetail.getId());
                    smNoodleStatusBo.setSpecification(smOrderDetail.getSpecificationDetail());
                    smNoodleStatusBo.setCompletionPosition(smProducts.getProductFinishCommand());
                    smNoodleStatusBo.setOrderNumber(smOrders.getOrderNumber());
                    smNoodleStatusBo.setNoodleName(smOrderDetail.getProductName());
                    smNoodleStatusBo.setMachineId(deviceId);
                    m520.put("noodleId", smNoodleStatusBo.getId());
//                    mqttPublishVo.setDeviceId(deviceId);
//                    mqttPublishVo.setNoodleName(smProducts.getProductName());
                    smOrders.setMealTableId(smTableConfig.getId());
                    smOrdersService.updateById(smOrders);
//                    mqttPublishVo.setTableNumber(smTableConfig.getTableNumber());
//                    mqttPublishVo.setQuantity(1);
                    smNoodleStatusBo.setPackageData(JSON.toJSONString(smNoodleStatusBo));
                    smNoodleStatusBo.setStartTime(new Date());
                    smNoodleStatusBo.setStatus("发送报数据");
                    smNoodleStatusServiceImpl.add(smNoodleStatusBo);
//                    mqttPublishVo.setNoodleId(smNoodleStatusBo.getId());
//                    JSONObject publishMessage = JSON.parseObject(JSON.toJSONString(mqttPublishVo));
//                    publishMessage.putAll(mapItem);
                    //生成上位机数据
                    Long currentValue = incrementOrInitializeKey("NiuMaBang");
                    upMessage.put("wNo", currentValue);
                    log.info("这是传来的桌号:{}", smTableConfig.getTableNumber());
                    upMessage.put("wNumberOfTable", Integer.parseInt(smTableConfig.getTableNumber()));
                    // 添加到Redis队列,设置1小时过期时间
                    redisUtil.addToQueue(getDeviceLockKey(deviceId), JSON.toJSONString(upMessage), 1, TimeUnit.HOURS);
                    log.info("添加订单数据到Redis队列,设置1小时过期 - 设备: {}, 订单: {}", deviceId, smOrders.getOrderNumber());

                    //redis-key生成四位随机字符 过期一个小时
                    log.info("这是存入的订单编号:{},还有设备ID:{}", currentValue, deviceId);
                    redisUtil.set(getOrderNo(currentValue, deviceId), smNoodleStatusBo.getId() + "", 60, TimeUnit.MINUTES);
//                    log.info("发送给上位机数据:{}", JSON.toJSONString(publishMessage));
                    //机器需要做的面的数量
                    incrementNoodleCount(deviceId);
                } catch (Exception e) {
                    log.error("处理设备 {} 的消息时发生异常: {}", deviceId, e.getMessage(), e);
                    return 0;
                }
            }

        }


        //循环添加以后再进行生成叫号
        List<SmNoodleStatus> smNoodleStatuses = smNoodleStatusService.queryListBug(smOrders);
        log.info("这是查询的数据长度: {}", smNoodleStatuses.size());
        if (!smNoodleStatuses.isEmpty()) {
            //生成多个取餐号
            for (int i = 0; i < smNoodleStatuses.size(); i++) {
                log.info("这是第{}编循环插入数据", i + 1);
                // 在订单支付成功生成取餐号后
                SmCallNumber callNumber = new SmCallNumber();
                callNumber.setStoreId(smOrders.getStoreId());
                callNumber.setDeviceId(smOrders.getDeviceId());
                callNumber.setOrderId(smOrders.getId());
                callNumber.setOrderNumber(smOrders.getOrderNumber());
                callNumber.setTakeNumber(smOrders.getTakeNumber());
                callNumber.setCallStatus(0); // 等待叫号
                callNumber.setIsPicked(0); // 未取餐
                callNumber.setTenantCode(UserRoleConstants.DEFAULT_TENANT_ID); // 租户编码
                callNumber.setDisplayPriority(0); // 默认优先级
                callNumber.setCreateTime(new Date());
                callNumber.setUpdateTime(new Date());
                //保存面的名称 大小完 种类
                callNumber.setNoodleName(smNoodleStatuses.get(i).getNoodleName());
                //大小份从详情表查询
                String specificationDetail = smNoodleStatuses.get(i).getSpecification();
                if (specificationDetail.contains("小份")) {
                    callNumber.setBowlType("小份");
                }
                if (specificationDetail.contains("中份")) {
                    callNumber.setBowlType("中份");
                }
                if (specificationDetail.contains("大份")) {
                    callNumber.setBowlType("大份");
                }
                callNumber.setSpecification(removeSpecificWords(smNoodleStatuses.get(i).getSpecification()));
                log.info("这是保存成功的数据{}", callNumber);
                boolean save = smCallNumberService.save(callNumber);
                log.info("{}这是保存成功失败的标识", save);
            }
            log.info("点餐机00推送消息给门店:{}", smOrders.getStoreId());
            QueryWrapper<SmEquipment> smEquipmentQueryWrapper = new QueryWrapper<>();
            smEquipmentQueryWrapper.lambda().eq(SmEquipment::getMerchantId, smOrders.getStoreId()).eq(SmEquipment::getMachineType, 2)
                    .eq(SmEquipment::getMachineStatus, 1);
            List<SmEquipment> smEquipments = smEquipmentMapper.selectListBytoreId(smEquipmentQueryWrapper);
            if (!smEquipments.isEmpty()) {
                log.info("点餐机00推送消息给大屏ID:{}", smEquipments.get(0).getScreenId());
                callNumberScreeWebSocketHandler.pushToStore(smEquipments.get(0).getScreenId(), "0000");
            }
            orderStatusWebSocketHandler.pushToStore(smOrders.getStoreId(), "1111");
        }
        smOrders.setDeviceStatus(DeviceStatus.IN_PROGRESS.getCode());
        smOrdersService.updateById(smOrders);
        return 1;
    }


    public String removeSpecificWords(String input) {
        if (input == null || input.isEmpty()) {
            return input;
        }
        // 定义需要去除的词汇列表
        String[] wordsToRemove = {"大份,", "小份,", "中份,", "汤面,", "干面,", "牛肉,", "炸酱,"};

        String result = input;
        for (String word : wordsToRemove) {
            result = result.replace(word, "");
        }
        return result.trim();
    }

    public void queueDeviceOrder(SmOrders smOrders) {
        //订单加入队列,设置1小时过期时间
        redisUtil.addToQueue(getOrderQueueKey(smOrders.getDeviceId()), smOrders.getId() + "", 1, TimeUnit.HOURS);
        log.info("订单加入设备队列,设置1小时过期 - 设备: {}, 订单: {}", smOrders.getDeviceId(), smOrders.getOrderNumber());
        // 使用线程池来处理当前订单
//        scheduler.scheduleAtFixedRate(() -> processQueue(smOrders.getDeviceId()), DELAY, PERIOD, TimeUnit.SECONDS);
    }

    // 报警代码映射表(可配置化)
    public static final Map<String, String> ALARM_DESCRIPTIONS = new LinkedHashMap() {{
        put("alert00", "面机缺水");
        put("alert01", "面机缺面");
        put("alert02", "落碗机缺碗");
        put("alert03", "浇头缺卤");
        put("alert04", "加汤机缺汤");
        put("alert05", "切肉机缺肉");
        put("alert06", "可机出面异常");
        put("alert07", "面篓1翻转异常");
        put("alert08", "面篓2翻转异常");
        put("alert09", "面篓3翻转异常");
        put("alert10", "面篓4翻转异常");
        put("alert11", "面篓5翻转异常");
        put("alert12", "面篓6翻转异常");
        put("alert13", "小料1缺料");
        put("alert14", "小料2缺料");
        put("alert15", "小料3缺料");
        put("alert16", "小料4缺料");
        put("alert17", "小料5缺料");
        put("alert18", "小料6缺料");
        put("alert19", "小料7缺料");
        put("alert20", "小料8缺料");
        put("alert21", "落碗异常");
        put("alert22", "煮面炉1温度报警");
        put("alert23", "煮面炉2温度报警");
        put("alert24", "面机加水驱动报警1");
        put("alert25", "浇头阀门驱动报警2");
        put("alert26", "浇头伺服报警3");
        put("alert27", "和面伺服报警4");
        put("alert28", "到加汤位超时");
        put("alert29", "到浇头位超时");
        put("alert30", "到切肉位超时");
        put("alert31", "到1号桌超时");
        put("alert32", "到2号桌超时");
        put("alert33", "到3号桌超时");
        put("alert34", "到4号桌超时");
        put("alert35", "面篓1回原异常");
        put("alert36", "面篓2回原异常");
        put("alert37", "面篓3回原异常");
        put("alert38", "面篓4回原异常");
        put("alert39", "面篓5回原异常");
        put("alert40", "面篓6回原异常");
        put("alert41", "煮面炉1低水位报警");
        put("alert42", "煮面炉2低水位报警");
        put("alert43", "煮面炉1干烧报警");
        put("alert44", "煮面炉2干烧报警");
    }};

    @Autowired
    private ISmDeviceExceptionService smDeviceExceptionService;

    // 报警处理增强方法示例
    private void processAlarm(String deviceSN, JSONObject payload) {
        List<String> activeAlarms = ALARM_DESCRIPTIONS.keySet().stream()
                .filter(field -> payload.getInteger(field) != null && payload.getInteger(field) == 1)
                .map(field -> String.format("%s(%s)", ALARM_DESCRIPTIONS.get(field), field))
                .collect(Collectors.toList());

        if (!activeAlarms.isEmpty()) {
            SmDeviceExceptionBo smDeviceException = new SmDeviceExceptionBo();
            smDeviceException.setDeviceId(deviceSN);
            smDeviceException.setExceptionTime(new Date());
            smDeviceException.setExceptionType("机器报警");
            smDeviceException.setExceptionMessage(String.join(" | ", activeAlarms));
            smDeviceExceptionService.add(smDeviceException);
            String alarmDetails = String.join(" | ", activeAlarms);
            log.warn("设备报警 - 设备SN: {} | 报警详情: {}", deviceSN, alarmDetails);
            // 发送到监控系统
//            alarmService.notify(deviceSN, activeAlarms);
        }
    }

    // 在文件中找到获取设备锁key的方法,添加日志输出
    // 如果找不到具体方法,可能是来自RedisConstant类,需要在使用处添加日志

    // 在文件底部添加方法获取RedisConstant中的key生成,并增加日志
    public String getDeviceLockKey(String deviceId) {
        String key = RedisConstant.getDeviceLockKey(deviceId);
        log.info("生成设备锁key - 设备ID: {}, 生成的key: {}", deviceId, key);
        return key;
    }

    // 同样为getOrderNo方法添加日志
    public String getOrderNo(Long value, String deviceId) {
        String key = RedisConstant.getOrderNo(value, deviceId);
        log.info("生成订单号key - 值: {}, 设备ID: {}, 生成的key: {}", value, deviceId, key);
        return key;
    }

    @Resource
    private SmOrdersMapper smOrdersMapper;

    // 添加专门用于诊断订阅问题的方法
    public void diagnoseSubscription(String specificTopic) {
        try {
            log.info("开始诊断特定主题的订阅: {}", specificTopic);

            // 检查当前连接状态
            boolean connected = mqttClient.isConnected();
            log.info("MQTT连接状态: {}", connected ? "已连接" : "未连接");

            if (!connected) {
                log.warn("MQTT未连接,无法继续诊断");
                return;
            }

            // 检查特定主题是否匹配我们的订阅模式
            boolean matchesYesOrder = specificTopic.matches("yc/[^/]+/yesOrder");
            log.info("特定主题 {} 是否匹配yesOrder模式: {}", specificTopic, matchesYesOrder);

            // 尝试特定订阅
            log.info("尝试直接订阅特定主题: {}", specificTopic);
            mqttClient.subscribe(specificTopic, (topic, message) -> {
                log.info("收到特定主题消息 - 主题: {}, 消息: {}", topic, new String(message.getPayload()));
            });
            log.info("成功订阅特定主题: {}", specificTopic);

        } catch (Exception e) {
            log.error("诊断订阅时出错: {}", e.getMessage(), e);
        }
    }

    // 添加一个强制重新连接和订阅的公共方法,可以通过API调用
    public void forceReconnectAndSubscribe() {
        log.info("强制重新连接和订阅MQTT");
        try {
            // 先断开连接
            if (mqttClient.isConnected()) {
                log.info("断开当前MQTT连接");
                mqttClient.disconnect();
            }

            // 重置订阅状态
            isSubscribed = false;

            // 重新连接
            log.info("重新连接MQTT");
            connectAndRetry();

            log.info("强制重新连接和订阅完成");
        } catch (Exception e) {
            log.error("强制重新连接和订阅失败: {}", e.getMessage(), e);
            // 启动后台重连机制
            scheduleReconnect();
        }
    }

    // 提供检查MQTT状态的方法
    public Map<String, Object> checkMqttStatus() {
        Map<String, Object> status = new HashMap<>();
        try {
            boolean connected = mqttClient != null && mqttClient.isConnected();
            status.put("connected", connected);
            status.put("subscribed", isSubscribed);

            if (mqttClient != null) {
                status.put("clientId", mqttClient.getClientId());
                status.put("serverUri", mqttClient.getServerURI());
            }

            // 替换Map.of(),使用Java 8兼容的方式
            Map<String, String> topicPatterns = new HashMap<>();
            topicPatterns.put("makeNoodle", makeNoodle);
            topicPatterns.put("yesOrder", yesOrder);
            topicPatterns.put("alarm", alarm);
            topicPatterns.put("setParam", setParam);

            status.put("topicPatterns", topicPatterns);

        } catch (Exception e) {
            log.error("检查MQTT状态时出错: {}", e.getMessage(), e);
            status.put("error", e.getMessage());
        }
        return status;
    }

    // 取餐号生成逻辑,从ApiSmOrdersController复制过来并稍作修改
    private String getTakeNumber(String storeId) {
        String today = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
        String key = "ticket:number:" + today + ":" + storeId;
        String lockKey = "ticket:lock:" + today + ":" + storeId;

        try {
            // 使用分布式锁确保序号分配的原子性
            boolean lockAcquired = redisUtil.acquireLock(lockKey, 1); // 1分钟锁超时
            if (!lockAcquired) {
                // 如果无法获取锁,等待短暂时间后重试
                Thread.sleep(100);
                return getTakeNumber(storeId);
            }

            // 检查当前值
            Long currentNumber = 0L;
            if (redisUtil.hasKey(key)) {
                Object currentValue = redisUtil.get(key);
                if (currentValue != null) {
                    try {
                        currentNumber = Long.parseLong(currentValue.toString());
                    } catch (NumberFormatException e) {
                        log.error("取餐号格式错误,重置为0: {}", e.getMessage());
                    }
                }
            } else {
                // 如果键不存在,检查数据库中今天的最大取餐号
                try {
                    String startOfDay = today + " 00:00:00";
                    String endOfDay = today + " 23:59:59";

                    // 查询当天最大的取餐号
                    List<SmOrders> todayOrders = smOrdersMapper.selectList(
                            new LambdaQueryWrapper<SmOrders>()
                                    .eq(SmOrders::getStoreId, storeId)
                                    .between(SmOrders::getCreateTime, startOfDay, endOfDay)
                                    .isNotNull(SmOrders::getTakeNumber)
                                    .ne(SmOrders::getTakeNumber, "")
                                    .orderByDesc(SmOrders::getTakeNumber)
                                    .last("LIMIT 1")
                    );

                    if (!todayOrders.isEmpty()) {
                        SmOrders latestOrder = todayOrders.get(0);
                        String latestTakeNumber = latestOrder.getTakeNumber();
                        if (latestTakeNumber != null && latestTakeNumber.startsWith("#")) {
                            try {
                                currentNumber = Long.parseLong(latestTakeNumber.substring(1));
                            } catch (NumberFormatException e) {
                                log.error("数据库取餐号格式错误: {}", e.getMessage());
                            }
                        }
                    }
                } catch (Exception e) {
                    log.error("查询数据库最大取餐号失败: {}", e.getMessage());
                }
            }

            // 增加计数器
            Long number = currentNumber + 1;

            // 设置或更新Redis
            redisUtil.set(key, number);

            // 设置当天过期时间(如果是第一次设置)
            if (number == 1) {
                // 设置过期时间到明天凌晨
                LocalDateTime midnight = LocalDate.now().plusDays(1).atStartOfDay();
                long secondsUntilMidnight = ChronoUnit.SECONDS.between(LocalDateTime.now(), midnight);
                redisUtil.setExpire(key, secondsUntilMidnight, TimeUnit.SECONDS);
            }

            // 格式化为3位数,如001
            String tno = String.format("#%03d", number);
            log.info("生成取餐号 - 店铺ID: {}, 日期: {}, 序号: {}", storeId, today, tno);
            return tno;
        } catch (Exception e) {
            log.error("生成取餐号失败: {}", e.getMessage(), e);
            // 出现异常时,生成基于当前时间的备用号码,确保服务不中断
            String fallbackNumber = "#" + System.currentTimeMillis() % 1000;
            return fallbackNumber;
        } finally {
            // 释放锁
            redisUtil.releaseLock(lockKey);
        }
    }

}

java
Python
  • 作者:91张先生(联系作者)
  • 发表时间:2026-01-10 16:22
  • 版权声明:自由转载-非商用-非衍生-保持署名
  • 项目开源,联系作者
  • 评论