# Spark MCP Optimizer

Apache Spark code optimization server using MCP for performance enhancement.
This project implements a Model Context Protocol (MCP) server and client for optimizing Apache Spark code. The system provides intelligent code optimization suggestions and performance analysis through a client-server architecture.
```mermaid
graph TB
    subgraph Input
        A[Input PySpark Code] --> |spark_code_input.py| B[run_client.py]
    end
    subgraph MCP Client
        B --> |Async HTTP| C[SparkMCPClient]
        C --> |Protocol Handler| D[Tools Interface]
    end
    subgraph MCP Server
        E[run_server.py] --> F[SparkMCPServer]
        F --> |Tool Registry| G[optimize_spark_code]
        F --> |Tool Registry| H[analyze_performance]
        F --> |Protocol Handler| I[Claude AI Integration]
    end
    subgraph Resources
        I --> |Code Analysis| J[Claude AI Model]
        J --> |Optimization| K[Optimized Code Generation]
        K --> |Validation| L[PySpark Runtime]
    end
    subgraph Output
        M[optimized_spark_code.py]
        N[performance_analysis.md]
    end
    D --> |MCP Request| F
    G --> |Generate| M
    H --> |Generate| N

    classDef client fill:#e1f5fe,stroke:#01579b
    classDef server fill:#f3e5f5,stroke:#4a148c
    classDef resource fill:#e8f5e9,stroke:#1b5e20
    classDef output fill:#fff3e0,stroke:#e65100
    class A,B,C,D client
    class E,F,G,H,I server
    class J,K,L resource
    class M,N output
```
The workflow spans five layers:

- **Input Layer**: `spark_code_input.py` (source PySpark code for optimization); `run_client.py` (client startup and configuration)
- **MCP Client Layer**
- **MCP Server Layer**: `run_server.py` (server initialization)
- **Resource Layer**
- **Output Layer**: `optimized_spark_code.py` (optimized code); `performance_analysis.md` (detailed analysis)
This project follows the Model Context Protocol architecture for standardized AI model interactions:
```
┌──────────────────┐      ┌──────────────────┐      ┌──────────────────┐
│                  │      │   MCP Server     │      │    Resources     │
│   MCP Client     │      │  (SparkMCPServer)│      │                  │
│ (SparkMCPClient) │      │                  │      │ ┌──────────────┐ │
│                  │      │    ┌─────────┐   │      │ │  Claude AI   │ │
│   ┌─────────┐    │      │    │ Tools   │   │ <──> │ │   Model      │ │
│   │ Tools   │    │      │    │Registry │   │      │ └──────────────┘ │
│   │Interface│    │ <──> │    └─────────┘   │      │                  │
│   └─────────┘    │      │    ┌─────────┐   │      │ ┌──────────────┐ │
│                  │      │    │Protocol │   │      │ │  PySpark     │ │
│                  │      │    │Handler  │   │      │ │  Runtime     │ │
│                  │      │    └─────────┘   │      │ └──────────────┘ │
└──────────────────┘      └──────────────────┘      └──────────────────┘
        │                         │                          │
        │                         │                          │
        v                         v                          v
┌──────────────┐          ┌──────────────┐           ┌──────────────┐
│  Available   │          │  Registered  │           │   External   │
│    Tools     │          │    Tools     │           │  Resources   │
├──────────────┤          ├──────────────┤           ├──────────────┤
│optimize_code │          │optimize_code │           │ Claude API   │
│analyze_perf  │          │analyze_perf  │           │ Spark Engine │
└──────────────┘          └──────────────┘           └──────────────┘
```
The architecture has three main components: the **MCP Client** (tools interface), the **MCP Server** (tool registry and protocol handler), and external **Resources** (the Claude AI model and the PySpark runtime).
```mermaid
sequenceDiagram
    participant U as User
    participant C as MCP Client
    participant S as MCP Server
    participant AI as Claude AI
    participant P as PySpark Runtime
    U->>C: Submit Spark Code
    C->>S: Send Optimization Request
    S->>AI: Analyze Code
    AI-->>S: Optimization Suggestions
    S->>C: Return Optimized Code
    C->>P: Run Original Code
    C->>P: Run Optimized Code
    P-->>C: Execution Results
    C->>C: Generate Analysis
    C-->>U: Final Report
```
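The sequence above can be sketched as a chain of async calls. This is a minimal illustration only: the function names, the toy "optimization," and the hard-coded timings are placeholders, not the project's actual API.

```python
import asyncio

async def analyze_code(code: str) -> str:
    # Stand-in for the Claude AI analysis step (toy "optimization")
    return code.replace("df.", "df.cache().")

async def run_code(code: str) -> float:
    # Stand-in for executing code on the PySpark runtime; returns seconds
    return 0.5 if "cache" in code else 2.0

async def optimize_and_compare(code: str) -> str:
    optimized = await analyze_code(code)      # Server -> Claude AI
    original_s = await run_code(code)         # Client -> PySpark (original)
    optimized_s = await run_code(optimized)   # Client -> PySpark (optimized)
    gain = (original_s - optimized_s) / original_s * 100
    return f"improvement: {gain:.1f}%"        # Final report back to the user

print(asyncio.run(optimize_and_compare("df.groupBy('dept').count()")))
```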
The workflow proceeds through five steps:

1. **Code Submission**: `v1/input/spark_code_input.py`
2. **Optimization Process**
3. **Code Generation**: `v1/output/optimized_spark_code.py`
4. **Performance Analysis**
5. **Results Generation**: `v1/output/performance_analysis.md`

## Installation

```bash
pip install -r requirements.txt
```
## Usage

1. Add the Spark code you want to optimize to `input/spark_code_input.py`.
2. Start the MCP server:

   ```bash
   python v1/run_server.py
   ```

3. Run the client:

   ```bash
   python v1/run_client.py
   ```
This will generate two files:

- `output/optimized_spark_example.py`: The optimized Spark code with detailed optimization comments
- `output/performance_analysis.md`: Comprehensive performance analysis

Then run and compare both code versions:

```bash
python v1/run_optimized.py
```

This will execute both the original and the optimized code and compare their results.
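The kind of timing comparison `run_optimized.py` performs can be sketched with a simple harness. The two workload functions below are placeholders, not the project's actual jobs:

```python
import time

def run_original():
    # Placeholder workload standing in for the original Spark job
    return sum(i * i for i in range(100_000))

def run_optimized():
    # Placeholder workload standing in for the optimized Spark job
    return sum(i * i for i in range(100_000))

def time_run(fn):
    """Execute fn and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start

orig_result, orig_s = time_run(run_original)
opt_result, opt_s = time_run(run_optimized)

# Both versions must produce the same answer for the timings to be meaningful.
assert orig_result == opt_result
print(f"original: {orig_s:.3f}s, optimized: {opt_s:.3f}s")
```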
```
ai-mcp/
├── input/
│   └── spark_code_input.py     # Original Spark code to optimize
├── output/
│   ├── optimized_spark_example.py  # Generated optimized code
│   └── performance_analysis.md     # Detailed performance comparison
├── spark_mcp/
│   ├── client.py               # MCP client implementation
│   └── server.py               # MCP server implementation
├── run_client.py              # Client script to optimize code
├── run_server.py              # Server startup script
└── run_optimized.py           # Script to run and compare code versions
```
The Model Context Protocol (MCP) provides several key advantages for Spark code optimization:
| Aspect | Direct Claude AI Call | MCP Server |
|---|---|---|
| Integration | Custom integration per team<br>Manual response handling<br>Duplicate implementations | Pre-built client libraries<br>Automated workflows<br>Unified interfaces |
| Infrastructure | No built-in validation<br>No result persistence<br>Manual tracking | Automatic validation<br>Result persistence<br>Version control |
| Context | Basic code suggestions<br>No execution context<br>Limited optimization scope | Context-aware optimization<br>Full execution history<br>Comprehensive improvements |
| Validation | Manual testing required<br>No performance metrics<br>Uncertain outcomes | Automated testing<br>Performance metrics<br>Validated results |
| Workflow | Ad-hoc process<br>No standardization<br>Manual intervention needed | Structured process<br>Standard protocols<br>Automated pipeline |
**Client usage:**

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `client = anthropic.Client(api_key)`<br>`response = client.messages.create(...)` | Complex setup<br>Custom error handling<br>Tight coupling |
| MCP | `client = SparkMCPClient()`<br>`result = await client.optimize_spark_code(code)` | Simple interface<br>Built-in validation<br>Loose coupling |
**Tool registration:**

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `class SparkOptimizer:`<br>`def register_tool(self, name, func):`<br>`self.tools[name] = func` | Manual registration<br>No validation<br>Complex maintenance |
| MCP | `@register_tool("optimize_spark_code")`<br>`async def optimize_spark_code(code: str):` | Auto-discovery<br>Type checking<br>Easy extension |
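The decorator-based registration in the MCP row can be sketched as follows. The `register_tool` decorator and the registry dict here are illustrative assumptions, not the project's actual implementation:

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict

# Hypothetical tool registry: the server would discover tools from this dict.
TOOL_REGISTRY: Dict[str, Callable[..., Awaitable[Any]]] = {}

def register_tool(name: str):
    """Register an async function as a callable tool (illustrative sketch)."""
    def decorator(func):
        if not inspect.iscoroutinefunction(func):
            raise TypeError(f"tool {name!r} must be an async function")
        TOOL_REGISTRY[name] = func
        return func
    return decorator

@register_tool("optimize_spark_code")
async def optimize_spark_code(code: str) -> str:
    # The real tool forwards `code` to Claude AI; this stub just trims it.
    return code.strip()

print(sorted(TOOL_REGISTRY))  # ['optimize_spark_code']
```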
**Resource management:**

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `def __init__(self):`<br>`self.claude = init_claude()`<br>`self.spark = init_spark()` | Manual orchestration<br>Manual cleanup<br>Error-prone |
| MCP | `@requires_resources(["claude_ai", "spark"])`<br>`async def optimize_spark_code(code: str):` | Auto-coordination<br>Lifecycle management<br>Error handling |
**Protocol messages:**

| Approach | Code Example | Benefits |
|---|---|---|
| Traditional | `{"type": "request", "payload": {"code": code}}` | Custom format<br>Manual validation<br>Custom debugging |
| MCP | `{"method": "tools/call", "params": {"name": "optimize_code"}}` | Standard format<br>Auto-validation<br>Easy debugging |
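The standardized message shape can be illustrated with plain JSON. The fields follow the general JSON-RPC 2.0 convention that MCP builds on; the exact payload this server accepts may differ:

```python
import json

# A tools/call request in the JSON-RPC 2.0 shape that MCP uses.
request = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "optimize_spark_code",
        "arguments": {"code": "df.groupBy('dept').count()"},
    },
}

wire = json.dumps(request)   # what actually crosses the transport
decoded = json.loads(wire)
print(decoded["method"], decoded["params"]["name"])  # tools/call optimize_spark_code
```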
You can also use the client programmatically:
```python
from spark_mcp.client import SparkMCPClient

async def main():
    # Connect to the MCP server
    client = SparkMCPClient()
    await client.connect()

    # Your Spark code to optimize
    spark_code = '''
    # Your PySpark code here
    '''

    # Get optimized code with performance analysis
    optimized_code = await client.optimize_spark_code(
        code=spark_code,
        optimization_level="advanced",
        save_to_file=True  # Save to output/optimized_spark_example.py
    )

    # Analyze performance differences
    analysis = await client.analyze_performance(
        original_code=spark_code,
        optimized_code=optimized_code,
        save_to_file=True  # Save to output/performance_analysis.md
    )

    # Run both versions and compare:
    # use the run_optimized.py script or implement your own comparison

    await client.close()
```
The repository includes an example workflow:
Original code (`input/spark_code_input.py`):

```python
# Create DataFrames and join
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"])
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"])

# Join and analyze
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg({"salary": "avg", "age": "avg", "id": "count"}) \
    .orderBy("dept")
```
Optimized code (`output/optimized_spark_example.py`):

```python
# Performance-optimized version with caching and improved configurations
spark = SparkSession.builder \
    .appName("EmployeeAnalysis") \
    .config("spark.sql.shuffle.partitions", 200) \
    .getOrCreate()

# Create and cache DataFrames
emp_df = spark.createDataFrame(employees, ["id", "name", "age", "dept", "salary"]).cache()
dept_df = spark.createDataFrame(departments, ["dept", "location", "budget"]).cache()

# Optimized join and analysis
result = emp_df.join(dept_df, "dept") \
    .groupBy("dept", "location") \
    .agg(
        avg("salary").alias("avg_salary"),
        avg("age").alias("avg_age"),
        count("id").alias("employee_count")
    ) \
    .orderBy("dept")
```
Performance analysis (`output/performance_analysis.md`):

```markdown
## Execution Results Comparison

### Timing Comparison
- Original Code: 5.18 seconds
- Optimized Code: 0.65 seconds
- Performance Improvement: 87.4%

### Optimization Details
- Caching frequently used DataFrames
- Optimized shuffle partitions
- Improved column expressions
- Better memory management
```
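As a quick check of the arithmetic behind those numbers (the report's 87.4% appears to truncate rather than round the exact figure):

```python
original_s = 5.18
optimized_s = 0.65

improvement_pct = (original_s - optimized_s) / original_s * 100  # ~87.45
speedup = original_s / optimized_s                               # ~7.97x

print(f"roughly {improvement_pct:.0f}% faster, about {speedup:.1f}x speedup")
```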
```
ai-mcp/
├── spark_mcp/
│   ├── __init__.py
│   ├── client.py      # MCP client implementation
│   └── server.py      # MCP server implementation
├── examples/
│   ├── optimize_code.py           # Example usage
│   └── optimized_spark_example.py # Generated optimized code
├── requirements.txt
└── run_server.py      # Server startup script
```
The server exposes two MCP tools:

- `optimize_spark_code`
- `analyze_performance`
Environment variables:

- `ANTHROPIC_API_KEY`: Your Anthropic API key for Claude AI

The system implements various PySpark optimizations, including caching frequently used DataFrames, tuning shuffle partitions, improving column expressions, and better memory management.
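A defensive startup check for the key can be sketched as follows; the helper name and error message are illustrative, not part of the project:

```python
import os

def get_api_key() -> str:
    """Read the Anthropic API key, failing fast with a clear message."""
    key = os.environ.get("ANTHROPIC_API_KEY")
    if not key:
        raise RuntimeError(
            "ANTHROPIC_API_KEY is not set; export it before starting the server"
        )
    return key
```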
## Contributing

Feel free to submit issues and enhancement requests!

## License

MIT License