Преглед изворни кода

Refactored comp() and related code.

Eugene Lazutkin пре 7 година
родитељ
комит
142b4d0816
3 измењених фајлова са 96 додато и 47 уклоњено
  1. 20 19
      index.js
  2. 29 3
      tests/test_comp.js
  3. 47 25
      utils/comp.js

+ 20 - 19
index.js

@@ -2,6 +2,7 @@
 
 const {Readable, Writable, Duplex, Transform} = require('stream');
 
+const none = {};
 function Final(value) {
   this.value = value;
 }
@@ -9,21 +10,6 @@ function Many(values) {
   this.values = values;
 }
 
-const processData = (result, stream) => {
-  if (result instanceof Chain.Final) {
-    result = result.value;
-  } else if (result instanceof Chain.Many) {
-    result = result.values;
-  }
-  if (result !== undefined && result !== null) {
-    if (result instanceof Array) {
-      result.forEach(value => value !== undefined && value !== null && stream.push(value));
-    } else {
-      stream.push(result);
-    }
-  }
-};
-
 const runAsyncGenerator = async (gen, stream) => {
   for (;;) {
     let data = gen.next();
@@ -31,7 +17,7 @@ const runAsyncGenerator = async (gen, stream) => {
       data = await data;
     }
     if (data.done) break;
-    processData(data.value, stream);
+    Chain.sanitize(data.value, stream);
   }
 };
 
@@ -44,7 +30,7 @@ const wrapFunction = fn =>
         const result = fn.call(this, chunk, encoding);
         if (result && typeof result.then == 'function') {
           // thenable
-          result.then(result => (processData(result, this), callback(null)), error => callback(error));
+          result.then(result => (Chain.sanitize(result, this), callback(null)), error => callback(error));
           return;
         }
         if (result && typeof result.next == 'function') {
@@ -52,7 +38,7 @@ const wrapFunction = fn =>
           runAsyncGenerator(result, this).then(() => callback(null), error => callback(error));
           return;
         }
-        processData(result, this);
+        Chain.sanitize(result, this);
         callback(null);
       } catch (error) {
         callback(error);
@@ -75,7 +61,7 @@ const wrapArray = array =>
           }
           value = result;
         }
