
Chapter 12 Readable streams: how do we control the read() function?

In Chapter 12, Streams, in the Readable streams section, there is this example of a Readable stream:

```
'use strict'
const { Readable } = require('stream')
const createReadStream = () => {
  // what if the `data` is a looooooong serialized db or a 100000 length array
  const data = ['some', 'data', 'to', 'read']
  return new Readable({
    read () {
      if (data.length === 0) this.push(null)
      else this.push(data.shift())
    }
  })
}
const readable = createReadStream()
readable.on('data', (data) => { console.log('got data', data) })
readable.on('end', () => { console.log('finished reading') })
```

There is a condition in the read() function: we extract an item from the data array on each call, and when there are no items left, we push null so as to emit the end event.

I have some questions regarding that approach, and I'd like to know how we would handle it in some other cases:

  1. Is this the right approach to monitor the remaining data of an array: extracting an array item each time, until there are none left?
  2. Also, from a performance point of view, if that array has 10,000 items, we'll emit the 'data' event 10,000 times!
  3. How do we monitor the remaining data if the data is a large serialised database (string)? What condition should we put into read() so as to know when to emit the 'data' event and when the 'end' event?

Thank you!

Best Answer

  • davidmarkclements Posts: 270
    Answer ✓

    hey @theodoros

    Code is always about context; performance isn't always priority #1 - and this is coming from someone who has written, spoken, and consulted extensively on performance in Node. This code is optimized for communication - for teaching the general concepts and API of streams. With that in mind:

    1. Typically readable streams are for connecting with some kind of I/O; emitting in-memory data isn't a big use case beyond test code and example code. Outside of explaining the API, a better way to do this is to just use Readable.from(array): you get a readable stream emitting the data, and there's no need to be concerned about the details (see the first sketch after this list).
    2. Performance isn't a concern here; in fact, any time you emit in-memory data from a stream (e.g. in tests) performance tends not to be a concern. On a side note though, streams improve performance for I/O scenarios, particularly where you have a large amount of data - they do not improve CPU compute performance. By regulating and processing data incrementally, they support an optimal pattern for handling I/O in specific circumstances.
    3. That depends entirely on context. Consider TCP: it's a protocol with the ability to indicate (among other things) connecting and disconnecting. A stream around TCP (e.g. a net socket) would know when to end based on a protocol instruction. If a database supports streaming, its drivers will know how to interpret end of stream, and a streaming implementation around those drivers would take that instruction and turn it into a push(null) to end the stream (see the second sketch after this list).
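
    For point 1, a minimal sketch of the Readable.from approach (Node 12+) - it handles the pushing and the ending for you:

```
'use strict'
const { Readable } = require('stream')

// Readable.from pushes each item and ends the stream automatically.
// Note it creates an object-mode stream, so each chunk is the string itself
const readable = Readable.from(['some', 'data', 'to', 'read'])

readable.on('data', (data) => { console.log('got data', data) })
readable.on('end', () => { console.log('finished reading') })
```

    And a rough sketch of point 3 - `driver` here stands for a hypothetical streaming database driver that emits 'row' and 'end' events (the names are invented purely for illustration); the wrapper only translates the driver's end signal into push(null):

```
'use strict'
const { Readable } = require('stream')

// `driver` is a hypothetical object with on('row') and on('end') events,
// stand-ins for whatever signals a real driver or protocol provides
const createDbReadStream = (driver) => {
  const readable = new Readable({
    objectMode: true,
    read () {} // data is pushed from the driver's events, not pulled here
  })
  driver.on('row', (row) => { readable.push(row) })  // incoming data -> push it
  driver.on('end', () => { readable.push(null) })    // driver says "done" -> end the stream
  return readable
}

// a real implementation would also respect backpressure, e.g. pause the
// driver when push() returns false and resume it on the next read() call
```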

    @krave for your questions

    • The default high watermarks of 16kb (write) and 64kb (read) tend to be fine; beyond that it's a fine-tuning exercise that's highly dependent on the context (a small example of setting it is below this list)
    • That's a huge topic; probably the most trivial approach would be a stream wrapper around an existing streaming media processor, e.g. ffmpeg (a tiny sketch of that is also below). This project looks interesting: https://github.com/amishshah/prism-media
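
    If you do want to tune it, a minimal sketch - highWaterMark is just an option on fs.createReadStream and on the stream constructors (the file name is only a placeholder):

```
'use strict'
const fs = require('fs')
const { Readable } = require('stream')

// ask the fs read stream for 128kb chunks instead of the default
const fileStream = fs.createReadStream('some-big-file.db', { highWaterMark: 128 * 1024 })

// the same option works on a hand-rolled Readable
const readable = new Readable({
  highWaterMark: 32 * 1024,
  read () { /* push data here */ }
})
```

    And for the video question, the most trivial shape of a "wrapper around ffmpeg" is simply that a spawned ffmpeg process already exposes its stdout as a readable stream (the arguments below are placeholders, not a tuned transcode command):

```
'use strict'
const { spawn } = require('child_process')

// write an MPEG-TS stream to stdout; ffmpeg.stdout is a readable stream
// we can pipe into a response, a socket, or another stream
const ffmpeg = spawn('ffmpeg', ['-i', 'input.mp4', '-f', 'mpegts', 'pipe:1'])

ffmpeg.stdout.on('data', (chunk) => { console.log('got', chunk.length, 'bytes') })
ffmpeg.stdout.on('end', () => { console.log('ffmpeg finished') })
```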

Answers

  • krave Posts: 58

    Hi @theodoros, I would like to join this conversation because I have related confusion too.

    Some thoughts about your questions:

    1. Keeping an index pointing to where the last item was read is my approach to such tasks. I think that would be more performant.
    2. The size of each push is under your control. For example, you can push 10 items each time (see the sketch after my code below).
    3. As for the string scenario, I slice the large string into pieces and keep a record of the index from which the stream last read, then increment the index on each read. If the index points past the end of the string, I stop right away. Here is my code:
```
'use strict'
const { Readable } = require('stream')
const createReadStream = () => {
  // what if the `data` is a looooooong serialized db or a 100000 length array
  const data = '123456789'
  let index = 0
  const step = 6
  return new Readable({
    read () {
      if (index >= data.length) {
        // nothing left to read - end the stream
        this.push(null)
      } else {
        // push the next slice and advance the index
        this.push(data.slice(index, index + step))
        index = index + step
      }
    }
  })
}
const readable = createReadStream()
readable.on('data', (data) => { console.log('got data:', data.toString()) })
readable.on('end', () => { console.log('finished reading') })
```
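
    A minimal sketch of point 2 above - pushing a fixed batch of array items per read() call (the batch size and the sample items are arbitrary):

```
'use strict'
const { Readable } = require('stream')

const createBatchedReadStream = (items, batchSize = 10) => {
  let index = 0
  return new Readable({
    objectMode: true, // we push arrays of items, so object mode keeps them intact
    read () {
      if (index >= items.length) return this.push(null) // nothing left - end the stream
      this.push(items.slice(index, index + batchSize))   // one batch per read() call
      index += batchSize
    }
  })
}

const readable = createBatchedReadStream(['a', 'b', 'c', 'd', 'e', 'f', 'g'], 3)
readable.on('data', (batch) => { console.log('got batch', batch) })
readable.on('end', () => { console.log('finished reading') })
```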
    

    My questions:
    1. How big should the chunk size be? I am referring to something like the step in my code above, particularly when chunks are being sent over the network.
    2. How do you stream video data? For example, live video streaming. Are there any great references or tutorials?

  • krave Posts: 58

    Oh, didn't know that project before. Thanks!
