以Python 做服务器,N Robot 做客户端,小小UI,拿捏
先Show 结果~
虚拟开足
import socket
import threading
import logging
from tkinter import Tk, Canvas, BOTH, YES, NW, N, mainloop, font as tkfont
from tkinter.constants import S, E, W
from datetime import datetime
import os
# 定义文件夹路径
folder_path = r'c:\Log6'
# 检查文件夹是否存在,如果不存在则创建
if not os.path.exists(folder_path):
os.makedirs(folder_path)
class RobotServer:
def __init__(self, host, port, canvas):
self.host = host
self.port = port
self.server_socket = None
self.clients = {}
self.lock = threading.Lock()
self.canvas = canvas
self.A = 0
self.B = 4
self.R1_LS = []
self.R2_LS = []
self.R3_LS = []
self.R4_LS = []
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 初始化画布内容
self.init_canvas()
def init_canvas(self):
w = self.canvas.winfo_width()
h = self.canvas.winfo_height()
L_w, M_w, R_w = w//3, w//3, w - (w//3 * 2)
L_h, M_h, R_h = h, h, h
# 显示整体、曲线、计划的文本标签,字体为60,背景为立体感
titles = ["整体"]
for i, title in enumerate(titles):
x = 130
shadow_offset = 4
#self.canvas.create_rectangle(x-100+shadow_offset, 20+shadow_offset, x+100+shadow_offset, 80+shadow_offset, fill="darkgray", outline="")
#self.canvas.create_rectangle(x-100, 20, x+100, 80, fill="lightgray", outline="gray")
self.canvas.create_text(x, 50, text=title, anchor=N, font=tkfont.Font(size=60))
titles = ["Remark:\r\n分子为在线数量,\r\n分母为Total 数量,\r\n小于55%,显示红色,\r\n50%~75%,显示黄色,\r\n大于75%,显示绿色"]
for i, title in enumerate(titles):
self.canvas.create_text(130, 400, text=title, anchor=N, font=tkfont.Font(size=16))
titles = ["曲线"]
for i, title in enumerate(titles):
x = (i+0.5) * w/3 +300
shadow_offset = 4
#self.canvas.create_rectangle(x-100+shadow_offset, 20+shadow_offset, x+100+shadow_offset, 80+shadow_offset, fill="darkgray", outline="")
#self.canvas.create_rectangle(x-100, 20, x+100, 80, fill="lightgray", outline="gray")
self.canvas.create_text(x, 50, text=title, anchor=N, font=tkfont.Font(size=60))
titles = ["计划"]
for i, title in enumerate(titles):
x = 1080
shadow_offset = 4
#self.canvas.create_rectangle(x-100+shadow_offset, 20+shadow_offset, x+100+shadow_offset, 80+shadow_offset, fill="darkgray", outline="")
#self.canvas.create_rectangle(x-100, 20, x+100, 80, fill="lightgray", outline="gray")
self.canvas.create_text(x, 50, text=title, anchor=N, font=tkfont.Font(size=60))
titles = ["R1_Data"]
for i, title in enumerate(titles):
x = 350
self.canvas.create_rectangle(x-70, 160, x+60, 160+30, fill="lightgray", outline="gray")
self.canvas.create_text(x, 160, text=title, anchor=N, font=tkfont.Font(size=20))
titles = ["R2_Data"]
for i, title in enumerate(titles):
x = 350
self.canvas.create_rectangle(x-70, 460, x+60, 460+30, fill="darkgray", outline="gray")
self.canvas.create_text(x, 460, text=title, anchor=N, font=tkfont.Font(size=20))
self.draw_ring(90, 300, 70, 40)
self.update_connection_status()
self.draw_table(w, h)
# 使用 after 方法更新曲线
self.update_scatter_plots()
def draw_ring(self, x, y, outer_radius, inner_radius):
ratio = self.A / self.B if self.B != 0 else 0
color = "red" if ratio < 0.5 else "yellow" if ratio <= 0.75 else "green"
self.canvas.create_oval(x-outer_radius, y-outer_radius, x+outer_radius, y+outer_radius, fill=color)
self.canvas.create_oval(x-inner_radius, y-inner_radius, x+inner_radius, y+inner_radius, fill="white")
self.connection_text = self.canvas.create_text(x, y, text=f"{self.A}/{self.B}", fill="black")
def update_connection_status(self):
ratio = self.A / self.B if self.B != 0 else 0
color = "red" if ratio < 0.5 else "yellow" if ratio <= 0.75 else "green"
self.canvas.itemconfig(self.connection_text, text=f"{self.A}/{self.B}")
self.canvas.itemconfig(self.connection_text, fill="black")
self.canvas.itemconfig(self.connection_text, state='normal')
self.draw_ring(90, 300, 70, 40)
def draw_table(self, w, h):
# 计划区域的宽度和高度
plan_area_width = w // 3
plan_area_height = h
# 表格应该占用计划区域的大部分空间,但要留出一些边距
margin = 20
table_width = plan_area_width - 2 * margin
table_height = min(150, plan_area_height - 2 * margin) # 确保表格不会过高
# 表格起始坐标,使得表格居中于计划区域
table_x = w - plan_area_width + margin
table_y = (h - table_height) // 2 -30
row_height = 40
col_width = table_width // 4 # 每一列的宽度等于表格宽度的四分之一
# 清除旧表格
self.canvas.delete("table")
# 使用“+”绘制表格
for j in range(6): # 包括表头和四行数据 6
self.canvas.create_line(table_x, table_y + j * row_height, table_x + table_width, table_y + j * row_height, fill="black", tags="table")
for i in range(table_width // col_width + 1):
self.canvas.create_text(table_x + i * col_width, table_y + j * row_height, text="+", fill="black", font=tkfont.Font(size=10), tags="table")
for i in range(6): # 包括边界线和四个分隔线 6 ## 5 ##
self.canvas.create_line(table_x + i * col_width, table_y, table_x + i * col_width, table_y + 5 * row_height, fill="black", tags="table")
# 绘制表头(列标签)
for j, quarter in enumerate(['', 'Q1', 'Q2', 'Q3', 'Q4']):
self.canvas.create_text(table_x + j * col_width + col_width // 2, table_y - 10, text=quarter, font=tkfont.Font(size=30), anchor=N, tags="table")
# 绘制表格内容(行标签与X标记)
for i, row in enumerate(['R1', 'R2', 'R3', 'R4']):
self.canvas.create_text(table_x + col_width // 2, table_y + (i + 1) * row_height, text=row, font=tkfont.Font(size=30), anchor=N, tags="table")
self.canvas.create_text(table_x + (i + 1) * col_width + col_width // 2, table_y + (i + 1) * row_height, text="X", font=tkfont.Font(size=30), anchor=N, tags="table")
def update_scatter_plots(self):
self.draw_scatter_plot('R1', self.R1_LS[-30:], 270, 250, 500, 100)
self.draw_scatter_plot('R2', self.R2_LS[-30:], 270, 350, 500, 100)
self.draw_scatter_plot('R3', self.R3_LS[-30:], 270, 550, 500, 100)
self.draw_scatter_plot('R4', self.R4_LS[-30:], 270, 650, 500, 100)
self.canvas.after(500, self.update_scatter_plots)
def draw_scatter_plot(self, prefix, data_list, start_x, start_y, width, height):
scale = width / max(len(data_list), 1)
self.canvas.delete(f"{prefix}_scatter") # 删除旧的散点图元素
# 增加圆点大小
dot_size = 6 # 圆点直径变为原来的三倍大
for i, value in enumerate(data_list):
x = start_x + i * scale
y = start_y - int(value) * (height / 100)
color = "green" if 23 <= int(value) <= 29 else "red"
self.canvas.create_oval(x - dot_size, y - dot_size, x + dot_size, y + dot_size, fill=color, tags=f"{prefix}_scatter")
# 清除旧的文本标签
self.canvas.delete(f"{prefix}_text")
# 只显示最近进来的2个数字,蓝色显示值
for i, value in enumerate(data_list[-2:]):
text_color = "blue"
self.canvas.create_text(start_x + len(data_list) * scale - i * scale, start_y - height / 2 - 10,
text=value, font=tkfont.Font(size=12), anchor=S, fill=text_color, tags=f"{prefix}_text")
# 中间水平线
self.canvas.create_line(start_x, start_y, start_x + width, start_y, fill="black", tags=f"{prefix}_line")
# 中间分割线
self.canvas.create_line(start_x, start_y - height / 2, start_x + width, start_y - height / 2, fill="black", tags=f"{prefix}_line")
def start_server(self):
try:
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
logging.info(f"Server started at {self.host}:{self.port}")
while True:
client_socket, client_address = self.server_socket.accept()
with self.lock:
self.clients[client_address] = client_socket
self.A += 1
self.update_connection_status()
logging.info(f"Accepted connection from {client_address}")
threading.Thread(target=self.handle_client, args=(client_socket, client_address), daemon=True).start()
except Exception as e:
logging.error(f"Failed to start server: {e}")
finally:
if self.server_socket:
self.server_socket.close()
def handle_client(self, client_socket, client_address):
try:
while True:
current_time = datetime.now().strftime("%H:%M:%S")
data = client_socket.recv(1024)
if not data:
break
decoded_data = data.decode('utf-8')
logging.info(f"{current_time} -- Received from {client_address}: {decoded_data}")
response = current_time
client_socket.sendall(response.encode('utf-8'))
logging.info(f"{current_time} -- response to RobotServer('192.168.0.109', 7777): {response}")
# 解析数据并更新散点图数据
if decoded_data.startswith('R1'):
self.R1_LS.append(decoded_data[2:4])
elif decoded_data.startswith('R2'):
self.R2_LS.append(decoded_data[2:4])
elif decoded_data.startswith('R3'):
self.R3_LS.append(decoded_data[2:4])
elif decoded_data.startswith('R4'):
self.R4_LS.append(decoded_data[2:4])
log_entry = f"[{current_time}]-Recive:{data}---Response:{response}\n"
with open(r"c:\Log6\log.txt", "a") as log_file:
log_file.write(log_entry)
except Exception as e:
logging.error(f"Error handling client {client_address}: {e}")
finally:
client_socket.close()
with self.lock:
del self.clients[client_address]
self.A -= 1
self.update_connection_status()
logging.info(f"Closed connection from {client_address}")
def on_resize(event):
global server
w = event.width
h = event.height
canvas = event.widget
canvas.delete("all")
server.init_canvas()
def main():
global server
root = Tk()
root.title("Design By Tim")
root.geometry("800x600")
canvas = Canvas(root, bg="white")
canvas.pack(fill=BOTH, expand=YES)
server = RobotServer('192.168.0.109', 7788, canvas)
threading.Thread(target=server.start_server, daemon=True).start()
canvas.bind("<Configure>", on_resize)
root.mainloop()
if __name__ == "__main__":
main()