Skip to content

stream

stream เป็นโมดูลในตัวของ Node.js ที่ให้ความสามารถในการทำงานกับข้อมูลแบบ streaming ซึ่งเหมาะสำหรับการจัดการข้อมูลขนาดใหญ่

import { Readable, Writable, Duplex, Transform, pipeline, finished } from 'node:stream';

APIคำอธิบายลักษณะคำอธิบายเพิ่มเติม
Readableสร้าง readable streamAsynchronousใช้สำหรับอ่านข้อมูล
Writableสร้าง writable streamAsynchronousใช้สำหรับเขียนข้อมูล
Duplexสร้าง duplex streamAsynchronousสามารถทั้งอ่านและเขียนข้อมูล
Transformสร้าง transform streamAsynchronousสำหรับแปลงข้อมูลระหว่างอ่านและเขียน
pipelineเชื่อมต่อ streamsAsynchronousแบบ async/await และจัดการ error อัตโนมัติ
finishedตรวจสอบเมื่อ stream จบการทำงานAsynchronousใช้ตรวจสอบเมื่อ stream ทำงานเสร็จหรือเกิด error

ตัวอย่าง

สร้าง Readable Stream

js
import { Readable } from "stream";

// สร้าง readable stream
const readable = new Readable({
  read(size) {
    // สร้างข้อมูลสุ่ม
    const data = Math.random().toString(36).substring(2);
    this.push(data);

    // หยุดหลังจาก 5 ครั้ง
    if (this.count === undefined) this.count = 0;
    if (++this.count >= 5) this.push(null); // ส่ง null เพื่อจบ stream
  },
});

// อ่านข้อมูลจาก stream
readable.on("data", (chunk) => {
  console.log("Received:", chunk.toString());
});

readable.on("end", () => {
  console.log("Stream ended");
});

สร้าง Writable Stream

js
import { Writable } from "stream";

// สร้าง writable stream
const writable = new Writable({
  write(chunk, encoding, callback) {
    console.log("Writing:", chunk.toString());
    callback(); // เรียกเมื่อเขียนเสร็จ
  },
});

// เขียนข้อมูล
writable.write("First chunk");
writable.write("Second chunk");
writable.end("Final chunk");

ใช้ pipeline สำหรับการ process ข้อมูล

js
import { createReadStream, createWriteStream } from "fs";
import { pipeline } from "stream/promises";
import { createGzip } from "zlib";

// บีบอัดไฟล์ด้วย pipeline
async function compressFile(input, output) {
  await pipeline(
    createReadStream(input),
    createGzip(),
    createWriteStream(output),
  );
  console.log("Compression completed");
}

compressFile("input.txt", "input.txt.gz");

สร้าง Transform Stream

js
import { Transform } from "stream";

// สร้าง transform stream ที่แปลงข้อความเป็นตัวใหญ่
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  },
});

// ใช้งาน
process.stdin
  .pipe(upperCase)
  .pipe(process.stdout);

console.log("Type something and press enter:");

ใช้ finished สำหรับจัดการการปิด stream

js
import { createReadStream } from "fs";
import { finished } from "stream/promises";

async function readFileSafely(path) {
  const stream = createReadStream(path);

  try {
    // อ่านข้อมูล
    for await (const chunk of stream) {
      console.log("Chunk:", chunk.toString());
    }

    // รอจน stream ปิด
    await finished(stream);
    console.log("Stream finished successfully");
  } catch (err) {
    console.error("Stream error:", err);
  }
}

readFileSafely("example.txt");

สร้าง Custom Duplex Stream

js
import { Duplex } from "stream";

class Throttle extends Duplex {
  constructor(ms) {
    super();
    this.delay = ms;
  }

  _read(size) {}

  _write(chunk, encoding, callback) {
    this.push(chunk);
    setTimeout(callback, this.delay);
  }

  _final(callback) {
    this.push(null);
    callback();
  }
}

// ใช้งาน
const throttle = new Throttle(1000); // ชะลอ 1 วินาทีต่อ chunk

throttle.on("data", (chunk) => {
  console.log("Received after delay:", chunk.toString());
});

throttle.write("First");
throttle.write("Second");
throttle.end("Last");