Section Project: ETL Data Pipeline CLI

Congratulations on reaching the end of the Standard Library section! This comprehensive section project will test your mastery of Go's standard library as you build a production-ready ETL (extract, transform, load) data pipeline CLI.

Problem Statement

Modern data engineering requires tools that can:

  • Extract data from multiple sources
  • Transform data with filtering, mapping, and aggregation
  • Load results into various destinations
  • Handle large datasets efficiently with concurrent processing
  • Provide robust error handling and progress tracking

Real-world challenges:

  • Different data formats require format-specific parsers
  • Data quality issues need validation and error recovery
  • Performance demands concurrent/streaming processing
  • Production needs logging, metrics, and scheduling

Requirements

Functional Requirements

1. Extract Phase

  • Read CSV files with configurable delimiters
  • Parse JSON files
  • Support stdin for piped input
  • Validate schema and data types
  • Handle malformed records gracefully
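
As a sketch of how the extract phase might look with encoding/csv, the function below streams rows to a channel, honours a configurable delimiter, and logs-and-skips malformed rows instead of aborting. The Record type and the skip-on-error behavior are illustrative assumptions, not a required design:

package extract

import (
    "encoding/csv"
    "fmt"
    "io"
    "log"
)

// Record is one hypothetical row, keyed by column name.
type Record map[string]string

// FromCSV reads r with the given delimiter and streams records to out.
// Malformed rows are logged and skipped rather than aborting the run.
func FromCSV(r io.Reader, delimiter rune, out chan<- Record) error {
    defer close(out)

    cr := csv.NewReader(r)
    cr.Comma = delimiter
    cr.FieldsPerRecord = -1 // tolerate ragged rows; we validate field counts ourselves

    header, err := cr.Read()
    if err != nil {
        return fmt.Errorf("read header: %w", err)
    }

    for line := 2; ; line++ {
        row, err := cr.Read()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            log.Printf("line %d: skipping malformed row: %v", line, err)
            continue
        }
        if len(row) != len(header) {
            log.Printf("line %d: expected %d fields, got %d; skipping", line, len(header), len(row))
            continue
        }
        rec := make(Record, len(header))
        for i, name := range header {
            rec[name] = row[i]
        }
        out <- rec
    }
}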

2. Transform Phase

  • Filter records with condition expressions
  • Map fields with transformation functions
  • Aggregate data
  • Custom transformation pipeline
  • Data type conversions
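
One way to express these transform requirements is as small composable functions over records. The Transform signature and Chain helper below are assumptions for illustration (returning false drops the record, which doubles as a filter):

package transform

import (
    "strconv"
    "strings"
)

// Record mirrors the extractor's hypothetical row type.
type Record map[string]string

// Transform rewrites a record; returning false drops it (a filter).
type Transform func(Record) (Record, bool)

// Chain applies transforms in order, stopping as soon as a record is dropped.
func Chain(steps ...Transform) Transform {
    return func(rec Record) (Record, bool) {
        for _, step := range steps {
            var keep bool
            if rec, keep = step(rec); !keep {
                return nil, false
            }
        }
        return rec, true
    }
}

// Example steps: keep rows with amount > 100, add revenue = amount * quantity,
// and upper-case the product field.
var pipeline = Chain(
    func(r Record) (Record, bool) {
        amount, err := strconv.ParseFloat(r["amount"], 64)
        return r, err == nil && amount > 100
    },
    func(r Record) (Record, bool) {
        amount, _ := strconv.ParseFloat(r["amount"], 64)
        qty, _ := strconv.ParseFloat(r["quantity"], 64)
        r["revenue"] = strconv.FormatFloat(amount*qty, 'f', 2, 64)
        return r, true
    },
    func(r Record) (Record, bool) {
        r["product"] = strings.ToUpper(r["product"])
        return r, true
    },
)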

3. Load Phase

  • Write to CSV with custom formatting
  • Write to JSON
  • Load into SQLite database
  • Support stdout for piping
  • Atomic writes with rollback
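
The atomic-write requirement is commonly met by writing to a temporary file and renaming it into place only on success, so a failed run never leaves a half-written output. The sketch below does that for a JSON destination, encoding records as newline-delimited JSON; all names are illustrative:

package load

import (
    "encoding/json"
    "fmt"
    "os"
    "path/filepath"
)

// ToJSONAtomic writes records to path atomically: output goes to a temp
// file in the same directory (as newline-delimited JSON) and is renamed
// over path only if every record encodes successfully.
func ToJSONAtomic(path string, records <-chan map[string]string) (err error) {
    tmp, err := os.CreateTemp(filepath.Dir(path), ".etl-*.tmp")
    if err != nil {
        return fmt.Errorf("create temp file: %w", err)
    }
    defer func() {
        if err != nil {
            tmp.Close()
            os.Remove(tmp.Name()) // roll back: discard the partial output
        }
    }()

    enc := json.NewEncoder(tmp)
    for rec := range records {
        if err = enc.Encode(rec); err != nil {
            return fmt.Errorf("encode record: %w", err)
        }
    }
    if err = tmp.Close(); err != nil {
        return fmt.Errorf("close temp file: %w", err)
    }
    return os.Rename(tmp.Name(), path) // atomic on the same filesystem
}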

4. CLI Interface

  • Subcommands: extract, transform, load, pipeline
  • Configuration file support
  • Progress bars for long operations
  • Verbose logging mode
  • Dry-run option
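
If you stay with the standard library instead of Cobra, subcommands can be dispatched with flag.NewFlagSet. A minimal sketch, with flag names chosen only for illustration:

package main

import (
    "flag"
    "fmt"
    "os"
)

func main() {
    if len(os.Args) < 2 {
        fmt.Fprintln(os.Stderr, "usage: etl <extract|transform|load|pipeline> [flags]")
        os.Exit(2)
    }

    switch os.Args[1] {
    case "pipeline":
        fs := flag.NewFlagSet("pipeline", flag.ExitOnError)
        config := fs.String("config", "", "path to pipeline config file")
        dryRun := fs.Bool("dry-run", false, "validate the pipeline without running it")
        verbose := fs.Bool("verbose", false, "enable verbose logging")
        fs.Parse(os.Args[2:])
        fmt.Printf("running pipeline: config=%s dry-run=%v verbose=%v\n", *config, *dryRun, *verbose)
    case "extract", "transform", "load":
        fmt.Fprintf(os.Stderr, "%s: not implemented in this sketch\n", os.Args[1])
        os.Exit(1)
    default:
        fmt.Fprintf(os.Stderr, "unknown subcommand %q\n", os.Args[1])
        os.Exit(2)
    }
}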

5. Advanced Features

  • Concurrent processing with worker pools
  • Streaming mode for large files
  • Scheduled pipelines
  • Pipeline state persistence
  • Error recovery and retry logic

Non-Functional Requirements

  • Performance: Process 100K records in <5 seconds
  • Memory: Stream processing for files >100MB
  • Reliability: Transaction support, error recovery
  • Usability: Clear error messages, help documentation
  • Maintainability: Clean architecture, comprehensive tests

Constraints

  • Use only the Go standard library for core ETL logic
  • Cobra is allowed as the CLI framework
  • A SQLite driver (a third-party package) is allowed for database functionality
  • No external transformation libraries
  • Must handle files up to 10GB
  • Support concurrent processing with configurable workers

Design Considerations

High-Level Architecture

┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   EXTRACT   │─────▶│  TRANSFORM  │─────▶│    LOAD     │
└─────────────┘      └─────────────┘      └─────────────┘
      │                     │                     │
   CSV/JSON            Filter/Map            CSV/JSON/DB
   Reader              Aggregate             Writer
      │                     │                     │
   Schema              Validation            Transaction
   Validation          Type Conversion       Management

Key Design Principles:

  1. Modularity: Separate extract, transform, and load phases with clear interfaces
  2. Concurrency: Producer-consumer pattern with buffered channels and worker pools
  3. Streaming: Process records one at a time to minimize memory usage
  4. Error Handling: Graceful degradation with configurable skip-on-error behavior
  5. Extensibility: Interface-based design allows adding new extractors/loaders
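
These principles usually translate into a small set of interfaces implemented by every extractor, transformer, and loader. One possible shape, where the names and the Record type are assumptions:

package etl

import "context"

// Record is a single row flowing through the pipeline.
type Record map[string]string

// Extractor streams records from a source into out until the source is
// exhausted or ctx is cancelled. Implementations close out when done.
type Extractor interface {
    Extract(ctx context.Context, out chan<- Record) error
}

// Transformer rewrites a record; keep == false drops it.
type Transformer interface {
    Transform(rec Record) (result Record, keep bool, err error)
}

// Loader consumes records and writes them to a destination.
type Loader interface {
    Load(ctx context.Context, in <-chan Record) error
}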

Data Flow:

  • Extract: Read source → Validate schema → Emit records to channel
  • Transform: Receive records → Apply filters/mappings → Emit transformed records
  • Load: Receive records → Batch for efficiency → Write to destination with transactions

Concurrency Model:

  • Buffered channels between phases to prevent blocking
  • Worker pool for parallel transformations
  • Context-based cancellation for graceful shutdown
  • Batch writes to optimize database performance
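
A sketch of that concurrency model: a bounded worker pool drains the extract channel, applies a transform, and forwards results, with context cancellation for shutdown. The Record type here mirrors the one in the interface sketch above, and the extractor is expected to close the input channel when it stops:

package pipeline

import (
    "context"
    "sync"
)

// Record is the row type flowing between phases.
type Record map[string]string

// runWorkers fans transform work out across `workers` goroutines and
// closes out once all input has been processed or ctx is cancelled.
func runWorkers(ctx context.Context, workers int, in <-chan Record, out chan<- Record,
    transform func(Record) (Record, bool)) {

    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for rec := range in {
                result, keep := transform(rec)
                if !keep {
                    continue
                }
                select {
                case out <- result:
                case <-ctx.Done():
                    return // graceful shutdown: stop forwarding results
                }
            }
        }()
    }
    wg.Wait()
    close(out)
}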

Acceptance Criteria

Your ETL pipeline is complete when it meets these criteria:

Core Functionality:

  • ✓ Successfully extracts data from CSV and JSON files
  • ✓ Validates data against configurable schemas
  • ✓ Applies filters using expression syntax
  • ✓ Performs field mappings and transformations
  • ✓ Loads data to CSV, JSON, and SQLite database
  • ✓ Handles malformed data with skip-on-error option

Performance & Reliability:

  • ✓ Processes 100K records in under 5 seconds
  • ✓ Streams files larger than available memory
  • ✓ Implements transaction support with rollback on errors
  • ✓ Recovers gracefully from network/file interruptions
  • ✓ Provides accurate progress reporting

CLI & Usability:

  • ✓ Implements all required subcommands
  • ✓ Supports configuration files
  • ✓ Provides helpful error messages with line numbers for data issues
  • ✓ Includes comprehensive help documentation
  • ✓ Handles signals for graceful shutdown

Code Quality:

  • ✓ Test coverage >70%
  • ✓ Passes go vet and a linter (golint is deprecated; staticcheck is a common replacement) without errors
  • ✓ Follows interface-based design for extensibility
  • ✓ Includes README with usage examples
  • ✓ Proper error wrapping with context

Advanced Features:

  • ✓ Concurrent processing with configurable worker pool
  • ✓ Aggregation functions
  • ✓ Scheduled pipeline execution

Usage Examples

Basic Pipeline: CSV to JSON

# Extract from CSV, transform, load to JSON
etl pipeline \
  --extract-type csv \
  --extract-file data/sales.csv \
  --filter "amount > 100" \
  --transform "revenue = amount * quantity" \
  --load-type json \
  --load-file output/sales.json

Using Configuration File

# Run pipeline from config
etl pipeline --config configs/pipeline.json

Configuration File:

{
  "pipeline": {
    "name": "sales-etl",
    "description": "Process daily sales data"
  },
  "extract": {
    "type": "csv",
    "source": "data/sales.csv",
    "options": {
      "delimiter": ",",
      "has_header": true,
      "skip_rows": 1
    },
    "schema": {
      "date": "string",
      "product": "string",
      "quantity": "int",
      "price": "float"
    }
  },
  "transform": {
    "filters": [
      "quantity > 0",
      "price > 0"
    ],
    "mappings": {
      "revenue": "quantity * price",
      "product_upper": "upper(product)"
    },
    "aggregations": [
      {
        "operation": "sum",
        "field": "revenue",
        "group_by": ["product"]
      }
    ]
  },
  "load": {
    "type": "database",
    "destination": "data/sales.db",
    "options": {
      "table": "sales_summary",
      "create_if_not_exists": true,
      "batch_size": 1000
    }
  },
  "execution": {
    "concurrent": true,
    "workers": 4,
    "buffer_size": 1000,
    "progress": true
  }
}
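
A configuration file like this maps naturally onto Go structs decoded with encoding/json. The partial sketch below covers only some of the fields and is an illustration, not the required schema:

package config

import (
    "encoding/json"
    "fmt"
    "os"
)

// Config mirrors (part of) the pipeline configuration file shown above.
type Config struct {
    Pipeline struct {
        Name        string `json:"name"`
        Description string `json:"description"`
    } `json:"pipeline"`
    Extract struct {
        Type    string            `json:"type"`
        Source  string            `json:"source"`
        Options map[string]any    `json:"options"`
        Schema  map[string]string `json:"schema"`
    } `json:"extract"`
    Transform struct {
        Filters  []string          `json:"filters"`
        Mappings map[string]string `json:"mappings"`
    } `json:"transform"`
    Load struct {
        Type        string         `json:"type"`
        Destination string         `json:"destination"`
        Options     map[string]any `json:"options"`
    } `json:"load"`
    Execution struct {
        Concurrent bool `json:"concurrent"`
        Workers    int  `json:"workers"`
        BufferSize int  `json:"buffer_size"`
        Progress   bool `json:"progress"`
    } `json:"execution"`
}

// Load reads and decodes a pipeline configuration file.
func Load(path string) (*Config, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, fmt.Errorf("read config: %w", err)
    }
    var cfg Config
    if err := json.Unmarshal(data, &cfg); err != nil {
        return nil, fmt.Errorf("parse config %s: %w", path, err)
    }
    return &cfg, nil
}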

Individual Commands

Extract Only:

etl extract --type csv --file sales.csv --output records.json

Transform Only:

etl transform \
  --input records.json \
  --filter "status == 'active'" \
  --map "full_name = first_name + ' ' + last_name" \
  --output transformed.json

Load Only:

etl load --type database --file transformed.json --db sales.db --table customers

Scheduled Pipeline

# Run pipeline every day at 2 AM
etl schedule \
  --config configs/pipeline.json \
  --cron "0 2 * * *" \
  --daemon

Streaming Large Files

# Process 10GB CSV with streaming
etl pipeline \
  --config pipeline.json \
  --streaming \
  --chunk-size 10000

Key Takeaways

This project demonstrates Standard Library mastery:

1. File I/O & Encoding

  • encoding/csv for CSV parsing and writing
  • encoding/json for JSON processing
  • io package for streaming and buffering
  • File handling with proper error recovery
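
For streaming large JSON inputs, json.Decoder reads one value at a time instead of loading the whole file. A sketch of walking a top-level JSON array of records (the channel-of-maps shape is an assumption):

package extract

import (
    "encoding/json"
    "fmt"
    "io"
)

// FromJSONArray streams objects out of a top-level JSON array without
// reading the whole file into memory.
func FromJSONArray(r io.Reader, out chan<- map[string]string) error {
    defer close(out)

    dec := json.NewDecoder(r)
    if _, err := dec.Token(); err != nil { // consume the opening '['
        return fmt.Errorf("read array start: %w", err)
    }
    for dec.More() {
        var rec map[string]string
        if err := dec.Decode(&rec); err != nil {
            return fmt.Errorf("decode record: %w", err)
        }
        out <- rec
    }
    if _, err := dec.Token(); err != nil { // consume the closing ']'
        return fmt.Errorf("read array end: %w", err)
    }
    return nil
}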

2. Database Operations

  • database/sql with SQLite driver
  • Prepared statements for performance
  • Transaction management
  • Batch operations
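
The batch pattern with database/sql typically combines a transaction with a prepared statement and rolls back if any insert fails. A sketch, where the table, columns, and row shape are illustrative and a third-party SQLite driver is assumed to be registered elsewhere in the program:

package load

import (
    "context"
    "database/sql"
    "fmt"
)

// insertBatch writes one batch of rows inside a single transaction.
// If any insert fails, the whole batch is rolled back.
func insertBatch(ctx context.Context, db *sql.DB, rows [][3]any) (err error) {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin transaction: %w", err)
    }
    defer func() {
        if err != nil {
            tx.Rollback() // best effort; the original error is returned
        }
    }()

    stmt, err := tx.PrepareContext(ctx,
        "INSERT INTO sales_summary (product, quantity, revenue) VALUES (?, ?, ?)")
    if err != nil {
        return fmt.Errorf("prepare insert: %w", err)
    }
    defer stmt.Close()

    for _, r := range rows {
        if _, err = stmt.ExecContext(ctx, r[0], r[1], r[2]); err != nil {
            return fmt.Errorf("insert row: %w", err)
        }
    }
    return tx.Commit()
}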

3. CLI Development

  • Command-line argument parsing
  • Subcommands and flags
  • Configuration file support
  • User-friendly output

4. Concurrency Patterns

  • Producer-consumer with channels
  • Worker pools for parallel processing
  • Context for cancellation
  • Buffered channels for performance

5. Error Handling

  • Proper error wrapping with fmt.Errorf
  • Error recovery and retry logic
  • Graceful degradation
  • User-friendly error messages
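
Wrapping with %w keeps the underlying cause inspectable via errors.Is and errors.As, and a small helper covers retry-with-backoff for transient failures. A sketch, with illustrative retry limits and a hypothetical notion of "permanent" errors:

package etl

import (
    "errors"
    "fmt"
    "os"
    "time"
)

// withRetry runs op up to attempts times, backing off between tries.
// Permanent errors (here: a missing file) are not retried.
func withRetry(attempts int, delay time.Duration, op func() error) error {
    var err error
    for i := 1; i <= attempts; i++ {
        if err = op(); err == nil {
            return nil
        }
        if errors.Is(err, os.ErrNotExist) {
            return fmt.Errorf("permanent failure: %w", err) // no point retrying
        }
        if i < attempts {
            time.Sleep(delay)
            delay *= 2 // exponential backoff
        }
    }
    return fmt.Errorf("after %d attempts: %w", attempts, err)
}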

6. Testing & Quality

  • Unit tests for each component
  • Integration tests for pipelines
  • Table-driven tests
  • Mocking for external dependencies
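
A table-driven test for a transform step might look like the sketch below; the revenue function it exercises is a stand-in defined only for this example:

package transform

import "testing"

// revenue is the hypothetical function under test: amount * quantity.
func revenue(amount, quantity float64) float64 {
    return amount * quantity
}

func TestRevenue(t *testing.T) {
    tests := []struct {
        name             string
        amount, quantity float64
        want             float64
    }{
        {"simple", 10, 3, 30},
        {"zero quantity", 99.5, 0, 0},
        {"fractional amount", 2.5, 4, 10},
    }
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            if got := revenue(tt.amount, tt.quantity); got != tt.want {
                t.Errorf("revenue(%v, %v) = %v, want %v", tt.amount, tt.quantity, got, tt.want)
            }
        })
    }
}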

Production-Ready Patterns:

  • Clean architecture with clear separation
  • Interface-based design for flexibility
  • Configuration-driven behavior
  • Logging and observability
  • Signal handling for graceful shutdown
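
Signal handling for graceful shutdown is one os/signal call: signal.NotifyContext cancels a context on SIGINT or SIGTERM, and every phase that watches that context can drain and exit cleanly. The runPipeline placeholder below is purely illustrative:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // Cancel the context on SIGINT or SIGTERM so every phase can drain
    // its channels and flush partial work before exiting.
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer stop()

    if err := runPipeline(ctx); err != nil {
        fmt.Fprintln(os.Stderr, "pipeline:", err)
        os.Exit(1)
    }
}

// runPipeline stands in for the real pipeline; here it simply waits for
// cancellation or a short timeout.
func runPipeline(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(2 * time.Second):
        return nil
    }
}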

Next Steps

Enhancements:

  1. Add more data sources
  2. Implement advanced transformations
  3. Add data quality checks and validation rules
  4. Build web UI for pipeline management
  5. Add monitoring and alerting
  6. Implement distributed processing

Learning Path:

  • Section 3: Advanced Topics
  • Section 4: Production Engineering
  • Section 5: Practice with intermediate/advanced exercises
  • Section 6: Build applied projects
  • Section 7: Tackle capstone projects

Download Complete Solution

Full Project Solution

Download the complete, production-ready ETL pipeline implementation with tests, documentation, and sample data.


Package Contents:

  • Complete source code
  • Comprehensive test suite
  • Sample data files
  • Configuration examples
  • Makefile for building and testing
  • Docker support
  • README with detailed implementation guide, architecture breakdown, and setup instructions

Note: The README in the solution package contains the complete implementation guide, detailed architecture documentation, project structure breakdown, and step-by-step development instructions.


Congratulations! You've completed the Standard Library section by building a production-ready ETL pipeline. This project showcases your ability to leverage Go's standard library to solve real-world data engineering challenges.

Continue your journey with advanced topics and production patterns in the next sections!