瀏覽代碼

Add an optimizer based on asSream() and gen(), add ESM goodness.

Eugene Lazutkin 3 年之前
父節點
當前提交
4f44155b82
共有 1 個文件被更改,包括 87 次插入105 次删除
  1. 87 105
      index.js

+ 87 - 105
index.js

@@ -1,108 +1,94 @@
 'use strict';
 
-const {Readable, Writable, Duplex, Transform} = require('stream');
-const {none, finalValue, many, isFinalValue, isMany, getFinalValue, getManyValues} = require('./defs');
+const {Duplex} = require('stream');
+const defs = require('./defs');
+const gen = require('./gen');
+const {asStream} = require('./AsStream');
 
-const runAsyncGenerator = async (gen, stream) => {
-  for (;;) {
-    let data = gen.next();
-    if (data && typeof data.then == 'function') {
-      data = await data;
-    }
-    if (data.done) break;
-    const value = data.value;
-    if (value && typeof value.then == 'function') {
-      value = await value;
-    }
-    Chain.sanitize(value, stream);
-  }
-};
+// is*NodeStream functions taken from https://github.com/nodejs/node/blob/master/lib/internal/streams/utils.js
+const isReadableNodeStream = obj =>
+  obj &&
+  typeof obj.pipe === 'function' &&
+  typeof obj.on === 'function' &&
+  (!obj._writableState ||
+    (typeof obj._readableState === 'object' ? obj._readableState.readable : null) !== false) && // Duplex
+  (!obj._writableState || obj._readableState); // Writable has .pipe.
 
-const wrapFunction = fn =>
-  new Transform({
-    writableObjectMode: true,
-    readableObjectMode: true,
-    transform(chunk, encoding, callback) {
-      try {
-        const result = fn.call(this, chunk, encoding);
-        if (result && typeof result.then == 'function') {
-          // thenable
-          result.then(result => (Chain.sanitize(result, this), callback(null)), error => callback(error));
-          return;
-        }
-        if (result && typeof result.next == 'function') {
-          // generator
-          runAsyncGenerator(result, this).then(() => callback(null), error => callback(error));
-          return;
-        }
-        Chain.sanitize(result, this);
-        callback(null);
-      } catch (error) {
-        callback(error);
-      }
-    }
-  });
+const isWritableNodeStream = obj =>
+  obj &&
+  typeof obj.write === 'function' &&
+  typeof obj.on === 'function' &&
+  (!obj._readableState ||
+    (typeof obj._writableState === 'object' ? obj._writableState.writable : null) !== false); // Duplex
 
