
Node.js Stream 이란?
Sizz-J
·2021. 6. 25. 21:42
스트림의 중요성
Node.js와 같이 이벤트 기반 플랫폼에서 I/O를 처리하는 가장 효율적인 방법은 실시간으로 들어온 입력 값을 가능한 즉시 애플리케이션에 내보내는 것이다.
버퍼 vs 스트리밍
버퍼 모드는 리소스로 오는 모든 데이터를 버퍼에 수집했다가 자원을 모두 다 읽어들인 후 콜백에 전달. 반면 스트림은 리소스에서 도착하자마자 데이터를 처리할 수 있다.
VS
스트림의 이점은 공간 효율성, 시간 효율성, 결합성이 있다.
공간 효율성
스트림은 버퍼링하여 모든 데이터를 한꺼번에 처리하는 방식으로 불가능한 작업을 처리할 수 있다. V8 버퍼는 0x3FFFFFFF바이트(1GB 정도)보다 클 수 없다. 물리적인 메모리가 부족해지기 전에 대용량 파일을 처리할 때 문제가 된다. 만약 1GB보다 큰 파일을 읽어들일 때 허용된 최대 버퍼의 크기보다 크다는 오류 메시지를 보게 될 것이다.
버퍼링된 API를 사용한 Gzipping
const fs = require('fs'); | |
const zlib = require('zlib'); | |
const file = process.argv[2]; | |
fs.readFile(file, (err, buffer) => { | |
zlib.gzip(buffer, (err, buffer) => { | |
fs.writeFile(file + '.gz', buffer, err => { | |
console.log('File successfully compressed'); | |
}); | |
}); | |
}); |
view rawbuffer_gzip.js hosted with ❤ by GitHub
스트림을 사용한 Gzipping
const fs = require('fs'); | |
const zlib = require('zlib'); | |
const file = process.argv[2]; | |
fs.createReadStream(file) | |
.pipe(zlib.createGzip()) | |
.pipe(fs.createWriteStream(file + '.gz')) | |
.on('finish', () => { console.log('File successfully compressed')); |
view rawstream_gzip.js hosted with ❤ by GitHub
시간 효율성
스트림을 사용하면 전체 파일을 읽을 때까지 기다리지 않고 첫 번째 데이터 덩어리를 수신하자마자 조립 라인이 사작된다. 데이터의 다음 덩어리가 사용 가능한 상태가 될 때, 이전 작업들이 완료될 때까지 기다리지 않고 비동기적으로 실행된다. 유일한 제약은 데이터 덩어리가 도착하는 순서가 보존되어야 한다는 것인데 Node.js 스트림이 처리해준다. 버퍼를 사용하는 것 보다 전체 프로세스의 시간이 단축된다.
- [Client] 파일 시스템에서 데이터를 읽는다.
- [Client] 데이터를 압축.
- [Client] 서버로 전송.
- [Server] 클라이언트로부터 수신.
- [Server] 데이터의 압축을 푼다.
- [Server] 디스크에 데이터를 쓴다.
결합성
스트림 코드를 작성할 때, 단일 기능을 담당하는 서로 다른 프로세스 유닛들을 연결할 수 있는 pipe()메소드를 사용하였다. 이는 스트림이 균일한 인터페이스를 가지며 API측면에서 서로를 이해할 수 있기 때문에 가능하다. 유일한 전제 조건은 파이프라인의 다음 스트림이 이전 스트림에 의해 생성되어 전달된 데이터 타입을 지원해야 한다는 것이다. 이 데이터 타입에는 바이너리, 텍스트, 또는 객체가 될 수 있다.
const crypto = require('crypto');
//...
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(crypto.createCipher('aes192', 'a_shared_secret'))
.pipe(req)
.on('finish', () => console.log('File successfully sent'));
이 접근법의 주요 장점은 재사용성이지만 확실히 스트림을 사용하면 더 깨끗하고 모듈화된 코드를 만들 수 있다. 스트림은 순수한 I/O를 다루는 것뿐만 아니라 코드를 단순화하고 모듈화하는 수단으로 사용되기도 한다.
스트림 시작하기
스트림은 Node.js 핵심 모듈을 비롯한 모든 곳에서 사용된다. fs 모듈에서 createReadStream(), createWriteStream(), HTTP request 및 response 객체는 기본적으로 스트림이며 데이터 압축과 압축 해제를 위한 zlib는 스트리밍 인터페이스를 사용한다.
스트림의 구조
Node.js에서 모든 스트림은 스트림의 코어 모듈에서 사용할 수 있는 네 가지 추상 클래스 중 하나의 구현체이다.
각 스트림 클래스는 EventEmitter의 인스턴스이기도 하다. 스트림은 읽기가 끝났을 때 발생하는 end, 에러가 발생했을때 error와 같은 다양한 이벤트를 제공한다.
Readable 스트림
데이터 소스를 나타낸다. Readableabstract 클래스르 사용하여 구현된다.
스트림 읽기
데이터를 수신하는 방법으로 non-flowing, flowing 두 가지가 있다.
- non-flowing : Readable 스트림에서 읽기 기본 패턴은 새로운 데이터를 읽을 준비가 되었다는 신호인 readable 이벤트에 listener를 등록하는 것. 데이터는 readable 리스너에서 독점적으로 읽을 수 있다. 리스너는 새로운 데이터가 읽기 가능하게 되는 즉시 호출된다.
- process.stdinlet chunk; console.log(`Chunk read : (${chunk.length}) "${chunk.toString()}"`);})
- .on('end', () => process.stdout.write('End of stream'));
- }
- while((chunk = process.stdin.read()) !== null) {
- .on('readable', () => {
- flowing : data 이벤트에 리스너를 등록하는 것. read()를 사용하여 꺼내지 않고 데이터가 도착하자마자 해당 리스너에게 전달. Flowing모드는 Stream1의 상속이며 데이터 흐름 제어를 위한 유연성이 떨어진다. Stream2 인터페이스 도입으로 기본 작동 모드가 아니다.
- process.stdin console.log(`Chunk read : (${chunk.length}) "${chunk.toString()}"`);.on('end', () => process.stdout.write('End of stream'));
- })
- .on('data', chunk => {
Readable 스트림 구현하기
stream.Readable의 prototype을 상속받아 _read() 함수를 구현.
const stream = require('stream'); | |
const Chance = require('chance'); | |
const chance = new Chance(); | |
class RandomStream extends stream.Readable { | |
constructor(options) { | |
super(options); | |
} | |
_read(size) { | |
const chunk = chance.string(); | |
this.push(chunk, 'utf8'); | |
if(chance.bool({likelihood: 5})) { | |
this.push(null); | |
} | |
} | |
} | |
module.exports = RandomStream; |
view rawrandomStream.js hosted with ❤ by GitHub
RandomStream 객체를 인스턴스 화 하고 그 인스턴스에서 데이터 획득해보기
const RandomStream = require('./randomStream');
const randomStream = new RandomStream();
randomStream.on('readable', () => {
let chunk;
while((chunk = randomStream.read()) !== null) {
console.log(`Chunk received: ${chunk.toString()}`);
}
});
Writable 스트림
Writable 스트림은 데이터의 목적지를 나타낸다.
스트림에 쓰기
writable.write(chunk, [encoding], [callback])
encoding 인자는 선택 사항. chunk가 string일 경우 지정가능 (기본 utf8) chunk가 Buffer인 경우는 무시된다.
writable.end([chunk], [encoding], [callback])
더이상 스트림에 기록할 데이터가 없을 때 end() 메소드를 사용한다. 마지막 데이터 값 전달. 이 경우 콜백 함수는 스트림에 쓴 모든 데이터가 하위 리소스로 flush되었을 때 finish 이벤트에 listener를 등록하는 것과 같다.
Writable 스트림 구현
stream.Writable의 prototype을 상속받아 _write() 함수를 구현.
const stream = require('stream'); | |
const fs = require('fs'); | |
const path = require('path'); | |
const mkdirp = require('mkdirp'); | |
class ToFileStream extends stream.Writable { | |
constructor() { | |
super({objectMode: true}); | |
} | |
_write(chunk, encoding, callback) { | |
mkdirp(path.dirname(chunk.path), err => { | |
if(err) { | |
return callback(err); | |
} | |
fs.writeFile(chunk.path, chunk.content, callback); | |
}); | |
} | |
} | |
module.exports = ToFileStream; | |
view rawtoFileStream.js hosted with ❤ by GitHub
스트림에 대한 쓰기 작업 해보기
const ToFileStream = require('./toFileStream');
const tfs = new ToFileStream();
tfs.write({path: "file1.txt", content: "Hello"});
tfs.write({path: "file2.txt", content: "Node.js"});
tfs.write({path: "file3.txt", content: "Streams"});
tfs.end(() => console.log('All files created'))
실행하면 3개의 파일이 생성된다.
Duplex 스트림
양방향은 Readable과 Writable 모두가 가능한 스트림이다. 소캣처럼 데이터 소스와 목적지를 모두 가지는 항목을 다룰 때 유용하다.
Transform 스트림
Transform 스트림은 데이터 변환을 처리하도록 설계된 특별한 종류의 이중 스트림이다. Writable 쪽에서 받은 각 데이터들에게 어떤 종류의 변경을 적용한 수 변형된 데이터들을 Readable 쪽에서 사용할 수 있도록 한다.
새로운 Transform 스트림을 구현할 때는 transform()과 _flush()메소드를 추가로 작성해야 한다.
Transform 스트림 구현
const stream = require('stream'); | |
class ReplaceStream extends stream.Transform { | |
constructor(searchString, replaceString) { | |
super(); | |
this.searchString = searchString; | |
this.replaceString = replaceString; | |
this.tailPiece = ''; | |
} | |
_transform(chunk, encoding, callback) { | |
const pieces = (this.tailPiece + chunk).split(this.searchString); | |
const lastPiece = pieces[pieces.length -1]; | |
const tailPieceLen = this.searchString.length -1; | |
this.tailPiece = lastPiece.slice(-tailPieceLen); | |
pieces[pieces.length -1] = lastPiece.slice(0, -tailPieceLen); | |
this.push(pieces.join(this.replaceString)); | |
callback(); | |
} | |
_flush(callback) { | |
this.push(this.tailPiece); | |
callback(); | |
} | |
} | |
module.exports = ReplaceStream; |
view rawreplaceStream.js hosted with ❤ by GitHub
const ReplaceStream = require('./replaceStream');
const rs = new ReplaceStream('World', 'Node.js');
rs.on('data', chunk => console.log(chunk.toString()));
rs.write('Hello W');
rs.write('orld!');
rs.end();
https://vanayun.netlify.app/til/2019-05-24-node_09/
Node.js Design Pattern - 스트림 코딩(1)
스트림의 중요성 Node.js와 같이 이벤트 기반 플랫폼에서 I/O를 처리하는 가장 효율적인 방법은 실시간으로 들어온 입력 값을 가능한 즉시 애플리케이션에 내보내는 것이다. 버퍼 vs…
vanayun.netlify.app
'Programming > Node.js' 카테고리의 다른 글
마이크로서비스와 노드의 연관성 (0) | 2021.07.15 |
---|---|
Node.js의 Buffer란? (0) | 2021.06.29 |
PM2를 활용한 무중단 Node.js 및 Log관리 (0) | 2021.02.19 |
Node.js - 동기(Synchronous)와 비동기(Aschronsous) (0) | 2021.02.19 |
Node.js : Express-Gennerator 정리 (0) | 2021.02.18 |