Jelajahi Sumber

Reorganize JSONL parser/stringer.

Eugene Lazutkin 3 tahun lalu
induk
komit
aa39dc1d6b

+ 2 - 8
src/jsonl/parser.js

@@ -1,17 +1,11 @@
 'use strict';
 
-const gen = require('../gen');
-const asStream = require('../asStream');
 const fixUtf8Stream = require('../utils/fixUtf8Stream');
 const lines = require('../utils/lines');
 
-const parser = options => {
-  const reviver = options && options.reviver;
+const parser = reviver => {
   let counter = 0;
-  return asStream(
-    gen(fixUtf8Stream(), lines(), string => ({key: counter++, value: JSON.parse(string, reviver)})),
-    Object.assign({writableObjectMode: false, readableObjectMode: true}, options)
-  );
+  return [fixUtf8Stream(), lines(), string => ({key: counter++, value: JSON.parse(string, reviver)})];
 };
 
 module.exports = parser;

+ 17 - 0
src/jsonl/parserStream.js

@@ -0,0 +1,17 @@
+'use strict';
+
+const gen = require('../gen');
+const asStream = require('../asStream');
+const fixUtf8Stream = require('../utils/fixUtf8Stream');
+const lines = require('../utils/lines');
+
+const parserStream = options => {
+  const reviver = options && options.reviver;
+  let counter = 0;
+  return asStream(
+    gen(fixUtf8Stream(), lines(), string => ({key: counter++, value: JSON.parse(string, reviver)})),
+    Object.assign({writableObjectMode: false, readableObjectMode: true}, options)
+  );
+};
+
+module.exports = parserStream;

src/jsonl/stringer.js → src/jsonl/stringerStream.js


+ 36 - 35
tests/test-jsonl-parser.mjs

@@ -8,6 +8,7 @@ import zlib from 'zlib';
 import {Writable} from 'stream';
 
 import {readString} from './helpers.mjs';
+import chain from '../src/index.js';
 
 import parser from '../src/jsonl/parser.js';
 
@@ -28,22 +29,22 @@ const roundtrip = (t, resolve, len, quant) => {
 
   const input = json.join('\n'),
     result = [];
-  readString(input, quant)
-    .pipe(parser())
-    .pipe(
-      new Writable({
-        objectMode: true,
-        write(chunk, _, callback) {
-          result.push(chunk.value);
-          callback(null);
-        },
-        final(callback) {
-          t.deepEqual(objects, result);
-          resolve();
-          callback(null);
-        }
-      })
-    );
+  chain([
+    readString(input, quant),
+    parser(),
+    new Writable({
+      objectMode: true,
+      write(chunk, _, callback) {
+        result.push(chunk.value);
+        callback(null);
+      },
+      final(callback) {
+        t.deepEqual(objects, result);
+        resolve();
+        callback(null);
+      }
+    })
+  ]);
 };
 
 test.asPromise('jsonl parser: smoke test', (t, resolve) => roundtrip(t, resolve));
@@ -106,28 +107,28 @@ test.asPromise('jsonl parser: read file', (t, resolve) => {
   if (!/^file:\/\//.test(import.meta.url)) throw Error('Cannot get the current working directory');
   const fileName = path.join(path.dirname(import.meta.url.substring(7)), './data/sample.jsonl.gz');
   let count = 0;
-  fs.createReadStream(fileName)
-    .pipe(zlib.createGunzip())
-    .pipe(parser())
-    .pipe(
-      new Writable({
-        objectMode: true,
-        write(chunk, _, callback) {
-          t.equal(count, chunk.key);
-          ++count;
-          callback(null);
-        },
-        final(callback) {
-          t.equal(count, 100);
-          resolve();
-          callback(null);
-        }
-      })
-    );
+  chain([
+    fs.createReadStream(fileName),
+    zlib.createGunzip(),
+    parser(),
+    new Writable({
+      objectMode: true,
+      write(chunk, _, callback) {
+        t.equal(count, chunk.key);
+        ++count;
+        callback(null);
+      },
+      final(callback) {
+        t.equal(count, 100);
+        resolve();
+        callback(null);
+      }
+    })
+  ]);
 });
 
 test.asPromise('jsonl parser: bad json', (t, resolve) => {
-  const pipeline = readString(' not json ').pipe(parser());
+  const pipeline = chain([readString(' not json '), parser()]);
 
   pipeline.on('data', () => t.fail("We shouldn't be here."));
   pipeline.on('error', e => {

+ 141 - 0
tests/test-jsonl-parserStream.mjs

@@ -0,0 +1,141 @@
+'use strict';
+
+import test from 'tape-six';
+
+import fs from 'fs';
+import path from 'path';
+import zlib from 'zlib';
+import {Writable} from 'stream';
+
+import {readString} from './helpers.mjs';
+
+import parserStream from '../src/jsonl/parserStream.js';
+
+const roundtrip = (t, resolve, len, quant) => {
+  const objects = [];
+  for (let n = 0; n < len; n += 1) {
+    objects.push({
+      stringWithTabsAndNewlines: "Did it work?\nNo...\t\tI don't think so...",
+      anArray: [n + 1, n + 2, true, 'tabs?\t\t\t\u0001\u0002\u0003', false],
+      n
+    });
+  }
+
+  const json = [];
+  for (let n = 0; n < objects.length; n += 1) {
+    json.push(JSON.stringify(objects[n]));
+  }
+
+  const input = json.join('\n'),
+    result = [];
+  readString(input, quant)
+    .pipe(parserStream())
+    .pipe(
+      new Writable({
+        objectMode: true,
+        write(chunk, _, callback) {
+          result.push(chunk.value);
+          callback(null);
+        },
+        final(callback) {
+          t.deepEqual(objects, result);
+          resolve();
+          callback(null);
+        }
+      })
+    );
+};
+
+test.asPromise('jsonl parserStream: smoke test', (t, resolve) => roundtrip(t, resolve));
+
+test.asPromise('jsonl parserStream: roundtrip with 1 set of objects', (t, resolve) => {
+  roundtrip(t, resolve, 1);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with 2 sets of objects', (t, resolve) => {
+  roundtrip(t, resolve, 2);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with 3 sets of objects', (t, resolve) => {
+  roundtrip(t, resolve, 3);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with 4 sets of objects', (t, resolve) => {
+  roundtrip(t, resolve, 4);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with 5 sets of objects', (t, resolve) => {
+  roundtrip(t, resolve, 5);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with 6 sets of objects', (t, resolve) => {
+  roundtrip(t, resolve, 6);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with 7 sets of objects', (t, resolve) => {
+  roundtrip(t, resolve, 7);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with 8 sets of objects', (t, resolve) => {
+  roundtrip(t, resolve, 8);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with 9 sets of objects', (t, resolve) => {
+  roundtrip(t, resolve, 9);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with 10 sets of objects', (t, resolve) => {
+  roundtrip(t, resolve, 10);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with 11 sets of objects', (t, resolve) => {
+  roundtrip(t, resolve, 11);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with 12 sets of objects', (t, resolve) => {
+  roundtrip(t, resolve, 12);
+});
+
+test.asPromise('jsonl parserStream: roundtrip with different window sizes', (t, resolve) => {
+  for (let i = 1; i <= 12; ++i) {
+    roundtrip(t, resolve, 10, i);
+  }
+});
+
+test.asPromise('jsonl parserStream: read file', (t, resolve) => {
+  if (!/^file:\/\//.test(import.meta.url)) throw Error('Cannot get the current working directory');
+  const fileName = path.join(path.dirname(import.meta.url.substring(7)), './data/sample.jsonl.gz');
+  let count = 0;
+  fs.createReadStream(fileName)
+    .pipe(zlib.createGunzip())
+    .pipe(parserStream())
+    .pipe(
+      new Writable({
+        objectMode: true,
+        write(chunk, _, callback) {
+          t.equal(count, chunk.key);
+          ++count;
+          callback(null);
+        },
+        final(callback) {
+          t.equal(count, 100);
+          resolve();
+          callback(null);
+        }
+      })
+    );
+});
+
+test.asPromise('jsonl parserStream: bad json', (t, resolve) => {
+  const pipeline = readString(' not json ').pipe(parserStream());
+
+  pipeline.on('data', () => t.fail("We shouldn't be here."));
+  pipeline.on('error', e => {
+    t.ok(e);
+    resolve();
+  });
+  pipeline.on('end', value => {
+    t.fail("We shouldn't be here.");
+    resolve();
+  });
+});

+ 8 - 8
tests/test-jsonl-stringer.mjs

@@ -6,10 +6,10 @@ import {Writable, Transform} from 'stream';
 
 import {readString} from './helpers.mjs';
 
-import parser from '../src/jsonl/parser.js';
-import stringer from '../src/jsonl/stringer.js';
+import parserStream from '../src/jsonl/parserStream.js';
+import stringerStream from '../src/jsonl/stringerStream.js';
 
-test.asPromise('jsonl stringer: smoke test', (t, resolve) => {
+test.asPromise('jsonl stringerStream: smoke test', (t, resolve) => {
   const pattern = {
       a: [[[]]],
       b: {a: 1},
@@ -27,7 +27,7 @@ test.asPromise('jsonl stringer: smoke test', (t, resolve) => {
 
   let buffer = '';
   readString(string)
-    .pipe(parser())
+    .pipe(parserStream())
     .pipe(
       new Transform({
         writableObjectMode: true,
@@ -38,7 +38,7 @@ test.asPromise('jsonl stringer: smoke test', (t, resolve) => {
         }
       })
     )
-    .pipe(stringer())
+    .pipe(stringerStream())
     .pipe(
       new Writable({
         write(chunk, _, callback) {
@@ -54,7 +54,7 @@ test.asPromise('jsonl stringer: smoke test', (t, resolve) => {
     );
 });
 
-test.asPromise('jsonl stringer: multiple', (t, resolve) => {
+test.asPromise('jsonl stringerStream: multiple', (t, resolve) => {
   const pattern = {
     a: [[[]]],
     b: {a: 1},
@@ -74,7 +74,7 @@ test.asPromise('jsonl stringer: multiple', (t, resolve) => {
   string = string + '\n' + string + '\n' + string;
 
   readString(string + '\n')
-    .pipe(parser())
+    .pipe(parserStream())
     .pipe(
       new Transform({
         writableObjectMode: true,
@@ -85,7 +85,7 @@ test.asPromise('jsonl stringer: multiple', (t, resolve) => {
         }
       })
     )
-    .pipe(stringer())
+    .pipe(stringerStream())
     .pipe(
       new Writable({
         write(chunk, _, callback) {