Welcome to the Linux Foundation Forum!

Lab 12

Posts: 7
edited February 2021 in LFW211 Class Forum

Hey guys,

I'm trying to solve lab 2 in chapter 12, but I always get an error when I use the transform in the pipeline.

Does anyone know what may be causing this error:

  internal/streams/pipeline.js:27
    stream.on('close', () => {
           ^

  TypeError: stream.on is not a function
      at destroyer (internal/streams/pipeline.js:27:10)
      at internal/streams/pipeline.js:79:12
      at Array.map (<anonymous>)
      at pipeline (internal/streams/pipeline.js:76:28)
      at Object.<anonymous> (/Users/joao.hortale/Dev/OpenJsCertification/LFW211/labs/ch-12/labs-2/index.js:33:1)
      at Module._compile (internal/modules/cjs/loader.js:999:30)
      at Object.Module._extensions..js (internal/modules/cjs/loader.js:1027:10)
      at Module.load (internal/modules/cjs/loader.js:863:32)
      at Function.Module._load (internal/modules/cjs/loader.js:708:14)
      at Function.executeUserEntryPoint [as runMain] (internal/modules/run_main.js:60:12)

This is my code:

  'use strict'
  const { Readable, Writable, Transform, PassThrough, pipeline } = require('stream')
  const assert = require('assert')
  const createWritable = () => {
    const sink = []
    const writable = new Writable({
      write(chunk, enc, cb) {
        sink.push(chunk.toString())
        cb()
      }
    })
    writable.sink = sink
    return writable
  }
  const readable = Readable.from(['a', 'b', 'c'])
  const writable = createWritable()

  const transform = () => {
    return new Transform({
      objectMode: true,
      transform (chunk, enc, next) {
        chunk.toString().toUpperCase()
        next()
      }
    })
  }

  pipeline(readable, transform, writable, (err) => {
    assert.ifError(err)
    assert.deepStrictEqual(writable.sink, ['A', 'B', 'C'])
    console.log('passed!')
  })

Thanks in advance for your support.


Comments

  • I updated my code but I still have the same problem:

    'use strict'
    const { Readable, Writable, Transform, PassThrough, pipeline } = require('stream')
    const assert = require('assert')
    const createWritable = () => {
      const sink = []
      const writable = new Writable({
        write(chunk, enc, cb) {
          sink.push(chunk.toString())
          cb()
        }
      })
      writable.sink = sink
      return writable
    }
    const readable = Readable.from(['a', 'b', 'c'])
    const writable = createWritable()

    const transform = () => {
      return new Transform({
        transform (chunk, enc, next) {
          const uppercased = chunk.toString().toUpperCase()
          next(null, uppercased)
        }
      })
    }

    pipeline(readable, transform, writable, (err) => {
      assert.ifError(err)
      assert.deepStrictEqual(writable.sink, ['A', 'B', 'C'])
      console.log('passed!')
    })
  • @jhortale you're passing transform to pipeline but it's a function (and so has no on method).

    fix: pipeline(readable, transform(), writable, ...)

  • @davidmarkclements said:
    @jhortale you're passing transform to pipeline but it's a function (and so has no on method).

    fix: pipeline(readable, transform(), writable, ...)

    stupid error. Thank you so much @davidmarkclements

  • Posts: 2

    That helped, @davidmarkclements. Thanks. I was perplexed.

  • Posts: 115

    It works, but is it correct if I do it like this... ?

    const transform = new Transform({
      decodeStrings: false,
      transform(chunk, enc, next) {
        const uppercased = chunk.toUpperCase()
        next(null, uppercased)
      }
    })

    In that case there's no need to change the pipeline call.

  • @k0dard yes that would work. Using a function that returns the stream is a better pattern, as it allows for re-use.
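
A side note on why the decodeStrings: false variant can call chunk.toUpperCase() directly: by default a non-object-mode stream converts written strings to Buffers before they reach transform, and decodeStrings: false turns that conversion off. The sketch below illustrates the difference. createProbe is a hypothetical helper made up for this example, not part of the lab or of the stream module.

```javascript
'use strict'
const { Transform } = require('stream')

// Hypothetical helper: builds a Transform that records what kind of
// chunk its transform function receives, then passes the chunk through.
const createProbe = (options) => {
  const kinds = []
  const probe = new Transform({
    ...options,
    transform (chunk, enc, next) {
      kinds.push(Buffer.isBuffer(chunk) ? 'Buffer' : typeof chunk)
      next(null, chunk)
    }
  })
  probe.kinds = kinds
  return probe
}

const withDefaults = createProbe({})
const withoutDecoding = createProbe({ decodeStrings: false })

// Write the same string to both streams.
withDefaults.end('a')
withoutDecoding.end('a')

console.log('default options:', withDefaults.kinds)
console.log('decodeStrings: false:', withoutDecoding.kinds)
```

With the default options the chunk arrives as a Buffer, which has no toUpperCase method, so the chunk.toString() call in the earlier versions is what makes them work.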

  • Posts: 115

    @davidmarkclements Could you explain a bit further? I'm not sure I understand the difference in reusability. Could you give an example where one could be reused and the other couldn't?

  • Posts: 270
    edited October 2021

    hey @k0dard

    Once a stream has been consumed, it can't be re-used.

    So a key example would be something like an HTTP server, where a function (the request listener function) is called for each request:

    const { createReadStream } = require('fs')
    const { createServer } = require('http')
    const selfRead = createReadStream(__filename) // NOTE: DON'T DO THIS!
    createServer((req, res) => {
      selfRead.pipe(res)
    }).listen(3000)

    The server above will only stream content on the first request.

    Subsequent requests will receive no content.

    We can fix this by creating a new stream for each request:

    const { createReadStream } = require('fs')
    const { createServer } = require('http')
    const selfRead = () => createReadStream(__filename)
    createServer((req, res) => {
      selfRead().pipe(res)
    }).listen(3000)
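
The same behaviour can be seen without an HTTP server. This is a rough sketch assuming a recent Node.js release (16 or later) where readable streams support async iteration as used here; collect and letters are hypothetical names made up for the example.

```javascript
'use strict'
const { Readable } = require('stream')

// Hypothetical helper: reads a stream to the end and returns its chunks.
const collect = async (stream) => {
  const chunks = []
  for await (const chunk of stream) chunks.push(chunk)
  return chunks
}

// Hypothetical factory: returns a new, unconsumed stream on every call.
const letters = () => Readable.from(['a', 'b', 'c'])

const main = async () => {
  const shared = letters()
  const first = await collect(shared) // consumes the stream
  const second = await collect(shared) // same instance, already consumed
  const fresh = await collect(letters()) // new instance from the factory
  return { first, second, fresh }
}

main().then(console.log)
```

On the Node.js versions assumed here, first and fresh should both contain the three letters while second comes back empty, which mirrors the first and later requests in the server example.
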
  • Posts: 115

    Wow, didn't know that a stream can't be reused.

    Thanks a lot for your answer!

  • @davidmarkclements You saved me time. Thanks, bro.
