Create the third phase for an ETL pipeline in Node.
This article is based on Node v16.14.0.
ETL is a process of extracting, transforming, and loading data from one or multiple sources into a destination. Have a look at the article ETL pipeline explained for a general overview of ETL pipelines.
This is the third article in a series of three, and it explains the load phase of an ETL pipeline.
The third phase in an ETL pipeline is to Load the transformed data into the destination. After extracting the data from the source and transforming it into the format we want, we can load it into its destination. In this example we are going to write the transformed data into a JSON flat file on the file system. In a real-world scenario the data would be loaded into a database or some kind of cloud storage.
There are two different approaches to loading data: loading it in bulk, or loading it one record at a time.
Deciding on the data loading approach depends on the destination and on the size of the data to be loaded. For some systems it might be more resource-intensive to insert each record one by one than to load the data in bulk at once; for others it may be the other way around. It depends on the destination, which loading methods it supports, and what its limitations are.
Loading the data in bulk can be the simplest and most efficient approach. It basically means sending many data items at once to be inserted into the destination.
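To make the contrast with single-record loading concrete before we turn to the file example, here is a minimal sketch of a bulk insert into a database. It assumes the node-postgres (pg) client and a photos table with id, title, and url columns; none of this is part of the article's example, which writes to a JSON file instead.
// Hypothetical sketch: bulk insert into a Postgres table with node-postgres (pg).
// The connection string and the "photos" table/columns are assumptions for illustration.
const { Pool } = require('pg');

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

function bulkInsertPhotos(photos) {
  // build one multi-row INSERT instead of sending one query per photo
  const values = [];
  const placeholders = photos
    .map((photo, index) => {
      values.push(photo.id, photo.title, photo.url);
      const offset = index * 3;
      return `($${offset + 1}, $${offset + 2}, $${offset + 3})`;
    })
    .join(', ');

  return pool.query(
    `INSERT INTO photos (id, title, url) VALUES ${placeholders}`,
    values,
  );
}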
Let's take a look at bulk loading all of our data to a JSON file. We are going to write a function which will take the transformed photoAlbum (an array of photo objects) as an argument, stringify it to JSON, and write it to a file. In this example the destination is a JSON file on the file system.
We are going to continue with the example used in the previous article ETL: Transform Data with Node.js, but with only one photoAlbum.
Create another file load.js in the project folder, which is going to contain the load functions.
touch load.js
Create a bulkLoadPhotoAlbum function, which will get the transformed data, the output file path, and the file name as arguments. Error handling is needed: since outputFilePath and fileName are required, we have to throw errors if one or both are missing.
const { promisify } = require('util');
const fs = require('fs');

// promisify fs.writeFile so it can be used with async/await
const writeFilePromised = promisify(fs.writeFile);

function bulkLoadPhotoAlbum(photoAlbums, outputFilePath, fileName) {
  if (!outputFilePath) {
    throw new Error('Filepath required as second argument');
  }
  if (!fileName) {
    throw new Error('FileName is required as third argument');
  }
  // write all photo album data at once into a single JSON file
  return writeFilePromised(
    `${outputFilePath}/${fileName}.json`,
    JSON.stringify(photoAlbums, null, 2),
  );
}

module.exports = { bulkLoadPhotoAlbum };
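If you want to try bulkLoadPhotoAlbum on its own before wiring it into the pipeline, a quick standalone check could look like this; the sample data here is made up and only serves as an illustration:
// quick standalone check with made-up sample data (not the pipeline's photo album)
const { bulkLoadPhotoAlbum } = require('./load');

bulkLoadPhotoAlbum([{ id: 1, title: 'sample photo' }], __dirname, 'test-album')
  .then(() => console.log('test-album.json written'))
  .catch(error => console.error(error));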
We have to call the bulkLoadPhotoAlbum function as the last step in orchestrateEtlPipeline() in index.js.
const { getPhotos } = require('./extract');
const { addTimeStamp, transformPhoto } = require('./transform');
const { bulkLoadPhotoAlbum } = require('./load');

const orchestrateEtlPipeline = async () => {
  try {
    // EXTRACT
    const photoAlbum1 = await getPhotos(1);

    // TRANSFORM
    let transformedPhotoAlbum1 = photoAlbum1.map(photo =>
      transformPhoto(photo),
    );
    transformedPhotoAlbum1 = addTimeStamp(transformedPhotoAlbum1);

    // LOAD
    await bulkLoadPhotoAlbum(
      transformedPhotoAlbum1,
      __dirname,
      'album-1',
    );
  } catch (error) {
    console.error(error);
  }
};

orchestrateEtlPipeline();
Now, run the pipeline with node index.js and check the created album-1.json file with the transformed data in it.
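If you prefer to inspect the result from code instead of opening the file, a small script like the following could print it back; the exact shape of the content depends on what addTimeStamp returns:
// read the generated album-1.json back in and print it
const fs = require('fs');

const album = JSON.parse(
  fs.readFileSync(`${__dirname}/album-1.json`, 'utf8'),
);
console.log(album);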
Sometimes the destination system requires you to load records one at a time. That can be the case if the destination system needs to treat each record as an incoming event in order to handle the data appropriately. Loading one record at a time results in many more requests than a single bulk load, which has to be taken into account. To load data records one at a time, you can iterate over all the records and handle them individually.
In the following example we are going to mock the insert behaviour of a database table. Since there is no database table present in this example, no file will be written; there will just be an output when everything is done.
Add the insertRecord() function to load.js, which is going to represent inserting one record at a time.
function insertRecord(photo) {
  // mocked function
  // returns a Promise that resolves when the photo was inserted
  return Promise.resolve();
}

module.exports = { bulkLoadPhotoAlbum, insertRecord };
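In a real pipeline, insertRecord() would talk to the actual destination instead of resolving immediately. As a sketch only, an implementation against a Postgres table with the pg client might look roughly like this; the connection setup and the photos table are assumptions, not part of this example:
// Hypothetical sketch of a non-mocked insertRecord using node-postgres (pg).
// Table and column names are assumptions for illustration only.
const { Pool } = require('pg');

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

function insertRecord(photo) {
  // one INSERT per photo record
  return pool.query(
    'INSERT INTO photos (id, title, url) VALUES ($1, $2, $3)',
    [photo.id, photo.title, photo.url],
  );
}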
In the index.js file, we have to treat each transformed record individually by calling insertRecord() on each photo object. We are going to iterate over each item in the array with reduce. When using reduce, we pass it a function to run on each iteration and an initial accumulator value. Using reduce this way allows us to use async/await to sequentially chain any number of promises without having to hard-code each .then handler. Read more about reduce in the MDN docs.
const { getPhotos } = require('./extract');
const { addTimeStamp, transformPhoto } = require('./transform');
const { insertRecord } = require('./load');

const orchestrateEtlPipeline = async () => {
  try {
    // EXTRACT
    const photoAlbum1 = await getPhotos(1);

    // TRANSFORM
    let transformedPhotoAlbum1 = photoAlbum1.map(photo =>
      transformPhoto(photo),
    );
    transformedPhotoAlbum1 = addTimeStamp(transformedPhotoAlbum1);

    // LOAD
    // chain the inserts sequentially: each iteration awaits the previous insert
    await transformedPhotoAlbum1.data.reduce(
      async (previousPromise, photo) => {
        await previousPromise;
        return insertRecord(photo);
      },
      Promise.resolve(),
    );
    console.log('All records inserted');
  } catch (error) {
    console.error(error);
  }
};

orchestrateEtlPipeline();
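As a design note: the same sequential behaviour can also be expressed with a for...of loop, which some readers may find easier to follow than the reduce pattern. This is an alternative sketch, not the approach used in the article:
// alternative to the reduce pattern: await each insert inside a for...of loop
const { insertRecord } = require('./load');

const loadOneByOne = async photoAlbum => {
  for (const photo of photoAlbum.data) {
    await insertRecord(photo);
  }
  console.log('All records inserted');
};

// inside orchestrateEtlPipeline() this would replace the reduce call:
// await loadOneByOne(transformedPhotoAlbum1);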
Thanks for reading and if you have any questions, use the comment function or send me a message @mariokandut.
If you want to know more about Node, have a look at these Node Tutorials.