JSONL for Log Processing

Why structured logging with JSONL has become a de facto standard for modern application monitoring and observability

Why JSONL for Logging?

Structured vs Unstructured

Traditional plain-text logs are hard to parse and query. JSONL adds structure while keeping the simplicity of line-based logging, which makes it well suited to real-time streaming and analysis.

Plain Text:

2025-01-15 10:30:45 ERROR Database connection failed

JSONL:

{"timestamp":"2025-01-15T10:30:45Z","level":"error","msg":"Database connection failed","db":"users","host":"prod-01"}

Industry Adoption

Major logging platforms and tools support JSONL (also called NDJSON or line-delimited JSON) out of the box:

  • Elasticsearch: Native JSONL ingestion
  • Splunk: JSON event format support
  • Datadog: JSON structured logs
  • Fluentd/Logstash: JSONL codecs and parsers
  • CloudWatch: Structured logging support

Application Logging

Node.js with Winston

Winston is one of the most popular logging libraries for Node.js; its JSON format writes one JSON object per line.

const winston = require('winston');

// Configure Winston for JSONL output
const logger = winston.createLogger({
  level: 'info',
  format: winston.format.json(),
  transports: [
    // Write all logs to application.jsonl
    new winston.transports.File({
      filename: 'application.jsonl',
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.json()
      )
    }),
    // Also log to console for development
    new winston.transports.Console({
      format: winston.format.simple()
    })
  ]
});

// Add request context middleware (Express)
const addContext = (req, res, next) => {
  req.logger = logger.child({
    request_id: req.id,
    user_id: req.user?.id,
    ip: req.ip,
    user_agent: req.get('user-agent')
  });
  next();
};

// Log examples
logger.info('Server started', { port: 3000, env: 'production' });

// err here comes from a surrounding try/catch
logger.error('Database query failed', {
  query: 'SELECT * FROM users',
  error: err.message,
  stack: err.stack,
  duration_ms: 1250
});

logger.warn('High memory usage', {
  memory_used_mb: 512,
  memory_limit_mb: 1024,
  threshold_percent: 80
});

// With request context
app.get('/api/users', addContext, async (req, res) => {
  req.logger.info('Fetching users', {
    page: req.query.page,
    limit: req.query.limit
  });
  // ... handler code
});

Pro Tip: Use child loggers to automatically include context like request IDs in all logs within a request.

Python with Structlog

Structlog makes structured logging in Python simple and performant.

import structlog
import logging
import sys

# Route structlog through stdlib logging so entries are actually emitted
logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.INFO)

# Configure structlog for JSONL output
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

# Create logger
logger = structlog.get_logger()

# Basic logging
logger.info("server_started", port=8000, workers=4)

logger.error(
    "database_connection_failed",
    host="db.example.com",
    port=5432,
    retry_count=3,
    exc_info=True
)

# Add context for multiple log entries
log_with_context = logger.bind(
    user_id=12345,
    request_id="req_abc123",
    session_id="sess_xyz789"
)

log_with_context.info("user_action", action="login", ip="192.168.1.100")
log_with_context.info("user_action", action="view_profile")

# Django/Flask middleware example
class LoggingMiddleware:
    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        request_id = environ.get('HTTP_X_REQUEST_ID', 'unknown')
        logger = structlog.get_logger().bind(
            request_id=request_id,
            path=environ.get('PATH_INFO'),
            method=environ.get('REQUEST_METHOD')
        )

        logger.info("request_started")
        try:
            return self.app(environ, start_response)
        finally:
            logger.info("request_completed")

Go with Zerolog

Zerolog is a zero-allocation JSON logger for Go with excellent performance.

package main

import (
    "errors"
    "net/http"
    "os"
    "time"

    "github.com/rs/zerolog"
    "github.com/rs/zerolog/log"
)

// statusWriter wraps http.ResponseWriter to capture the response status code
type statusWriter struct {
    http.ResponseWriter
    status int
}

func (w *statusWriter) WriteHeader(code int) {
    w.status = code
    w.ResponseWriter.WriteHeader(code)
}

// connectDatabase is a stub so the example compiles
func connectDatabase() error {
    return errors.New("connection refused")
}

// loggingMiddleware attaches request context to every log entry
func loggingMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        start := time.Now()

        // Create logger with request context
        logger := log.With().
            Str("request_id", r.Header.Get("X-Request-ID")).
            Str("method", r.Method).
            Str("path", r.URL.Path).
            Str("remote_addr", r.RemoteAddr).
            Logger()

        logger.Info().Msg("request_started")

        // Wrap response writer to capture status
        wrapped := &statusWriter{ResponseWriter: w, status: http.StatusOK}
        next.ServeHTTP(wrapped, r)

        logger.Info().
            Int("status", wrapped.status).
            Dur("duration_ms", time.Since(start)).
            Msg("request_completed")
    })
}

func main() {
    // Configure logger for JSONL file output
    file, err := os.OpenFile(
        "application.jsonl",
        os.O_APPEND|os.O_CREATE|os.O_WRONLY,
        0644,
    )
    if err != nil {
        panic(err)
    }
    defer file.Close()

    log.Logger = zerolog.New(file).With().Timestamp().Caller().Logger()

    // Basic logging
    log.Info().
        Str("event", "server_start").
        Int("port", 8080).
        Str("env", "production").
        Msg("Server starting")

    // Error logging with context
    if err := connectDatabase(); err != nil {
        log.Error().
            Err(err).
            Str("host", "db.example.com").
            Int("port", 5432).
            Int("retry", 3).
            Msg("Database connection failed")
    }

    // Wire the middleware into an HTTP server (ListenAndServe omitted)
    http.Handle("/", loggingMiddleware(http.NotFoundHandler()))
}

Java with Logback

Configure Logback for JSON structured logging in Spring Boot applications.

<!-- logback-spring.xml -->
<configuration>
    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <file>application.jsonl</file>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
            <includeContext>false</includeContext>
            <timestampPattern>yyyy-MM-dd'T'HH:mm:ss.SSS'Z'</timestampPattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="FILE" />
    </root>
</configuration>

// Java application code
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class UserService {
    private static final Logger logger = LoggerFactory.getLogger(UserService.class);

    public User getUser(Long userId) {
        // Add context to MDC (Mapped Diagnostic Context)
        MDC.put("user_id", userId.toString());
        MDC.put("operation", "get_user");

        try {
            logger.info("Fetching user from database");
            User user = userRepository.findById(userId);

            if (user == null) {
                logger.warn("User not found");
                return null;
            }

            logger.info("User retrieved successfully");
            return user;
        } catch (Exception e) {
            logger.error("Error fetching user", e);
            throw e;
        } finally {
            MDC.clear();
        }
    }
}

Rust with Tracing

The tracing crate, combined with tracing-subscriber's JSON formatter, emits one JSON object per line.

use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

fn main() {
    // Configure JSON formatter
    let file = std::fs::File::create("application.jsonl")
        .expect("Failed to create log file");

    tracing_subscriber::registry()
        .with(
            tracing_subscriber::fmt::layer()
                .json()
                .with_writer(std::sync::Arc::new(file))
        )
        .init();

    info!(port = 8080, env = "production", "Server started");

    // Structured logging with context
    let user_id = 12345;
    let request_id = "req_abc123";

    info!(
        user_id = user_id,
        request_id = request_id,
        action = "login",
        "User logged in"
    );

    // Error logging
    if let Err(e) = connect_database() {
        error!(
            error = %e,
            host = "db.example.com",
            port = 5432,
            "Database connection failed"
        );
    }
}

// Stub so the example compiles; a real application would connect here
fn connect_database() -> Result<(), std::io::Error> {
    Err(std::io::Error::new(
        std::io::ErrorKind::ConnectionRefused,
        "connection refused",
    ))
}

ELK Stack Integration

Elasticsearch Bulk API

Ingest JSONL logs directly into Elasticsearch using the Bulk API for high-throughput indexing.

# The Bulk API expects NDJSON (JSONL): each document is preceded by an
# action line, and the request body must end with a trailing newline.

curl -X POST "localhost:9200/_bulk" \
  -H "Content-Type: application/x-ndjson" \
  --data-binary @logs.jsonl

# logs.jsonl format:
{"index": {"_index": "logs-2025.01.15"}}
{"timestamp": "2025-01-15T10:30:00Z", "level": "info", "message": "Server started", "service": "api"}
{"index": {"_index": "logs-2025.01.15"}}
{"timestamp": "2025-01-15T10:30:05Z", "level": "error", "message": "Database timeout", "service": "api"}
{"index": {"_index": "logs-2025.01.15"}}
{"timestamp": "2025-01-15T10:30:10Z", "level": "warn", "message": "High memory usage", "service": "worker"}

Logstash Pipeline

Process JSONL logs with Logstash for filtering, enrichment, and routing.

# logstash.conf
input {
  file {
    path => "/var/log/application.jsonl"
    codec => "json_lines"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  # Parse timestamp
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }

  # Add hostname from the environment (${VAR} reads environment variables)
  mutate {
    add_field => { "host" => "${HOSTNAME}" }
  }

  # Extract error details
  if [level] == "error" {
    mutate {
      add_tag => ["error"]
      add_field => { "alert" => "true" }
    }
  }

  # GeoIP lookup for IP addresses
  if [ip] {
    geoip {
      source => "ip"
      target => "geoip"
    }
  }

  # User-Agent parsing
  if [user_agent] {
    useragent {
      source => "user_agent"
      target => "ua"
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }

  # Also output errors to separate index
  if [level] == "error" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "errors-%{+YYYY.MM.dd}"
    }
  }
}

Kibana Queries

Once logs are in Elasticsearch, query them with Kibana's powerful search.

# Find all errors in the last hour
level:error AND @timestamp:[now-1h TO now]

# Search for specific user activity
user_id:12345 AND action:(login OR logout)

# Find slow database queries
service:database AND duration_ms:>1000

# Search across multiple fields
message:"connection failed" OR error.message:"connection failed"

# Complex boolean query
(level:error OR level:warn) AND service:api AND NOT status:404

# Aggregation query (in Kibana Dev Tools)
GET /logs-*/_search
{
  "size": 0,
  "aggs": {
    "errors_by_service": {
      "terms": {
        "field": "service.keyword",
        "size": 10
      },
      "aggs": {
        "error_count": {
          "filter": {
            "term": { "level": "error" }
          }
        }
      }
    }
  }
}

Filebeat Configuration

Ship JSONL logs to Elasticsearch with Filebeat for lightweight, efficient log forwarding.

# filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/application.jsonl
  json.keys_under_root: true
  json.add_error_key: true
  json.message_key: message

# Optional: Add fields
  fields:
    environment: production
    datacenter: us-east-1
  fields_under_root: true

# Multiline support for any non-JSON stack trace lines
  multiline.type: pattern
  multiline.pattern: '^\s'
  multiline.negate: false
  multiline.match: after

# Output to Elasticsearch
output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "logs-%{+yyyy.MM.dd}"

# Or output to Logstash instead (Filebeat allows only one output at a time)
output.logstash:
  hosts: ["localhost:5044"]

# Enable modules
filebeat.modules:
- module: nginx
  access:
    enabled: true
    var.paths: ["/var/log/nginx/access.jsonl"]

Fluentd Log Aggregation

Fluentd Configuration

# fluent.conf
<source>
  @type tail
  path /var/log/application.jsonl
  pos_file /var/log/td-agent/application.pos
  tag application.logs

  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S%z
  </parse>
</source>

# Filter: Add hostname and enrich data
<filter application.logs>
  @type record_transformer
  <record>
    hostname ${hostname}
    tag ${tag}
    env production
  </record>
</filter>

# Filter: Parse user agent
<filter application.logs>
  @type parser
  key_name user_agent
  reserve_data true
  <parse>
    @type user_agent
  </parse>
</filter>

# Output to Elasticsearch
<match application.logs>
  @type elasticsearch
  host localhost
  port 9200
  index_name logs-%Y.%m.%d
  type_name _doc

  <buffer>
    @type file
    path /var/log/td-agent/buffer/logs
    flush_interval 10s
    chunk_limit_size 5M
  </buffer>
</match>

# Output to S3 for archival
<match application.logs>
  @type s3
  s3_bucket my-logs-bucket
  s3_region us-east-1
  path logs/%Y/%m/%d/

  <buffer time>
    @type file
    path /var/log/td-agent/buffer/s3
    timekey 3600  # 1 hour
    timekey_wait 10m
    chunk_limit_size 256m
  </buffer>

  <format>
    @type json
  </format>
</match>

Application Integration

// Node.js: fluent-logger
const logger = require('fluent-logger');

logger.configure('application', {
  host: 'localhost',
  port: 24224,
  timeout: 3.0
});

// Send structured logs
logger.emit('user.action', {
  user_id: 12345,
  action: 'login',
  ip: '192.168.1.100',
  timestamp: new Date().toISOString()
});

logger.emit('database.query', {
  query: 'SELECT * FROM users',
  duration_ms: 45,
  rows: 100
});

// Python: fluent-logger-python
from fluent import sender
from fluent import event

logger = sender.FluentSender('application', host='localhost', port=24224)

logger.emit('user.action', {
    'user_id': 12345,
    'action': 'login',
    'ip': '192.168.1.100'
})

logger.emit('database.query', {
    'query': 'SELECT * FROM users',
    'duration_ms': 45,
    'rows': 100
})

Log Aggregation Patterns

Centralized Logging Architecture

Collect logs from multiple services into a central location for analysis.

# Architecture:
# App Servers -> Log Shipper -> Message Queue -> Processor -> Storage

# Example flow:
1. Applications write JSONL to local files
2. Filebeat/Fluentd ships logs to Kafka
3. Logstash consumes from Kafka and processes
4. Elasticsearch indexes for searching
5. S3 archives for long-term storage

# Sample log aggregation script
import json
import boto3
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch

# Consume from Kafka
consumer = KafkaConsumer(
    'application-logs',
    bootstrap_servers=['kafka:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

es = Elasticsearch(['http://elasticsearch:9200'])
s3 = boto3.client('s3')

for message in consumer:
    log_entry = message.value

    # Index in Elasticsearch for searching
    es.index(
        index=f"logs-{log_entry['timestamp'][:10]}",
        document=log_entry
    )

    # Archive to S3
    if log_entry['level'] == 'error':
        s3.put_object(
            Bucket='error-logs',
            Key=f"errors/{log_entry['timestamp']}.json",
            Body=json.dumps(log_entry)
        )

Multi-Tenant Log Segregation

# Separate logs by tenant/customer for SaaS applications

{"tenant_id": "acme_corp", "timestamp": "2025-01-15T10:00:00Z", "user": "[email protected]", "action": "login"}
{"tenant_id": "widgets_inc", "timestamp": "2025-01-15T10:00:01Z", "user": "[email protected]", "action": "purchase"}

# Logstash routing by tenant
filter {
  if [tenant_id] {
    mutate {
      add_field => { "[@metadata][target_index]" => "logs-%{tenant_id}-%{+YYYY.MM.dd}" }
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][target_index]}"
  }
}

# Query specific tenant logs
GET /logs-acme_corp-*/_search
{
  "query": {
    "match": {
      "user": "[email protected]"
    }
  }
}

Time-Series Log Retention

# Implement log retention policies with time-based indices

# Elasticsearch ILM (Index Lifecycle Management) policy
PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "1d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "freeze": {}
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

# Python script to archive old logs to S3
import gzip
import json
from datetime import datetime, timedelta, timezone

def archive_old_logs(days_old=30):
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_old)

    with open('application.jsonl', 'r') as infile:
        current_logs = []
        archive_logs = []

        for line in infile:
            log = json.loads(line)
            # fromisoformat() before Python 3.11 cannot parse a trailing "Z"
            log_date = datetime.fromisoformat(log['timestamp'].replace('Z', '+00:00'))

            if log_date < cutoff_date:
                archive_logs.append(line)
            else:
                current_logs.append(line)

    # Write current logs back
    with open('application.jsonl', 'w') as outfile:
        outfile.writelines(current_logs)

    # Compress and archive old logs
    archive_name = f"logs-{cutoff_date.strftime('%Y-%m')}.jsonl.gz"
    with gzip.open(archive_name, 'wt') as zipfile:
        zipfile.writelines(archive_logs)

Real-Time Log Processing

Streaming with Python

import json
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class LogFileHandler(FileSystemEventHandler):
    def __init__(self, filepath):
        self.filepath = filepath
        self.file = open(filepath, 'r')
        # Move to end of file
        self.file.seek(0, 2)

    def on_modified(self, event):
        if event.src_path == self.filepath:
            # Read new lines
            for line in self.file:
                try:
                    log_entry = json.loads(line)
                    self.process_log(log_entry)
                except json.JSONDecodeError:
                    pass

    def process_log(self, log_entry):
        # Real-time processing
        if log_entry.get('level') == 'error':
            self.alert_on_error(log_entry)

        if log_entry.get('duration_ms', 0) > 1000:
            self.alert_slow_request(log_entry)

    def alert_on_error(self, log):
        print(f"ERROR ALERT: {log['message']}")
        # Send to Slack, PagerDuty, etc.

    def alert_slow_request(self, log):
        print(f"SLOW REQUEST: {log.get('path')} took {log['duration_ms']}ms")

# Watch log file
handler = LogFileHandler('/var/log/application.jsonl')
observer = Observer()
observer.schedule(handler, path='/var/log', recursive=False)
observer.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

Real-Time Alerting

import json
import time
from collections import defaultdict
from datetime import datetime, timedelta

class LogAlerter:
    def __init__(self):
        self.error_counts = defaultdict(int)
        self.last_reset = datetime.now()

    def process_log_stream(self, log_file):
        with open(log_file, 'r') as f:
            # Follow file like 'tail -f'
            f.seek(0, 2)
            while True:
                line = f.readline()
                if not line:
                    time.sleep(0.1)
                    continue

                try:
                    log = json.loads(line)
                    self.analyze(log)
                except json.JSONDecodeError:
                    continue

    def analyze(self, log):
        # Reset counters every minute
        if datetime.now() - self.last_reset > timedelta(minutes=1):
            self.check_thresholds()
            self.error_counts.clear()
            self.last_reset = datetime.now()

        # Count errors by service
        if log.get('level') == 'error':
            service = log.get('service', 'unknown')
            self.error_counts[service] += 1

        # Immediate alerts
        if 'out of memory' in log.get('message', '').lower():
            self.send_alert('CRITICAL', 'Out of memory error detected', log)

        if log.get('status_code') == 500:
            self.send_alert('HIGH', '500 error detected', log)

    def check_thresholds(self):
        for service, count in self.error_counts.items():
            if count > 100:  # More than 100 errors per minute
                self.send_alert(
                    'HIGH',
                    f'{service} error rate exceeded: {count}/min',
                    {'service': service, 'count': count}
                )

    def send_alert(self, severity, message, context):
        # Integrate with alerting systems
        print(f"[{severity}] {message}")
        print(f"Context: {json.dumps(context, indent=2)}")
        # Send to Slack, PagerDuty, Email, etc.

Cloud Logging Services

AWS CloudWatch Logs

import json
import boto3
from datetime import datetime, timedelta

logs = boto3.client('logs')
log_group = '/aws/application/api'
log_stream = f"instance-{datetime.now().strftime('%Y-%m-%d')}"

# Create the log stream if it doesn't exist
try:
    logs.create_log_stream(logGroupName=log_group, logStreamName=log_stream)
except logs.exceptions.ResourceAlreadyExistsException:
    pass

# Send structured logs
events = []
with open('application.jsonl', 'r') as f:
    for line in f:
        log = json.loads(line)
        events.append({
            'timestamp': int(datetime.fromisoformat(log['timestamp'].replace('Z', '+00:00')).timestamp() * 1000),
            'message': json.dumps(log)
        })

# Batch upload (events must be in chronological order; max 10,000 events or 1 MB)
logs.put_log_events(
    logGroupName=log_group,
    logStreamName=log_stream,
    logEvents=events
)

# Query with CloudWatch Insights
query = '''
fields @timestamp, level, message, duration_ms
| filter level = "error"
| sort @timestamp desc
| limit 100
'''

response = logs.start_query(
    logGroupName=log_group,
    startTime=int((datetime.now() - timedelta(hours=1)).timestamp()),
    endTime=int(datetime.now().timestamp()),
    queryString=query
)
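start_query is asynchronous and only returns a query ID; results are fetched separately. A brief sketch of polling for completion:

import time

query_id = response['queryId']
while True:
    result = logs.get_query_results(queryId=query_id)
    if result['status'] in ('Complete', 'Failed', 'Cancelled'):
        break
    time.sleep(1)

# Each result row is a list of {'field': ..., 'value': ...} pairs
for row in result.get('results', []):
    print({col['field']: col['value'] for col in row})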

Google Cloud Logging

from google.cloud import logging
import json

client = logging.Client()
logger = client.logger('application-logs')

# Map application levels to Cloud Logging severities
severity_map = {
    'debug': 'DEBUG',
    'info': 'INFO',
    'warn': 'WARNING',
    'error': 'ERROR',
    'fatal': 'CRITICAL'
}

# Read and upload JSONL logs
with open('application.jsonl', 'r') as f:
    for line in f:
        log_entry = json.loads(line)

        logger.log_struct(
            log_entry,
            severity=severity_map.get(log_entry.get('level'), 'DEFAULT')
        )

# Query logs
from google.cloud.logging import DESCENDING

filter_str = '''
resource.type="gce_instance"
severity="ERROR"
timestamp>="2025-01-15T00:00:00Z"
'''

for entry in client.list_entries(filter_=filter_str, order_by=DESCENDING):
    print(f"{entry.timestamp}: {entry.payload}")

Azure Monitor Logs

from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import DefaultAzureCredential
from datetime import timedelta
import json

credential = DefaultAzureCredential()
endpoint = "https://<data-collection-endpoint>.ingest.monitor.azure.com"
rule_id = "dcr-xxxxxxxxxxxxx"
stream_name = "Custom-ApplicationLogs"

client = LogsIngestionClient(endpoint=endpoint, credential=credential)

# Read JSONL and upload
logs_data = []
with open('application.jsonl', 'r') as f:
    for line in f:
        logs_data.append(json.loads(line))

# Upload in batches
client.upload(rule_id=rule_id, stream_name=stream_name, logs=logs_data)

# Query with KQL (Kusto Query Language)
from azure.monitor.query import LogsQueryClient

query_client = LogsQueryClient(credential)

query = """
ApplicationLogs
| where TimeGenerated > ago(1h)
| where Level == "error"
| project TimeGenerated, Message, Service, Host
| order by TimeGenerated desc
| limit 100
"""

response = query_client.query_workspace(
    workspace_id="<workspace-id>",
    query=query,
    timespan=timedelta(hours=1)
)

Best Practices

Log Structure

  • Always include timestamp, level, and message fields (see the sketch after this list)
  • Add request_id for tracing requests across services
  • Include service name and hostname for distributed systems
  • Use consistent field names across all services
  • Include environment (dev/staging/prod) in logs
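A minimal Python sketch of a helper that builds such a record; the service name and environment lookup are illustrative:

import json
import os
import socket
from datetime import datetime, timezone

def log_record(level, message, **fields):
    """Build one JSONL record carrying the standard fields."""
    record = {
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'level': level,
        'message': message,
        'service': 'api',                        # illustrative service name
        'hostname': socket.gethostname(),
        'environment': os.environ.get('APP_ENV', 'dev'),
    }
    record.update(fields)                        # request_id, user_id, etc.
    return json.dumps(record)

print(log_record('info', 'Server started', request_id='req_abc123', port=8000))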

Performance

  • Use async/non-blocking logging to avoid slowing down applications
  • Implement log sampling for high-volume endpoints (see the sketch after this list)
  • Compress logs (gzip) for storage efficiency
  • Use log levels appropriately (don't log everything as INFO)
  • Rotate log files automatically to prevent disk space issues
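Sampling can be as simple as keeping every warning and error while logging only a fixed fraction of lower-severity entries. A minimal sketch (the 10% rate is illustrative):

import random

SAMPLE_RATE = 0.1  # keep 10% of info/debug logs (illustrative)

def should_log(level):
    """Always keep warnings and errors; sample everything else."""
    if level in ('warn', 'error', 'fatal'):
        return True
    return random.random() < SAMPLE_RATE

if should_log('info'):
    print('{"level":"info","message":"request served"}')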

Security

  • Never log sensitive data (passwords, tokens, credit cards)
  • Redact PII (personally identifiable information) before logging, as in the sketch after this list
  • Encrypt log files at rest and in transit
  • Implement access controls for log viewing
  • Set up audit trails for who accessed logs
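A redaction pass can run just before serialization. A minimal sketch, with illustrative field names and a simple email pattern:

import json
import re

# Field names and patterns are illustrative; extend for your own data model
SENSITIVE_FIELDS = {'password', 'token', 'credit_card', 'ssn'}
EMAIL_RE = re.compile(r'[\w.+-]+@[\w-]+\.[\w.]+')

def redact(record):
    """Drop sensitive fields and mask email addresses before writing a log line."""
    clean = {}
    for key, value in record.items():
        if key in SENSITIVE_FIELDS:
            clean[key] = '[REDACTED]'
        elif isinstance(value, str):
            clean[key] = EMAIL_RE.sub('[EMAIL]', value)
        else:
            clean[key] = value
    return clean

print(json.dumps(redact({'user': 'user@example.com', 'password': 'hunter2', 'action': 'login'})))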

Storage & Retention

  • Define retention policies based on compliance requirements (see the sketch after this list)
  • Hot storage: Recent logs (7-30 days) for quick access
  • Cold storage: Archive older logs to S3/Glacier
  • Delete logs according to GDPR/compliance after retention period
  • Monitor storage costs and optimize accordingly
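For local files, Python's standard library can handle rotation and retention in one place. A minimal sketch using TimedRotatingFileHandler (daily rotation and 30 retained files are both illustrative):

import json
import logging
from logging.handlers import TimedRotatingFileHandler

# Rotate daily at midnight; keep 30 days of files locally (illustrative),
# shipping older archives to cold storage before they are deleted
handler = TimedRotatingFileHandler('application.jsonl', when='midnight', backupCount=30)

class JsonFormatter(logging.Formatter):
    def format(self, record):
        return json.dumps({
            'timestamp': self.formatTime(record),
            'level': record.levelname.lower(),
            'message': record.getMessage(),
        })

handler.setFormatter(JsonFormatter())
logger = logging.getLogger('app')
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info('Server started')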