Jelajahi Sumber

Added FromIterable and regularized tests.

Eugene Lazutkin 7 tahun lalu
induk
melakukan
aede2ad6ba

+ 2 - 11
tests/helpers.js

@@ -1,15 +1,6 @@
 'use strict';
 
-const {Readable, Writable} = require('stream');
-
-const streamFromArray = array =>
-  new Readable({
-    objectMode: true,
-    read() {
-      if (isNaN(this.index)) this.index = 0;
-      this.push(this.index < array.length ? array[this.index++] : null);
-    }
-  });
+const {Writable} = require('stream');
 
 const streamToArray = array =>
   new Writable({
@@ -31,4 +22,4 @@ const delay = (fn, ms = 20) => async (...args) =>
     }, ms);
   });
 
-module.exports = {streamFromArray, streamToArray, delay};
+module.exports = {streamToArray, delay};

+ 95 - 0
tests/test_FromIterable.js

@@ -0,0 +1,95 @@
+'use strict';
+
+const unit = require('heya-unit');
+
+const Chain = require('../index');
+const {streamToArray, delay} = require('./helpers');
+
+const {fromIterable} = require('../utils/FromIterable');
+
+unit.add(module, [
+  function test_FromIterable(t) {
+    const async = t.startAsync('test_FromIterable');
+
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), streamToArray(output)]);
+
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, 2, 3])'));
+      async.done();
+    });
+  },
+  function test_FromIterableFun(t) {
+    const async = t.startAsync('test_FromIterableFun');
+
+    const output = [],
+      chain = new Chain([fromIterable(() => 0), streamToArray(output)]);
+
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [0])'));
+      async.done();
+    });
+  },
+  function test_FromIterableAsyncFun(t) {
+    const async = t.startAsync('test_FromIterableAsyncFun');
+
+    const output = [],
+      chain = new Chain([fromIterable(delay(() => 0)), streamToArray(output)]);
+
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [0])'));
+      async.done();
+    });
+  },
+  function test_FromIterableGen(t) {
+    const async = t.startAsync('test_FromIterableGen');
+
+    const output = [],
+      chain = new Chain([
+        fromIterable(function*() {
+          yield 0;
+          yield 1;
+        }),
+        streamToArray(output)
+      ]);
+
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [0, 1])'));
+      async.done();
+    });
+  },
+  // function test_FromIterableAsyncGen(t) {
+  //   const async = t.startAsync('test_FromIterableAsyncGen');
+
+  //   const output = [],
+  //     chain = new Chain([
+  //       fromIterable(async function*() {
+  //         yield delay(() => 0)();
+  //         yield delay(() => 1)();
+  //       }),
+  //       streamToArray(output)
+  //     ]);
+
+  //   chain.on('end', () => {
+  //     eval(t.TEST('t.unify(output, [0, 1])'));
+  //     async.done();
+  //   });
+  // },
+  function test_FromIterableNextable(t) {
+    const async = t.startAsync('test_FromIterableNextable');
+
+    const output = [],
+      chain = new Chain([
+        fromIterable((function*() {
+          yield 0;
+          yield 1;
+        })()),
+        streamToArray(output)
+      ]);
+
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [0, 1])'));
+      async.done();
+    });
+  }
+]);

+ 46 - 59
tests/test_comp.js

@@ -3,8 +3,9 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray, delay} = require('./helpers');
+const {streamToArray, delay} = require('./helpers');
 
+const {fromIterable} = require('../utils/FromIterable');
 const comp = require('../utils/comp');
 
 const {none, final, many} = Chain;
@@ -13,12 +14,9 @@ unit.add(module, [
   function test_comp(t) {
     const async = t.startAsync('test_comp');
 
-    const chain = new Chain([comp(x => x * x, x => 2 * x + 1)]),
-      output = [];
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), comp(x => x * x, x => 2 * x + 1), streamToArray(output)]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 9, 19])'));
       async.done();
