Tanka 内存与输出系统
面向新同事的技术架构导览
Tanka 是一个 AI Agent 平台:管理用户记忆与聊天历史,用对话式 Agent 驱动业务产出(路演 PPT、网站、商业计划、原型、PRD 等),维护一套面向研究员的影响力积分账本(Apodex),并把长耗时工作异步化(事件 / 时间触发的后台任务)。整套系统是一个 FastAPI + Celery 单体部署,由 supervisord 管理三个常驻进程加一个独立守护进程,共享同一套配置与数据层。它的核心抽象是一个通用 Agent 运行时(基于 langgraph 的回合制状态机 + 工具调用生命周期 + HITL 人审),以及挂在其上的若干专门子系统。
super_agent/ 与 super_agent_v2/ 是外部框架的软链接,在本机指向 /Users/admin/wujunjie/project/super_agent/...,已失效。所有对该框架的描述均由 import 推断(标注为「推断」)。请在部署环境中验证其内部实现。
30 秒速览
十个子系统、一句话职责、代码入口与所属运行进程。先把全局装进脑子,再往下读。
| 子系统 | 一句话职责 | 代码入口 | 运行进程 |
|---|---|---|---|
| 部署与引导 | 应用工厂、配置、生命周期、进程编排 | web/main.py / celery_app/celery_server.py | web_api / celery_beat / celery_worker |
| HTTP/API 视图层 | 路由聚合、请求解析、SSE/JSON 序列化、鉴权解析 | web/src/view/__init__.py | web_api |
| 中间件与横切库 | Session/trace_id、生命周期 DB 初始化、ELK 日志、Redis 池、事件 outbox | web/src/middleware/ / web/src/lib/ | 全进程 |
| Tanka Agent 运行时(Flow) | 回合生命周期、langgraph 状态机、Phase A-G 工具调用、HITL 中断、流式输出 | web/src/controller/tanka/flow/agent.py | web_api(+ worker 后台任务) |
| 技能与 MCP | 技能发现/生命周期/市场、tanka_api 统一工具、RouteRegistry、第三方 MCP | .../skill_discovery/ + .../tanka_platform/ | web_api |
| 检索 / 租户 | 5 节点 LangGraph 检索流水线、语义重排、租户识别链 | .../search_tools/graph.py + .../tenant/ | web_api / worker |
| 业务产出生成器 | 路演/网站/商业计划/原型/PRD 编排,Plan 持久化 | .../insight/output_chat + .../flow/pitch_deck | web_api / worker |
| Apodex 研究员章程系统 | 仅追加 IMP 影响力积分账本、23 个 DDD 聚合、人事运营 | .../apodex/services/__init__.py | web_api / worker |
| Relay + WaitTask | Relay Agent 流式、事件/时间触发后台任务、Kafka 消费、事件 outbox | .../relay/wait_task/ + celery_app/wait_task/ | worker / beat / kafka_consumer |
| Celery 与运维 | Celery/Beat/RedBeat 配置、双队列 worker、任务编目、监控面板、pytest | celery_app/ + admin/ + tests/ | beat / worker / web_api |
| 持久化 / DAO | BaseDAO 多租户 orgId 强制、类型化 CRUD、各类 Repository、连接助手 | .../apodex/daos/base_dao.py + tanka_ai_toolkit | web_api / worker |
运行时拓扑
三个常驻进程 + 一个独立守护进程,由 supervisord 按启动优先级编排,共享同一套数据基础设施。
三进程 + 一守护进程(由 supervisord.conf 生产 / supervisord.local.conf 本地定义,启动优先级递增):
web_api(priority=100,先启动)
uvicorn FastAPI,生产端口 80、本地 3080,承载所有同步 HTTP / SSE 接口。引导链:web/main.py导入时os.environ["DEBUG"]="1"→set_env()写入 API key → ApolloEnvProxy加载web/config/{config_env}_config.yaml(默认test)→FastAPI(lifespan=...)启动期初始化数据库 + 预热 Redis 池 → 注册路由与 monkey patch。celery_beat(priority=200)
RedBeat 调度器,把 schedule 存到 Redis,用redbeat:lock分布式锁做多实例故障切换(锁超时调优为 30s/150s)。驱动每小时 flow card 定时任务与 WaitTask 的 TIME_POINT/TIME_CRON。celery_worker(priority=300)
线程池concurrency=16,订阅双队列celery(核心任务)+wait_task(异步任务),prefetch=1防止任务囤积。kafka_consumer(独立守护)
必须单实例——其 Dispatcher 在内存里做防抖(debounce),不可分布式。订阅im_message / vote / arena等 topic,匹配 watch_target 后派发评估任务给 worker。
基础设施:PostgreSQL(读写分离,存聊天历史 + langgraph checkpoint);5 个独立 MongoDB shard(Habitat / AI Memory / Workbench / ThirdParty / TankaLink);Milvus(向量);Elasticsearch(全文 + memory_cellular + ELK 日志);Redis(按逻辑 db 分区 + RedBeat 锁);Kafka(事件源);外部 TankaServerAPI 后端(写操作与唤醒回调目标 /internal/assistant/post-data)。
startup_database 在 async lifespan 中调用同步 PostgreSQL init_session,若阻塞事件循环会拖慢启动。5 个 shard 注册各自 try/except,任一 shard 失败启动仍继续;beat 进程忽略 mongo 错误。
分层架构
自顶向下:View → Middleware → Controller → Agent 运行时,专门子系统与横切库挂在主干两侧。
自顶向下读法:请求从 View 层(web/src/view/,全部前缀 /api,聚合 tanka/relay/chat_analysis/apodex 子路由)进入;中间件层给每个请求挂上携带 trace_id 与用户身份的 Session ContextVar,写 ELK 日志,再交给 Controller。主干路径是 Tanka Agent 运行时(Flow):从 PostgreSQL 按 room_id 加载 langgraph checkpoint,构建 ContextSchema,跑 astream 循环;每个回合经过 Phase A-G 的工具调用生命周期。围绕核心运行时挂着几个专门 Controller:Skills & MCP、检索、业务产出生成器、Apodex(自成一体的 DDD 限界上下文簇)。横切库(web/src/lib)提供事件 outbox、Redis 池、会话事件投递。DAO 层结构性强制 orgId 多租户。Celery 侧的 worker/beat 处理后台与定时工作,Kafka 消费者把外部事件派发给 worker。
一次请求的生命周期
以最典型的 Tanka 流式对话 POST /api/tanka/work/stream_chat 为例,走通 Phase A-G。
ValueError。逐步走读(详见 docs/tool_call_lifecycle/overview.md,最重要的一份文档):
进入与日志
请求经web/src/middleware/register.py挂上Session(trace_id, user_id),按 endpoint 模式写 ELK;polling 路径(如/work/watcher/tasks/)被静默以减噪。状态加载
stream_chat_tanka()(agent.py:2749+)按thread_id = room_id从 PostgreSQL 加载 checkpoint,构建ContextSchema,启动agent.astream(stream_mode=['messages','custom','values'])。Phase A — 模型出招
进 LLM 前abefore_model跑压缩中间件(Claude 走 prompt caching;Gemini 走摘要)。模型在AIMessage中产出一个或多个tool_call。Phase B — 四阶段中间件(在 aafter_model 内执行)
TankaPlatform(B.1):守卫、校验、动作分类,写_pending_confirmations。
ActionResolve(B.2):跑字段解析器(解析 @mention、邮箱、联系人),产出card_data+processed_body+injected_params→_resolve_results。
Precheck(B.3):本地规则(如群最少成员数)+ 可选远程dry_run,不合规直接拒绝、不弹卡。
Permission(B.4):先匹配always_allow跨回合授权(命中则自动放行、出 SUCCESS 卡);否则raise GraphInterrupt(hitl_requests)。
关键陷阱:中间件append()顺序是执行顺序的逆序(langgraph 由外到内)。agent.py:1984-1989注释里把意图顺序 Platform→Resolve→Precheck→Permission 映射成逆序 append。Phase C-D — HITL
GraphInterrupt被外层 stream 循环捕获,PENDING 卡经 SSE 推给前端;用户点 approve/edit/reject/always_allow,前端回resume_values,Command(resume=...)从中断点恢复。一次中断对应一次恢复,决策数量不匹配会抛ValueError。Phase E-F — 执行
决策收敛后,langgraph 工具节点调tanka_api(method, path, body),合并_pending_injected,POST 到 TankaServerAPI,返回 XMLActionResult的ToolMessage。Phase G — 回到模型
ToolMessage追加进消息序列,模型据此继续(更多 tool_call、文本或结束)。收尾
循环结束保存 checkpoint,TurnSummaryMiddleware(栈中最后执行)写outline.md+ 原始 chunk 到/workspace/turn_history/;流以data: [DONE]结束,响应完成再写一次 ELK。
HEARTBEAT,只有心跳无数据时 120 分钟超时。TankaStreamAdapter(stream_adapter.py)把 super_agent_v2 的 StreamData 归一化成 Tanka SSE 协议(CONTENT / TASK_PLAN / OPTION / LINK_GUIDE)。
领域模型
本仓库承载两套可清晰分离的领域语言:通用的 Agent 运行时模型(描述任何对话「怎么跑」)与领域专属的 Apodex IMP 章程模型(描述某一个工具「做什么」)。理解代码库 = 把这两种语言分开,同时看清它们唯一的接缝。
6a · 运行时领域
核心是 Session——以 room_id 为键的对话/工作实例,room_id 同时就是 langgraph checkpointer 的 thread_id。一个 Session 展开为一串 Turn(一条用户消息 + Agent 直到停下的回应,由客户端 request_id 划界,是 turn_summary、压缩阈值、唤醒事件的边界)。回合内 Agent(tanka / relay / 统一 work_agent 变体)跑一个 langgraph StateGraph,行为由有序的 Middleware 栈塑形(再次强调:append 顺序是执行的逆序)。状态——权限授权、action_log、pending confirmation——以 CustomAgentState 形式 checkpoint 到 Postgres;瞬态信号(终止、限流)放 Redis。两个扩展面挂在 Agent 上:Skill(SKILL.md SOP 模板,挂载进 MongoBackend Workspace VFS,有 short_id 与发布后的 store_id)与 Tool(几乎全部经单一 tanka_api 路由汇聚,第三方经 MCP JSON-RPC)。异步性建模为 WaitTask:等事件(chat/email/arena/vote,经单实例 Kafka 消费者摄入)或等时间(TIME_POINT/TIME_CRON,经 RedBeat);触发达 COMPLETE 时把 LifecycleEvent 发到 Redis 事件 outbox(真相源),并尽力唤醒 Session,把 Resume Text(system-reminder XML)注入 prompt。Flow 会话(DERIVED 子房间)让父 Agent 扇出有界并行子任务。
6b · Apodex IMP 章程域
Apodex 是一套教科书式 DDD 设计,完整地活在一个工具背后。8 个限界上下文组织 23 个聚合,围绕两个核心:Impact Ledger(ImpRecord + EmployeeProfile)和 Contribution Attribution(Mission/Version/Idea)。IMP(Impact Point)是唯一的价值单位,每次计分都是一条仅追加(append-only)的 ImpRecord,带幂等键 sourceType:sourceId:action,且必须落在 8 个合法 type×sourceType 对之一(验证器强制正交性)。归因链是模型的数学主干:Version 在 kickoff 锁定 vImpBase,retro 时按层级乘数算出 vImpFinal,扣减 enablement,再按 Idea(α×β×B 权重)分配,最后用角色系数分到贡献者。Version 一旦 retroed,其所有计分字段冻结以供审计。Supporting 上下文(Base Task、Compliance/Incumbency、People Operations、Research Execution)和 Generic 上下文(Identity、Governance)消费 IMP 做闸门——晋升查 3 年滚动阈值,S 级事故冻结晋升,连续两季不达标触发 PIP。
ImpactLedgerService)强制的,不是 MongoDB schema 约束。直接调 DAO 的 update_one/delete_one 会绕过守卫——所有写入必须走服务层。修正通过反向调整记录(reverse adjustment)实现,绝不更新。
6c · 两个模型如何共存
二者占据同一 FastAPI + Celery 部署的不同层,只在一个接缝相遇:Agent 把 Apodex 当工具调用。运行时模型是通用执行底座,对 HR 语义一无所知;它的词汇(Session/Turn/Agent/Tool/Skill/Middleware/WaitTask/Flow/Checkpoint/LifecycleEvent)描述任何对话「怎么跑」。Apodex 模型是「某个工具做什么」。Apodex 以两种方式暴露:(1) apodex_action Agent 工具——ApodexActionMiddleware 把 {object, action, params} 派发给应用服务;(2) REST /api/apodex/*。两条路径中,运行时都从 Session 上下文注入 userId/orgId,派发器自动注入 xxxBy 审计字段。两模型不共享聚合边界:运行时的 Session/Agent 状态在 Postgres checkpoint 与 Redis;Apodex 聚合在自己的 Mongo collection,受服务层不变式约束(append-only、幂等、type×sourceType 正交、orgId 租户隔离)。唯一的概念重叠是 orgId——运行时注入,Apodex 的 BaseDAO 在每次查询/插入时防御性强制。把运行时当作宿主平台,Apodex 当作只能经服务层门面访问的客体子系统。
核心子系统逐一详解
十个子系统,每个含职责、关键模块、入口、依赖、边界(owns / doesNotOwn)与给新人的提醒。点击展开。
模块边界速查表
每个模块拥有什么、不拥有什么、与谁对话——划清职责边界,避免越权耦合。
| 模块 | owns(拥有) | doesNotOwn(不拥有) | talksTo(对话对象) |
|---|---|---|---|
View Layer web/src/view | HTTP 路由、Pydantic 解析、SSE/JSON 序列化、Bearer 解析(Apodex)、/api 聚合 | LLM 推理、业务逻辑、持久化、任务调度 | Controllers、中间件 Session |
Middleware & 横切 middleware lib | lifespan DB 初始化、Session+trace_id、ELK、http/anthropic patch、Redis 池、event_outbox、env/Apollo | endpoint 业务处理器、Agent 推理、DAO 查询 | 全进程;PG/Mongo/Milvus/ES/Redis 初始化;TankaServerAPI 唤醒 |
Tanka Agent Runtime controller/tanka/flow | 回合生命周期、langgraph StateGraph、Phase A-G 栈、HITL/权限中断、flow 会话、turn summary、压缩、StreamAdapter | super_agent_v2 内部、网关路由、原始消息摄入、Celery 任务体 | super_agent_v2、PG checkpointer、Skills、Search、TankaServerAPI、Redis |
Skills & MCP skill_discovery + tanka_platform | 技能发现/生命周期/市场、RouteRegistry + tanka_api、Confirmation/ParamSpec、字段解析、MCP 客户端、技能 SSE | LLM 模型节点、HITL 中断机制、MongoBackend 存储、OAuth 绑定 | MongoBackend、PG skill_metadata、Mongo 技能集合、第三方 MCP、TenantRegistry |
Search/Retrieval/Tenant search_tools + tenant | 5 节点检索图、tanka+web 检索、语义重排、租户识别链、TenantRegistry YAML | memory_cellular 内部、Serper、topic 合成、Celery 调度 | memory_cellular(ES/Milvus)、agentic_client、Serper、Gemini、output_chat |
Business Output insight/output_chat + flow/pitch_deck | 制品编排、类型图、v4 SubAgent 生成器、WriteCraft、Plan/RelevantContent 持久化、StreamWriter | 核心 WorkAgent、super_agent_v2、检索内部、checkpointing | super_agent_v2、MongoDB、S3、Browserless、Gemini/Claude |
Apodex controller/apodex | 23 聚合、append-only IMP 账本、ImpactLedgerService、17 应用服务、ActionMiddleware、orgId 隔离、幂等 | IM.employee/department 源实体(只读 FK)、Celery 编排、auth 校验、通知投递 | tanka_ai_toolkit.db.mongodb(AI Memory)、employee_dao、Agent(apodex_action)、REST |
Relay + WaitTask controller/relay + celery_app/wait_task | Relay 流式、WaitTask 创建/仓储/调度、Kafka 消费派发、触发评估、唤醒、outbox 发布 | WatchExecutor 外部服务、用户上下文解析、langgraph checkpointer、TankaServerAPI API | Kafka、Redis db7/db0、MongoDB、RedBeat、TankaServerAPI 唤醒、super_agent_v2 |
Celery + Operations celery_app + admin + tests | Celery/Beat/RedBeat 配置、双队列 worker、核心任务、admin 面板/CLI、pytest | web_api 路由、LLM eval 内部、业务 Agent、消息存储 | Redis broker、MongoDB、ES(work_request)、Kafka 消费者、Langfuse |
Persistence/DAO apodex/daos + tanka_ai_toolkit | BaseDAO orgId 强制、类型化 CRUD、ImpRecordDAO append-only、各 Repository、连接助手 | 服务层不变式、业务编排、Agent 运行时 | PG、MongoDB×5、Milvus、ES、Redis |
数据与存储
七类存储各司其职:关系型放历史与 checkpoint,文档型分 5 shard,向量、全文、缓存、事件源各就各位。
| 存储 | 承载内容 |
|---|---|
| PostgreSQL(读写分离) | 聊天历史(chat_history,SQLModel ORM);langgraph checkpoint(AsyncPostgresSaver,键 thread_id=room_id,存 messages/grants/action_log/pending interrupts);PG 加密字段用 pg.encryption_key 口令;PostgreSQL skill_metadata 镜像 |
| MongoDB(5 shard) | Habitat:用户/房间/域数据;AI Memory(default):AI 记忆/brain、flow_sessions、apodex 全部集合(imp_records 等 26 个 collection)、Plan/OutputData/preferences、技能集合;Workbench;ThirdParty;TankaLink |
| Milvus | 向量 embedding(语义检索;vote/calendar searcher 的 a_retrieve_docs) |
| Elasticsearch | 全文索引 + memory_cellular(BM25/关键词)后端;ELK 日志(trace_id 关联);work_request_* 监控索引 |
| Redis(多 db) | db3 = outbox/flow_session;db2 = watcher;db7 = wait_task(wait:task:*/wait:session:*/wait:events:*,7d/24h TTL);db4 = business;celery broker + result backend;RedBeat schedule(redbeat:{task})+ redbeat:lock;终止信号(300s TTL) |
| Kafka | im_message / vote / arena 等 topic,事件源 → 单实例消费者 |
| S3 | 制品图片(路演 slide、原型线框) |
| SQLite | chat-analysis 工具上传的 LangChain 对话 JSON(仅该调试工具) |
横切关注点
贯穿全进程的基础设施约定:配置、日志、可观测、事件、幂等、多租户、流式、monkey patch、分布式调度。
通用语言术语表
团队的 ubiquitous language。读懂这些词,就读懂了代码里的命名与边界。
新人上手路线图
从「跑起来」到「会改东西」,分阶段推进。文档约定见 docs/MODULE_LAYOUT.md:每个模块有 README + design + specs 三件套。
- 读
docs/MODULE_LAYOUT.md理解文档约定;读supervisord.local.conf看本地三进程定义。 - 配置:确认
web/config/test_config.yaml存在,config_env默认 test;celery 侧确认CELERY_CONFIG与celery_config_test.py。 - 启动顺序:web_api(3080)→ celery_beat → celery_worker →(如需事件)kafka_consumer。
- 冒烟:
curl localhost:3080/api/health应返回 200;按onboardingNotes的 Debugging Checklist 验 Redis/Mongo 连通性。
super_agent/ super_agent_v2/ 软链接在本机失效(指向 /Users/admin/...)。本地无法读其实现,相关行为请在部署环境验证。web/main.py— 应用工厂与引导链(DEBUG → set_env → Apollo → lifespan → 路由 → patch)。web/src/middleware/__init__.py+register.py— 生命周期 DB 初始化、Session/trace_id、ELK。web/src/view/__init__.py→web/src/view/tanka/flow.py— 路由聚合与主流式入口。docs/tool_call_lifecycle/overview.md(最重要) — Phase A-G 全流程。agent.py:先看stream_chat_tanka()(2749+),再看create_synergy_ai_agent()(1956-2303),重点读1984-1989的中间件逆序注释。docs/permission/README.md(always-allow 三级模型)+docs/turn_summary_middleware/01_overview_and_motivation.md。
技能 / 工具
docs/skill-api.md(短/长 ID 二元性、SSE 契约)→ docs/skill-discovery-integration-guide.md → route_registry.py → skill_system_middleware.py。
检索
search_tools/graph.py(图结构)→ schema.py → nodes/query_rewrite_and_route.py → nodes/evaluate_results.py → tenant/identification.py。
业务产出
insight/output_chat/flow.py(800+ 行,按 TopicTemplateCode 路由)→ 对应 memory_result/{type}/graph.py → nodes/ → prompts/。
Apodex
docs/ddd/strategic-design.md → services/impact_ledger_service.py → models/imp_record.py + models/version.py → middleware/apodex_action/middleware.py。Apodex 的三层心智模型(DB 存事实 / Skill 存规则 / Agent 经服务执行)务必先建立。
异步 / 调度
docs/watcher_api_integration.md → docs/wait_task_e2e_test_plan.md → relay/wait_task/service.py → kafka_consumer/dispatcher.py → wait_task/trigger.py → wait_task/tasks/common.py。
| 现象 | 排查起点 |
|---|---|
| 某条用户消息没正确进 Agent | flow.py 的 organize_messages(附件/memo/room 处理)→ messages_to_send |
| 某个 tool_call 行为异常 | Agent 循环 → aafter_model 四中间件(B.1-4)→ state 变更;检查 append 逆序 |
| HITL 卡不弹 / 恢复失败 | permission_middleware.py → GraphInterrupt → 卡流 → 前端 resume → Phase E/F;注意决策数量必须匹配 |
| 上下文超长 / 漏压缩 | abefore_model 压缩中间件;确认 Claude/Gemini 选对了 Compression 变体 |
| 检索结果差 | search_tools/prompts/(query_rewrite/strategy/evaluate)+ SearchDebugger 步骤 |
| IMP 计分重复/错误 | 一律走 ImpactLedgerService,检查 idempotencyKey 格式与 type×sourceType;勿直调 DAO |
| 后台任务不唤醒 | Redis db7 wait:events:{sid};确认 Kafka 消费者单实例;Agent 是否轮询 /work/watcher/events |
| 内存随请求增长(OOM) | web/src/lib/redis_pool.py 的 contextvar 捕获问题;确认池在启动期干净上下文预热 |
| 跨租户数据串了 | 检查相关 BaseDAO 子类的 orgId 过滤;这是结构性多租户边界 |
| 分布式日志关联 | 用 trace_id 串 web/worker/beat 的 ELK 记录(docs/how_to_read_elk_log.md) |
- 加路由:建 Pydantic 模型 →
@router.post/get→ 调 controller service → 返回JSONResponse/StreamingResponse;流式用stream_chat_with_heartbeat包装。 - 加 tanka_api 路由:在
route_registry.py注册(Path/ParamSpec/ConfirmationSpec/handler)→ 写 handler → 加测试;OPTIONS 自动生成文档。 - 加技能:在
/work_agent/skills/<name>/SKILL.md+ 辅助文档创建,SkillFileScanner自动发现,可见性在skill_discovery的 yaml 控制。 - 加制品类型:① 新
TopicTemplateCode枚举值 ② 新memory_result/{type}/graph.py + schema.py + nodes/ + prompts/③flow.py路由加 elif 分支(无插件发现机制)。 - 加 Apodex 操作:服务层 + DAO 层成对出现,写入绝不绕过服务;ImpRecord 写入必带正确格式幂等键;retro 操作原子化(批量 insert 或全失败)。
- 测试:pytest markers(
@pytest.mark.mechanism慢 broker 测试、@pytest.mark.e2e、@pytest.mark.apodex/policy/action_card);Apodex 测试自动用 mongomock(确认 conftestsetdefault('APODEX_USE_REAL_MONGO','0'))。