原创

Java—物联网开发-OPC DA协议 UA协议


####OPC DA 和 OPC UA 是 OPC 基金会制定的两套工业通信标准,均用于工业自动化系统中不同设备 / 软件间的数据交互,但 OPC UA 是 OPC DA 的下一代升级标准,在架构、安全性、跨平台性等方面有本质提升。

import OpenOPC
import socket
import json
import time
from concurrent.futures import ThreadPoolExecutor

def connect_opc():
    print("正在连接OPC服务器...")
    """连接到OPC服务器"""
    opc = OpenOPC.client()
    try:
        opc.connect(opc_server='SUPCON.AdvOPCServer.1', opc_host="128.128.2.130")
        print("OPC连接成功!")
        return opc
    except Exception as e:
        print(f"OPC连接失败: {e}")
        return None

# UDP服务器配置
server_ip = "192.168.166.255"
server_port = 9876
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 读取多个标签
tags = ['TE_301', 'TE_302', 'TE_303', 'TE_304', "TE_305", 'TE_101', 'TE_102', 'TE_103', 'TE_104', "TE_105",
        'LT_301', 'LT_302', 'LT_303', 'LT_304', "LT_305", 'LT_101', 'LT_102', 'LT_103', 'LT_104', "LT_105"]

# 初始连接
opc = connect_opc()

executor = ThreadPoolExecutor(max_workers=5)

def send_data_via_udp(json_data, server_ip, server_port):
    """通过UDP发送数据的函数"""
    try:
        udp_socket.sendto(json_data.encode('utf-8'), (server_ip, server_port))
        print(f"数据已发送到 {server_ip}:{server_port}")
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    except Exception as e:
        print(f"发送数据时出错: {e}")


def read_tags_in_batches(opc, tags, batch_size=5):
    """分批读取标签"""
    data_dict = {}
    for i in range(0, len(tags), batch_size):
        batch = tags[i:i+batch_size]
        try:
            values = opc.read(batch, sync=True)
            for tag, value in zip(batch, values):
                # value是一个元组 (tag_name, value, quality, time)
                data_dict[tag] = value[1]  # 取实际值部分
#                 print(f"{tag}: {value[1]}")
        except Exception as e:
            print(f"读取批次标签时出错: {e}")
            raise
    return data_dict

try:
    while True:
        # 检查OPC连接状态,如果断开则尝试重连
        if opc is None:
            print("等待30秒后尝试重连...")
            time.sleep(30)
            opc = connect_opc()
            continue

        try:
           # 分批读取标签值
           data_dict = read_tags_in_batches(opc, tags, batch_size=5)
           # 将数据转换为JSON格式
           json_data = json.dumps(data_dict)
           # 使用线程池发送数据
           executor.submit(send_data_via_udp, json_data, server_ip, server_port)
        except Exception as e:
            print(f"读取数据时出错: {e}")
            print("关闭当前OPC连接")
            try:
                opc.close()
            except:
                pass
            opc = None
            print("等待30秒后尝试重连...")
            time.sleep(30)
            opc = connect_opc()
            continue

        # 等待5秒
        time.sleep(5)

except Exception as e:
    print(f"程序已停止: {e}")
    print("程序已停止")
finally:
    print("关闭线程池")
    # 清理资源
    if opc:
        try:
            opc.close()
        except:
            pass
    udp_socket.close()
    executor.shutdown(wait=True)  # 关闭线程池

import OpenOPC
import socket
import json
import time
from concurrent.futures import ThreadPoolExecutor

def connect_opc():
    print("正在连接OPC服务器...")
    """连接到OPC服务器"""
    opc = OpenOPC.client()
    try:
        opc.connect(opc_server='SUPCON.JXServer.1', opc_host="192.168.0.3")
        print("OPC连接成功!")
        return opc
    except Exception as e:
        print(f"OPC连接失败: {e}")
        return None

# UDP服务器配置
server_ip = "192.168.166.255"
server_port = 9876
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 读取多个标签
# 1007液位:T40541、温度T40483;
# 1009液位COM2_9_40002_JS、温度COM2_9_40001_JS;
# 1010液位COM2_10_40002_JS、温度COM2_10_40001_JS;
# 1011液位COM2_11_40002_JS、温度COM2_11_40001_JS;不可用
# 1011液位T40121、温度T40122;
# 1013液位COM2_13_40002_JS、温度COM2_13_40001_JS;
# 1006液位T40184;1006温度:T40183;
# 1014液位:T40182;1014温度:T40181;
tags = ['T40541','T40483','COM2_9_40001_JS','COM2_9_40002_JS',
        'COM2_10_40001_JS','COM2_10_40002_JS',
        'T40121','T40122','COM2_13_40001_JS','COM2_13_40002_JS',
        'T40184','T40183','T40182','T40181']

# 初始连接
opc = connect_opc()

executor = ThreadPoolExecutor(max_workers=5)

def send_data_via_udp(json_data, server_ip, server_port):
    """通过UDP发送数据的函数"""
    try:
        udp_socket.sendto(json_data.encode('utf-8'), (server_ip, server_port))
        print(f"数据已发送到 {server_ip}:{server_port}")
        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    except Exception as e:
        print(f"发送数据时出错: {e}")


def read_tags_in_batches(opc, tags, batch_size=5):
    """分批读取标签"""
    data_dict = {}
    for i in range(0, len(tags), batch_size):
        batch = tags[i:i+batch_size]
        try:
            values = opc.read(batch, sync=True)
            for tag, value in zip(batch, values):
                # value是一个元组 (tag_name, value, quality, time)
                data_dict[tag] = value[1]  # 取实际值部分
#                 print(f"{tag}: {value[1]}")
        except Exception as e:
            print(f"读取批次标签时出错: {e}")
            raise
    print(f'读取到的数据:{data_dict}')
    return data_dict

try:
    while True:
        # 检查OPC连接状态,如果断开则尝试重连
        if opc is None:
            print("等待30秒后尝试重连...")
            time.sleep(30)
            opc = connect_opc()
            continue

        try:
           # 分批读取标签值
           data_dict = read_tags_in_batches(opc, tags, batch_size=5)
           # 将数据转换为JSON格式
           json_data = json.dumps(data_dict)
           # 使用线程池发送数据
           executor.submit(send_data_via_udp, json_data, server_ip, server_port)
        except Exception as e:
            print(f"读取数据时出错: {e}")
            print("关闭当前OPC连接")
            try:
                opc.close()
            except:
                pass
            opc = None
            print("等待30秒后尝试重连...")
            time.sleep(30)
            opc = connect_opc()
            continue

        # 等待5秒
        time.sleep(5)

except Exception as e:
    print(f"程序已停止: {e}")
    print("程序已停止")
finally:
    print("关闭线程池")
    # 清理资源
    if opc:
        try:
            opc.close()
        except:
            pass
    udp_socket.close()
    executor.shutdown(wait=True)  # 关闭线程池

####OPC DA 是Windows 专属、功能单一的传统工业数据标准,已逐渐被淘汰;OPC UA 是跨平台、全功能、高安全的新一代工业通信架构,是当前工业互联网、智能制造的主流标准。

java
  • 作者:91张先生(联系作者)
  • 发表时间:2026-01-10 16:13
  • 版权声明:自由转载-非商用-非衍生-保持署名
  • 项目开源,联系作者
  • 评论