-const wrapArray = fns =>
-  new Transform({
-    writableObjectMode: true,
-    readableObjectMode: true,
-    transform(chunk, encoding, callback) {
-      try {
-        let value = chunk;
-        for (let i = 0; i < fns.length; ++i) {
-          const result = fns[i].call(this, value, encoding);
-          if (result === Chain.none) {
-            callback(null);
-            return;
-          }
-          if (Chain.isFinalValue(result)) {
-            value = Chain.getFinalValue(result);
-            break;
-          }
-          value = result;
-        }
-        Chain.sanitize(value, this);
-        callback(null);
-      } catch (error) {
-        callback(error);
-      }
-    }
-  });
+const isDuplexNodeStream = obj =>
+  obj &&
+  typeof obj.pipe === 'function' &&
+  obj._readableState &&
+  typeof obj.on === 'function' &&
+  typeof obj.write === 'function';
+
+const getIterator = x => {
+  if (!x) return null;
+  if (typeof x[Symbol.asyncIterator] == 'function') return x[Symbol.asyncIterator].bind(x);
+  if (typeof x[Symbol.iterator] == 'function') return x[Symbol.iterator].bind(x);
+  return null;
+}
 
 class Chain extends Duplex {
   constructor(fns, options) {
     super(options || {writableObjectMode: true, readableObjectMode: true});
 
     if (!(fns instanceof Array) || !fns.length) {
-      throw Error("Chain's argument should be a non-empty array.");
+      throw TypeError("Chain's argument should be a non-empty array.");
     }
 
     this.streams = fns
       .filter(fn => fn)
-      .map((fn, index, fns) => {
-        if (typeof fn === 'function' || fn instanceof Array) return Chain.convertToTransform(fn);
+      .reduce((output, fn, index, fns) => {
         if (
-          fn instanceof Duplex ||
-          fn instanceof Transform ||
-          (!index && fn instanceof Readable) ||
-          (index === fns.length - 1 && fn instanceof Writable)
+          isDuplexNodeStream(fn) ||
+          (!index && isReadableNodeStream(fn)) ||
+          (index === fns.length - 1 && isWritableNodeStream(fn))
         ) {
-          return fn;
+          output.push(fn);
+          return output;
         }
-        throw Error('Arguments should be functions, arrays or streams.');
+        if (typeof fn != 'function') {
+          const iterator = getIterator(fn);
+          if (!iterator) throw TypeError('Item #' + index + ' is not a proper stream, function, nor iterable.');
+          fn = iterator;
+        }
+        if (!output.length) output.push([]);
+        const last = output[output.length - 1];
+        if (Array.isArray(last)) {
+          last.push(fn);
+        } else {
+          output.push([fn]);
+        }
+        return output;
+      }, [])
+      .map(item => {
+        if (Array.isArray(item)) {
+          if (!item.length) return null;
+          if (item.length == 1) return item[0] && Chain.asStream(item[0]);
+          return Chain.asStream(Chain.gen(...item));
+        }
+        return item;
       })
       .filter(s => s);
     this.input = this.streams[0];
-    this.output = this.streams.reduce((output, stream) => (output && output.pipe(stream)) || stream);
+    this.output = this.streams.reduce(
+      (output, stream) => (output && output.pipe(stream)) || stream
+    );
 
-    if (!(this.input instanceof Writable)) {
+    if (!isWritableNodeStream(this.input)) {
       this._write = (_1, _2, callback) => callback(null);
       this._final = callback => callback(null); // unavailable in Node 6
       this.input.on('end', () => this.end());
     }
 
-    if (this.output instanceof Readable) {
+    if (isReadableNodeStream(this.output)) {
       this.output.on('data', chunk => !this.push(chunk) && this.output.pause());
       this.output.on('end', () => this.push(null));
     } else {
@@ -138,36 +124,32 @@ class Chain extends Duplex {
   static make(fns, options) {
     return new Chain(fns, options);
   }
-  static sanitize(result, stream) {
-    if (Chain.isFinalValue(result)) {
-      result = Chain.getFinalValue(result);
-    } else if (Chain.isMany(result)) {
-      result = Chain.getManyValues(result);
-    }
-    if (result !== undefined && result !== null && result !== Chain.none) {
-      if (result instanceof Array) {
-        result.forEach(value => value !== undefined && value !== null && stream.push(value));
-      } else {
-        stream.push(result);
-      }
-    }
-  }
-  static convertToTransform(fn) {
-    if (typeof fn === 'function') return wrapFunction(fn);
-    if (fn instanceof Array) return fn.length ? wrapArray(fn) : null;
-    return null;
-  }
 }
 
-Chain.none = none;
-Chain.finalValue = finalValue;
-Chain.isFinalValue = isFinalValue;
-Chain.getFinalValue = getFinalValue;
-Chain.many = many;
-Chain.isMany = isMany;
-Chain.getManyValues = getManyValues;
-
 Chain.chain = Chain.make;
 Chain.make.Constructor = Chain;
+Chain.gen = gen;
+Chain.asStream = asStream;
 
 module.exports = Chain;
+
+// to keep ESM happy:
+module.exports.none = defs.none;
+module.exports.stop = defs.stop;
+module.exports.Stop = defs.Stop;
+module.exports.finalSymbol = defs.finalSymbol;
+module.exports.manySymbol = defs.manySymbol;
+module.exports.flushSymbol = defs.flushSymbol;
+module.exports.finalValue = defs.finalValue;
+module.exports.many = defs.many;
+module.exports.flushable = defs.flushable;
+module.exports.isFinalValue = defs.isFinalValue;
+module.exports.isMany = defs.isMany;
+module.exports.isFlushable = defs.isFlushable;
+module.exports.getFinalValue = defs.getFinalValue;
+module.exports.getManyValues = defs.getManyValues;
+module.exports.final = defs.final;
+
+module.exports.chain = Chain.make;
+module.exports.gen = gen;
+module.exports.asStream = asStream;