mt logoMyToken
总市值:
0%
恐慌指数:
0%
币种:--
平台 --
ETH Gas:--
EN
USD
APP
Ap Store QR Code

Scan Download

精通IPFS:IPFS 保存内容之下篇

收藏
分享

在上一篇文章中,我们指出在 builder/builder.js 文件中调用调用 pull 函数进行保存文件,这篇文章我们就来详细研究下这个过程。
  1. 设置源流为 file.content
  2. 调用 chunker 流,对保存的内容进行分块。通过前面的文章,我们知道 chunker 流的默认实现为 chunker/fixed-size.js ,它是一个 pull-through 流。这个流提供了两个函数,分别称为 onData onEnd ,前者在每次数据到来时调用,后者当数据发送完成时调用。 fixed-size.js 在初始化时,根据选项中指定的 maxChunkSize 属性设置每一个区块的大小。下面,我们来看下它的在它的 onData onEnd 两个方法。

onData 函数处理如下:

  • 每次收到数据之后就保存在 BufferList 中,同时把当前数据长度也加上读取到数据的长度。bl.append(buffer) currentLength += buffer.length
  • 如果当前数据长度大于等于规定的区块大小时,那么就进行下面的循环处理,直到当前数据长度小于规定的区块大小。
    • 从缓冲列表中取得规定的区块大小数据到队列中。
      this.queue(bl.slice(0, maxSize))
      
    • 如果缓冲列表的长度刚好等于规定的区块大小,那么重新一个新的缓冲区列表,并将当前数据长度设置为 0;否则,生成一个新的缓冲区列表,并从老缓冲区中区块大小处把数据读取到新的缓冲区列表中(0 到区块大小处的数据已经在上面读取过),同时设置其为老的缓冲区列表,并更新当前数据长度设减去前一步读取到区块大小长度,从而更缓冲区列表及其长度。
      if (maxSize === bl.length) {
        bl = new BufferList()
        currentLength = 0
      } else {
        const newBl = new BufferList()
        newBl.append(bl.shallowSlice(maxSize))
        bl = newBl
        currentLength -= maxSize
      }
      
    看完了 onData 方法,接下来我们再看 onEnd 函数,这个函数首先检查缓冲列表中是否有数据(少于区块大小),如果有则同样保存到队列中。
      if (currentLength) {
        this.queue(bl.slice(0, currentLength))
        emitted = true
      }

      if (!emitted) {     this.queue(Buffer.alloc(0))   }

      this.queue(null)

