Skip to content

stream 实践

stream 实现方式

  • fs.createReadStream node老牌写法
ts
import http, { IncomingMessage, ServerResponse } from 'node:http'
import fs from 'node:fs'
import { open } from 'node:fs/promises'
import path from 'node:path'
import archiver from 'archiver'
import { Writable, Readable } from 'node:stream'
import { pipeline } from 'node:stream/promises'

const PORT = 3000

type RouteHandler = (
  req: IncomingMessage,
  res: ServerResponse
) => void | Promise<void>

interface ZipFileOptions {
  filePath: string
  zipFileName?: string
  entryName?: string
}

interface SendFileOptions {
  filePath: string
  downloadFileName?: string
  contentType?: string
}

/**
 * 统一发送文本响应
 */
function sendText(
  res: ServerResponse,
  statusCode: number,
  text: string
): void {
  res.statusCode = statusCode
  res.setHeader('Content-Type', 'text/plain; charset=utf-8')
  res.end(text)
}

/**
 * 统一发送 JSON 响应
 */
function sendJSON(
  res: ServerResponse,
  statusCode: number,
  data: unknown
): void {
  res.statusCode = statusCode
  res.setHeader('Content-Type', 'application/json; charset=utf-8')
  res.end(JSON.stringify(data))
}

/**
 * 检查文件是否存在
 */
function ensureFileExists(res: ServerResponse, filePath: string): boolean {
  if (!fs.existsSync(filePath)) {
    sendText(res, 404, 'File not found')
    return false
  }

  return true
}

/**
 * 动态压缩单个文件并输出到响应流
 */
function streamZipFile(
  res: ServerResponse,
  options: ZipFileOptions
): void {
  const {
    filePath,
    zipFileName = 'archive.zip',
    entryName = path.basename(filePath)
  } = options

  if (!ensureFileExists(res, filePath)) {
    return
  }

  res.statusCode = 200
  res.setHeader('Content-Type', 'application/zip')
  res.setHeader('Content-Disposition', `attachment; filename="${zipFileName}"`)

  const archive = archiver('zip', {
    zlib: { level: 9 }
  })

  archive.on('error', (err: Error) => {
    console.error('ZIP error:', err)

    if (!res.headersSent) {
      sendText(res, 500, 'Error creating ZIP archive')
      return
    }

    res.destroy(err)
  })

  archive.on('end', () => {
    console.log('ZIP archive created successfully')
  })

  archive.pipe(res)
  archive.file(filePath, { name: entryName })
  void archive.finalize()
}

async function sendGenerateChunksByNodeStream(
  res: ServerResponse,
): Promise<void> {
  try {
    // 把“程序逐步生成的数据”也当成流。
    async function* generateChunks() {
      yield 'part1\n'
      yield 'part2\n'
      yield 'part3\n'
    }
    const generateChunksStream = Readable.from(generateChunks())
    res.setHeader('Content-Type', 'text/plain');
    res.setHeader('Content-Disposition', `attachment; filename="generateChunks.text"`);
    await pipeline(generateChunksStream, res);

  } catch (err) {
    console.error('sendgenerateChunksByNodeStream failed.', err);
  }
}

async function sendStringByNodeStream(
  res: ServerResponse,
): Promise<void> {
  try {
    const largeString = '********1111111111111111';
    // Readable.from(data) 返回的是 Node Readable,不是 Web ReadableStream
    // Readable.from(data):更适合把内存里已有的数据(string、Buffer、Array、Iterable、AsyncIterable)包装成可读流
    const stringStream = Readable.from(largeString);

    res.setHeader('Content-Type', 'text/plain');
    res.setHeader('Content-Disposition', `attachment; filename="string.text"`);
    res.setHeader('Content-Length', largeString.length);

    // stringStream.pipe(res);
    await pipeline(stringStream, res);
  } catch (err) {
    console.error('sendStringByNodeStream failed.', err);
  }
}

/**
 * 旧写法:Node.js Stream
 * 使用 fs.createReadStream(filePath).pipe(res)
 */
