# 服务端实现 (server implementation)
import threading
import time
import wx
from socket import socket, AF_INET, SOCK_STREAM
class LServer(wx.Frame):
    """Chat-room server window: accepts clients and relays messages to all of them.

    The frame owns the listening socket (``server_socket``), the flag that
    tells the accept loop whether to keep running (``isOn``) and a mapping of
    user name to the ``SessionThread`` serving that user
    (``session_thread_dict``).
    """

    # How long one accept() call may wait before the loop re-checks ``isOn``.
    ACCEPT_POLL_SECONDS = 0.5
    # How long a newly accepted client may take to send its user name.
    HANDSHAKE_TIMEOUT_SECONDS = 5.0

    def __init__(self):
        """Build the window, bind the listening socket and wire up the buttons."""
        wx.Frame.__init__(self, None, id=1002, title='L服务器端界面', pos=wx.DefaultPosition, size=(400, 450))
        pl = wx.Panel(self)
        box = wx.BoxSizer(wx.VERTICAL)
        # One row holding the three buttons. The column count is given
        # explicitly; wx.HSCROLL is a window style flag, not a column count.
        fgz1 = wx.FlexGridSizer(cols=3)
        start_server_btn = wx.Button(pl, size=(133, 40), label='启动服务')
        record_btn = wx.Button(pl, size=(133, 40), label='保存记录')
        stop_server_btn = wx.Button(pl, size=(133, 40), label='停止服务')
        fgz1.Add(start_server_btn, 1, wx.TOP | wx.LEFT)
        fgz1.Add(record_btn, 1, wx.TOP | wx.CENTRE)
        fgz1.Add(stop_server_btn, 1, wx.TOP | wx.RIGHT)
        box.Add(fgz1, 1, wx.ALIGN_CENTRE)
        # Read-only log of everything that was broadcast.
        self.show_text = wx.TextCtrl(pl, size=(400, 410), style=wx.TE_MULTILINE | wx.TE_READONLY)
        box.Add(self.show_text, 1, wx.ALIGN_CENTRE)
        pl.SetSizer(box)

        # ---- Server state (everything above only draws the window) ----
        self.isOn = False
        self.host_port = ('', 8888)
        self.server_socket = self._open_listening_socket()
        self.session_thread_dict = {}

        # ---- Button handlers ----
        self.Bind(wx.EVT_BUTTON, self.start_server, start_server_btn)
        self.Bind(wx.EVT_BUTTON, self.save_record, record_btn)
        self.Bind(wx.EVT_BUTTON, self.stop_server, stop_server_btn)

    def _open_listening_socket(self):
        """Return a new TCP socket bound to ``self.host_port`` and listening.

        Raises:
            OSError: if the address cannot be bound; the socket is closed first.
        """
        listener = socket(AF_INET, SOCK_STREAM)
        try:
            listener.bind(self.host_port)
            listener.listen(5)
        except OSError:
            listener.close()
            raise
        return listener

    def _append_log(self, text):
        """Append ``text`` to the log control from any thread.

        wx widgets may only be touched from the GUI thread, so the call is
        queued with ``wx.CallAfter`` instead of being made directly.
        """
        wx.CallAfter(self.show_text.AppendText, text)

    def stop_server(self, event):
        """Stop accepting new clients and release the listening port."""
        print('服务器已停止服务')
        self.isOn = False
        # Close the listener here rather than waiting for the accept loop, so
        # the port is given up as soon as the button is pressed.
        listener, self.server_socket = self.server_socket, None
        if listener is not None:
            listener.close()

    def save_record(self, event):
        """Write the current contents of the log control to ``record.log``."""
        record_data = self.show_text.GetValue()
        with open('record.log', 'w', encoding='utf-8') as file:
            file.write(record_data)

    def start_server(self, event):
        """Start the accept loop in a daemon thread unless it is already running."""
        if self.isOn:
            return
        if self.server_socket is None:
            # A previous stop closed the listener; open a fresh one.
            try:
                self.server_socket = self._open_listening_socket()
            except OSError as exc:
                self._append_log(f'无法启动服务: {exc}\n')
                return
        self.isOn = True
        main_thread = threading.Thread(target=self.do_work)
        main_thread.daemon = True
        main_thread.start()

    def _read_user_name(self, session_socket):
        """Return the user name a new client sends first, or ``''`` if none arrives.

        The read is time-limited so that a silent client cannot stall the
        accept loop; the socket is switched back to blocking mode afterwards.
        """
        try:
            session_socket.settimeout(self.HANDSHAKE_TIMEOUT_SECONDS)
            user_name = session_socket.recv(1024).decode('utf-8')
            session_socket.settimeout(None)
        except (OSError, UnicodeDecodeError):
            return ''
        return user_name

    def do_work(self):
        """Accept clients until the server is stopped.

        Each accepted client must send its user name first; a
        ``SessionThread`` is then started for it and a welcome notice is
        broadcast. The loop polls ``accept()`` with a timeout so that a stop
        request is noticed without having to wait for another connection.
        """
        # Imported locally because the file-level import only brings in
        # socket, AF_INET and SOCK_STREAM.
        from socket import timeout as socket_timeout

        listener = self.server_socket
        if listener is None:
            return
        try:
            try:
                listener.settimeout(self.ACCEPT_POLL_SECONDS)
            except OSError:
                return  # closed by stop_server before the loop began
            # ``self.server_socket is listener`` ends this loop when a stop
            # (possibly followed by a restart) replaced the listener.
            while self.isOn and self.server_socket is listener:
                try:
                    session_socket, client_addr = listener.accept()
                except socket_timeout:
                    continue
                except OSError as exc:
                    if self.isOn and self.server_socket is listener:
                        self._append_log(f'服务异常停止: {exc}\n')
                    break
                user_name = self._read_user_name(session_socket)
                if not user_name:
                    # Peer closed, timed out or sent undecodable bytes.
                    session_socket.close()
                    continue
                session_thread = SessionThread(session_socket, user_name, self)
                # NOTE: a client reusing an existing name replaces the earlier
                # entry, so the earlier client stops receiving broadcasts.
                self.session_thread_dict[user_name] = session_thread
                session_thread.start()
                self.show_info_and_send_client('服务器通知', f'欢迎{user_name}进入聊天室!',
                                               time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
        finally:
            listener.close()
            if self.server_socket is listener:
                # The loop ended on its own; leave the state restartable.
                self.server_socket = None
                self.isOn = False

    def show_info_and_send_client(self, data_source, data, data_time):
        """Log one message and send it to every client whose session is on.

        Args:
            data_source: Name shown as the sender.
            data: Message text.
            data_time: Already formatted timestamp string.
        """
        send_data = f'{data_source}:{data}\n时间:{data_time}'
        self._append_log('-' * 40 + '\n' + send_data + '\n')
        payload = send_data.encode('utf-8')
        # Iterate over a snapshot: the accept thread may add sessions meanwhile.
        for client in list(self.session_thread_dict.values()):
            if not client.isOn:
                continue
            try:
                client.client_socket.sendall(payload)
            except OSError:
                # One broken connection must not stop delivery to the others.
                client.isOn = False
class SessionThread(threading.Thread):
    """Worker thread that relays one client's messages through the server.

    Attributes:
        client_socket: Connected socket of the client.
        user_name: Name used as the sender of this client's messages.
        server: Object providing ``show_info_and_send_client``.
        isOn: True while the session should keep reading from the socket.
    """

    # Control message a client sends to announce that it is leaving.
    DISCONNECT_MARKER = 'C-DISCONNECT-S'

    def __init__(self, client_socket, user_name, server):
        threading.Thread.__init__(self)
        # A session blocked in recv() must not keep the process alive once
        # the main thread has finished.
        self.daemon = True
        self.client_socket = client_socket
        self.user_name = user_name
        self.server = server
        self.isOn = True

    def _receive_text(self):
        """Return the next chunk from the client as text.

        A closed or failed connection is reported as ``DISCONNECT_MARKER`` so
        the caller treats it exactly like an explicit leave request.
        """
        try:
            raw = self.client_socket.recv(1024)
        except OSError:
            raw = b''
        if not raw:
            # recv() returns b'' once the peer has closed the connection.
            return self.DISCONNECT_MARKER
        # A 1024-byte read can end inside a multi-byte character; replacing
        # the broken bytes keeps the session alive instead of raising.
        return raw.decode('utf-8', errors='replace')

    def run(self):
        """Forward every received message until the client leaves.

        The socket is closed when the loop ends, including when it ends
        because of an exception.
        """
        print(f'客户端:{self.user_name}已经与服务端建立连接.')
        try:
            while self.isOn:
                data = self._receive_text()
                now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
                if data == self.DISCONNECT_MARKER:
                    # Switch off first so the leave notice is not sent back
                    # to a client that is already gone.
                    self.isOn = False
                    self.server.show_info_and_send_client('服务器通知', f'{self.user_name}离开了聊天室', now)
                else:
                    self.server.show_info_and_send_client(self.user_name, data, now)
        finally:
            self.client_socket.close()
if __name__ == '__main__':
    # Script entry point: create the wx application object, show the server
    # window and hand control to the wx event loop.
    app = wx.App()
    server_frame = LServer()
    server_frame.Show()
    app.MainLoop()
# 客户端实现 (client implementation)
import threading
import wx
from socket import socket, AF_INET, SOCK_STREAM
class LClinet(wx.Frame):
    """Chat-room client window.

    Holds the connection to the server (``client_socket``), the name sent to
    the server on connect (``client_name``) and a flag telling whether the
    client currently considers itself connected (``isConnected``).
    """

    def __init__(self, clent_name):
        """Build the window for the user called ``clent_name`` and wire up the buttons."""
        wx.Frame.__init__(self, None, id=1001, title=clent_name + '的客户端界面', pos=wx.DefaultPosition,
                          size=(400, 600))
        pl = wx.Panel(self)
        box = wx.BoxSizer(wx.VERTICAL)
        # One row of two buttons. The column count is given explicitly;
        # wx.HSCROLL is a window style flag, not a column count.
        fgz1 = wx.FlexGridSizer(cols=2)
        connect_btn = wx.Button(pl, size=(200, 40), label='连接')
        disconnect_btn = wx.Button(pl, size=(200, 40), label='断开')
        fgz1.Add(connect_btn, 1, wx.TOP | wx.LEFT)
        fgz1.Add(disconnect_btn, 1, wx.TOP | wx.RIGHT)
        box.Add(fgz1, 1, wx.ALIGN_CENTRE)
        # Read-only view of the messages received from the server.
        self.show_text = wx.TextCtrl(pl, size=(400, 210), style=wx.TE_MULTILINE | wx.TE_READONLY)
        box.Add(self.show_text, 1, wx.ALIGN_CENTRE)
        # Input box for the message being written.
        self.chat_text = wx.TextCtrl(pl, size=(400, 210), style=wx.TE_MULTILINE)
        box.Add(self.chat_text, 1, wx.ALIGN_CENTRE)
        fgz2 = wx.FlexGridSizer(cols=2)
        reset_btn = wx.Button(pl, size=(200, 40), label='重置')
        send_btn = wx.Button(pl, size=(200, 40), label='发送')
        fgz2.Add(reset_btn, 1, wx.TOP | wx.LEFT)
        fgz2.Add(send_btn, 1, wx.TOP | wx.LEFT)
        box.Add(fgz2, 1, wx.ALIGN_CENTRE)
        pl.SetSizer(box)

        # ---- Client state (everything above only draws the window) ----
        self.client_name = clent_name
        self.isConnected = False
        self.client_socket = None

        # ---- Button handlers ----
        self.Bind(wx.EVT_BUTTON, self.connect_to_server, connect_btn)
        self.Bind(wx.EVT_BUTTON, self.send_to_server, send_btn)
        self.Bind(wx.EVT_BUTTON, self.disconnect_to_server, disconnect_btn)
        self.Bind(wx.EVT_BUTTON, self.reset, reset_btn)

    def _show(self, text):
        """Append ``text`` to the message view from any thread.

        wx widgets may only be touched from the GUI thread, so the call is
        queued with ``wx.CallAfter`` instead of being made directly.
        """
        wx.CallAfter(self.show_text.AppendText, text)

    def reset(self, event):
        """Clear the input box."""
        self.chat_text.SetValue('')

    def disconnect_to_server(self, event):
        """Tell the server this client is leaving; do nothing when not connected."""
        if not self.isConnected:
            return
        try:
            self.client_socket.sendall('C-DISCONNECT-S'.encode('utf-8'))
        except OSError:
            # The connection is already gone, so there is nobody to notify.
            pass
        finally:
            # The receive thread closes the socket once the server hangs up.
            self.isConnected = False

    def send_to_server(self, event):
        """Send the text in the input box and clear the box on success."""
        if not self.isConnected:
            return
        input_data = self.chat_text.GetValue()
        if input_data == '':
            return
        try:
            self.client_socket.sendall(input_data.encode('utf-8'))
        except OSError as exc:
            self.isConnected = False
            self._show(f'发送失败: {exc}\n')
            return
        self.chat_text.SetValue('')

    def connect_to_server(self, event):
        """Connect to the server, send the user name and start the receive thread."""
        if self.isConnected:
            return
        if not self.client_name:
            # An empty name sends no bytes, leaving the server waiting for one.
            self._show('客户端名称不能为空\n')
            return
        server_host_port = ('127.0.0.1', 8888)
        new_socket = socket(AF_INET, SOCK_STREAM)
        try:
            new_socket.connect(server_host_port)
            new_socket.sendall(self.client_name.encode('utf-8'))
        except OSError as exc:
            new_socket.close()
            self._show(f'连接服务器失败: {exc}\n')
            return
        self.client_socket = new_socket
        self.isConnected = True
        # Reported only after the connection really succeeded.
        print(f'客户端{self.client_name}连接服务器成功')
        client_thread = threading.Thread(target=self.recv_data)
        client_thread.daemon = True
        client_thread.start()

    def recv_data(self):
        """Show incoming messages until the connection ends, then close the socket."""
        sock = self.client_socket
        if sock is None:
            return
        try:
            # ``self.client_socket is sock`` stops a leftover thread from
            # reading a socket opened by a later connect.
            while self.isConnected and self.client_socket is sock:
                try:
                    raw = sock.recv(1024)
                except OSError:
                    break
                if not raw:
                    # recv() returns b'' once the server closed the connection.
                    break
                # A 1024-byte read can end inside a multi-byte character.
                data = raw.decode('utf-8', errors='replace')
                self._show('-' * 40 + '\n' + data + '\n')
        finally:
            sock.close()
            if self.client_socket is sock:
                self.isConnected = False
if __name__ == '__main__':
    # Script entry point: create the wx application object, ask for the user
    # name on the console, show the client window and run the wx event loop.
    app = wx.App()
    user_name = input('请输入客户端名称:')
    client_frame = LClinet(user_name)
    client_frame.Show()
    app.MainLoop()