【p2p、分布式,区块链笔记 Torrent】WebTorrent bittorrent-dht DHT的构造+lookup+announce
DHT(分布式哈希表)依赖
const bencode = require('bencode') // 导入 bencode 库,用于编码和解码 Bencode 格式,常用于 BitTorrent 协议中的数据序列化。
const debug = require('debug')('bittorrent-dht') // 导入 debug 库,用于调试输出,指定命名空间为 'bittorrent-dht',便于过滤调试信息。
const KBucket = require('k-bucket') // 导入 KBucket 类,用于实现 Kademlia DHT 中的节点管理,负责存储和查找 DHT 节点。
const krpc = require('k-rpc') // 导入 k-rpc 库,提供用于实现 Kademlia DHT 的 RPC(远程过程调用)功能。
const low = require('last-one-wins') // 导入 last-one-wins 库,可能用于处理多个并发请求,保证最后一个请求的结果被保留。
const LRU = require('lru') // 导入 LRU 缓存实现,使用最近最少使用(Least Recently Used)策略,常用于缓存管理。
const randombytes = require('randombytes') // 导入 randombytes 库,用于生成随机字节,通常用于生成节点 ID 或其他安全性相关的数据。
const records = require('record-cache') // 导入 record-cache 库,可能用于缓存记录,优化数据访问。
const simpleSha1 = require('simple-sha1') // 导入 simple-sha1 库,用于计算 SHA-1 哈希值,常用于数据完整性校验。
const { EventEmitter } = require('events') // 从 events 模块导入 EventEmitter 类,提供事件发布和监听的功能,便于处理异步事件。
k-rpc
和k-bucket
是实现 DHT(分布式哈希表)协议中的关键组件,帮助 DHT 在分布式网络中高效地进行节点查找和数据交换。
k-rpc
-
https://www.npmjs.com/package/k-rpc是一个实现
k-rpc
(Kademlia RPC)协议的库。 -
k-rpc
负责节点间的通信和消息传递,是 DHT 的网络交互部分。通过事件驱动的方式(如onquery
、onnode
、onerror
等),使得 DHT 能够对网络变化做出响应。
k-bucket
-
https://www.npmjs.com/package/k-bucket 实现了用于存储联系人(对等节点)信息的 Kademlia DHT k-bucket。
-
k-bucket
是 Kademlia 协议中用于存储和管理网络节点信息的数据结构,用于高效管理节点信息,确保节点可以快速找到彼此并维护活跃的连接。
DHT 类构造函数
- 这段代码定义了一个 DHT 类:
constructor (opts = {}) {
super() // 调用父类 EventEmitter 的构造函数
// 创建 LRU 缓存用于存储 DHT 表
this._tables = new LRU({ maxAge: ROTATE_INTERVAL, max: opts.maxTables || 1000 })
this._values = new LRU(opts.maxValues || 1000) // 存储值的 LRU 缓存
this._peers = records({
maxAge: opts.maxAge || 0, // 记录的最大年龄
maxSize: opts.maxPeers || 10000 // 最大对等体数量
})
this._secrets = null // 存储秘密???
this._hash = opts.hash || sha1 // 使用指定的哈希函数,默认是 SHA1
this._hashLength = this._hash(Buffer.from('')).length // 计算哈希长度
// 初始化 KRPC,用于 DHT 的网络通信,并注册相应的事件处理函数。
this._rpc = opts.krpc || krpc(Object.assign({ idLength: this._hashLength }, opts)) // 初始化 KRPC
this._rpc.on('query', onquery) // 注册事件处理器
this._rpc.on('node', onnode)
this._rpc.on('warning', onwarning)
this._rpc.on('error', onerror)
this._rpc.on('listening', onlistening)
this._rotateSecrets() // 初始化秘密轮换
this._verify = opts.verify || null // 可选的验证函数
this._host = opts.host || null // 可选的主机地址
this._interval = setInterval(rotateSecrets, ROTATE_INTERVAL) // 定时轮换秘密
this._runningBucketCheck = false // 标记是否正在检查桶
this._bucketCheckTimeout = null // 存储桶检查超时
this._bucketOutdatedTimeSpan = opts.timeBucketOutdated || BUCKET_OUTDATED_TIMESPAN // 桶过时时间范围
this.listening = false // 标记是否正在监听
this.destroyed = false // 标记 DHT 是否已销毁
this.nodeId = this._rpc.id // DHT 节点 ID
this.nodes = this._rpc.nodes // 存储节点信息
// 确保同时只有一个 ping 操作在进行,避免无限递归
const onping = low(ping)
this._rpc.on('ping', (older, swap) => {
onping({ older, swap }) // 处理 ping 请求
})
process.nextTick(bootstrap) // 在下一个事件循环中引导 DHT
this._debug('new DHT %s', this.nodeId) // 输出调试信息
const self = this // 保存当前上下文
function ping (opts, cb) {
const older = opts.older // 获取旧节点
const swap = opts.swap // 获取交换节点
self._debug('received ping', older) // 输出接收到的 ping 信息
self._checkNodes(older, false, (_, deadNode) => {
if (deadNode) {
self._debug('swapping dead node with newer', deadNode) // 如果有死节点,进行交换
swap(deadNode) // 交换死节点
return cb() // 回调
}
self._debug('no node added, all other nodes ok') // 所有节点正常
cb() // 回调
})
}
function onlistening () {
self.listening = true // 标记为正在监听
self._debug('listening %d', self.address().port) // 输出监听端口
self.updateBucketTimestamp() // 更新桶时间戳
self._setBucketCheckInterval() // 设置桶检查间隔
self.emit('listening') // 触发 'listening' 事件
}
function onquery (query, peer) {
self._onquery(query, peer) // 处理查询
}
function rotateSecrets () {
self._rotateSecrets() // 轮换秘密
}
function bootstrap () {
if (!self.destroyed) self._bootstrap(opts.bootstrap !== false) // 引导 DHT
}
function onwarning (err) {
self.emit('warning', err) // 触发 'warning' 事件
}
function onerror (err) {
self.emit('error', err) // 触发 'error' 事件
}
function onnode (node) {
self.emit('node', node) // 触发 'node' 事件
}
}
lookup 方法
-
lookup
方法用于在 DHT 中查找与给定infoHash
相关的对等体(peers)。-
lookup
方法在 DHT 中执行对等体查找,首先通过最近节点的查找发送查询请求,然后通过接收到的响应处理对等体信息。它还提供了中止查找的能力,允许在需要时停止操作。通过事件机制,将找到的对等体信息及时传递出去,供其他部分使用。 -
infoHash
:要查找的目标信息哈希值。 -
cb
:可选的回调函数,用于处理查找结果。
-
lookup (infoHash, cb) {
// 将 infoHash 转换为 Buffer 类型
infoHash = toBuffer(infoHash)
// 如果没有提供回调函数,则使用 noop(空函数)
if (!cb) cb = noop
const self = this // 保存当前上下文的引用
let aborted = false // 标记操作是否已被中止
// 输出调试信息,显示正在查找的 infoHash
this._debug('lookup %s', infoHash)
// 使用 process.nextTick 将 emit 函数推迟到事件循环的下一轮执行
process.nextTick(emit)
// 调用 _closest 方法查找最近的节点并发送 get_peers 查询
this._closest(infoHash, {
q: 'get_peers', // 查询类型
a: {
id: this._rpc.id, // 当前 RPC 的 ID
info_hash: infoHash // 要查找的 infoHash
}
}, onreply, cb) // 指定回复处理函数和完成回调
// emit 函数负责处理已知的对等体并触发事件
function emit (values, from) {
// 如果没有提供值,则从缓存中获取最多 100 个对等体
if (!values) values = self._peers.get(infoHash.toString('hex'), 100)
const peers = decodePeers(values) // 解码对等体信息
// 遍历对等体列表并触发 'peer' 事件
for (let i = 0; i < peers.length; i++) {
self.emit('peer', peers[i], infoHash, from || null) // 触发 'peer' 事件,传递对等体信息和来源
}
}
// onreply 函数处理从节点收到的响应
function onreply (message, node) {
if (aborted) return false // 如果已中止,直接返回
if (message.r.values) emit(message.r.values, node) // 如果响应中包含对等体信息,调用 emit 处理
}
// 返回一个可以中止查找的函数 返回一个名为 `abort` 的函数,用于中止查找过程。当调用这个函数时,会将 `aborted` 标记设置为 `true`,防止后续的处理。
return function abort () { aborted = true }
}
_closest 方法
_closest (target, message, onmessage, cb) {
const self = this // 保存当前上下文的引用
// 创建一个新的 KBucket,用于存储找到的节点信息
const table = new KBucket({
localNodeId: target, // 当前节点的 ID
numberOfNodesPerKBucket: this._rpc.k // KBucket 中节点的数量
})
// 调用 RPC 的 closest 方法,查找与目标节点最近的节点
this._rpc.closest(target, message, onreply, done)
// 处理查找完成后的回调
function done (err, n) {
if (err) return cb(err) // 如果出错,调用回调并返回错误
self._tables.set(target.toString('hex'), table) // 将找到的 KBucket 存储到 _tables 中
self._debug('visited %d nodes', n) // 输出调试信息,显示访问的节点数量
cb(null, n) // 调用回调,传入访问的节点数量
}
// 处理收到的回复
function onreply (message, node) {
if (!message.r) return true // 如果消息没有响应部分,直接返回
// 检查响应中是否包含有效的 token 和节点 ID
if (message.r.token && message.r.id && Buffer.isBuffer(message.r.id) && message.r.id.length === self._hashLength) {
self._debug('found node %s (target: %s)', message.r.id, target) // 输出调试信息,显示找到的节点
// 将节点信息添加到 KBucket 中
table.add({
id: message.r.id,
host: node.host || node.address, // 节点的主机地址
port: node.port, // 节点的端口
token: message.r.token // 节点的 token
})
}
// 如果没有提供 onmessage 回调,直接返回
if (!onmessage) return true
return onmessage(message, node) // 调用 onmessage 回调,处理消息
}
}
announce 方法
announce
和_preannounce
方法用于在 DHT 网络中宣布一个资源的可用性:- 通过
this._tables.get(infoHash.toString('hex'))
获取与infoHash
相关的 KBucket。如果找不到,调用_preannounce
方法以确保节点能够找到对等体。 - 如果
this._host
被指定,且 DHT 正在监听,使用_addPeer
方法将本节点的对等体信息添加到 DHT。 - 通过KRPC 消息 (
announce_peer
),并发送到 DHT 网络中的其他节点,以宣布该资源的可用性。消息包含必要的参数,例如info_hash
和port
。
- 通过
announce (infoHash, port, cb) {
if (typeof port === 'function') return this.announce(infoHash, 0, port) // 处理没有明确端口的情况
infoHash = toBuffer(infoHash) // 将 infoHash 转换为 Buffer 对象
if (!cb) cb = noop // 如果没有提供回调,设置为一个空函数
const table = this._tables.get(infoHash.toString('hex')) // 获取与 infoHash 关联的 KBucket 表
if (!table) return this._preannounce(infoHash, port, cb) // 如果没有找到表,调用 _preannounce 方法
// 如果指定了主机
if (this._host) {
const dhtPort = this.listening ? this.address().port : 0 // 获取 DHT 的端口,如果正在监听
// 添加对等体信息
this._addPeer(
{ host: this._host, port: port || dhtPort }, // 目标对等体的主机和端口
infoHash,
{ host: this._host, port: dhtPort } // 自身的主机和端口
)
}
// 创建 announce_peer 消息
const message = {
q: 'announce_peer',
a: {
id: this._rpc.id, // 当前 DHT 节点的 ID
token: null, // 令牌,由 queryAll 设置
info_hash: infoHash, // 要宣布的资源的 infoHash
port, // 资源的端口
implied_port: port ? 0 : 1 // 隐含的端口标识,0 表示明确提供,1 表示隐式
}
}
this._debug('announce %s %d', infoHash, port) // 输出调试信息
this._rpc.queryAll(table.closest(infoHash), message, null, cb) // 发送消息并处理回调
}
_preannounce 方法
_preannounce (infoHash, port, cb) {
const self = this
// 查找与 infoHash 关联的对等体
this.lookup(infoHash, err => {
if (self.destroyed) return cb(new Error('dht is destroyed')) // 检查 DHT 是否已销毁
if (err) return cb(err) // 处理查找错误
self.announce(infoHash, port, cb) // 调用 announce 方法
})
}
cg
- Chord:结构化 P2P 网络中的一个 DHT 算法