当前位置: 首页 > article >正文

【p2p、分布式,区块链笔记 Torrent】WebTorrent bittorrent-dht DHT的构造+lookup+announce

DHT(分布式哈希表)依赖

const bencode = require('bencode') // 导入 bencode 库,用于编码和解码 Bencode 格式,常用于 BitTorrent 协议中的数据序列化。
const debug = require('debug')('bittorrent-dht') // 导入 debug 库,用于调试输出,指定命名空间为 'bittorrent-dht',便于过滤调试信息。
const KBucket = require('k-bucket') // 导入 KBucket 类,用于实现 Kademlia DHT 中的节点管理,负责存储和查找 DHT 节点。
const krpc = require('k-rpc') // 导入 k-rpc 库,提供用于实现 Kademlia DHT 的 RPC(远程过程调用)功能。
const low = require('last-one-wins') // 导入 last-one-wins 库,可能用于处理多个并发请求,保证最后一个请求的结果被保留。
const LRU = require('lru') // 导入 LRU 缓存实现,使用最近最少使用(Least Recently Used)策略,常用于缓存管理。
const randombytes = require('randombytes') // 导入 randombytes 库,用于生成随机字节,通常用于生成节点 ID 或其他安全性相关的数据。
const records = require('record-cache') // 导入 record-cache 库,可能用于缓存记录,优化数据访问。
const simpleSha1 = require('simple-sha1') // 导入 simple-sha1 库,用于计算 SHA-1 哈希值,常用于数据完整性校验。
const { EventEmitter } = require('events') // 从 events 模块导入 EventEmitter 类,提供事件发布和监听的功能,便于处理异步事件。
  • k-rpck-bucket 是实现 DHT(分布式哈希表)协议中的关键组件,帮助 DHT 在分布式网络中高效地进行节点查找和数据交换。

k-rpc

  • https://www.npmjs.com/package/k-rpc是一个实现 k-rpc (Kademlia RPC)协议的库。

  • k-rpc 负责节点间的通信和消息传递,是 DHT 的网络交互部分。通过事件驱动的方式(如 onqueryonnodeonerror 等),使得 DHT 能够对网络变化做出响应。

k-bucket

  • https://www.npmjs.com/package/k-bucket 实现了用于存储联系人(对等节点)信息的 Kademlia DHT k-bucket。

  • k-bucket 是 Kademlia 协议中用于存储和管理网络节点信息的数据结构,用于高效管理节点信息,确保节点可以快速找到彼此并维护活跃的连接。

DHT 类构造函数

  • 这段代码定义了一个 DHT 类:
constructor (opts = {}) {
  super() // 调用父类 EventEmitter 的构造函数

  // 创建 LRU 缓存用于存储 DHT 表
  this._tables = new LRU({ maxAge: ROTATE_INTERVAL, max: opts.maxTables || 1000 })
  this._values = new LRU(opts.maxValues || 1000) // 存储值的 LRU 缓存
  this._peers = records({
    maxAge: opts.maxAge || 0, // 记录的最大年龄
    maxSize: opts.maxPeers || 10000 // 最大对等体数量
  })

  this._secrets = null // 存储秘密???
  this._hash = opts.hash || sha1 // 使用指定的哈希函数,默认是 SHA1
  this._hashLength = this._hash(Buffer.from('')).length // 计算哈希长度
  
  // 初始化 KRPC,用于 DHT 的网络通信,并注册相应的事件处理函数。
  this._rpc = opts.krpc || krpc(Object.assign({ idLength: this._hashLength }, opts)) // 初始化 KRPC
  this._rpc.on('query', onquery) // 注册事件处理器
  this._rpc.on('node', onnode)
  this._rpc.on('warning', onwarning)
  this._rpc.on('error', onerror)
  this._rpc.on('listening', onlistening)

  this._rotateSecrets() // 初始化秘密轮换
  this._verify = opts.verify || null // 可选的验证函数
  this._host = opts.host || null // 可选的主机地址
  this._interval = setInterval(rotateSecrets, ROTATE_INTERVAL) // 定时轮换秘密

  this._runningBucketCheck = false // 标记是否正在检查桶
  this._bucketCheckTimeout = null // 存储桶检查超时
  this._bucketOutdatedTimeSpan = opts.timeBucketOutdated || BUCKET_OUTDATED_TIMESPAN // 桶过时时间范围

  this.listening = false // 标记是否正在监听
  this.destroyed = false // 标记 DHT 是否已销毁
  this.nodeId = this._rpc.id // DHT 节点 ID
  this.nodes = this._rpc.nodes // 存储节点信息

  // 确保同时只有一个 ping 操作在进行,避免无限递归
  const onping = low(ping)

  this._rpc.on('ping', (older, swap) => {
    onping({ older, swap }) // 处理 ping 请求
  })

  process.nextTick(bootstrap) // 在下一个事件循环中引导 DHT

  this._debug('new DHT %s', this.nodeId) // 输出调试信息

  const self = this // 保存当前上下文

  function ping (opts, cb) {
    const older = opts.older // 获取旧节点
    const swap = opts.swap // 获取交换节点

    self._debug('received ping', older) // 输出接收到的 ping 信息
    self._checkNodes(older, false, (_, deadNode) => {
      if (deadNode) {
        self._debug('swapping dead node with newer', deadNode) // 如果有死节点,进行交换
        swap(deadNode) // 交换死节点
        return cb() // 回调
      }

      self._debug('no node added, all other nodes ok') // 所有节点正常
      cb() // 回调
    })
  }

  function onlistening () {
    self.listening = true // 标记为正在监听
    self._debug('listening %d', self.address().port) // 输出监听端口
    self.updateBucketTimestamp() // 更新桶时间戳
    self._setBucketCheckInterval() // 设置桶检查间隔
    self.emit('listening') // 触发 'listening' 事件
  }

  function onquery (query, peer) {
    self._onquery(query, peer) // 处理查询
  }

  function rotateSecrets () {
    self._rotateSecrets() // 轮换秘密
  }

  function bootstrap () {
    if (!self.destroyed) self._bootstrap(opts.bootstrap !== false) // 引导 DHT
  }

  function onwarning (err) {
    self.emit('warning', err) // 触发 'warning' 事件
  }

  function onerror (err) {
    self.emit('error', err) // 触发 'error' 事件
  }

  function onnode (node) {
    self.emit('node', node) // 触发 'node' 事件
  }
}

