[Python学习日记-82] 网络编程基础实战 —— 多用户 FTP(File Transfer Protocol,文件传输协议)项目
[Python学习日记-82] 网络编程基础实战 —— 多用户 FTP(File Transfer Protocol,文件传输协议)项目
简介
开发要求
实现:多用户 FTP 项目
简介
网络编程的基础知识已经结束了,前面我们通过 socket 来模拟 SSH 远程执行命令,以及传输文件功能,那我们可以结合前面学习的知识来实现一个 FTP 项目。开发要求如下。
开发要求
1、用户加密认证
2、允许同时多用户登录(需要并发编程,目前还没学习,不作要求)
3、每个用户有自己的家目录 ,且只能访问自己的家目录
4、对用户进行磁盘配额,每个用户的可用空间不同
5、允许用户在 ftp server 上随意切换目录
6、允许用户查看当前目录下文件
7、允许上传和下载文件,保证文件一致性
8、文件传输过程中显示进度条
9、附加功能:支持文件的断点续传
实现:多用户 FTP 项目
一、非多线程的 FTP 项目
项目代码:https://github.com/529507929/Multi_User_FTP
目录结构:
./Multi_User_FTP/
| -- client/
| | -- bin/ # 客户端启动脚本
| | -- conf/ # 客户端配置文件
| | -- core/ # 客户端主程序
| | -- download/ # 用于存放客户端从服务器端下载的文件
| | -- share/ # 用于存放客户端需要上传到服务器端的文件
| | -- log/ # 用于存放客户端的日志
|
| -- doc/ # 用于存放一些说明文档
| -- server/
| | -- bin/ # 服务器端启动脚本
| | -- conf/ # 服务器端配置文件
| | -- core/ # 服务器端主程序
| | -- home/ # 不同用户的家目录,用于存放上传和可下载的文件
| | | -- jove/
| | | -- kerry/
| | -- log/ # 用于存放服务器端的日志
程序说明:
README.md:
实现功能:
该程序是一个文件传输系统,该系统拥有文件的上传下载功能以及基本的目录查看功能,可以使用的命令为 dir(查看当前目录下的文件)
cd(进入指定目录) get(下载指定文件) put(上传指定的文件) del(删除指定文件与目录) mkdir(创建目录),同时get和put还有
断点续传的功能。get 与 put 只能传输文件,无法传输目录。输入的命令需要严格遵守语法规则,示例当中的符号含义如下:
{} 表示其中的 option 必须选填其一,| 表示"或",* 代表所有,option_* 代表任意 option,[] 表示必填,abs_path:绝对路径,
relative_path:当前目录下的文件路径,例:share\1.mp4。命令中的空格必须严格按照示例书写。下面将会讲解命令的使用规范:
查看当前目录下的文件(dir)
dir {abs_path | relative_path}
进入指定目录(cd)
cd {.. | abs_path | relative_path}
下载指定文件(get)
get [filename]
# 只能下载 Server 上该用户 share 目录下的文件;Client 的下载路径可在 Client 的 conf/settings.py 中设置
上传指定的文件(put)
put [filename]
# 文件只能上传到 Server 上该用户的 recv 目录下;Client 的上传路径可在 Client 的 conf/settings.py 中设置
删除指定文件与目录(del)
del {folder | file}
# 如果删除folder的话会连folder内的文件也删除
创建目录 mkdir
mkdir {abs_path | relative_path | multi_path}
断点续传:
get:下载的断点续传功能只能保证断网时或者是服务器崩溃时才能保存下断点时的数据到undo.log,如果强制退出程序将会无法记录数据,只能从头开始下载
put:上传断点续传的前提是服务器是稳定的,当客户端崩溃或者断网时,服务器将会保存断点时的数据到undo.log
导入虚拟数据:
Client:本地目录下的share文件夹下的文件
Server:home目录下的所有文件
启动项目:
进入到程序所在目录
./Multi_User_FTP/client/bin
python ftp_client.py
./Multi_User_FTP/server/bin
python ftp_server.py
流程图:
服务器端:
bin/ftp_server.py:
from Multi_User_FTP.server.core.main import Main
if __name__ == '__main__':
    # Start the server only when this file is executed as a script (the
    # client launcher uses the same guard); merely importing the module must
    # not open a listening socket.
    start = Main()
    start.run()
conf/account.py:账户信息
# Registered FTP accounts, keyed by user name.  Every record holds the MD5
# hex digest of the password, the user's home directory and the disk quota
# (the key is spelled 'quotation' throughout the project).
account = dict(
    jove=dict(
        password='e10adc3949ba59abbe56e057f20f883e',  # 123456
        home=r'G:\luffy\Project\Multi_User_FTP\server\home\jove',
        quotation=400.0,  # MB
    ),
    kerry=dict(
        password='e10adc3949ba59abbe56e057f20f883e',  # 123456
        home=r'G:\luffy\Project\Multi_User_FTP\server\home\kerry',
        quotation=500.0,
    ),
)
conf/settings.py:设置参数
import socket
import os
# Socket parameters used by ServerSocket.
ADDRES_FAMILY = socket.AF_INET  # spelling kept: other modules read this name
ADDRESS_TYPE = socket.SOCK_STREAM
IS_REUSE_ADDRESS = True
MAX_LISTEN = 5
MAX_RECV = 1024  # bytes requested per recv() call
HOST = '127.0.0.1'
PORT = 8080
# NOTE(review): machine-specific absolute path; it has to agree with the
# 'home' entries in conf/account.py.
HOME_DIR = r'G:\luffy\Project\Multi_User_FTP\server\home'
# Root of the server package (the directory that contains conf/).  Derived
# from this file's own location instead of the current working directory, so
# the log paths resolve no matter where the launcher is started from.
SERVER_ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
core/_cd.py:
import os
from Multi_User_FTP.server.conf.account import account
from Multi_User_FTP.server.conf import settings
class Cd:
    """Server-side handler for the ``cd`` command.

    ``cd ..`` answers with the directory the user was in and moves one level
    up unless the last component of that directory is the user name.
    ``cd <path>`` answers with ``True``/``False`` depending on whether the
    target is an existing directory inside the user's home.  A bare ``cd``
    sends nothing (the client does not wait for a reply in that case).
    """

    def __init__(self, cmds, server, conn):
        self.new_dir = cmds[1] if len(cmds) > 1 else None
        self.cmds = cmds
        self.old_dir = server.pwd
        self.server = server
        self.conn = conn
        self.result = False

    @staticmethod
    def _is_inside(root, path):
        """Return True when *path* equals *root* or lies below it.

        Both paths are normalised first, so ``..`` segments and sibling
        directories that merely share a name prefix (``jove`` / ``jove2``)
        cannot pass as a plain string-prefix test would allow.
        """
        root = os.path.normcase(os.path.normpath(root))
        path = os.path.normcase(os.path.normpath(path))
        return path == root or path.startswith(root.rstrip(os.sep) + os.sep)

    def run(self):
        """Execute the command and send the reply header to the client."""
        if self.new_dir == '..':
            # The client decides what to print from the directory it was in.
            self.server.header(self.conn, self.server.pwd)
            old_dir_list = self.old_dir.split('\\')
            if old_dir_list[-1] != self.server.username:
                self.server.pwd = '\\'.join(old_dir_list[:-1])
                log = '%s enter %s' % (self.server.username, self.server.pwd)
                self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
                print(log)
        elif self.new_dir:
            account_path = account[self.server.username]['home']
            # Normalise before checking so the stored pwd never contains '..'.
            comple_path = os.path.normpath(self.server.path_process(self.new_dir))
            if os.path.isdir(comple_path) and self._is_inside(account_path, comple_path):
                self.server.pwd = comple_path
                self.result = True
                # Write the log entry.
                log = '%s enter %s' % (self.server.username, comple_path)
                self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
                print(log)
            self.server.header(self.conn, self.result)
core/_del.py:
import os
from Multi_User_FTP.server.conf.account import account
from Multi_User_FTP.server.conf import settings
class Del:
    """Server-side handler for the ``del`` command.

    Removes a file, or a directory together with everything below it, and
    answers the client with exactly one header: ``True`` on success,
    ``False`` when the removal itself failed, ``'not found'`` when no path
    was given, the path does not exist, or it is not below the user's home.
    """

    def __init__(self, cmds, server, conn):
        self.new_path = cmds[1] if len(cmds) > 1 else None
        self.cmds = cmds
        self.server = server
        self.conn = conn

    @staticmethod
    def _is_below(root, path):
        """Return True when *path* is strictly below *root* (never *root* itself)."""
        root = os.path.normcase(os.path.normpath(root))
        path = os.path.normcase(os.path.normpath(path))
        return path.startswith(root.rstrip(os.sep) + os.sep)

    def run(self):
        """Delete the requested path and report the outcome to the client."""
        if not self.new_path:
            # A bare "del" used to crash in path_process(None).
            self.server.header(self.conn, 'not found')
            return
        username = self.server.username
        account_path = account[username]['home']
        comple_path = os.path.normpath(self.server.path_process(self.new_path))
        # Normalised containment check: '..' segments cannot leave the home
        # directory, and the home directory itself is never deleted.
        if not (os.path.exists(comple_path) and self._is_below(account_path, comple_path)):
            self.server.header(self.conn, 'not found')
            return
        try:
            if os.path.isfile(comple_path):
                os.remove(comple_path)
                kind = 'file'
            else:
                # Bottom-up walk empties every sub-directory before removing it.
                for root, dirs, files in os.walk(comple_path, topdown=False):
                    for name in files:
                        os.remove(os.path.join(root, name))
                    for name in dirs:
                        os.rmdir(os.path.join(root, name))
                os.rmdir(comple_path)  # finally remove the directory itself
                kind = 'dir'
        except OSError:
            log = '%s del file find ERROR!' % username
            self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
            print(log)
            self.server.header(self.conn, False)
            return
        log = '%s del %s by %s' % (username, kind, comple_path)
        self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
        print(log)
        self.server.header(self.conn, True)
core/_dir.py:
import os
from Multi_User_FTP.server.conf import settings
class Dir:
    """Server-side handler for the ``dir`` command.

    Lists the directory named by the optional argument (resolved through
    ``server.path_process``) or the current directory when none is given.
    The reply header is the list of entry names, or an error string when the
    target is not a directory.
    """

    def __init__(self, cmds, server, conn):
        self.server = server
        if len(cmds) > 1:
            self.path = server.path_process(cmds[1])
        else:
            self.path = self.server.pwd
        self.conn = conn

    def run(self):
        """Send the directory listing (or an error message) to the client."""
        # NOTE(review): unlike cd/del, the path is not checked against the
        # user's home directory here - confirm whether that is intended.
        # isdir rather than exists: os.listdir() raises on a regular file.
        if os.path.isdir(self.path):
            self.server.header(self.conn, os.listdir(self.path))
            return
        log = '%s path in cmd, not this path.' % self.server.username
        self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
        print(log)
        self.server.header(self.conn, log)
core/_get.py:
import os
import hashlib
from Multi_User_FTP.server.conf import settings
class Get:
    """Server-side handler for the ``get`` (download) command.

    Protocol: one boolean header saying whether the file exists, then a
    header dict with ``filename``, ``md5`` and ``file_size``, then the raw
    file bytes starting at the ``done_size`` offset supplied by the client.
    """

    # Bytes read from disk per iteration while hashing and sending.
    CHUNK_SIZE = 64 * 1024

    def __init__(self, cmds, server, conn, cmd_header_dic):
        self.cmds = cmds
        self.server = server
        self.conn = conn
        self.cmd_header_dic = cmd_header_dic

    def _chunks(self, f):
        """Yield successive blocks of the open binary file *f*."""
        return iter(lambda: f.read(self.CHUNK_SIZE), b'')

    def run(self):
        """Validate the request and stream the file to the client."""
        if len(self.cmds) < 2:
            self.server.header(self.conn, False)
            return
        filename = self.cmds[1]
        user_share_dir = '%s/%s/%s' % (settings.HOME_DIR, self.server.username, 'share')
        file_dir = '%s/%s' % (user_share_dir, filename)
        # Refuse names that leave the share directory (e.g. "..\..\kerry\x").
        share_root = os.path.normcase(os.path.normpath(user_share_dir))
        target = os.path.normcase(os.path.normpath(file_dir))
        inside_share = target.startswith(share_root + os.sep)
        if not (inside_share and os.path.isfile(file_dir)):
            self.server.header(self.conn, False)
            return
        self.server.header(self.conn, True)

        # Offset the client already holds (0 for a fresh download).
        done_size = self.cmd_header_dic.get('done_size', 0) or 0

        # Hash in blocks so large files are never held in memory at once.
        file_md5 = hashlib.md5()
        with open(file_dir, 'rb') as f:
            for chunk in self._chunks(f):
                file_md5.update(chunk)
        header_dic = {
            'filename': filename,
            'md5': file_md5.hexdigest(),
            'file_size': os.path.getsize(file_dir)
        }
        self.server.header(self.conn, header_dic)

        with open(file_dir, 'rb') as f:
            f.seek(done_size)  # skip what the client has already received
            for chunk in self._chunks(f):
                # sendall: a plain send() may transmit only part of the buffer.
                self.conn.sendall(chunk)
        # Write the log entry.
        log = '%s get %s.' % (self.server.username, file_dir)
        self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
        print(log)
core/_mkdir.py:
import os
from Multi_User_FTP.server.conf import settings
class Mkdir:
    """Server-side handler for the ``mkdir`` command.

    Creates the requested directory (including missing parents) and answers
    with ``True`` on success or ``False`` when no path was given or the
    directory could not be created.
    """

    def __init__(self, cmds, server, conn):
        self.new_path = cmds[1] if len(cmds) > 1 else None
        self.cmds = cmds
        self.server = server
        self.conn = conn

    def run(self):
        """Create the directory and report the outcome to the client."""
        if not self.new_path:
            # A bare "mkdir" used to crash in path_process(None).
            self.server.header(self.conn, False)
            return
        # NOTE(review): the path is not checked against the user's home
        # directory here - confirm whether that is intended.
        comple_path = self.server.path_process(self.new_path)
        try:
            os.makedirs(comple_path)
        except OSError:
            # Already exists, invalid name, permission denied, ... - the
            # client prints 'Mkdir failure.' for any reply other than True.
            self.server.header(self.conn, False)
            return
        # Write the log entry.
        log = '%s make dir %s' % (self.server.username, comple_path)
        self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
        print(log)
        self.server.header(self.conn, True)
