Kafka Integration
A Model Context Protocol (MCP) server that integrates with Apache Kafka to provide publish and consume functionality for LLM and agentic applications.
This project implements a server that allows AI models to interact with Kafka topics through a standardized interface. It supports:

- Publishing messages to a configured Kafka topic
- Consuming messages from a configured Kafka topic
- `stdio` and SSE transports
Clone the repository:
```bash
git clone <repository-url>
cd <repository-directory>
```
Create a virtual environment and activate it:
```bash
python -m venv venv
source venv/bin/activate  # On Windows, use: venv\Scripts\activate
```
Install the required dependencies:
```bash
pip install -r requirements.txt
```
If no requirements.txt exists, install the following packages:
```bash
pip install aiokafka python-dotenv pydantic-settings mcp-server
```
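For reproducibility, the packages above can be pinned in a `requirements.txt`; a minimal version (unpinned, since the project does not specify versions) might look like:

```
aiokafka
python-dotenv
pydantic-settings
mcp-server
```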
Create a `.env` file in the project root with the following variables:
```
# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
TOPIC_NAME=your-topic-name
IS_TOPIC_READ_FROM_BEGINNING=False
DEFAULT_GROUP_ID_FOR_CONSUMER=kafka-mcp-group

# Optional: Custom Tool Descriptions
# TOOL_PUBLISH_DESCRIPTION="Custom description for the publish tool"
# TOOL_CONSUME_DESCRIPTION="Custom description for the consume tool"
```
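How these variables might map onto a settings object can be sketched with the standard library alone. The actual project uses pydantic-settings; the class and field names below are illustrative, not the project's real `settings.py`:

```python
import os
from dataclasses import dataclass, field


@dataclass
class KafkaSettings:
    """Illustrative settings holder mirroring the .env variables above."""

    bootstrap_servers: str = field(
        default_factory=lambda: os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
    )
    topic_name: str = field(
        default_factory=lambda: os.environ.get("TOPIC_NAME", "")
    )
    read_from_beginning: bool = field(
        default_factory=lambda: os.environ.get("IS_TOPIC_READ_FROM_BEGINNING", "False").lower()
        in ("true", "1")
    )
    group_id: str = field(
        default_factory=lambda: os.environ.get("DEFAULT_GROUP_ID_FOR_CONSUMER", "kafka-mcp-group")
    )


settings = KafkaSettings()
```

With pydantic-settings the same defaults and environment lookups would come from a `BaseSettings` subclass instead of `default_factory` lambdas.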
You can run the server using the provided `main.py` script:

```bash
python main.py --transport stdio
```
Available transport options:

- `stdio`: Standard input/output (default)
- `sse`: Server-Sent Events

To use this Kafka MCP server with Claude Desktop, add the following configuration to your Claude Desktop configuration file:
```json
{
  "mcpServers": {
    "kafka": {
      "command": "python",
      "args": [
        "<PATH TO PROJECTS>/main.py"
      ]
    }
  }
}
```
Replace `<PATH TO PROJECTS>` with the absolute path to your project directory.
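The `--transport` flag shown in the run command above can be handled with a small `argparse` setup. This is a sketch of how such a CLI entry point is typically wired, not necessarily the project's exact `main.py`:

```python
import argparse


def parse_args(argv=None):
    """Parse the server's command-line options (illustrative sketch)."""
    parser = argparse.ArgumentParser(description="Kafka MCP server")
    parser.add_argument(
        "--transport",
        choices=("stdio", "sse"),
        default="stdio",
        help="Transport to use: stdio (default) or sse (Server-Sent Events)",
    )
    return parser.parse_args(argv)


args = parse_args(["--transport", "sse"])
```

Invalid values (anything other than `stdio` or `sse`) are rejected by `argparse` automatically via `choices`.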
- `main.py`: Entry point for the application
- `kafka.py`: Kafka connector implementation
- `server.py`: MCP server implementation with tools for Kafka interaction
- `settings.py`: Configuration management using Pydantic

The server exposes the following tools:

- `publish`: Publishes information to the configured Kafka topic.
- `consume`: Consumes information from the configured Kafka topic.
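Kafka transports raw bytes, so both tools ultimately reduce to encoding a payload on publish and decoding it on consume. A minimal round-trip sketch (the helper names are illustrative, not the project's API; the real implementation would hand these bytes to an aiokafka producer/consumer):

```python
import json


def encode_message(payload) -> bytes:
    """Serialize a tool payload to UTF-8 JSON bytes for the Kafka producer."""
    return json.dumps(payload).encode("utf-8")


def decode_message(raw: bytes):
    """Decode bytes received from the Kafka consumer back into a payload."""
    return json.loads(raw.decode("utf-8"))


msg = {"role": "agent", "content": "hello"}
assert decode_message(encode_message(msg)) == msg
```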
Creates a new Kafka topic with specified parameters.

- `--topic`: Name of the topic to create
- `--partitions`: Number of partitions to allocate
- `--replication-factor`: Replication factor across brokers
- `--config` (optional): Topic-level configuration overrides (e.g., `retention.ms=604800000`)

Deletes an existing Kafka topic.

- `--topic`: Name of the topic to delete
- `--timeout` (optional): Time to wait for deletion to complete

Lists all topics in the cluster (or filtered by pattern).

- `--bootstrap-server`: Broker address
- `--pattern` (optional): Regular expression to filter topic names
- `--exclude-internal` (optional): Exclude internal topics (default: true)

Displays or alters configuration for one or more topics.

- `--describe`: Show current configs for a topic
- `--alter`: Modify configs (e.g., `--add-config retention.ms=86400000`, `--delete-config cleanup.policy`)
- `--topic`: Name of the topic

Retrieves metadata about a topic or the cluster.

- `--topic` (optional): Fetch metadata only for this topic
- `--bootstrap-server`: Broker address
- `--include-offline` (optional): Include brokers or partitions that are offline
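The `--config` override string and the `--pattern`/`--exclude-internal` filtering described above can be sketched in plain Python. The helper names are illustrative; actual topic administration would go through a Kafka admin client:

```python
import re


def parse_config_overrides(spec: str) -> dict:
    """Parse 'key=value,key=value' overrides, e.g. 'retention.ms=604800000'."""
    overrides = {}
    for pair in spec.split(","):
        key, _, value = pair.partition("=")
        overrides[key.strip()] = value.strip()
    return overrides


def filter_topics(names, pattern=None, exclude_internal=True):
    """Filter topic names by regex; drop internal topics (double-underscore prefix)."""
    if exclude_internal:
        names = [n for n in names if not n.startswith("__")]
    if pattern is not None:
        names = [n for n in names if re.search(pattern, n)]
    return names
```

For example, `filter_topics(["__consumer_offsets", "orders", "orders-dlq"], pattern="orders")` drops the internal `__consumer_offsets` topic and keeps both `orders` topics.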