精通IPFS:IPFS 获取内容之下篇
streamBytes
函数,根据偏移量、长度及节点的连接数组,获取指定的内容。在获取文件过程中,通过调用
streamBytes
函数来获取整个文件的内容。
streamBytes
函数调用完成后返回的
pull
函数生成的 through 流就是我们要读取内容的流,它最终被传递给 IPFS
core/components/get-pull-stream.js
中返回的
pull
函数生成的 through 流,而这个流又被同目录下
get.js
文件中 pull-stream 类库中的
asyncMap
流的处理函数转换为完整的缓冲区,从而被最终的应用的所使用,这段程序代码如下:
上篇文章的内容,我们回忆到这里就结束了,下面我们仔细研究
pull(
self.getPullStream(ipfsPath, options),
pull.asyncMap((file, cb) => {
if (file.content) {
pull(
file.content,
pull.collect((err, buffers) => {
if (err) { return cb(err) }
file.content = Buffer.concat(buffers)
cb(null, file)
})
)
} else {
cb(null, file)
}
}),
pull.collect(callback)
)
streamBytes
函数及相关的深度遍历是如何实现的。
streamBytes
函数使用了 pull-traverse 类库提供深度优先、广度优先、叶子优先等算法,它的每个算法都返回一个 pull 类库的
through
流,这个流被它后面的流所调用。在这里使用深度优先算法,返回的流被 pull 类库的
map
流所调用,用于获取每一个元素。
深度优先算法的相关代码如下:
var once = exports.once =
function (value) {
return function (abort, cb) {
if(abort) return cb(abort)
if(value != null) {
var _value = value; value = null
cb(null, _value)
} else
cb(true)
}
}var depthFirst = exports.depthFirst =
function (start, createStream) {
var reads = [], ended
reads.unshift(once(start))
return function next (end, cb) {
if(!reads.length)
return cb(true)
if(ended)
return cb(ended)
reads[0](end, function (end, data) {
if(end) {
if(end !== true) {
ended = end
reads.shift()
while(reads.length)
reads.shift()(end, function () {})
return cb(end)
}
reads.shift()
return next(null, cb)
}
reads.unshift(createStream(data))
cb(end, data)
})
}
}
streamBytes
函数定义在
file.js
文件中,我们来看下它的内容:
function streamBytes (dag, node, fileSize, offset, length) {
if (offset === fileSize || length === 0) {
return once(Buffer.alloc(0))
} const end = offset + length
return pull(
traverse.depthFirst({
node,
start: 0,
end: fileSize
}, getChildren(dag, offset, end)),
map(extractData(offset, end)),
filter(Boolean)
)
}
根据深度优先算法代码我们可知道,它首先把第一个参数包装成一个 pull 类库的
once
流,在这里即把我们的根 DAG 节点包装成一个
once
流,然后作为内部数组的第一个元素,最后返回 pull 类库的
through
流。
我们把返回类似当 pull 类库的function next (end, cb) {}
签名的函数称为 pull 类库的through
流,这个函数被称为读函数。因为它会被后面的流所调用,用来从流中读取数据,当读取数据之后,这个函数通过参数中指定的回调函数把数据传递给后面的流,即传递给调用自己的流。
map
函数返回的
through
流调用深度遍历函数所返回的读取函数时,该读取函数执行如下:
-
如果内部数组中没有数据可以读取,那么调用
map
函数返回的through
流的读取函数,并返回。if(!reads.length) return cb(true)
-
如果
ended
变量为真,那么以这个变量为参数调用map
函数返回的through
流的读取函数,并返回。if(ended) return cb(ended)
-
最后,调用内部数组的第一个元素(类型为 pull 类库的
through
流的读取函数)来读取数据。当读取到数据之后,调用自定义的内部函数来处理数据,在这个内部函数中处理如下:-
如果当前读取完成,即
end
为真时,执行下面逻辑。如果end
不是严格真(出现在变量为字符串true
情况),那么:设置变量ended
为end
的值;删除数组中的第一个元素;如果数组长度不为0,那么持续删除第一个元素(类型为函数),并且调用这个删除的元素;当数组长度为空时,调用回调函数进行处理。否则,即end
严格为真,从数组中删除第一个元素,因为这意味着数组的当前元素的已经处理完成,所以需要调用外层函数继续从数组中读取数据。if(end) { if(end !== true) { ended = end reads.shift()
}while(reads.length) reads.shift()(end, function () {})
return cb(end) }
reads.shift() return next(null, cb)
-
调用
createStream
函数来处理读取到的数据,这个createStream
函数即是我们提供给深度遍历算法的第二个参数getChildren
函数返回的内部函数。getChildren
函数返回的内部函数最终会返回一个pull
函数生成的 through 流。在这个流中通过 pull-stream 类库的flatten
流会把获取到的每个节点及其内部节点最终转换成一个数组形式,比如把
这样的形式转化成下面的形式[1, 2, 3], [4, 5, 6], [7, 8]
这里 [1, 2, 3] 可以认为是第一个 Link 碎片,它下面又有三个包含最终数据的 DAG 节点;[4, 5, 6] 是第二个 Link 碎片,它下面也有三个包含最终数据的 DAG 节点; [7, 8] 是第三个 Link 碎片,它下面只有二个包含最终数据的 DAG 节点。[1, 2, 3, 4, 5, 6, 7, 8]
我们可以发现,通过深度遍历优先算法及其处理函数
getChildren
返回的内部函数流,我们会分别获取每个碎片及其保存的子碎片,并且把它们以正确的顺序排列在一起形成数组,从而最终获取到了 DAG 节点的完整数据。
-
如果当前读取完成,即
getChildren
函数返回的内部函数处理方法如下:
-
如果当前节点对象是一个缓冲区对象,即当前节点是一个叶子节点,那么直接返回一个空的流,因为没有办法再次进行遍历。
if (Buffer.isBuffer(node)) { return empty() }
-
调用静态方法把当前节点对象转换成一个文件对象。
let file
try { file = UnixFS.unmarshal(node.data) } catch (err) { return error(err) }
-
判断流的开始位置。
const nodeHasData = Boolean(file.data && file.data.length) if (nodeHasData && node.links.length) { streamPosition += file.data.length }
-
处理当前节点包含的 Link 信息,并过滤掉不在指定范围内的 Link 信息,然后按顺序返回 Link 信息数组。
const filteredLinks = node.links .map((link, index) => { const child = { link: link, start: streamPosition, end: streamPosition + file.blockSizes[index], size: file.blockSizes[index] }
streamPosition = child.end return child
}) .filter((child) => { return (offset >= child.start && offset < child.end) || // child has offset byte (end > child.start && end <= child.end) || // child has end byte (offset < child.start && end > child.end) // child is between offset and end bytes })
-
如果最终返回的 Link 信息数组存在,则设置流的起始位置为第一个 Link 信息的开头位置。
if (filteredLinks.length) { streamPosition = filteredLinks[0].start }
-
返回一个
pull
函数构成的流。return pull( once(filteredLinks), paramap((children, cb) => { dag.getMany(children.map(child => child.link.cid), (err, results) => { if (err) { return cb(err) }
cb(null, results.map((result, index) => { const child = children[index]
return { start: child.start, end: child.end, node: result, size: child.size } })) })
在这个流中,}), flatten() )
paramap
函数返回的流会调用once
函数返回的一次性流,once
函数返回的一次性流会把 Link 信息数组传递给前者。而前者的处理函数会对 Link 信息数组中的每个碎片进行处理(这里只有一个 Link 信息数组,即只有children
数组,而不是多个children
数组,但是children
数组包含了所有 Link 信息)。在paramap
函数返回的流的处理函数中调用 IPLD 对象的getMany
获取每个 Link 节点的数据,并对返回的数据进行整理,然后以整理后的数组为参数,调用下一个流---即flatten
流的读取函数中指定的---回调函数,把最终的数组传递给它。最终,数组被flatten
流进行扁平化处理后,传递给外部的pull
函数中的流,即前面所看到的 pull 类库的map
流的read
函数中指定的那个函数,在这个函数中又会调用我们提供的extractData
函数返回的内部函数来处理每一个节点对象。
extractData
函数返回的内部函数比较简单,主要是对获取到的每个碎片数据进行处理,然后返回对应的数组,它的代码如下,读者可自行分析,这里不再细讲。
function getData ({ node, start, end }) {
let block if (Buffer.isBuffer(node)) {
block = node
} else {
try {
const file = UnixFS.unmarshal(node.data)
if (!file.data) {
if (file.blockSizes.length) {
return
}
return Buffer.alloc(0)
}
block = file.data
} catch (err) {
throw new Error(`Failed to unmarshal node - ${err.message}`)
}
}
if (block && block.length) {
if (streamPosition === -1) {
streamPosition = start
}
const output = extractDataFromBlock(block, streamPosition, requestedStart, requestedEnd)
streamPosition += block.length
return output
}
return Buffer.alloc(0)
}