-        processData(value, this);
+        Chain.sanitize(value, this);
         callback(null);
       } catch (error) {
         callback(error);
@@ -157,6 +143,20 @@ class Chain extends Duplex {
   static many(values) {
     return new Chain.Many(values);
   }
+  static sanitize(result, stream) {
+    if (result instanceof Chain.Final) {
+      result = result.value;
+    } else if (result instanceof Chain.Many) {
+      result = result.values;
+    }
+    if (result !== undefined && result !== null && result !== Chain.none) {
+      if (result instanceof Array) {
+        result.forEach(value => value !== undefined && value !== null && stream.push(value));
+      } else {
+        stream.push(result);
+      }
+    }
+  };
   static convertToTransform(fn) {
     if (typeof fn === 'function') return wrapFunction(fn);
     if (fn instanceof Array) return fn.length ? wrapArray(fn) : 0;
@@ -164,6 +164,7 @@ class Chain extends Duplex {
   }
 }
 
+Chain.none = none;
 Chain.Final = Final;
 Chain.Many = Many;
 

+ 29 - 3
tests/test_comp.js

@@ -7,8 +7,7 @@ const {streamFromArray} = require('./helpers');
 
 const comp = require('../utils/comp');
 
-const {final, many} = Chain;
-const none = final();
+const {none, final, many} = Chain;
 
 unit.add(module, [
   function test_comp(t) {
@@ -42,7 +41,7 @@ unit.add(module, [
   function test_compNothing(t) {
     const async = t.startAsync('test_compNothing');
 
-    const chain = new Chain([comp(x => x * x, () => final(), x => 2 * x + 1)]),
+    const chain = new Chain([comp(x => x * x, () => none, x => 2 * x + 1)]),
       output = [];
 
     streamFromArray([1, 2, 3]).pipe(chain);
@@ -181,6 +180,33 @@ unit.add(module, [
 
     streamFromArray([1, 2]).pipe(chain);
 
+    chain.on('data', value => output.push(value));
+    chain.on('end', () => {
+      eval(t.TEST('t.unify(output, [1, -2, 10, -11, 2, -3, 20, -21])'));
+      async.done();
+    });
+  },
+  function test_compAsFun(t) {
+    const async = t.startAsync('test_compAsFun');
+
+    const chain = new Chain([
+        comp.asFun(
+          async x =>
+            await new Promise(resolve => {
+              setTimeout(() => resolve(-x), 20);
+            }),
+          x => many([x, x * 10]),
+          function*(x) {
+            yield x;
+            yield final(x - 1);
+          },
+          x => -x
+        )
+      ]),
+      output = [];
+
+    streamFromArray([1, 2]).pipe(chain);
+
     chain.on('data', value => output.push(value));
     chain.on('end', () => {
       eval(t.TEST('t.unify(output, [1, -2, 10, -11, 2, -3, 20, -21])'));

+ 47 - 25
utils/comp.js

@@ -1,59 +1,81 @@
 'use strict';
 
-const {Final, Many} = require('../index');
+const {Transform} = require('stream');
 
-const next = async (value, fns, index) => {
+const {none, Final, Many, sanitize} = require('../index');
+
+const next = async (value, fns, index, push) => {
   for (let i = index; i <= fns.length; ++i) {
     if (value && typeof value.then == 'function') {
       // thenable
       value = await value;
     }
+    if (value === none) break;
     if (value instanceof Final) {
-      return value.value;
+      push(value.value);
+      break;
     }
     if (value instanceof Many) {
-      if (i == fns.length) return value;
-      const results = [],
-        values = value.values;
+      if (i == fns.length) {
+        value.values.forEach(val => push(val));
+        break;
+      }
+      const values = value.values;
       for (let j = 0; j < values.length; ++j) {
-        const result = await next(values[j], fns, i);
-        if (result instanceof Many) {
-          results.push(...result.values);
-        } else {
-          results.push(result);
-        }
+        await next(values[j], fns, i, push);
       }
-      return new Many(results);
+      break;
     }
     if (value && typeof value.next == 'function') {
       // generator
-      const results = [];
       for (;;) {
         let data = value.next();
         if (data && typeof data.then == 'function') {
           data = await data;
         }
         if (data.done) break;
-        const result = await next(data.value, fns, i);
-        if (result instanceof Many) {
-          results.push(...result.values);
-        } else {
-          results.push(result);
-        }
+        await next(data.value, fns, i, push);
       }
-      return new Many(results);
+      break;
     }
     const fn = fns[i];
-    if (!fn) break;
+    if (!fn) {
+      push(value);
+      break;
+    }
     value = fn(value);
   }
-  return value;
 };
 
 const comp = (...fns) => {
   fns = fns.filter(fn => fn);
-  if (!fns.length) return null;
-  return async value => next(value, fns, 0);
+  return fns.length
+    ? new Transform({
+        writableObjectMode: true,
+        readableObjectMode: true,
+        transform(chunk, encoding, callback) {
+          next(chunk, fns, 0, value => sanitize(value, this)).then(() => callback(null), error => callback(error));
+        }
+      })
+    : null;
+};
+
+const nop = () => {};
+
+comp.asFun = (...fns) => {
+  fns = fns.filter(fn => fn);
+  if (!fns.length) return nop;
+  return async value => {
+    const results = [];
+    await next(value, fns, 0, value => results.push(value));
+    switch (results.length) {
+      case 0:
+        return none;
+      case 1:
+        return results[0];
+    }
+    return new Many(results);
+  };
 };
 
 module.exports = comp;