
Lab 12

jhortale
jhortale Posts: 7
edited February 2021 in LFW211 Class Forum

Hey guys,

I'm trying to solve lab 2 in chapter 12, but I always get an error when I use the transform in the pipeline.

Does anyone know what may be causing this error:

internal/streams/pipeline.js:27
  stream.on('close', () => {
         ^

TypeError: stream.on is not a function
    at destroyer (internal/streams/pipeline.js:27:10)
    at internal/streams/pipeline.js:79:12
    at Array.map (<anonymous>)
    at pipeline (internal/streams/pipeline.js:76:28)
    at Object.<anonymous> (/Users/joao.hortale/Dev/OpenJsCertification/LFW211/labs/ch-12/labs-2/index.js:33:1)
    at Module._compile (internal/modules/cjs/loader.js:999:30)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:1027:10)
    at Module.load (internal/modules/cjs/loader.js:863:32)
    at Function.Module._load (internal/modules/cjs/loader.js:708:14)
    at Function.executeUserEntryPoint [as runMain] (internal/modules/run_main.js:60:12)

This is my code:

'use strict'
const { Readable, Writable, Transform, PassThrough, pipeline } = require('stream')
const assert = require('assert')
const createWritable = () => {
  const sink = []
  const writable = new Writable({
    write(chunk, enc, cb) {
      sink.push(chunk.toString())
      cb()
    }
  })
  writable.sink = sink
  return writable
}
const readable = Readable.from(['a', 'b', 'c'])
const writable = createWritable()

const transform = () => {
  return new Transform({
    objectMode: true,
    transform (chunk, enc, next) {
      chunk.toString().toUpperCase()
      next()
    }
  })
}

pipeline(readable, transform, writable, (err) => {
  assert.ifError(err)
  assert.deepStrictEqual(writable.sink, ['A', 'B', 'C'])
  console.log('passed!')
})

Thanks in advance for your support.

Comments

  • I updated my code but I still have the same problem:

    'use strict'
    const { Readable, Writable, Transform, PassThrough, pipeline } = require('stream')
    const assert = require('assert')
    const createWritable = () => {
      const sink = []
      const writable = new Writable({
        write(chunk, enc, cb) {
          sink.push(chunk.toString())
          cb()
        }
      })
      writable.sink = sink
      return writable
    }
    const readable = Readable.from(['a', 'b', 'c'])
    const writable = createWritable()
    
    const transform = () => {
      return new Transform({
        transform (chunk, enc, next) {
          const uppercased = chunk.toString().toUpperCase()
          next(null, uppercased)
        }
      })
    }
    
    pipeline(readable, transform, writable, (err) => {
      assert.ifError(err)
      assert.deepStrictEqual(writable.sink, ['A', 'B', 'C'])
      console.log('passed!')
    })
    
  • @jhortale you're passing transform to pipeline, but transform is a function that returns a stream, not a stream itself (and so it has no on method).

    fix: pipeline(readable, transform(), writable, ...)

  • Stupid error. Thank you so much, @davidmarkclements.

  • DevB
    DevB Posts: 2

    That helped, @davidmarkclements. Thanks. I was perplexed.

  • k0dard
    k0dard Posts: 115

    It works, but is it correct if I do it like this?

    const transform = new Transform({
      decodeStrings: false,
      transform (chunk, enc, next) {
        const uppercased = chunk.toUpperCase()
        next(null, uppercased)
      }
    })

    In that case there's no need to change the pipeline call.

  • @k0dard yes that would work. Using a function that returns the stream is a better pattern, as it allows for re-use.
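As a side note on why k0dard's version can call `chunk.toUpperCase()` directly: with `decodeStrings: false`, a string written to the stream reaches the transform function as a string, whereas by default it is converted to a Buffer (which has no `toUpperCase` method). A small sketch of the difference follows; `probe` and `seen` are hypothetical names used only for this illustration.

```javascript
'use strict'
const { Transform } = require('stream')

// Records what kind of chunk each transform function receives.
const seen = []

// Hypothetical helper: builds a Transform that logs its chunk type
// and passes the chunk through unchanged.
const probe = (label, options) => new Transform({
  ...options,
  transform (chunk, enc, next) {
    seen.push(`${label}: ${Buffer.isBuffer(chunk) ? 'Buffer' : typeof chunk}`)
    next(null, chunk)
  }
})

// Default options: the written string is converted to a Buffer first.
probe('default', {}).write('a')
// decodeStrings: false: the written string is passed through as-is.
probe('decodeStrings: false', { decodeStrings: false }).write('a')

console.log(seen)
```

This is also why the earlier versions in the thread call `chunk.toString()` before uppercasing.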

  • k0dard
    k0dard Posts: 115

    @davidmarkclements Could you explain a bit further? I'm not sure I understand the difference in re-usability. Could you give an example where one could be reused and the other couldn't?

  • davidmarkclements
    davidmarkclements Posts: 270
    edited October 2021

    hey @k0dard

    Once a stream has been consumed, it can't be re-used.

    So a key example would be something like an HTTP server, where a function (the request listener function) is called for each request:

    const { createReadStream } = require('fs')
    const { createServer } = require('http')
    const selfRead = createReadStream(__filename) // NOTE: DON'T DO THIS!
    createServer((req, res) => {
      selfRead.pipe(res)
    }).listen(3000)
    

    The server above will only stream content on the first request.

    Subsequent requests will be blank.

    We can fix this by creating a new stream for each request:

    const { createReadStream } = require('fs')
    const { createServer } = require('http')
    const selfRead = () => createReadStream(__filename)
    createServer((req, res) => {
      selfRead().pipe(res)
    }).listen(3000)
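The same reasoning carries over to the transform from the lab. As a rough sketch (the `createUppercase` name is hypothetical, not from the course material), a single instance that has been ended stays ended, while a factory hands out a fresh, usable stream on every call:

```javascript
'use strict'
const { Transform } = require('stream')

// Hypothetical factory: every call builds a brand new uppercasing Transform.
const createUppercase = () => new Transform({
  decodeStrings: false,
  transform (chunk, enc, next) {
    next(null, chunk.toString().toUpperCase())
  }
})

// Use up the first instance by writing a final chunk and ending it.
const first = createUppercase()
first.end('a')

// A second call returns a separate instance that has not been ended.
const second = createUppercase()

const firstEnded = first.writableEnded   // end() has been called on this one
const secondEnded = second.writableEnded // this one can still accept writes
console.log({ firstEnded, secondEnded, sameInstance: first === second })
```

If the lab's pipeline had to run more than once, each run would need its own `createUppercase()` call, just as each request above needs its own `createReadStream` call.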
    
  • k0dard
    k0dard Posts: 115

    Wow, I didn't know that a stream can't be reused.

    Thanks a lot for your answer!

  • @davidmarkclements You saved me time. Thanks, bro.
