瀏覽代碼

Major update: represent a chain as a Duplex instance.

Eugene Lazutkin 7 年之前
父節點
當前提交
6c05c1d88d
共有 9 個文件被更改,包括 294 次插入53 次删除
  1. 109 18
      README.md
  2. 51 11
      main.js
  3. 1 1
      package-lock.json
  4. 1 1
      package.json
  5. 1 1
      tests/test_demo.js
  6. 2 2
      tests/test_errors.js
  7. 94 0
      tests/test_readWrite.js
  8. 34 19
      tests/test_simple.js
  9. 1 0
      tests/tests.js

文件差異過大導致無法顯示
+ 109 - 18
README.md


+ 51 - 11
main.js

@@ -1,7 +1,6 @@
 'use strict';
 
-const EventEmitter = require('events');
-const {Duplex, Transform} = require('stream');
+const {Readable, Writable, Duplex, Transform} = require('stream');
 
 function processData(result, stream) {
   if (result !== undefined && result !== null) {
@@ -13,18 +12,19 @@ function processData(result, stream) {
   }
 }
 
-class Chain extends EventEmitter {
-  constructor(fns, skipEvents) {
-    super();
+class Chain extends Duplex {
+  constructor(fns, options) {
+    super(options || {writableObjectMode: true, readableObjectMode: true});
 
     if (!(fns instanceof Array) || !fns.length) {
       throw Error("Chain's argument should be a non-empty array.");
     }
 
-    this.streams = fns.map(fn => {
+    this.streams = fns.map((fn, index) => {
       if (typeof fn === 'function') {
         return new Transform({
-          objectMode: true,
+          writableObjectMode: true,
+          readableObjectMode: true,
           transform(chunk, encoding, callback) {
             try {
               const result = fn.call(this, chunk, encoding);
@@ -49,7 +49,12 @@ class Chain extends EventEmitter {
           }
         });
       }
-      if (fn instanceof Duplex || fn instanceof Transform) {
+      if (
+        fn instanceof Duplex ||
+        fn instanceof Transform ||
+        (!index && fn instanceof Readable) ||
+        (index === fns.length - 1 && fn instanceof Writable)
+      ) {
         return fn;
       }
       throw Error('Arguments should be functions or streams.');
@@ -57,13 +62,48 @@ class Chain extends EventEmitter {
     this.input = this.streams[0];
     this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);
 
+    if (!(this.input instanceof Writable)) {
+      this._write = (_1, _2, callback) => callback(null);
+      this._final = callback => callback(null);
+      this.input.on('end', () => this.end());
+    }
+
+    if (this.output instanceof Readable) {
+      this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
+      this.output.on('end', () => this.push(null));
+    } else {
+      this._read = () => {}; // nop
+      this.resume();
+      this.output.on('finish', () => this.push(null));
+    }
+
     // connect events
-    if (!skipEvents) {
+    if (!options || !options.skipEvents) {
       this.streams.forEach(stream => stream.on('error', error => this.emit('error', error)));
-      this.output.on('data', item => this.emit('data', item));
-      this.output.on('end', () => this.emit('end'));
     }
   }
+  _write(chunk, encoding, callback) {
+    let error = null;
+    try {
+      this.input.write(chunk, encoding, e => callback(e || error));
+    } catch (e) {
+      error = e;
+    }
+  }
+  _final(callback) {
+    let error = null;
+    try {
+      this.input.end(null, null, e => callback(e || error));
+    } catch (e) {
+      error = e;
+    }
+  }
+  _read() {
+    this.output.resume();
+  }
+  static chain(fns, options) {
+    return new Chain(fns, options);
+  }
 }
 
 module.exports = Chain;

+ 1 - 1
package-lock.json

@@ -1,6 +1,6 @@
 {
   "name": "stream-chain",
-  "version": "1.0.3",
+  "version": "2.0.0",
   "lockfileVersion": 1,
   "requires": true,
   "dependencies": {

+ 1 - 1
package.json

@@ -1,6 +1,6 @@
 {
   "name": "stream-chain",
-  "version": "1.0.3",
+  "version": "2.0.0",
   "description": "Chain functions as transform streams.",
   "main": "main.js",
   "scripts": {

+ 1 - 1
tests/test_demo.js

@@ -48,6 +48,6 @@ unit.add(module, [
       async.done();
     });
 
-    streamFromArray([1, 2, 3]).pipe(chain.input);
+    streamFromArray([1, 2, 3]).pipe(chain);
   }
 ]);

+ 2 - 2
tests/test_errors.js

@@ -5,7 +5,7 @@ const unit = require('heya-unit');
 const Chain = require('../main');
 
 unit.add(module, [
-  function test_noStreams(t) {
+  function test_errorsNoStreams(t) {
     try {
       const chain = new Chain([]);
       t.test(false); // shouldn't be here
@@ -13,7 +13,7 @@ unit.add(module, [
       eval(t.TEST('e instanceof Error'));
     }
   },
-  function test_wrongStreams(t) {
+  function test_errorsWrongStreams(t) {
     try {
       const chain = new Chain([1]);
       t.test(false); // shouldn't be here

+ 94 - 0
tests/test_readWrite.js

@@ -0,0 +1,94 @@
+'use strict';
+
+const unit = require('heya-unit');
+
+const Chain = require('../main');
+const {streamFromArray, streamToArray} = require('./helpers');
+const {Transform} = require('stream');
+
+unit.add(module, [
+  function test_readWriteReadable(t) {
+    const async = t.startAsync('test_readWriteReadable');
+
+    const output1 = [],
+      output2 = [],
+      chain = new Chain([streamFromArray([1, 2, 3]), x => x * x]);
+
+    chain.pipe(streamToArray(output1));
+
+    chain.on('data', value => output2.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output1, [1, 4, 9])'));
+      eval(t.TEST('t.unify(output2, [1, 4, 9])'));
+      async.done();
+    });
+  },
+  function test_readWriteWritable(t) {
+    const async = t.startAsync('test_readWriteWritable');
+
+    const output1 = [],
+      chain = new Chain([x => x * x, streamToArray(output1)]);
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output1, [1, 4, 9])'));
+      async.done();
+    });
+  },
+  function test_readWriteReadableWritable(t) {
+    const async = t.startAsync('test_readWriteReadableWritable');
+
+    const output1 = [],
+      chain = new Chain([streamFromArray([1, 2, 3]), x => x * x, streamToArray(output1)]);
+
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output1, [1, 4, 9])'));
+      async.done();
+    });
+  },
+  function test_readWriteSingleReadable(t) {
+    const async = t.startAsync('test_readWriteSingleReadable');
+
+    const output1 = [],
+      output2 = [],
+      chain = new Chain([streamFromArray([1, 2, 3])]);
+
+    chain.pipe(streamToArray(output1));
+
+    chain.on('data', value => output2.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output1, [1, 2, 3])'));
+      eval(t.TEST('t.unify(output2, [1, 2, 3])'));
+      async.done();
+    });
+  },
+  function test_readWriteSingleWritable(t) {
+    const async = t.startAsync('test_readWriteSingleWritable');
+
+    const output1 = [],
+      chain = new Chain([streamToArray(output1)]);
+
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output1, [1, 2, 3])'));
+      async.done();
+    });
+  },
+  function test_readWritePipeable(t) {
+    const async = t.startAsync('test_readWritePipeable');
+
+    const output1 = [],
+      output2 = [],
+      chain = new Chain([streamFromArray([1, 2, 3]), streamToArray(output1)]);
+
+    streamFromArray([4, 5, 6]).pipe(chain).pipe(streamToArray(output2));
+
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output1, [1, 2, 3])'));
+      eval(t.TEST('t.unify(output2, [])'));
+      async.done();
+    });
+  }
+]);

+ 34 - 19
tests/test_simple.js

@@ -7,15 +7,16 @@ const {streamFromArray, streamToArray} = require('./helpers');
 const {Transform} = require('stream');
 
 unit.add(module, [
-  function test_simple(t) {
-    const async = t.startAsync('test_simple');
+  function test_simpleGeneric(t) {
+    const async = t.startAsync('test_simpleGeneric');
 
     const chain = new Chain([x => x * x]),
       output1 = [],
       output2 = [];
 
-    streamFromArray([1, 2, 3]).pipe(chain.input);
-    chain.output.pipe(streamToArray(output1));
+    streamFromArray([1, 2, 3])
+      .pipe(chain)
+      .pipe(streamToArray(output1));
 
     chain.on('data', value => output2.push(value));
     chain.on('end', () => {
@@ -24,8 +25,8 @@ unit.add(module, [
       async.done();
     });
   },
-  function test_gen(t) {
-    const async = t.startAsync('test_gen');
+  function test_simpleGenerator(t) {
+    const async = t.startAsync('test_simpleGenerator');
 
     const chain = new Chain([
         function*(x) {
@@ -36,7 +37,7 @@ unit.add(module, [
       ]),
       output = [];
 
-    streamFromArray([1, 2, 3]).pipe(chain.input);
+    streamFromArray([1, 2, 3]).pipe(chain);
 
     chain.on('data', value => output.push(value));
     chain.on('end', () => {
@@ -44,8 +45,8 @@ unit.add(module, [
       async.done();
     });
   },
-  function test_async(t) {
-    const async = t.startAsync('test_async');
+  function test_simpleAsync(t) {
+    const async = t.startAsync('test_simpleAsync');
 
     const chain = new Chain([
         async x =>
@@ -55,7 +56,7 @@ unit.add(module, [
       ]),
       output = [];
 
-    streamFromArray([1, 2, 3]).pipe(chain.input);
+    streamFromArray([1, 2, 3]).pipe(chain);
 
     chain.on('data', value => output.push(value));
     chain.on('end', () => {
@@ -63,13 +64,13 @@ unit.add(module, [
       async.done();
     });
   },
-  function test_array(t) {
-    const async = t.startAsync('test_array');
+  function test_simpleArray(t) {
+    const async = t.startAsync('test_simpleArray');
 
     const chain = new Chain([x => [x * x, x * x * x, 2 * x]]),
       output = [];
 
-    streamFromArray([1, 2, 3]).pipe(chain.input);
+    streamFromArray([1, 2, 3]).pipe(chain);
 
     chain.on('data', value => output.push(value));
     chain.on('end', () => {
@@ -77,13 +78,13 @@ unit.add(module, [
       async.done();
     });
   },
-  function test_chain(t) {
-    const async = t.startAsync('test_chain');
+  function test_simpleChain(t) {
+    const async = t.startAsync('test_simpleChain');
 
     const chain = new Chain([x => x * x, x => 2 * x + 1]),
       output = [];
 
-    streamFromArray([1, 2, 3]).pipe(chain.input);
+    streamFromArray([1, 2, 3]).pipe(chain);
 
     chain.on('data', value => output.push(value));
     chain.on('end', () => {
@@ -91,8 +92,8 @@ unit.add(module, [
       async.done();
     });
   },
-  function test_stream(t) {
-    const async = t.startAsync('test_stream');
+  function test_simpleStream(t) {
+    const async = t.startAsync('test_simpleStream');
 
     const chain = new Chain([
         new Transform({
@@ -105,7 +106,21 @@ unit.add(module, [
       ]),
       output = [];
 
-    streamFromArray([1, 2, 3]).pipe(chain.input);
+    streamFromArray([1, 2, 3]).pipe(chain);
+
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [3, 9, 19])'));
+      async.done();
+    });
+  },
+  function test_simpleFactory(t) {
+    const async = t.startAsync('test_simpleChain');
+
+    const chain = Chain.chain([x => x * x, x => 2 * x + 1]),
+      output = [];
+
+    streamFromArray([1, 2, 3]).pipe(chain);
 
     chain.on('data', value => output.push(value));
     chain.on('end', () => {

+ 1 - 0
tests/tests.js

@@ -3,6 +3,7 @@
 const unit = require('heya-unit');
 
 require('./test_simple');
+require('./test_readWrite');
 require('./test_errors');
 require('./test_demo');