@@ -27,12 +25,13 @@ unit.add(module, [
   function test_compFinal(t) {
     const async = t.startAsync('test_compFinal');
 
-    const chain = new Chain([comp(x => x * x, x => final(x), x => 2 * x + 1)]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = new Chain([
+        fromIterable([1, 2, 3]),
+        comp(x => x * x, x => final(x), x => 2 * x + 1),
+        streamToArray(output)
+      ]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 4, 9])'));
       async.done();
@@ -41,12 +40,9 @@ unit.add(module, [
   function test_compNothing(t) {
     const async = t.startAsync('test_compNothing');
 
-    const chain = new Chain([comp(x => x * x, () => none, x => 2 * x + 1)]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), comp(x => x * x, () => none, x => 2 * x + 1), streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [])'));
       async.done();
@@ -55,12 +51,9 @@ unit.add(module, [
   function test_compEmpty(t) {
     const async = t.startAsync('test_compEmpty');
 
-    const chain = new Chain([x => x * x, comp()]),
-      output = [];
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), x => x * x, comp(), streamToArray(output)]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 4, 9])'));
       async.done();
@@ -69,12 +62,9 @@ unit.add(module, [
   function test_compAsync(t) {
     const async = t.startAsync('test_compAsync');
 
-    const chain = new Chain([comp(delay(x => x * x), x => 2 * x + 1)]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), comp(delay(x => x * x), x => 2 * x + 1), streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 9, 19])'));
       async.done();
@@ -83,7 +73,9 @@ unit.add(module, [
   function test_compGenerator(t) {
     const async = t.startAsync('test_compGenerator');
 
-    const chain = new Chain([
+    const output = [],
+      chain = new Chain([
+        fromIterable([1, 2, 3]),
         comp(
           x => x * x,
           function*(x) {
@@ -92,13 +84,10 @@ unit.add(module, [
             yield x + 2;
           },
           x => 2 * x + 1
-        )
-      ]),
-      output = [];
+        ),
+        streamToArray(output)
+      ]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 5, 7, 9, 11, 13, 19, 21, 23])'));
       async.done();
@@ -107,12 +96,13 @@ unit.add(module, [
   function test_compMany(t) {
     const async = t.startAsync('test_compMany');
 
-    const chain = new Chain([comp(x => x * x, x => many([x, x + 1, x + 2]), x => 2 * x + 1)]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = new Chain([
+        fromIterable([1, 2, 3]),
+        comp(x => x * x, x => many([x, x + 1, x + 2]), x => 2 * x + 1),
+        streamToArray(output)
+      ]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 5, 7, 9, 11, 13, 19, 21, 23])'));
       async.done();
@@ -121,7 +111,9 @@ unit.add(module, [
   function test_compCombined(t) {
     const async = t.startAsync('test_compCombined');
 
-    const chain = new Chain([
+    const output = [],
+      chain = new Chain([
+        fromIterable([1, 2]),
         comp(
           delay(x => -x),
           x => many([x, x * 10]),
@@ -130,13 +122,10 @@ unit.add(module, [
             yield x - 1;
           },
           x => -x
-        )
-      ]),
-      output = [];
-
-    streamFromArray([1, 2]).pipe(chain);
+        ),
+        streamToArray(output)
+      ]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 2, 10, 11, 2, 3, 20, 21])'));
       async.done();
@@ -145,7 +134,9 @@ unit.add(module, [
   function test_compCombinedFinal(t) {
     const async = t.startAsync('test_compCombinedFinal');
 
-    const chain = new Chain([
+    const output = [],
+      chain = new Chain([
+        fromIterable([1, 2]),
         comp(
           delay(x => -x),
           x => many([x, x * 10]),
@@ -154,13 +145,10 @@ unit.add(module, [
             yield final(x - 1);
           },
           x => -x
-        )
-      ]),
-      output = [];
+        ),
+        streamToArray(output)
+      ]);
 
-    streamFromArray([1, 2]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, -2, 10, -11, 2, -3, 20, -21])'));
       async.done();
@@ -169,7 +157,9 @@ unit.add(module, [
   function test_compAsFun(t) {
     const async = t.startAsync('test_compAsFun');
 
-    const chain = new Chain([
+    const output = [],
+      chain = new Chain([
+        fromIterable([1, 2]),
         comp.asFun(
           delay(x => -x),
           x => many([x, x * 10]),
@@ -178,13 +168,10 @@ unit.add(module, [
             yield final(x - 1);
           },
           x => -x
-        )
-      ]),
-      output = [];
-
-    streamFromArray([1, 2]).pipe(chain);
+        ),
+        streamToArray(output)
+      ]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, -2, 10, -11, 2, -3, 20, -21])'));
       async.done();

+ 2 - 2
tests/test_demo.js

@@ -3,7 +3,7 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray} = require('./helpers');
+const {fromIterable} = require('../utils/FromIterable');
 const {Transform} = require('stream');
 
 unit.add(module, [
@@ -48,6 +48,6 @@ unit.add(module, [
       async.done();
     });
 
-    streamFromArray([1, 2, 3]).pipe(chain);
+    fromIterable([1, 2, 3]).pipe(chain);
   }
 ]);

+ 19 - 30
tests/test_fold.js

@@ -3,22 +3,20 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray, delay} = require('./helpers');
+const {streamToArray, delay} = require('./helpers');
 
+const {fromIterable} = require('../utils/FromIterable');
 const fold = require('../utils/fold');
 const scan = require('../utils/scan');
-const Reduce = require('../utils/Reduce');
+const {reduce} = require('../utils/Reduce');
 
 unit.add(module, [
   function test_fold(t) {
     const async = t.startAsync('test_fold');
 
-    const chain = new Chain([fold((acc, x) => acc + x, 0)]),
-      output = [];
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), fold((acc, x) => acc + x, 0), streamToArray(output)]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [6])'));
       async.done();
@@ -27,12 +25,9 @@ unit.add(module, [
   function test_foldAsync(t) {
     const async = t.startAsync('test_foldAsync');
 
-    const chain = new Chain([fold(delay((acc, x) => acc + x), 0)]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), fold(delay((acc, x) => acc + x), 0), streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [6])'));
       async.done();
@@ -41,12 +36,9 @@ unit.add(module, [
   function test_foldScan(t) {
     const async = t.startAsync('test_foldScan');
 
-    const chain = new Chain([scan((acc, x) => acc + x, 0)]),
-      output = [];
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), scan((acc, x) => acc + x, 0), streamToArray(output)]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 3, 6])'));
       async.done();
@@ -55,12 +47,9 @@ unit.add(module, [
   function test_foldScanAsync(t) {
     const async = t.startAsync('test_foldScanAsync');
 
-    const chain = new Chain([scan(delay((acc, x) => acc + x), 0)]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), scan(delay((acc, x) => acc + x), 0), streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 3, 6])'));
       async.done();
@@ -69,24 +58,24 @@ unit.add(module, [
   function test_foldReduce(t) {
     const async = t.startAsync('test_foldReduce');
 
-    const reduce = new Reduce({reducer: (acc, x) => acc + x, initial: 0});
+    const r = reduce((acc, x) => acc + x, 0);
 
-    streamFromArray([1, 2, 3]).pipe(reduce);
+    fromIterable([1, 2, 3]).pipe(r);
 
-    reduce.on('finish', () => {
-      eval(t.TEST('t.unify(reduce.accumulator, 6)'));
+    r.on('finish', () => {
+      eval(t.TEST('t.unify(r.accumulator, 6)'));
       async.done();
     });
   },
   function test_foldReduceAsync(t) {
     const async = t.startAsync('test_foldReduceAsync');
 
-    const reduce = new Reduce({reducer: delay((acc, x) => acc + x), initial: 0});
+    const r = reduce({reducer: delay((acc, x) => acc + x), initial: 0});
 
-    streamFromArray([1, 2, 3]).pipe(reduce);
+    fromIterable([1, 2, 3]).pipe(r);
 
-    reduce.on('finish', () => {
-      eval(t.TEST('t.unify(reduce.accumulator, 6)'));
+    r.on('finish', () => {
+      eval(t.TEST('t.unify(r.accumulator, 6)'));
       async.done();
     });
   }

+ 17 - 16
tests/test_readWrite.js

@@ -3,7 +3,8 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray, streamToArray} = require('./helpers');
+const {streamToArray} = require('./helpers');
+const {fromIterable} = require('../utils/FromIterable');
 
 unit.add(module, [
   function test_readWriteReadable(t) {
@@ -11,7 +12,7 @@ unit.add(module, [
 
     const output1 = [],
       output2 = [],
-      chain = new Chain([streamFromArray([1, 2, 3]), x => x * x]);
+      chain = new Chain([fromIterable([1, 2, 3]), x => x * x]);
 
     chain.pipe(streamToArray(output1));
 
@@ -25,24 +26,24 @@ unit.add(module, [
   function test_readWriteWritable(t) {
     const async = t.startAsync('test_readWriteWritable');
 
-    const output1 = [],
-      chain = new Chain([x => x * x, streamToArray(output1)]);
+    const output = [],
+      chain = new Chain([x => x * x, streamToArray(output)]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
+    fromIterable([1, 2, 3]).pipe(chain);
 
     chain.on('end', () => {
-      eval(t.TEST('t.unify(output1, [1, 4, 9])'));
+      eval(t.TEST('t.unify(output, [1, 4, 9])'));
       async.done();
     });
   },
   function test_readWriteReadableWritable(t) {
     const async = t.startAsync('test_readWriteReadableWritable');
 
-    const output1 = [],
-      chain = new Chain([streamFromArray([1, 2, 3]), x => x * x, streamToArray(output1)]);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), x => x * x, streamToArray(output)]);
 
     chain.on('end', () => {
-      eval(t.TEST('t.unify(output1, [1, 4, 9])'));
+      eval(t.TEST('t.unify(output, [1, 4, 9])'));
       async.done();
     });
   },
@@ -51,7 +52,7 @@ unit.add(module, [
 
     const output1 = [],
       output2 = [],
-      chain = new Chain([streamFromArray([1, 2, 3])]);
+      chain = new Chain([fromIterable([1, 2, 3])]);
 
     chain.pipe(streamToArray(output1));
 
@@ -65,13 +66,13 @@ unit.add(module, [
   function test_readWriteSingleWritable(t) {
     const async = t.startAsync('test_readWriteSingleWritable');
 
-    const output1 = [],
-      chain = new Chain([streamToArray(output1)]);
+    const output = [],
+      chain = new Chain([streamToArray(output)]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
+    fromIterable([1, 2, 3]).pipe(chain);
 
     chain.on('end', () => {
-      eval(t.TEST('t.unify(output1, [1, 2, 3])'));
+      eval(t.TEST('t.unify(output, [1, 2, 3])'));
       async.done();
     });
   },
@@ -80,9 +81,9 @@ unit.add(module, [
 
     const output1 = [],
       output2 = [],
-      chain = new Chain([streamFromArray([1, 2, 3]), streamToArray(output1)]);
+      chain = new Chain([fromIterable([1, 2, 3]), streamToArray(output1)]);
 
-    streamFromArray([4, 5, 6])
+    fromIterable([4, 5, 6])
       .pipe(chain)
       .pipe(streamToArray(output2));
 

+ 25 - 41
tests/test_simple.js

@@ -3,7 +3,8 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray, streamToArray, delay} = require('./helpers');
+const {streamToArray, delay} = require('./helpers');
+const {fromIterable} = require('../utils/FromIterable');
 const {Transform} = require('stream');
 
 unit.add(module, [
@@ -14,7 +15,7 @@ unit.add(module, [
       output1 = [],
       output2 = [];
 
-    streamFromArray([1, 2, 3])
+    fromIterable([1, 2, 3])
       .pipe(chain)
       .pipe(streamToArray(output1));
 
@@ -28,18 +29,17 @@ unit.add(module, [
   function test_simpleGenerator(t) {
     const async = t.startAsync('test_simpleGenerator');
 
-    const chain = new Chain([
+    const output = [],
+      chain = new Chain([
+        fromIterable([1, 2, 3]),
         function*(x) {
           yield x * x;
           yield x * x * x;
           yield 2 * x;
-        }
-      ]),
-      output = [];
+        },
+        streamToArray(output)
+      ]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 1, 2, 4, 8, 4, 9, 27, 6])'));
       async.done();
@@ -48,12 +48,9 @@ unit.add(module, [
   function test_simpleAsync(t) {
     const async = t.startAsync('test_simpleAsync');
 
-    const chain = new Chain([delay(x => x + 1)]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), delay(x => x + 1), streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [2, 3, 4])'));
       async.done();
@@ -62,12 +59,9 @@ unit.add(module, [
   function test_simpleArray(t) {
     const async = t.startAsync('test_simpleArray');
 
-    const chain = new Chain([x => [x * x, x * x * x, 2 * x]]),
-      output = [];
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), x => [x * x, x * x * x, 2 * x], streamToArray(output)]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 1, 2, 4, 8, 4, 9, 27, 6])'));
       async.done();
@@ -76,12 +70,9 @@ unit.add(module, [
   function test_simpleMany(t) {
     const async = t.startAsync('test_simpleMany');
 
-    const chain = new Chain([x => Chain.many([x * x, x * x * x, 2 * x])]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), x => Chain.many([x * x, x * x * x, 2 * x]), streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 1, 2, 4, 8, 4, 9, 27, 6])'));
       async.done();
@@ -90,12 +81,9 @@ unit.add(module, [
   function test_simpleChain(t) {
     const async = t.startAsync('test_simpleChain');
 
-    const chain = new Chain([x => x * x, x => 2 * x + 1]),
-      output = [];
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), x => x * x, x => 2 * x + 1, streamToArray(output)]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 9, 19])'));
       async.done();
@@ -104,20 +92,19 @@ unit.add(module, [
   function test_simpleStream(t) {
     const async = t.startAsync('test_simpleStream');
 
-    const chain = new Chain([
+    const output = [],
+      chain = new Chain([
+        fromIterable([1, 2, 3]),
         new Transform({
           objectMode: true,
           transform(x, _, callback) {
             callback(null, x * x);
           }
         }),
-        x => 2 * x + 1
-      ]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+        x => 2 * x + 1,
+        streamToArray(output)
+      ]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 9, 19])'));
       async.done();
@@ -126,12 +113,9 @@ unit.add(module, [
   function test_simpleFactory(t) {
     const async = t.startAsync('test_simpleChain');
 
-    const chain = Chain.chain([x => x * x, x => 2 * x + 1]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = Chain.chain([fromIterable([1, 2, 3]), x => x * x, x => 2 * x + 1, streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 9, 19])'));
       async.done();

+ 8 - 16
tests/test_skip.js

@@ -3,8 +3,9 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray, delay} = require('./helpers');
+const {streamToArray, delay} = require('./helpers');
 
+const {fromIterable} = require('../utils/FromIterable');
 const skip = require('../utils/skip');
 const skipWhile = require('../utils/skipWhile');
 
@@ -12,12 +13,9 @@ unit.add(module, [
   function test_skip(t) {
     const async = t.startAsync('test_skip');
 
-    const chain = new Chain([skip(2)]),
-      output = [];
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3, 4, 5]), skip(2), streamToArray(output)]);
 
-    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 4, 5])'));
       async.done();
@@ -26,12 +24,9 @@ unit.add(module, [
   function test_skipWhile(t) {
     const async = t.startAsync('test_skipWhile');
 
-    const chain = new Chain([skipWhile(x => x != 3)]),
-      output = [];
-
-    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3, 4, 5]), skipWhile(x => x != 3), streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 4, 5])'));
       async.done();
@@ -40,12 +35,9 @@ unit.add(module, [
   function test_skipWhileAsync(t) {
     const async = t.startAsync('test_skipWhileAsync');
 
-    const chain = new Chain([skipWhile(delay(x => x != 3))]),
-      output = [];
-
-    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3, 4, 5]), skipWhile(delay(x => x != 3)), streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 4, 5])'));
       async.done();

+ 10 - 21
tests/test_take.js

@@ -3,8 +3,9 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray, delay} = require('./helpers');
+const {streamToArray, delay} = require('./helpers');
 
+const {fromIterable} = require('../utils/FromIterable');
 const take = require('../utils/take');
 const takeWhile = require('../utils/takeWhile');
 
@@ -12,12 +13,9 @@ unit.add(module, [
   function test_take(t) {
     const async = t.startAsync('test_take');
 
-    const chain = new Chain([take(2)]),
-      output = [];
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3, 4, 5]), take(2), streamToArray(output)]);
 
-    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 2])'));
       async.done();
@@ -26,12 +24,9 @@ unit.add(module, [
   function test_takeWithSkip(t) {
     const async = t.startAsync('test_takeWithSkip');
 
-    const chain = new Chain([take({n: 2, skip: 2})]),
-      output = [];
-
-    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3, 4, 5]), take({n: 2, skip: 2}), streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 4])'));
       async.done();
@@ -40,12 +35,9 @@ unit.add(module, [
   function test_takeWhile(t) {
     const async = t.startAsync('test_takeWhile');
 
-    const chain = new Chain([takeWhile(x => x != 3)]),
-      output = [];
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3, 4, 5]), takeWhile(x => x != 3), streamToArray(output)]);
 
-    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 2])'));
       async.done();
@@ -54,12 +46,9 @@ unit.add(module, [
   function test_takeWhileAsync(t) {
     const async = t.startAsync('test_takeWhileAsync');
 
-    const chain = new Chain([takeWhile(delay(x => x != 3))]),
-      output = [];
-
-    streamFromArray([1, 2, 3, 4, 5]).pipe(chain);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3, 4, 5]), takeWhile(delay(x => x != 3)), streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 2])'));
       async.done();

+ 20 - 26
tests/test_transducers.js

@@ -3,18 +3,16 @@
 const unit = require('heya-unit');
 
 const Chain = require('../index');
-const {streamFromArray} = require('./helpers');
+const {streamToArray} = require('./helpers');
+const {fromIterable} = require('../utils/FromIterable');
 
 unit.add(module, [
   function test_transducers(t) {
     const async = t.startAsync('test_transducers');
 
-    const chain = new Chain([[x => x * x, x => 2 * x + 1]]),
-      output = [];
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), [x => x * x, x => 2 * x + 1], streamToArray(output)]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 9, 19])'));
       async.done();
@@ -23,12 +21,13 @@ unit.add(module, [
   function test_transducersFinal(t) {
     const async = t.startAsync('test_transducersFinal');
 
-    const chain = new Chain([[x => x * x, x => Chain.final(x), x => 2 * x + 1]]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = new Chain([
+        fromIterable([1, 2, 3]),
+        [x => x * x, x => Chain.final(x), x => 2 * x + 1],
+        streamToArray(output)
+      ]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 4, 9])'));
       async.done();
@@ -37,12 +36,13 @@ unit.add(module, [
   function test_transducersNothing(t) {
     const async = t.startAsync('test_transducersNothing');
 
-    const chain = new Chain([[x => x * x, () => Chain.none, x => 2 * x + 1]]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = new Chain([
+        fromIterable([1, 2, 3]),
+        [x => x * x, () => Chain.none, x => 2 * x + 1],
+        streamToArray(output)
+      ]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [])'));
       async.done();
@@ -51,12 +51,9 @@ unit.add(module, [
   function test_transducersEmpty(t) {
     const async = t.startAsync('test_transducersEmpty');
 
-    const chain = new Chain([x => x * x, []]),
-      output = [];
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), x => x * x, [], streamToArray(output)]);
 
-    streamFromArray([1, 2, 3]).pipe(chain);
-
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, 4, 9])'));
       async.done();
@@ -65,12 +62,9 @@ unit.add(module, [
   function test_transducersOne(t) {
     const async = t.startAsync('test_transducersOne');
 
-    const chain = new Chain([x => x * x, [x => 2 * x + 1]]),
-      output = [];
-
-    streamFromArray([1, 2, 3]).pipe(chain);
+    const output = [],
+      chain = new Chain([fromIterable([1, 2, 3]), x => x * x, [x => 2 * x + 1], streamToArray(output)]);
 
-    chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [3, 9, 19])'));
       async.done();

+ 2 - 0
tests/tests.js

@@ -2,6 +2,8 @@
 
 const unit = require('heya-unit');
 
+require('./test_FromIterable');
+
 require('./test_simple');
 require('./test_readWrite');
 require('./test_errors');

+ 94 - 0
utils/FromIterable.js

@@ -0,0 +1,94 @@
+'use strict';
+
+const {Readable} = require('stream');
+
+class FromIterable extends Readable {
+  constructor(options) {
+    super(Object.assign({}, options, {objectMode: true}));
+    this._iterable = null;
+    this._next = null;
+    if (options) {
+      'iterable' in options && (this._iterable = options.iterable);
+    }
+    !this._iterable && (this._read = this._readStop);
+  }
+
+  _read() {
+    if (Symbol.asyncIterator && typeof this._iterable[Symbol.asyncIterator] == 'function') {
+      this._next = this._iterable[Symbol.asyncIterator]();
+      this._iterable = null;
+      this._read = this._readNext;
+      this._readNext();
+      return;
+    }
+    if (Symbol.iterator && typeof this._iterable[Symbol.iterator] == 'function') {
+      this._next = this._iterable[Symbol.iterator]();
+      this._iterable = null;
+      this._read = this._readNext;
+      this._readNext();
+      return;
+    }
+    if (typeof this._iterable.next == 'function') {
+      this._next = this._iterable;
+      this._iterable = null;
+      this._read = this._readNext;
+      this._readNext();
+      return;
+    }
+    const result = this._iterable();
+    this._iterable = null;
+    if (result && typeof result.then == 'function') {
+      result.then(value => this.push(value), error => this.emit('error', error));
+      this._read = this._readStop;
+      return;
+    }
+    if (result && typeof result.next == 'function') {
+      this._next = result;
+      this._read = this._readNext;
+      this._readNext();
+      return;
+    }
+    this.push(result);
+    this._read = this._readStop;
+  }
+
+  _readNext() {
+    for (;;) {
+      const result = this._next.next();
+      if (result && typeof result.then == 'function') {
+        result.then(
+          value => {
+            if (value.done || value.value === null) {
+              this.push(null);
+              this._next = null;
+              this._read = this._readStop;
+            } else {
+              this.push(value.value);
+            }
+          },
+          error => this.emit('error', error)
+        );
+        break;
+      }
+      if (result.done || result.value === null) {
+        this.push(null);
+        this._next = null;
+        this._read = this._readStop;
+        break;
+      }
+      if (!this.push(result.value)) break;
+    }
+  }
+
+  _readStop() {
+    this.push(null);
+  }
+
+  static make(iterable) {
+    return new FromIterable(typeof iterable == 'object' && iterable.iterable ? iterable : {iterable});
+  }
+}
+FromIterable.fromIterable = FromIterable.make;
+FromIterable.make.Constructor = FromIterable;
+
+module.exports = FromIterable;