kachaka-mcp
Kachaka MCP Server bridges Kachaka functionalities to AI models via MCP, enabling control and state monitoring. It supports AI models like GPT-4 and Claude, and includes essential components like API client and resource layer.
Kachaka MCP サーバー
[!WARNING] 現在このリポジトリにあるコードは実験的なものであり、正常に動作しない可能性があります。
1. 概要
Kachaka MCPサーバー(kachaka-mcp)は、Kachakaの機能をModel Context Protocol(MCP)を通じて様々なAIモデルに提供するサーバーです。このサーバーにより、Claude、GPT-4、ローカルLLMなどのAIモデルがKachakaの状態を把握し、制御することができるようになります。
2. デプロイメント
2.1 uv syncを使ったインストール
uvをインストールします。(まだインストールしていない場合)
kachaka-mcpを以下の手順でインストールします。
git clone https://github.com/pf-robotics/kachaka-mcp.git
cd kachaka-mcp
# インストール
uv sync
# または開発用依存関係も含めてインストール
uv sync --all
2.2 Claude Desktopからの使用方法
claude_desktop_config.json
に以下の記述を追加してください:
{
"mcpServers": {
"kachaka-mcp": {
"command": "uv",
"args": [
"--directory",
"<path to kachaka-mcp directory>",
"run",
"python",
"-m",
"kachaka_mcp.server"
],
"env": {
"KACHAKA_HOST": "<kachaka host>"
}
}
}
}
ここで、kachaka-mcpのディレクトリやKACHAKA_HOST
の値は、お使いの環境に合わせて変更してください。
記述例:
{
"mcpServers": {
"kachaka-mcp": {
"command": "uv",
"args": [
"--directory",
"C:\\kachaka-mcp",
"run",
"python",
"-m",
"kachaka_mcp.server"
],
"env": {
"KACHAKA_HOST": "192.168.1.100:26400"
}
}
}
}
2.3 MCP Inspectorを使ったデバッグ
MCP Inspectorを使ってデバッグする場合は以下のようにします。
npx @modelcontextprotocol/inspector -e KACHAKA_HOST=192.168.1.100:26400 uv run python -m kachaka_mcp.server
2.4 直接実行(すべてのプラットフォーム)
# インストール
pip install -e .
# 設定
export KACHAKA_HOST="192.168.1.100:26400" # Kachakaのホスト(Linux/macOS)
# または
set KACHAKA_HOST=192.168.1.100:26400 # Kachakaのホスト(Windows CMD)
# または
$env:KACHAKA_HOST = "192.168.1.100:26400" # Kachakaのホスト(Windows PowerShell)
# 実行
python -m kachaka_mcp.server
# または
mcp run kachaka_mcp.server
# 開発モードで実行(MCP Inspector付き)
mcp dev kachaka_mcp.server
# Claude Desktopにインストール
mcp install kachaka_mcp.server --name "Kachaka Robot" -v KACHAKA_HOST=192.168.1.100:26400
3. 設定ファイルの使用
環境変数の代わりに設定ファイルを使用することもできます。設定ファイルは ~/.kachaka-mcp/config.json
(Linux/macOS)または %USERPROFILE%\.kachaka-mcp\config.json
(Windows)に配置します。
{
"kachaka_host": "192.168.1.100:26400",
"server_name": "My Kachaka Robot",
"log_level": "INFO",
"auth_enabled": false,
"api_keys": []
}
設定ファイルの場所は環境変数 KACHAKA_MCP_CONFIG
で変更できます:
# Linux/macOS
export KACHAKA_MCP_CONFIG="/path/to/config.json"
# Windows CMD
set KACHAKA_MCP_CONFIG=C:\path\to\config.json
# Windows PowerShell
$env:KACHAKA_MCP_CONFIG = "C:\path\to\config.json"
4. アーキテクチャ
graph TD
A[AIモデル] <-->|MCP| B[kachaka-mcp サーバー]
B <-->|gRPC| C[Kachaka]
subgraph "AIモデル"
A1[Claude] --- A
A2[GPT-4] --- A
A3[ローカルLLM] --- A
end
subgraph "kachaka-mcp"
D[リソース層] --> F[Kachaka API クライアント]
E[ツール層] --> F
G[プロンプト層] --> F
H[認証・セキュリティ層]
end
5. 主要コンポーネント
5.1 Kachaka API クライアント
- Kachaka APIとの通信を担当
- KachakaApiClientを使用してKachaakaと通信
- 同期・非同期両方のインターフェースをサポート
- ネットワーク接続の管理と再接続機能
5.2 リソース層
Kachakaの状態情報をMCPリソースとして公開します:
5.2.1 Kachaaka情報リソース
robot://status
- Kachaakaの現在の状態(位置、バッテリー、エラーなど)robot://version
- Kachaakaのバージョン情報robot://serial
- シリアル番号robot://command
- 現在実行中のコマンド情報
5.2.2 マップリソース
map://current
- 現在のマップ情報(PNG形式)map://locations/{location_id?}
- 登録された場所の情報map://shelves/{shelf_id?}
- 棚の情報と位置map://list
- 利用可能なマップのリスト
5.2.3 センサーリソース
sensors://camera/front
- 前面カメラ画像sensors://camera/back
- 背面カメラ画像sensors://camera/tof
- ToFカメラ画像sensors://laser
- レーザースキャンデータsensors://imu
- IMUデータsensors://odometry
- オドメトリデータsensors://object_detection
- 物体検出結果
5.3 ツール層
Kachakaの操作機能をMCPツールとして公開します:
5.3.1 移動ツール
move_to_location(location_name: str)
- 指定した場所に移動move_to_pose(x: float, y: float, yaw: float)
- 指定した座標に移動return_home()
- ホームに戻るmove_forward(distance_meter: float, speed: float)
- 指定した距離前進rotate_in_place(angle_radian: float)
- その場で回転set_robot_velocity(linear: float, angular: float)
- Kachaakaの速度を設定
5.3.2 棚操作ツール
move_shelf(shelf_name: str, location_name: str)
- 棚を指定した場所に移動return_shelf(shelf_name: str)
- 棚を元の場所に戻すdock_shelf()
- 棚にドッキングundock_shelf()
- 棚からアンドックdock_any_shelf_with_registration(location_name: str, dock_forward: bool)
- 任意の棚にドッキングして登録
5.3.3 システム操作ツール
speak(text: str)
- テキストを音声で発話cancel_command()
- 実行中のコマンドをキャンセルproceed()
- 次のステップに進むlock(duration_sec: float)
- 指定した時間ロックするset_auto_homing_enabled(enable: bool)
- 自動ホーミングの有効/無効を設定set_manual_control_enabled(enable: bool)
- 手動制御の有効/無効を設定set_speaker_volume(volume: int)
- スピーカーの音量を設定restart_robot()
- Kachaakaを再起動
5.3.4 マップ操作ツール
switch_map(map_id: str)
- マップを切り替えるexport_map(map_id: str, output_file_path: str)
- マップをエクスポートimport_map(target_file_path: str)
- マップをインポートset_robot_pose(pose: dict)
- Kachaakaの位置を設定
5.4 プロンプト層
AIモデルとの対話を効率化するためのプロンプトテンプレートを提供します:
robot_control_prompt()
- Kachaaka制御のための基本プロンプトshelf_operation_prompt()
- 棚操作のための基本プロンプトnavigation_prompt()
- ナビゲーションのための基本プロンプトerror_handling_prompt()
- エラー処理のための基本プロンプト
5.5 認証・セキュリティ層
- APIキーベースの認証
- アクセス制御と権限管理
- 操作ログの記録
6. 実装計画
6.1 プロジェクト構造
kachaka-mcp/
├── README.md # プロジェクト概要
├── pyproject.toml # プロジェクト設定
├── requirements.txt # 依存関係
├── src/
│ └── kachaka_mcp/
│ ├── __init__.py
│ ├── server.py # MCPサーバーのメイン実装
│ ├── resources.py # リソース定義
│ ├── tools.py # ツール定義
│ ├── prompts.py # プロンプト定義
│ ├── client.py # Kachaka APIクライアントラッパー
│ ├── auth.py # 認証・セキュリティ
│ └── utils/
│ ├── __init__.py
│ ├── config.py # 設定管理
│ └── logging.py # ロギング
├── scripts/
│ ├── install.sh # インストールスクリプト
│ └── run.sh # 実行スクリプト
└── tests/
├── __init__.py
├── test_server.py
├── test_resources.py
└── test_tools.py
6.2 依存関係
[project]
name = "kachaka-mcp"
version = "0.1.0"
description = "MCP server for Kachaka robot"
requires-python = ">=3.10"
dependencies = [
"mcp[cli]>=1.7.1",
"kachaka-api@git+https://github.com/pf-robotics/kachaka-api",
"grpcio",
"pydantic",
"pillow",
"numpy",
"loguru",
"protobuf==5.27.2"
]
[project.optional-dependencies]
dev = [
"pytest",
"black",
"isort",
"mypy",
"ruff",
]
6.3 サーバー実装
6.3.1 メインサーバークラス
# server.py
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional
from kachaka_api.aio import KachakaApiClient
from mcp.server.fastmcp import Context, FastMCP
from .resources import register_resources
from .tools import register_tools
from .prompts import register_prompts
from .auth import KachakaAuthProvider
class KachakaMCPContext:
"""Kachaka MCP サーバーのコンテキスト"""
def __init__(self, kachaka_client: KachakaApiClient):
self.kachaka_client = kachaka_client
@asynccontextmanager
async def kachaka_lifespan(server: FastMCP) -> AsyncIterator[KachakaMCPContext]:
"""Kachaka MCP サーバーのライフスパン管理"""
# 設定の読み込み
from .utils.config import load_config
config = load_config()
# Kachaka APIクライアントの初期化
kachaka_client = KachakaApiClient(target=config.kachaka_host)
try:
# コンテキストの作成と提供
context = KachakaMCPContext(kachaka_client)
yield context
finally:
# クリーンアップ処理
pass
def create_server(server_name: str = "Kachaka Robot") -> FastMCP:
"""Kachaka MCP サーバーを作成"""
# MCPサーバーの作成
mcp = FastMCP(
server_name,
lifespan=kachaka_lifespan,
auth_provider=KachakaAuthProvider(),
)
# リソース、ツール、プロンプトの登録
register_resources(mcp)
register_tools(mcp)
register_prompts(mcp)
return mcp
def main():
"""メイン関数"""
server = create_server()
server.run()
if __name__ == "__main__":
main()
6.3.2 リソース実装例
# resources.py
from mcp.server.fastmcp import FastMCP, Context, Image
from PIL import Image as PILImage
import io
import json
def register_resources(mcp: FastMCP):
"""リソースの登録"""
@mcp.resource("robot://status")
async def get_robot_status(ctx: Context) -> str:
"""Kachaakaの現在の状態を取得"""
kachaka_client = ctx.request_context.lifespan_context.kachaka_client
# 各種情報の取得
pose = await kachaka_client.get_robot_pose()
battery_info = await kachaka_client.get_battery_info()
command_state, command = await kachaka_client.get_command_state()
# JSONとして返す
status = {
"pose": {
"x": pose.x,
"y": pose.y,
"yaw": pose.yaw
},
"battery": {
"percentage": battery_info[0],
"status": str(battery_info[1])
},
"command_state": str(command_state),
"command": {
"type": command.WhichOneof("command"),
"id": command_state.command_id if command_state.command_id else ""
}
}
return json.dumps(status, indent=2)
@mcp.resource("map://current")
async def get_current_map(ctx: Context) -> Image:
"""現在のマップ画像を取得"""
kachaka_client = ctx.request_context.lifespan_context.kachaka_client
# マップの取得
map_data = await kachaka_client.get_png_map()
# 画像として返す
return Image(data=map_data.data, format="png")
# 他のリソースも同様に実装...
6.3.3 ツール実装例
# tools.py
from mcp.server.fastmcp import FastMCP, Context
from typing import Dict, Any
def register_tools(mcp: FastMCP):
"""ツールの登録"""
@mcp.tool()
async def move_to_location(location_name: str, ctx: Context) -> str:
"""指定した場所にKachaakaを移動させる"""
kachaka_client = ctx.request_context.lifespan_context.kachaka_client
# 進捗報告の設定
ctx.info(f"Moving to location: {location_name}")
# 移動コマンドの実行
result = await kachaka_client.move_to_location(
location_name,
wait_for_completion=True
)
# 結果の返却
if result.success:
return f"Successfully moved to {location_name}"
else:
return f"Failed to move to {location_name}: {result.message}"
@mcp.tool()
async def speak(text: str, ctx: Context) -> str:
"""テキストを音声で発話する"""
kachaka_client = ctx.request_context.lifespan_context.kachaka_client
# 発話コマンドの実行
result = await kachaka_client.speak(text, wait_for_completion=True)
# 結果の返却
if result.success:
return f"Successfully spoke: {text}"
else:
return f"Failed to speak: {result.message}"
# 他のツールも同様に実装...
6.3.4 プロンプト実装例
# prompts.py
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.prompts import base
def register_prompts(mcp: FastMCP):
"""プロンプトの登録"""
@mcp.prompt()
def robot_control_prompt() -> list[base.Message]:
"""Kachaaka制御のための基本プロンプト"""
return [
base.SystemMessage(
"あなたはKachakaを制御するアシスタントです。"
"以下のツールを使ってKachaakaを操作できます:"
"- move_to_location: 指定した場所に移動"
"- return_home: ホームに戻る"
"- speak: テキストを音声で発話"
"など"
),
base.UserMessage("Kachakaを操作するのを手伝ってください。"),
]
# 他のプロンプトも同様に実装...
6.4 設定管理
以降に出てくるkachaka_host
やKACHAKA_HOST
の値は「PythonでカチャカAPIを利用する」に記載されているtarget=
に続く値と同じ値が入ります。
なお26400
はKachaka APIサーバー (gRPC)のポート番号です。
# utils/config.py
from pydantic import BaseModel
import os
import json
from pathlib import Path
class KachakaMCPConfig(BaseModel):
"""Kachaka MCP サーバーの設定"""
kachaka_host: str = "192.168.1.100:26400" # デフォルトのKachakaホスト
server_name: str = "Kachaka Robot"
log_level: str = "INFO"
auth_enabled: bool = False
api_keys: list[str] = []
def load_config() -> KachakaMCPConfig:
"""設定を読み込む"""
# 環境変数から設定を読み込む
config_path = os.environ.get("KACHAKA_MCP_CONFIG", "~/.kachaka-mcp/config.json")
config_path = Path(config_path).expanduser()
# デフォルト設定
config = KachakaMCPConfig()
# 設定ファイルが存在する場合は読み込む
if config_path.exists():
with open(config_path, "r") as f:
config_data = json.load(f)
config = KachakaMCPConfig(**config_data)
# 環境変数で上書き
if os.environ.get("KACHAKA_HOST"):
config.kachaka_host = os.environ.get("KACHAKA_HOST")
return config
6.5 認証実装
# auth.py
from mcp.server.auth import OAuthServerProvider, ClientRegistrationOptions, RevocationOptions
from typing import Dict, List, Optional
class KachakaAuthProvider(OAuthServerProvider):
"""Kachaka MCP サーバーの認証プロバイダー"""
def __init__(self):
from .utils.config import load_config
self.config = load_config()
self.api_keys = set(self.config.api_keys)
async def validate_client_credentials(self, client_id: str, client_secret: str) -> bool:
"""クライアント認証情報の検証"""
if not self.config.auth_enabled:
return True
return client_secret in self.api_keys
# 他の認証メソッドも実装...
7. テスト計画
7.1 単体テスト
- リソース機能のテスト
- ツール機能のテスト
- 認証機能のテスト
7.2 統合テスト
- サーバー起動と終了のテスト
- AIモデルとの連携テスト
- エラー処理のテスト
7.3 エンドツーエンドテスト
- 実際のKachakaとの連携テスト
- 様々なAIモデルからのアクセステスト
8. 今後の拡張性
8.1 短期的な拡張計画
- WebUIの追加(サーバー状態の監視、設定変更など)
- 複数Kachaakaの管理機能
- カスタムコマンドのサポート
8.2 中長期的な拡張計画
- 高度なスケジューリング機能
- ユーザー定義のワークフロー
- 他のロボットプラットフォームとの連携
9. セキュリティ考慮事項
- ネットワークセキュリティ(TLS/SSL対応)
- アクセス制御と認証
- 安全な操作の確保(危険な操作の制限)
- ログ記録と監査