Streams

Briefly, a stream is an abstract interface for working with streaming data in Node.js. A stream is a sequence of data that arrives over time from a source and flows to a destination. The source can be anything: a database of 50 million records, a 4.5 GB file, an HTTP call, and so on. The data in a stream is not available all at once and need not fit in memory; it arrives a chunk at a time. Streams are not only for handling large files or huge amounts of data; they also compose well through piping and chaining. Streams are one of the ways to do reactive programming, which we will look at in more detail in the next chapter. There are four fundamental stream types in Node.js:

  • Readable stream: A stream from which data can only be read; that is, data can only be consumed. Examples include HTTP responses on the client, zlib streams, and fs read streams. At any time, a readable stream is in either flowing mode or paused mode. Events such as data, error, end, and readable can be listened for on any readable stream.
  • Writable stream: A stream to which data can be written. For example, the stream returned by fs.createWriteStream().
  • Duplex stream: A stream that is both readable and writable. For example, net.Socket (a TCP socket).
  • Transform stream: A transform stream is basically a duplex stream whose output is computed from its input, so it can transform data as it passes through. For example, zlib.createGzip() creates a transform stream that compresses data using gzip.
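
As a rough illustration of how these types fit together, the following minimal sketch wires a readable source through a transform into a writable sink using pipeline from the stream module. The names runDemo, upperCase, and sink are made up for this example, and a duplex stream (such as a TCP socket) is left out to keep it short:

```typescript
import { Readable, Transform, Writable, pipeline } from "stream";

// Hypothetical demo: pushes each word through an upper-casing transform
// and resolves with everything the writable sink received.
function runDemo(words: string[]): Promise<string[]> {
  return new Promise((resolve, reject) => {
    const collected: string[] = [];

    // Readable: emits one array element per chunk (object mode by default)
    const source = Readable.from(words);

    // Transform: output is computed from the input chunk
    const upperCase = new Transform({
      objectMode: true,
      transform(chunk, _encoding, callback) {
        callback(null, String(chunk).toUpperCase());
      },
    });

    // Writable: consumes chunks and stores them in memory
    const sink = new Writable({
      objectMode: true,
      write(chunk, _encoding, callback) {
        collected.push(String(chunk));
        callback();
      },
    });

    // pipeline connects the streams and reports completion or the first error
    pipeline(source, upperCase, sink, (err) => {
      if (err) reject(err);
      else resolve(collected);
    });
  });
}

runDemo(["node", "streams"]).then((out) => console.log(out));
```

Each chunk moves through the chain as soon as it is available, so at no point does the whole input need to be processed in one step.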

Now, let's understand the workings of a stream via an example. We will create a custom transform stream by extending the Transform class, so we will see read, write, and transform operations all at once. Here, the output of the transform stream will be computed from its input:

  • Problem: We have a user's information and we want to hide sensitive parts such as email address, phone number, and so on.
  • Solution: We will create a transform stream that reads the data and transforms it by removing the sensitive information.

So, let's start coding. Create an empty project with npm init, add a src folder, and add the tsconfig.json file from the earlier section. Next, add the Node.js typings from DefinitelyTyped. Open up a Terminal and type the following:
npm install @types/node --save-dev

Now, we will write our custom filter transform stream. Create a filter_stream.ts file and inside it, let's write the transform logic:

import { Transform, TransformCallback, TransformOptions } from "stream";

export class FilterTransform extends Transform {
  // Names of the properties to strip from every object passing through
  private filterProps: string[];

  constructor(filterProps: string[], options: TransformOptions = {}) {
    // Force object mode so the stream accepts plain JavaScript objects
    super({ ...options, objectMode: true });
    this.filterProps = filterProps;
  }

  _transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) {
    // Keep only the keys that are not marked as sensitive
    const filteredKeys = Object.keys(chunk).filter((key) => {
      return this.filterProps.indexOf(key) === -1;
    });
    const filteredObj = filteredKeys.reduce((accum: any, key: string) => {
      accum[key] = chunk[key];
      return accum;
    }, {});
    this.push(filteredObj);
    callback();
  }

  _flush(callback: TransformCallback) {
    console.log("this method is called at the end of all transformations");
    // Signal that flushing is done; otherwise the stream never emits 'end'
    callback();
  }
}

What did we just do?

  • We created a custom transform and exported it, so it can be used anywhere in other files.
  • Options are not mandatory in the constructor; if they aren't passed, we create default options.
  • By default, streams expect buffer/string values. There is an objectMode flag that we have to set on the stream so that it can accept any JavaScript object, which we did in the constructor.
  • We overrode the _transform method to suit our needs. In it, we removed the keys that were passed in as filter properties and created a filtered object.
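
The key-filtering step can also be looked at on its own, outside any stream. The following is a small sketch of the same filter-then-reduce logic as a plain function; omitKeys is a hypothetical helper name used only for illustration:

```typescript
// Hypothetical helper: returns a shallow copy of `source` without the listed keys.
function omitKeys(
  source: Record<string, unknown>,
  keysToOmit: string[]
): Record<string, unknown> {
  return Object.keys(source)
    // Keep only the keys that are not in the omit list
    .filter((key) => keysToOmit.indexOf(key) === -1)
    // Rebuild an object from the remaining keys
    .reduce<Record<string, unknown>>((accum, key) => {
      accum[key] = source[key];
      return accum;
    }, {});
}

const user = { name: "Parth", phone: "xxxxx-xxxxx", id: 1 };
const safeUser = omitKeys(user, ["phone", "email"]);
console.log(safeUser); // { name: 'Parth', id: 1 }
```

Keys listed for removal that are not present on the object (email in this case) are simply ignored, and the original object is left unmodified.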

Next, we will create an instance of the filter stream to test our results. Create a file called stream_test.ts alongside filter_stream.ts and add the following contents:

import { FilterTransform } from "./filter_stream";
// Create an instance of our custom transform and pass phone and email as the sensitive properties
const filter = new FilterTransform(['phone', 'email']);
// Readable side: consume the transformed objects as they become available
filter.on('readable', function () {
  let obj;
  // read() returns null once the internal buffer is empty
  while ((obj = filter.read()) !== null) {
    console.log("Transformation:-", obj);
  }
});
// Writable side: write the objects that we want transformed
filter.write({ name: 'Parth', phone: 'xxxxx-xxxxx', email: 'ghiya.parth@gmail.com', id: 1 });
filter.write({ name: 'Dhruvil', phone: 'xxxxx-xxxxx', email: 'dhruvil.thaker@gmail.com', id: 2 });
filter.write({ name: 'Dhaval', phone: 'xxxxx-xxxxx', email: 'dhaval.marthak@gmail.com', id: 3 });
filter.write({ name: 'Shruti', phone: 'xxxxx-xxxxx', email: 'shruti.patel@gmail.com', id: 4 });
// Signal that no more data will be written
filter.end();

Open up your package.json file and add "start": "tsc && node .\\dist\\stream_test.js" to the scripts section. Now, when you run npm start, you will see the transformed output.

Note that if you are on Linux/macOS, replace each \\ with /.
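
To tie the example back to composability through piping, here is a minimal, self-contained sketch of the same filtering idea fed from a readable source with pipe instead of manual write calls. It uses the simplified Transform constructor rather than a subclass; createFilter, filterAll, and the User type are names assumed for this illustration only:

```typescript
import { Readable, Transform } from "stream";

type User = { [key: string]: unknown };

// Hypothetical factory: builds an object-mode Transform that drops the given keys.
function createFilter(filterProps: string[]): Transform {
  return new Transform({
    objectMode: true,
    transform(chunk: User, _encoding, callback) {
      const filtered: User = {};
      for (const key of Object.keys(chunk)) {
        if (filterProps.indexOf(key) === -1) filtered[key] = chunk[key];
      }
      // Passing the result to the callback is equivalent to push() + callback()
      callback(null, filtered);
    },
  });
}

// Pipes the users through the filter and resolves with the filtered objects.
function filterAll(users: User[], filterProps: string[]): Promise<User[]> {
  return new Promise((resolve, reject) => {
    const results: User[] = [];
    Readable.from(users)
      .pipe(createFilter(filterProps))
      .on("data", (obj: User) => results.push(obj)) // flowing mode
      .on("end", () => resolve(results))
      .on("error", reject);
  });
}

filterAll(
  [{ name: "Parth", phone: "xxxxx-xxxxx", id: 1 }],
  ["phone", "email"]
).then((out) => console.log(out));
```

Attaching a data listener switches the stream into flowing mode, so the objects are delivered as they are produced, whereas the readable event used earlier keeps the stream in paused mode and requires explicit read() calls.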