async function sendFileByNodeStream(
  res: ServerResponse,
  options: SendFileOptions
): Promise<void> {
  const {
    filePath,
    downloadFileName = path.basename(filePath),
    contentType = 'application/octet-stream'
  } = options

  if (!ensureFileExists(res, filePath)) {
    return
  }

  res.statusCode = 200
  res.setHeader('Content-Type', contentType)
  res.setHeader('Content-Disposition', `attachment; filename="${downloadFileName}"`)
  res.setHeader('Content-Length', (await fs.promises.stat(filePath)).size)

  // fs.createReadStream(...):更适合从文件这种外部数据源里读取
  const readStream = fs.createReadStream(filePath)

  readStream.on('error', (err) => {
    console.error('Node stream file error:', err)

    if (!res.headersSent) {
      sendText(res, 500, 'Error reading file')
      return
    }

    res.destroy(err)
  })
  // 把文件内容直接写到 HTTP 响应里。
  readStream.pipe(res)
  // await pipeline(readStream, res)
}

// 把一个纯 Web 风格的底层字节缓冲区,包成 Node 风格的二进制块。
const normalizeChunk = new TransformStream({
  transform(chunk, controller) {
    if (chunk instanceof ArrayBuffer) {
      controller.enqueue(Buffer.from(chunk))
      return
    }
    // chunk 不是裸 ArrayBuffer,但它是“建立在 ArrayBuffer 之上的视图类型”。
    // Uint8Array、Uint16Array、DataView
    if (ArrayBuffer.isView(chunk)) {
      controller.enqueue(
        Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
      )
      return
    }

    controller.enqueue(chunk)
  }
})


/**
 * 新写法:Web Streams
 * 使用 fileHandle.readableWebStream() + Writable.toWeb(res)
 */
async function sendFileByWebStream(
  res: ServerResponse,
  options: SendFileOptions
): Promise<void> {
  const {
    filePath,
    downloadFileName = path.basename(filePath),
    contentType = 'application/octet-stream'
  } = options

  if (!ensureFileExists(res, filePath)) {
    return
  }

  res.statusCode = 200
  res.setHeader('Content-Type', contentType)
  res.setHeader('Content-Disposition', `attachment; filename="${downloadFileName}"`)
  res.setHeader('Content-Length', (await fs.promises.stat(filePath)).size)


  let fileHandle: Awaited<ReturnType<typeof open>> | null = null

  try {
    // fileHandle 理解为 一个“已经打开的文件控制器/句柄”
    // 它不是文件内容本身,而是一个“以后可以继续读这个文件”的操作对象。
    fileHandle = await open(filePath, 'r')
    // 把这个文件句柄,转换成一个 Web 标准的可读流。
    // 这个 webReadable 是 文件字节流
    const webReadable = fileHandle.readableWebStream()
    // 把 Node 的 ServerResponse,包装成一个 Web 标准的可写流。
    // res 原本是 Node 风格的 writable stream
    // pipeTo() 需要的是 Web WritableStream
    const webWritable = Writable.toWeb(res)
    // 先通过 normalizeChunk 把 webReadable 里的数据,转换成 Buffer
    // 如果不进行转换,那么 webReadable “读出来的 chunk 类型”和“ServerResponse 最终能写入的类型”不匹配
    // 把 webReadable 里的数据,持续写入 webWritable,直到完成。
    await webReadable.pipeThrough(normalizeChunk as any).pipeTo(webWritable)
  } catch (err) {
    console.error('Web stream file error:', err)

    if (!res.headersSent) {
      sendText(res, 500, 'Error reading file')
      return
    }

    res.destroy(err as Error)
  } finally {
    if (fileHandle) {
      // 不管成功还是失败,只要文件已经打开了,就尽量关掉。
      await fileHandle.close().catch((closeErr) => {
        console.error('Close fileHandle error:', closeErr)
      })
    }
  }
}

/**
 * 路由表
 */
