JSONL by Language

Complete guides for working with JSONL in Python, JavaScript, Go, Java, and Rust

Python

Reading JSONL (Standard Library)

The simplest approach using Python's built-in json module:

import json

# Basic line-by-line reading
with open('data.jsonl', 'r', encoding='utf-8') as f:
    for line in f:
        # Skip empty lines
        line = line.strip()
        if not line:
            continue

        # Parse JSON
        record = json.loads(line)
        print(record['name'])

# With error handling
with open('data.jsonl', 'r', encoding='utf-8') as f:
    for line_num, line in enumerate(f, 1):
        try:
            record = json.loads(line)
            process(record)
        except json.JSONDecodeError as e:
            print(f"Error on line {line_num}: {e}")
            continue

Writing JSONL

import json

# Write list of dicts to JSONL
data = [
    {"id": 1, "name": "Alice", "age": 30},
    {"id": 2, "name": "Bob", "age": 25},
]

with open('output.jsonl', 'w', encoding='utf-8') as f:
    for record in data:
        # Write JSON + newline
        f.write(json.dumps(record) + '\n')

# Append to existing file
with open('output.jsonl', 'a', encoding='utf-8') as f:
    new_record = {"id": 3, "name": "Charlie", "age": 35}
    f.write(json.dumps(new_record) + '\n')

# Preserve non-ASCII characters with ensure_ascii=False
with open('output.jsonl', 'w', encoding='utf-8') as f:
    for record in data:
        f.write(json.dumps(record, ensure_ascii=False) + '\n')

High-Performance Parsing (orjson)

orjson is typically 3-5x faster than the standard library json module and natively serializes additional types such as datetime and UUID:

# Install: pip install orjson
import orjson

# Read JSONL (note: read as binary 'rb')
with open('data.jsonl', 'rb') as f:
    for line in f:
        record = orjson.loads(line)
        print(record['name'])

# Write JSONL (note: orjson.dumps returns bytes)
with open('output.jsonl', 'wb') as f:
    for record in data:
        # orjson.dumps() returns bytes, add b'\n'
        f.write(orjson.dumps(record) + b'\n')

# Indented output option (multi-line output, so no longer valid JSONL)
with open('output.jsonl', 'wb') as f:
    for record in data:
        json_bytes = orjson.dumps(
            record,
            option=orjson.OPT_INDENT_2  # Pretty print; breaks the one-record-per-line rule
        )
        f.write(json_bytes + b'\n')

Performance tip: orjson is especially fast for large objects and handles datetime serialization automatically.
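
For example, datetime values serialize without any custom encoder (a small sketch; the field names are illustrative):

import datetime
import orjson

# orjson converts datetime objects to RFC 3339 strings automatically
event = {"id": 1, "created_at": datetime.datetime(2024, 1, 15, 12, 30, 0)}
print(orjson.dumps(event).decode())
# {"id":1,"created_at":"2024-01-15T12:30:00"}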

Working with Pandas

Pandas has native JSONL support via the lines=True parameter:

import pandas as pd

# Read JSONL into DataFrame
df = pd.read_json('data.jsonl', lines=True)
print(df.head())

# Write DataFrame to JSONL
df.to_json('output.jsonl', orient='records', lines=True)

# Read compressed JSONL
df = pd.read_json('data.jsonl.gz', lines=True, compression='gzip')

# Chunked reading for large files (memory efficient)
chunk_size = 10000
chunks = []

for chunk in pd.read_json('large.jsonl', lines=True, chunksize=chunk_size):
    # Process each chunk
    filtered = chunk[chunk['age'] > 25]
    chunks.append(filtered)

# Combine all chunks
result = pd.concat(chunks, ignore_index=True)

# Stream processing without loading all into memory
with pd.read_json('large.jsonl', lines=True, chunksize=10000) as reader:
    for chunk in reader:
        # Process and write incrementally
        processed = transform(chunk)
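        # mode='a' appends each chunk (available in pandas 2.0+)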
        processed.to_json('output.jsonl', orient='records', lines=True, mode='a')

Streaming Large Files

Process files larger than available RAM:

import json

def stream_jsonl(filepath, batch_size=1000):
    """Generator that yields batches of records"""
    batch = []

    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                record = json.loads(line)
                batch.append(record)

                if len(batch) >= batch_size:
                    yield batch
                    batch = []
            except json.JSONDecodeError:
                continue

    # Yield remaining records
    if batch:
        yield batch

# Usage: process in batches
for batch in stream_jsonl('huge.jsonl', batch_size=5000):
    # Process batch (5000 records at a time)
    results = bulk_process(batch)
    save_results(results)

# Filter and transform streaming
def filter_transform_jsonl(input_path, output_path, condition):
    with open(input_path, 'r') as fin, open(output_path, 'w') as fout:
        for line in fin:
            record = json.loads(line)

            # Apply filter
            if condition(record):
                # Transform
                transformed = transform(record)
                fout.write(json.dumps(transformed) + '\n')

# Example: Extract active users
filter_transform_jsonl(
    'users.jsonl',
    'active_users.jsonl',
    lambda r: r.get('status') == 'active'
)

Working with Compressed JSONL

import gzip
import json

# Read gzipped JSONL (streaming)
with gzip.open('data.jsonl.gz', 'rt', encoding='utf-8') as f:
    for line in f:
        record = json.loads(line)
        process(record)

# Write gzipped JSONL
with gzip.open('output.jsonl.gz', 'wt', encoding='utf-8') as f:
    for record in data:
        f.write(json.dumps(record) + '\n')

# Transparent compression wrapper
def open_jsonl(filepath, mode='r'):
    """Open JSONL file, handling .gz automatically"""
    if filepath.endswith('.gz'):
        return gzip.open(filepath, mode + 't', encoding='utf-8')
    else:
        return open(filepath, mode, encoding='utf-8')

# Usage works for both compressed and uncompressed
with open_jsonl('data.jsonl.gz', 'r') as f:
    for line in f:
        record = json.loads(line)

# Read bzip2 or xz (both in the standard library)
import bz2
import lzma

# bzip2
with bz2.open('data.jsonl.bz2', 'rt') as f:
    for line in f:
        record = json.loads(line)

# xz/lzma
with lzma.open('data.jsonl.xz', 'rt') as f:
    for line in f:
        record = json.loads(line)
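
For zstd, which is not in the standard library, a minimal sketch assuming the third-party zstandard package (the .zst filename is illustrative):

# zstd: pip install zstandard
import zstandard

with zstandard.open('data.jsonl.zst', 'rt', encoding='utf-8') as f:
    for line in f:
        record = json.loads(line)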

Parallel Processing

from multiprocessing import Pool
import json

def process_chunk(lines):
    """Process a chunk of lines"""
    results = []
    for line in lines:
        try:
            record = json.loads(line)
            result = expensive_computation(record)
            results.append(result)
        except Exception:
            # Skip lines that fail to parse or process
            continue
    return results

def parallel_process_jsonl(filepath, num_workers=4, chunk_size=10000):
    """Process JSONL in parallel"""
    # Read file into chunks
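    # (Note: this loads every line into memory up front; for files larger than
    # RAM, feed batches to the pool incrementally instead.)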
    chunks = []
    current_chunk = []

    with open(filepath, 'r') as f:
        for line in f:
            current_chunk.append(line)

            if len(current_chunk) >= chunk_size:
                chunks.append(current_chunk)
                current_chunk = []

    if current_chunk:
        chunks.append(current_chunk)

    # Process chunks in parallel
    with Pool(processes=num_workers) as pool:
        all_results = pool.map(process_chunk, chunks)

    # Flatten results
    final_results = [item for sublist in all_results for item in sublist]
    return final_results

# Usage
results = parallel_process_jsonl('data.jsonl', num_workers=8)

# Alternative: concurrent.futures for more control
from concurrent.futures import ProcessPoolExecutor, as_completed

def process_file_parallel(filepath):
    # Split file into parts
    chunks = split_file_into_chunks(filepath, num_chunks=10)

    with ProcessPoolExecutor(max_workers=10) as executor:
        # Submit all chunks
        futures = {executor.submit(process_chunk_file, chunk): chunk
                   for chunk in chunks}

        # Collect results as they complete
        for future in as_completed(futures):
            chunk = futures[future]
            try:
                result = future.result()
                print(f"Chunk {chunk} completed")
            except Exception as e:
                print(f"Chunk {chunk} failed: {e}")

Streaming with ijson

ijson provides iterative JSON parsing for ultra-large files:

# Install: pip install ijson
import ijson

# Stream large JSONL files with minimal memory
with open('large.jsonl', 'rb') as f:
    # Parse each JSON object as it's encountered
    objects = ijson.items(f, '', multiple_values=True)

    for obj in objects:
        print(obj['name'])
        process(obj)

# Memory-efficient: only loads one object at a time
# Perfect for files larger than available RAM

# Advanced: extract specific fields only
with open('data.jsonl', 'rb') as f:
    # Only parse the 'name' field from each object
    names = ijson.items(f, 'name', multiple_values=True)
    for name in names:
        print(name)

# Combine with generators for pipelines
def extract_active_users(filepath):
    with open(filepath, 'rb') as f:
        objects = ijson.items(f, '', multiple_values=True)
        for obj in objects:
            if obj.get('status') == 'active':
                yield obj

# Use in pipeline
for user in extract_active_users('users.jsonl'):
    process(user)

Best for: Files too large to fit in memory, where you need true streaming parsing.

Using jsonlines Library

The jsonlines library provides a clean API for JSONL operations:

# Install: pip install jsonlines
import jsonlines

# Read JSONL
with jsonlines.open('data.jsonl') as reader:
    for record in reader:
        print(record['name'])

# Write JSONL
with jsonlines.open('output.jsonl', mode='w') as writer:
    writer.write({'name': 'Alice', 'age': 30})
    writer.write({'name': 'Bob', 'age': 25})
    # Or write multiple at once
    writer.write_all([
        {'name': 'Charlie', 'age': 35},
        {'name': 'Diana', 'age': 28}
    ])

# Read with compression (jsonlines does not decompress .gz automatically)
import gzip

with gzip.open('data.jsonl.gz', 'rt', encoding='utf-8') as f:
    reader = jsonlines.Reader(f)
    for record in reader:
        process(record)

# Skip invalid lines
with jsonlines.open('data.jsonl', mode='r') as reader:
    for record in reader.iter(skip_invalid=True):
        process(record)

# Type checking with dataclasses
from dataclasses import dataclass

@dataclass
class User:
    id: int
    name: str
    age: int

with jsonlines.open('users.jsonl') as reader:
    for obj in reader:
        user = User(**obj)  # Convert to dataclass
        print(user.name)

Complete Python Example: ETL Pipeline

#!/usr/bin/env python3
"""
ETL Pipeline: Read JSONL, transform, filter, write output
Handles large files, errors, and compression
"""
import json
import gzip
import logging
from typing import Iterator, Dict, Any
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def read_jsonl(filepath: str) -> Iterator[Dict[str, Any]]:
    """Read JSONL file (handles .gz compression)"""
    open_func = gzip.open if filepath.endswith('.gz') else open

    with open_func(filepath, 'rt', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            line = line.strip()
            if not line:
                continue

            try:
                yield json.loads(line)
            except json.JSONDecodeError as e:
                logger.error(f"Parse error line {line_num}: {e}")


def write_jsonl(records: Iterator[Dict[str, Any]], filepath: str):
    """Write records to JSONL file (handles .gz compression)"""
    open_func = gzip.open if filepath.endswith('.gz') else open

    with open_func(filepath, 'wt', encoding='utf-8') as f:
        for record in records:
            f.write(json.dumps(record) + '\n')


def transform(record: Dict[str, Any]) -> Dict[str, Any]:
    """Transform record (example: add computed field)"""
    record['full_name'] = f"{record['first_name']} {record['last_name']}"
    record['is_adult'] = record['age'] >= 18
    return record


def filter_record(record: Dict[str, Any]) -> bool:
    """Filter logic (example: active users only)"""
    return record.get('status') == 'active'


def etl_pipeline(input_file: str, output_file: str):
    """Complete ETL pipeline"""
    logger.info(f"Starting ETL: {input_file} -> {output_file}")

    processed = 0
    written = 0

    def pipeline():
        nonlocal processed, written
        for record in read_jsonl(input_file):
            processed += 1
            record = transform(record)
            if filter_record(record):
                written += 1
                yield record

    write_jsonl(pipeline(), output_file)

    logger.info(f"ETL complete. Processed: {processed}, Output: {written}")


if __name__ == '__main__':
    etl_pipeline('input.jsonl.gz', 'output.jsonl.gz')

JavaScript / Node.js

Reading JSONL (Node.js)

const fs = require('fs');
const readline = require('readline');

// Basic line-by-line reading
async function readJSONL(filepath) {
    const fileStream = fs.createReadStream(filepath);

    const rl = readline.createInterface({
        input: fileStream,
        crlfDelay: Infinity  // Handle both \n and \r\n
    });

    for await (const line of rl) {
        if (!line.trim()) continue;  // Skip empty lines

        try {
            const record = JSON.parse(line);
            console.log(record.name);
        } catch (err) {
            console.error('Parse error:', err.message);
        }
    }
}

// Usage
readJSONL('data.jsonl');

// Callback-based approach
function readJSONLCallback(filepath, onRecord, onComplete) {
    const fileStream = fs.createReadStream(filepath);
    const rl = readline.createInterface({ input: fileStream });

    rl.on('line', (line) => {
        try {
            const record = JSON.parse(line);
            onRecord(record);
        } catch (err) {
            console.error('Parse error:', err);
        }
    });

    rl.on('close', onComplete);
}

Writing JSONL

const fs = require('fs');

// Basic writing
const data = [
    { id: 1, name: 'Alice', age: 30 },
    { id: 2, name: 'Bob', age: 25 }
];

const writeStream = fs.createWriteStream('output.jsonl');

data.forEach(record => {
    writeStream.write(JSON.stringify(record) + '\n');
});

writeStream.end();

// Async/await with promises
async function writeJSONL(records, filepath) {
    const writeStream = fs.createWriteStream(filepath);

    for (const record of records) {
        const json = JSON.stringify(record) + '\n';

        // Wait if buffer is full
        if (!writeStream.write(json)) {
            await new Promise(resolve => writeStream.once('drain', resolve));
        }
    }

    writeStream.end();
    return new Promise(resolve => writeStream.once('finish', resolve));
}

// Usage
await writeJSONL(data, 'output.jsonl');

// Append to existing file
const appendStream = fs.createWriteStream('output.jsonl', { flags: 'a' });
appendStream.write(JSON.stringify({ id: 3, name: 'Charlie' }) + '\n');
appendStream.end();

High-Performance Streaming (ndjson)

The ndjson npm package provides optimized streaming:

// Install: npm install ndjson
const fs = require('fs');
const ndjson = require('ndjson');

// Read JSONL with streaming parser
fs.createReadStream('data.jsonl')
    .pipe(ndjson.parse())
    .on('data', (record) => {
        console.log(record.name);
    })
    .on('end', () => {
        console.log('Done');
    });

// Write JSONL with streaming serializer
const writeStream = fs.createWriteStream('output.jsonl');
const stringify = ndjson.stringify();

stringify.pipe(writeStream);

// Write objects
stringify.write({ id: 1, name: 'Alice' });
stringify.write({ id: 2, name: 'Bob' });
stringify.end();

// Transform pipeline
fs.createReadStream('input.jsonl')
    .pipe(ndjson.parse())
    .pipe(ndjson.stringify())
    .pipe(fs.createWriteStream('output.jsonl'));

Transform Streams

const { Transform } = require('stream');
const fs = require('fs');
const ndjson = require('ndjson');

// Custom transform stream
class FilterTransform extends Transform {
    constructor(filterFn) {
        super({ objectMode: true });
        this.filterFn = filterFn;
    }

    _transform(record, encoding, callback) {
        if (this.filterFn(record)) {
            // Transform record
            record.processed = true;
            record.timestamp = Date.now();
            this.push(record);
        }
        callback();
    }
}

// Usage: filter active users
const filterActive = new FilterTransform(r => r.status === 'active');

fs.createReadStream('users.jsonl')
    .pipe(ndjson.parse())
    .pipe(filterActive)
    .pipe(ndjson.stringify())
    .pipe(fs.createWriteStream('active_users.jsonl'));

// Pipeline with multiple transforms
const { pipeline } = require('stream/promises');

async function processJSONL() {
    await pipeline(
        fs.createReadStream('input.jsonl'),
        ndjson.parse(),
        new FilterTransform(r => r.age > 25),
        ndjson.stringify(),
        fs.createWriteStream('output.jsonl')
    );
    console.log('Pipeline complete');
}

processJSONL();

Compressed JSONL

const fs = require('fs');
const zlib = require('zlib');
const readline = require('readline');

// Read gzipped JSONL
async function readGzippedJSONL(filepath) {
    const fileStream = fs.createReadStream(filepath);
    const gunzip = zlib.createGunzip();
    const rl = readline.createInterface({
        input: fileStream.pipe(gunzip),
        crlfDelay: Infinity
    });

    for await (const line of rl) {
        const record = JSON.parse(line);
        console.log(record);
    }
}

// Write gzipped JSONL
async function writeGzippedJSONL(records, filepath) {
    const writeStream = fs.createWriteStream(filepath);
    const gzip = zlib.createGzip();

    gzip.pipe(writeStream);

    for (const record of records) {
        gzip.write(JSON.stringify(record) + '\n');
    }

    gzip.end();
    return new Promise(resolve => writeStream.on('finish', resolve));
}

// With ndjson
const ndjson = require('ndjson');

fs.createReadStream('data.jsonl.gz')
    .pipe(zlib.createGunzip())
    .pipe(ndjson.parse())
    .on('data', record => {
        console.log(record);
    });

HTTP Streaming API

const express = require('express');
const fs = require('fs');
const ndjson = require('ndjson');

const app = express();

// Streaming JSONL endpoint
app.get('/api/data', (req, res) => {
    res.setHeader('Content-Type', 'application/x-ndjson');
    res.setHeader('Transfer-Encoding', 'chunked');

    fs.createReadStream('data.jsonl')
        .pipe(ndjson.parse())
        .pipe(ndjson.stringify())
        .pipe(res);
});

// Query database and stream results
app.get('/api/users', async (req, res) => {
    res.setHeader('Content-Type', 'application/x-ndjson');

    const cursor = db.collection('users').find().stream();

    cursor.on('data', (doc) => {
        res.write(JSON.stringify(doc) + '\n');
    });

    cursor.on('end', () => {
        res.end();
    });
});

app.listen(3000);

// Client: consume streaming JSONL
const https = require('https');
const readline = require('readline');

https.get('https://api.example.com/data', (res) => {
    const rl = readline.createInterface({ input: res });

    rl.on('line', (line) => {
        const record = JSON.parse(line);
        console.log(record);
    });
});

Browser Usage (Fetch API)

// Streaming JSONL in the browser
async function fetchStreamingJSONL(url) {
    const response = await fetch(url);
    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();

        if (done) break;

        // Decode chunk
        buffer += decoder.decode(value, { stream: true });

        // Process complete lines
        const lines = buffer.split('\n');
        buffer = lines.pop(); // Keep incomplete line in buffer

        for (const line of lines) {
            if (line.trim()) {
                const record = JSON.parse(line);
                console.log(record);
                // Update UI with record
            }
        }
    }

    // Parse any final record left in the buffer (file may lack a trailing newline)
    if (buffer.trim()) {
        const record = JSON.parse(buffer);
        console.log(record);
    }
}

// Usage
fetchStreamingJSONL('https://api.example.com/data.jsonl');

// With async iterator (modern browsers)
async function* parseJSONLStream(response) {
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop();

        for (const line of lines) {
            if (line.trim()) {
                yield JSON.parse(line);
            }
        }
    }

    // Yield any final record left in the buffer
    if (buffer.trim()) {
        yield JSON.parse(buffer);
    }
}

// Usage with for-await
const response = await fetch('https://api.example.com/data.jsonl');
for await (const record of parseJSONLStream(response)) {
    console.log(record);
}

Complete Node.js Example: API + Streaming

// Complete example: JSONL API with filtering
const express = require('express');
const fs = require('fs');
const ndjson = require('ndjson');
const { Transform } = require('stream');

const app = express();

// Custom filter transform
class FilterStream extends Transform {
    constructor(query) {
        super({ objectMode: true });
        this.query = query;
    }

    _transform(record, encoding, callback) {
        // Apply filters
        let match = true;

        if (this.query.minAge && record.age < this.query.minAge) {
            match = false;
        }

        if (this.query.status && record.status !== this.query.status) {
            match = false;
        }

        if (match) {
            this.push(record);
        }

        callback();
    }
}

// Streaming JSONL endpoint with query params
app.get('/api/users', (req, res) => {
    const query = {
        minAge: parseInt(req.query.minAge) || 0,
        status: req.query.status
    };

    res.setHeader('Content-Type', 'application/x-ndjson');
    res.setHeader('Transfer-Encoding', 'chunked');

    const filterStream = new FilterStream(query);

    fs.createReadStream('users.jsonl')
        .pipe(ndjson.parse())
        .pipe(filterStream)
        .pipe(ndjson.stringify())
        .pipe(res)
        .on('error', (err) => {
            console.error('Stream error:', err);
            res.end();
        });
});

app.listen(3000, () => {
    console.log('API listening on port 3000');
});

Go

Reading JSONL (bufio.Scanner)

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "log"
    "os"
)

type User struct {
    ID    int    `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

func main() {
    file, err := os.Open("data.jsonl")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)

    // Increase buffer size for large lines (default is 64KB)
    const maxCapacity = 1024 * 1024 // 1MB
    buf := make([]byte, maxCapacity)
    scanner.Buffer(buf, maxCapacity)

    lineNum := 0
    for scanner.Scan() {
        lineNum++
        line := scanner.Bytes()

        var user User
        if err := json.Unmarshal(line, &user); err != nil {
            log.Printf("Error on line %d: %v", lineNum, err)
            continue
        }

        fmt.Printf("%s\n", user.Name)
    }

    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}

Writing JSONL

package main

import (
    "bufio"
    "encoding/json"
    "log"
    "os"
)

type User struct {
    ID    int    `json:"id"`
    Name  string `json:"name"`
    Email string `json:"email"`
}

func main() {
    users := []User{
        {ID: 1, Name: "Alice", Email: "[email protected]"},
        {ID: 2, Name: "Bob", Email: "[email protected]"},
    }

    file, err := os.Create("output.jsonl")
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    // Use buffered writer for better performance
    writer := bufio.NewWriter(file)
    defer writer.Flush()

    encoder := json.NewEncoder(writer)

    for _, user := range users {
        if err := encoder.Encode(&user); err != nil {
            log.Printf("Error encoding user %d: %v", user.ID, err)
            continue
        }
    }
}

// Append to existing file
func appendToJSONL(user User, filepath string) error {
    file, err := os.OpenFile(filepath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if err != nil {
        return err
    }
    defer file.Close()

    encoder := json.NewEncoder(file)
    return encoder.Encode(&user)
}

Streaming with bufio.Reader

package main

import (
    "bufio"
    "encoding/json"
    "io"
    "log"
    "os"
)

type Record struct {
    ID   int    `json:"id"`
    Data string `json:"data"`
}

// Stream JSONL with bufio.Reader (no fixed line-length limit, unlike Scanner's default 64KB buffer)
func streamJSONL(filepath string, processFn func(Record) error) error {
    file, err := os.Open(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    reader := bufio.NewReader(file)

    for {
        line, readErr := reader.ReadBytes('\n')

        // Process whatever was read: the final line may arrive together with
        // io.EOF if the file has no trailing newline.
        if len(line) > 0 {
            var record Record
            if err := json.Unmarshal(line, &record); err != nil {
                log.Printf("Parse error: %v", err)
            } else if err := processFn(record); err != nil {
                return err
            }
        }

        if readErr == io.EOF {
            break
        }
        if readErr != nil {
            return readErr
        }
    }

    return nil
}

func main() {
    err := streamJSONL("data.jsonl", func(r Record) error {
        // Process each record
        log.Printf("Processing ID %d", r.ID)
        return nil
    })

    if err != nil {
        log.Fatal(err)
    }
}

Concurrent Processing with Goroutines

package main

import (
    "bufio"
    "encoding/json"
    "log"
    "os"
    "sync"
)

type Record struct {
    ID   int    `json:"id"`
    Data string `json:"data"`
}

// Concurrent JSONL processing
func processConcurrent(filepath string, numWorkers int) error {
    file, err := os.Open(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    // Channel for distributing work
    lines := make(chan []byte, numWorkers*2)
    results := make(chan Record, numWorkers*2)

    // Start worker goroutines
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for line := range lines {
                var record Record
                if err := json.Unmarshal(line, &record); err != nil {
                    log.Printf("Parse error: %v", err)
                    continue
                }

                // Process record
                processed := processRecord(record)
                results <- processed
            }
        }()
    }

    // Result collector
    var resultsWg sync.WaitGroup
    resultsWg.Add(1)
    go func() {
        defer resultsWg.Done()
        for result := range results {
            // Handle results
            log.Printf("Processed: %+v", result)
        }
    }()

    // Read file and distribute lines
    scanner := bufio.NewScanner(file)
    buf := make([]byte, 1024*1024)
    scanner.Buffer(buf, 1024*1024)

    for scanner.Scan() {
        // Make a copy of line (scanner reuses buffer)
        lineCopy := make([]byte, len(scanner.Bytes()))
        copy(lineCopy, scanner.Bytes())
        lines <- lineCopy
    }

    close(lines)
    wg.Wait()
    close(results)
    resultsWg.Wait()

    return scanner.Err()
}

func processRecord(r Record) Record {
    // Expensive processing here
    return r
}

func main() {
    if err := processConcurrent("data.jsonl", 10); err != nil {
        log.Fatal(err)
    }
}

Compressed JSONL

package main

import (
    "bufio"
    "compress/gzip"
    "encoding/json"
    "log"
    "os"
)

type Record struct {
    ID   int    `json:"id"`
    Data string `json:"data"`
}

// Read gzipped JSONL
func readGzippedJSONL(filepath string) error {
    file, err := os.Open(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    gzReader, err := gzip.NewReader(file)
    if err != nil {
        return err
    }
    defer gzReader.Close()

    scanner := bufio.NewScanner(gzReader)
    buf := make([]byte, 1024*1024)
    scanner.Buffer(buf, 1024*1024)

    for scanner.Scan() {
        var record Record
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            log.Printf("Parse error: %v", err)
            continue
        }

        // Process record
        log.Printf("%+v", record)
    }

    return scanner.Err()
}

// Write gzipped JSONL
func writeGzippedJSONL(records []Record, filepath string) error {
    file, err := os.Create(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    gzWriter := gzip.NewWriter(file)
    defer gzWriter.Close()

    writer := bufio.NewWriter(gzWriter)
    defer writer.Flush()

    encoder := json.NewEncoder(writer)

    for _, record := range records {
        if err := encoder.Encode(&record); err != nil {
            return err
        }
    }

    return nil
}

func main() {
    // Read
    if err := readGzippedJSONL("data.jsonl.gz"); err != nil {
        log.Fatal(err)
    }

    // Write
    records := []Record{{ID: 1, Data: "test"}}
    if err := writeGzippedJSONL(records, "output.jsonl.gz"); err != nil {
        log.Fatal(err)
    }
}

Generic JSONL Processing (Go 1.18+)

package main

import (
    "bufio"
    "encoding/json"
    "io"
    "os"
)

// Generic JSONL reader
func ReadJSONL[T any](filepath string) ([]T, error) {
    file, err := os.Open(filepath)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    var results []T
    scanner := bufio.NewScanner(file)

    for scanner.Scan() {
        var record T
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            continue
        }
        results = append(results, record)
    }

    return results, scanner.Err()
}

// Generic JSONL writer
func WriteJSONL[T any](records []T, filepath string) error {
    file, err := os.Create(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    writer := bufio.NewWriter(file)
    defer writer.Flush()

    encoder := json.NewEncoder(writer)

    for _, record := range records {
        if err := encoder.Encode(&record); err != nil {
            return err
        }
    }

    return nil
}

// Streaming generic processor
func StreamJSONL[T any](r io.Reader, processFn func(T) error) error {
    scanner := bufio.NewScanner(r)

    for scanner.Scan() {
        var record T
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            continue
        }

        if err := processFn(record); err != nil {
            return err
        }
    }

    return scanner.Err()
}

type User struct {
    Name string `json:"name"`
    Age  int    `json:"age"`
}

func main() {
    // Type-safe reading
    users, _ := ReadJSONL[User]("users.jsonl")

    // Type-safe writing
    WriteJSONL(users, "output.jsonl")

    // Streaming
    file, _ := os.Open("users.jsonl")
    defer file.Close()

    StreamJSONL(file, func(u User) error {
        // Process user
        return nil
    })
}

Complete Go Example: Concurrent ETL Pipeline

package main

import (
    "bufio"
    "compress/gzip"
    "encoding/json"
    "log"
    "os"
    "sync"
)

type InputRecord struct {
    ID     int    `json:"id"`
    Status string `json:"status"`
    Value  int    `json:"value"`
}

type OutputRecord struct {
    ID        int    `json:"id"`
    Processed bool   `json:"processed"`
    Result    int    `json:"result"`
}

func main() {
    if err := runETL("input.jsonl.gz", "output.jsonl.gz", 10); err != nil {
        log.Fatal(err)
    }
}

func runETL(inputPath, outputPath string, numWorkers int) error {
    // Open input file
    inFile, err := os.Open(inputPath)
    if err != nil {
        return err
    }
    defer inFile.Close()

    gzReader, err := gzip.NewReader(inFile)
    if err != nil {
        return err
    }
    defer gzReader.Close()

    // Open output file
    outFile, err := os.Create(outputPath)
    if err != nil {
        return err
    }
    defer outFile.Close()

    gzWriter := gzip.NewWriter(outFile)
    defer gzWriter.Close()

    // Channels
    inputs := make(chan InputRecord, numWorkers*2)
    outputs := make(chan OutputRecord, numWorkers*2)

    // Workers
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for input := range inputs {
                // Filter
                if input.Status != "active" {
                    continue
                }

                // Transform
                output := OutputRecord{
                    ID:        input.ID,
                    Processed: true,
                    Result:    input.Value * 2,
                }

                outputs <- output
            }
        }()
    }

    // Writer goroutine
    var writerWg sync.WaitGroup
    writerWg.Add(1)
    go func() {
        defer writerWg.Done()
        encoder := json.NewEncoder(gzWriter)
        for output := range outputs {
            encoder.Encode(&output)
        }
    }()

    // Read and distribute
    scanner := bufio.NewScanner(gzReader)
    buf := make([]byte, 1024*1024)
    scanner.Buffer(buf, 1024*1024)

    for scanner.Scan() {
        var record InputRecord
        if err := json.Unmarshal(scanner.Bytes(), &record); err != nil {
            continue
        }
        inputs <- record
    }

    // Cleanup
    close(inputs)
    wg.Wait()
    close(outputs)
    writerWg.Wait()

    return scanner.Err()
}

Java

Setup and Dependencies

Add Jackson or Gson to your project for JSON processing:

<!-- Maven: Jackson -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.15.2</version>
</dependency>

// Gradle: Jackson
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'

<!-- Maven: Gson -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.10.1</version>
</dependency>

// Gradle: Gson
implementation 'com.google.code.gson:gson:2.10.1'

Reading JSONL with Jackson

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JacksonJSONLReader {
    public static class User {
        public int id;
        public String name;
        public String email;
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        List<User> users = new ArrayList<>();

        try (BufferedReader reader = new BufferedReader(new FileReader("data.jsonl"))) {
            String line;
            int lineNum = 0;

            while ((line = reader.readLine()) != null) {
                lineNum++;

                // Skip empty lines
                if (line.trim().isEmpty()) {
                    continue;
                }

                try {
                    User user = mapper.readValue(line, User.class);
                    users.add(user);
                    System.out.println(user.name);
                } catch (IOException e) {
                    System.err.println("Error on line " + lineNum + ": " + e.getMessage());
                }
            }
        }

        System.out.println("Loaded " + users.size() + " users");
    }
}

Writing JSONL with Jackson

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class JacksonJSONLWriter {
    public static class User {
        public int id;
        public String name;
        public String email;

        public User(int id, String name, String email) {
            this.id = id;
            this.name = name;
            this.email = email;
        }
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        List<User> users = Arrays.asList(
            new User(1, "Alice", "[email protected]"),
            new User(2, "Bob", "[email protected]"),
            new User(3, "Charlie", "[email protected]")
        );

        try (BufferedWriter writer = new BufferedWriter(new FileWriter("output.jsonl"))) {
            for (User user : users) {
                String json = mapper.writeValueAsString(user);
                writer.write(json);
                writer.newLine();
            }
        }

        System.out.println("Wrote " + users.size() + " users to output.jsonl");
    }
}

Streaming Large Files with Jackson

Reading line-by-line with a reused ObjectMapper keeps memory usage low, even for very large files:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.function.Consumer;

public class StreamingJSONLProcessor {
    public static class Record {
        public int id;
        public String data;
    }

    public static void streamJSONL(String filepath, Consumer<Record> processor) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        try (BufferedReader reader = new BufferedReader(new FileReader(filepath), 8192)) {
            String line;

            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }

                try {
                    Record record = mapper.readValue(line, Record.class);
                    processor.accept(record);
                } catch (IOException e) {
                    System.err.println("Parse error: " + e.getMessage());
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Process each record as it's read (low memory usage)
        streamJSONL("large.jsonl", record -> {
            System.out.println("Processing: " + record.id);
            // Do expensive processing here
        });
    }
}

Reading JSONL with Gson

import com.google.gson.Gson;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class GsonJSONLReader {
    public static class User {
        int id;
        String name;
        String email;
    }

    public static void main(String[] args) throws IOException {
        Gson gson = new Gson();
        List<User> users = new ArrayList<>();

        try (BufferedReader reader = new BufferedReader(new FileReader("data.jsonl"))) {
            String line;

            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue;
                }

                User user = gson.fromJson(line, User.class);
                users.add(user);
                System.out.println(user.name);
            }
        }

        System.out.println("Total users: " + users.size());
    }
}

Writing JSONL with Gson

import com.google.gson.Gson;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class GsonJSONLWriter {
    public static class User {
        int id;
        String name;
        String email;

        User(int id, String name, String email) {
            this.id = id;
            this.name = name;
            this.email = email;
        }
    }

    public static void main(String[] args) throws IOException {
        Gson gson = new Gson();

        List<User> users = Arrays.asList(
            new User(1, "Alice", "[email protected]"),
            new User(2, "Bob", "[email protected]")
        );

        try (BufferedWriter writer = new BufferedWriter(new FileWriter("output.jsonl"))) {
            for (User user : users) {
                String json = gson.toJson(user);
                writer.write(json);
                writer.newLine();
            }
        }
    }
}

Parallel Processing with Streams

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelJSONLProcessor {
    public static class Record {
        public int id;
        public String data;
        public String status;
    }

    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();

        // Read and process in parallel; try-with-resources closes the underlying file
        List<Record> results;
        try (var lines = Files.lines(Paths.get("data.jsonl"))) {
            results = lines
                .parallel()
                .filter(line -> !line.trim().isEmpty())
                .map(line -> {
                    try {
                        return mapper.readValue(line, Record.class);
                    } catch (IOException e) {
                        return null;
                    }
                })
                .filter(record -> record != null)
                .filter(record -> "active".equals(record.status))
                .map(record -> {
                    // Expensive processing
                    record.data = processData(record.data);
                    return record;
                })
                .collect(Collectors.toList());
        }

        System.out.println("Processed " + results.size() + " records");
    }

    private static String processData(String data) {
        // Simulate expensive processing
        return data.toUpperCase();
    }
}

Complete Java Example: ETL Pipeline

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.*;
import java.util.concurrent.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class JSONLETLPipeline {
    private static final ObjectMapper mapper = new ObjectMapper();

    public static class InputRecord {
        public int id;
        public String status;
        public int value;
    }

    public static class OutputRecord {
        public int id;
        public boolean processed;
        public int result;

        public OutputRecord(int id, boolean processed, int result) {
            this.id = id;
            this.processed = processed;
            this.result = result;
        }
    }

    public static void main(String[] args) throws Exception {
        runETL("input.jsonl.gz", "output.jsonl.gz", 10);
    }

    public static void runETL(String inputPath, String outputPath, int numThreads)
            throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        BlockingQueue<OutputRecord> outputQueue = new LinkedBlockingQueue<>(1000);

        // Writer thread
        CompletableFuture<Void> writerFuture = CompletableFuture.runAsync(() -> {
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(
                        new GZIPOutputStream(new FileOutputStream(outputPath))))) {

                while (true) {
                    OutputRecord record = outputQueue.poll(1, TimeUnit.SECONDS);
                    if (record == null) continue;

                    if (record.id == -1) break; // Sentinel value

                    String json = mapper.writeValueAsString(record);
                    writer.write(json);
                    writer.newLine();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        // Read and process
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(
                    new GZIPInputStream(new FileInputStream(inputPath))))) {

            reader.lines()
                .parallel()
                .filter(line -> !line.trim().isEmpty())
                .forEach(line -> {
                    try {
                        InputRecord input = mapper.readValue(line, InputRecord.class);

                        // Filter
                        if (!"active".equals(input.status)) {
                            return;
                        }

                        // Transform
                        OutputRecord output = new OutputRecord(
                            input.id,
                            true,
                            input.value * 2
                        );

                        outputQueue.put(output);
                    } catch (Exception e) {
                        System.err.println("Error: " + e.getMessage());
                    }
                });
        }

        // Signal completion
        outputQueue.put(new OutputRecord(-1, false, 0));

        // Wait for writer
        writerFuture.get();
        executor.shutdown();

        System.out.println("ETL pipeline complete");
    }
}

Best Practices

  • Use BufferedReader/Writer: Provides significant performance improvement for file I/O
  • Reuse ObjectMapper: Creating new instances is expensive - use a singleton
  • Handle errors gracefully: One bad line shouldn't crash your entire pipeline
  • Use try-with-resources: Ensures proper cleanup of file handles
  • Consider streaming for large files: Process line-by-line to minimize memory usage

Rust

Setup and Dependencies

Add serde and serde_json to your Cargo.toml:

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

Reading JSONL

use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize, Serialize)]
struct User {
    id: u32,
    name: String,
    email: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.jsonl")?;
    let reader = BufReader::new(file);

    for (line_num, line) in reader.lines().enumerate() {
        let line = line?;

        // Skip empty lines
        if line.trim().is_empty() {
            continue;
        }

        // Parse JSON
        match serde_json::from_str::(&line) {
            Ok(user) => {
                println!("User: {} ({})", user.name, user.email);
            }
            Err(e) => {
                eprintln!("Error on line {}: {}", line_num + 1, e);
            }
        }
    }

    Ok(())
}

Writing JSONL

use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufWriter, Write};

#[derive(Debug, Deserialize, Serialize)]
struct User {
    id: u32,
    name: String,
    email: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let users = vec![
        User {
            id: 1,
            name: "Alice".to_string(),
            email: "[email protected]".to_string(),
        },
        User {
            id: 2,
            name: "Bob".to_string(),
            email: "[email protected]".to_string(),
        },
    ];

    let file = File::create("output.jsonl")?;
    let mut writer = BufWriter::new(file);

    for user in users {
        // Serialize to JSON string
        let json = serde_json::to_string(&user)?;

        // Write JSON + newline
        writeln!(writer, "{}", json)?;
    }

    // Ensure all data is written
    writer.flush()?;

    println!("Successfully wrote JSONL file");
    Ok(())
}

Memory-Efficient Streaming

Process large files line-by-line with minimal memory:

use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize)]
struct Record {
    id: u32,
    data: String,
}

fn stream_jsonl<F>(filepath: &str, mut processor: F) -> Result<(), Box<dyn std::error::Error>>
where
    F: FnMut(Record) -> Result<(), Box<dyn std::error::Error>>,
{
    let file = File::open(filepath)?;
    let reader = BufReader::new(file);

    for line in reader.lines() {
        let line = line?;

        if line.trim().is_empty() {
            continue;
        }

        match serde_json::from_str::(&line) {
            Ok(record) => {
                processor(record)?;
            }
            Err(e) => {
                eprintln!("Parse error: {}", e);
                continue;
            }
        }
    }

    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Process each record as it's read
    stream_jsonl("large.jsonl", |record| {
        println!("Processing ID: {}", record.id);
        // Do expensive work here
        Ok(())
    })?;

    Ok(())
}

Robust Error Handling

use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize)]
struct User {
    id: u32,
    name: String,
}

#[derive(Debug)]
struct ParseStats {
    total_lines: usize,
    successful: usize,
    errors: usize,
}

fn read_jsonl_with_stats(filepath: &str) -> Result<(Vec<User>, ParseStats), std::io::Error> {
    let file = File::open(filepath)?;
    let reader = BufReader::new(file);

    let mut users = Vec::new();
    let mut stats = ParseStats {
        total_lines: 0,
        successful: 0,
        errors: 0,
    };

    for (line_num, line) in reader.lines().enumerate() {
        stats.total_lines += 1;
        let line = line?;

        if line.trim().is_empty() {
            continue;
        }

        match serde_json::from_str::(&line) {
            Ok(user) => {
                users.push(user);
                stats.successful += 1;
            }
            Err(e) => {
                eprintln!("Line {}: {}", line_num + 1, e);
                stats.errors += 1;
            }
        }
    }

    Ok((users, stats))
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (users, stats) = read_jsonl_with_stats("data.jsonl")?;

    println!("Loaded {} users", users.len());
    println!("Stats: {:?}", stats);

    Ok(())
}

Compressed JSONL

Add flate2 for gzip support:

# Cargo.toml
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
flate2 = "1.0"

// src/main.rs
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    id: u32,
    data: String,
}

fn read_gzipped_jsonl(filepath: &str) -> Result<Vec<Record>, Box<dyn std::error::Error>> {
    let file = File::open(filepath)?;
    let decoder = GzDecoder::new(file);
    let reader = BufReader::new(decoder);

    let mut records = Vec::new();

    for line in reader.lines() {
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }

        let record: Record = serde_json::from_str(&line)?;
        records.push(record);
    }

    Ok(records)
}

fn write_gzipped_jsonl(records: &[Record], filepath: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::create(filepath)?;
    let encoder = GzEncoder::new(file, Compression::default());
    let mut writer = BufWriter::new(encoder);

    for record in records {
        let json = serde_json::to_string(record)?;
        writeln!(writer, "{}", json)?;
    }

    writer.flush()?;
    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Read gzipped JSONL
    let records = read_gzipped_jsonl("data.jsonl.gz")?;
    println!("Loaded {} records", records.len());

    // Write gzipped JSONL
    write_gzipped_jsonl(&records, "output.jsonl.gz")?;
    println!("Wrote compressed JSONL");

    Ok(())
}

Parallel Processing with Rayon

Add rayon for easy parallelism:

# Cargo.toml
[dependencies]
rayon = "1.7"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

// src/main.rs
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader};

#[derive(Debug, Deserialize, Serialize)]
struct Record {
    id: u32,
    status: String,
    value: i32,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.jsonl")?;
    let reader = BufReader::new(file);

    // Read all lines into memory
    let lines: Vec<String> = reader.lines().filter_map(|l| l.ok()).collect();

    // Process in parallel
    let results: Vec<Record> = lines
        .par_iter()
        .filter_map(|line| {
            if line.trim().is_empty() {
                return None;
            }

            serde_json::from_str::<Record>(line).ok()
        })
        .filter(|record| record.status == "active")
        .map(|mut record| {
            // Expensive processing
            record.value *= 2;
            record
        })
        .collect();

    println!("Processed {} records", results.len());

    Ok(())
}

Complete Rust Example: ETL Pipeline

use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader, BufWriter, Write};

#[derive(Debug, Deserialize)]
struct InputRecord {
    id: u32,
    status: String,
    value: i32,
}

#[derive(Debug, Serialize)]
struct OutputRecord {
    id: u32,
    processed: bool,
    result: i32,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    run_etl("input.jsonl.gz", "output.jsonl.gz")?;
    Ok(())
}

fn run_etl(input_path: &str, output_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    // Read gzipped input
    let input_file = File::open(input_path)?;
    let decoder = GzDecoder::new(input_file);
    let reader = BufReader::new(decoder);

    // Read all lines
    let lines: Vec<String> = reader.lines().filter_map(|l| l.ok()).collect();

    println!("Processing {} lines...", lines.len());

    // Parallel ETL pipeline
    let results: Vec<OutputRecord> = lines
        .par_iter()
        .filter_map(|line| {
            if line.trim().is_empty() {
                return None;
            }

            // Parse
            serde_json::from_str::<InputRecord>(line).ok()
        })
        .filter(|input| {
            // Filter: only active records
            input.status == "active"
        })
        .map(|input| {
            // Transform
            OutputRecord {
                id: input.id,
                processed: true,
                result: input.value * 2,
            }
        })
        .collect();

    println!("Filtered to {} records", results.len());

    // Write gzipped output
    let output_file = File::create(output_path)?;
    let encoder = GzEncoder::new(output_file, Compression::default());
    let mut writer = BufWriter::new(encoder);

    for record in results {
        let json = serde_json::to_string(&record)?;
        writeln!(writer, "{}", json)?;
    }

    writer.flush()?;
    println!("ETL complete!");

    Ok(())
}

Best Practices

  • Use BufReader/BufWriter: Dramatically improves I/O performance
  • Leverage serde's derive macro: Auto-generate serialization code
  • Handle errors explicitly: Use Result types and proper error propagation
  • Use Rayon for parallelism: Easy parallel processing with minimal code changes
  • Flush writers explicitly: Ensures all data is written to disk
  • Consider memory vs speed tradeoffs: Line-by-line for huge files, bulk for better parallelism