mt logoMyToken
总市值:
0%
恐慌指数:
0%
币种:--
平台 --
ETH Gas:--
EN
USD
APP
Ap Store QR Code

Scan Download

精通IPFS:IPFS 保存内容之上篇

收藏
分享

经过 前面的分析 ,我们已经明白了 IPFS 启动过程,从今天起,我会分析一些常见的命令或动作,希望大家喜欢。
在开始真正分析这些命令/动作之前,先要对 pull-stream 类库进行简单介绍,如果不熟悉这个类库,接下来就没办法进行。

pull-stream 是一个新型的流库,数据被从源中拉取到目的中,它有两种基本类型的流:Source 源和 Sink 接收器。除此之外,有两种复合类型的流:Through 通道流(比如转换)和 Duplex 双向流。

source 流,这类流返回一个匿名函数,这个匿名函数被称为 read 函数,它被后续的 sink 流函数或 through 流函数调用,从而读取 source 流中的内容。

sink 流,这类流最终都返回内部 drain.js 中的 sink 函数。这类流主要是读取数据,并且对每一个读取到的数据进行处理,如果流已经结束,则调用用户指定结束函数进行处理。

through 流,这类流的函数会返回嵌套的匿名函数,第一层函数接收一个 source 流的 read 函数或其他 through 函数返回的第一层函数为参数,第二层函数接收最终 sink 提供的写函数或其他 through 返回的第二层函数,第二层函数内部调用 read 函数,从而直接或间接从 source 中取得数据,获取数据后直接或间接调用 sink 函数,从而把数据写入到目的地址。

在 pull-streams 中,数据在流动之前,必须有一个完整的管道,这意味着一个源、零个或多个通道、一个接收器。但是仍然可以创建一个部分化的管道,这非常有用。也就是说可以创建一个完整的管道,比如 pull(source, sink) => undefined ,也可以部分化的管道,比如 pull(through, sink) => sink ,或者 pull(through1, through2) => through ,我们在下面会大量遇到这种部分化的管道。 今天,我们看下第一个最常用的 add 命令/动作,我们使用 IPFS 就是为了把文件保存到 IPFS,自然少不了保存操作, add 命令就是干这个的,闲话少数,我们来看一段代码。

const {createNode} = require('ipfs')

const node = createNode({   libp2p:{     config:{       dht:{         enabled:true       }     }   } })

node.on('ready', async () => {

    const content = `我爱黑萤`;

    const filesAdded = await node.add({       content: Buffer.from(content)     },{       chunkerOptions:{         maxChunkSize:1000,         avgChunkSize:1000       }     })

    console.log('Added file:', filesAdded[0].path, filesAdded[0].hash) })

这次我们没有完全使用默认配置,开启了 DHT,看过我文章的读者都知道 DHT 是什么东东,这里不详细解释。在程序中,通过调用 IPFS 节点的 add 方法来上传内容,内容可以是文件,也可以是直接的内容,两者有稍微的区别,在讲到相关代码时,我们指出这种区别的,这里我们为了简单直接上传内容为例来说明。

add 方法位于 core/components/files-regular/add.js 文件中,在 《精通IPFS:系统启动之概览》 那篇文章中,我们说过,系统会把 core/components/files-regular 目录下的所有文件扩展到 IPFS 对象上面,这其中自然包括这里的 add.js 文件。下面,我们直接看这个函数的执行流程。

这个函数返回了一个内部定义的函数,在这个内部定义的函数中对参数做了一些处理,然后就调用内部的 add 函数,后者才是主体,它的逻辑如下:

  1. 首先,检查选项对象是否为函数,如果是,则重新生成相关的变量。
    if (typeof options === 'function') {
      callback = options
      options = {}
    }
    
  2. 定义检测内容的工具函数来检测我们要上传的内容。
    const isBufferOrStream = obj => Buffer.isBuffer(obj) || isStream.readable(obj) || isSource(obj)
    const isContentObject = obj => {
      if (typeof obj !== 'object') return false
      if (obj.content) return isBufferOrStream(obj.content)
      return Boolean(obj.path) && typeof obj.path === 'string'
    }

    const isInput = obj => isBufferOrStream(obj) || isContentObject(obj) const ok = isInput(data) || (Array.isArray(data) && data.every(isInput))

    if (!ok) {   return callback(new Error('invalid input: expected buffer, readable stream, pull stream, object or array of objects')) }

  3. 接下来,执行 pull-stream 类库提供的 pull 函数。我们来看 pull 函数的主要内容。它的第一个参数是 pull.values 函数执行的结果,这个 values 函数就是一个 source 流,它返回一个称为 read 的函数来读取我们提供的数据。这个 read 函数从数组中读取当前索引位置的值,以此值为参数,调用它之后的 through 函数第二层函数内部定义的回调函数或最终的 sink 函数内部定义的回调函数。如果数组已经读取完成,则直接以 true 为参数进行调用。

    第二个参数是 IPFS 对象的 addPullStream 方法,这个方法也是在启动时候使用同样的方法扩展到 IPFS 对象,它的主体是当前目录的 add-pull-stream.js 文件中的函数。接下来,我们会详细看这个函数,现在我们只需要知道这个函数返回了一个部分化的管道。

    第三个参数是 pull-sort 中定义的函数,这是一个依赖于 pull-stream 的库,根据一定规则来排序,这个函数我们不用管。

    最后一个参数是 pull.collect 函数执行的结果,这个 collect 函数就是一个 sink 流。它把最终的结果放入一个数组中,然后调用回调函数。我们在前面代码中看到的 filesAdded 之所以是一个数组就是拜这个函数所赐。

    上面逻辑的代码如下:

    pull(
      pull.values([data]),
      self.addPullStream(options),
      sort((a, b) => {
        if (a.path < b.path) return 1
        if (a.path > b.path) return -1
        return 0
      }),
      pull.collect(callback)
    )
    
    在上面的代码中,我们把要保存的内容构成一个数组,具体原因下面解释。
