| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- 'use strict';
- const {Duplex} = require('stream');
- const defs = require('../defs');
- class Stream extends Duplex {
- constructor(fn, options) {
- super(Object.assign({}, options, {writableObjectMode: true, readableObjectMode: true}));
- if (typeof fn != 'function') throw Error('Only function is accepted as the first argument');
- this._fn = fn;
- // pump variables
- this._pause = true;
- this._queue = [];
- this._pending = false;
- this._chunk = this._encoding = this._callback = null;
- }
- _write(chunk, encoding, callback) {
- if (this._pause || this._queue.length) {
- this._pending = true;
- this._chunk = chunk;
- this._encoding = encoding;
- this._callback = callback;
- return;
- }
- this._process(chunk, encoding).then(
- () => callback(null),
- error => callback(error)
- );
- }
- _final(callback) {
- if (this._pause || this._queue.length) {
- this._pending = true;
- this._chunk = this._encoding = null;
- this._callback = callback;
- return;
- }
- this.push(null);
- callback(null);
- }
- _read() {
- this._pause = false;
- this._pump(this).then(() => this._pushPending());
- }
- async _process(chunk, encoding) {
- try {
- const result = this._fn(chunk, encoding);
- if (result && typeof result.then == 'function') {
- // thenable
- return await result.then(result => this._sanitize(result));
- }
- if (result && typeof result.next == 'function') {
- // generator
- this._pushResults(result);
- return await this._pump();
- }
- await this._sanitize(result);
- } catch (error) {
- if (error instanceof defs.Stop) {
- this.push(null);
- this.destroy();
- this._queue = [];
- this._chunk = this._encoding = this._callback = null;
- return;
- }
- throw error;
- }
- }
- async _pump() {
- const queue = this._queue;
- while (!this._pause && queue.length) {
- const gen = queue[queue.length - 1];
- let result = gen.next();
- if (result && typeof result.then == 'function') {
- result = await result;
- }
- if (result.done) {
- queue.pop();
- continue;
- }
- const value = result.value;
- if (value && typeof value.then == 'function') {
- value = await value;
- }
- await this._sanitize(value);
- }
- }
- _pushResults(results) {
- if (results && typeof results.next == 'function') {
- // generator
- this._queue.push(results);
- } else {
- // array
- this._queue.push(results[Symbol.iterator]());
- }
- }
- async _sanitize(result) {
- if (result !== undefined && result !== null && result === defs.none) return;
- if (result === defs.stop) throw new defs.Stop();
- if (defs.isMany(result)) {
- result = defs.getManyValues(result);
- this._pushResults(result);
- return this._pump();
- }
- if (defs.isFinalValue(result)) {
- result = defs.getFinalValue(result);
- }
- this._pause = !this.push(result);
- }
- async _pushPending() {
- if (this._pause || !this._pending) return;
- const chunk = this._chunk,
- encoding = this._encoding,
- callback = this._callback;
- this._pending = false;
- this._chunk = this._encoding = this._callback = null;
- return this._process(chunk, encoding).then(
- () => callback(null),
- error => callback(error)
- );
- }
- static make(fn, options) {
- return new Stream(fn, options);
- }
- }
- Stream.stream = Stream.make;
- Stream.make.Constructor = Stream;
- module.exports = Stream;
|