Revisiting Streams (again)
I've looked at streams in the context of Node and the browser, but I haven't really done anything with them because I couldn't figure out what the best use for them was.
As I started working with more complex processes, I realized that streams are a great way to handle files without knowing the size of the file in advance.
Readable Streams #
The most basic readable stream example in Node does the following:
- Import `createReadStream` from the `fs` module
- Capture the name of the file from the first CLI argument (`process.argv[2]`)
- If we didn't pass a file name, log an error to the console and exit
- Create a readable stream from the file
- Add an event listener for when data (a chunk) is received
- Add an event listener for when the stream has finished reading the file
- Add an event listener to handle errors
// 1
import { createReadStream } from 'fs';
// 2
const inputFile = process.argv[2];
// 3
if (!inputFile) {
  console.error('Error: Provide a file name as the first argument.');
  process.exit(1);
}
// 4
const readableStream = createReadStream(inputFile, {
  encoding: 'utf8'
});
// 5
readableStream.on('data', (chunk) => {
  console.log(chunk);
});
// 6
readableStream.on('end', () => {
  console.log('All chunks loaded.');
});
// 7
readableStream.on('error', (error) => {
  console.error('Error reading file:', error);
});
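If you save this script as `read-stream.js` (the name is just an example), you can run it with `node read-stream.js somefile.txt` and each chunk will be logged to the console as it's read.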
An equivalent for the browser would look like this:
- Capture the `fileInput` HTML element
- Add a `change` event listener that will trigger whenever the value of the input field changes
- Capture the file selected in the `fileInput` input field
- Check if there is a file to work on
- If the file exists, initialize a stream, a reader, and a text decoder
- Define a `read()` function that uses the `read()` method of the reader to read the next chunk of the file
- If the value for `done` is true, we're done reading the file, so we log it to the console and exit
- Otherwise, use the `decode()` method of the `TextDecoder` to decode the chunk, log the value to the console and call `read()` again to continue processing the file
- If there is an error, log it to the console
- Call the `read()` function for the first time
// 1
const fileInput = document.getElementById('fileInput');
// 2
fileInput.addEventListener('change', handleFileSelect);
function handleFileSelect(event) {
  // 3
  const file = event.target.files[0];
  // 4
  if (file) {
    // 5
    const stream = file.stream();
    const reader = stream.getReader();
    const decoder = new TextDecoder('utf-8');
    // 6
    function read() {
      reader.read().then(({ done, value }) => {
        // 7
        if (done) {
          console.log('File reading completed.');
          return;
        }
        // 8
        // { stream: true } handles multi-byte characters that may be
        // split across chunk boundaries
        const textChunk = decoder.decode(value, { stream: true });
        console.log(textChunk);
        read();
        // 9
      }).catch(error => {
        console.error('Error reading file:', error);
      });
    }
    // 10
    read();
  }
}
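This assumes the page has a file input with the ID `fileInput`, for example `<input type="file" id="fileInput">`.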
Writable Streams #
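The `fs` module also provides `createWriteStream`, the writable counterpart to `createReadStream`. A minimal sketch, assuming we take the input and output file names from the CLI arguments as in the readable example above, copies a file chunk by chunk:
import { createReadStream, createWriteStream } from 'fs';
const inputFile = process.argv[2];
const outputFile = process.argv[3];
if (!inputFile || !outputFile) {
  console.error('Error: Provide an input and an output file name.');
  process.exit(1);
}
// Create the readable and writable streams
const readableStream = createReadStream(inputFile);
const writableStream = createWriteStream(outputFile);
// Write each chunk to the output file as it arrives
readableStream.on('data', (chunk) => {
  writableStream.write(chunk);
});
// Close the writable stream once the input has been fully read
readableStream.on('end', () => {
  writableStream.end();
  console.log('All chunks written.');
});
readableStream.on('error', (error) => {
  console.error('Error reading file:', error);
});
This sketch ignores backpressure; in practice `readableStream.pipe(writableStream)` handles that for you.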
Encoders and Decoders #
The Encoding Standard defines a set of encodings that were in common use at the time the specification was written, along with a JavaScript API to work with them:
- TextDecoder
- TextDecoderStream
- TextEncoder
- TextEncoderStream
TextEncoder #
The TextEncoder interface encodes strings into Uint8Array byte sequences using the UTF-8 encoding. This is useful when you need to convert text data into binary data for storage or transmission.
const encoder = new TextEncoder();
const text = "Hello, world!";
const encoded = encoder.encode(text);
console.log(encoded); // Uint8Array(13) [72, 101, 108, 108, 111, 44, 32, 119, 111, 114, 108, 100, 33]
TextEncoderStream #
The TextEncoderStream interface converts a stream of strings into bytes in the UTF-8 encoding. It is the streaming equivalent of TextEncoder.
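A minimal sketch of how it might be used: build a ReadableStream that produces strings and pipe it through a `TextEncoderStream` to get back Uint8Array chunks (the strings are just example data):
// A readable stream that produces plain strings
const stringStream = new ReadableStream({
  start(controller) {
    controller.enqueue('Hello, ');
    controller.enqueue('world!');
    controller.close();
  }
});
// Piping through a TextEncoderStream turns the strings into UTF-8 bytes
const byteStream = stringStream.pipeThrough(new TextEncoderStream());
const reader = byteStream.getReader();
reader.read().then(({ value }) => {
  console.log(value); // Uint8Array with the UTF-8 bytes for 'Hello, '
});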
TextDecoder #
The TextDecoder interface decodes Uint8Array byte sequences into strings using a specified character encoding.
const decoder = new TextDecoder(); // Default is 'utf-8'
const uint8Array = new Uint8Array([72, 101, 108, 108, 111, 44, 32, 119, 111, 114, 108, 100, 33]);
const decoded = decoder.decode(uint8Array);
console.log(decoded); // "Hello, world!"
TextDecoderStream #
The TextDecoderStream interface of the Encoding API converts a stream of text in a binary encoding, such as UTF-8, to a stream of strings. It is the streaming equivalent of TextDecoder.
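As a sketch, assuming a text resource at a placeholder URL, you can pipe a fetch response body through a `TextDecoderStream` and read it as strings:
async function logTextResponse() {
  // '/some-text-file.txt' is a placeholder for any text resource
  const response = await fetch('/some-text-file.txt');
  // Convert the stream of bytes into a stream of strings
  const textStream = response.body.pipeThrough(new TextDecoderStream());
  const reader = textStream.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      break;
    }
    console.log(value); // a decoded string chunk
  }
}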
Piping And Copying Streams #
You can also pipe or redirect streams to create processing chains.
In this section we'll cover `pipeThrough()`, `pipeTo()` and `tee()`.
pipeThrough() and pipeTo() #
The `pipeThrough()` method provides a chainable way of piping the current stream through a transform stream or any other writable/readable pair.
The `pipeTo()` method pipes the current ReadableStream to a given WritableStream and returns a Promise that fulfills when the piping process completes successfully, or rejects if any errors were encountered.
In both cases, piping a stream will generally lock it for the duration of the pipe, preventing other readers from locking it.
This example does the following:
- Create a readable stream and populate it with the numbers from 1 to 10
- Create a transform stream that squares each chunk it receives
- Create a writable stream that logs each chunk it receives
- Pipe the output of the readable stream through the transform stream and into the writable stream
// 1
const readableStream = new ReadableStream({
  start(controller) {
    for (let i = 1; i <= 10; i++) {
      controller.enqueue(i);
    }
    controller.close();
  }
});
// 2
const transformStream = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk * chunk);
  }
});
// 3
const writableStream = new WritableStream({
  write(chunk) {
    console.log('Received chunk:', chunk);
  }
});
// 4
readableStream
  .pipeThrough(transformStream)
  .pipeTo(writableStream)
  .then(() => console.log('Stream processing complete'))
  .catch(err => console.error('Stream processing error:', err));
tee() #
The `tee()` method tees the current readable stream and returns a two-element array containing the two resulting branches as new ReadableStream instances.
This is useful because a response body cannot be consumed more than once, so you need two copies if you want to use it twice: you can fetch a response from the server, stream one copy to the browser, and stream the other to the ServiceWorker cache.
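In its simplest form (the chunks below are just example data), teeing a stream looks like this:
const sourceStream = new ReadableStream({
  start(controller) {
    controller.enqueue('first chunk');
    controller.enqueue('second chunk');
    controller.close();
  }
});
// tee() returns two independent copies of the same stream
const [branch1, branch2] = sourceStream.tee();
// Each branch can be consumed separately
branch1.pipeTo(new WritableStream({
  write(chunk) {
    console.log('Branch 1:', chunk);
  }
}));
branch2.pipeTo(new WritableStream({
  write(chunk) {
    console.log('Branch 2:', chunk);
  }
}));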
This example is the fetch event listener for a service worker and it performs the following tasks:
- Use the `respondWith()` method of the FetchEvent to prevent the browser's default fetch handling and provide a custom promise for a Response
- Open the `my-cache` cache
- Fetch the requested URL
- If the response is not valid or has no body, return it directly
- Tee the body stream directly from the response
- Create new responses from the two streams
- Put one of the responses in the cache
- Return the other response to the client
You can use similar solutions wherever you would use regular file operations but the file may be too large to keep in memory all at once.
self.addEventListener('fetch', event => {
  // 1
  event.respondWith(
    (async function() {
      // 2
      const cache = await caches.open('my-cache');
      // 3
      const response = await fetch(event.request);
      // 4
      if (!response || !response.ok || !response.body) {
        return response;
      }
      // 5
      const [body1, body2] = response.body.tee();
      // 6
      const responseForCache = new Response(body1, {
        headers: response.headers,
        status: response.status,
        statusText: response.statusText
      });
      const responseForClient = new Response(body2, {
        headers: response.headers,
        status: response.status,
        statusText: response.statusText
      });
      // 7
      event.waitUntil(cache.put(event.request, responseForCache));
      // 8
      return responseForClient;
    })()
  );
});
Specialized Streams #
We can use the stream methods we've discussed in this post to build custom applications.
This example uses streams and the JSZip library to generate a ZIP file.
The steps are as follows:
- Capture a reference to the button with the ID `zipButton`
- Add a click event listener to the button
- Capture a reference to the file input and the file the user selected
- If the file doesn't exist then display an alert and return; there's nothing to do
- Start a `try/catch` block
- Create a new JSZip instance
- Read the file as a stream
- While there are chunks of content available, push them to the chunks array
- If `done` is true then we've finished reading the file
- Concatenate the chunks into a Blob
- Add the file to the ZIP archive
- Generate the ZIP file as a Blob
- Create a link and download the ZIP file
- Handle any errors
// 1
const zipButton = document.getElementById('zipButton');
// 2
zipButton.addEventListener('click', generateZip);
async function generateZip() {
  // 3
  const input = document.getElementById('fileInput');
  const file = input.files[0];
  // 4
  if (!file) {
    alert('Please select a file.');
    return;
  }
  // 5
  try {
    // 6
    const zip = new JSZip();
    // 7
    const stream = file.stream();
    const reader = stream.getReader();
    let chunks = [];
    let done = false;
    // 8
    while (!done) {
      const { value, done: streamDone } = await reader.read();
      if (value) {
        chunks.push(value);
      }
      // 9
      done = streamDone;
    }
    // 10
    const blob = new Blob(chunks, { type: file.type });
    // 11
    zip.file(file.name, blob);
    // 12
    const zipBlob = await zip.generateAsync({ type: 'blob' });
    // 13
    const link = document.createElement('a');
    link.href = URL.createObjectURL(zipBlob);
    link.download = 'files.zip';
    document.body.appendChild(link);
    // Trigger the download, then remove the temporary link
    link.click();
    link.remove();
  } catch (error) {
    // 14
    console.error('Error generating zip file:', error);
  }
}
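For this to run, the page needs the JSZip library loaded, along with a file input with the ID `fileInput` and a button with the ID `zipButton`, matching the IDs the script looks up.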
Final Notes #
Streams can be used in the same situations where you would read or write a file in Node.
Since this is a new(ish) feature, browser support may be an issue. Plan accordingly.
Links and Resources #
- How To Work with Files Using Streams in Node.js — Digital Ocean
- Jake Archibald
- MDN
  - Encoders and Decoders
- Node
  - Node.js Streams
  - Stream — Node 22.9.0