ScarShow

< IS >

NodeJS - Stream 行為觀察與事件使用

前言

在NodeJS使用廣泛的就是Event-driven的機制,當此機制與串流功能相結合後,便能大幅增進資料傳輸的效能。

所以今天的主題為NodeJS中Stream的行為觀察,在先前專案中大量使用Stream的功能,所以這邊就寫下一些使用心得。

Stream模組簡介

Stream是一個NodeJS中的模組,主要用作於資料的讀寫,它可以被其它的模組所繼承,而成為該模組的一部分基礎。

Stream的種類

斯斯有三種、Stream也有三種,分別是Readable(可讀取)、Wirteable(可寫入)以及Duplex(雙向,可讀寫)這三種。

不同種類的Stream所對應到的事件也不相同,分別如下:

  • Readable
    • readable
    • end
    • data
    • error
    • close
  • Writeable
    • drain
    • error
    • close
    • finish
    • pipe
    • unpipe

那上面只列了Readable以及Writeable,而Duplex則是ReadableWriteable的綜合體,它有ReadableWriteable個別所擁有的Event。

繼承Stream的模組

前面有說到Stream可被繼承,所以在NodeJS中有使用到Stream的模組分別是File SystemHTTP/HTTPSNet,以及它裡面是使用到哪一種類型的Stream以下有一些範例供參考。

File System

var fs = require('fs');

// rs is readable stream
var rs = fs.createReadStream('/path/to/file');

// ws is writeable stream
var ws = fs.createWriteStream('/path/to/file');

HTTP/HTTPS

var http = require('http');

// HTTP Server
// req is readable stream
// res is writeable stream
http.createServer(function (req, res) {
    // do something
}).listen(80);

// HTTP Client
// req is writeable stream
// res is readable stream
var req = http.request({
    hostname: '127.0.0.1',
    port: 80,
    path: '/',
    method: 'GET'
}, function (res) {
    // do something
});

Net

var net = require('net');

// TCP Server
// socket is duplex stream
net.createServer(function (socket) {
    // do something
}).listen(3000);

// TCP Client
// client is duplex stream
var client = net.connect({
    port: 3000,
    host: '127.0.0.1'
}, function () {
    // do something
});

Stream事件行為

沒事多看文件,多看文件沒事。NodeJS的文件對於事件有些簡單的敘述以及使用方式可以參考,這邊就針對資料傳輸的部份做些說明。

在這邊我使用File System模組作為範例,觀察Stream呼叫事件的行為,我先在/tmp底下產生一個測試檔a供下面的範例所使用。

dd if=/dev/urandom of=/tmp/a bs=256k count=1

範例的動作很單純就是將檔案讀出然後在寫入的檔案,然後觀察事件何時會觸發。

var fs = require('fs'),
    rs = fs.createReadStream('/tmp/a'),
    ws = fs.createWriteStream('/tmp/b');

Readable Stream Event

readable

Buffer中已有資料且準備好讀取,這個事件就會被觸發,這時候可以使用rs.read()將資料讀出。

rs.on('readable', function () {
    console.log('rs readable');

    var chunk = rs.read();
    ws.write(chunk);
});

data

Buffer中已有資料且可以使用時就會出發此事件,並將Buffer中的資料傳送到callback function中。

使用data這個事件時就會將stream切換至old mode,在此模式下可呼叫rs.pause()rs.resume()這兩個function來控制Readable Stream

所謂的old mode是NodeJS v0.10為了舊程式的謙容性(Compatibility)所產生的。

rs.on('data', function (chunk) {
    console.log('rs data');

    ws.write(chunk);

    // rs.pause();
    // rs.resume();
});

end

Reabable Stream中所有的資料都讀取出來後則會觸發此事件(在TCP則是用FIN結束連線時觸發),這時候data事件將不再被觸發。

close

當系統底層的資源被關閉時會觸發,但不一定每個Stream都會觸發。但是File System會觸發。

Writeable Stream Event

drain

ws.write()返回false時,表示寫入用的緩衝區已滿,當緩衝區再次被清空時將會觸發drain事件。

close

Readable Stream Event

pipe/unpipe

Readable Stream使用pipe()時,Writeable Streampipe事件將會被觸發。反之,如果Readable Stream呼叫unpipe()關閉管道時,Writeable Streamunpipe事件將會被觸發。

ws.on('pipe', function () {
    console.log('ws pipe');
});

ws.on('unpipe', function () {
    console.log('ws unpipe');
});

rs.pipe(ws);

finish

通常當Readable Stream讀取出所有的資料後,我們就會去呼叫callback function,但是Writeable不一定也跟著完成寫入所有的資料,有時候我們必須要等待完成寫入有資料才呼叫callback function,所以我們需要finish這個事件。

但是finish並不會自動觸發,我們必須透過呼叫ws.end()去觸發它,所以我們可以在Readableend事件中去呼叫它。

rs.on('end', function (chunk) {
    console.log('rs end');

    ws.end();
});

ws.on('finish', function () {
    console.log('ws finish');

    callback && callback();
});

總結

總結以上的範例,我們可以發現資料的存取以及寫入的方法有三種且視情況而定,如果在寫入資料的同時需要針對資料做處理或是檢查,則可以使用1.或是2.的方式,如果只是單純的資料傳輸則使用3.會比較方便。

// 1.
rs.on('readable', function () {
    ws.write(rs.read());
});

// 2.
rs.on('data', function (chunk) {
    ws.write(chunk);
});

// 3.
rs.pipe(ws);

另外上面所提昇增進效能的部份則是指,使用串流的方式傳輸資料可以不需要像是傳統的Web Server必須先將資料暫存在硬碟中然後再視情形對資料做處理或搬移。

在資料進來的同時就可以即時對於資料做處理,並且可以將資料隨使用情境做多層的傳遞,例如Read File -> HTTP Request -> TCP Server -> Write File這樣的流程,它將檔案使用HTTP傳送出去,然後在Web Server接收到資料的同時做處理,然後再使用TCP協定將檔案傳送給其它裝置並寫入硬碟。

將Stream的事件弄清楚後,對於資料的傳遞就能更隨自己的意願做處理,也能將以上的流程實做出來。

Reference

Node.js Manual & Documentation