当前位置: 首页 > article >正文

以Python 做服务器,N Robot 做客户端,小小UI,拿捏

先Show 结果~

虚拟开足
 

import socket
import threading
import logging
from tkinter import Tk, Canvas, BOTH, YES, NW, N, mainloop, font as tkfont
from tkinter.constants import S, E, W
from datetime import datetime
import os

# 定义文件夹路径
folder_path = r'c:\Log6'
# 检查文件夹是否存在,如果不存在则创建
if not os.path.exists(folder_path):
    os.makedirs(folder_path)

class RobotServer:
    def __init__(self, host, port, canvas):
        self.host = host
        self.port = port
        self.server_socket = None
        self.clients = {}
        self.lock = threading.Lock()
        self.canvas = canvas
        self.A = 0
        self.B = 4
        self.R1_LS = []
        self.R2_LS = []
        self.R3_LS = []
        self.R4_LS = []

        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

        # 初始化画布内容
        self.init_canvas()

    def init_canvas(self):
        w = self.canvas.winfo_width()
        h = self.canvas.winfo_height()
        L_w, M_w, R_w = w//3, w//3, w - (w//3 * 2)
        L_h, M_h, R_h = h, h, h

        # 显示整体、曲线、计划的文本标签,字体为60,背景为立体感
        titles = ["整体"]
        for i, title in enumerate(titles):
            x = 130
            shadow_offset = 4
            #self.canvas.create_rectangle(x-100+shadow_offset, 20+shadow_offset, x+100+shadow_offset, 80+shadow_offset, fill="darkgray", outline="")
            #self.canvas.create_rectangle(x-100, 20, x+100, 80, fill="lightgray", outline="gray")
            self.canvas.create_text(x, 50, text=title, anchor=N, font=tkfont.Font(size=60))

        titles = ["Remark:\r\n分子为在线数量,\r\n分母为Total 数量,\r\n小于55%,显示红色,\r\n50%~75%,显示黄色,\r\n大于75%,显示绿色"]
        for i, title in enumerate(titles):
            self.canvas.create_text(130, 400, text=title, anchor=N, font=tkfont.Font(size=16))

        titles = ["曲线"]
        for i, title in enumerate(titles):
            x = (i+0.5) * w/3 +300
            shadow_offset = 4
            #self.canvas.create_rectangle(x-100+shadow_offset, 20+shadow_offset, x+100+shadow_offset, 80+shadow_offset, fill="darkgray", outline="")
            #self.canvas.create_rectangle(x-100, 20, x+100, 80, fill="lightgray", outline="gray")
            self.canvas.create_text(x, 50, text=title, anchor=N, font=tkfont.Font(size=60))

        titles = ["计划"]
        for i, title in enumerate(titles):
            x = 1080
            shadow_offset = 4
            #self.canvas.create_rectangle(x-100+shadow_offset, 20+shadow_offset, x+100+shadow_offset, 80+shadow_offset, fill="darkgray", outline="")
            #self.canvas.create_rectangle(x-100, 20, x+100, 80, fill="lightgray", outline="gray")
            self.canvas.create_text(x, 50, text=title, anchor=N, font=tkfont.Font(size=60))

        titles = ["R1_Data"]
        for i, title in enumerate(titles):
            x = 350
            self.canvas.create_rectangle(x-70, 160, x+60, 160+30, fill="lightgray", outline="gray")
            self.canvas.create_text(x, 160, text=title, anchor=N, font=tkfont.Font(size=20))

        titles = ["R2_Data"]
        for i, title in enumerate(titles):
            x = 350
            self.canvas.create_rectangle(x-70, 460, x+60, 460+30, fill="darkgray", outline="gray")
            self.canvas.create_text(x, 460, text=title, anchor=N, font=tkfont.Font(size=20))


        self.draw_ring(90, 300, 70, 40)    
        self.update_connection_status()   
        self.draw_table(w, h)         

        # 使用 after 方法更新曲线
        self.update_scatter_plots()

    def draw_ring(self, x, y, outer_radius, inner_radius):
        ratio = self.A / self.B if self.B != 0 else 0
        color = "red" if ratio < 0.5 else "yellow" if ratio <= 0.75 else "green"
        self.canvas.create_oval(x-outer_radius, y-outer_radius, x+outer_radius, y+outer_radius, fill=color)
        self.canvas.create_oval(x-inner_radius, y-inner_radius, x+inner_radius, y+inner_radius, fill="white")
        self.connection_text = self.canvas.create_text(x, y, text=f"{self.A}/{self.B}", fill="black")

    def update_connection_status(self):
        ratio = self.A / self.B if self.B != 0 else 0
        color = "red" if ratio < 0.5 else "yellow" if ratio <= 0.75 else "green"
        self.canvas.itemconfig(self.connection_text, text=f"{self.A}/{self.B}")
        self.canvas.itemconfig(self.connection_text, fill="black")
        self.canvas.itemconfig(self.connection_text, state='normal')
        self.draw_ring(90, 300, 70, 40)

    def draw_table(self, w, h):
        # 计划区域的宽度和高度
        plan_area_width = w // 3
        plan_area_height = h

        # 表格应该占用计划区域的大部分空间,但要留出一些边距
        margin = 20
        table_width = plan_area_width - 2 * margin
        table_height = min(150, plan_area_height - 2 * margin)  # 确保表格不会过高

        # 表格起始坐标,使得表格居中于计划区域
        table_x = w - plan_area_width + margin
        table_y = (h - table_height) // 2 -30

        row_height = 40
        col_width = table_width // 4  # 每一列的宽度等于表格宽度的四分之一

        # 清除旧表格
        self.canvas.delete("table")

        # 使用“+”绘制表格
        for j in range(6):  # 包括表头和四行数据  6
            self.canvas.create_line(table_x, table_y + j * row_height, table_x + table_width, table_y + j * row_height, fill="black", tags="table")
            for i in range(table_width // col_width + 1):
                self.canvas.create_text(table_x + i * col_width, table_y + j * row_height, text="+", fill="black", font=tkfont.Font(size=10), tags="table")
        for i in range(6):  # 包括边界线和四个分隔线  6                                           ##       5   ##
            self.canvas.create_line(table_x + i * col_width, table_y, table_x + i * col_width, table_y + 5 * row_height, fill="black", tags="table")

        # 绘制表头(列标签)
        for j, quarter in enumerate(['', 'Q1', 'Q2', 'Q3', 'Q4']):
            self.canvas.create_text(table_x + j * col_width + col_width // 2, table_y - 10, text=quarter, font=tkfont.Font(size=30), anchor=N, tags="table")

        # 绘制表格内容(行标签与X标记)
        for i, row in enumerate(['R1', 'R2', 'R3', 'R4']):
            self.canvas.create_text(table_x + col_width // 2, table_y + (i + 1) * row_height, text=row, font=tkfont.Font(size=30), anchor=N, tags="table")
            self.canvas.create_text(table_x + (i + 1) * col_width + col_width // 2, table_y + (i + 1) * row_height, text="X", font=tkfont.Font(size=30), anchor=N, tags="table")

    def update_scatter_plots(self):
        self.draw_scatter_plot('R1', self.R1_LS[-30:], 270, 250, 500, 100)  
        self.draw_scatter_plot('R2', self.R2_LS[-30:], 270, 350, 500, 100)
        self.draw_scatter_plot('R3', self.R3_LS[-30:], 270, 550, 500, 100)  
        self.draw_scatter_plot('R4', self.R4_LS[-30:], 270, 650, 500, 100)
        self.canvas.after(500, self.update_scatter_plots)  

    def draw_scatter_plot(self, prefix, data_list, start_x, start_y, width, height):
        scale = width / max(len(data_list), 1)
        self.canvas.delete(f"{prefix}_scatter")  # 删除旧的散点图元素
        
        # 增加圆点大小
        dot_size = 6  # 圆点直径变为原来的三倍大

        for i, value in enumerate(data_list):
            x = start_x + i * scale
            y = start_y - int(value) * (height / 100)
            color = "green" if 23 <= int(value) <= 29 else "red"
            self.canvas.create_oval(x - dot_size, y - dot_size, x + dot_size, y + dot_size, fill=color, tags=f"{prefix}_scatter")  

        # 清除旧的文本标签
        self.canvas.delete(f"{prefix}_text")

        # 只显示最近进来的2个数字,蓝色显示值
        for i, value in enumerate(data_list[-2:]):
            text_color = "blue"
            self.canvas.create_text(start_x + len(data_list) * scale - i * scale, start_y - height / 2 - 10, 
                                    text=value, font=tkfont.Font(size=12), anchor=S, fill=text_color, tags=f"{prefix}_text")

        # 中间水平线
        self.canvas.create_line(start_x, start_y, start_x + width, start_y, fill="black", tags=f"{prefix}_line")
        # 中间分割线
        self.canvas.create_line(start_x, start_y - height / 2, start_x + width, start_y - height / 2, fill="black", tags=f"{prefix}_line")

    def start_server(self):
        try:
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            logging.info(f"Server started at {self.host}:{self.port}")
            while True:
                client_socket, client_address = self.server_socket.accept()
                with self.lock:
                    self.clients[client_address] = client_socket
                    self.A += 1
                    self.update_connection_status()
                logging.info(f"Accepted connection from {client_address}")
                threading.Thread(target=self.handle_client, args=(client_socket, client_address), daemon=True).start()
        except Exception as e:
            logging.error(f"Failed to start server: {e}")
        finally:
            if self.server_socket:
                self.server_socket.close()

    def handle_client(self, client_socket, client_address):
        try:
            while True:
                current_time = datetime.now().strftime("%H:%M:%S")
                data = client_socket.recv(1024)
                if not data:
                    break
                decoded_data = data.decode('utf-8')
                logging.info(f"{current_time} -- Received from {client_address}: {decoded_data}")
                response = current_time
                client_socket.sendall(response.encode('utf-8'))
                logging.info(f"{current_time} -- response to RobotServer('192.168.0.109', 7777): {response}")

                # 解析数据并更新散点图数据
                if decoded_data.startswith('R1'):
                    self.R1_LS.append(decoded_data[2:4])
                elif decoded_data.startswith('R2'):
                    self.R2_LS.append(decoded_data[2:4])
                elif decoded_data.startswith('R3'):
                    self.R3_LS.append(decoded_data[2:4])
                elif decoded_data.startswith('R4'):
                    self.R4_LS.append(decoded_data[2:4])

                log_entry = f"[{current_time}]-Recive:{data}---Response:{response}\n"  
                with open(r"c:\Log6\log.txt", "a") as log_file:  
                    log_file.write(log_entry)  

        except Exception as e:
            logging.error(f"Error handling client {client_address}: {e}")
        finally:
            client_socket.close()
            with self.lock:
                del self.clients[client_address]
                self.A -= 1
                self.update_connection_status()
            logging.info(f"Closed connection from {client_address}")

def on_resize(event):
    global server
    w = event.width
    h = event.height
    canvas = event.widget
    canvas.delete("all")
    server.init_canvas()

def main():
    global server
    root = Tk()
    root.title("Design By Tim")
    root.geometry("800x600")

    canvas = Canvas(root, bg="white")  
    canvas.pack(fill=BOTH, expand=YES)

    server = RobotServer('192.168.0.109', 7788, canvas)

    threading.Thread(target=server.start_server, daemon=True).start()

    canvas.bind("<Configure>", on_resize)

    root.mainloop()

if __name__ == "__main__":
    main()


http://www.kler.cn/a/517088.html

相关文章:

  • 【真机调试】前端开发:移动端特殊手机型号有问题,如何在电脑上进行调试?
  • Flink运行时架构
  • Java 高级工程师面试高频题:JVM+Redis+ 并发 + 算法 + 框架
  • 深度学习系列76:流式tts的一个简单实现
  • 聊一聊 CSS 样式的导入方式
  • 云原生时代,如何构建高效分布式监控系统
  • 如何使用Midjourney生成中国蛇年的灵蛇绘画作品
  • Spring WebSocket 与 STOMP 协议结合实现私聊私信功能
  • 【Golang 面试题】每日 3 题(四十三)
  • Linux下动静态库的制作与使用
  • C#编程:List.ForEach与foreach循环的深度对比
  • vim在命令模式下的查找功能
  • Redis内部数据结构--跳表详解
  • 【2024年华为OD机试】 (A卷,100分)- 微服务的集成测试(JavaScriptJava PythonC/C++)
  • 【算法篇】从汉明重量的基础理解到高效位运算优化详解
  • AI如何帮助解决生活中的琐碎难题?
  • 智能风控 数据分析 groupby、apply、reset_index组合拳
  • Cosmos学习记录
  • Databend x 沉浸式翻译 | 基于 Databend Cloud 构建高效低成本的业务数据分析体系
  • C++/CLI(Common Language Runtime)关键点详解
  • JDK14特性Java 原生代码编译工具jpackage
  • SpringBoot自定义实现触发器模型的starter
  • 【期末速成】软件设计模式与体系结构
  • 把网站程序数据上传到服务器的方法和注意事项
  • 针对业务系统的开发,如何做需求分析和设计?
  • 【数据结构】_基于顺序表实现通讯录