"""
WeChat (微信) channel implementation using iLink Bot HTTP API.
Uses the official WeChat iLink Bot API for message receiving and sending.
- Receiving: HTTP Long-polling (getupdates, 35s timeout per request)
- Sending: HTTP POST (sendmessage with context_token)
- Auth: QR code scan → ilink_bot_token
Reference: @tencent-weixin/openclaw-weixin (npm, MIT license, Tencent official)
Requires:
- WEIXIN_ILINK_TOKEN: iLink Bot token (obtained via QR code login flow)
- Or: login via QR code on first start (interactive)
"""
import asyncio
import json
import logging
import time
import uuid as uuid_lib
from collections import OrderedDict
from pathlib import Path
from typing import Any, Optional, TYPE_CHECKING
import httpx
from backend.channels.base import BaseChannel
from backend.config import settings
from backend.database.session_store import session_store
from backend.database.channel_store import channel_store
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from backend.agents.agent_run import AgentRunner
# iLink Bot API base URL
ILINK_API_BASE = "https://ilinkai.weixin.qq.com"
# Long-poll timeout (seconds) — server holds request up to this duration
POLL_TIMEOUT = 35
# Token persistence file (relative to workspace base)
TOKEN_FILE_NAME = ".weixin_bot_token"
class WeixinChannel(BaseChannel):
"""
WeChat channel using iLink Bot HTTP API.
Message flow:
1. Login via QR code → obtain ilink_bot_token
2. Long-poll getupdates in a loop → receive messages
3. Process messages → start agent runs
4. Send replies via sendmessage API (using context_token from incoming message)
Architecture notes:
- Unlike DingTalk (Stream SDK, separate thread) and Feishu (WebSocket, separate thread),
WeChat uses async HTTP long-polling in the main event loop — no separate thread needed.
- Each incoming message carries a `context_token` that must be echoed back when replying.
This is stored per-conversation for the reply path.
"""
name = "weixin"
def __init__(self, agent_runner: "AgentRunner"):
"""Initialize WeChat channel with environment config."""
class WeixinConfig:
def __init__(self):
self.ilink_token = settings.weixin_ilink_token
self.allow_from = settings.weixin_allow_from
self.default_agent = settings.weixin_default_agent
config = WeixinConfig()
super().__init__(config, agent_runner)
# iLink Bot token (Bearer auth)
self._token: Optional[str] = self.config.ilink_token or None
# Long-poll cursor for message continuity (persisted to disk)
self._poll_cursor: Optional[str] = None
# context_token cache: conversation_id → latest context_token (for replies)
self._context_tokens: dict[str, str] = {}
# Message dedup cache (ordered dict, cap at 1000)
self._processed_message_ids: OrderedDict[str, None] = OrderedDict()
# httpx client (reused across requests for connection pooling)
self._http_client: Optional[httpx.AsyncClient] = None
# Poll cursor persistence path
self._cursor_file: Optional[Path] = None
# Token persistence path
self._token_file: Optional[Path] = None
# ==================================================================
# Lifecycle
# ==================================================================
async def start(self) -> None:
"""
Start the WeChat bot.
1. Load persisted token and cursor
2. If no token, initiate QR code login flow
3. Enter long-poll message loop
"""
self._running = True
# Persistence paths
base = Path(settings.workspace_base_path)
base.mkdir(parents=True, exist_ok=True)
self._token_file = base / TOKEN_FILE_NAME
self._cursor_file = base / ".weixin_poll_cursor"
# Load persisted token
if not self._token:
self._token = self._load_token()
# Load persisted poll cursor
self._poll_cursor = self._load_cursor()
# Create reusable HTTP client
self._http_client = httpx.AsyncClient(
timeout=httpx.Timeout(connect=10.0, read=POLL_TIMEOUT + 10, write=10.0, pool=10.0),
headers={"Content-Type": "application/json"},
)
# Login if no token
if not self._token:
logger.info("No WeChat iLink token found, starting QR code login flow...")
success = await self._login_via_qrcode()
if not success:
logger.error("WeChat QR code login failed. Channel not started.")
self._running = False
return
logger.info("WeChat bot starting long-poll message loop...")
# Main message loop
retry_delay = 1.0
while self._running:
try:
await self._poll_messages()
retry_delay = 1.0 # Reset on success
except httpx.TimeoutException:
# Normal — server returned after timeout with no messages
continue
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
logger.error("WeChat token expired or invalid. Need re-login.")
self._token = None
self._save_token("")
# Attempt re-login
success = await self._login_via_qrcode()
if not success:
logger.error("Re-login failed, stopping WeChat channel.")
self._running = False
break
else:
logger.error(f"HTTP error in poll loop: {e}")
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60.0)
except Exception as e:
logger.error(f"Error in WeChat poll loop: {e}", exc_info=True)
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60.0)
async def stop(self) -> None:
"""Stop the WeChat bot and clean up."""
self._running = False
if self._http_client:
await self._http_client.aclose()
self._http_client = None
# Persist cursor on shutdown
self._save_cursor()
logger.info("WeChat bot stopped")
# ==================================================================
# QR Code Login Flow
# ==================================================================
async def _login_via_qrcode(self) -> bool:
"""
Interactive QR code login flow.
1. GET /ilink/bot/get_bot_qrcode?bot_type=3 → qrcode URL + uuid
2. Print QR URL to console (user scans with WeChat)
3. Poll /ilink/bot/get_qrcode_status until scanned + confirmed
4. Receive ilink_bot_token on success
"""
if not self._http_client:
return False
try:
# Step 1: Get QR code
resp = await self._http_client.get(
f"{ILINK_API_BASE}/ilink/bot/get_bot_qrcode",
params={"bot_type": "3"},
timeout=10.0,
)
resp.raise_for_status()
data = resp.json()
qrcode_url = data.get("qrcode_url") or data.get("qrcodeUrl")
qr_uuid = data.get("uuid")
if not qrcode_url or not qr_uuid:
logger.error(f"Failed to get QR code: {data}")
return False
# Print QR code URL for manual scanning
logger.info("=" * 60)
logger.info("WeChat iLink Bot QR Code Login")
logger.info(f"Scan this URL with WeChat: {qrcode_url}")
logger.info("=" * 60)
# Step 2: Poll QR code status
max_wait = 120 # 2 minutes
start_time = time.time()
while time.time() - start_time < max_wait:
await asyncio.sleep(2.0)
status_resp = await self._http_client.post(
f"{ILINK_API_BASE}/ilink/bot/get_qrcode_status",
json={"uuid": qr_uuid},
timeout=10.0,
)
status_resp.raise_for_status()
status_data = status_resp.json()
status = status_data.get("status")
logger.debug(f"QR code status: {status}")
if status == "scanned":
logger.info("QR code scanned, waiting for confirmation...")
elif status == "confirmed" or status == "authorized":
token = status_data.get("token") or status_data.get("ilink_bot_token")
if token:
self._token = token
self._save_token(token)
logger.info("WeChat iLink Bot login successful!")
return True
else:
logger.error(f"Login confirmed but no token in response: {status_data}")
return False
elif status == "expired":
logger.error("QR code expired")
return False
logger.error("QR code login timed out (2 minutes)")
return False
except Exception as e:
logger.error(f"QR code login error: {e}", exc_info=True)
return False
# ==================================================================
# Message Polling (Long-Poll)
# ==================================================================
async def _poll_messages(self) -> None:
"""
Single long-poll request to fetch new messages.
POST /ilink/bot/getupdates
- Sends current cursor (get_updates_buf)
- Server holds connection up to 35 seconds
- Returns array of updates (messages)
"""
if not self._http_client or not self._token:
await asyncio.sleep(1.0)
return
payload: dict[str, Any] = {}
if self._poll_cursor:
payload["get_updates_buf"] = self._poll_cursor
resp = await self._http_client.post(
f"{ILINK_API_BASE}/ilink/bot/getupdates",
json=payload,
headers={"Authorization": f"Bearer {self._token}"},
)
resp.raise_for_status()
data = resp.json()
# Update cursor for next poll
new_cursor = data.get("get_updates_buf") or data.get("next_cursor")
if new_cursor:
self._poll_cursor = new_cursor
self._save_cursor()
# Process updates
updates = data.get("updates") or data.get("messages") or []
for update in updates:
try:
await self._handle_update(update)
except Exception as e:
logger.error(f"Error handling WeChat update: {e}", exc_info=True)
async def _handle_update(self, update: dict[str, Any]) -> None:
"""
Handle a single update from getupdates.
Expected update structure (from iLink Bot API):
{
"message_id": "...",
"from_user": {"id": "...", "name": "..."},
"chat": {"id": "...", "type": "private|group"},
"content": {"type": "text", "text": "...", ...},
"context_token": "...",
"timestamp": 1234567890
}
"""
message_id = update.get("message_id") or update.get("msgId") or ""
if not message_id:
return
# Deduplication
if message_id in self._processed_message_ids:
logger.debug(f"Skipping duplicate WeChat message: {message_id}")
return
self._processed_message_ids[message_id] = None
while len(self._processed_message_ids) > 1000:
self._processed_message_ids.popitem(last=False)
# Extract fields
from_user = update.get("from_user") or update.get("sender") or {}
sender_id = str(from_user.get("id") or from_user.get("wxid") or "unknown")
sender_name = from_user.get("name") or from_user.get("nickname") or ""
chat = update.get("chat") or update.get("conversation") or {}
chat_id = str(chat.get("id") or update.get("conversation_id") or sender_id)
chat_type = chat.get("type") or "private" # "private" or "group"
context_token = update.get("context_token") or ""
content_obj = update.get("content") or update.get("message") or {}
content_type = content_obj.get("type") or "text"
# Parse content by type
if content_type == "text":
content = content_obj.get("text") or ""
elif content_type == "image":
content = "[图片]"
elif content_type == "voice" or content_type == "audio":
content = "[语音]"
elif content_type == "video":
content = "[视频]"
elif content_type == "file":
content = "[文件]"
elif content_type == "location":
content = "[位置]"
else:
content = f"[{content_type}]"
if not content or not content.strip():
return
logger.info(f"📨 WeChat message: sender={sender_name}({sender_id}), "
f"chat={chat_id}, type={chat_type}, content={content[:100]!r}")
# Cache context_token for replies
if context_token:
self._context_tokens[chat_id] = context_token
# Permission check
if not self.is_allowed(sender_id):
logger.warning(f"⛔ Access denied for WeChat sender: {sender_id}")
self._log_access_denied(sender_id)
return
# Send typing indicator
asyncio.create_task(self._send_typing(context_token))
# Handle commands
is_command = await self._handle_command(content.strip(), chat_id)
if is_command:
return
# Handle user message → start agent run
await self._handle_user_message(
content=content,
chat_id=chat_id,
sender_id=sender_id,
sender_name=sender_name,
chat_type=chat_type,
context_token=context_token,
)
# ==================================================================
# User Message → Agent Run
# ==================================================================
async def _handle_user_message(
self,
content: str,
chat_id: str,
sender_id: str,
sender_name: str = "",
chat_type: str = "private",
context_token: str = "",
) -> None:
"""Handle a user message by creating a session and starting an agent run."""
from ag_ui.core import RunAgentInput, UserMessage
# Preprocess content (e.g., /skill command)
content = self.preprocess_content(content)
# Group chat: inject sender identity
is_group = chat_type == "group"
if is_group and sender_id:
nick = sender_name or sender_id
content = f"[群聊场景] 当前用户: {nick}(wxid:{sender_id}) 正在和你对话\n\n{content}"
# Get or create session mapping
mapping = await channel_store.get_session_mapping("weixin", chat_id)
if mapping:
session_id = mapping.session_uuid
if mapping.agent_key:
agent_key = mapping.agent_key
else:
session = await session_store.get_session(session_id)
agent_key = (
(getattr(session, "agent_id", None) if session else None)
or self.config.default_agent
)
else:
# Create new session
session = await session_store.create_session(
title=f"微信对话 - {chat_id[:8]}",
agent_id=self.config.default_agent,
created_by=f"weixin:{sender_id}",
source="channel_weixin",
)
session_id = session.id
agent_key = self.config.default_agent
await channel_store.create_session_mapping(
channel_type="weixin",
channel_chat_id=chat_id,
session_uuid=session_id,
agent_key=agent_key,
sender_staff_id=sender_id,
conversation_type="1" if chat_type == "private" else "2",
)
# Update last message time
await channel_store.update_last_message_time("weixin", chat_id)
from backend.services.session_queue import AgentRunJob, session_queue_manager
workspace_path = settings.get_workspace_path(session_id)
forwarded_props: dict = {}
if sender_id:
forwarded_props["sender_id"] = sender_id
if sender_name:
forwarded_props["sender_name"] = sender_name
run_input = RunAgentInput(
thread_id=session_id,
run_id=str(uuid_lib.uuid4()),
state={},
messages=[
UserMessage(
id=str(uuid_lib.uuid4()),
role="user",
content=content,
)
],
tools=[],
context=[],
forwarded_props=forwarded_props,
)
job = AgentRunJob(
session_id=session_id,
input_data=run_input,
agent_type=agent_key,
workspace_path=str(workspace_path),
job_type="weixin",
)
session_queue_manager.enqueue(job)
logger.info(f"[weixin] Agent run enqueued, session={session_id}")
# ==================================================================
# Message Sending
# ==================================================================
async def send_message(
self,
session_id: str,
content: str,
metadata: dict[str, Any],
) -> None:
"""
Send a message through WeChat via iLink Bot API.
Uses context_token from the latest incoming message in this conversation.
Falls back to conversation_id if context_token is not available.
"""
if not self._http_client or not self._token:
logger.warning("WeChat client not ready, cannot send message")
return
try:
chat_id = metadata.get("chat_id")
if not chat_id:
logger.error("No chat_id in metadata for WeChat send")
return
context_token = self._context_tokens.get(chat_id, "")
payload = {
"context_token": context_token,
"message": {
"type": "text",
"text": content,
},
}
resp = await self._http_client.post(
f"{ILINK_API_BASE}/ilink/bot/sendmessage",
json=payload,
headers={"Authorization": f"Bearer {self._token}"},
timeout=10.0,
)
resp.raise_for_status()
result = resp.json()
errcode = result.get("errcode") or result.get("error_code") or 0
if errcode != 0:
logger.error(
f"WeChat send failed: errcode={errcode}, "
f"errmsg={result.get('errmsg', '')}"
)
else:
logger.debug(f"WeChat message sent to {chat_id}, len={len(content)}")
except Exception as e:
logger.error(f"Error sending WeChat message: {e}", exc_info=True)
async def _send_typing(self, context_token: str) -> None:
"""Send typing indicator (non-blocking, fire-and-forget)."""
if not self._http_client or not self._token or not context_token:
return
try:
await self._http_client.post(
f"{ILINK_API_BASE}/ilink/bot/sendtyping",
json={"context_token": context_token},
headers={"Authorization": f"Bearer {self._token}"},
timeout=5.0,
)
except Exception as e:
logger.debug(f"Typing indicator failed (non-critical): {e}")
# ==================================================================
# Token & Cursor Persistence
# ==================================================================
def _load_token(self) -> Optional[str]:
"""Load iLink Bot token from persistent storage."""
if self._token_file and self._token_file.exists():
try:
token = self._token_file.read_text().strip()
if token:
logger.info("Loaded WeChat iLink token from disk")
return token
except Exception as e:
logger.warning(f"Failed to load token file: {e}")
return None
def _save_token(self, token: str) -> None:
"""Save iLink Bot token to persistent storage."""
if self._token_file:
try:
self._token_file.write_text(token)
self._token_file.chmod(0o600)
except Exception as e:
logger.warning(f"Failed to save token file: {e}")
def _load_cursor(self) -> Optional[str]:
"""Load poll cursor from persistent storage."""
if self._cursor_file and self._cursor_file.exists():
try:
cursor = self._cursor_file.read_text().strip()
if cursor:
logger.debug("Loaded WeChat poll cursor from disk")
return cursor
except Exception as e:
logger.warning(f"Failed to load cursor file: {e}")
return None
def _save_cursor(self) -> None:
"""Save poll cursor to persistent storage."""
if self._cursor_file and self._poll_cursor:
try:
self._cursor_file.write_text(self._poll_cursor)
except Exception as e:
logger.warning(f"Failed to save cursor file: {e}")
# ==================================================================
# Override: session new command label
# ==================================================================
async def _cmd_session_new(self, chat_id: str) -> bool:
"""Handle `/session new` — create a new session with WeChat label."""
from backend.database.session_store import session_store
from backend.database.channel_store import channel_store
mapping = await channel_store.get_session_mapping(self.name, chat_id)
if mapping and mapping.agent_key:
current_agent_key = mapping.agent_key
else:
current_agent_key = self.config.default_agent
new_session = await session_store.create_session(
title=f"微信对话 - {chat_id[:8]}",
agent_id=current_agent_key,
created_by=f"{self.name}:command",
source=f"channel_{self.name}",
)
if mapping:
old_session_uuid = mapping.session_uuid
await channel_store.update_session_uuid(
channel_type=self.name,
channel_chat_id=chat_id,
session_uuid=new_session.id,
agent_key=current_agent_key,
)
from backend.channels.manager import channel_manager
if channel_manager:
channel_manager.invalidate_cache(old_session_uuid)
channel_manager.invalidate_cache(new_session.id)
else:
await channel_store.create_session_mapping(
channel_type=self.name,
channel_chat_id=chat_id,
session_uuid=new_session.id,
agent_key=current_agent_key,
)
agent_info = f"\nAgent: `{current_agent_key}`" if current_agent_key else ""
await self.send_message(
session_id="",
content=f"✅ 已创建新 Session\n\nID: `{new_session.id}`{agent_info}\n\n后续对话将在新 Session 中进行",
metadata={"chat_id": chat_id},
)
return True