JSONL for Analytics Pipelines
Power your ETL workflows, data warehouses, and analytics at scale with JSONL — a format natively supported by BigQuery, Snowflake, and other modern data platforms.
Why JSONL for Analytics?
Perfect for Data Warehouses
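JSONL is a widely supported format for loading semi-structured data into modern data warehouses. Because every line is an independent JSON record, files can be split and loaded in parallel, new fields can appear without breaking existing loads (schema-on-read), and new data can simply be appended for incremental updates.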
- Parallel data loading
- Schema evolution support
- Incremental updates
- Nested data structures
Platform Support
- BigQuery: Native JSONL import
- Snowflake: JSONL bulk loading
- Redshift: COPY from S3 JSONL
- Athena: Query JSONL directly
- Databricks: Delta Lake ingestion
Jump to Topic
ETL Workflows with JSONL
Extract - Pull Data to JSONL
# Python: Extract from PostgreSQL to JSONL
import json
from datetime import datetime

import psycopg2

# Watermark from the previous run -- load it from wherever you persist it
# (e.g. a state table or file). Must be defined before the query runs.
last_sync = datetime(2025, 1, 1)

conn = psycopg2.connect("dbname=mydb user=postgres")
try:
    # A *named* cursor is server-side: rows are streamed in chunks of
    # `itersize`. An unnamed cursor would pull the entire result set into
    # client memory on execute(), no matter how fetchmany() is called later.
    with conn.cursor(name="users_export") as cursor:
        cursor.itersize = 1000  # rows fetched per network round trip
        # List the columns explicitly so the tuple unpacking below cannot
        # silently break when the table gains or reorders columns.
        cursor.execute(
            "SELECT id, email, name, created_at FROM users WHERE created_at > %s",
            (last_sync,),
        )
        out_path = f"users_{datetime.now().strftime('%Y%m%d')}.jsonl"
        with open(out_path, "w", encoding="utf-8") as f:
            for user_id, email, name, created_at in cursor:
                record = {
                    "id": user_id,
                    "email": email,
                    "name": name,
                    "created_at": created_at.isoformat(),
                }
                f.write(json.dumps(record) + "\n")
finally:
    # Always release the connection, even if the query or write fails.
    conn.close()
# Extract from MongoDB
import json
from datetime import datetime

from bson.decimal128 import Decimal128
from bson.objectid import ObjectId
from pymongo import MongoClient


def _bson_default(value):
    """Encode BSON types that the json module cannot serialize on its own.

    Used as ``json.dumps(default=...)``, so it applies to nested values too,
    not only the top-level ``_id``.
    """
    if isinstance(value, ObjectId):
        return str(value)
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, Decimal128):
        return str(value)  # a string keeps full decimal precision
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


client = MongoClient('mongodb://localhost:27017/')
try:
    db = client['myapp']
    with open('orders.jsonl', 'w', encoding='utf-8') as f:
        for doc in db.orders.find({'status': 'completed'}):
            # Converting only doc['_id'] is not enough: dates, decimals and
            # nested ObjectIds would make json.dumps raise TypeError.
            f.write(json.dumps(doc, default=_bson_default) + '\n')
finally:
    client.close()
# Extract from REST API
import json

import requests

url = "https://api.example.com/events"
params = {'since': '2025-01-01', 'limit': 1000}

# A Session reuses the TCP/TLS connection across page requests.
with requests.Session() as session, \
        open('api_events.jsonl', 'w', encoding='utf-8') as f:
    while True:
        # requests has no default timeout, so a stalled server would hang the
        # pipeline forever; raise_for_status() fails loudly on 4xx/5xx instead
        # of parsing an error body as if it were a page of events.
        response = session.get(url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        for item in data['items']:
            f.write(json.dumps(item) + '\n')
        next_page = data.get('next_page')
        if not next_page:
            break
        params['page'] = next_page
Transform - Process JSONL Data
# Python: Transform JSONL with pandas
import pandas as pd
import json

# Read JSONL into DataFrame (lines=True: one JSON object per line).
# convert_dates=False keeps created_at as the original string.
df = pd.read_json('users.jsonl', lines=True, convert_dates=False)

# Filter invalid emails first, so derived columns only see valid rows.
# na=False treats a missing email as invalid; without it the mask contains
# NaN and boolean indexing raises ValueError.
df = df[df['email'].str.contains('@', na=False)]  # Valid emails

# Transformations
# Format as a 'YYYY-MM-DD' string: the datetime.date objects produced by
# .dt.date are not JSON-serializable.
df['created_date'] = pd.to_datetime(df['created_at']).dt.strftime('%Y-%m-%d')
df['email_domain'] = df['email'].str.split('@').str[1]
df['name_length'] = df['name'].str.len()

# Enrich with external data
# companies_df is assumed to be loaded elsewhere with a 'domain' column.
# how='left' keeps users whose domain has no match; the default inner join
# would silently drop them.
df = df.merge(companies_df, how='left', left_on='email_domain', right_on='domain')

# Deduplicate after the merge, which can duplicate users if companies_df
# repeats a domain.
df = df.drop_duplicates(subset=['email'])

# Write transformed data. to_json writes NaN as null and handles datetime
# values; json.dumps over to_dict('records') raises on dates and emits NaN,
# which is not valid JSON.
df.to_json('users_transformed.jsonl', orient='records', lines=True, date_format='iso')
# Streaming transformation for large files
def transform_record(record):
    """Normalize one parsed JSONL record and hand it back.

    The dict is modified in place and also returned. ``email`` is lower-cased,
    and ``created_year`` is set to the first four characters of
    ``created_at`` (the year, assuming an ISO-8601 string).
    """
    normalized_email = record['email'].lower()
    record['email'] = normalized_email
    year_prefix = record['created_at'][:4]
    record['created_year'] = year_prefix
    return record
# Stream input -> output one record at a time, so memory use stays flat
# regardless of file size.
with open('input.jsonl', 'r', encoding='utf-8') as infile, \
        open('output.jsonl', 'w', encoding='utf-8') as outfile:
    for line in infile:
        # Skip blank lines instead of letting json.loads('') raise
        # JSONDecodeError and abort the whole file.
        if not line.strip():
            continue
        record = json.loads(line)
        transformed = transform_record(record)
        outfile.write(json.dumps(transformed) + '\n')
Load - Import to Data Warehouse
# Load to PostgreSQL
import json

import psycopg2
from psycopg2.extras import execute_batch

UPSERT_SQL = """
    INSERT INTO users (id, email, name, created_at)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (id) DO UPDATE
    SET email = EXCLUDED.email,
        name = EXCLUDED.name,
        updated_at = NOW()
"""

conn = psycopg2.connect("dbname=warehouse")
try:
    # Upsert every record, but send statements to the server in pages of
    # 1000 (execute_batch) rather than one round trip per line.
    with conn.cursor() as cursor, \
            open('users_transformed.jsonl', 'r', encoding='utf-8') as f:
        records = (json.loads(line) for line in f if line.strip())
        execute_batch(
            cursor,
            UPSERT_SQL,
            ((r['id'], r['email'], r['name'], r['created_at']) for r in records),
            page_size=1000,
        )
    conn.commit()

    # Bulk load with COPY into a raw staging table with a single jsonb column
    # (assumed: CREATE TABLE users_raw (data jsonb)). `users` itself has typed
    # columns, so it is not the target here.
    # COPY's default text format treats backslashes as escapes, which would
    # corrupt JSON escapes such as \" and \n. CSV format with QUOTE/DELIMITER
    # set to control bytes avoids that: valid JSON text can never contain raw
    # control bytes, so each line is passed through verbatim.
    # A fresh cursor is used: the original code reused one after closing it.
    with conn.cursor() as cursor, open('users.jsonl', 'r', encoding='utf-8') as f:
        cursor.copy_expert(
            r"""
            COPY users_raw (data)
            FROM STDIN
            WITH (FORMAT csv, QUOTE E'\x01', DELIMITER E'\x02')
            """,
            f,
        )
    conn.commit()
finally:
    conn.close()
Google BigQuery
Loading JSONL into BigQuery
# Python: Load JSONL to BigQuery
from google.cloud import bigquery

client = bigquery.Client()
table_id = "project.dataset.users"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,  # Auto-detect schema
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# Load from local file
with open("users.jsonl", "rb") as source_file:
    job = client.load_table_from_file(source_file, table_id, job_config=job_config)
job.result()  # Wait for completion; raises if the load job failed
print(f"Loaded {job.output_rows} rows")

# Load from Google Cloud Storage (the wildcard loads every match in one job).
# NOTE: job_config uses WRITE_APPEND, so loading the same data through both
# paths appends it twice; use WRITE_TRUNCATE for full reloads.
uri = "gs://my-bucket/data/*.jsonl"
job = client.load_table_from_uri(uri, table_id, job_config=job_config)
job.result()

# Streaming inserts (real-time)
rows_to_insert = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
]
errors = client.insert_rows_json(table_id, rows_to_insert)
if errors:
    print(f"Errors: {errors}")

# Query nested JSON
query = """
    SELECT
      user_id,
      JSON_VALUE(metadata, '$.country') AS country,
      JSON_VALUE(metadata, '$.city') AS city
    FROM `project.dataset.events`
    WHERE DATE(timestamp) = CURRENT_DATE()
"""
# .result() waits for the query job and returns an iterator over rows.
for row in client.query(query).result():
    print(f"{row.user_id}: {row.country}, {row.city}")
Schema Design for JSON Data
-- Create table with schema
-- NOTE: CREATE OR REPLACE drops any existing table (and its data).
CREATE OR REPLACE TABLE `project.dataset.users` (
  id INT64,
  email STRING,
  name STRING,
  metadata JSON,  -- Native JSON type
  tags ARRAY<STRING>,
  address STRUCT<
    street STRING,
    city STRING,
    country STRING
  >,
  created_at TIMESTAMP
)
PARTITION BY DATE(created_at)
CLUSTER BY email;

-- Query JSON fields
-- JSON_VALUE / JSON_QUERY_ARRAY are the standard JSON extractors;
-- JSON_EXTRACT_SCALAR / JSON_EXTRACT_ARRAY are their legacy equivalents.
SELECT
  id,
  JSON_VALUE(metadata, '$.signup_source') AS signup_source,
  JSON_QUERY_ARRAY(metadata, '$.interests') AS interests
FROM `project.dataset.users`
WHERE JSON_VALUE(metadata, '$.plan') = 'premium';

-- Flatten nested arrays: one output row per (id, tag) pair
SELECT
  id,
  tag
FROM `project.dataset.users`,
  UNNEST(tags) AS tag;
Snowflake
Loading JSONL into Snowflake
-- Create file format for JSONL
-- STRIP_OUTER_ARRAY stays FALSE: JSONL has no enclosing [...] array.
CREATE OR REPLACE FILE FORMAT jsonl_format
  TYPE = 'JSON'
  STRIP_OUTER_ARRAY = FALSE
  COMPRESSION = 'GZIP';

-- Create stage for S3
-- A named file format must be referenced as FILE_FORMAT = (FORMAT_NAME = ...).
-- Consider STORAGE_INTEGRATION = <integration> instead of inline keys, so
-- AWS secrets do not live in SQL text and query history.
CREATE OR REPLACE STAGE s3_stage
  URL = 's3://my-bucket/data/'
  CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
  FILE_FORMAT = (FORMAT_NAME = 'jsonl_format');

-- Create target table
CREATE OR REPLACE TABLE users (
  raw_data VARIANT,  -- JSON data type
  loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Load data from stage
-- A JSON file yields a single column ($1). Loading it into one column of a
-- multi-column table requires the COPY-with-transformation form; a target
-- column list is only valid together with a SELECT. loaded_at gets its default.
COPY INTO users (raw_data)
FROM (SELECT $1 FROM @s3_stage/users.jsonl.gz)
FILE_FORMAT = (FORMAT_NAME = 'jsonl_format')
ON_ERROR = 'CONTINUE';  -- skips bad rows; review the COPY result for rejects

-- Create view with parsed columns
CREATE OR REPLACE VIEW users_parsed AS
SELECT
  raw_data:id::NUMBER AS id,
  raw_data:email::STRING AS email,
  raw_data:name::STRING AS name,
  raw_data:metadata::VARIANT AS metadata,
  raw_data:created_at::TIMESTAMP AS created_at,
  loaded_at
FROM users;

-- Query nested JSON
SELECT
  id,
  email,
  metadata:country::STRING AS country,
  metadata:preferences[0]::STRING AS first_preference
FROM users_parsed
WHERE metadata:plan::STRING = 'premium';
Snowflake Python Connector
import snowflake.connector
import json

# NOTE: placeholder credentials -- in real pipelines read them from
# environment variables or a secrets manager, never from source code.
conn = snowflake.connector.connect(
    user='myuser',
    password='mypassword',
    account='myaccount',
    warehouse='COMPUTE_WH',
    database='MYDB',
    schema='PUBLIC',
)
try:
    with conn.cursor() as cursor:
        # Stage local file into the user stage (@~). PUT gzips by default,
        # hence the .gz name in the COPY below.
        cursor.execute("PUT file://users.jsonl @~")

        # Load into table. JSON produces one column ($1); loading it into a
        # single column of a multi-column table needs the transformation
        # form, and a named format must be written as (FORMAT_NAME = ...).
        cursor.execute("""
            COPY INTO users (raw_data)
            FROM (SELECT $1 FROM @~/users.jsonl.gz)
            FILE_FORMAT = (FORMAT_NAME = 'jsonl_format')
        """)

        # Query data
        cursor.execute("""
            SELECT raw_data:email::STRING as email
            FROM users
            WHERE raw_data:created_at::DATE = CURRENT_DATE()
        """)
        for (email,) in cursor:
            print(email)
finally:
    # Close the connection even if staging, loading or querying fails.
    conn.close()
Apache Airflow Orchestration
ETL DAG with JSONL
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'etl_to_bigquery',
    default_args=default_args,
    # `schedule` replaces `schedule_interval` (deprecated in Airflow 2.4,
    # removed in Airflow 3).
    schedule='0 2 * * *',  # Daily at 2 AM
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

# NOTE(review): extract -> transform -> upload pass a file under /tmp, which
# only works when all three tasks run on the same worker. On distributed
# executors, write to shared storage (e.g. GCS) instead.


def extract_data(**context):
    """Extract users updated on the run's logical date to a local JSONL file.

    Returns the file path; PythonOperator pushes it to XCom for `transform`.
    """
    import json
    import psycopg2

    # `ds` / `ds_nodash` are the logical date as strings. They replace the
    # `execution_date` context key, which is deprecated and removed in Airflow 3.
    filepath = f"/tmp/data_{context['ds_nodash']}.jsonl"
    conn = psycopg2.connect("postgresql://...")
    try:
        with conn.cursor() as cursor:
            # Explicit columns instead of SELECT * + positional indexing.
            cursor.execute(
                "SELECT id, email, name FROM users WHERE updated_at::DATE = %s",
                (context['ds'],),
            )
            with open(filepath, 'w', encoding='utf-8') as f:
                for user_id, email, name in cursor:
                    record = {'id': user_id, 'email': email, 'name': name}
                    f.write(json.dumps(record) + '\n')
    finally:
        # The original never closed the connection; retries would leak more.
        conn.close()
    return filepath


def transform_data(**context):
    """Lower-case emails and stamp each record with a UTC processing time.

    Reads the path returned by `extract` from XCom and returns the path of
    the transformed file.
    """
    import json
    from datetime import timezone

    filepath = context['task_instance'].xcom_pull(task_ids='extract')
    output_path = filepath.replace('.jsonl', '_transformed.jsonl')
    with open(filepath, 'r', encoding='utf-8') as infile, \
            open(output_path, 'w', encoding='utf-8') as outfile:
        for line in infile:
            if not line.strip():
                continue  # tolerate blank lines
            record = json.loads(line)
            record['email'] = record['email'].lower()
            # datetime.utcnow() is deprecated (Python 3.12); use an aware UTC time.
            record['processed_at'] = datetime.now(timezone.utc).isoformat()
            outfile.write(json.dumps(record) + '\n')
    return output_path


extract = PythonOperator(
    task_id='extract',
    python_callable=extract_data,
    dag=dag,
)

transform = PythonOperator(
    task_id='transform',
    python_callable=transform_data,
    dag=dag,
)

upload_to_gcs = LocalFilesystemToGCSOperator(
    task_id='upload_to_gcs',
    src="{{ task_instance.xcom_pull(task_ids='transform') }}",
    dst="data/{{ ds }}/users.jsonl",
    bucket='my-data-bucket',
    dag=dag,
)

# NOTE: WRITE_APPEND means a rerun of the same date appends duplicate rows;
# for idempotent reruns, load into a date partition with WRITE_TRUNCATE.
load_to_bq = BigQueryInsertJobOperator(
    task_id='load_to_bigquery',
    configuration={
        "load": {
            "sourceUris": ["gs://my-data-bucket/data/{{ ds }}/users.jsonl"],
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "analytics",
                "tableId": "users",
            },
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "writeDisposition": "WRITE_APPEND",
            "autodetect": True,
        }
    },
    dag=dag,
)

extract >> transform >> upload_to_gcs >> load_to_bq
dbt (Data Build Tool)
Transforming JSON in dbt
-- models/staging/stg_events.sql
-- Parse JSONL loaded into raw table
{{
  config(
    materialized='incremental',
    unique_key='event_id'
  )
}}
SELECT
  raw_data:event_id::STRING AS event_id,
  raw_data:user_id::NUMBER AS user_id,
  raw_data:event_type::STRING AS event_type,
  raw_data:timestamp::TIMESTAMP AS event_timestamp,
  raw_data:properties::VARIANT AS properties,
  CURRENT_TIMESTAMP() AS dbt_loaded_at
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
-- `>=` (not `>`) re-reads rows sharing the current max timestamp, so late
-- arrivals with exactly that timestamp are not skipped; the unique_key merge
-- de-duplicates them. COALESCE stops the filter from comparing against NULL
-- (and loading nothing) when the target table exists but is empty.
WHERE raw_data:timestamp::TIMESTAMP >= COALESCE(
  (SELECT MAX(event_timestamp) FROM {{ this }}),
  '1900-01-01'::TIMESTAMP
)
{% endif %}

-- models/marts/fct_user_events.sql
-- Aggregate user events: one row per user, day and event type
SELECT
  user_id,
  DATE(event_timestamp) AS event_date,
  event_type,
  COUNT(*) AS event_count,
  MIN(event_timestamp) AS first_event_at,
  MAX(event_timestamp) AS last_event_at
FROM {{ ref('stg_events') }}
GROUP BY 1, 2, 3
Apache Spark
Processing JSONL with PySpark
from pyspark.sql import SparkSession
# Alias the functions module rather than `import *`: the wildcard shadows
# Python builtins such as sum, min, max, abs and round with Spark versions.
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("JSONLProcessor").getOrCreate()

# Read JSONL (spark.read.json expects one JSON object per line by default)
df = spark.read.json("s3://bucket/data/*.jsonl")

# Schema inference
df.printSchema()

# Derive email_domain once on the full dataset. Both the output and the
# summary need it; the raw `df` has no such column, so grouping `df` by it
# fails with an AnalysisException.
with_domain = df.withColumn("email_domain", F.split(F.col("email"), "@").getItem(1))

# Transformations
transformed_df = (
    with_domain
    .withColumn("created_date", F.to_date(F.col("created_at")))
    .filter(F.col("status") == "active")
)

# Nested JSON operations: one row per (id, tag)
df_exploded = df.select(
    F.col("id"),
    F.explode(F.col("tags")).alias("tag"),
)

# Aggregations over all users (assumes the input has an `age` field)
summary = (
    with_domain
    .groupBy("email_domain")
    .agg(
        F.count("*").alias("user_count"),
        F.avg("age").alias("avg_age"),
    )
    .orderBy(F.desc("user_count"))
)

# Write to JSONL (partitioned); the JSON writer emits one object per line
(
    transformed_df.write
    .mode("overwrite")
    .partitionBy("created_date")
    .json("s3://bucket/output/")
)

# Write to Parquet for efficiency
(
    transformed_df.write
    .mode("overwrite")
    .partitionBy("created_date")
    .parquet("s3://bucket/parquet/")
)
Incremental Loading Patterns
Timestamp-Based Incremental
# Python: Incremental extract based on timestamps
import json
from datetime import datetime, timedelta


def incremental_extract(last_sync_time):
    """Extract only records modified since last sync.

    Writes users with ``updated_at > last_sync_time`` to a timestamped JSONL
    file and returns that file's name.
    """
    import psycopg2

    conn = psycopg2.connect("postgresql://...")
    try:
        with conn.cursor() as cursor:
            # Explicit columns: the record below depends on their order.
            cursor.execute("""
                SELECT id, email, updated_at FROM users
                WHERE updated_at > %s
                ORDER BY updated_at
            """, (last_sync_time,))
            filename = f"incremental_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl"
            with open(filename, 'w', encoding='utf-8') as f:
                for user_id, email, updated_at in cursor:
                    record = {
                        'id': user_id,
                        'email': email,
                        'updated_at': updated_at.isoformat(),
                    }
                    f.write(json.dumps(record) + '\n')
    finally:
        # The original closed only the cursor and leaked the connection.
        conn.close()
    return filename


# Track last sync
with open('last_sync.txt', 'r') as f:
    last_sync = datetime.fromisoformat(f.read().strip())

# Capture the new watermark *before* extracting. Taking it afterwards (as
# before) silently skips rows updated while the extract was running; with
# this order they are re-extracted next run, which idempotent upserts absorb.
# NOTE(review): this is naive UTC; confirm users.updated_at is stored in UTC.
sync_started_at = datetime.utcnow()
extract_file = incremental_extract(last_sync)

# Update last sync timestamp only after the extract succeeded
with open('last_sync.txt', 'w') as f:
    f.write(sync_started_at.isoformat())
Upsert Pattern
-- BigQuery MERGE for upserts: read users changed since the start of
-- yesterday (PostgreSQL's CURRENT_DATE - 1) through a federated connection,
-- then upsert them into the warehouse table by id.
MERGE INTO `project.dataset.users` AS tgt
USING (
  SELECT *
  FROM EXTERNAL_QUERY(
    "projects/project/locations/us/connections/postgres",
    "SELECT * FROM users WHERE updated_at > CURRENT_DATE - 1"
  )
) AS src
ON tgt.id = src.id
-- Existing row: refresh the mutable columns (created_at is left untouched).
WHEN MATCHED THEN
  UPDATE SET
    email = src.email,
    name = src.name,
    updated_at = src.updated_at
-- New row: insert it in full.
WHEN NOT MATCHED BY TARGET THEN
  INSERT (id, email, name, created_at, updated_at)
  VALUES (src.id, src.email, src.name, src.created_at, src.updated_at);
Best Practices
Data Quality
- Validate JSON syntax before loading
- Implement schema validation
- Handle nulls and missing fields
- Deduplicate records
- Add data quality checks in pipelines
Performance
- Partition data by date for faster queries
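- Compress JSONL files (gzip is the most widely supported codec for JSON loads; check each platform's supported codecs before using others such as zstd)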
- Use columnar formats (Parquet) for analytics
- Implement incremental loads
- Cluster tables on frequently queried columns
Reliability
- Implement idempotent pipelines
- Add retry logic with exponential backoff
- Monitor pipeline failures
- Maintain data lineage
- Version your data and schemas
Cost Optimization
- Use data lifecycle policies
- Archive old data to cold storage
- Optimize query patterns
- Monitor storage costs
- Use appropriate compression