core/_put.py:
import hashlib
import os
from Multi_User_FTP.server.conf import settings
class Put:
    """Server-side handler for the ``put`` (upload) command.

    Message order (it has to mirror the client's Put.run):
      1. send the user's unfinished-upload records;
      2. if there are any, receive the resume flag and, when it is True, the
         name of the record to resume;
      3. receive the "is a directory" flag (True aborts the command);
      4. receive the header dict with ``md5`` and ``file_size``;
      5. send whether the target file already exists; for a fresh upload of
         an existing file also send whether its MD5 matches (True ends the
         command - nothing needs to be transferred);
      6. receive the file bytes;
      7. send the MD5 verification result.
    """

    def __init__(self, cmds, server, conn, cmd_header_dic):
        self.cmds = cmds
        self.server = server
        self.conn = conn
        self.cmd_header_dic = cmd_header_dic

    @staticmethod
    def _md5_of(path):
        """Return the MD5 hex digest of the file at *path*, read in blocks."""
        digest = hashlib.md5()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b''):
                digest.update(chunk)
        return digest.hexdigest()

    def _log(self, log):
        """Append *log* to the server log and echo it on the console."""
        self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
        print(log)

    def run(self):
        """Receive one file from the client, resuming a broken upload if asked."""
        if len(self.cmds) < 2:
            return
        username = self.server.username
        user_recv_dir = '%s/%s/%s' % (settings.HOME_DIR, username, 'recv')
        is_undo_file = False
        filename = self.cmds[1]  # same name as on the client unless a record is resumed
        file_dir = '%s/%s' % (user_recv_dir, filename)
        rec_size = 0  # bytes already stored on the server
        rename = None  # name actually used on disk when the original was taken

        # Step 1: unfinished uploads.  Only this user's records are sent; the
        # complete dictionary is kept for writing the file back later.
        undo_dic = self.server.get_undo(settings.SERVER_ROOT_DIR, 'log\\undo.log')
        undo_file = undo_dic.setdefault(username, {}).setdefault('put', {})
        self.server.header(self.conn, {username: {'put': undo_file}})

        # Step 2: resume choice.
        if undo_file:
            is_undo_file = self.server.unheader(self.conn)
            if is_undo_file is True:
                filename = self.server.unheader(self.conn)
                record = undo_file[filename]
                rec_size = record[2]
                # Keep the stored rename so a second interruption records it again.
                rename = record[3]
                if rename is not None:
                    file_dir = '%s/%s' % (user_recv_dir, rename)
                    print(file_dir)
                else:
                    file_dir = '%s/%s' % (user_recv_dir, filename)
        # Append when continuing a partial file, otherwise start from scratch.
        mode = 'ab' if is_undo_file is True else 'wb'

        # Step 3: the client reports whether the local path is not a regular file.
        is_dir = self.server.unheader(self.conn)
        if is_dir:
            self._log('%s try put not file class to server.' % username)
            return

        # Step 4: transfer description.
        header_dic = self.server.unheader(self.conn)
        file_md5 = header_dic['md5']
        if is_undo_file is True:
            total_size = undo_file[filename][1]
        else:
            total_size = header_dic['file_size']

        # Step 5: existing-file handling.  The client reads at most two
        # headers here, so exactly two are sent no matter how many renamed
        # candidates are already taken.
        already_there = os.path.exists(file_dir)
        self.server.header(self.conn, already_there)
        if already_there and not is_undo_file:
            same_content = self._md5_of(file_dir) == file_md5
            self.server.header(self.conn, same_content)
            if same_content:
                return  # identical file is already stored: nothing to receive
            num = 0
            while os.path.exists(file_dir):
                num += 1
                rename_list = filename.split('.')
                rename_list[0] += '(%d)' % num
                rename = '.'.join(rename_list)
                file_dir = '%s/%s' % (user_recv_dir, rename)

        # Step 6: receive the payload.
        with open(file_dir, mode) as f:
            while rec_size < total_size:
                try:
                    # Never ask for more than is still missing, so bytes that
                    # follow the file on the stream are left untouched.
                    line = self.conn.recv(min(settings.MAX_RECV, total_size - rec_size))
                    if not line:
                        # recv() returns b'' when the peer closed the socket;
                        # without this check the loop would spin forever.
                        raise ConnectionResetError('client closed the connection')
                except ConnectionResetError:
                    # Connection lost: remember where the upload stopped.
                    undo_file[filename] = [file_dir, total_size, rec_size, rename]
                    self.server.set_undo(settings.SERVER_ROOT_DIR, 'log\\undo.log', undo_dic)
                    self._log('%s put %s interrupt.' % (username, file_dir))
                    return
                f.write(line)
                rec_size += len(line)

        if is_undo_file is True:
            # The resumed upload is complete: drop its record.
            del undo_file[filename]
            self.server.set_undo(settings.SERVER_ROOT_DIR, 'log\\undo.log', undo_dic)
            self._log('%s put undo file %s.' % (username, file_dir))

        # Step 7: MD5 verification; a corrupt file is removed again.
        print(file_dir)
        check_resutl = self._md5_of(file_dir) == file_md5
        if not check_resutl:
            os.remove(file_dir)
        self.server.header(self.conn, check_resutl)
core/_sys_cmd.py:运行系统命令
import subprocess
class SysCmd:
    """Run a shell command on the server and send its output to the client."""

    def __init__(self, cmd, server, conn):
        self.cmd = cmd
        self.server = server
        self.conn = conn

    def run(self):
        """
        Run the system command.

        Sends a header dict whose ``total_size`` is the combined length of
        stdout and stderr, followed by the stdout bytes and the stderr bytes.
        :return: None
        """
        print(self.cmd)
        # SECURITY(review): self.cmd is passed to the shell unmodified.  If it
        # can contain text supplied by a remote client this is arbitrary
        # command execution - restrict or remove before exposing the server.
        obj = subprocess.Popen(self.cmd, shell=True,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
        # communicate() drains both pipes together; reading stdout to the end
        # before touching stderr can block once the stderr pipe fills up.
        stdout, stderr = obj.communicate()
        header_dic = {
            'md5': 'xxxxdxxx',
            'total_size': len(stdout) + len(stderr)
        }
        self.server.header(self.conn, header_dic)
        # sendall: a plain send() may transmit only part of the buffer.
        self.conn.sendall(stdout)
        self.conn.sendall(stderr)
core/features.py:
from Multi_User_FTP.server.core._get import Get
from Multi_User_FTP.server.core._put import Put
from Multi_User_FTP.server.core._cd import Cd
from Multi_User_FTP.server.core._sys_cmd import SysCmd
from Multi_User_FTP.server.core._dir import Dir
from Multi_User_FTP.server.core._del import Del
from Multi_User_FTP.server.core._mkdir import Mkdir
class Features:
    """Dispatch a parsed client command to its server-side handler.

    ``find`` looks for a method named ``_<command>`` and calls it; commands
    without such a method are ignored.
    """

    def __init__(self, server, conn, cmds, cmd_header_dic):
        self.sys_obj = SysCmd(' '.join(cmds), server, conn)
        self.conn = conn
        self.server = server
        self.cmds = cmds
        self.cmd_header_dic = cmd_header_dic

    def find(self):
        """Run the handler for ``self.cmds[0]`` if one exists."""
        if not self.cmds:
            return
        name = self.cmds[0]
        # A command that itself starts with '_' would resolve to a dunder or
        # private attribute (e.g. '_init__' -> '__init__'); never call those.
        if name.startswith('_'):
            return
        func = getattr(self, '_%s' % name, None)
        if callable(func):
            func()

    def _get(self, *args):
        Get(self.cmds, self.server, self.conn, self.cmd_header_dic).run()

    def _put(self, *args):
        Put(self.cmds, self.server, self.conn, self.cmd_header_dic).run()

    def _cd(self, *args):
        Cd(self.cmds, self.server, self.conn).run()

    def _dir(self, *args):
        Dir(self.cmds, self.server, self.conn).run()

    def _del(self, *args):
        Del(self.cmds, self.server, self.conn).run()

    def _mkdir(self, *args):
        Mkdir(self.cmds, self.server, self.conn).run()
core/main.py:
from Multi_User_FTP.server.core import server_socket
from Multi_User_FTP.server.core.features import Features
from Multi_User_FTP.server.conf import settings
from Multi_User_FTP.server.conf.account import account
class Main:
    """Server entry point: accept one client at a time, log it in, serve it."""

    def __init__(self):
        self.server = server_socket.ServerSocket((settings.HOST, settings.PORT))
        self.server_address = self.server.server_address
        self.conn = None
        self.client_addr = None
        self.username = None
        self.password = None

    def _login(self):
        """Loop until the connected client presents valid credentials.

        :return: True once the client is logged in (a lost connection
                 surfaces as an exception from ``unheader``).
        """
        while self.server.is_login is False:
            account_info = self.server.unheader(self.conn)
            self.username = account_info[0]
            self.password = account_info[1]
            user = account.get(self.username)
            if user is not None and self.password == user['password']:
                self.server.is_login = True
                self.server.username = self.username  # the socket now belongs to this user
                self.server.pwd = user['home']
                self.server.header(self.conn, self.server.is_login)  # report success
                print('%s is login from %s' % (self.username, self.client_addr))
                break
            self.server.header(self.conn, self.server.is_login)  # report failure
            print('%s try login by %s, result is bad.' % (self.client_addr, self.username))
        return True

    def _serve(self):
        """Read command headers from the client and dispatch them until it leaves."""
        while True:
            res = self.server.unheader(self.conn)
            if not res:
                break
            cmds = res['cmd'].split()
            Features(self.server, self.conn, cmds, res).find()

    def run(self):
        """Serve clients forever; every connection is closed when its session ends."""
        import struct  # local import: struct.error marks a truncated header

        # Errors that end the current session without stopping the server.
        session_errors = (ConnectionError, struct.error, ValueError, LookupError, TypeError)
        print('start....')
        self.server.is_login = False  # the login state is kept on the server object
        try:
            while True:  # connection loop
                self.conn, self.client_addr = self.server.accpet()
                try:
                    # Tell the freshly connected client that it is not logged in yet.
                    self.server.header(self.conn, self.server.is_login)
                    # Commands are only served after a successful login; a
                    # connection lost during login ends the session here.
                    if self._login():
                        self._serve()
                except session_errors as err:
                    print('%s session closed: %r' % (self.client_addr, err))
                finally:
                    self.server.is_login = False
                    self.conn.close()
        finally:
            self.server.close()
core/server_socket.py:
import socket
import struct
import json
import os
from Multi_User_FTP.server.conf import settings
class ServerSocket:
    """Listening TCP socket plus the framing, path and log helpers used by the server.

    Every header on the wire is a 4-byte native ``int`` length (struct
    format ``'i'``) followed by that many bytes of UTF-8 encoded JSON.
    ``username`` and ``pwd`` are assigned from outside after a login.
    """

    address_family = settings.ADDRES_FAMILY
    address_type = settings.ADDRESS_TYPE
    is_reuse_address = settings.IS_REUSE_ADDRESS
    max_listen = settings.MAX_LISTEN
    max_recv = settings.MAX_RECV

    def __init__(self, server_address, bind_and_active=True):
        self.server_address = server_address
        self.socket = socket.socket(self.address_family, self.address_type)
        if bind_and_active:
            try:
                self.bind()
                self.listen()
            except:
                # Release the descriptor, then let the original error propagate.
                self.socket.close()
                raise

    def bind(self):
        if self.is_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def listen(self):
        self.socket.listen(self.max_listen)

    def accpet(self):
        """Wait for a client; returns ``(conn, address)`` (method name kept as is)."""
        return self.socket.accept()

    def recv(self):
        # The received data used to be dropped; hand it back to the caller.
        return self.socket.recv(self.max_recv)

    def send(self, msg):
        self.socket.sendall(msg)

    def close(self):
        self.socket.close()

    @staticmethod
    def _recv_exact(conn, size):
        """Read exactly *size* bytes from *conn*.

        :raises ConnectionResetError: when the peer closes the connection
            before *size* bytes have arrived.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = conn.recv(remaining)
            if not chunk:
                raise ConnectionResetError('peer closed the connection')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def header(self, conn, header_info):
        """Send *header_info* (any JSON-serialisable value) as one framed header."""
        header_bytes = json.dumps(header_info).encode('utf-8')
        # sendall: a plain send() may transmit only part of the buffer.
        conn.sendall(struct.pack('i', len(header_bytes)) + header_bytes)

    def unheader(self, conn):
        """Receive one framed header from *conn* and return the decoded value."""
        # A single recv() may return fewer bytes than requested, so both the
        # length prefix and the body are read until complete.
        header_size = struct.unpack('i', self._recv_exact(conn, 4))[0]
        header_bytes = self._recv_exact(conn, header_size)
        return json.loads(header_bytes.decode('utf-8'))

    def path_process(self, new_path):
        """Return *new_path* unchanged if absolute, otherwise appended to ``self.pwd``."""
        if os.path.isabs(new_path):
            return new_path
        return '\\'.join(self.pwd.split('\\') + new_path.split('\\'))

    def is_file(self, path):
        """Return True for a file, False for a directory, None for anything else."""
        if os.path.isfile(path):
            return True
        if os.path.isdir(path):
            return False
        return None

    def wri_log(self, root_dir, log_dir, log):
        """Append *log* as one line to the file ``root_dir/log_dir``."""
        # Append mode instead of copying the whole log for every entry; the
        # trailing newline keeps entries on separate lines, matching the
        # client's wri_log.
        with open(os.path.join(root_dir, log_dir), 'a', encoding='utf-8') as f_log:
            f_log.write(log + '\n')

    def get_undo(self, root_dir, log_dir):
        """Load the unfinished-transfer records from ``root_dir/log_dir``.

        The returned dict always contains ``{self.username: {'put': {...}}}``;
        when that section had to be added the file is rewritten.
        """
        undone_log_dir = os.path.join(root_dir, log_dir)
        dic = {}
        if os.path.exists(undone_log_dir) and os.path.getsize(undone_log_dir) > 0:
            with open(undone_log_dir, 'r') as f:
                dic = json.load(f)
        # A user seen for the first time has no section yet, even when the
        # file already holds records of other users.
        user_section = dic.get(self.username)
        if not isinstance(user_section, dict) or 'put' not in user_section:
            dic.setdefault(self.username, {}).setdefault('put', {})
            with open(undone_log_dir, 'w') as f:
                json.dump(dic, f)
        return dic

    def set_undo(self, root_dir, log_dir, undone_dic):
        """Overwrite ``root_dir/log_dir`` with *undone_dic* as JSON."""
        undone_log_dir = os.path.join(root_dir, log_dir)
        with open(undone_log_dir, 'w') as f:
            json.dump(undone_dic, f)
            f.flush()
客户端:
bin/ftp_client.py:
import sys,os
sys.path.append('\\'.join(os.getcwd().split('\\')[:-3]))
from Multi_User_FTP.client.core.main import Main
if __name__ == '__main__':
    # Build the client entry object and hand control to its main loop.
    Main().run()
conf/settings.py:
import socket
import os
# Socket parameters used by ClientSocket.
ADDRES_FAMILY = socket.AF_INET  # spelling kept: other modules read this name
ADDRESS_TYPE = socket.SOCK_STREAM
MAX_RECV = 1024  # bytes requested per recv() call
HOST = '127.0.0.1'
PORT = 8080
# NOTE(review): machine-specific absolute paths.  The directory tree in the
# README names the folder "download/", this setting uses "downloads" -
# confirm which one is right.
DOWNLOADS_DIR = r'G:\luffy\Project\Multi_User_FTP\client\downloads'
SHARE_DIR = r'G:\luffy\Project\Multi_User_FTP\client\share'
# Root of the client package (the directory that contains conf/).  Derived
# from this file's own location instead of the current working directory, so
# the log paths resolve no matter where the launcher is started from.
CLIENT_ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
core/_cd.py:
from Multi_User_FTP.client.conf import settings
class Cd:
    """Client side of the ``cd`` command.

    Sends the command, then reads the server's reply (if the command form
    has one) and logs/prints a message when the change was refused.
    """

    def __init__(self, cmds, client):
        self.new_path = cmds[1] if len(cmds) > 1 else None
        self.cmds = cmds
        self.client = client
        self.socket = client.socket

    def run(self):
        self.client.send_cmd({'cmd': ' '.join(self.cmds)})
        if self.new_path == '..':
            # The reply is the directory we were in; when its last component
            # is our user name we were already at the top of the home tree.
            previous_parts = self.client.unheader(self.socket).split('\\')
            if previous_parts[-1] != self.client.username:
                return
            message = 'Insufficient permissions, unable to return to previous level.'
        elif self.new_path:
            # The reply is a truthy value when the directory was entered.
            if self.client.unheader(self.socket):
                return
            message = 'Not this path or insufficient permissions, unable to return to previous level.'
        else:
            # No argument at all: the server sends no reply for this form.
            message = 'cmd is error'
        # Write the log entry and show it to the user.
        self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', message)
        print(message)
core/_del.py:
from Multi_User_FTP.client.conf import settings
class Del:
    """Client side of the ``del`` command: send it and report the server's answer."""

    def __init__(self, cmds, client):
        self.new_path = cmds[1] if len(cmds) > 1 else None
        self.cmds = cmds
        self.client = client
        self.socket = client.socket

    def run(self):
        """Send ``del <path>`` and log/print the outcome reported by the server."""
        if self.new_path is None:
            # Nothing to delete: reject locally (as Get does for a missing
            # file name) instead of sending an incomplete command.
            print('cmd is error')
            return
        cmd_header_dic = {'cmd': ' '.join(self.cmds)}
        self.client.send_cmd(cmd_header_dic)
        del_result = self.client.unheader(self.socket)
        if del_result is True:
            log = 'Del complete.'
        elif del_result == 'not found':
            log = 'Path not found.'
        else:
            log = 'Del failure.'
        # Write the log entry.
        self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
        print(log)