const routes: Record<string, RouteHandler> = {
  '/': (_req, res) => {
    sendText(res, 200, 'Hello, this is the server!')
  },

  '/health': (_req, res) => {
    sendJSON(res, 200, {
      success: true,
      message: 'Server is running'
    })
  },

  /**
   * 动态压缩 .Lab 文件后返回 zip
   */
  '/getZipFile': (_req, res) => {
    // 原始文件:用于“动态压缩后返回”
    const LAB_FILE_PATH = path.join(__dirname, 'files', '1937330293275095040.Lab')
    streamZipFile(res, {
      filePath: LAB_FILE_PATH,
      zipFileName: 'archive.zip',
      entryName: '1937330293275095040.Lab'
    })
  },
  /**
   * 直接返回现成字符串(Node Stream 写法)
   */
  '/getStringByNodeStream': async (req, res) => {
    console.log('getStringByNodeStream')
    await sendStringByNodeStream(res)
  },

  /**
   * 直接返回生成器字符串(Node Stream 写法)
   */
  '/getGenerateChunksByNodeStream': async (req, res) => {
    console.log('getGenerateChunksByNodeStream')
    await sendGenerateChunksByNodeStream(res)
  },

  /**
   * 直接返回现成 zip 文件(Node Stream 写法)
   */
  '/getFileByNodeStream': async (req, res) => {
    console.log('getFileByNodeStream')

    const host = req.headers.host ?? 'localhost:3000'
    const url = new URL(req.url ?? '/', `http://${host}`)
    const version = url.searchParams.get('version')

    const filePath = path.join(__dirname, 'files',  `${version}.zip`)
    await sendFileByNodeStream(res, {
      filePath,
      downloadFileName: `${version}.zip`,
      contentType: 'application/octet-stream'
    })
  },

  /**
   * 直接返回现成 zip 文件(Web Streams 写法)
   */
  '/getFileByWebStream': async (req, res) => {
    console.log('getFileByWebStream')
    const host = req.headers.host ?? 'localhost:3000'
    const url = new URL(req.url ?? '/', `http://${host}`)
    const version = url.searchParams.get('version')

    const filePath = path.join(__dirname, 'files',  `${version}.zip`)
    await sendFileByWebStream(res, {
      filePath,
      downloadFileName: `${version}.zip`,
      contentType: 'application/octet-stream'
    })
  }
}

/**
 * 创建服务
 */
const server = http.createServer(async (req: IncomingMessage, res: ServerResponse) => {
  const host = req.headers.host ?? `localhost:${PORT}`
  const url = new URL(req.url ?? '/', `http://${host}`)
  const pathname = url.pathname

  const handler = routes[pathname]

  if (!handler) {
    sendText(res, 404, 'Not found')
    return
  }

  try {
    await handler(req, res)
  } catch (error) {
    console.error('Server error:', error)

    if (!res.headersSent) {
      sendText(res, 500, 'Internal server error')
      return
    }

    res.destroy(error as Error)
  }
})

server.listen(PORT, () => {
  console.log(`Server running at http://localhost:${PORT}/`)
})
  • stream/web 较新的 Web Streams 风格写法
ts
import http, { IncomingMessage, ServerResponse } from 'node:http'
import fs from 'node:fs'
import { open } from 'node:fs/promises'
import path from 'node:path'
import archiver from 'archiver'
import { Writable, Readable } from 'node:stream'
import { pipeline } from 'node:stream/promises'

const PORT = 3000

type RouteHandler = (
  req: IncomingMessage,
  res: ServerResponse
) => void | Promise<void>

interface ZipFileOptions {
  filePath: string
  zipFileName?: string
  entryName?: string
}

interface SendFileOptions {
  filePath: string
  downloadFileName?: string
  contentType?: string
}

/**
 * 统一发送文本响应
 */
function sendText(
  res: ServerResponse,
  statusCode: number,
  text: string
): void {
  res.statusCode = statusCode
  res.setHeader('Content-Type', 'text/plain; charset=utf-8')
  res.end(text)
}

/**
 * 统一发送 JSON 响应
 */
function sendJSON(
  res: ServerResponse,
  statusCode: number,
  data: unknown
): void {
  res.statusCode = statusCode
  res.setHeader('Content-Type', 'application/json; charset=utf-8')
  res.end(JSON.stringify(data))
}

/**
 * 检查文件是否存在
 */
function ensureFileExists(res: ServerResponse, filePath: string): boolean {
  if (!fs.existsSync(filePath)) {
    sendText(res, 404, 'File not found')
    return false
  }

  return true
}

