Eugene Lazutkin 3 jaren geleden
bovenliggende
commit
9edea83e41
15 gewijzigde bestanden met toevoegingen van 238 en 1 verwijderingen
  1. 1 1
      AsStream.js
  2. 0 0
      src/defs.js
  3. 0 0
      src/fun.js
  4. 0 0
      src/gen.js
  5. 0 0
      src/index.js
  6. 94 0
      src/utils/FromIterable.js
  7. 44 0
      src/utils/ReduceStream.js
  8. 19 0
      src/utils/fold.js
  9. 1 0
      src/utils/reduce.js
  10. 11 0
      src/utils/scan.js
  11. 7 0
      src/utils/skip.js
  12. 23 0
      src/utils/skipWhile.js
  13. 7 0
      src/utils/take.js
  14. 23 0
      src/utils/takeWhile.js
  15. 8 0
      src/utils/takeWithSkip.js

+ 1 - 1
AsStream.js

@@ -41,7 +41,7 @@ class AsStream extends Duplex {
       return;
     }
     this._processChunk(defs.none, null).then(
-      () => callback(null),
+      () => (this.push(null), callback(null)),
       error => callback(error)
     );
   }

defs.js → src/defs.js


fun.js → src/fun.js


gen.js → src/gen.js


index.js → src/index.js


+ 94 - 0
src/utils/FromIterable.js

@@ -0,0 +1,94 @@
+'use strict';
+
+const {Readable} = require('stream');
+
+class FromIterable extends Readable {
+  constructor(options) {
+    super(Object.assign({}, options, {objectMode: true}));
+    this._iterable = null;
+    this._next = null;
+    if (options) {
+      'iterable' in options && (this._iterable = options.iterable);
+    }
+    !this._iterable && (this._read = this._readStop);
+  }
+
+  _read() {
+    if (Symbol.asyncIterator && typeof this._iterable[Symbol.asyncIterator] == 'function') {
+      this._next = this._iterable[Symbol.asyncIterator]();
+      this._iterable = null;
+      this._read = this._readNext;
+      this._readNext();
+      return;
+    }
+    if (Symbol.iterator && typeof this._iterable[Symbol.iterator] == 'function') {
+      this._next = this._iterable[Symbol.iterator]();
+      this._iterable = null;
+      this._read = this._readNext;
+      this._readNext();
+      return;
+    }
+    if (typeof this._iterable.next == 'function') {
+      this._next = this._iterable;
+      this._iterable = null;
+      this._read = this._readNext;
+      this._readNext();
+      return;
+    }
+    const result = this._iterable();
+    this._iterable = null;
+    if (result && typeof result.then == 'function') {
+      result.then(value => this.push(value), error => this.emit('error', error));
+      this._read = this._readStop;
+      return;
+    }
+    if (result && typeof result.next == 'function') {
+      this._next = result;
+      this._read = this._readNext;
+      this._readNext();
+      return;
+    }
+    this.push(result);
+    this._read = this._readStop;
+  }
+
+  _readNext() {
+    for (;;) {
+      const result = this._next.next();
+      if (result && typeof result.then == 'function') {
+        result.then(
+          value => {
+            if (value.done || value.value === null) {
+              this.push(null);
+              this._next = null;
+              this._read = this._readStop;
+            } else {
+              this.push(value.value);
+            }
+          },
+          error => this.emit('error', error)
+        );
+        break;
+      }
+      if (result.done || result.value === null) {
+        this.push(null);
+        this._next = null;
+        this._read = this._readStop;
+        break;
+      }
+      if (!this.push(result.value)) break;
+    }
+  }
+
+  _readStop() {
+    this.push(null);
+  }
+
+  static make(iterable) {
+    return new FromIterable(typeof iterable == 'object' && iterable.iterable ? iterable : {iterable});
+  }
+}
+FromIterable.fromIterable = FromIterable.make;
+FromIterable.make.Constructor = FromIterable;
+
+module.exports = FromIterable;

+ 44 - 0
src/utils/ReduceStream.js