core/_dir.py:
class Dir:
    """Client side of the ``dir`` command: print the listing sent by the server."""

    def __init__(self, cmds, client):
        self.cmds = cmds
        self.client = client
        self.socket = client.socket

    def run(self):
        """Send the command and print the entries five per line.

        When the reply is not a list (the server sends an error string in
        that case) it is printed unchanged.
        """
        cmd_header_dic = {'cmd': ' '.join(self.cmds)}
        self.client.send_cmd(cmd_header_dic)
        file_list = self.client.unheader(self.socket)
        if not isinstance(file_list, list):
            print(file_list)
            return
        last = len(file_list)
        for position, name in enumerate(file_list, start=1):
            # Parenthesised on purpose: "i+1 % 5" parses as "i + (1 % 5)", so
            # the break after every fifth entry never happened.
            end_of_row = position % 5 == 0 or position == last
            print(name + ('\n' if end_of_row else ' '), end='')
core/_get.py:
import hashlib
import os
from Multi_User_FTP.client.core.progress_bar import ProgressBar
from Multi_User_FTP.client.conf import settings
class Get:
    """Client side of the ``get`` (download) command, with resume support.

    Unfinished downloads are stored in ``log\\undo.log`` as
    ``{username: {'get': {local_name: [path, total, received, server_name]}}}``.
    """

    def __init__(self, cmds, client):
        self.cmds = cmds
        self.client = client
        self.socket = self.client.socket
        self.undo = {}

    @staticmethod
    def _choose_resume(undo_file):
        """List the unfinished downloads and ask which one to resume.

        :return: the chosen record key, or None when the user answers N.
        """
        for filename in undo_file:
            file_info = undo_file[filename]
            print('%s\ttotal %d\trecv %d' % (filename, file_info[1], file_info[2]))
        while True:
            continue_choice = input('Whether to undo file transfer?(Y/N)\n>>: ').strip()
            if continue_choice.lower() == 'y':
                while True:  # repeat until an existing record is named
                    file_choice = input('Choice undo file transfer.\n>>: ').strip()
                    if file_choice in undo_file:
                        return file_choice
                    print('Not %s, please choose again.' % file_choice)
            elif continue_choice.lower() == 'n':
                return None
            else:
                print('Only input Y/N.')

    @staticmethod
    def _md5_of(path):
        """Return the MD5 hex digest of the file at *path*, read in blocks."""
        digest = hashlib.md5()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b''):
                digest.update(chunk)
        return digest.hexdigest()

    def run(self):
        """Download ``self.cmds[1]`` from the server and verify its MD5."""
        if len(self.cmds) < 2:
            print('cmd is error')
            return
        undo_dic = self.client.get_undo(settings.CLIENT_ROOT_DIR, 'log\\undo.log')
        # setdefault: the undo file may exist without a section for this user.
        undo_file = undo_dic.setdefault(self.client.username, {}).setdefault('get', {})
        cmd_header_dic = {
            'cmd': ' '.join(self.cmds),  # "get 1.mp4": 1.mp4 is the name on the server
            'done_size': 0  # bytes already received (resume offset)
        }
        is_undo_file = False  # True when an interrupted download is resumed
        file_choice = None  # local file name of the resumed download

        # Resume choice.
        if undo_file:
            file_choice = self._choose_resume(undo_file)
            if file_choice is not None:
                is_undo_file = True
                self.cmds[1] = undo_file[file_choice][3]  # name to request from the server
                cmd_header_dic['cmd'] = ' '.join(self.cmds)
                cmd_header_dic['done_size'] = undo_file[file_choice][2]
        self.client.send_cmd(cmd_header_dic)

        has_file = self.client.unheader(self.socket)
        if not has_file:
            print('Not this file.')
            return

        # A resumed download keeps the local name it was saved under.
        filename = file_choice if is_undo_file else self.cmds[1]
        user_downloads_dir = settings.DOWNLOADS_DIR
        file_dir = '%s/%s' % (user_downloads_dir, filename)
        header_dic = self.client.unheader(self.socket)
        file_md5 = header_dic['md5']
        total_size = header_dic['file_size']
        mode = 'ab'

        # Name clash with an existing local file (fresh downloads only).
        while os.path.exists(file_dir) and not is_undo_file:
            exists_choice = input('File already exists, Y --> rename, N --> cover.\n>>: ').strip()
            if exists_choice.lower() == 'y':
                filename = input('Input new file name.\n>>: ').strip()  # new local name
                file_dir = '%s/%s' % (user_downloads_dir, filename)
            elif exists_choice.lower() == 'n':
                mode = 'wb'
                break
            else:
                print('Only input Y/N.')

        recv_size = cmd_header_dic['done_size']
        with open(file_dir, mode) as f:
            while recv_size < total_size:
                try:
                    # Ask only for what is still missing of this file.
                    line = self.socket.recv(min(settings.MAX_RECV, total_size - recv_size))
                    if not line:
                        # recv() returns b'' when the server closed the
                        # socket; without this check the loop never ends.
                        raise ConnectionResetError('server closed the connection')
                except ConnectionResetError:
                    print('\n', end='')
                    # Remember where the download stopped so it can be resumed.
                    undo_file[filename] = [file_dir, total_size, recv_size, self.cmds[1]]
                    self.client.set_undo(settings.CLIENT_ROOT_DIR, 'log\\undo.log', undo_dic)
                    raise SystemExit('\nDisconnect from server.')
                f.write(line)
                recv_size += len(line)
                if ProgressBar(recv_size, total_size).run() is True:
                    if is_undo_file:  # a finished resume no longer needs its record
                        del undo_file[filename]
                        self.client.set_undo(settings.CLIENT_ROOT_DIR, 'log\\undo.log', undo_dic)
                    # Write the log entry.
                    log = '%s download complete.' % filename
                    self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
                    print(log)

        # Verify the downloaded file against the MD5 announced by the server.
        if self._md5_of(file_dir) == file_md5:
            log = 'Check MD5 done to %s' % file_dir
            self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
            print('Check MD5 done to %s.' % filename)
        else:
            log = 'MD5 is incorrect to %s' % file_dir
            self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
            print('MD5 is incorrect to %s.' % filename)
core/_mkdir.py:
from Multi_User_FTP.client.conf import settings
class Mkdir:
    """Client side of the ``mkdir`` command: send it and report the server's answer."""

    def __init__(self, cmds, client):
        self.cmds = cmds
        self.client = client
        self.socket = client.socket

    def run(self):
        """Send ``mkdir <path>`` and log/print whether the server created it."""
        if len(self.cmds) < 2:
            # No directory named: reject locally (as Get does for a missing
            # file name) instead of sending an incomplete command.
            print('cmd is error')
            return
        cmd_header_dic = {'cmd': ' '.join(self.cmds)}
        self.client.send_cmd(cmd_header_dic)
        mkdir_result = self.client.unheader(self.socket)
        log = 'Mkdir complete.' if mkdir_result is True else 'Mkdir failure.'
        # Write the log entry.
        self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
        print(log)
core/_put.py:
import os
import hashlib
from Multi_User_FTP.client.core.progress_bar import ProgressBar
from Multi_User_FTP.client.conf import settings
class Put:
    """Client side of the ``put`` (upload) command, with resume support.

    The message order mirrors the server's Put.run: command header, receive
    the unfinished-upload records, optional resume choice, "is a directory"
    flag, transfer header, receive the exists / same-MD5 flags, file bytes,
    receive the verification result.
    """

    # Bytes read from disk per iteration while hashing and sending.
    CHUNK_SIZE = 64 * 1024

    def __init__(self, cmds, client):
        self.cmds = cmds
        self.client = client
        self.socket = self.client.socket

    def _chunks(self, f):
        """Yield successive blocks of the open binary file *f*."""
        return iter(lambda: f.read(self.CHUNK_SIZE), b'')

    def run(self):
        """Upload ``self.cmds[1]`` from the share directory to the server."""
        if len(self.cmds) < 2:
            return
        filename = self.cmds[1]
        user_share_dir = settings.SHARE_DIR
        send_size = 0

        # Send the command.
        cmd_header_dic = {
            'cmd': ' '.join(self.cmds),
        }
        self.client.header(self.socket, cmd_header_dic)

        # Receive the unfinished-upload records kept by the server.
        undo_dic = self.client.unheader(self.socket)
        is_undo_file = False  # True when an interrupted upload is resumed
        undo_file = undo_dic.get(self.client.username, {}).get('put', {})
        if undo_file:
            for name in undo_file:
                file_info = undo_file[name]
                print('%s \ttotal %d \tsend %d \trename %s' % (name, file_info[1], file_info[2], file_info[3]))
            while True:
                continue_choice = input('Whether to undo file transfer?(Y/N)\n>>: ').strip()
                if continue_choice.lower() == 'y':
                    while True:  # repeat until an existing record is named
                        file_choice = input('Choice undo file transfer.\n>>: ').strip()
                        if file_choice in undo_file:
                            self.cmds[1] = file_choice
                            filename = file_choice
                            is_undo_file = True
                            break
                        print('Not %s, please choose again.' % file_choice)
                    send_size = undo_file[file_choice][2]
                    self.client.header(self.socket, is_undo_file)
                    self.client.header(self.socket, file_choice)
                    break
                elif continue_choice.lower() == 'n':
                    self.client.header(self.socket, is_undo_file)
                    break
                else:
                    print('Only input Y/N.')

        # put transfers regular files only.
        local_path = '%s/%s' % (user_share_dir, filename)
        is_dir = not os.path.isfile(local_path)
        self.client.header(self.socket, is_dir)
        if is_dir:
            print('Only put file.')
            return

        # Transfer preparation: hash in blocks to keep memory use flat.
        file_md5 = hashlib.md5()
        with open(local_path, 'rb') as f:
            for chunk in self._chunks(f):
                file_md5.update(chunk)
        header_dic = {
            'filename': filename,
            'md5': file_md5.hexdigest(),
            'file_size': os.path.getsize(local_path)
        }
        self.client.header(self.socket, header_dic)

        # If an identical file is already on the server nothing is sent.
        is_has_file = self.client.unheader(self.socket)
        if is_has_file and not is_undo_file:
            compare_md5_result = self.client.unheader(self.socket)
            if compare_md5_result is True:
                if header_dic['file_size']:
                    ProgressBar(header_dic['file_size'], header_dic['file_size']).run()
                return

        # Transmission.
        with open(local_path, 'rb') as f:
            f.seek(send_size)  # skip what the server already holds
            for chunk in self._chunks(f):
                # sendall: a plain send() may transmit only part of the buffer.
                self.socket.sendall(chunk)
                send_size += len(chunk)
                ProgressBar(send_size, header_dic['file_size']).run()

        # The server's MD5 verdict decides the outcome - also for a resumed
        # upload, which used to be reported as completed unconditionally.
        check_result = self.client.unheader(self.socket)
        if not check_result:
            outcome = 'upload failed.'
        elif is_undo_file is True:
            outcome = 'again upload completed.'
        else:
            outcome = 'upload completed.'
        # Write the log entry.
        log = '%s %s' % (local_path, outcome)
        self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
        print('%s %s' % (filename, outcome))