lookup 方法

  • lookup 方法用于在 DHT 中查找与给定 infoHash 相关的对等体(peers)。

    • lookup 方法在 DHT 中执行对等体查找,首先通过最近节点的查找发送查询请求,然后通过接收到的响应处理对等体信息。它还提供了中止查找的能力,允许在需要时停止操作。通过事件机制,将找到的对等体信息及时传递出去,供其他部分使用。

    • infoHash:要查找的目标信息哈希值。

    • cb:可选的回调函数,用于处理查找结果。

lookup (infoHash, cb) {
  // 将 infoHash 转换为 Buffer 类型
  infoHash = toBuffer(infoHash) 
  
  // 如果没有提供回调函数,则使用 noop(空函数)
  if (!cb) cb = noop 
  
  const self = this // 保存当前上下文的引用
  let aborted = false // 标记操作是否已被中止

  // 输出调试信息,显示正在查找的 infoHash
  this._debug('lookup %s', infoHash) 
  
  // 使用 process.nextTick 将 emit 函数推迟到事件循环的下一轮执行
  process.nextTick(emit) 
  
  // 调用 _closest 方法查找最近的节点并发送 get_peers 查询
  this._closest(infoHash, {
    q: 'get_peers', // 查询类型
    a: {
      id: this._rpc.id, // 当前 RPC 的 ID
      info_hash: infoHash // 要查找的 infoHash
    }
  }, onreply, cb) // 指定回复处理函数和完成回调

  // emit 函数负责处理已知的对等体并触发事件
  function emit (values, from) {
    // 如果没有提供值,则从缓存中获取最多 100 个对等体
    if (!values) values = self._peers.get(infoHash.toString('hex'), 100) 
    const peers = decodePeers(values) // 解码对等体信息
    
    // 遍历对等体列表并触发 'peer' 事件
    for (let i = 0; i < peers.length; i++) {
      self.emit('peer', peers[i], infoHash, from || null) // 触发 'peer' 事件,传递对等体信息和来源
    }
  }

  // onreply 函数处理从节点收到的响应
  function onreply (message, node) {
    if (aborted) return false // 如果已中止,直接返回
    if (message.r.values) emit(message.r.values, node) // 如果响应中包含对等体信息,调用 emit 处理
  }

  // 返回一个可以中止查找的函数  返回一个名为 `abort` 的函数,用于中止查找过程。当调用这个函数时,会将 `aborted` 标记设置为 `true`,防止后续的处理。
  return function abort () { aborted = true }
}

_closest 方法

_closest (target, message, onmessage, cb) {
  const self = this // 保存当前上下文的引用

  // 创建一个新的 KBucket,用于存储找到的节点信息
  const table = new KBucket({
    localNodeId: target, // 当前节点的 ID
    numberOfNodesPerKBucket: this._rpc.k // KBucket 中节点的数量
  })

  // 调用 RPC 的 closest 方法,查找与目标节点最近的节点
  this._rpc.closest(target, message, onreply, done)

  // 处理查找完成后的回调
  function done (err, n) {
    if (err) return cb(err) // 如果出错,调用回调并返回错误
    self._tables.set(target.toString('hex'), table) // 将找到的 KBucket 存储到 _tables 中
    self._debug('visited %d nodes', n) // 输出调试信息,显示访问的节点数量
    cb(null, n) // 调用回调,传入访问的节点数量
  }

  // 处理收到的回复
  function onreply (message, node) {
    if (!message.r) return true // 如果消息没有响应部分,直接返回

    // 检查响应中是否包含有效的 token 和节点 ID
    if (message.r.token && message.r.id && Buffer.isBuffer(message.r.id) && message.r.id.length === self._hashLength) {
      self._debug('found node %s (target: %s)', message.r.id, target) // 输出调试信息,显示找到的节点
      // 将节点信息添加到 KBucket 中
      table.add({
        id: message.r.id,
        host: node.host || node.address, // 节点的主机地址
        port: node.port, // 节点的端口
        token: message.r.token // 节点的 token
      })
    }

    // 如果没有提供 onmessage 回调,直接返回
    if (!onmessage) return true
    return onmessage(message, node) // 调用 onmessage 回调,处理消息
  }
}