以上就 IPFS 中固定分块的逻辑,其实也很简单。
  • 调用 paraMap 流(类型为 pull-paramap),对每一个分块进行处理。当前面的流对文件进行分块之后,每一个分区都会下一个流进行拉取,在这里就是这个函数,我们看下这个函数是如何处理每一个分块的。它的主体是一个 waterfall 函数,这个函数正如其名字所示,每一个函数都进行各自的处理,并把结果传递给下一个函数,我们看下它的几个处理函数。

    首先,我们来看第一个函数,它主要用来创建 DAGNode ,并把相关信息传递给第二个函数,它的执行逻辑如下:

    • 生成一个 UnixFS 对象。
      const file = new UnixFS(options.leafType, buffer)
      
      UnixFS 是一种基于协议缓冲区的格式,用于描述IPFS中的文件,目录和符号链接。目前它支持:原始数据、目录、文件、原数据、符号连接、hamt-sharded-directory 等几种类型。

      leafType 默认为文件,在文件初始化时通过默认选项 defaultOptions 指定的。

    • 调用 DAGNode.create 静态方法,创建 DAGNode 节点,成功之后,把相信信息传递下一个函数。
      DAGNode.create(file.marshal(), [], (err, node) => {
        if (err) {
          return cb(err)
        }

        cb(null, {     size: node.size,     leafSize: file.fileSize(),     data: node   }) })

      UnixFS 的 marshal 方法主要内容是对文件内容(字节缓冲区)进行编码。这里 DAGNode 引用的是 ipld-dag-pb 库中的 dag-node/index.js 中定义的 DAGNode 函数对象,它的 create 方法,定义于同一个目录下的 create.js 中,我们来看下这个方法。它的主要内容是对文件的分区数据和对其他区块的连接 link 进行检查,并把两者序列后之后再创建 DAGNode 对象。而后者的构造函数比较简单,仅把区块的数据及与其他区块的连接(代表与其他区块的关系)保存起来。
    接下来,我们看第二个函数,它的主要作用是把生成的 DAGNode 保存到系统中,并把保存的结果传递给下一个函数,它的执行逻辑如下:
    • 调用 persist 方法,保存 DAG 节点。这是非常重要的一步,它不仅把区块对象保存在本地仓库,也涉及与是否把区块 CID 保存在与它最近的节点上,还涉及到把区块通过 bitswap 协义发送到那些想要它的节点中。它的执行如下:
      • 从选项中获取 CID 版本号、哈希算法、编码方式等。
        let cidVersion = options.cidVersion || defaultOptions.cidVersion
        let hashAlg = options.hashAlg || defaultOptions.hashAlg
        let codec = options.codec || defaultOptions.codec

        if (Buffer.isBuffer(node)) {     cidVersion = 1     codec = 'raw' }

        if (hashAlg !== 'sha2-256') {     cidVersion = 1 }

        默认情况下,版本号为0,哈希算法为 SHA256,编码方式为 dag-pb,这是一种基于 Protocol 规定的 JS 实现。
      • 如果选项中指定不保存而仅仅是计算哈希值,那么调用 ipld-dag-pb 库中的 util.js 中的 cid 函数,获取 DAG 节点的 CID,然后直接返回。
        if (options.onlyHash) {
            return cid(node, {
              version: cidVersion,
              hashAlg: hashAlg
            }, (err, cid) => {
              callback(err, {
                cid,
                node
              })
            })
        }
        
      • 如果不是只计算哈希,那么调用 IPLD 对象的 put 来保存 DAG 节点。
        ipld.put(node, {
                version: cidVersion,
                hashAlg: hashAlg,
                format: codec
            }, (error, cid) => {
            callback(error, {
                cid,
                node
            })
        })
        
        IPLD 对象定义于 ipld 库中。IPLD 在 IPFS 中具有非常重要的作用,它是 InterPlanetary Linked-Data 的缩写,代表了 IPFS 的野心与希望,把一切东西连结起来的愿望,目前可以边结比特币、以太坊、Zcash、git 等。它持有 ipfs-block-service,后者又持有 ipfs 仓库对象和 bitswap 对象,这几个对象构成了 ipfs 的核心。

        下面我们来看 put 方法,看它是怎么来保存 DAG 对象的。它的主体是调用内部方法获取当前 DAG 对象编码用的格式,然后使用与这种格式相匹配的 cid 方法来取得对象的 CID 对象,然后调用内部的 _put 来保存数据。

        this._getFormat(options.format, (err, format) => {
          if (err) return callback(err)

          format.util.cid(node, options, (err, cid) => {     if (err) {       return callback(err)     }

        if (options.onlyHash) {
          return callback(null, cid)
        }

        this._put(cid, node, callback)

          })
        })
        
        接下来,我们来看这个内部 _put 方法,这个方法主体是一个 waterfall 函数,它内部的几个函数分别根据 CID 对象获得对应的编码格式,然后使用编码格式对应的方法序列化 DAG 节点对象,最后生成区块 Block 对象,并调用区块服务对象的 put 方法来保存区块。

        区块服务对象定义于 ipfs-block-service 库,它的 put 方法,根据是否有 bitswap 对象(初始化是这个对象为空)来决定是调用仓库对象来保存区块,还是调用 bitswap 来保存区块。对于我们的例子来说,它会调用 bitswap 来保存区块。

        bitswap 对象的 put 方法,不仅会把区块保存在底层的 blockstore 中,还会把它发送给那些需要它的节点。它的主体是一个 waterfall 函数,其中第一个函数检查本地区块存储是否有这个区块,第二个根据本地是否有这个区块来确定是否忽略调用,还是真正来保存区块。

        waterfall([
          (cb) => this.blockstore.has(block.cid, cb),
          (has, cb) => {
            if (has) {
              return nextTick(cb)
            }
        this._putBlock(block, cb)
        
          }
        ], callback)
        
        bitswap 对象的 _putBlock 方法调用区块存储对象的 put 方法在本地仓库中保存区块对象,并在成功之后触发一个收到区块的事件,同时通过网络对象的 provide 方法,从而把 CID 保存在最近的节点中,然后调用引擎对象的 receivedBlocks 方法,把接收到的区块对象发送到所有想要这个区块的所有节点中。
        this.blockstore.put(block, (err) => {
          if (err) {
            return callback(err)
          }

          this.notifications.hasBlock(block)   this.network.provide(block.cid, (err) => {     if (err) {       this._log.error('Failed to provide: %s', err.message)     }   })

          this.engine.receivedBlocks([block.cid])   callback() })

        bitswap 对象中有两个重要的对象,一个是网络对象,一个是引擎对象。

        网络对象的 provide 方法直接调用 libp2p 对象的内容路由的同名方法来处理区块的 CID。libp2p 对象的内容路由中保存所有具体的路由方法,默认情况下,是空的,即没有任何路由方法,而我们通过在配置文件中,指定 libp2p.config.dht.enabled 为真,为内容路由指定了 DHT 路由,所以最终区块的 CID 会被保存在最合适的节点中。

        网络对象在初始方法中,指定了自身的两个方法作为 libp2p 对象的节点连接与断开事件的处理器,从而在连接与断开时获得相应的通知,并且还调用了 libp2p 对象的 handle 方法,从而使自己成为 libp2p 对象 /ipfs/bitswap/1.0.0 /ipfs/bitswap/1.1.0 这两种协义的处理对象,从而当 libp2p 收到这两种消息时,会调用网络对象对象的相应方法进行处理。

        网络对象处理 bitswap 协义是通过 pull 函数处理的,大致流程如下:从连接对象中获取消息,然后反序列化成为消息对象,然后通过连接对象获取它的节点信息对象,再然后调用 bitswap 对象的内部方法 _receiveMessage 处理传递进来的消息,而这个方法又会调用引擎对象的 messageReceived 方法来处理接收到的消息。

        引擎对象的 messageReceived 方法的大致流程如下:

        1)调用内部方法 _findOrCreate ,找到或创建远程对等节点的总账本对象 Ledger,如果是新创建的总账本对象,还要放入内部映射集合中,key 为远程对等节点的 Base58 字符串;

        2)如果这个消息是完全的消息,则生成一个新的想要请求列表。

        3)调用内部方法 _processBlocks ,处理消息中的区块对象。

        4)如果消息中的想要列表为空的,则退出方法。

        5)遍历消息中的想要列表,如果当前想要的实体被取消,则从对应的节点的总账本中去掉对应项,同时保存在取消项列表中;否则,把当前项保存在对应节点的总账本中,同时保存在想要列表中。

        6)调用内部方法 _cancelWants ,把任务中已经取消的过滤掉,即删除任务中已经取消的任务。

        7)调用内部方法 _addWants ,处理远程对等节点所有想要的列表。调用区块存储对象判断想要的项本地仓库中是否已经有,如果已经有,则生成相应的任务。

        引擎对象的 receivedBlocks 方法在收到具体区块时,检查所有已连接的远程节点(总账本对象),看它们是否想要这个区块,如果是则生成一个任务,在后台进行处理。

  • 调用 pullThrough 流(类型为 pull-through 流),对收到的每个数据进行处理。这个过程比较简单,这里不细讲。
  • 调用 reducer 流,把所有生成的分块进行归一处理。在默认情况下, reducer 流是在 balanced/index.js 中通过调用 balanced/balanced-reducer.js 中的 balancedReduceToRoot 的函数生成的。我们看下这个函数的执行过程:
    • 生成 pull-pair 对象和 pull-pushable 对象。
      const pair = pullPair()
      const source = pair.source

      const result = pushable()

    • 调用 reduceToParents 函数,建立内部 pull 流。函数的主体就是一个 pull 函数建立起来的流,它的几个函数如下:
      • 第一个函数是前面建立的 source 流。
      • 第二个函数是一个 pull-batch 类库定义的流,这是一个 pull-through 流,它实现了自己的 writer、ender 两个函数,它把每次获取到的数据保存在内部数组中,达到一定程序之后才会保存到 pull-through 流的队列中。
      • 第三个函数是 pull-stream 类库的 async-map 流,这是一个 through 流,与 map 流相似,但有更好的性能。它的归一处理函数 reduce 默认情况下为 builder/reduce.js 中返回的 reducefile 函数。它的流程如下:1)如果当前叶子节点数量是1,并且其 single 标志为真,并且选项中有配置把单独叶子归一到自身,那么直接调用回调对象;否则,执行下面的流。
        if (leaves.length === 1 && leaves[0].single && options.reduceSingleLeafToSelf) {
          const leaf = leaves[0]

          return callback(null, {     size: leaf.size,     leafSize: leaf.leafSize,     multihash: leaf.multihash,     path: file.path,     name: leaf.name   }) }

        2)创建父节点,并添加它的所有叶子节点。当文件比较大的时候,IPFS 会进行分块,每一个分块就构成了这里的叶子节点,最终这些叶子按照它们分块的顺序,生成对应的 DAGLink ,然后依次添加到父 DAGNode 中,这时候父 DAGNode 保存的不是文件内容,而是这些叶子节点的 DAGLink,从而构成文件的完整内容。
        const f = new UnixFS('file')

        const links = leaves.map((leaf) => {   f.addBlockSize(leaf.leafSize)

          return new DAGLink(leaf.name, leaf.size, leaf.multihash) })

        3)调用 waterfall 函数,顺序处理父节点。这个地方和处理单个分块类似,就是创建 DAGNode 对象、调用 persist 函数进行持久化处理。注意:这里的区别是父节点有叶子节点,即 links 不空。
        waterfall([
          (cb) => DAGNode.create(f.marshal(), links, cb),
          (node, cb) => persist(node, ipld, options, cb)
        ], (error, result) => {
          if (error) {
            return callback(error)
          }

          callback(null, {     size: result.node.size,     leafSize: f.fileSize(),     multihash: result.cid.buffer,     path: file.path,     name: ''   }) })

        4)上面 waterfall 函数处理完成后,调用回调函数进行继续处理。

        归一处理函数 reduce 中的回调函数是下面 collect 流即 sink 流中的读取回调函数,当归一函数读取到数据之后,调用这个回调函数,从而数据 pull 到 collect 流,进而进入 reduced 函数中进行处理。

      • 第四个函数是 pull-stream 类库的 collect 流,这是一个 sink 流。它的处理函数 reduced 流程如下:1)如果前面的流有错误,则直接调用 reduceToParents 函数的回调函数进行处理;

        2)否则,如果当前收到的数据长度大于1,即前面归一处理之后,还是有多个根 DAGNode,则调用 reduceToParents 函数继续进行归一处理;

        3)否则,调用 reduceToParents 函数的回调函数进行处理。

        reduceToParents 函数的回调函数,这是一个很关键的函数,在这个函数内部把读取到的数据写入 result 表示的 pull-pushable 流,以便在它后面的外部流流获取数据。

    • 返回双向流对象。这里返回的双向流对象为
      {
          sink: pair.sink,
          source: result
      }
      
      其中 sink pull-pair 类库中定义的 sink 流,它被外部的 pull 函数调用用来从前面一个流中读取数据; source pull-pushable 类库中的流,在 reduceToParents 函数的回调函数中被 push 数据,从而外部的 pull 函数中相关的流可以从它中读取函数。
  • 调用 collect 流,在这个流的处理函数中,把保存文件的结果传递到外部函数中。
    collect((err, roots) => {
        if (err) {
          callback(err)
        } else {
          callback(null, roots[0])
        }
    })
    
    这里的 callback 是调用 createAndStoreFile 函数时传递进来的,而它的调用是在 builer/builder.js 文件中,简单回顾一下调用代码:
    createAndStoreFile(item, (err, node) => {
      if (err) {
        return cb(err)
      }
      if (node) {
        source.push(node)
      }
      cb()
    })
    
    这里的匿名回调函数即是上面的 callback ,在回调函数中,通过保存文件的结果写入 source 流中,从而把数据传递到更外层的 pull 流中。
  • 到这里,我们已经把保存文件/内容这一核心流程完整分析了一遍,从头看到尾的你是不是收获很大。接下来,敬请期待下篇获取内容。

    点击回顾:

    精通IPFS:IPFS 保存内容之上篇

    精通IPFS:IPFS 保存内容之中篇

    免责声明:本文版权归原作者所有,不代表MyToken(www.mytokencap.com)观点和立场;如有关于内容、版权等问题,请与我们联系。