core/_sys_cmd.py:
from Multi_User_FTP.client.conf import settings
class SysCmd:
    """Client side of a remote system command: send it and print the output."""

    def __init__(self, cmds, client):
        self.cmds = cmds
        self.client = client
        self.socket = self.client.socket

    def run(self):
        """Send the command, read ``total_size`` bytes of output and print them."""
        cmd_header_dic = {'cmd': ' '.join(self.cmds)}
        self.client.send_cmd(cmd_header_dic)
        header_dic = self.client.unheader(self.socket)
        total_size = header_dic['total_size']
        recv_size = 0
        chunks = []
        while recv_size < total_size:
            chunk = self.socket.recv(min(settings.MAX_RECV, total_size - recv_size))
            if not chunk:
                # The server closed the connection early; print what arrived
                # instead of waiting forever.
                break
            chunks.append(chunk)
            recv_size += len(chunk)
        # Decoded as gbk because the server runs on Windows; undecodable bytes
        # are replaced rather than aborting the command.
        print(b''.join(chunks).decode('gbk', errors='replace'))
core/client_socket.py:
import socket
import struct
import json
import os
from Multi_User_FTP.client.conf import settings
class ClientSocket:
    """Connected TCP socket plus the framing and log helpers used by the client.

    Every header on the wire is a 4-byte native ``int`` length (struct
    format ``'i'``) followed by that many bytes of UTF-8 encoded JSON.
    ``username`` is assigned from outside after a successful login.
    """

    address_family = settings.ADDRES_FAMILY
    address_type = settings.ADDRESS_TYPE
    max_recv = settings.MAX_RECV

    def __init__(self, server_address, bind_and_active=True):
        self.server_address = server_address
        self.socket = socket.socket(self.address_family, self.address_type)
        if bind_and_active:
            try:
                self.connect()
            except:
                # Release the descriptor, then let the original error propagate.
                self.socket.close()
                raise

    def connect(self):
        self.socket.connect(self.server_address)

    def close(self):
        self.socket.close()

    def send_cmd(self, dic):
        """Send the command header *dic* over the client's own socket."""
        self.header(self.socket, dic)

    @staticmethod
    def _recv_exact(conn, size):
        """Read exactly *size* bytes from *conn*.

        :raises ConnectionResetError: when the peer closes the connection
            before *size* bytes have arrived.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = conn.recv(remaining)
            if not chunk:
                raise ConnectionResetError('peer closed the connection')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def header(self, conn, header_info):
        """Send *header_info* (any JSON-serialisable value) as one framed header."""
        header_bytes = json.dumps(header_info).encode('utf-8')
        # sendall: a plain send() may transmit only part of the buffer.
        conn.sendall(struct.pack('i', len(header_bytes)) + header_bytes)

    def unheader(self, conn):
        """Receive one framed header from *conn* and return the decoded value."""
        # A single recv() may return fewer bytes than requested, so both the
        # length prefix and the body are read until complete.
        header_size = struct.unpack('i', self._recv_exact(conn, 4))[0]
        header_bytes = self._recv_exact(conn, header_size)
        return json.loads(header_bytes.decode('utf-8'))

    def wri_log(self, root_dir, log_dir, log):
        """Append *log* as one line to the file ``root_dir/log_dir``."""
        # Append mode instead of copying the whole log for every entry.
        with open(os.path.join(root_dir, log_dir), 'a', encoding='utf-8') as f_log:
            f_log.write(log + '\n')

    def get_undo(self, root_dir, log_dir):
        """Load the unfinished-transfer records from ``root_dir/log_dir``.

        The returned dict always contains ``{self.username: {'get': {...}}}``;
        when that section had to be added the file is rewritten.
        """
        undone_log_dir = os.path.join(root_dir, log_dir)
        dic = {}
        if os.path.exists(undone_log_dir) and os.path.getsize(undone_log_dir) > 0:
            with open(undone_log_dir, 'r') as f:
                dic = json.load(f)
        # A user seen for the first time has no section yet, even when the
        # file already holds records of other users.
        user_section = dic.get(self.username)
        if not isinstance(user_section, dict) or 'get' not in user_section:
            dic.setdefault(self.username, {}).setdefault('get', {})
            with open(undone_log_dir, 'w') as f:
                json.dump(dic, f)
        return dic

    def set_undo(self, root_dir, log_dir, undone_dic):
        """Overwrite ``root_dir/log_dir`` with *undone_dic* as JSON."""
        undone_log_dir = os.path.join(root_dir, log_dir)
        with open(undone_log_dir, 'w') as f:
            json.dump(undone_dic, f)
            f.flush()
core/features.py:
from Multi_User_FTP.client.core._get import Get
from Multi_User_FTP.client.core._put import Put
from Multi_User_FTP.client.core._pwd import Pwd
from Multi_User_FTP.client.core._cd import Cd
from Multi_User_FTP.client.core._dir import Dir
from Multi_User_FTP.client.core._del import Del
from Multi_User_FTP.client.core._sys_cmd import SysCmd
from Multi_User_FTP.client.core._mkdir import Mkdir
class Features:
    """Dispatch a command typed by the user to its client-side handler.

    ``find`` looks for a method named ``_<command>`` and calls it; input
    without such a method is ignored and nothing is sent to the server.
    """

    def __init__(self, client, cmds):
        self.sys_obj = SysCmd(cmds, client)
        self.client = client
        self.cmds = cmds

    def find(self):
        """Run the handler for ``self.cmds[0]`` if one exists."""
        if not self.cmds:
            return
        name = self.cmds[0]
        # Input that itself starts with '_' would resolve to a dunder or
        # private attribute (e.g. '_init__' -> '__init__'); never call those.
        if name.startswith('_'):
            return
        func = getattr(self, '_%s' % name, None)
        if callable(func):
            func()

    def _get(self, *args):
        Get(self.cmds, self.client).run()

    def _put(self, *args):
        Put(self.cmds, self.client).run()

    def _pwd(self, *args):
        Pwd(self.cmds, self.client).run()

    def _cd(self, *args):
        Cd(self.cmds, self.client).run()

    def _dir(self, *args):
        Dir(self.cmds, self.client).run()

    def _del(self, *args):
        Del(self.cmds, self.client).run()

    def _mkdir(self, *args):
        Mkdir(self.cmds, self.client).run()
core/main.py:
import sys
import socket
import hashlib
from Multi_User_FTP.client.core import client_socket
from Multi_User_FTP.client.core.features import Features
from Multi_User_FTP.client.conf import settings
class Main:
    """Client entry point: log in, then read commands from the prompt forever."""

    def __init__(self):
        self.client = client_socket.ClientSocket((settings.HOST, settings.PORT))
        self.server_address = self.client.server_address
        self.socket = self.client.socket

    def run(self):
        """Run the login dialogue followed by the interactive command loop.

        The socket is closed when the loop ends, whether the server went
        away or the user interrupted the prompt.
        """
        try:
            # On connect the server reports the current login state.
            is_login = self.client.unheader(self.socket)
            while True:
                if is_login is False:
                    username = input('username:').strip()
                    password = input('password:').strip()
                    # Only the MD5 digest is sent; it is no longer echoed to
                    # the console.
                    password_hash = hashlib.md5(password.encode('utf-8')).hexdigest()
                    self.client.header(self.socket, (username, password_hash))
                    is_login = self.client.unheader(self.socket)
                    if not is_login:
                        print('username or password is bad.')
                        continue
                    self.client.username = username  # bind the user name to this client
                cmd = input('>>: ').strip()
                if not cmd:
                    continue
                Features(self.client, cmd.split()).find()
        except (ConnectionError, EOFError, KeyboardInterrupt):
            print('\nDisconnect from server.')
        finally:
            self.client.close()
core/progress_bar.py:
import sys
class ProgressBar:
    """Draw a 50-column text progress bar on stdout."""

    def __init__(self, tran_size, total_size):
        """Store the transferred size and the total size (same unit)."""
        self.tran_size = tran_size
        self.total_size = total_size

    def run(self):
        """Redraw the bar on the current terminal line.

        :return: True once ``tran_size >= total_size`` (a newline is then
            printed), otherwise None.
        """
        if self.total_size <= 0:
            # Empty transfer: nothing to divide by, show it as complete.
            count, percent = 50, 100.0
        else:
            part = self.total_size / 50  # size of one 2% column
            # Clamp so an overshoot cannot widen the bar or exceed 100%.
            count = min(int(self.tran_size / part), 50)
            percent = min((self.tran_size / self.total_size) * 100, 100.0)
        print('\r', end='')  # return to the start of the line
        print('[%-50s]%.2f%%' % ('>' * count, percent), end='', flush=True)
        if self.tran_size >= self.total_size:
            print('\n', end='')
            return True
        return None
代码输出如下:
下载:
上传:
删除:
创建目录:
二、多线程 FTP 项目
项目代码:https://github.com/529507929/Multi_User_FTP_Thread
目录结构:
./Multi_User_FTP_Thread/
| -- client/
| | -- bin/ # 客户端启动脚本
| | -- conf/ # 客户端配置文件
| | -- core/ # 客户端主程序
| | -- download/ # 用于存放客户端从服务器端下载的文件
| | -- share/ # 用于存放客户端需要上传到服务器端的文件
| | -- log/ # 用于存放客户端的日志
|
| -- doc/ # 用于存放一些说明文档
| -- server/
| | -- bin/ # 服务器端启动脚本
| | -- conf/ # 服务器端配置文件
| | -- core/ # 服务器端主程序
| | -- home/ # 不同用户的家目录,用于存放上传和可下载的文件
| | | -- jove/
| | | -- kerry/
| | -- log/ # 用于存放服务器端的日志
程序说明:
README.md:
实现功能
该程序是一个多线程文件传输系统,该系统最大支持10个用户并发使用,并拥有文件的上传下载功能以及基本的目录查看功能,可以使用的命令为 dir(查看当前目录下的文件)
set(设置用户参数) cd(进入指定目录) get(下载指定文件) put(上传指定的文件) del(删除指定文件与目录) mkdir(创建目录),同时get和put还有
断点续传的功能。get 与 put 只能传输文件,无法传输目录。输入的命令需要严格遵守语法规则,示例当中的符号含义如下:
{} 当中的option必填其之一,| 或,* 代表所有,option_* 任意option,[] 必填,abs_path:绝对路径,
relative_path:当前目录下的文件路径,例:share\1.mp4;命令中的空格必须严格按照示例书写。下面将会讲解命令的使用规范:
查看当前目录下的文件(dir)
dir {abs_path | relative_path}
设置用户参数(set)
设置用户储存空间: set {-q | --quotation} {int_* | float_*}
进入指定目录(cd)
cd {.. | abs_path | relative_path}
下载指定文件(get)
get [filename]
# filename只能是Server上该用户share目录下的文件才能下载,Client的下载路径可在Client conf/settings.py上设置
上传指定的文件(put)
put [filename]
# filename只能上传到Server上的recv目录下,Client的上传路径可在Client conf/settings.py上设置
删除指定文件与目录(del)
del {folder | file}
# 如果删除folder的话会连folder内的文件也删除
创建目录 mkdir
mkdir {abs_path | relative_path | multi_path}
断点续传:
get:下载的断点续传功能,当断点发生时数据保存到undo.log
put:上传的断点续传功能,当断点发生时数据保存到undo.log
导入虚拟数据:
Client:本地目录下的share文件夹下的文件与conf的数据
Server:home目录下的所有文件与conf的数据
启动项目:
进入到程序所在目录
./Multi_User_FTP_Thread/client/bin
python ftp_client.py -s ftp_server_ip -P ftp_server_port
./Multi_User_FTP_Thread/server/bin
python ftp_server.py start
流程图:
服务器端:
bin/ftp_server.py:
import sys,os
# Put the server root (the parent of bin/) on the import path so that the
# ``core`` and ``conf`` packages can be imported.
_BIN_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.dirname(_BIN_DIR)
sys.path.append(BASE_DIR)

if __name__ == '__main__':  # only when executed directly, not when imported
    from core import management

    management.ManagementTool(sys.argv).execute()
conf/account.py:
from conf import settings
# Static account table keyed by user name. Each record holds the MD5 hex
# digest of the password, the user's home directory below
# ``settings.HOME_DIR`` and a "quotation" (quota) figure.
# NOTE(review): the quota unit is presumably MB, as in the README - confirm.
account = {
    "jove": {
        "password": "e10adc3949ba59abbe56e057f20f883e",  # 123456
        "home": '{}\\{}'.format(settings.HOME_DIR, 'jove'),
        "quotation": 400,
    },
    "kerry": {
        "password": "e10adc3949ba59abbe56e057f20f883e",  # 123456
        "home": '{}\\{}'.format(settings.HOME_DIR, 'kerry'),
        "quotation": 500,
    },
}
conf/settings.py:
import socket
import os
# Listening socket parameters.
ADDRES_FAMILY, ADDRESS_TYPE = socket.AF_INET, socket.SOCK_STREAM
IS_REUSE_ADDRESS = True
MAX_LISTEN = 5
MAX_RECV = 1024  # maximum number of bytes read per recv() call
MAX_ACCOUNT_CONCURRENT = 10
HOST, PORT = '127.0.0.1', 8080

# Both directories derive from the parent of the current working directory,
# split on backslashes (Windows-style paths).
_cwd_parts = os.getcwd().split('\\')
SERVER_ROOT_DIR = '\\'.join(_cwd_parts[:-1])
HOME_DIR = '\\'.join((SERVER_ROOT_DIR, 'home'))
conf/quotation_db:
{"jove": 500.0, "kerry": 500.0}
core/_cd.py:
import os
from conf.account import account
from conf import settings
class Cd:
    """Server-side handler for the ``cd`` command.

    ``run`` returns the (possibly unchanged) current directory so the caller
    can store it for the session.
    """

    def __init__(self, cmds, server, conn, pwd, username):
        """
        :param cmds: split command line; ``cmds[1]`` is the target if present
        :param server: object providing header/path_process/wri_log
        :param conn: connection to the client
        :param pwd: the session's current directory
        :param username: name of the logged-in account
        """
        self.new_dir = cmds[1] if len(cmds) > 1 else None
        self.cmds = cmds
        self.old_dir = pwd
        self.server = server
        self.conn = conn
        self.result = False
        self.pwd = pwd
        self.username = username

    @staticmethod
    def _within_home(home, target):
        """Return True if ``target`` is ``home`` itself or lies below it.

        Both paths are normalised first, so ``..`` components and sibling
        directories that merely share a prefix (``jove`` vs ``jove2``)
        cannot pass, which a plain string-prefix comparison allows.
        """
        home = os.path.normcase(os.path.abspath(home))
        target = os.path.normcase(os.path.abspath(target))
        return target == home or target.startswith(home.rstrip(os.sep) + os.sep)

    def run(self):
        """Execute the command and return the resulting current directory.

        For ``cd ..`` the directory being left is sent to the client; for any
        other target a boolean result is sent. Nothing is sent when no target
        was given (the client does not wait for a reply in that case).
        """
        if self.new_dir == '..':
            self.server.header(self.conn, self.pwd)
            old_dir_list = self.old_dir.split('\\')
            # A last component equal to the user name is treated as the home
            # root; the client applies the same test to the path it receives.
            if old_dir_list[-1] != self.username:
                self.pwd = '\\'.join(old_dir_list[:-1])
                log = '%s enter %s' % (self.username, self.pwd)
                self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
                print(log)
        elif self.new_dir:
            account_path = account[self.username]['home']
            comple_path = self.server.path_process(self.new_dir, self.pwd)
            # Must be an existing directory (not a file) inside the home.
            if os.path.isdir(comple_path) and self._within_home(account_path, comple_path):
                # Store the normalised form so later ``cd ..`` calls operate
                # on a path without embedded ``..`` components.
                self.pwd = os.path.abspath(comple_path)
                self.result = True
                log = '%s enter %s' % (self.username, self.pwd)
                self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
                print(log)
            self.server.header(self.conn, self.result)
        return self.pwd
core/_del.py:
import os
from conf.account import account
from conf import settings
class Del:
    """Server-side handler for the ``del`` command (file or directory tree)."""

    def __init__(self, cmds, server, conn, pwd, username):
        """
        :param cmds: split command line; ``cmds[1]`` is the path to delete
        :param server: object providing header/path_process/wri_log
        :param conn: connection to the client
        :param pwd: the session's current directory
        :param username: name of the logged-in account
        """
        self.new_path = cmds[1] if len(cmds) > 1 else None
        self.cmds = cmds
        self.server = server
        self.conn = conn
        self.username = username
        self.pwd = pwd

    @staticmethod
    def _strictly_inside(home, target):
        """Return True only if ``target`` lies below ``home``.

        Paths are normalised first so ``..`` components and look-alike
        sibling names cannot escape. The home directory itself does not
        qualify, so a user cannot delete their own root.
        """
        home = os.path.normcase(os.path.abspath(home))
        target = os.path.normcase(os.path.abspath(target))
        return target.startswith(home.rstrip(os.sep) + os.sep)

    def run(self):
        """Delete the requested path and send exactly one reply.

        Replies: True on success, False when the OS refused the deletion,
        ``'not found'`` when the path is missing, absent from the command or
        outside the user's home directory.
        """
        import shutil

        if not self.new_path:
            self.server.header(self.conn, 'not found')
            return
        account_path = account[self.username]['home']
        comple_path = self.server.path_process(self.new_path, self.pwd)
        if not (os.path.exists(comple_path)
                and self._strictly_inside(account_path, comple_path)):
            self.server.header(self.conn, 'not found')
            return
        try:
            if os.path.isfile(comple_path):
                os.remove(comple_path)
                log = '%s del file by %s' % (self.username, comple_path)
            else:
                # Removes the directory together with everything inside it.
                shutil.rmtree(comple_path)
                log = '%s del dir by %s' % (self.username, comple_path)
        except OSError:
            log = '%s del file find ERROR!' % self.username
            self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
            print(log)
            self.server.header(self.conn, False)
            return
        self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
        print(log)
        self.server.header(self.conn, True)
core/_dir.py:
import os
from conf import settings
class Dir:
    """Server-side handler for the ``dir`` command (list a directory)."""

    def __init__(self, cmds, server, conn, pwd, username=None):
        """
        :param cmds: split command line; ``cmds[1]`` is an optional path
        :param server: object providing header/path_process/wri_log
        :param conn: connection to the client
        :param pwd: the session's current directory, listed when no path is
            given and used to resolve a relative path
        :param username: optional account name, used only in the log message
        """
        self.server = server
        # Existing callers do not pass a user name; fall back to an attribute
        # on the server object if one exists.
        if username is None:
            username = getattr(server, 'username', None)
        self.username = username
        if len(cmds) > 1:
            # path_process needs the current directory as second argument.
            self.path = server.path_process(cmds[1], pwd)
        else:
            self.path = pwd
        self.conn = conn
        # NOTE(review): the resolved path is not checked against the user's
        # home directory here - confirm whether that restriction is required.

    def run(self):
        """Send the directory listing, or an error string if not a directory."""
        if os.path.isdir(self.path):
            self.server.header(self.conn, os.listdir(self.path))
        else:
            log = '%s path in cmd, not this path.' % self.username
            self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
            print(log)
            self.server.header(self.conn, log)
core/_get.py:
import os
import hashlib
from conf import settings
class Get:
    """Server-side handler for ``get``: send a file from the user's share dir."""

    CHUNK_SIZE = 64 * 1024  # bytes read from disk and sent per iteration

    def __init__(self, cmds, server, conn, username, cmd_header_dic):
        """
        :param cmds: split command line; ``cmds[1]`` is the file name
        :param server: object providing header/wri_log
        :param conn: connection to the client
        :param username: name of the logged-in account
        :param cmd_header_dic: command header; ``done_size`` is the number of
            bytes the client already holds (resume offset)
        """
        self.cmds = cmds
        self.server = server
        self.conn = conn
        self.cmd_header_dic = cmd_header_dic
        self.username = username

    @staticmethod
    def _md5_of_file(path, chunk_size=65536):
        """Return the MD5 hex digest of a file, reading it in chunks."""
        digest = hashlib.md5()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                digest.update(chunk)
        return digest.hexdigest()

    def run(self):
        """Send availability, then a metadata header, then the file bytes.

        False is sent (and nothing else) when the name is missing, is not a
        regular file, or resolves outside the user's share directory.
        """
        if len(self.cmds) < 2:
            self.server.header(self.conn, False)
            return
        filename = self.cmds[1]
        user_share_dir = '%s/%s/%s' % (settings.HOME_DIR, self.username, 'share')
        file_dir = '%s/%s' % (user_share_dir, filename)

        # Normalise before comparing so ``..`` cannot escape the share dir.
        share_root = os.path.normcase(os.path.abspath(user_share_dir))
        requested = os.path.normcase(os.path.abspath(file_dir))
        inside_share = requested.startswith(share_root.rstrip(os.sep) + os.sep)
        if not (inside_share and os.path.isfile(file_dir)):
            self.server.header(self.conn, False)
            return
        self.server.header(self.conn, True)

        done_size = self.cmd_header_dic.get('done_size', 0)
        if not isinstance(done_size, int) or done_size < 0:
            done_size = 0

        header_dic = {
            'filename': filename,
            'md5': self._md5_of_file(file_dir),
            'file_size': os.path.getsize(file_dir)
        }
        self.server.header(self.conn, header_dic)

        with open(file_dir, 'rb') as f:
            f.seek(done_size)  # skip what the client already received
            for chunk in iter(lambda: f.read(self.CHUNK_SIZE), b''):
                # sendall: plain send() may transmit only part of the buffer.
                self.conn.sendall(chunk)

        log = '%s get %s.' % (self.username, file_dir)
        self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
        print(log)