/**
 * 动态压缩单个文件并输出到响应流
 */
function streamZipFile(
  res: ServerResponse,
  options: ZipFileOptions
): void {
  const {
    filePath,
    zipFileName = 'archive.zip',
    entryName = path.basename(filePath)
  } = options

  if (!ensureFileExists(res, filePath)) {
    return
  }

  res.statusCode = 200
  res.setHeader('Content-Type', 'application/zip')
  res.setHeader('Content-Disposition', `attachment; filename="${zipFileName}"`)

  const archive = archiver('zip', {
    zlib: { level: 9 }
  })

  archive.on('error', (err: Error) => {
    console.error('ZIP error:', err)

    if (!res.headersSent) {
      sendText(res, 500, 'Error creating ZIP archive')
      return
    }

    res.destroy(err)
  })

  archive.on('end', () => {
    console.log('ZIP archive created successfully')
  })

  archive.pipe(res)
  archive.file(filePath, { name: entryName })
  void archive.finalize()
}

async function sendGenerateChunksByNodeStream(
  res: ServerResponse,
): Promise<void> {
  try {
    // 把“程序逐步生成的数据”也当成流。
    async function* generateChunks() {
      yield 'part1\n'
      yield 'part2\n'
      yield 'part3\n'
    }
    const generateChunksStream = Readable.from(generateChunks())
    res.setHeader('Content-Type', 'text/plain');
    res.setHeader('Content-Disposition', `attachment; filename="generateChunks.text"`);
    await pipeline(generateChunksStream, res);

  } catch (err) {
    console.error('sendgenerateChunksByNodeStream failed.', err);
  }
}

async function sendStringByNodeStream(
  res: ServerResponse,
): Promise<void> {
  try {
    const largeString = '********1111111111111111';
    // Readable.from(data) 返回的是 Node Readable,不是 Web ReadableStream
    // Readable.from(data):更适合把内存里已有的数据(string、Buffer、Array、Iterable、AsyncIterable)包装成可读流
    const stringStream = Readable.from(largeString);

    res.setHeader('Content-Type', 'text/plain');
    res.setHeader('Content-Disposition', `attachment; filename="string.text"`);
    res.setHeader('Content-Length', largeString.length);

    // stringStream.pipe(res);
    await pipeline(stringStream, res);
  } catch (err) {
    console.error('sendStringByNodeStream failed.', err);
  }
}

/**
 * 旧写法:Node.js Stream
 * 使用 fs.createReadStream(filePath).pipe(res)
 */
async function sendFileByNodeStream(
  res: ServerResponse,
  options: SendFileOptions
): Promise<void> {
  const {
    filePath,
    downloadFileName = path.basename(filePath),
    contentType = 'application/octet-stream'
  } = options

  if (!ensureFileExists(res, filePath)) {
    return
  }

  res.statusCode = 200
  res.setHeader('Content-Type', contentType)
  res.setHeader('Content-Disposition', `attachment; filename="${downloadFileName}"`)
  res.setHeader('Content-Length', (await fs.promises.stat(filePath)).size)

  // fs.createReadStream(...):更适合从文件这种外部数据源里读取
  const readStream = fs.createReadStream(filePath)

  readStream.on('error', (err) => {
    console.error('Node stream file error:', err)

    if (!res.headersSent) {
      sendText(res, 500, 'Error reading file')
      return
    }

    res.destroy(err)
  })
  // 把文件内容直接写到 HTTP 响应里。
  readStream.pipe(res)
  // await pipeline(readStream, res)
}

// 把一个纯 Web 风格的底层字节缓冲区,包成 Node 风格的二进制块。
const normalizeChunk = new TransformStream({
  transform(chunk, controller) {
    if (chunk instanceof ArrayBuffer) {
      controller.enqueue(Buffer.from(chunk))
      return
    }
    // chunk 不是裸 ArrayBuffer,但它是“建立在 ArrayBuffer 之上的视图类型”。
    // Uint8Array、Uint16Array、DataView
    if (ArrayBuffer.isView(chunk)) {
      controller.enqueue(
        Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
      )
      return
    }

    controller.enqueue(chunk)
  }
})


