Streaming conversion of CSV/TSV to JSON
While exploring the IMDb data set recently, I needed to convert Tab-Separated Values (TSV) files to JSON.
Based on my experience doing that, this tutorial will show you how to do streaming transformations of CSV/TSV files into JSON using the csv library, and how to test file reading/writing with the mock-fs library.
Although this post talks about tsv files, it applies equally well to csv files.
There are many existing solutions that can convert tsv or csv files to JSON. However, most of them require holding the entire input tsv and output JSON files in memory. Although that works well for small files, the IMDb data set contains a few large files that would take up a lot of memory if processed that way.
For this tutorial, we’ll use two demo files:
- title.basics.tsv at 422MB, which contains the title, run time, year and genres of each movie. The corresponding JSON output is 1GB. I’ll call this the “simple” input because its conversion is more straightforward.
- title.principals.tsv at 1.25GB, which links each movie with the name, job and characters of that movie’s cast members. Because the JSON output is 2.5GB, the memory consumption for a non-streaming conversion is at least 3.75GB, which is half the available memory of most laptops these days. I’ll call this the “advanced” input because its conversion takes a bit more work.
The best solution is a streaming conversion: data is streamed from the input file through a transformer (which performs the conversion while retaining as little data in memory as possible) into the output file. The memory footprint is much smaller with the streaming approach because only small chunks of data are read and held in memory at any one time.
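If the mechanics below feel abstract, the overall shape of every such pipeline is the same: a readable stream piped through one or more transform streams into a writable stream. Here is a minimal, self-contained sketch of that pattern (not from the repo; the file names are made up) that uppercases a text file chunk by chunk using Node’s built-in stream.pipeline helper:

// Minimal sketch of the streaming pattern (not taken from the repo): read a
// file, transform each small chunk, write the result, without ever holding
// the whole file in memory.
import * as fs from 'fs';
import {Transform, pipeline} from 'stream';

const upperCaser = new Transform({
  transform(chunk, _encoding, callback) {
    // Each chunk is only a small slice of the input file:
    callback(null, chunk.toString().toUpperCase());
  },
});

pipeline(
  fs.createReadStream('input.txt', 'utf8'),   // made-up file names
  upperCaser,
  fs.createWriteStream('output.txt', 'utf8'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    }
  },
);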
I picked the csv library for this job because it looks well-maintained and has good documentation. The library consists of a few parts, of which we’ll only use two in this tutorial (see the short sketch after this list for how they fit together):
- csv-parse, which parses a csv file into a row-by-row readable NodeJS stream.
- stream-transform, which implements a NodeJS transform stream (i.e. both readable and writable).
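Here is a tiny sketch (not from the repo) that parses an in-memory TSV string and prints each row as JSON. Note that the exact imports depend on the version of the csv packages: recent versions expose parse and transform as named exports, while older versions use default exports.

// Tiny sketch (not from the repo): csv-parse emits one object per row,
// stream-transform maps each object to an output chunk.
import {parse} from 'csv-parse';            // older versions: `import parse from 'csv-parse'`
import {transform} from 'stream-transform'; // older versions: `import transform from 'stream-transform'`

const parser = parse({delimiter: '\t', columns: true});
const stringifier = transform((row: Record<string, string>) => `${JSON.stringify(row)}\n`);

parser.pipe(stringifier).pipe(process.stdout);

// Feed it a tiny two-column TSV:
parser.write('tconst\ttitleType\ntt0000001\tshort\ntt0000002\tshort\n');
parser.end();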
The result is in this repo. If you want to look at the actual giant input files, run npm run download:simple and npm run download:advanced to download the simple and advanced input files, respectively, into the realInput directory. To see the conversion in action, run npm run convert:simple and npm run convert:advanced. Be warned that the conversions can take a while.
Simple example
Let’s take a look at the “simple” example of converting title.basics.tsv. The first few rows look like this:
tconst     titleType  primaryTitle            originalTitle           isAdult  startYear  endYear  runtimeMinutes  genres
tt0000001  short      Carmencita              Carmencita              1        1894       \N       1               Documentary,Short
tt0000002  short      Le clown et ses chiens  Le clown et ses chiens  0        1892       \N       5               Animation,Short
where tconst is the movie ID and \N indicates no data. We want to convert it into something like this:
[{
  "tconst": "tt0000001",
  "titleType": "short",
  "primaryTitle": "Carmencita",
  "originalTitle": "Carmencita",
  "isAdult": true,
  "startYear": 1894,
  "endYear": null,
  "runtimeMinutes": 1,
  "genres": [ "Documentary", "Short" ]
}, {
  "tconst": "tt0000002",
  "titleType": "short",
  "primaryTitle": "Le clown et ses chiens",
  "originalTitle": "Le clown et ses chiens",
  "isAdult": false,
  "startYear": 1892,
  "endYear": null,
  "runtimeMinutes": 5,
  "genres": [ "Animation", "Short" ]
}]
First we set up source, which streams data from the input file, and destination, which streams data to the output file:
// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/simpleConvert.ts

// create empty output file. Otherwise, we won't be able to create a writable
// stream for output:
await writeFile(outputPath, '', 'utf8');

const source = fs.createReadStream(inputPath, 'utf8');
const destination = fs.createWriteStream(outputPath, 'utf8');
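One thing the snippet glosses over: writeFile here (and appendFile later) must be promise-returning, since they’re awaited. The repo’s exact imports aren’t shown in this post; a plausible setup, roughly what I’d expect the repo to do, is to promisify the callback-style fs functions:

// Promise-returning wrappers around the callback-style fs functions so they
// can be awaited as in the snippets (a plausible reconstruction, not copied
// verbatim from the repo):
import * as fs from 'fs';
import {promisify} from 'util';

const writeFile = promisify(fs.writeFile);
const appendFile = promisify(fs.appendFile);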
Then we create the parser, which is a Node transform stream that will read the tsv file row-by-row, associate the data in each column with the column heading and emit a JavaScript object for downstream consumers:
// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/simpleConvert.ts
const parser = parse({
  // Because the input is tab-delimited:
  delimiter: '\t',
  // Because we want the library to automatically associate the column name
  // with column value in each row for us:
  columns: true,
  // Because we don't want accidental quotes inside a column to be
  // interpreted as "wrapper" for that column content:
  quote: false,
});
The most important part of this whole operation is the transformer, which, in this simple example, just calls JSON.stringify on every row emitted by the parser:
// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/simpleConvert.ts
let outputIndex = 0;
const transformer = transform((rawRow: RawRow): string => {
  const currentRecordIndex = outputIndex;
  outputIndex += 1;
  if (outputIndex % 100000 === 0 && shouldLogProgress === true) {
    console.info('processing row ', outputIndex);
  }
  const {isAdult, startYear, endYear, runtimeMinutes, genres, ...rest} = rawRow;
  const parsedRow: ParsedRow = {
    ...rest,
    isAdult: !!(isAdult === '1'),
    startYear: parseInt(startYear, 10),
    endYear: (endYear === '\\N') ? null : parseInt(endYear, 10),
    runtimeMinutes: (runtimeMinutes === '\\N') ? null : parseInt(runtimeMinutes, 10),
    genres: genres.split(','),
  };
  const result = (currentRecordIndex === 0) ? `[${JSON.stringify(parsedRow)}` : `,${JSON.stringify(parsedRow)}`;
  return result;
});
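The snippet refers to RawRow and ParsedRow types that are defined elsewhere in the repo and not shown in this post. Judging from the columns and the conversions above, they presumably look roughly like this (a reconstruction, not the repo’s actual definitions):

// Plausible shapes for the row types used above (reconstructed from the columns
// and conversions, not copied from the repo). Everything coming out of csv-parse
// is a string; the parsed row has real types.
interface RawRow {
  tconst: string;
  titleType: string;
  primaryTitle: string;
  originalTitle: string;
  isAdult: string;
  startYear: string;
  endYear: string;
  runtimeMinutes: string;
  genres: string;
}

interface ParsedRow {
  tconst: string;
  titleType: string;
  primaryTitle: string;
  originalTitle: string;
  isAdult: boolean;
  startYear: number;
  endYear: number | null;
  runtimeMinutes: number | null;
  genres: string[];
}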
Note that we do have to take care to close the JSON list after the last movie has been written to JSON:
// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/simpleConvert.ts
destination.on('finish', async () => {
  if (outputIndex === 0) {
    // In this case, no row has been processed from TSV file so the
    // output should be an empty list:
    await appendFile(outputPath, '[]', 'utf8');
  } else {
    // In this case, at least one row has been processed so we just need
    // to write the closing bracket:
    await appendFile(outputPath, ']', 'utf8');
  }
  resolve();
});
Having set up all these pipes, we now need to connect them into a continuous pipeline that our data can flow through like water. We do this by literally .pipe-ing the output of one stream into the next:
// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/simpleConvert.ts
source.pipe(parser).pipe(transformer).pipe(destination);
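One loose end: the 'finish' handler above calls resolve(), which isn’t defined in any of the snippets. The repo presumably wraps the whole conversion in a Promise so that callers can await it, resolving that Promise from the 'finish' handler. As a self-contained illustration of the same idea (names are made up, not the repo’s API), you can turn the 'finish' event into an awaitable Promise:

// Illustrative only (made-up helper name): turn a write stream's 'finish'
// event into a Promise, so a conversion function can `await` the end of the
// pipeline and then append the closing bracket.
import * as fs from 'fs';

const waitForFinish = (destination: fs.WriteStream): Promise<void> =>
  new Promise((resolve, reject) => {
    destination.on('finish', () => resolve());
    destination.on('error', reject);
  });

// Possible usage after wiring up the pipes:
//   await waitForFinish(destination);
//   await appendFile(outputPath, ']', 'utf8');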
Advanced example
In this example, the cast of a single movie is recorded over multiple rows, one for each cast member. The first few rows look like this:
tconst     ordering  nconst     category         job                      characters
tt0000001  1         nm1588970  self             \N                       ["Herself"]
tt0000001  2         nm0005690  director         \N                       \N
tt0000001  3         nm0374658  cinematographer  director of photography  \N
We want to consolidate the information about each movie’s cast members into a single JSON object like this (where nconst is the person ID):
[
  {
    "tconst": "tt0000001",
    "principals": [
      {
        "nconst": "nm1588970",
        "category": "self",
        "job": null,
        "characters": ["Herself"]
      },
      {
        "nconst": "nm0005690",
        "category": "director",
        "job": null,
        "characters": null
      },
      {
        "nconst": "nm0374658",
        "category": "cinematographer",
        "job": "director of photography",
        "characters": null
      }
    ]
  }
]
Unlike the simple example above, which uses a memory-less transformer (i.e. it doesn’t remember previous rows), this transformer needs to do some record keeping because the rows are related. It essentially needs to keep comparing the next row’s movie ID (tconst) with the previous row’s movie ID to detect when the movie changes between rows. When that change happens, we create a new element in the output JSON list:
// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/advancedConvert.ts
let prevRow: RawRow | undefined;
let outputIndex = 0;
let inputRowIndex = 0;
let outputObject!: ParsedRow;

const transformer = transform((nextRow: RawRow) => {
  inputRowIndex += 1;
  if (inputRowIndex % 100000 === 0 && shouldLogProgress === true) {
    console.info('processing row ', inputRowIndex);
  }

  const {tconst, nconst, category, job, characters} = nextRow;

  let toBeReturned;
  if (
    // If this is the first row ...
    prevRow === undefined ||
    // ... or if the movie has changed ...
    nextRow.tconst !== prevRow.tconst) {

    // ... return previous movie;
    if (prevRow !== undefined) {
      toBeReturned = (outputIndex === 1) ?
        `[${JSON.stringify(outputObject)}` : `,${JSON.stringify(outputObject)}`;
    }
    // ... then create a new movie:
    outputObject = {
      tconst,
      principals: [],
    };
    outputIndex += 1;
  }

  const {principals} = outputObject;
  let outputCharacters: string[] | null;
  if (characters === '\\N') {
    // This means `characters` is not provided:
    outputCharacters = null;
  } else if (characters.startsWith('[') && characters.endsWith(']')) {
    // `characters` should be interpreted as an array of strings:
    outputCharacters = JSON.parse(characters);
  } else {
    // If `characters` is a string, put it in a list
    // (also need to remove the literal quotes surrounding the text):
    outputCharacters = [
      characters.replace(/^"/, '').replace(/"$/, ''),
    ];
  }
  principals.push({
    nconst,
    category,
    job: (job === '\\N') ? null : job,
    characters: outputCharacters,
  });

  prevRow = nextRow;

  if (toBeReturned !== undefined) {
    return toBeReturned;
  }
});
Because we only know that we’re done with a movie’s data when we see the next movie, we won’t know that we have seen the last movie until the input stream has finished. This code takes care of writing the last movie to the output file:
// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/advancedConvert.ts
destination.on('finish', async () => {
  if (outputIndex === 0) {
    // In this case, no row has been processed from the TSV file so the
    // output should be an empty list:
    await appendFile(outputPath, '[]', 'utf8');
  } else {
    // The last movie would not have been written out to the file yet so
    // we need to do that here. We also need to either open a new list (with "[")
    // or continue an existing list (with a comma) depending on whether the last
    // movie is also the only one:
    const lastItemToWrite = (outputIndex === 1) ?
      `[${JSON.stringify(outputObject)}]` :
      `,${JSON.stringify(outputObject)}]`;
    await appendFile(outputPath, lastItemToWrite, 'utf8');
  }
  resolve();
});
Test code
As usual, I use jest for testing. We use the mock-fs library to mock out the file system so that we don’t have to read from or write to real files during testing. Once mock-fs is invoked, the only files that you can read using fs.readFile are the ones that you register with mock-fs, like this:
// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/__tests__/convert.js
test('With many rows of input data', async () => {
  // ...
  mockFs = require('mock-fs');
  mockFs({
    [fakeInputDir]: {
      [`${fakeInputFileName}.tsv`]: testInput,
    },
    [fakeOutputDir]: {
      [`${fakeInputFileName}.json`]: '',
    },
  });
});
In the above mocked file system, the directory fakeInputDir is inside the directory containing the test script (__tests__/convert.js). The fake directory contains the fake input file that will be consumed by the converter. Don’t forget to call mockFs.restore() after each test to restore the real file system; otherwise jest will fail.
I include tests for some corner cases, such as empty input and input that consists of one header row and one data row.
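One of those corner-case tests might look roughly like the sketch below. The convert import is a hypothetical stand-in for whatever the repo’s conversion module actually exports (check __tests__/convert.js for the real thing); the jest and mock-fs calls themselves are real APIs.

// Rough sketch of a mock-fs based test. `convert` is a hypothetical stand-in
// for the repo's exported conversion function; jest (test, afterEach, expect)
// and mock-fs (mockFs(), mockFs.restore()) are real APIs.
import * as fs from 'fs';
import {convert} from '../simpleConvert'; // hypothetical import path and name

const mockFs = require('mock-fs');

afterEach(() => {
  // Restore the real file system after every test; otherwise jest's own
  // file access starts failing:
  mockFs.restore();
});

test('with one header row and one data row', async () => {
  const header = 'tconst\ttitleType\tprimaryTitle\toriginalTitle\tisAdult\tstartYear\tendYear\truntimeMinutes\tgenres';
  const row = 'tt0000001\tshort\tCarmencita\tCarmencita\t1\t1894\t\\N\t1\tDocumentary,Short';
  mockFs({
    fakeInputDir: {'input.tsv': `${header}\n${row}\n`},
    fakeOutputDir: {'input.json': ''},
  });

  await convert('fakeInputDir/input.tsv', 'fakeOutputDir/input.json');

  const result = JSON.parse(fs.readFileSync('fakeOutputDir/input.json', 'utf8'));
  expect(result).toHaveLength(1);
  expect(result[0].tconst).toBe('tt0000001');
});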
If you check out the repo at this point and run npm run test, all the tests should pass.