【p2p、分布式,区块链笔记 Torrent】WebTorrent 的lt_donthave插件
扩展实现
- https://github.com/webtorrent/lt_donthave/blob/master/index.js
/*! lt_donthave. MIT License. WebTorrent LLC <https://webtorrent.io/opensource> */
// 导入所需模块
import arrayRemove from 'unordered-array-remove' // 用于从数组中删除元素的函数
import { EventEmitter } from 'events' // 导入事件发射器类
import debugFactory from 'debug' // 导入调试工具
// 创建一个调试实例,用于记录调试信息
const debug = debugFactory('lt_donthave')
// 导出一个函数,该函数返回 ltDontHave 类
export default () => {
// 定义 ltDontHave 类,继承自 EventEmitter
class ltDontHave extends EventEmitter {
constructor (wire) {
super() // 调用父类构造函数
// 初始化属性
this._peerSupports = false // 标记对等体是否支持 'lt_donthave'
this._wire = wire // 保存 wire 对象,表示与对等体的连接
}
// 当接收到扩展握手时调用,表示对等体支持 'lt_donthave'
onExtendedHandshake () {
this._peerSupports = true
}
// 处理接收到的消息
onMessage (buf) {
let index // 存储块索引
try {
// 创建 DataView 从接收到的缓冲区中读取数据
const view = new DataView(buf.buffer)
index = view.getUint32(0) // 获取消息中的块索引
} catch (err) {
// 如果消息无效,直接丢弃
return
}
// 如果对等体没有该块,直接返回
if (!this._wire.peerPieces.get(index)) return
debug('got donthave %d', index) // 记录调试信息
this._wire.peerPieces.set(index, false) // 标记该块为不拥有
this.emit('donthave', index) // 触发 'donthave' 事件
this._failRequests(index) // 处理失败的请求
}
// 向远程对等体发送不拥有的块索引
donthave (index) {
if (!this._peerSupports) return // 如果对等体不支持,直接返回
debug('donthave %d', index) // 记录调试信息
const buf = new Uint8Array(4) // 创建一个 4 字节的缓冲区
const view = new DataView(buf.buffer) // 创建 DataView
view.setUint32(0, index) // 将块索引写入缓冲区
console.log(">>>>>>>>>>>>>>>>", index, ">>>>>>>>>>>>>>", buf) // 打印调试信息
this._wire.extended('lt_donthave', buf) // 发送 'lt_donthave' 消息
}
// 处理失败的请求
_failRequests (index) {
const requests = this._wire.requests // 获取当前请求列表
for (let i = 0; i < requests.length; i++) {
const req = requests[i] // 获取当前请求
if (req.piece === index) { // 如果请求的块索引与指定索引匹配
arrayRemove(requests, i) // 从请求列表中删除该请求
i -= 1 // 更新索引以检查新值
this._wire._callback(req, new Error('peer sent donthave'), null) // 调用回调函数,报告请求失败
}
}
}
}
// 设置扩展名称
ltDontHave.prototype.name = 'lt_donthave'
return ltDontHave // 返回定义的类
}
测试代码
import fixtures from 'webtorrent-fixtures'
import Protocol from 'bittorrent-protocol'
import test from 'tape'
import ltDontHave from './index.js'
const { leaves } = fixtures
const id1 = Buffer.from('01234567890123456789')
const id2 = Buffer.from('12345678901234567890')
const wire1 = new Protocol()
wire1.peerPieces.set(30, true) // 在 wire1 的 peerPieces 中标记块 30 为已拥有
wire1.peerPieces.set(29, false)
console.log("wire1 中的块 30 是否存在",wire1.peerPieces.get(30))
const wire2 = new Protocol()
wire1.pipe(wire2).pipe(wire1)// 将 wire1 和 wire2 连接起来,形成双向数据流
wire1.use(ltDontHave())
wire2.use(ltDontHave())
// 设置 wire2 的握手事件处理程序
wire2.on('handshake', (infoHash, peerId, extensions) => {
// 在握手完成后,调用 wire2 的 handshake 方法
console.log("2. wire2.on('handshake') infoHash:",infoHash,"peerId:", peerId)
console.log("3. 在握手完成后,调用 wire2 的 handshake 方法,infoHash:",leaves.parsedTorrent.infoHash,"peerid:", id2)
wire2.handshake(leaves.parsedTorrent.infoHash, id2)
})
// 设置 wire2 的扩展事件处理程序
wire2.on('extended', ext => {
if (ext === 'handshake') { // 如果扩展事件是握手
console.log("4. wire2 extended 发送 donthaven 消息,表示 wire2 不拥有块 30")
wire2.lt_donthave.donthave(30)// 发送 "donthave" 消息,表示 wire2 不拥有块 30
wire2.lt_donthave.donthave(29)
}
})
// 设置 wire1 的 "donthave" 消息事件处理程序(由protocol的index中的this._wire.extended('lt_donthave', buf)决定是否调用)
wire1.lt_donthave.on('donthave', (index) => {
// 验证接收到的索引是否为 30
console.log("5. wire1.lt_donthave.on('donthave') 接收到的索引为:",index)//t.equal(index, 30)
// 检查 wire1 中的块 30 是否被清除
console.log("6. wire1 中的块 30 是否存在",wire1.peerPieces.get(30))//t.notOk(wire1.peerPieces.get(30), 'piece 30 cleared in bitfield')
console.log("6. wire1 中的块 29 是否存在",wire1.peerPieces.get(29))
})
// 在 wire1 上执行握手,传入信息哈希和对等体 ID
console.log("1. 在 wire1 上执行握手,传入infoHash(",leaves.parsedTorrent.infoHash,") 和 peerId(", id1,")")
wire1.handshake(leaves.parsedTorrent.infoHash, id1)
测试输出
PS C:\Users\kingchuxing\Documents\MTGIT\lt_donthave-master> node .\mtest.js
wire1 中的块 30 是否存在 true
1. 在 wire1 上执行握手,传入infoHash( d2474e86c95b19b8bcfdb92bc12c9d44667cfa36 ) 和 peerId( <Buffer 30 31 32 33 34 35 36 37 38 39 30 31 32 33 34 35 36 37 38 39> )
2. wire2.on('handshake') infoHash: d2474e86c95b19b8bcfdb92bc12c9d44667cfa36 peerId: 3031323334353637383930313233343536373839
3. 在握手完成后,调用 wire2 的 handshake 方法,infoHash: d2474e86c95b19b8bcfdb92bc12c9d44667cfa36 peerid: <Buffer 31 32 33 34 35 36 37 38 39 30 31 32 33 34 35 36 37 38 39 30>
4. wire2 extended 发送 donthaven 消息,表示 wire2 不拥有块 30
>>>>>>>>>>>>>>>> 30 >>>>>>>>>>>>>> Uint8Array(4) [ 0, 0, 0, 30 ]
>>>>>>>>>>>>>>>> 29 >>>>>>>>>>>>>> Uint8Array(4) [ 0, 0, 0, 29 ]
5. wire1.lt_donthave.on('donthave') 接收到的索引为: 30
6. wire1 中的块 30 是否存在 false
6. wire1 中的块 29 是否存在 false
lt_donthave
插件的对某个index进行false标记过程:- wire1进行握手
wire1.handshake(leaves.parsedTorrent.infoHash, id1)
,执行Wire对象的handshake (infoHash, peerId, extensions)
函数,但是没有执行
if (this.peerExtensions.extended && !this._extendedHandshakeSent) {
// Peer's handshake indicated support already
// (incoming connection)
this._sendExtendedHandshake()
}
- 后续会执行
_onHandshake
,并在中途触发wire2.on('handshake'
,然后执行wire2的handshake。
// 设置 wire2 的握手事件处理程序
wire2.on('handshake', (infoHash, peerId, extensions) => {
// 在握手完成后,调用 wire2 的 handshake 方法
console.log("2. wire2.on('handshake') infoHash:",infoHash,"peerId:", peerId)
console.log("3. 在握手完成后,调用 wire2 的 handshake 方法,infoHash:",leaves.parsedTorrent.infoHash,"peerid:", id2)
wire2.handshake(leaves.parsedTorrent.infoHash, id2)
})
- 但是,与上次不同,这次的握手会调用
_sendExtendedHandshake()
,然后调用_onMessage
方法
_onMessage
_onMessage
方法用于解析并处理从远程对等体接收到的不同类型的消息。它根据消息的第一个字节(标识消息类型)来调用相应的处理函数,确保能够正确响应对等体的请求和状态变化。对于未知的消息类型,会记录调试信息并触发相应事件。
/**
* Handle a message from the remote peer.
* @param {Uint8Array} buffer // 接收到的消息缓冲区,类型为 Uint8Array
*/
_onMessage (buffer) {
// 解析消息的长度,调用 _onMessageLength 方法处理
this._parse(4, this._onMessageLength)
// 创建 DataView,用于从 buffer 中以不同格式读取数据
const view = new DataView(buffer.buffer, buffer.byteOffset, buffer.byteLength)
// 根据消息的第一个字节决定处理的方式
switch (buffer[0]) {
case 0: // "choke" 消息
return this._onChoke()
case 1: // "unchoke" 消息
return this._onUnchoke()
case 2: // "interested" 消息
return this._onInterested()
case 3: // "uninterested" 消息
return this._onUninterested()
case 4: // "have" 消息
return this._onHave(view.getUint32(1)) // 获取块索引并处理
case 5: // "bitfield" 消息
return this._onBitField(buffer.slice(1)) // 处理位图数据
case 6: // "request" 消息
return this._onRequest(
view.getUint32(1), // 获取块索引
view.getUint32(5), // 获取块的偏移
view.getUint32(9) // 获取块的长度
)
case 7: // "piece" 消息
return this._onPiece(
view.getUint32(1), // 获取块索引
view.getUint32(5), // 获取块的偏移
buffer.slice(9) // 获取块数据
)
case 8: // "cancel" 消息
return this._onCancel(
view.getUint32(1), // 获取块索引
view.getUint32(5), // 获取块的偏移
view.getUint32(9) // 获取块的长度
)
case 9: // "port" 消息
return this._onPort(view.getUint16(1)) // 获取端口号并处理
case 0x0D: // "suggest" 消息
return this._onSuggest(view.getUint32(1)) // 获取建议的块索引
case 0x0E: // "have all" 消息
return this._onHaveAll() // 处理拥有所有块的消息
case 0x0F: // "have none" 消息
return this._onHaveNone() // 处理不拥有任何块的消息
case 0x10: // "reject" 消息
return this._onReject(
view.getUint32(1), // 获取被拒绝的块索引
view.getUint32(5), // 获取偏移
view.getUint32(9) // 获取长度
)
case 0x11: // "allowed fast" 消息
return this._onAllowedFast(view.getUint32(1)) // 获取可以快速请求的块索引
case 20: // "extended" 消息
return this._onExtended(buffer[1], buffer.slice(2)) // 处理扩展消息
default: // 未知消息类型
this._debug('got unknown message') // 输出调试信息
return this.emit('unknownmessage', buffer) // 触发未知消息事件
}
}
_onExtended
_onExtended
方法处理来自远程对等体的扩展消息。首先,它检查是否为扩展握手消息(ext === 0
),然后解码并保存握手信息,处理扩展映射并调用相应的扩展处理器。如果消息是其他类型的扩展消息,则将其传递给相应的处理器。最后,它触发与扩展相关的事件,方便其他模块进行相应处理。
- 以下是代码实现:
_onExtended (ext, buf) {
// 检查扩展类型是否为0,表示扩展握手消息
if (ext === 0) {
let info
try {
// 尝试解码扩展握手消息的内容
info = bencode.decode(buf)
} catch (err) {
// 如果解码失败,输出调试信息并忽略该消息
this._debug('ignoring invalid extended handshake: %s', err.message || err)
}
// 如果信息为空,直接返回
if (!info) return
// 保存对等体的扩展握手信息
this.peerExtendedHandshake = info
// 检查握手信息中的扩展映射
if (typeof info.m === 'object') {
// 遍历每个扩展名,将其转换为数字并存储
for (const name in info.m) {
this.peerExtendedMapping[name] = Number(info.m[name].toString())
}
}
// 遍历已注册的扩展
for (const name in this._ext) {
// 如果对等体支持该扩展,则调用其握手处理方法
if (this.peerExtendedMapping[name]) {
this._ext[name].onExtendedHandshake(this.peerExtendedHandshake)
}
}
// 输出调试信息,表示收到了扩展握手
this._debug('got extended handshake')
// 触发扩展握手事件,即lt_donthave的wire2.on('extended', ext => {
this.emit('extended', 'handshake', this.peerExtendedHandshake)
} else {
// 如果扩展类型不是0,则处理其他扩展消息
if (this.extendedMapping[ext]) {
// 将扩展类型转换为友好的名称
ext = this.extendedMapping[ext]
// 检查是否有注册的扩展处理器
if (this._ext[ext]) {
// 调用对应扩展的消息处理方法
this._ext[ext].onMessage(buf)
}
}
// 输出调试信息,显示收到的扩展消息类型
this._debug('got extended message ext=%s', ext)
// 触发扩展消息事件
this.emit('extended', ext, buf)
}
}
- 以上代码的第46行
this._ext[ext].onMessage(buf)
调用onMessage
方法 - lt_donthave的onMessage方法(扩展中只有这个函数操作了标记是否有数据的peerPieces对象):
onMessage (buf) {
let index
try {
const view = new DataView(buf.buffer)
index = view.getUint32(0)
} catch (err) {
// drop invalid messages
return
}
if (!this._wire.peerPieces.get(index)) return
debug('got donthave %d', index)
this._wire.peerPieces.set(index, false)
this.emit('donthave', index)
this._failRequests(index)
}
使用示例
import BitField from 'bitfield'
import Protocol from 'bittorrent-protocol'
import net from 'net'
net.createServer(socket => {
var wire = new Protocol()
socket.pipe(wire).pipe(socket)
// handle handshake
wire.on('handshake', (infoHash, peerId) => {
wire.handshake(Buffer.from('my info hash'), Buffer.from('my peer id'))
// advertise that we have all 10 pieces of the torrent
const bitfield = new BitField(10)
for (let i = 0; i <= 10; i++) {
bitfield.set(i, true)
}
wire.bitfield(bitfield)
})
}).listen(6881)
- npm install lt_donthave
import BitField from 'bitfield'
import Protocol from 'bittorrent-protocol'
import net from 'net'
import lt_donthave from 'lt_donthave'
net.createServer(socket => {
const wire = new Protocol()
socket.pipe(wire).pipe(socket)
// initialize the extension
wire.use(lt_donthave())
// all `lt_donthave` functionality can now be accessed at wire.lt_donthave
wire.on('request', (pieceIndex, offset, length, cb) => {
// whoops, turns out we don't have any pieces after all
wire.lt_donthave.donthave(pieceIndex)
cb(new Error('not found'))
})
// 'donthave' event will fire when the remote peer indicates it no longer has a piece
wire.lt_donthave.on('donthave', index => {
// remote peer no longer has piece `index`
})
// handle handshake
wire.on('handshake', (infoHash, peerId) => {
wire.handshake(Buffer.from('my info hash'), Buffer.from('my peer id'))
// advertise that we have all 10 pieces of the torrent
const bitfield = new BitField(10)
for (let i = 0; i <= 10; i++) {
bitfield.set(i, true)
}
wire.bitfield(bitfield)
})
}).listen(6881)