Dark mode
stream
stream เป็นโมดูลในตัวของ Node.js ที่ให้ความสามารถในการทำงานกับข้อมูลแบบ streaming ซึ่งเหมาะสำหรับการจัดการข้อมูลขนาดใหญ่
import { Readable, Writable, Duplex, Transform, pipeline, finished } from 'node:stream';
API | คำอธิบาย | ลักษณะ | คำอธิบายเพิ่มเติม |
---|---|---|---|
Readable | สร้าง readable stream | Asynchronous | ใช้สำหรับอ่านข้อมูล |
Writable | สร้าง writable stream | Asynchronous | ใช้สำหรับเขียนข้อมูล |
Duplex | สร้าง duplex stream | Asynchronous | สามารถทั้งอ่านและเขียนข้อมูล |
Transform | สร้าง transform stream | Asynchronous | สำหรับแปลงข้อมูลระหว่างอ่านและเขียน |
pipeline | เชื่อมต่อ streams | Asynchronous | แบบ async/await และจัดการ error อัตโนมัติ |
finished | ตรวจสอบเมื่อ stream จบการทำงาน | Asynchronous | ใช้ตรวจสอบเมื่อ stream ทำงานเสร็จหรือเกิด error |
ตัวอย่าง
สร้าง Readable Stream
js
import { Readable } from "stream";
// สร้าง readable stream
const readable = new Readable({
read(size) {
// สร้างข้อมูลสุ่ม
const data = Math.random().toString(36).substring(2);
this.push(data);
// หยุดหลังจาก 5 ครั้ง
if (this.count === undefined) this.count = 0;
if (++this.count >= 5) this.push(null); // ส่ง null เพื่อจบ stream
},
});
// อ่านข้อมูลจาก stream
readable.on("data", (chunk) => {
console.log("Received:", chunk.toString());
});
readable.on("end", () => {
console.log("Stream ended");
});
สร้าง Writable Stream
js
import { Writable } from "stream";
// สร้าง writable stream
const writable = new Writable({
write(chunk, encoding, callback) {
console.log("Writing:", chunk.toString());
callback(); // เรียกเมื่อเขียนเสร็จ
},
});
// เขียนข้อมูล
writable.write("First chunk");
writable.write("Second chunk");
writable.end("Final chunk");
ใช้ pipeline สำหรับการ process ข้อมูล
js
import { createReadStream, createWriteStream } from "fs";
import { pipeline } from "stream/promises";
import { createGzip } from "zlib";
// บีบอัดไฟล์ด้วย pipeline
async function compressFile(input, output) {
await pipeline(
createReadStream(input),
createGzip(),
createWriteStream(output),
);
console.log("Compression completed");
}
compressFile("input.txt", "input.txt.gz");
สร้าง Transform Stream
js
import { Transform } from "stream";
// สร้าง transform stream ที่แปลงข้อความเป็นตัวใหญ่
const upperCase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
},
});
// ใช้งาน
process.stdin
.pipe(upperCase)
.pipe(process.stdout);
console.log("Type something and press enter:");
ใช้ finished สำหรับจัดการการปิด stream
js
import { createReadStream } from "fs";
import { finished } from "stream/promises";
async function readFileSafely(path) {
const stream = createReadStream(path);
try {
// อ่านข้อมูล
for await (const chunk of stream) {
console.log("Chunk:", chunk.toString());
}
// รอจน stream ปิด
await finished(stream);
console.log("Stream finished successfully");
} catch (err) {
console.error("Stream error:", err);
}
}
readFileSafely("example.txt");
สร้าง Custom Duplex Stream
js
import { Duplex } from "stream";
class Throttle extends Duplex {
constructor(ms) {
super();
this.delay = ms;
}
_read(size) {}
_write(chunk, encoding, callback) {
this.push(chunk);
setTimeout(callback, this.delay);
}
_final(callback) {
this.push(null);
callback();
}
}
// ใช้งาน
const throttle = new Throttle(1000); // ชะลอ 1 วินาทีต่อ chunk
throttle.on("data", (chunk) => {
console.log("Received after delay:", chunk.toString());
});
throttle.write("First");
throttle.write("Second");
throttle.end("Last");