@@ -0,0 +1,44 @@
+'use strict';
+
+const {Writable} = require('stream');
+
+const defaultInitial = 0;
+const defaultReducer = (acc, value) => value;
+
+class ReduceStream extends Writable {
+  constructor(options) {
+    super(Object.assign({}, options, {objectMode: true}));
+    this.accumulator = defaultInitial;
+    this._reducer = defaultReducer;
+    if (options) {
+      'initial' in options && (this.accumulator = options.initial);
+      'reducer' in options && (this._reducer = options.reducer);
+    }
+  }
+  _write(chunk, encoding, callback) {
+    const result = this._reducer.call(this, this.accumulator, chunk);
+    if (result && typeof result.then == 'function') {
+      result.then(
+        value => {
+          this.accumulator = value;
+          callback(null);
+        },
+        error => callback(error)
+      );
+    } else {
+      this.accumulator = result;
+      callback(null);
+    }
+  }
+  static make(reducer, initial) {
+    return new ReduceStream(typeof reducer == 'object' ? reducer : {reducer, initial});
+  }
+}
+ReduceStream.reduceStream = ReduceStream.make;
+ReduceStream.make.Constructor = ReduceStream;
+
+module.exports = ReduceStream;
+
+// to keep ESM happy:
+module.exports.reduceStream = ReduceStream.make;
+module.exports.make = ReduceStream.make;

+ 19 - 0
src/utils/fold.js

@@ -0,0 +1,19 @@
+'use strict';
+
+const {none, flushable} = require('../defs');
+
+const fold = (f, acc) =>
+  flushable(value => {
+    if (value === none) return acc;
+    const result = f(acc, value);
+    if (result && typeof result.then == 'function') {
+      return result.then(result => {
+        acc = result;
+        return none;
+      });
+    }
+    acc = result;
+    return none;
+  });
+
+module.exports = fold;

+ 1 - 0
src/utils/reduce.js

@@ -0,0 +1 @@
+module.exports = require('./fold');

+ 11 - 0
src/utils/scan.js

@@ -0,0 +1,11 @@
+'use strict';
+
+const scan = (f, acc) => value => {
+  const result = f(acc, value);
+  if (result && typeof result.then == 'function') {
+    return result.then(result => (acc = result));
+  }
+  return (acc = result);
+};
+
+module.exports = scan;

+ 7 - 0
src/utils/skip.js

@@ -0,0 +1,7 @@
+'use strict';
+
+const {none} = require('../defs');
+
+const skip = n => value => (n > 0 ? (--n, none) : value);
+
+module.exports = skip;

+ 23 - 0
src/utils/skipWhile.js

@@ -0,0 +1,23 @@
+'use strict';
+
+const {none} = require('../defs');
+
+const skipWhile = f => {
+  let test = true;
+  return value => {
+    if (!test) return value;
+    const result = f(value);
+    if (result && typeof result.then == 'function') {
+      return result.then(result => {
+        if (result) return none;
+        test = false;
+        return value;
+      });
+    }
+    if (result) return none;
+    test = false;
+    return value;
+  };
+};
+
+module.exports = skipWhile;

+ 7 - 0
src/utils/take.js

@@ -0,0 +1,7 @@
+'use strict';
+
+const {none} = require('../defs');
+
+const take = (n, finalValue = none) => value => (n > 0 ? (--n, value) : finalValue);
+
+module.exports = take;

+ 23 - 0
src/utils/takeWhile.js

@@ -0,0 +1,23 @@
+'use strict';
+
+const {none} = require('../defs');
+
+const takeWhile = (f, finalValue = none) => {
+  let test = true;
+  return value => {
+    if (!test) return finalValue;
+    const result = f(value);
+    if (result && typeof result.then == 'function') {
+      return result.then(result => {
+        if (result) return value;
+        test = false;
+        return finalValue;
+      });
+    }
+    if (result) return value;
+    test = false;
+    return finalValue;
+  };
+};
+
+module.exports = takeWhile;

+ 8 - 0
src/utils/takeWithSkip.js

@@ -0,0 +1,8 @@
+'use strict';
+
+const {none} = require('../defs');
+
+const takeWithSkip = (n, skip = 0, finalValue = none) => value =>
+  skip > 0 ? (--skip, none) : n > 0 ? (--n, value) : finalValue;
+
+module.exports = takeWithSkip;