core/_mkdir.py:
import os
from conf import settings
class Mkdir:
    """Server-side handler for ``mkdir`` (creates intermediate directories)."""

    def __init__(self, cmds, server, conn, pwd, username):
        """
        :param cmds: split command line; ``cmds[1]`` is the directory to create
        :param server: object providing header/path_process/wri_log
        :param conn: connection to the client
        :param pwd: the session's current directory
        :param username: name of the logged-in account
        """
        self.new_path = cmds[1] if len(cmds) > 1 else None
        self.cmds = cmds
        self.server = server
        self.conn = conn
        self.pwd = pwd
        self.username = username

    @staticmethod
    def _within_home(home, target):
        """Return True if ``target``, once normalised, is ``home`` or below it."""
        home = os.path.normcase(os.path.abspath(home))
        target = os.path.normcase(os.path.abspath(target))
        return target == home or target.startswith(home.rstrip(os.sep) + os.sep)

    def run(self):
        """Create the directory and send True on success, False otherwise.

        False covers: no path given, a path outside the user's home, an
        already existing directory and any other OS-level failure. The client
        always waits for exactly one reply.
        """
        if not self.new_path:
            self.server.header(self.conn, False)
            return
        comple_path = self.server.path_process(self.new_path, self.pwd)
        # Same layout as the "home" entries in conf/account.py.
        home = '%s\\%s' % (settings.HOME_DIR, self.username)
        created = False
        if self._within_home(home, comple_path):
            try:
                os.makedirs(comple_path)
                created = True
            except OSError as err:
                print(err)
        if created:
            log = '%s make dir %s' % (self.username, comple_path)
        else:
            log = '%s make dir %s failed' % (self.username, comple_path)
        self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
        print(log)
        self.server.header(self.conn, created)
core/_put.py:
import hashlib
import os
from conf import settings
class Put:
    """Server-side handler for ``put``: receive a file into the recv dir.

    Message order, as visible in this class and the client's ``Put``:
      1. server -> client: the undo (resume) dictionary
      2. client -> server: resume flag and chosen name (only if undo entries exist)
      3. client -> server: error flag (True aborts the upload)
      4. client -> server: header with ``md5`` and ``file_size``
      5. server -> client: whether the target path already exists
      6. server -> client: MD5 comparison result (only if it exists and this
         is not a resume); True means nothing needs to be sent
      7. client -> server: raw file bytes
      8. server -> client: final MD5 check result
    """

    def __init__(self, cmds, server, conn, username, cmd_header_dic):
        """
        :param cmds: split command line; ``cmds[1]`` is the file name
        :param server: object providing header/unheader/get_undo/set_undo/wri_log
        :param conn: connection to the client
        :param username: name of the logged-in account
        :param cmd_header_dic: the command header received from the client
        """
        self.cmds = cmds
        self.server = server
        self.conn = conn
        self.cmd_header_dic = cmd_header_dic
        self.username = username

    @staticmethod
    def _md5_of_file(path, chunk_size=65536):
        """Return the MD5 hex digest of a file, reading it in chunks."""
        digest = hashlib.md5()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                digest.update(chunk)
        return digest.hexdigest()

    @staticmethod
    def _inside(base, target):
        """Return True if ``target``, once normalised, lies below ``base``."""
        base = os.path.normcase(os.path.abspath(base))
        target = os.path.normcase(os.path.abspath(target))
        return target.startswith(base.rstrip(os.sep) + os.sep)

    def _log(self, log):
        """Write one entry to the server log and echo it on the console."""
        self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
        print(log)

    def run(self):
        """Receive one upload, supporting resume of an interrupted transfer.

        :raises PermissionError: if the requested name resolves outside the
            user's recv directory; raised before any protocol message so the
            caller's error handling ends the session.
        """
        user_recv_dir = '%s/%s/%s' % (settings.HOME_DIR, self.username, 'recv')
        filename = self.cmds[1]  # kept unless a resume picks another entry
        file_dir = '%s/%s' % (user_recv_dir, filename)
        if not self._inside(user_recv_dir, file_dir):
            log = '%s try put outside recv dir.' % self.username
            self._log(log)
            raise PermissionError(log)
        is_undo_file = False
        rec_size = 0  # bytes already stored on the server
        rename = None  # on-disk name when it differs from ``filename``

        # 1. Tell the client which uploads can be resumed.
        undo_dic = self.server.get_undo(self.username, settings.SERVER_ROOT_DIR, 'log\\undo.log')
        self.server.header(self.conn, undo_dic)

        # 2. Resume selection (only exchanged when undo entries exist).
        undo_file = undo_dic[self.username]['put']
        if undo_file:
            is_undo_file = bool(self.server.unheader(self.conn))
            if is_undo_file:
                filename = self.server.unheader(self.conn)
                rec_size = undo_file[filename][2]
                # Keep the stored rename so that a second interruption does
                # not overwrite it with None in the undo record.
                rename = undo_file[filename][3]
                stored_name = filename if rename is None else rename
                file_dir = '%s/%s' % (user_recv_dir, stored_name)
                print(file_dir)

        # 3. The client reports True when it cannot upload (e.g. a folder).
        if self.server.unheader(self.conn):
            self._log('%s try put not file class to server.' % self.username)
            return

        # 4. Upload metadata.
        header_dic = self.server.unheader(self.conn)
        file_md5 = header_dic['md5']
        total_size = undo_file[filename][1] if is_undo_file else header_dic['file_size']

        # 5./6. The client reads the "exists" flag and at most ONE comparison
        # result, so the comparison is sent once; after that only a free
        # name is searched, without sending anything further.
        already_there = os.path.exists(file_dir)
        self.server.header(self.conn, already_there)
        if already_there and not is_undo_file:
            if self._md5_of_file(file_dir) == file_md5:
                self.server.header(self.conn, True)  # identical content
                return
            self.server.header(self.conn, False)
            num = 0
            while os.path.exists(file_dir):
                num += 1
                rename_list = filename.split('.')
                rename_list[0] += '(%d)' % num
                rename = '.'.join(rename_list)
                file_dir = '%s/%s' % (user_recv_dir, rename)

        # 7. Receive the data. An empty recv() means the peer closed the
        # connection; it is handled like a reset, not looped on forever.
        mode = 'ab' if is_undo_file else 'wb'
        interrupted = False
        with open(file_dir, mode) as f:
            while rec_size < total_size:
                try:
                    chunk = self.conn.recv(min(settings.MAX_RECV, total_size - rec_size))
                except ConnectionResetError:
                    chunk = b''
                if not chunk:
                    interrupted = True
                    break
                f.write(chunk)
                rec_size += len(chunk)

        if interrupted:
            # Record the breakpoint so the upload can be resumed later.
            undo_dic[self.username]['put'][filename] = [file_dir, total_size, rec_size, rename]
            self.server.set_undo(settings.SERVER_ROOT_DIR, 'log\\undo.log', undo_dic)
            self._log('%s put %s interrupt.' % (self.username, file_dir))
            return

        if is_undo_file:
            del undo_dic[self.username]['put'][filename]
            self.server.set_undo(settings.SERVER_ROOT_DIR, 'log\\undo.log', undo_dic)
            self._log('%s put undo file %s.' % (self.username, file_dir))

        # 8. Verify and report; a corrupt file is discarded.
        print(file_dir)
        check_resutl = self._md5_of_file(file_dir) == file_md5
        if not check_resutl:
            os.remove(file_dir)
        self.server.header(self.conn, check_resutl)
