stream 实践
stream 实现方式
- fs.createReadStream node老牌写法
ts
import http, { IncomingMessage, ServerResponse } from 'node:http'
import fs from 'node:fs'
import { open } from 'node:fs/promises'
import path from 'node:path'
import archiver from 'archiver'
import { Writable, Readable } from 'node:stream'
import { pipeline } from 'node:stream/promises'
const PORT = 3000
type RouteHandler = (
req: IncomingMessage,
res: ServerResponse
) => void | Promise<void>
interface ZipFileOptions {
filePath: string
zipFileName?: string
entryName?: string
}
interface SendFileOptions {
filePath: string
downloadFileName?: string
contentType?: string
}
/**
* 统一发送文本响应
*/
function sendText(
res: ServerResponse,
statusCode: number,
text: string
): void {
res.statusCode = statusCode
res.setHeader('Content-Type', 'text/plain; charset=utf-8')
res.end(text)
}
/**
* 统一发送 JSON 响应
*/
function sendJSON(
res: ServerResponse,
statusCode: number,
data: unknown
): void {
res.statusCode = statusCode
res.setHeader('Content-Type', 'application/json; charset=utf-8')
res.end(JSON.stringify(data))
}
/**
* 检查文件是否存在
*/
function ensureFileExists(res: ServerResponse, filePath: string): boolean {
if (!fs.existsSync(filePath)) {
sendText(res, 404, 'File not found')
return false
}
return true
}
/**
* 动态压缩单个文件并输出到响应流
*/
function streamZipFile(
res: ServerResponse,
options: ZipFileOptions
): void {
const {
filePath,
zipFileName = 'archive.zip',
entryName = path.basename(filePath)
} = options
if (!ensureFileExists(res, filePath)) {
return
}
res.statusCode = 200
res.setHeader('Content-Type', 'application/zip')
res.setHeader('Content-Disposition', `attachment; filename="${zipFileName}"`)
const archive = archiver('zip', {
zlib: { level: 9 }
})
archive.on('error', (err: Error) => {
console.error('ZIP error:', err)
if (!res.headersSent) {
sendText(res, 500, 'Error creating ZIP archive')
return
}
res.destroy(err)
})
archive.on('end', () => {
console.log('ZIP archive created successfully')
})
archive.pipe(res)
archive.file(filePath, { name: entryName })
void archive.finalize()
}
async function sendGenerateChunksByNodeStream(
res: ServerResponse,
): Promise<void> {
try {
// 把“程序逐步生成的数据”也当成流。
async function* generateChunks() {
yield 'part1\n'
yield 'part2\n'
yield 'part3\n'
}
const generateChunksStream = Readable.from(generateChunks())
res.setHeader('Content-Type', 'text/plain');
res.setHeader('Content-Disposition', `attachment; filename="generateChunks.text"`);
await pipeline(generateChunksStream, res);
} catch (err) {
console.error('sendgenerateChunksByNodeStream failed.', err);
}
}
async function sendStringByNodeStream(
res: ServerResponse,
): Promise<void> {
try {
const largeString = '********1111111111111111';
// Readable.from(data) 返回的是 Node Readable,不是 Web ReadableStream
// Readable.from(data):更适合把内存里已有的数据(string、Buffer、Array、Iterable、AsyncIterable)包装成可读流
const stringStream = Readable.from(largeString);
res.setHeader('Content-Type', 'text/plain');
res.setHeader('Content-Disposition', `attachment; filename="string.text"`);
res.setHeader('Content-Length', largeString.length);
// stringStream.pipe(res);
await pipeline(stringStream, res);
} catch (err) {
console.error('sendStringByNodeStream failed.', err);
}
}
/**
* 旧写法:Node.js Stream
* 使用 fs.createReadStream(filePath).pipe(res)
*/
async function sendFileByNodeStream(
res: ServerResponse,
options: SendFileOptions
): Promise<void> {
const {
filePath,
downloadFileName = path.basename(filePath),
contentType = 'application/octet-stream'
} = options
if (!ensureFileExists(res, filePath)) {
return
}
res.statusCode = 200
res.setHeader('Content-Type', contentType)
res.setHeader('Content-Disposition', `attachment; filename="${downloadFileName}"`)
res.setHeader('Content-Length', (await fs.promises.stat(filePath)).size)
// fs.createReadStream(...):更适合从文件这种外部数据源里读取
const readStream = fs.createReadStream(filePath)
readStream.on('error', (err) => {
console.error('Node stream file error:', err)
if (!res.headersSent) {
sendText(res, 500, 'Error reading file')
return
}
res.destroy(err)
})
// 把文件内容直接写到 HTTP 响应里。
readStream.pipe(res)
// await pipeline(readStream, res)
}
// 把一个纯 Web 风格的底层字节缓冲区,包成 Node 风格的二进制块。
const normalizeChunk = new TransformStream({
transform(chunk, controller) {
if (chunk instanceof ArrayBuffer) {
controller.enqueue(Buffer.from(chunk))
return
}
// chunk 不是裸 ArrayBuffer,但它是“建立在 ArrayBuffer 之上的视图类型”。
// Uint8Array、Uint16Array、DataView
if (ArrayBuffer.isView(chunk)) {
controller.enqueue(
Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
)
return
}
controller.enqueue(chunk)
}
})
/**
* 新写法:Web Streams
* 使用 fileHandle.readableWebStream() + Writable.toWeb(res)
*/
async function sendFileByWebStream(
res: ServerResponse,
options: SendFileOptions
): Promise<void> {
const {
filePath,
downloadFileName = path.basename(filePath),
contentType = 'application/octet-stream'
} = options
if (!ensureFileExists(res, filePath)) {
return
}
res.statusCode = 200
res.setHeader('Content-Type', contentType)
res.setHeader('Content-Disposition', `attachment; filename="${downloadFileName}"`)
res.setHeader('Content-Length', (await fs.promises.stat(filePath)).size)
let fileHandle: Awaited<ReturnType<typeof open>> | null = null
try {
// fileHandle 理解为 一个“已经打开的文件控制器/句柄”
// 它不是文件内容本身,而是一个“以后可以继续读这个文件”的操作对象。
fileHandle = await open(filePath, 'r')
// 把这个文件句柄,转换成一个 Web 标准的可读流。
// 这个 webReadable 是 文件字节流
const webReadable = fileHandle.readableWebStream()
// 把 Node 的 ServerResponse,包装成一个 Web 标准的可写流。
// res 原本是 Node 风格的 writable stream
// pipeTo() 需要的是 Web WritableStream
const webWritable = Writable.toWeb(res)
// 先通过 normalizeChunk 把 webReadable 里的数据,转换成 Buffer
// 如果不进行转换,那么 webReadable “读出来的 chunk 类型”和“ServerResponse 最终能写入的类型”不匹配
// 把 webReadable 里的数据,持续写入 webWritable,直到完成。
await webReadable.pipeThrough(normalizeChunk as any).pipeTo(webWritable)
} catch (err) {
console.error('Web stream file error:', err)
if (!res.headersSent) {
sendText(res, 500, 'Error reading file')
return
}
res.destroy(err as Error)
} finally {
if (fileHandle) {
// 不管成功还是失败,只要文件已经打开了,就尽量关掉。
await fileHandle.close().catch((closeErr) => {
console.error('Close fileHandle error:', closeErr)
})
}
}
}
/**
* 路由表
*/
const routes: Record<string, RouteHandler> = {
'/': (_req, res) => {
sendText(res, 200, 'Hello, this is the server!')
},
'/health': (_req, res) => {
sendJSON(res, 200, {
success: true,
message: 'Server is running'
})
},
/**
* 动态压缩 .Lab 文件后返回 zip
*/
'/getZipFile': (_req, res) => {
// 原始文件:用于“动态压缩后返回”
const LAB_FILE_PATH = path.join(__dirname, 'files', '1937330293275095040.Lab')
streamZipFile(res, {
filePath: LAB_FILE_PATH,
zipFileName: 'archive.zip',
entryName: '1937330293275095040.Lab'
})
},
/**
* 直接返回现成字符串(Node Stream 写法)
*/
'/getStringByNodeStream': async (req, res) => {
console.log('getStringByNodeStream')
await sendStringByNodeStream(res)
},
/**
* 直接返回生成器字符串(Node Stream 写法)
*/
'/getGenerateChunksByNodeStream': async (req, res) => {
console.log('getGenerateChunksByNodeStream')
await sendGenerateChunksByNodeStream(res)
},
/**
* 直接返回现成 zip 文件(Node Stream 写法)
*/
'/getFileByNodeStream': async (req, res) => {
console.log('getFileByNodeStream')
const host = req.headers.host ?? 'localhost:3000'
const url = new URL(req.url ?? '/', `http://${host}`)
const version = url.searchParams.get('version')
const filePath = path.join(__dirname, 'files', `${version}.zip`)
await sendFileByNodeStream(res, {
filePath,
downloadFileName: `${version}.zip`,
contentType: 'application/octet-stream'
})
},
/**
* 直接返回现成 zip 文件(Web Streams 写法)
*/
'/getFileByWebStream': async (req, res) => {
console.log('getFileByWebStream')
const host = req.headers.host ?? 'localhost:3000'
const url = new URL(req.url ?? '/', `http://${host}`)
const version = url.searchParams.get('version')
const filePath = path.join(__dirname, 'files', `${version}.zip`)
await sendFileByWebStream(res, {
filePath,
downloadFileName: `${version}.zip`,
contentType: 'application/octet-stream'
})
}
}
/**
* 创建服务
*/
const server = http.createServer(async (req: IncomingMessage, res: ServerResponse) => {
const host = req.headers.host ?? `localhost:${PORT}`
const url = new URL(req.url ?? '/', `http://${host}`)
const pathname = url.pathname
const handler = routes[pathname]
if (!handler) {
sendText(res, 404, 'Not found')
return
}
try {
await handler(req, res)
} catch (error) {
console.error('Server error:', error)
if (!res.headersSent) {
sendText(res, 500, 'Internal server error')
return
}
res.destroy(error as Error)
}
})
server.listen(PORT, () => {
console.log(`Server running at http://localhost:${PORT}/`)
})- stream/web 较新的 Web Streams 风格写法
ts
import http, { IncomingMessage, ServerResponse } from 'node:http'
import fs from 'node:fs'
import { open } from 'node:fs/promises'
import path from 'node:path'
import archiver from 'archiver'
import { Writable, Readable } from 'node:stream'
import { pipeline } from 'node:stream/promises'
const PORT = 3000
type RouteHandler = (
req: IncomingMessage,
res: ServerResponse
) => void | Promise<void>
interface ZipFileOptions {
filePath: string
zipFileName?: string
entryName?: string
}
interface SendFileOptions {
filePath: string
downloadFileName?: string
contentType?: string
}
/**
* 统一发送文本响应
*/
function sendText(
res: ServerResponse,
statusCode: number,
text: string
): void {
res.statusCode = statusCode
res.setHeader('Content-Type', 'text/plain; charset=utf-8')
res.end(text)
}
/**
* 统一发送 JSON 响应
*/
function sendJSON(
res: ServerResponse,
statusCode: number,
data: unknown
): void {
res.statusCode = statusCode
res.setHeader('Content-Type', 'application/json; charset=utf-8')
res.end(JSON.stringify(data))
}
/**
* 检查文件是否存在
*/
function ensureFileExists(res: ServerResponse, filePath: string): boolean {
if (!fs.existsSync(filePath)) {
sendText(res, 404, 'File not found')
return false
}
return true
}
/**
* 动态压缩单个文件并输出到响应流
*/
function streamZipFile(
res: ServerResponse,
options: ZipFileOptions
): void {
const {
filePath,
zipFileName = 'archive.zip',
entryName = path.basename(filePath)
} = options
if (!ensureFileExists(res, filePath)) {
return
}
res.statusCode = 200
res.setHeader('Content-Type', 'application/zip')
res.setHeader('Content-Disposition', `attachment; filename="${zipFileName}"`)
const archive = archiver('zip', {
zlib: { level: 9 }
})
archive.on('error', (err: Error) => {
console.error('ZIP error:', err)
if (!res.headersSent) {
sendText(res, 500, 'Error creating ZIP archive')
return
}
res.destroy(err)
})
archive.on('end', () => {
console.log('ZIP archive created successfully')
})
archive.pipe(res)
archive.file(filePath, { name: entryName })
void archive.finalize()
}
async function sendGenerateChunksByNodeStream(
res: ServerResponse,
): Promise<void> {
try {
// 把“程序逐步生成的数据”也当成流。
async function* generateChunks() {
yield 'part1\n'
yield 'part2\n'
yield 'part3\n'
}
const generateChunksStream = Readable.from(generateChunks())
res.setHeader('Content-Type', 'text/plain');
res.setHeader('Content-Disposition', `attachment; filename="generateChunks.text"`);
await pipeline(generateChunksStream, res);
} catch (err) {
console.error('sendgenerateChunksByNodeStream failed.', err);
}
}
async function sendStringByNodeStream(
res: ServerResponse,
): Promise<void> {
try {
const largeString = '********1111111111111111';
// Readable.from(data) 返回的是 Node Readable,不是 Web ReadableStream
// Readable.from(data):更适合把内存里已有的数据(string、Buffer、Array、Iterable、AsyncIterable)包装成可读流
const stringStream = Readable.from(largeString);
res.setHeader('Content-Type', 'text/plain');
res.setHeader('Content-Disposition', `attachment; filename="string.text"`);
res.setHeader('Content-Length', largeString.length);
// stringStream.pipe(res);
await pipeline(stringStream, res);
} catch (err) {
console.error('sendStringByNodeStream failed.', err);
}
}
/**
* 旧写法:Node.js Stream
* 使用 fs.createReadStream(filePath).pipe(res)
*/
async function sendFileByNodeStream(
res: ServerResponse,
options: SendFileOptions
): Promise<void> {
const {
filePath,
downloadFileName = path.basename(filePath),
contentType = 'application/octet-stream'
} = options
if (!ensureFileExists(res, filePath)) {
return
}
res.statusCode = 200
res.setHeader('Content-Type', contentType)
res.setHeader('Content-Disposition', `attachment; filename="${downloadFileName}"`)
res.setHeader('Content-Length', (await fs.promises.stat(filePath)).size)
// fs.createReadStream(...):更适合从文件这种外部数据源里读取
const readStream = fs.createReadStream(filePath)
readStream.on('error', (err) => {
console.error('Node stream file error:', err)
if (!res.headersSent) {
sendText(res, 500, 'Error reading file')
return
}
res.destroy(err)
})
// 把文件内容直接写到 HTTP 响应里。
readStream.pipe(res)
// await pipeline(readStream, res)
}
// 把一个纯 Web 风格的底层字节缓冲区,包成 Node 风格的二进制块。
const normalizeChunk = new TransformStream({
transform(chunk, controller) {
if (chunk instanceof ArrayBuffer) {
controller.enqueue(Buffer.from(chunk))
return
}
// chunk 不是裸 ArrayBuffer,但它是“建立在 ArrayBuffer 之上的视图类型”。
// Uint8Array、Uint16Array、DataView
if (ArrayBuffer.isView(chunk)) {
controller.enqueue(
Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
)
return
}
controller.enqueue(chunk)
}
})
/**
* 新写法:Web Streams
* 使用 fileHandle.readableWebStream() + Writable.toWeb(res)
*/
async function sendFileByWebStream(
res: ServerResponse,
options: SendFileOptions
): Promise<void> {
const {
filePath,
downloadFileName = path.basename(filePath),
contentType = 'application/octet-stream'
} = options
if (!ensureFileExists(res, filePath)) {
return
}
res.statusCode = 200
res.setHeader('Content-Type', contentType)
res.setHeader('Content-Disposition', `attachment; filename="${downloadFileName}"`)
res.setHeader('Content-Length', (await fs.promises.stat(filePath)).size)
let fileHandle: Awaited<ReturnType<typeof open>> | null = null
try {
// fileHandle 理解为 一个“已经打开的文件控制器/句柄”
// 它不是文件内容本身,而是一个“以后可以继续读这个文件”的操作对象。
fileHandle = await open(filePath, 'r')
// 把这个文件句柄,转换成一个 Web 标准的可读流。
// 这个 webReadable 是 文件字节流
const webReadable = fileHandle.readableWebStream()
// 把 Node 的 ServerResponse,包装成一个 Web 标准的可写流。
// res 原本是 Node 风格的 writable stream
// pipeTo() 需要的是 Web WritableStream
const webWritable = Writable.toWeb(res)
// 先通过 normalizeChunk 把 webReadable 里的数据,转换成 Buffer
// 如果不进行转换,那么 webReadable “读出来的 chunk 类型”和“ServerResponse 最终能写入的类型”不匹配
// 把 webReadable 里的数据,持续写入 webWritable,直到完成。
await webReadable.pipeThrough(normalizeChunk as any).pipeTo(webWritable)
} catch (err) {
console.error('Web stream file error:', err)
if (!res.headersSent) {
sendText(res, 500, 'Error reading file')
return
}
res.destroy(err as Error)
} finally {
if (fileHandle) {
// 不管成功还是失败,只要文件已经打开了,就尽量关掉。
await fileHandle.close().catch((closeErr) => {
console.error('Close fileHandle error:', closeErr)
})
}
}
}
/**
* 路由表
*/
const routes: Record<string, RouteHandler> = {
'/': (_req, res) => {
sendText(res, 200, 'Hello, this is the server!')
},
'/health': (_req, res) => {
sendJSON(res, 200, {
success: true,
message: 'Server is running'
})
},
/**
* 动态压缩 .Lab 文件后返回 zip
*/
'/getZipFile': (_req, res) => {
// 原始文件:用于“动态压缩后返回”
const LAB_FILE_PATH = path.join(__dirname, 'files', '1937330293275095040.Lab')
streamZipFile(res, {
filePath: LAB_FILE_PATH,
zipFileName: 'archive.zip',
entryName: '1937330293275095040.Lab'
})
},
/**
* 直接返回现成字符串(Node Stream 写法)
*/
'/getStringByNodeStream': async (req, res) => {
console.log('getStringByNodeStream')
await sendStringByNodeStream(res)
},
/**
* 直接返回生成器字符串(Node Stream 写法)
*/
'/getGenerateChunksByNodeStream': async (req, res) => {
console.log('getGenerateChunksByNodeStream')
await sendGenerateChunksByNodeStream(res)
},
/**
* 直接返回现成 zip 文件(Node Stream 写法)
*/
'/getFileByNodeStream': async (req, res) => {
console.log('getFileByNodeStream')
const host = req.headers.host ?? 'localhost:3000'
const url = new URL(req.url ?? '/', `http://${host}`)
const version = url.searchParams.get('version')
const filePath = path.join(__dirname, 'files', `${version}.zip`)
await sendFileByNodeStream(res, {
filePath,
downloadFileName: `${version}.zip`,
contentType: 'application/octet-stream'
})
},
/**
* 直接返回现成 zip 文件(Web Streams 写法)
*/
'/getFileByWebStream': async (req, res) => {
console.log('getFileByWebStream')
const host = req.headers.host ?? 'localhost:3000'
const url = new URL(req.url ?? '/', `http://${host}`)
const version = url.searchParams.get('version')
const filePath = path.join(__dirname, 'files', `${version}.zip`)
await sendFileByWebStream(res, {
filePath,
downloadFileName: `${version}.zip`,
contentType: 'application/octet-stream'
})
}
}
/**
* 创建服务
*/
const server = http.createServer(async (req: IncomingMessage, res: ServerResponse) => {
const host = req.headers.host ?? `localhost:${PORT}`
const url = new URL(req.url ?? '/', `http://${host}`)
const pathname = url.pathname
const handler = routes[pathname]
if (!handler) {
sendText(res, 404, 'Not found')
return
}
try {
await handler(req, res)
} catch (error) {
console.error('Server error:', error)
if (!res.headersSent) {
sendText(res, 500, 'Internal server error')
return
}
res.destroy(error as Error)
}
})
server.listen(PORT, () => {
console.log(`Server running at http://localhost:${PORT}/`)
})Web Streams 不是 Node.js 发明的,而是 Web 平台后来补上的一套标准流 API; Node.js 为了和 Web 平台更一致,也为了适配 fetch / Response / Request 这些越来越通用的接口,所以增加了对 Web Streams 的支持。
contentType 设置
- contentType 设置 与是否使用 stream 没有直接关系
- 真正决定是不是流的是后边的发送方式
js
const readStream = fs.createReadStream(filePath)
readStream.pipe(res)- contentType 设置 getContentTypeByExt(根据扩展名设置真实 contentType)
- 更准确
- 浏览器/客户端更容易正确处理
- 某些文件可直接预览
- contentType 设置 application/octet-stream
- 主要用于下载
- 不需要浏览器预览
- 不在乎客户端准确识别文件类型
Content-Length / chunked
- 已知总长度:带 Content-Length
- 未知总长度或边生成边发送:HTTP/1.1 里常见为 Transfer-Encoding: chunked(传输时采用的编码方式:分块发送)
误区解释
- 流 ≠ chunked
- 流 ≠ 不知道长度
- 流 是传输方式
- 流 设置Content-Length,会禁用默认 chunked 编码,主要用于方便计算进度
- 流 没有设置Content-Length,底层会自动使用 chunked 传输
- chunked:消息体不是靠预先给总长度来定界,而是拆成一个个块发,最后用一个 0 长度块结束
- 接收端有Content-Length方便计算进度,只有 chunked,一边接收,一边累计已收到多少字节
- 如果同时 设置了 Content-Length 和 chunked
- 消息边界解析不一致,到底结束在哪里不知道
- 协议层要强制让消息边界只有一种权威来源
- chunked可配合,单独一个 header:X-File-Size: 104857600
Content-Length / chunked 底层流的区别
- HTTP 如何界定消息结束
- Content-Length: 104857600
- 底层 TCP 连接上可以一段一段地发:第一次发 8KB、第二次发 64KB
- 每一段 TCP 包多大,不重要;HTTP 不需要在消息体内部额外再写“块大小标记”
- Transfer-Encoding: chunked
- 用 chunked 作为消息边界定义机制
- 靠每块长度 + 终止块定界(HTTP 语义应用层协议行为)
- 但每一段前面都要额外写一个块大小标记(16 进制字符串 + \r\n),最后还要一个长度为0的块来标记消息结束
- Content-Length: 104857600
Content-Length / chunked 场景
- Content-Length 固定大小文件下载
- chunked 的设计初衷,就是解决“内容长度事先不知道,但我又不想等全部生成完再发”的问题
- 比如:动态生成内容,大小事先算不出来
- 想尽快把首批数据发给客户端
- 需要在 HTTP/1.1 持久连接下清楚表达结束
http2
- chunked 不能用于 HTTP/2
- HTTP/2 自己有 DATA frames 来表达内容边界
- HTTP/2 中 Content-Length 变得冗余一些,因为长度可以从帧层推断