announce 方法

  • announce_preannounce 方法用于在 DHT 网络中宣布一个资源的可用性:
    • 通过 this._tables.get(infoHash.toString('hex')) 获取与 infoHash 相关的 KBucket。如果找不到,调用 _preannounce 方法以确保节点能够找到对等体。
    • 如果 this._host 被指定,且 DHT 正在监听,使用 _addPeer 方法将本节点的对等体信息添加到 DHT。
    • 通过KRPC 消息 (announce_peer),并发送到 DHT 网络中的其他节点,以宣布该资源的可用性。消息包含必要的参数,例如 info_hashport
announce (infoHash, port, cb) {
  if (typeof port === 'function') return this.announce(infoHash, 0, port) // 处理没有明确端口的情况
  infoHash = toBuffer(infoHash) // 将 infoHash 转换为 Buffer 对象
  if (!cb) cb = noop // 如果没有提供回调,设置为一个空函数

  const table = this._tables.get(infoHash.toString('hex')) // 获取与 infoHash 关联的 KBucket 表
  if (!table) return this._preannounce(infoHash, port, cb) // 如果没有找到表,调用 _preannounce 方法

  // 如果指定了主机
  if (this._host) {
    const dhtPort = this.listening ? this.address().port : 0 // 获取 DHT 的端口,如果正在监听
    // 添加对等体信息
    this._addPeer(
      { host: this._host, port: port || dhtPort }, // 目标对等体的主机和端口
      infoHash,
      { host: this._host, port: dhtPort } // 自身的主机和端口
    )
  }

  // 创建 announce_peer 消息
  const message = {
    q: 'announce_peer',
    a: {
      id: this._rpc.id, // 当前 DHT 节点的 ID
      token: null, // 令牌,由 queryAll 设置
      info_hash: infoHash, // 要宣布的资源的 infoHash
      port, // 资源的端口
      implied_port: port ? 0 : 1 // 隐含的端口标识,0 表示明确提供,1 表示隐式
    }
  }

  this._debug('announce %s %d', infoHash, port) // 输出调试信息
  this._rpc.queryAll(table.closest(infoHash), message, null, cb) // 发送消息并处理回调
}

_preannounce 方法

_preannounce (infoHash, port, cb) {
  const self = this

  // 查找与 infoHash 关联的对等体
  this.lookup(infoHash, err => {
    if (self.destroyed) return cb(new Error('dht is destroyed')) // 检查 DHT 是否已销毁
    if (err) return cb(err) // 处理查找错误
    self.announce(infoHash, port, cb) // 调用 announce 方法
  })
}

cg

  • Chord:结构化 P2P 网络中的一个 DHT 算法

http://www.kler.cn/a/383493.html

相关文章:

  • 数据管理的四大支柱:揭秘数据中台、数据仓库、数据治理和主数据
  • 如何选择适合CMS运行的服务器?
  • Python实例:爱心代码
  • 字节青训-小S的倒排索引
  • 网站架构知识之Ansible(day020)
  • Ubuntu使用Qt虚拟键盘,支持中英文切换
  • 领克双十一营销设计:视觉与策略的完美融合
  • Flutter 鸿蒙next中的 Stack 和 Positioned 用法详解
  • 算法练习:1004. 最大连续1的个数 III
  • 基于SSM+VUE守护萌宠宠物网站JAVA|VUE|Springboot计算机毕业设计源代码+数据库+LW文档+开题报告+答辩稿+部署教+代码讲解
  • ORACLE 19C 安装数据库补丁的详细过程
  • 利用全排列解决LeetCode第3343题“统计平衡排列的数目”问题
  • 【Java SE语法】抽象类(abstract class)和接口(interface)有什么异同?
  • 一个国产 API 开源项目,在 ProductHunt 杀疯了...
  • 【HarmonyOS】引导用户跳转APP设置详情页开启权限
  • AI预测体彩排3采取888=3策略+和值012路+胆码+通杀1码测试11月7日升级新模型预测第127弹
  • AI在创造还是毁掉音乐?
  • Vue 指令
  • ENSP RIP动态路由
  • HLS SAMPLE-AES加密方法
  • 京东毫秒级热key探测框架JD-hotkey
  • 哈希表,哈希桶及配套习题
  • 数据分析:转录组差异fgsea富集分析
  • 第08章 排序ORDER BY
  • 创新实践:基于边缘智能+扣子的智慧婴儿监控解决方案
  • 歌词结构的艺术:写歌词的技巧和方法深度剖析,妙笔生词AI智能写歌词软件