core/_set.py:
from conf import settings
from conf.account import account
import json
class Set:
    """Server-side handler for ``set`` (currently only the storage quota)."""

    def __init__(self, cmds, server, conn, username, total_quotation_dic):
        """
        :param cmds: split command line: ``set <option> <value>``
        :param server: object providing header/wri_log
        :param conn: connection to the client
        :param username: name of the logged-in account
        :param total_quotation_dic: mapping user name -> quota, shared with
            the session and persisted to ``conf/quotation_db``
        """
        self.cmds = cmds
        self.server = server
        self.conn = conn
        self.username = username
        self.total_quotation_dic = total_quotation_dic

    def run(self):
        """Validate the command and apply it.

        Exactly one reply is always sent, because the client blocks on a
        reply after sending any ``set`` command: False for an unknown option
        or an unusable value, otherwise the result of the update.
        """
        import math

        quotation_options = ['-q', '--quotation']
        option = self.cmds[1] if len(self.cmds) > 1 else None
        if len(self.cmds) < 3 or option not in quotation_options:
            print('%s input %s option is bad.' % (self.username, option))
            self.server.header(self.conn, False)
            return
        try:
            parameter = float(self.cmds[2])
        except ValueError:
            parameter = None
        if parameter is None or not math.isfinite(parameter) or parameter < 0:
            print('%s input %s parameter is bad.' % (self.username, self.cmds[2]))
            self.server.header(self.conn, False)
            return
        self._quotation(parameter)

    def _quotation(self, parameter):
        """Store the user's new quota and persist the whole table as JSON.

        On failure the previous in-memory state is restored, so the session
        does not keep a value that was never written to disk.
        """
        missing = object()
        previous = self.total_quotation_dic.get(self.username, missing)
        try:
            self.total_quotation_dic[self.username] = parameter
            with open(r'%s\%s\%s' % (settings.SERVER_ROOT_DIR, 'conf', 'quotation_db'), 'w') as f:
                json.dump(self.total_quotation_dic, f)
            log = '%s change quotation to %f MB!' % (self.username, parameter)
            self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
            print(log)
            self.server.header(self.conn, True)
        except Exception as e:
            if previous is missing:
                self.total_quotation_dic.pop(self.username, None)
            else:
                self.total_quotation_dic[self.username] = previous
            log = '%s change quotation is false! --> %s' % (self.username, e)
            self.server.wri_log(settings.SERVER_ROOT_DIR, 'log\\server.log', log)
            print(log)
            self.server.header(self.conn, False)