/**
 * 新写法:Web Streams
 * 使用 fileHandle.readableWebStream() + Writable.toWeb(res)
 */
async function sendFileByWebStream(
  res: ServerResponse,
  options: SendFileOptions
): Promise<void> {
  const {
    filePath,
    downloadFileName = path.basename(filePath),
    contentType = 'application/octet-stream'
  } = options

  if (!ensureFileExists(res, filePath)) {
    return
  }

  res.statusCode = 200
  res.setHeader('Content-Type', contentType)
  res.setHeader('Content-Disposition', `attachment; filename="${downloadFileName}"`)
  res.setHeader('Content-Length', (await fs.promises.stat(filePath)).size)


  let fileHandle: Awaited<ReturnType<typeof open>> | null = null

  try {
    // fileHandle 理解为 一个“已经打开的文件控制器/句柄”
    // 它不是文件内容本身,而是一个“以后可以继续读这个文件”的操作对象。
    fileHandle = await open(filePath, 'r')
    // 把这个文件句柄,转换成一个 Web 标准的可读流。
    // 这个 webReadable 是 文件字节流
    const webReadable = fileHandle.readableWebStream()
    // 把 Node 的 ServerResponse,包装成一个 Web 标准的可写流。
    // res 原本是 Node 风格的 writable stream
    // pipeTo() 需要的是 Web WritableStream
    const webWritable = Writable.toWeb(res)
    // 先通过 normalizeChunk 把 webReadable 里的数据,转换成 Buffer
    // 如果不进行转换,那么 webReadable “读出来的 chunk 类型”和“ServerResponse 最终能写入的类型”不匹配
    // 把 webReadable 里的数据,持续写入 webWritable,直到完成。
    await webReadable.pipeThrough(normalizeChunk as any).pipeTo(webWritable)
  } catch (err) {
    console.error('Web stream file error:', err)

    if (!res.headersSent) {
      sendText(res, 500, 'Error reading file')
      return
    }

    res.destroy(err as Error)
  } finally {
    if (fileHandle) {
      // 不管成功还是失败,只要文件已经打开了,就尽量关掉。
      await fileHandle.close().catch((closeErr) => {
        console.error('Close fileHandle error:', closeErr)
      })
    }
  }
}

/**
 * 路由表
 */
const routes: Record<string, RouteHandler> = {
  '/': (_req, res) => {
    sendText(res, 200, 'Hello, this is the server!')
  },

  '/health': (_req, res) => {
    sendJSON(res, 200, {
      success: true,
      message: 'Server is running'
    })
  },

  /**
   * 动态压缩 .Lab 文件后返回 zip
   */
  '/getZipFile': (_req, res) => {
    // 原始文件:用于“动态压缩后返回”
    const LAB_FILE_PATH = path.join(__dirname, 'files', '1937330293275095040.Lab')
    streamZipFile(res, {
      filePath: LAB_FILE_PATH,
      zipFileName: 'archive.zip',
      entryName: '1937330293275095040.Lab'
    })
  },
  /**
   * 直接返回现成字符串(Node Stream 写法)
   */
  '/getStringByNodeStream': async (req, res) => {
    console.log('getStringByNodeStream')
    await sendStringByNodeStream(res)
  },

  /**
   * 直接返回生成器字符串(Node Stream 写法)
   */
  '/getGenerateChunksByNodeStream': async (req, res) => {
    console.log('getGenerateChunksByNodeStream')
    await sendGenerateChunksByNodeStream(res)
  },

  /**
   * 直接返回现成 zip 文件(Node Stream 写法)
   */
  '/getFileByNodeStream': async (req, res) => {
    console.log('getFileByNodeStream')

    const host = req.headers.host ?? 'localhost:3000'
    const url = new URL(req.url ?? '/', `http://${host}`)
    const version = url.searchParams.get('version')

    const filePath = path.join(__dirname, 'files',  `${version}.zip`)
    await sendFileByNodeStream(res, {
      filePath,
      downloadFileName: `${version}.zip`,
      contentType: 'application/octet-stream'
    })
  },

  /**
   * 直接返回现成 zip 文件(Web Streams 写法)
   */
  '/getFileByWebStream': async (req, res) => {
    console.log('getFileByWebStream')
    const host = req.headers.host ?? 'localhost:3000'
    const url = new URL(req.url ?? '/', `http://${host}`)
    const version = url.searchParams.get('version')

    const filePath = path.join(__dirname, 'files',  `${version}.zip`)
    await sendFileByWebStream(res, {
      filePath,
      downloadFileName: `${version}.zip`,
      contentType: 'application/octet-stream'
    })
  }
}

