####OPC DA 和 OPC UA 是 OPC 基金会制定的两套工业通信标准,均用于工业自动化系统中不同设备 / 软件间的数据交互,但 OPC UA 是 OPC DA 的下一代升级标准,在架构、安全性、跨平台性等方面有本质提升。
import OpenOPC
import socket
import json
import time
from concurrent.futures import ThreadPoolExecutor
def connect_opc():
print("正在连接OPC服务器...")
"""连接到OPC服务器"""
opc = OpenOPC.client()
try:
opc.connect(opc_server='SUPCON.AdvOPCServer.1', opc_host="128.128.2.130")
print("OPC连接成功!")
return opc
except Exception as e:
print(f"OPC连接失败: {e}")
return None
# UDP服务器配置
server_ip = "192.168.166.255"
server_port = 9876
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 读取多个标签
tags = ['TE_301', 'TE_302', 'TE_303', 'TE_304', "TE_305", 'TE_101', 'TE_102', 'TE_103', 'TE_104', "TE_105",
'LT_301', 'LT_302', 'LT_303', 'LT_304', "LT_305", 'LT_101', 'LT_102', 'LT_103', 'LT_104', "LT_105"]
# 初始连接
opc = connect_opc()
executor = ThreadPoolExecutor(max_workers=5)
def send_data_via_udp(json_data, server_ip, server_port):
"""通过UDP发送数据的函数"""
try:
udp_socket.sendto(json_data.encode('utf-8'), (server_ip, server_port))
print(f"数据已发送到 {server_ip}:{server_port}")
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
except Exception as e:
print(f"发送数据时出错: {e}")
def read_tags_in_batches(opc, tags, batch_size=5):
"""分批读取标签"""
data_dict = {}
for i in range(0, len(tags), batch_size):
batch = tags[i:i+batch_size]
try:
values = opc.read(batch, sync=True)
for tag, value in zip(batch, values):
# value是一个元组 (tag_name, value, quality, time)
data_dict[tag] = value[1] # 取实际值部分
# print(f"{tag}: {value[1]}")
except Exception as e:
print(f"读取批次标签时出错: {e}")
raise
return data_dict
try:
while True:
# 检查OPC连接状态,如果断开则尝试重连
if opc is None:
print("等待30秒后尝试重连...")
time.sleep(30)
opc = connect_opc()
continue
try:
# 分批读取标签值
data_dict = read_tags_in_batches(opc, tags, batch_size=5)
# 将数据转换为JSON格式
json_data = json.dumps(data_dict)
# 使用线程池发送数据
executor.submit(send_data_via_udp, json_data, server_ip, server_port)
except Exception as e:
print(f"读取数据时出错: {e}")
print("关闭当前OPC连接")
try:
opc.close()
except:
pass
opc = None
print("等待30秒后尝试重连...")
time.sleep(30)
opc = connect_opc()
continue
# 等待5秒
time.sleep(5)
except Exception as e:
print(f"程序已停止: {e}")
print("程序已停止")
finally:
print("关闭线程池")
# 清理资源
if opc:
try:
opc.close()
except:
pass
udp_socket.close()
executor.shutdown(wait=True) # 关闭线程池
import OpenOPC
import socket
import json
import time
from concurrent.futures import ThreadPoolExecutor
def connect_opc():
print("正在连接OPC服务器...")
"""连接到OPC服务器"""
opc = OpenOPC.client()
try:
opc.connect(opc_server='SUPCON.JXServer.1', opc_host="192.168.0.3")
print("OPC连接成功!")
return opc
except Exception as e:
print(f"OPC连接失败: {e}")
return None
# UDP服务器配置
server_ip = "192.168.166.255"
server_port = 9876
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 读取多个标签
# 1007液位:T40541、温度T40483;
# 1009液位COM2_9_40002_JS、温度COM2_9_40001_JS;
# 1010液位COM2_10_40002_JS、温度COM2_10_40001_JS;
# 1011液位COM2_11_40002_JS、温度COM2_11_40001_JS;不可用
# 1011液位T40121、温度T40122;
# 1013液位COM2_13_40002_JS、温度COM2_13_40001_JS;
# 1006液位T40184;1006温度:T40183;
# 1014液位:T40182;1014温度:T40181;
tags = ['T40541','T40483','COM2_9_40001_JS','COM2_9_40002_JS',
'COM2_10_40001_JS','COM2_10_40002_JS',
'T40121','T40122','COM2_13_40001_JS','COM2_13_40002_JS',
'T40184','T40183','T40182','T40181']
# 初始连接
opc = connect_opc()
executor = ThreadPoolExecutor(max_workers=5)
def send_data_via_udp(json_data, server_ip, server_port):
"""通过UDP发送数据的函数"""
try:
udp_socket.sendto(json_data.encode('utf-8'), (server_ip, server_port))
print(f"数据已发送到 {server_ip}:{server_port}")
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
except Exception as e:
print(f"发送数据时出错: {e}")
def read_tags_in_batches(opc, tags, batch_size=5):
"""分批读取标签"""
data_dict = {}
for i in range(0, len(tags), batch_size):
batch = tags[i:i+batch_size]
try:
values = opc.read(batch, sync=True)
for tag, value in zip(batch, values):
# value是一个元组 (tag_name, value, quality, time)
data_dict[tag] = value[1] # 取实际值部分
# print(f"{tag}: {value[1]}")
except Exception as e:
print(f"读取批次标签时出错: {e}")
raise
print(f'读取到的数据:{data_dict}')
return data_dict
try:
while True:
# 检查OPC连接状态,如果断开则尝试重连
if opc is None:
print("等待30秒后尝试重连...")
time.sleep(30)
opc = connect_opc()
continue
try:
# 分批读取标签值
data_dict = read_tags_in_batches(opc, tags, batch_size=5)
# 将数据转换为JSON格式
json_data = json.dumps(data_dict)
# 使用线程池发送数据
executor.submit(send_data_via_udp, json_data, server_ip, server_port)
except Exception as e:
print(f"读取数据时出错: {e}")
print("关闭当前OPC连接")
try:
opc.close()
except:
pass
opc = None
print("等待30秒后尝试重连...")
time.sleep(30)
opc = connect_opc()
continue
# 等待5秒
time.sleep(5)
except Exception as e:
print(f"程序已停止: {e}")
print("程序已停止")
finally:
print("关闭线程池")
# 清理资源
if opc:
try:
opc.close()
except:
pass
udp_socket.close()
executor.shutdown(wait=True) # 关闭线程池
####OPC DA 是Windows 专属、功能单一的传统工业数据标准,已逐渐被淘汰;OPC UA 是跨平台、全功能、高安全的新一代工业通信架构,是当前工业互联网、智能制造的主流标准。
评论