现在,我们来看 addPullStream 方法,这个方法是保存内容的主体, add 方法是只开胃小菜。 addPullStream 方法执行逻辑如下:
  1. 调用 parseChunkerString 函数,处理内容分块相关的选项。这个函数位于相同目录下的 utils.js 文件中,它检查用户指定的分块算法。如果用户没有指定,则使用固定分块算法,大小为系统默认的 262144;如果指定了大小,则使用固定分块算法,但大小为用户指定大小;如果指定为 rabin 类分割法,即变长分割法,则调用内部函数来生成对应的分割选项。上面逻辑代码如下:
    parseChunkerString = (chunker) => {
      if (!chunker) {
        return {
          chunker: 'fixed'
        }
      } else if (chunker.startsWith('size-')) {
        const sizeStr = chunker.split('-')[1]
        const size = parseInt(sizeStr)
        if (isNaN(size)) {
          throw new Error('Chunker parameter size must be an integer')
        }
        return {
          chunker: 'fixed',
          chunkerOptions: {
            maxChunkSize: size
          }
        }
      } else if (chunker.startsWith('rabin')) {
        return {
          chunker: 'rabin',
          chunkerOptions: parseRabinString(chunker)
        }
      } else {
        throw new Error(Unrecognized chunker option: ${chunker})
      }
    }
    
    注意:我们也可以通过重写这个函数来增加自己的分割算法。
  2. 合并整理选项变量。
    const opts = Object.assign({}, {
      shardSplitThreshold: self._options.EXPERIMENTAL.sharding
        ? 1000
        : Infinity
    }, options, chunkerOptions)
    
  3. 设置默认的 CID 版本号。如果指定了 Hash 算法,但是 CID 版本又不是 1,则强制设为 1。CID 是分布式系统的自描述内容寻址标识符,目前有两个版本 0 和 1,版本 0 是一个向后兼容的版本,只支持 sha256 哈希算法,并且不能指定。
    if (opts.hashAlg && opts.cidVersion !== 1) {
      opts.cidVersion = 1
    }
    
  4. 设置进度处理函数,默认空实现。
    const prog = opts.progress || noop
    const progress = (bytes) => {
      total += bytes
      prog(total)
    }

    opts.progress = progress

  5. pull 函数返回一个部分化的 pull-stream 流。这个部分化的 pull-stream 流是处理文件/内容保存的关键,我们仔细研究下。
    1. 首先调用 pull.map 方法对保存的内容进行处理。 pull.map 方法是 pull-stream 流中的一个 source 流,它对数组中的每个元素使用指定的处理函数进行处理。这就是我们在 add 函数中把需要保存的内容转化为数组的原因。在这里,对每个数组元素进行处理的函数是 normalizeContent 。这个函数定义在同一个文件中,它首先检查保存的内容是否为数组,如果不是则转化为数组;然后,对数组中的每一个元素进行处理,具体如下:
      • 如果保存的内容是 Buffer 对象,则把要保存的内容转化为路径为空字符串,内容为 pull-stream 流的对象。
        if (Buffer.isBuffer(data)) {
          data = { path: '', content: pull.values([data]) }
        }
        
      • 如果保存的内容是一个 Node.js 可读流,比如文件,则把要保存的转化为路径为空字符串,内容使用 stream-to-pull-stream 类的 source 方法库把 Node.js 可读流转化为 pull-stream 的 source 流对象。
        if (isStream.readable(data)) {
          data = { path: '', content: toPull.source(data) }
        }
        
      • 如果保存的内容是 pull-stream 的 source 流,则把要保存的内容转化为路径为空字符串,内容不变的对象。
        if (isSource(data)) {
          data = { path: '', content: data }
        }
        
      • 如果要保存的内容是一个对象,并且 content 属性存在,且不是函数,则进行如下处理:
        if (data && data.content && typeof data.content !== 'function') {
          if (Buffer.isBuffer(data.content)) {
            data.content = pull.values([data.content])
          }

          if (isStream.readable(data.content)) {     data.content = toPull.source(data.content)   } }

      • 如果指定的是路径,则进行下面的处理。
        if (opts.wrapWithDirectory && !data.path) {
          throw new Error('Must provide a path when wrapping with a directory')
        }

        if (opts.wrapWithDirectory) {   data.path = WRAPPER + data.path }

      • 返回最终生成的要保存的内容。
    2. 调用 pull.flatten() 方法,把前上步生成的数组进行扁平化处理。 flatten 方法是一个 through 流,主要是把多个流或数组流转换为一个流,比如把多个数组转换成一个数组,比如:
      [
        [1, 2, 3],
        [4, 5, 6],
        [7, 8, 9]
      ]
      
      这样的数组使用这个方法处理后,最终会变成下面的数组
      [1, 2, 3, 4, 5, 6, 7, 8, 9]
      
    3. 调用 importer 函数来保存内容。这个函数定义在 ipfs-unixfs-importer 类库中,这个类库是 IPFS 用于处理文件的布局和分块机制的 JavaScript 实现,具体如何保存内容,如何分块我们将在下篇文章中进行详细分析。
    4. 调用 pull.asyncMap 方法,对已经保存的文件/内容进行预处理,生成用户看到的内容。当程序执行到这里时,我们要保存的文件或内容已经保存在本地 IPFS 仓库,已经可以用使用 cat get ls 等命令来 API 来查看我们保存的内容或文件了。 asyncMap 方法是一个 through 流,类似于 map 流,但是有更好的性能。它会对每一个数组元素进行处理,这里处理函数为 prepareFile

      这个函数定义在同一个文件中,它的处理具体如下:

      • 使用已经生成文件的 multihash 内容生成 CID 对象。
        let cid = new CID(file.multihash)
        
        CID 构造方法会检查传入的参数,如果是 CID 对象,则直接从对象中取出版本号、编码方式、多哈希等属性;如果是字符串,则又分为是否被 multibase 编码过,如果是则需要先解码,然后再分离出各种属性,如果没有经过 multibase 编码,那么肯定是 base58 字符串,则设置版本为0,编码方式为 dag-pb ,再从 base58 串中获取多哈希值;如果是缓冲对象,则取得第一个字节,并按十六进制转化成整数,如果第一个字节是 0或1,则生成各自属性,否则为多哈希,则设置版本为0,编码方式为 dag-pb
      • 如果用户指定 CID 版本为 1,则生成 CID 对象到版本1.
        if (opts.cidVersion === 1) {
            cid = cid.toV1()
        }
        
      • 接下来,调用 waterfall 方法,顺序处理它指定的函数。第一个函数,检查配置选项是否指定了 onlyHash ,即不实际地上传文件到IFS网络,仅仅计算一下这个文件的 HASH,那么直接调用第二个函数,否则,调用 IPFS 对象的 object.get 方法来获取指定文件在仓库中保存的节点信息。这个方法我们后面会专门讲解,这里略去不讲。第二个函数,生成最终返回给用户的对象,这个对象包括了:path、size、hash 等。

        上面代码如下,比较简单,可自己阅读。

          waterfall([
            (cb) => opts.onlyHash
              ? cb(null, file)
              : self.object.get(file.multihash, Object.assign({}, opts, { preload: false }), cb),
            (node, cb) => {
              const b58Hash = cid.toBaseEncodedString()
          let size = node.size

          if (Buffer.isBuffer(node)) {     size = node.length   }

          cb(null, {     path: opts.wrapWithDirectory       ? file.path.substring(WRAPPER.length)       : (file.path || b58Hash),     hash: b58Hash,     size   }) }

          ], callback)
        
    5. 调用 pull.map 方法,把已经保存到本地的文件预加载到指定节点。 map 是一个 through 流,它会对每一个数组元素进行处理,这里处理函数为 preloadFile 。这个函数定义在同一个文件中,这会把已经保存的文件预加载到指定的节点,具体保存在哪些节点,可以参考《精通IPFS:系统启动之概览》篇中 preload.addresses ,也可以手动指定。
    6. 调用 pull.asyncMap 方法,把已经保存到本地的文件长期保存在本地,确保不被垃圾回收。 asyncMap 方法是一个 through 流,这里处理函数为 pinFile 。pin 操作后面我们会详细分析,这里略过不提,读者可以自行阅读相关代码。
免责声明:本文版权归原作者所有,不代表MyToken(www.mytokencap.com)观点和立场;如有关于内容、版权等问题,请与我们联系。