/**
 * 创建服务
 */
const server = http.createServer(async (req: IncomingMessage, res: ServerResponse) => {
  const host = req.headers.host ?? `localhost:${PORT}`
  const url = new URL(req.url ?? '/', `http://${host}`)
  const pathname = url.pathname

  const handler = routes[pathname]

  if (!handler) {
    sendText(res, 404, 'Not found')
    return
  }

  try {
    await handler(req, res)
  } catch (error) {
    console.error('Server error:', error)

    if (!res.headersSent) {
      sendText(res, 500, 'Internal server error')
      return
    }

    res.destroy(error as Error)
  }
})

server.listen(PORT, () => {
  console.log(`Server running at http://localhost:${PORT}/`)
})

Web Streams 不是 Node.js 发明的,而是 Web 平台后来补上的一套标准流 API; Node.js 为了和 Web 平台更一致,也为了适配 fetch / Response / Request 这些越来越通用的接口,所以增加了对 Web Streams 的支持。

contentType 设置

  • contentType 设置 与是否使用 stream 没有直接关系
  • 真正决定是不是流的是后边的发送方式
js
const readStream = fs.createReadStream(filePath)
readStream.pipe(res)
  • contentType 设置 getContentTypeByExt(根据扩展名设置真实 contentType)
    • 更准确
    • 浏览器/客户端更容易正确处理
    • 某些文件可直接预览
  • contentType 设置 application/octet-stream
    • 主要用于下载
    • 不需要浏览器预览
    • 不在乎客户端准确识别文件类型

Content-Length / chunked

  • 已知总长度:带 Content-Length
  • 未知总长度或边生成边发送:HTTP/1.1 里常见为 Transfer-Encoding: chunked(传输时采用的编码方式:分块发送)

误区解释

  • 流 ≠ chunked
  • 流 ≠ 不知道长度
  • 流 是传输方式
  • 流 设置Content-Length,会禁用默认 chunked 编码,主要用于方便计算进度
  • 流 没有设置Content-Length,底层会自动使用 chunked 传输
  • chunked:消息体不是靠预先给总长度来定界,而是拆成一个个块发,最后用一个 0 长度块结束
  • 接收端有Content-Length方便计算进度,只有 chunked,一边接收,一边累计已收到多少字节
  • 如果同时 设置了 Content-Length 和 chunked
    • 消息边界解析不一致,到底结束在哪里不知道
    • 协议层要强制让消息边界只有一种权威来源
    • chunked可配合,单独一个 header:X-File-Size: 104857600

Content-Length / chunked 底层流的区别

  • HTTP 如何界定消息结束
    • Content-Length: 104857600
      • 底层 TCP 连接上可以一段一段地发:第一次发 8KB、第二次发 64KB
      • 每一段 TCP 包多大,不重要;HTTP 不需要在消息体内部额外再写“块大小标记”
    • Transfer-Encoding: chunked
      • 用 chunked 作为消息边界定义机制
      • 靠每块长度 + 终止块定界(HTTP 语义应用层协议行为)
      • 但每一段前面都要额外写一个块大小标记(16 进制字符串 + \r\n),最后还要一个长度为0的块来标记消息结束

Content-Length / chunked 场景

  • Content-Length 固定大小文件下载
  • chunked 的设计初衷,就是解决“内容长度事先不知道,但我又不想等全部生成完再发”的问题
    • 比如:动态生成内容,大小事先算不出来
    • 想尽快把首批数据发给客户端
    • 需要在 HTTP/1.1 持久连接下清楚表达结束

http2

  • chunked 不能用于 HTTP/2
  • HTTP/2 自己有 DATA frames 来表达内容边界
  • HTTP/2 中 Content-Length 变得冗余一些,因为长度可以从帧层推断