core/_sys_cmd.py:
import subprocess
class SysCmd:
    """Run a shell command on the server and send its output to the client."""

    def __init__(self, cmd, server, conn):
        """
        :param cmd: command line string passed to the shell
        :param server: object providing ``header``
        :param conn: connection to the client
        """
        self.cmd = cmd
        self.server = server
        self.conn = conn

    def run(self):
        """Execute the command, then send a size header, stdout and stderr."""
        print(self.cmd)
        # SECURITY: ``shell=True`` with a client-supplied string lets a
        # connected client run arbitrary commands on the server. Only enable
        # this handler for trusted clients.
        obj = subprocess.Popen(self.cmd, shell=True,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
        # communicate() drains both pipes together; reading stdout to EOF and
        # only then stderr can block forever once the child fills stderr.
        stdout, stderr = obj.communicate()
        header_dic = {
            'md5': 'xxxxdxxx',
            'total_size': len(stdout) + len(stderr)
        }
        self.server.header(self.conn, header_dic)
        # sendall: plain send() may transmit only part of a large buffer.
        self.conn.sendall(stdout)
        self.conn.sendall(stderr)
core/features.py:
from core._get import Get
from core._put import Put
from core._pwd import Pwd
from core._cd import Cd
from core._sys_cmd import SysCmd
from core._dir import Dir
from core._del import Del
from core._mkdir import Mkdir
from core._set import Set
class Features:
    """Per-session command dispatcher on the server.

    One instance lives for the whole client session; it keeps the session's
    current directory (``pwd``), which ``cd`` updates.
    """

    def __init__(self, server, username, conn, pwd, total_quotation_dic):
        """
        :param server: server socket wrapper handed to the handlers
        :param username: name of the logged-in account
        :param conn: connection to the client
        :param pwd: initial current directory of the session
        :param total_quotation_dic: mapping user name -> quota
        """
        self.conn = conn
        self.server = server
        self.username = username
        self.pwd = pwd
        self.total_quotation_dic = total_quotation_dic
        self.cmds = None

    def find(self, cmd_header_dic, cmds):
        """Run the handler named ``_<cmds[0]>``; unknown commands are ignored.

        The command name comes from the remote client. Names starting with an
        underscore are rejected so a request such as ``_init__`` cannot
        resolve to ``__init__`` or another dunder attribute.

        :param cmd_header_dic: full command header received from the client
        :param cmds: the split command line
        """
        self.cmds = cmds
        if not cmds:
            return
        command = cmds[0]
        if command.startswith('_'):
            return
        handler = getattr(self, '_%s' % command, None)
        if callable(handler):
            handler(cmd_header_dic)

    def _get(self, *args):
        """Send a file to the client; ``args[0]`` is the command header."""
        Get(self.cmds, self.server, self.conn, self.username, args[0]).run()

    def _put(self, *args):
        """Receive a file from the client; ``args[0]`` is the command header."""
        Put(self.cmds, self.server, self.conn, self.username, args[0]).run()

    def _pwd(self, *args):
        """Report the current directory."""
        Pwd(self.server, self.conn, self.pwd).run()

    def _cd(self, *args):
        """Change directory and remember the new current directory."""
        obj = Cd(self.cmds, self.server, self.conn, self.pwd, self.username)
        self.pwd = obj.run()

    def _dir(self, *args):
        """List a directory."""
        Dir(self.cmds, self.server, self.conn, self.pwd).run()

    def _del(self, *args):
        """Delete a file or directory."""
        Del(self.cmds, self.server, self.conn, self.pwd, self.username).run()

    def _mkdir(self, *args):
        """Create a directory."""
        Mkdir(self.cmds, self.server, self.conn, self.pwd, self.username).run()

    def _set(self, *args):
        """Change a per-user setting (storage quota)."""
        Set(self.cmds, self.server, self.conn, self.username, self.total_quotation_dic).run()
core/main.py:
from core import server_socket
from core.features import Features
from conf import settings
from conf.account import account
from threading import Thread, currentThread # 用于测试,查看当前所在线程
import queue
import os
import json
class Main:
    """
    Server main program: accept clients and serve each one in its own thread.

    ``self.q`` is a bounded queue used as a concurrency limiter: one item is
    put per accepted client and one is removed when that client's session
    ends, so at most ``settings.MAX_ACCOUNT_CONCURRENT`` sessions run at once.
    """

    def __init__(self, management_instance):
        self.management_instance = management_instance
        self.server = server_socket.ServerSocket((settings.HOST, settings.PORT))
        self.q = queue.Queue(settings.MAX_ACCOUNT_CONCURRENT)
        self.server_address = self.server.server_address
        self.sock = None
        # Kept for backward compatibility only. Session state is held in
        # locals of ``handle`` because instance attributes are shared by all
        # worker threads and would be overwritten by concurrent logins.
        self.client_addr = None
        self.username = None
        self.password = None
        self.cmds = None

    def run(self):
        """
        Accept loop (main thread): start one worker thread per client.

        ``self.q.put`` blocks while the limit is reached, which delays
        accepting further clients. The listening socket is closed when the
        loop is left through an exception such as KeyboardInterrupt.
        """
        print('starting FTP server on %s:%s'.center(50, '-') % (settings.HOST, settings.PORT))
        try:
            while True:
                conn, client_addr = self.server.accept()
                self.client_addr = client_addr
                slot_taken = False
                try:
                    t = Thread(target=self.handle, args=(conn, client_addr))
                    self.q.put(t)
                    slot_taken = True
                    print(t)
                    t.start()
                except Exception as e:
                    print(e)
                    conn.close()
                    # Only give back a slot this connection actually took.
                    if slot_taken:
                        self._release_slot()
        finally:
            self.server.socket.close()

    def _release_slot(self):
        """Free one concurrency slot without blocking if none is held."""
        try:
            self.q.get_nowait()
        except queue.Empty:
            pass

    def handle(self, conn, client_addr=None):
        """
        Serve one client: authenticate, then process commands until the
        connection ends.

        The connection is closed and the concurrency slot released exactly
        once, however the session ends.

        :param conn: connection of this worker thread
        :param client_addr: peer address, used in log output only
        """
        if client_addr is None:
            client_addr = self.client_addr
        try:
            username = self._authenticate(conn, client_addr)
            if username is not None:
                self._serve(conn, username)
        except Exception as e:
            print(e)
        finally:
            conn.close()
            self._release_slot()

    def _serve(self, conn, username):
        """Send the quota information, then run the command loop."""
        with open(r'%s\%s\%s' % (settings.SERVER_ROOT_DIR, 'conf', 'quotation_db'), 'r') as f:
            total_quotation_dic = json.load(f)
        home = account[username]['home']
        # NOTE(review): get_dir_size returns bytes while the stored quota is
        # described as MB elsewhere - confirm how the client combines them.
        user_quotation = (self.get_dir_size(home), total_quotation_dic[username])
        self.server.header(conn, user_quotation)
        func = Features(self.server, username, conn, home, total_quotation_dic)
        while True:
            res = self.server.unheader(conn)
            if not res:
                return
            func.find(res, res['cmd'].split())

    def _authenticate(self, conn, client_addr):
        """
        Run the login dialogue until it succeeds or the client disconnects.

        :return: the authenticated user name, or None if the connection was
            reset before a successful login
        """
        while True:
            try:
                account_info = self.server.unheader(conn)
            except ConnectionResetError:
                return None
            username, password = account_info[0], account_info[1]
            record = account.get(username) if isinstance(username, str) else None
            if record is not None and password == record['password']:
                self.server.header(conn, True)
                print('%s is login from %s' % (username, client_addr))
                return username
            self.server.header(conn, False)
            print('%s try login by %s, result is bad.' % (client_addr, username))

    def _auth(self, conn, is_login):
        """
        Backward-compatible login entry point.

        :param conn: connection used to talk to the client
        :param is_login: current login state; if already true it is returned
        :return: True after a successful login (``self.username`` is then
            set), False if the client disconnected first
        """
        if is_login:
            return is_login
        username = self._authenticate(conn, self.client_addr)
        if username is None:
            return False
        self.username = username
        return True

    def get_dir_size(self, path, total_size=0):
        """
        Return the size of a directory tree in bytes.

        :param path: directory to measure
        :param total_size: running total to add to
        :return: ``total_size`` plus the size of every regular file below
            ``path``
        """
        for filename in os.listdir(path):
            path_tmp = os.path.join(path, filename)
            if os.path.isfile(path_tmp):
                total_size += os.path.getsize(path_tmp)
            elif os.path.isdir(path_tmp):
                total_size = self.get_dir_size(path_tmp, total_size)
        return total_size
core/management.py:
from core import main
class ManagementTool(object):
    """Parse the command line and call the matching management command."""

    # Only these names may be invoked from the command line. Validating with
    # hasattr() would also accept internals such as ``execute`` (endless
    # recursion) or ``sys_argv`` (not callable). ``creteuser`` is the
    # historical misspelling, kept as an alias.
    COMMANDS = ('start', 'createuser', 'creteuser')

    def __init__(self, sys_argv):
        """
        :param sys_argv: the process argument list (``sys.argv``)
        :raises SystemExit: with the usage text if the arguments are invalid
        """
        self.sys_argv = sys_argv
        self.verify_argv()

    def verify_argv(self):
        """Exit with the usage text unless a known command was given."""
        if len(self.sys_argv) < 2:
            self.help_msg()
        cmd = self.sys_argv[1]
        if cmd not in self.COMMANDS:
            print('invalid argument!')
            self.help_msg()

    def help_msg(self):
        """Terminate the program with the usage text."""
        msg = '''
        start                  start FTP server
        stop                   stop FTP server (not implemented)
        restart                restart FTP server (not implemented)
        createuser username    create a ftp user
        '''
        raise SystemExit(msg)

    def execute(self):
        """Run the command that ``verify_argv`` accepted."""
        cmd = self.sys_argv[1]
        func = getattr(self, cmd)
        func()

    def start(self):
        """start ftp server"""
        server = main.Main(self)
        server.run()

    def createuser(self):
        """Placeholder: only prints the received arguments."""
        print(self.sys_argv)

    creteuser = createuser  # backward-compatible alias for the old spelling
core/server_socket.py:
import socket
import struct
import json
import os
from conf import settings
class ServerSocket:
    """Listening socket plus the helpers shared by the command handlers.

    Messages are framed as a 4-byte length (``struct`` format ``'i'``, native
    byte order) followed by that many bytes of UTF-8 encoded JSON.
    """

    address_family = settings.ADDRES_FAMILY
    address_type = settings.ADDRESS_TYPE
    is_reuse_address = settings.IS_REUSE_ADDRESS
    max_listen = settings.MAX_LISTEN
    max_recv = settings.MAX_RECV

    def __init__(self, server_address, bind_and_active=True):
        """Create the socket and, unless disabled, bind it and start listening."""
        self.server_address = server_address
        self.socket = socket.socket(self.address_family, self.address_type)
        if bind_and_active:
            try:
                self.bind()
                self.listen()
            except Exception:
                self.socket.close()
                raise

    def bind(self):
        """Bind to ``server_address``, enabling address reuse if configured."""
        if self.is_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)

    def listen(self):
        """Start listening with the configured backlog."""
        self.socket.listen(self.max_listen)

    def accept(self):
        """Wait for a client; returns ``(conn, address)``."""
        return self.socket.accept()

    def recv(self):
        """Read up to ``max_recv`` bytes from the socket and return them."""
        return self.socket.recv(self.max_recv)

    def send(self, msg):
        """Send all of ``msg`` on the socket."""
        self.socket.sendall(msg)

    def close(self):
        """Close the socket."""
        self.socket.close()

    @staticmethod
    def _recv_exact(conn, size):
        """Read exactly ``size`` bytes from ``conn``.

        A single recv() may return fewer bytes than requested, so keep
        reading until the frame is complete.

        :raises ConnectionResetError: if the peer closes the connection first
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = conn.recv(remaining)
            if not chunk:
                raise ConnectionResetError('connection closed while reading a message')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def header(self, conn, header_info):
        """Send ``header_info`` (any JSON-serialisable value) as one framed message."""
        header_bytes = json.dumps(header_info).encode('utf-8')
        # sendall: plain send() may transmit only part of the buffer.
        conn.sendall(struct.pack('i', len(header_bytes)) + header_bytes)

    def unheader(self, conn):
        """Receive one framed message and return the decoded JSON value.

        :raises ConnectionResetError: if the connection ends mid-message
        """
        (header_size,) = struct.unpack('i', self._recv_exact(conn, 4))
        header_bytes = self._recv_exact(conn, header_size)
        return json.loads(header_bytes.decode('utf-8'))

    def path_process(self, new_path, pwd):
        """Resolve ``new_path`` against ``pwd`` using backslash separators.

        An absolute ``new_path`` is returned unchanged; otherwise its
        components are appended to those of ``pwd``. No normalisation is
        done, so callers must validate the result.
        """
        if os.path.isabs(new_path):
            return new_path
        return '\\'.join(pwd.split('\\') + new_path.split('\\'))

    def is_file(self, path):
        """Return True for a file, False for a directory, None otherwise."""
        if os.path.isfile(path):
            return True
        if os.path.isdir(path):
            return False
        return None

    def wri_log(self, root_dir, log_dir, log):
        """Append ``log`` to the log file ``root_dir/log_dir``.

        The file is created if it does not exist. The entry is written as
        given, without adding a separator.
        NOTE(review): entries therefore run together on one line - confirm
        whether a trailing newline is wanted.
        """
        with open(os.path.join(root_dir, log_dir), 'a', encoding='utf-8') as f_log:
            f_log.write(log)

    def get_undo(self, username, root_dir, log_dir):
        """Load the resume ("undo") dictionary, adding an entry for ``username``.

        A missing or empty file is treated as an empty dictionary. If the
        user has no entry yet, ``{'put': {}}`` is added and written back.

        :return: the complete undo dictionary
        """
        undone_log_dir = os.path.join(root_dir, log_dir)
        dic = {}
        if os.path.exists(undone_log_dir) and os.path.getsize(undone_log_dir) > 0:
            with open(undone_log_dir, 'r') as f:
                dic = json.load(f)
        if username not in dic:
            dic[username] = {'put': {}}
            with open(undone_log_dir, 'w') as fw:
                json.dump(dic, fw)
        return dic

    def set_undo(self, root_dir, log_dir, undone_dic):
        """Overwrite the undo file with ``undone_dic`` as JSON."""
        undone_log_dir = os.path.join(root_dir, log_dir)
        with open(undone_log_dir, 'w') as f:
            json.dump(undone_dic, f)
            f.flush()
客户端:
bin/ftp_client.py:
import sys,os
# Put the client root (the parent of bin/) on the import path so that the
# ``core`` and ``conf`` packages can be imported.
_BIN_DIR = os.path.dirname(os.path.abspath(__file__))
BASE_DIR = os.path.dirname(_BIN_DIR)
sys.path.append(BASE_DIR)

if __name__ == '__main__':  # only when executed directly, not when imported
    from core.main import Main

    Main().run()
conf/settings.py:
import socket
import os
# Connection parameters.
ADDRES_FAMILY, ADDRESS_TYPE = socket.AF_INET, socket.SOCK_STREAM
MAX_RECV = 1024  # maximum number of bytes read per recv() call
HOST, PORT = '127.0.0.1', 8080

# Local directories for downloaded files and for files offered for upload.
DOWNLOADS_DIR = 'G:\\luffy\\Project\\Multi_User_FTP_Thread\\client\\downloads'
SHARE_DIR = 'G:\\luffy\\Project\\Multi_User_FTP_Thread\\client\\share'

# Parent of the current working directory, split on backslashes.
_cwd_parts = os.getcwd().split('\\')
CLIENT_ROOT_DIR = '\\'.join(_cwd_parts[:-1])
core/_cd.py:
from conf import settings
class Cd:
    """Client side of ``cd``: send the command and report a refusal."""

    def __init__(self, cmds, client):
        """Keep the split command, the client wrapper and its socket."""
        self.new_path = cmds[1] if len(cmds) > 1 else None
        self.cmds = cmds
        self.client = client
        self.socket = client.socket

    def run(self):
        """Send the command; log and print a message only if it was refused.

        For ``cd ..`` the server answers with the directory being left; for
        any other target it answers with a boolean. Without a target no
        answer is read and the command is reported as erroneous.
        """
        self.client.send_cmd({'cmd': ' '.join(self.cmds)})
        target = self.new_path
        if target == '..':
            previous_dir = self.client.unheader(self.socket)
            # Already at the home root when the last component is the user name.
            if previous_dir.split('\\')[-1] != self.client.username:
                return
            log = 'Insufficient permissions, unable to return to previous level.'
        elif target:
            if self.client.unheader(self.socket):
                return
            log = 'Not this path or insufficient permissions, unable to return to previous level.'
        else:
            log = 'cmd is error'
        self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
        print(log)
core/_del.py:
from conf import settings
class Del:
    """Client side of ``del``: send the command and report the outcome."""

    def __init__(self, cmds, client):
        """Keep the split command, the client wrapper and its socket."""
        self.new_path = cmds[1] if len(cmds) > 1 else None
        self.cmds = cmds
        self.client = client
        self.socket = client.socket

    def run(self):
        """Send the command, then log and print a message for the reply.

        The reply is True (deleted), ``'not found'`` or anything else
        (failure).
        """
        self.client.send_cmd({'cmd': ' '.join(self.cmds)})
        del_result = self.client.unheader(self.socket)
        if del_result is True:
            log = 'Del complete.'
        elif del_result == 'not found':
            log = 'Path not found.'
        else:
            log = 'Del failure.'
        self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
        print(log)
core/_dir.py:
class Dir:
    """Client side of ``dir``: request a listing and print it five per row."""

    def __init__(self, cmds, client):
        """Keep the split command, the client wrapper and its socket."""
        self.cmds = cmds
        self.client = client
        self.socket = client.socket

    def run(self):
        """Send the command and print the reply.

        A list reply is printed as space-separated names with a line break
        after every fifth name and after the last one. Any other reply (an
        error string from the server) is printed as is.
        """
        self.client.send_cmd({'cmd': ' '.join(self.cmds)})
        file_list = self.client.unheader(self.socket)
        if not isinstance(file_list, list):
            print(file_list)
            return
        total = len(file_list)
        for position, name in enumerate(file_list, start=1):
            # ``i+1 % 5`` parses as ``i + (1 % 5)``, which never wrapped rows.
            end_of_row = position % 5 == 0 or position == total
            print(name + ('\n' if end_of_row else ' '), end='')
core/_get.py:
import hashlib
import os
import datetime
from core.progress_bar import ProgressBar
from core.undo import Undo
from conf import settings
class Get:
    """Client side of ``get``: download a file, with resume support.

    A download is first stored under a temporary name
    ``<server name>.<microsecond>`` and renamed to the real name after the
    MD5 check passes. Interrupted downloads are tracked in the client's undo
    log under that temporary name.
    """

    def __init__(self, cmds, client):
        """Keep the split command, the client wrapper and its socket."""
        self.cmds = cmds
        self.client = client
        self.socket = self.client.socket
        self.undo = {}

    @staticmethod
    def _size_on_disk(name):
        """Return the size of a file in the download dir, 0 if it is absent."""
        path = r'%s\%s' % (settings.DOWNLOADS_DIR, name)
        return os.path.getsize(path) if os.path.exists(path) else 0

    def run(self):
        """Run the download dialogue for ``get <filename>``.

        :raises SystemExit: if the server connection is lost during transfer
        """
        if len(self.cmds) < 2:
            print('cmd is error')
            return
        undo_dic = self.client.get_undo(settings.CLIENT_ROOT_DIR, 'log\\undo.log')
        undo_file = undo_dic[self.client.username]['get']
        cmd_header_dic = {
            'cmd': ' '.join(self.cmds),  # the name after "get" is the server-side name
            'done_size': 0  # bytes already held locally (resume offset)
        }
        is_undo_file = False
        file_choice = None  # local (temporary) name chosen for resuming

        # Offer to resume an interrupted download.
        if undo_file:
            for filename in undo_file:
                file_info = undo_file[filename]
                # Show what is on disk; the header's done_size is still 0 here.
                print('%s\ttotal %d\trecv %d' % (filename, file_info[1], self._size_on_disk(filename)))
            while True:
                continue_choice = input('Whether to undo file transfer?(Y/N)\n>>: ').strip()
                if continue_choice.lower() == 'y':
                    while True:
                        file_choice = input('Choice undo file transfer.\n>>: ').strip()
                        if file_choice in undo_file:
                            self.cmds[1] = undo_file[file_choice][2]  # server-side name
                            is_undo_file = True
                            break
                        print('Not %s, please choose again.' % file_choice)
                    cmd_header_dic['cmd'] = ' '.join(self.cmds)
                    cmd_header_dic['done_size'] = self._size_on_disk(file_choice)
                    self.client.send_cmd(cmd_header_dic)
                    break
                elif continue_choice.lower() == 'n':
                    self.client.send_cmd(cmd_header_dic)
                    break
                else:
                    print('Only input Y/N.')
        else:
            self.client.send_cmd(cmd_header_dic)

        if not self.client.unheader(self.socket):
            print('Not this file.')
            return

        if is_undo_file:
            filename = file_choice
        else:
            filename = '%s.%s' % (self.cmds[1], datetime.datetime.now().microsecond)
        undo_key = filename  # name under which the undo record is registered
        renamed = False  # True once the user typed a final name
        user_downloads_dir = settings.DOWNLOADS_DIR
        file_dir = '%s/%s' % (user_downloads_dir, filename)
        header_dic = self.client.unheader(self.socket)
        file_md5 = header_dic['md5']
        total_size = header_dic['file_size']
        mode = 'ab'
        undo_dic_fun = Undo(self.client, 'get', filename, undo_dic)
        undo_dic_fun.run(file_dir, total_size, self.cmds)

        # Resolve a local name clash (never applies when resuming).
        while os.path.exists(file_dir) and not is_undo_file:
            exists_choice = input('File already exists, Y --> rename, N --> cover.\n>>: ').strip()
            if exists_choice.lower() == 'y':
                filename = input('Input new file name.\n>>: ').strip()
                file_dir = '%s/%s' % (user_downloads_dir, filename)
                renamed = True
            elif exists_choice.lower() == 'n':
                mode = 'wb'
                break
            else:
                print('Only input Y/N.')

        recv_size = cmd_header_dic['done_size']
        with open(file_dir, mode) as f:
            while recv_size < total_size:
                try:
                    chunk = self.socket.recv(min(settings.MAX_RECV, total_size - recv_size))
                except ConnectionResetError:
                    chunk = b''
                if not chunk:
                    # An empty read means the server closed the connection;
                    # looping on it would never terminate.
                    raise SystemExit('\nDisconnect from server.')
                f.write(chunk)
                recv_size += len(chunk)
                ProgressBar(recv_size, total_size).run()

        # Transfer complete (this also covers empty files). pop() because the
        # record is keyed by the temporary name even if the user renamed.
        undo_dic[self.client.username]['get'].pop(undo_key, None)
        self.client.set_undo(settings.CLIENT_ROOT_DIR, 'log\\undo.log', undo_dic)
        log = '%s download complete.' % filename
        self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
        print(log)

        digest = hashlib.md5()
        with open(file_dir, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b''):
                digest.update(chunk)
        if digest.hexdigest() == file_md5:
            log = 'Check MD5 done to %s' % file_dir
            self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
            print('Check MD5 done to %s.' % filename)
            if not renamed:
                # Drop the temporary ".<microsecond>" suffix. A name typed by
                # the user is already final and keeps its own extension.
                real_filename = '.'.join(filename.split('.')[:-1])
                os.replace(file_dir, r'%s/%s' % (user_downloads_dir, real_filename))
        else:
            log = 'MD5 is incorrect to %s' % file_dir
            self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
            print('MD5 is incorrect to %s.' % filename)
core/_mkdir.py:
from conf import settings
class Mkdir:
    """Client side of ``mkdir``: send the command and report the outcome."""

    def __init__(self, cmds, client):
        """Keep the split command, the client wrapper and its socket."""
        self.cmds = cmds
        self.client = client
        self.socket = client.socket

    def run(self):
        """Send the command, then log and print whether the server succeeded."""
        self.client.send_cmd({'cmd': ' '.join(self.cmds)})
        succeeded = self.client.unheader(self.socket) is True
        log = 'Mkdir complete.' if succeeded else 'Mkdir failure.'
        self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
        print(log)
core/_put.py:
import os
import hashlib
from core.progress_bar import ProgressBar
from conf import settings
class Put:
    """Client side of ``put``: upload a file from the share dir, with resume."""

    CHUNK_SIZE = 64 * 1024  # bytes read from disk and sent per iteration

    def __init__(self, cmds, client):
        """Keep the split command, the client wrapper and its socket."""
        self.cmds = cmds
        self.client = client
        self.socket = self.client.socket

    @staticmethod
    def _md5_of_file(path, chunk_size=65536):
        """Return the MD5 hex digest of a file, reading it in chunks."""
        digest = hashlib.md5()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                digest.update(chunk)
        return digest.hexdigest()

    def run(self):
        """Run the upload dialogue for ``put <filename>``.

        Order of messages: command -> (server) undo dictionary -> resume
        choice (only if undo entries exist) -> error flag -> metadata ->
        (server) exists flag and optional MD5 comparison -> file bytes ->
        (server) check result.
        """
        if len(self.cmds) < 2:
            print('cmd is error')
            return
        filename = self.cmds[1]
        user_share_dir = settings.SHARE_DIR
        send_size = 0

        cmd_header_dic = {
            'cmd': ' '.join(self.cmds),
        }
        self.client.header(self.socket, cmd_header_dic)

        # Resume information kept by the server.
        undo_dic = self.client.unheader(self.socket)
        is_undo_file = False
        undo_file = undo_dic[self.client.username]['put']
        if undo_file:
            for name in undo_file:
                file_info = undo_file[name]
                print('%s \ttotal %d \tsend %d \trename %s' % (name, file_info[1], file_info[2], file_info[3]))
            while True:
                continue_choice = input('Whether to undo file transfer?(Y/N)\n>>: ').strip()
                if continue_choice.lower() == 'y':
                    while True:
                        file_choice = input('Choice undo file transfer.\n>>: ').strip()
                        if file_choice in undo_file:
                            self.cmds[1] = file_choice
                            filename = file_choice
                            is_undo_file = True
                            break
                        print('Not %s, please choose again.' % file_choice)
                    send_size = undo_file[file_choice][2]
                    self.client.header(self.socket, is_undo_file)
                    self.client.header(self.socket, file_choice)
                    break
                elif continue_choice.lower() == 'n':
                    self.client.header(self.socket, is_undo_file)
                    break
                else:
                    print('Only input Y/N.')

        # Tell the server whether the upload can go ahead. After sending True
        # the server stops handling this command, so the client must stop too.
        local_path = '%s/%s' % (user_share_dir, filename)
        if not os.path.isfile(local_path):
            self.client.header(self.socket, True)
            print('Only put file.')
            return
        if os.path.getsize(local_path) > self.client.usable_quotation:
            self.client.header(self.socket, True)
            print('Space is not enough.')
            return
        self.client.header(self.socket, False)

        # Metadata used by the server for the size and the integrity check.
        header_dic = {
            'filename': filename,
            'md5': self._md5_of_file(local_path),
            'file_size': os.path.getsize(local_path)
        }
        self.client.header(self.socket, header_dic)

        # If the server already holds identical content nothing is sent.
        is_has_file = self.client.unheader(self.socket)
        if is_has_file and not is_undo_file:
            compare_md5_result = self.client.unheader(self.socket)
            if compare_md5_result is True:
                ProgressBar(header_dic['file_size'], header_dic['file_size']).run()
                return

        with open(local_path, 'rb') as f:
            f.seek(send_size)  # skip what the server already stored
            for chunk in iter(lambda: f.read(self.CHUNK_SIZE), b''):
                # sendall: plain send() may transmit only part of the buffer.
                self.socket.sendall(chunk)
                send_size += len(chunk)
                ProgressBar(send_size, header_dic['file_size']).run()

        # The server's MD5 verdict decides the message, also for a resume.
        check_result = self.client.unheader(self.socket)
        if not check_result:
            outcome = 'upload failed.'
        elif is_undo_file is True:
            outcome = 'again upload completed.'
        else:
            outcome = 'upload completed.'
        log = '%s %s' % (local_path, outcome)
        self.client.wri_log(settings.CLIENT_ROOT_DIR, 'log\\client.log', log)
        print('%s %s' % (filename, outcome))
core/_set.py:
import optparse
import os
class Set:
    """Client-side handler for the ``set`` command.

    :param cmds: the whitespace-split command line, e.g. ``['set', '-q', '100']``
    :param client: client object providing ``socket``, ``send_cmd`` and ``unheader``
    """

    def __init__(self, cmds, client):
        self.cmds = cmds
        self.client = client
        self.socket = self.client.socket

    def run(self):
        """
        Check the command format and dispatch to the matching option handler.
        :return: None
        """
        if len(self.cmds) != 3:
            print("Error: must supply size parameters\n"
                  "Usage: set [options] size\n"
                  "Size unit is MB.")
            return
        option, parameter = self.cmds[1], self.cmds[2]
        if option in ('-q', '--quotation'):
            self._quotation(parameter)
        else:
            print('%s option is bad.' % option)

    def _quotation(self, parameter):
        """
        Validate the requested quota size and send the command to the server.
        :param parameter: size as typed by the user; must be a non-negative
            integer or decimal number (unit is MB)
        :return: None
        """
        # Allow at most one decimal point. Merely joining the pieces of
        # split('.') would let malformed input such as "1.2.3" through as
        # "123". isdecimal() (rather than isdigit()) also rejects characters
        # like superscript digits that float() cannot parse.
        is_number = (parameter.count('.') <= 1
                     and parameter.replace('.', '').isdecimal())
        if not is_number:
            print('Parameter must be floating point or integer and unit is MB.')
            return
        self.client.send_cmd({'cmd': ' '.join(self.cmds)})
        if self.client.unheader(self.socket):
            print('Set complete.')
        else:
            print('Set false.')
core/_sys_cmd.py:
from conf import settings
class SysCmd:
    """Send a command line to the server and print the output it returns.

    :param cmds: the whitespace-split command line
    :param client: client object providing ``socket``, ``send_cmd`` and ``unheader``
    """

    def __init__(self, cmds, client):
        self.cmds = cmds
        self.client = client
        self.socket = self.client.socket

    def run(self):
        """
        Send the command, read exactly ``total_size`` bytes of output and print it.
        :return: None
        :raises ConnectionError: if the server closes the connection before
            the announced number of bytes has arrived
        """
        self.client.send_cmd({'cmd': ' '.join(self.cmds)})
        header_dic = self.client.unheader(self.socket)
        total_size = header_dic['total_size']
        chunks = []
        recv_size = 0
        while recv_size < total_size:
            # Never ask for more than what is left, so bytes belonging to the
            # next server message are not swallowed into this output.
            chunk = self.socket.recv(min(settings.MAX_RECV, total_size - recv_size))
            if not chunk:
                # recv() returning b'' means the peer closed the connection;
                # without this check the loop would spin forever.
                raise ConnectionError(
                    'connection closed after %d of %d bytes' % (recv_size, total_size))
            chunks.append(chunk)
            recv_size += len(chunk)
        # Decoded as gbk because the output comes from a Windows system.
        print(b''.join(chunks).decode('gbk'))
core/client_socket.py:
import socket
import struct
import json
import os
from conf import settings
class ClientSocket:
    """Client socket plus length-prefixed JSON framing and log-file helpers.

    ``username`` and ``usable_quotation`` are not set here; the login code
    assigns them on the instance after a successful login.
    """
    address_family = settings.ADDRES_FAMILY
    address_type = settings.ADDRESS_TYPE
    max_recv = settings.MAX_RECV

    def __init__(self, server_address, bind_and_active=True):
        """
        Create the socket and, unless told otherwise, connect immediately.
        :param server_address: address passed to ``socket.connect``
        :param bind_and_active: connect right away when true
        """
        self.server_address = server_address
        self.socket = socket.socket(self.address_family, self.address_type)
        if bind_and_active:
            try:
                self.connect()
            except BaseException:
                # Do not leak the socket when the connection attempt fails.
                self.socket.close()
                raise

    def connect(self):
        """Connect the socket to ``server_address``."""
        self.socket.connect(self.server_address)

    def close(self):
        """Close the socket."""
        self.socket.close()

    def send_cmd(self, dic):
        """
        Send a command dictionary to the server over this client's socket.
        :param dic: JSON-serialisable command header
        """
        self.header(self.socket, dic)

    def header(self, conn, header_info):
        """
        Send ``header_info`` as JSON, preceded by its byte length.
        :param conn: socket to send on
        :param header_info: any JSON-serialisable object
        """
        header_bytes = json.dumps(header_info).encode('utf-8')
        # sendall() instead of send(): send() may transmit only part of the
        # buffer, which would corrupt the framing. The 'i' format (native
        # byte order) is kept as-is because the peer has to unpack it.
        conn.sendall(struct.pack('i', len(header_bytes)) + header_bytes)

    @staticmethod
    def _recv_exact(conn, size):
        """
        Read exactly ``size`` bytes from ``conn``.
        :raises ConnectionError: if the peer closes the connection early
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = conn.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    'connection closed with %d bytes still expected' % remaining)
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def unheader(self, conn):
        """
        Receive one length-prefixed JSON message and return the decoded object.
        :param conn: socket to read from
        :return: the object produced by ``json.loads``
        :raises ConnectionError: if the peer closes the connection mid-message
        """
        # recv(n) may return fewer than n bytes, so both the 4-byte length
        # prefix and the payload are read with a loop.
        (header_size,) = struct.unpack('i', self._recv_exact(conn, 4))
        header_bytes = self._recv_exact(conn, header_size)
        return json.loads(header_bytes.decode('utf-8'))

    def wri_log(self, root_dir, log_dir, log):
        """
        Append one line to a log file.
        :param root_dir: directory the log path is relative to
        :param log_dir: log file path relative to ``root_dir``
        :param log: text to append; a newline is added
        """
        log_path = os.path.join(root_dir, log_dir)
        # Append mode adds the line without copying the whole file each time,
        # creates the file if it is missing, and the with-block guarantees
        # the handle is closed even if the write fails.
        with open(log_path, 'a', encoding='utf-8') as f_log:
            f_log.write(log + '\n')

    def get_undo(self, root_dir, log_dir):
        """
        Load the undo record and make sure the current user has an entry.
        :param root_dir: directory the log path is relative to
        :param log_dir: undo file path relative to ``root_dir``
        :return: the whole undo dictionary (keyed by username)
        """
        undone_log_dir = os.path.join(root_dir, log_dir)
        try:
            with open(undone_log_dir, 'r') as f:
                content = f.read()
        except FileNotFoundError:
            # A missing file is treated like an empty one.
            content = ''
        dic = json.loads(content) if content.strip() else {}
        if self.username not in dic:  # first time this user is seen
            dic[self.username] = {'get': {}}
            with open(undone_log_dir, 'w') as fw:
                json.dump(dic, fw)  # persist the new entry
        return dic

    def set_undo(self, root_dir, log_dir, undone_dic):
        """
        Overwrite the undo record with ``undone_dic``.
        :param root_dir: directory the log path is relative to
        :param log_dir: undo file path relative to ``root_dir``
        :param undone_dic: JSON-serialisable undo dictionary
        """
        undone_log_dir = os.path.join(root_dir, log_dir)
        with open(undone_log_dir, 'w') as f:
            json.dump(undone_dic, f)
