z.js 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. const {Writable, Transform} = require('stream');
  2. const makeStreamT = id => {
  3. const stream = new Transform({
  4. readableObjectMode: true,
  5. writableObjectMode: true,
  6. transform(chunk, encoding, callback) {
  7. console.log(id + ':', 'transform', chunk, encoding);
  8. const flag = this.push(chunk, encoding);
  9. console.log(id + ':', 'transform-push', flag);
  10. callback(null);
  11. },
  12. flush(callback) {
  13. console.log(id + ':', 'flush');
  14. callback(null);
  15. }
  16. });
  17. stream._id = id;
  18. stream.on('error', error => console.log(id + ':', 'event-error', error));
  19. stream.on('end', () => console.log(id + ':', 'event-end'));
  20. stream.on('finish', () => console.log(id + ':', 'event-finish'));
  21. stream.on('close', () => console.log(id + ':', 'event-close'));
  22. stream.on('pipe', src => console.log(id + ':', 'event-pipe', src._id));
  23. stream.on('unpipe', src => console.log(id + ':', 'event-unpipe', src._id));
  24. return stream;
  25. };
  26. const makeStreamW = id => {
  27. const stream = new Writable({
  28. objectMode: true,
  29. write(chunk, encoding, callback) {
  30. console.log(id + ':', 'write', chunk, encoding);
  31. callback(null);
  32. },
  33. final(callback) {
  34. console.log(id + ':', 'final');
  35. callback(null);
  36. },
  37. destroy(error, callback) {
  38. console.log(id + ':', 'destroy', error);
  39. callback(null);
  40. }
  41. });
  42. stream._id = id;
  43. stream.on('error', error => console.log(id + ':', 'event-error', error));
  44. stream.on('finish', () => console.log(id + ':', 'event-finish'));
  45. stream.on('close', () => console.log(id + ':', 'event-close'));
  46. stream.on('pipe', src => console.log(id + ':', 'event-pipe', src._id));
  47. stream.on('unpipe', src => console.log(id + ':', 'event-unpipe', src._id));
  48. return stream;
  49. };
  50. console.log('Creating streams ...');
  51. const a = makeStreamT('A'),
  52. b = makeStreamT('B'),
  53. c = makeStreamT('C'),
  54. w = makeStreamW('W');
  55. console.log('Connecting streams ...');
  56. a.pipe(b).pipe(c).pipe(w);
  57. console.log('Passing a value ...');
  58. a.write({a: 1});
  59. // a.end();
  60. console.log('Destroying B ...');
  61. // a.destroy();
  62. a.unpipe(b);
  63. b.end();
  64. console.log('Done.');