
# Spark MCP Optimizer
Apache Spark code optimization server using MCP for performance enhancement
This project implements a Model Context Protocol (MCP) server and client for optimizing Apache Spark code. The system provides intelligent code optimization suggestions and performance analysis through a client-server architecture.
```mermaid
graph TB
    subgraph Input
        A[Input PySpark Code] --> |spark_code_input.py| B[run_client.py]
    end
    subgraph MCP Client
        B --> |Async HTTP| C[SparkMCPClient]
        C --> |Protocol Handler| D[Tools Interface]
    end
    subgraph MCP Server
        E[run_server.py] --> F[SparkMCPServer]
        F --> |Tool Registry| G[optimize_spark_code]
        F --> |Tool Registry| H[analyze_performance]
        F --> |Protocol Handler| I[Claude AI Integration]
    end
    subgraph Resources
        I --> |Code Analysis| J[Claude AI Model]
        J --> |Optimization| K[Optimized Code Generation]
        K --> |Validation| L[PySpark Runtime]
    end
    subgraph Output
        M[optimized_spark_code.py]
        N[performance_analysis.md]
    end
    D --> |MCP Request| F
    G --> |Generate| M
    H --> |Generate| N

    classDef client fill:#e1f5fe,stroke:#01579b
    classDef server fill:#f3e5f5,stroke:#4a148c
    classDef resource fill:#e8f5e9,stroke:#1b5e20
    classDef output fill:#fff3e0,stroke:#e65100

    class A,B,C,D client
    class E,F,G,H,I server
    class J,K,L resource
    class M,N output
```
- **Input Layer**
  - `spark_code_input.py`: Source PySpark code for optimization
  - `run_client.py`: Client startup and configuration
- **MCP Client Layer**
- **MCP Server Layer**
  - `run_server.py`: Server initialization
- **Resource Layer**
- **Output Layer**
  - `optimized_spark_code.py`: Optimized code
  - `performance_analysis.md`: Detailed analysis

This workflow illustrates how input code flows through the MCP client and server, is optimized by Claude AI, validated against the PySpark runtime, and written out as optimized code plus a performance analysis.
This project follows the Model Context Protocol architecture for standardized AI model interactions:
```
┌──────────────────┐      ┌──────────────────┐      ┌──────────────────┐
│                  │      │    MCP Server    │      │    Resources     │
│    MCP Client    │      │ (SparkMCPServer) │      │                  │
│ (SparkMCPClient) │      │                  │      │ ┌──────────────┐ │
│                  │      │  ┌─────────┐     │      │ │  Claude AI   │ │
│  ┌─────────┐     │      │  │  Tools  │     │ <──> │ │    Model     │ │
│  │  Tools  │     │      │  │Registry │     │      │ └──────────────┘ │
│  │Interface│     │ <──> │  └─────────┘     │      │                  │
│  └─────────┘     │      │  ┌─────────┐     │      │ ┌──────────────┐ │
│                  │      │  │Protocol │     │      │ │   PySpark    │ │
│                  │      │  │ Handler │     │      │ │   Runtime    │ │
│                  │      │  └─────────┘     │      │ └──────────────┘ │
└──────────────────┘      └──────────────────┘      └──────────────────┘
         │                         │                         │
         │                         │                         │
         v                         v                         v
  ┌──────────────┐          ┌──────────────┐          ┌──────────────┐
  │  Available   │          │  Registered  │          │   External   │
  │    Tools     │          │    Tools     │          │  Resources   │
  ├──────────────┤          ├──────────────┤          ├──────────────┤
  │optimize_code │          │optimize_code │          │  Claude API  │
  │analyze_perf  │          │analyze_perf  │          │ Spark Engine │
  └──────────────┘          └──────────────┘          └──────────────┘
```
- **MCP Client** (`SparkMCPClient`): exposes the tools interface (`optimize_code`, `analyze_perf`) used to submit requests to the server
- **MCP Server** (`SparkMCPServer`): hosts the tool registry and protocol handler that dispatch requests to the registered tools
- **Resources**: the external dependencies the server coordinates, namely the Claude AI model and the PySpark runtime
```mermaid
sequenceDiagram
    participant U as User
    participant C as MCP Client
    participant S as MCP Server
    participant AI as Claude AI
    participant P as PySpark Runtime

    U->>C: Submit Spark Code
    C->>S: Send Optimization Request
    S->>AI: Analyze Code
    AI-->>S: Optimization Suggestions
    S->>C: Return Optimized Code
    C->>P: Run Original Code
    C->>P: Run Optimized Code
    P-->>C: Execution Results
    C->>C: Generate Analysis
    C-->>U: Final Report
```
1. **Code Submission**: place the PySpark code to optimize in `v1/input/spark_code_input.py`
2. **Optimization Process**: the client sends the code to the MCP server, which uses Claude AI to analyze it and propose optimizations
3. **Code Generation**: the optimized code is written to `v1/output/optimized_spark_code.py`
4. **Performance Analysis**: both the original and optimized versions are executed and their results compared
5. **Results Generation**: the comparison report is written to `v1/output/performance_analysis.md`
To run the pipeline end to end:

1. Install the dependencies:

   ```bash
   pip install -r requirements.txt
   ```

2. Add your Spark code to optimize in `input/spark_code_input.py`.

3. Start the MCP server:

   ```bash
   python v1/run_server.py
   ```

4. Run the client:

   ```bash
   python v1/run_client.py
   ```

   This will generate two files:

   - `output/optimized_spark_example.py`: the optimized Spark code with detailed optimization comments
   - `output/performance_analysis.md`: a comprehensive performance analysis

5. Run and compare the two versions:

   ```bash
   python v1/run_optimized.py
   ```

   This will execute both the original and optimized code and report the performance difference.
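If you want to script a similar comparison yourself, a minimal sketch could look like the following. The timing helper and file paths are illustrative assumptions, not the actual contents of `run_optimized.py`:

```python
import subprocess
import time

def time_script(path: str) -> float:
    """Run a Python script and return its wall-clock execution time in seconds."""
    start = time.time()
    subprocess.run(["python", path], check=True)
    return time.time() - start

# Hypothetical paths; adjust to where your original and optimized code live.
original_time = time_script("v1/input/spark_code_input.py")
optimized_time = time_script("v1/output/optimized_spark_code.py")

improvement = (original_time - optimized_time) / original_time * 100
print(f"Original:    {original_time:.2f}s")
print(f"Optimized:   {optimized_time:.2f}s")
print(f"Improvement: {improvement:.1f}%")
```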
Project layout:

```
ai-mcp/
├── input/
│   └── spark_code_input.py        # Original Spark code to optimize
├── output/
│   ├── optimized_spark_example.py # Generated optimized code
│   └── performance_analysis.md    # Detailed performance comparison
├── spark_mcp/
│   ├── client.py                  # MCP client implementation
│   └── server.py                  # MCP server implementation
├── run_client.py                  # Client script to optimize code
├── run_server.py                  # Server startup script
└── run_optimized.py               # Script to run and compare code versions
```
The Model Context Protocol (MCP) provides several key advantages for Spark code optimization:
| Aspect | Direct Claude AI Call | MCP Server |
|---|---|---|
| Integration | • Custom integration per team<br>• Manual response handling<br>• Duplicate implementations | • Pre-built client libraries<br>• Automated workflows<br>• Unified interfaces |
| Infrastructure | • No built-in validation<br>• No result persistence<br>• Manual tracking | • Automatic validation<br>• Result persistence<br>• Version control |
| Context | • Basic code suggestions<br>• No execution context<br>• Limited optimization scope | • Context-aware optimization<br>• Full execution history<br>• Comprehensive improvements |
| Validation | • Manual testing required<br>• No performance metrics<br>• Uncertain outcomes | • Automated testing<br>• Performance metrics<br>• Validated results |
| Workflow | • Ad-hoc process<br>• No standardization<br>• Manual intervention needed | • Structured process<br>• Standard protocols<br>• Automated pipeline |
**Client usage**

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `client = anthropic.Client(api_key)`<br>`response = client.messages.create(...)` | • Complex setup<br>• Custom error handling<br>• Tight coupling |
| MCP | `client = SparkMCPClient()`<br>`result = await client.optimize_spark_code(code)` | • Simple interface<br>• Built-in validation<br>• Loose coupling |
**Tool registration**

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `class SparkOptimizer:`<br>`    def register_tool(self, name, func):`<br>`        self.tools[name] = func` | • Manual registration<br>• No validation<br>• Complex maintenance |
| MCP | `@register_tool("optimize_spark_code")`<br>`async def optimize_spark_code(code: str):` | • Auto-discovery<br>• Type checking<br>• Easy extension |
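For illustration, a decorator-based registry along these lines could back the `@register_tool` pattern shown above. This is a sketch; the actual decorator and registry in `spark_mcp/server.py` may be named and structured differently:

```python
from typing import Any, Awaitable, Callable, Dict

# Hypothetical registry mapping tool names to their async handlers.
TOOLS: Dict[str, Callable[..., Awaitable[Any]]] = {}

def register_tool(name: str):
    """Register an async function as an MCP tool under the given name."""
    def decorator(func: Callable[..., Awaitable[Any]]):
        TOOLS[name] = func
        return func
    return decorator

@register_tool("optimize_spark_code")
async def optimize_spark_code(code: str) -> str:
    # The real tool would call Claude AI; this stub just echoes the input.
    return code
```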
**Resource management**

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `def __init__(self):`<br>`    self.claude = init_claude()`<br>`    self.spark = init_spark()` | • Manual orchestration<br>• Manual cleanup<br>• Error-prone |
| MCP | `@requires_resources(["claude_ai", "spark"])`<br>`async def optimize_spark_code(code: str):` | • Auto-coordination<br>• Lifecycle management<br>• Error handling |
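A `requires_resources`-style decorator could handle acquisition and cleanup around each tool call. The sketch below uses placeholder acquire/release hooks; the real server would create an Anthropic client and a SparkSession instead:

```python
import asyncio
import functools
from typing import Awaitable, Callable, Dict, List

def acquire(name: str) -> object:
    # Placeholder; the real server would construct the named resource here.
    print(f"acquiring {name}")
    return object()

def release(name: str, resource: object) -> None:
    print(f"releasing {name}")

def requires_resources(names: List[str]):
    """Acquire the named resources before the tool runs and release them afterwards."""
    def decorator(func: Callable[..., Awaitable[object]]):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            resources: Dict[str, object] = {n: acquire(n) for n in names}
            try:
                return await func(*args, resources=resources, **kwargs)
            finally:
                for n, res in resources.items():
                    release(n, res)
        return wrapper
    return decorator

@requires_resources(["claude_ai", "spark"])
async def optimize_spark_code(code: str, resources=None) -> str:
    # The real tool would use resources["claude_ai"] and resources["spark"].
    return code

asyncio.run(optimize_spark_code("df.count()"))
```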
**Message protocol**

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `{"type": "request", "payload": {"code": code}}` | • Custom format<br>• Manual validation<br>• Custom debugging |
| MCP | `{"method": "tools/call", "params": {"name": "optimize_code"}}` | • Standard format<br>• Auto-validation<br>• Easy debugging |
You can also use the client programmatically:
```python
import asyncio

from spark_mcp.client import SparkMCPClient

async def main():
    # Connect to the MCP server
    client = SparkMCPClient()
    await client.connect()

    # Your Spark code to optimize
    spark_code = '''
    # Your PySpark code here
    '''

    # Get optimized code with performance analysis
    optimized_code = await client.optimize_spark_code(
        code=spark_code,
        optimization_level="advanced",
        save_to_file=True  # Save to output/optimized_spark_example.py
    )

    # Analyze performance differences
    analysis = await client.analyze_performance(
        original_code=spark_code,
        optimized_code=optimized_code,
        save_to_file=True  # Save to output/performance_analysis.md
    )

    # Run both versions and compare: use the run_optimized.py script
    # or implement your own comparison.

    await client.close()

if __name__ == "__main__":
    asyncio.run(main())
```
The repository includes an example workflow:
Original code (`input/spark_code_input.py`):

```python
# Create DataFrames and join
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"])
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"])

# Join and analyze
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg({"salary": "avg", "age": "avg", "id": "count"}) \
    .orderBy("dept")
```
Optimized code (`output/optimized_spark_example.py`):

```python
# Performance-optimized version with caching and improved configurations
spark = SparkSession.builder \
    .appName("EmployeeAnalysis") \
    .config("spark.sql.shuffle.partitions", 200) \
    .getOrCreate()

# Create and cache DataFrames
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"]).cache()
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"]).cache()

# Optimized join and analysis
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg(
        avg("salary").alias("avg_salary"),
        avg("age").alias("avg_age"),
        count("id").alias("employee_count")
    ) \
    .orderBy("dept")
```
Performance analysis (`output/performance_analysis.md`):

```markdown
## Execution Results Comparison

### Timing Comparison
- Original Code: 5.18 seconds
- Optimized Code: 0.65 seconds
- Performance Improvement: 87.4%

### Optimization Details
- Caching frequently used DataFrames
- Optimized shuffle partitions
- Improved column expressions
- Better memory management
```
```
ai-mcp/
├── spark_mcp/
│   ├── __init__.py
│   ├── client.py                  # MCP client implementation
│   └── server.py                  # MCP server implementation
├── examples/
│   ├── optimize_code.py           # Example usage
│   └── optimized_spark_example.py # Generated optimized code
├── requirements.txt
└── run_server.py                  # Server startup script
```
The server exposes two tools:

- `optimize_spark_code`: generates an optimized version of the submitted PySpark code
- `analyze_performance`: compares the original and optimized code and produces a performance report

Environment variables:

- `ANTHROPIC_API_KEY`: Your Anthropic API key for Claude AI

The system implements various PySpark optimizations, including DataFrame caching, shuffle partition tuning, improved column expressions, and better memory management.
Feel free to submit issues and enhancement requests!
MIT License