core/features.py:
from core._get import Get
from core._put import Put
from core._pwd import Pwd
from core._cd import Cd
from core._dir import Dir
from core._del import Del
from core._sys_cmd import SysCmd
from core._mkdir import Mkdir
from core._set import Set
class Features:
    """Dispatch a parsed command line to the class that implements it.

    :param client: connected client object handed to every command class
    :param cmds: the whitespace-split command line; ``cmds[0]`` is the command
    """

    # Commands the user may invoke; "<name>" is served by method "_<name>".
    # An explicit list is used instead of hasattr() on user input, which
    # would also match attributes such as "__init__" or "__dict__"
    # (typed as "_init__" / "_dict__") and crash with TypeError.
    COMMANDS = ('get', 'put', 'pwd', 'cd', 'dir', 'del', 'mkdir', 'set')

    def __init__(self, client, cmds):
        self.sys_obj = SysCmd(cmds, client)
        self.client = client
        self.cmds = cmds

    def find(self):
        """
        Run the handler for ``cmds[0]``; unknown commands are ignored.
        :return: None
        """
        if not self.cmds:
            return
        name = self.cmds[0]
        if name in self.COMMANDS:
            getattr(self, '_%s' % name)()

    def _get(self, *args):
        Get(self.cmds, self.client).run()

    def _put(self, *args):
        Put(self.cmds, self.client).run()

    def _pwd(self, *args):
        Pwd(self.cmds, self.client).run()

    def _cd(self, *args):
        Cd(self.cmds, self.client).run()

    def _dir(self, *args):
        Dir(self.cmds, self.client).run()

    def _del(self, *args):
        Del(self.cmds, self.client).run()

    def _mkdir(self, *args):
        Mkdir(self.cmds, self.client).run()

    def _set(self, *args):
        Set(self.cmds, self.client).run()
core/main.py:
import hashlib
from core import client_socket
from core.features import Features
from conf import settings
class Main:
    """Interactive client loop: log the user in, then read and run commands."""

    def __init__(self):
        self.client = client_socket.ClientSocket((settings.HOST, settings.PORT))
        self.server_address = self.client.server_address
        self.socket = self.client.socket
        self.is_login = False  # login state
        self.cmds = None

    def run(self):
        """
        Loop forever: prompt for credentials until logged in, then for commands.
        :return: never returns normally
        """
        while True:
            if self.is_login is False and not self._login():
                continue
            line = input('>>: ').strip()
            if line:
                self.cmds = line.split()
                Features(self.client, self.cmds).find()

    def _login(self):
        """
        Ask for credentials once, send them, and show the quota on success.
        :return: True when the server accepted the login, otherwise False
        """
        username = input('username:').strip()
        password = input('password:').strip()
        digest = hashlib.md5(password.encode('utf-8')).hexdigest()
        self.client.header(self.socket, (username, digest))
        self.is_login = self.client.unheader(self.socket)
        if not self.is_login:
            print('username or password is bad.')
            return False
        # Tie the username to the client object once login has succeeded.
        self.client.username = username
        # After a successful login the server sends the storage figures: the
        # first item is scaled down by 1024 * 1024 for display in MB, the
        # second is used as the total in MB.
        user_quotation = self.client.unheader(self.socket)
        used_quotation = user_quotation[0] / 1024 / 1024
        total_quotation = user_quotation[1]
        usable = total_quotation - used_quotation
        self.client.usable_quotation = usable
        # Print the storage summary.
        print((
            '\n'
            '\rUsed Size : <%f>MB\n'
            '\rUsable Size: <%f>MB\n'
            '\rTotal Size : <%f>MB\n'
        ) % (used_quotation, usable, total_quotation))
        return True
core/progress_bar.py:
import sys
class ProgressBar:
    """Single-line console progress bar, 50 cells wide.

    :param tran_size: amount transferred so far
    :param total_size: total amount to transfer
    """

    def __init__(self, tran_size, total_size):
        self.tran_size = tran_size
        self.total_size = total_size

    def run(self):
        """
        Redraw the bar on the current console line.
        :return: True once ``tran_size`` has reached ``total_size``
            (a newline is printed then), otherwise None
        """
        if self.total_size != 0:
            part = self.total_size / 50  # size of one cell, i.e. 2% of the data
            count = int(self.tran_size / part)
            percent = (self.tran_size / self.total_size) * 100
        else:
            # An empty transfer (e.g. a zero-byte file) has nothing left to
            # do; show a full bar instead of dividing by zero.
            count, percent = 50, 100.0
        print('\r', end='')  # move the cursor back to the start of the line
        print('[%-50s]%.2f%%' % ('>' * count, percent), end='', flush=True)
        if self.tran_size >= self.total_size:
            print('\n', end='')  # finish the line once the transfer is complete
            return True
core/undo.py:
from conf import settings
class Undo:
    """Record an unfinished transfer in the client's undo file.

    :param client: client object providing ``username`` and ``set_undo``
    :param mode: transfer kind used as the second-level key (e.g. ``'get'``)
    :param filename: name of the file being transferred
    :param undo_dic: the undo dictionary to update, keyed by username
    """

    def __init__(self, client, mode, filename, undo_dic):
        self.client = client
        self.mode = mode
        self.filename = filename
        self.undo_dic = undo_dic

    def run(self, file_dir, total_size, cmds):
        """
        Store ``[file_dir, total_size, cmds[1]]`` for this file and save the record.
        :param file_dir: path stored as the first element of the entry
        :param total_size: size stored as the second element of the entry
        :param cmds: the whitespace-split command line; ``cmds[1]`` is stored
        :return: None
        """
        # setdefault() creates the per-user and per-mode dictionaries when
        # they are absent, instead of failing with KeyError.
        user_entry = self.undo_dic.setdefault(self.client.username, {})
        mode_entry = user_entry.setdefault(self.mode, {})
        mode_entry[self.filename] = [file_dir, total_size, cmds[1]]
        self.client.set_undo(settings.CLIENT_ROOT_DIR, 'log\\undo.log', self.undo_dic)
代码输出如下:
多线程版本的其他功能与非多线程的基本一致,这里只展示多线程的效果。
服务器端在命令行终端当中以命令的方式启动。
可以支持多个客户端同时进行操作。