a025 · Tanka Memory & Output System

Tanka 内存与输出系统

面向新同事的技术架构导览

Tanka 是一个 AI Agent 平台:管理用户记忆与聊天历史,用对话式 Agent 驱动业务产出(路演 PPT、网站、商业计划、原型、PRD 等),维护一套面向研究员的影响力积分账本(Apodex),并把长耗时工作异步化(事件 / 时间触发的后台任务)。整套系统是一个 FastAPI + Celery 单体部署,由 supervisord 管理三个常驻进程加一个独立守护进程,共享同一套配置与数据层。它的核心抽象是一个通用 Agent 运行时(基于 langgraph 的回合制状态机 + 工具调用生命周期 + HITL 人审),以及挂在其上的若干专门子系统。

进程 · web_api / celery_beat / celery_worker / kafka_consumer 子系统 · 10 架构图 · 5 源文档 · docs/ARCHITECTURE.md
阅读提示 super_agent/super_agent_v2/ 是外部框架的软链接,在本机指向 /Users/admin/wujunjie/project/super_agent/...已失效。所有对该框架的描述均由 import 推断(标注为「推断」)。请在部署环境中验证其内部实现。
§ 00

30 秒速览

十个子系统、一句话职责、代码入口与所属运行进程。先把全局装进脑子,再往下读。

子系统一句话职责代码入口运行进程
部署与引导应用工厂、配置、生命周期、进程编排web/main.py / celery_app/celery_server.pyweb_api / celery_beat / celery_worker
HTTP/API 视图层路由聚合、请求解析、SSE/JSON 序列化、鉴权解析web/src/view/__init__.pyweb_api
中间件与横切库Session/trace_id、生命周期 DB 初始化、ELK 日志、Redis 池、事件 outboxweb/src/middleware/ / web/src/lib/全进程
Tanka Agent 运行时(Flow)回合生命周期、langgraph 状态机、Phase A-G 工具调用、HITL 中断、流式输出web/src/controller/tanka/flow/agent.pyweb_api(+ worker 后台任务)
技能与 MCP技能发现/生命周期/市场、tanka_api 统一工具、RouteRegistry、第三方 MCP.../skill_discovery/ + .../tanka_platform/web_api
检索 / 租户5 节点 LangGraph 检索流水线、语义重排、租户识别链.../search_tools/graph.py + .../tenant/web_api / worker
业务产出生成器路演/网站/商业计划/原型/PRD 编排,Plan 持久化.../insight/output_chat + .../flow/pitch_deckweb_api / worker
Apodex 研究员章程系统仅追加 IMP 影响力积分账本、23 个 DDD 聚合、人事运营.../apodex/services/__init__.pyweb_api / worker
Relay + WaitTaskRelay Agent 流式、事件/时间触发后台任务、Kafka 消费、事件 outbox.../relay/wait_task/ + celery_app/wait_task/worker / beat / kafka_consumer
Celery 与运维Celery/Beat/RedBeat 配置、双队列 worker、任务编目、监控面板、pytestcelery_app/ + admin/ + tests/beat / worker / web_api
持久化 / DAOBaseDAO 多租户 orgId 强制、类型化 CRUD、各类 Repository、连接助手.../apodex/daos/base_dao.py + tanka_ai_toolkitweb_api / worker
§ 01

运行时拓扑

三个常驻进程 + 一个独立守护进程,由 supervisord 按启动优先级编排,共享同一套数据基础设施。

图 01 · 进程与基础设施拓扑
graph TD subgraph procs["Supervisord-managed processes"] WEB["web_api<br/>uvicorn FastAPI :80/:3080<br/>priority=100"] BEAT["celery_beat<br/>RedBeat scheduler<br/>priority=200"] WORKER["celery_worker<br/>threads concurrency=16<br/>queues: celery + wait_task<br/>priority=300"] KAFKACON["kafka_consumer<br/>single-instance daemon"] end MONGO[("MongoDB<br/>5 shards: Habitat / AI Memory / Workbench / ThirdParty / TankaLink")] PG[("PostgreSQL<br/>rw + read replica<br/>chat_history + langgraph checkpoints")] REDIS[("Redis (rediss://)<br/>db3 outbox/flow_session, db2 watcher,<br/>db7 wait_task, db4 business, RedBeat lock")] MILVUS[("Milvus<br/>vector embeddings")] ES[("Elasticsearch<br/>full-text + memory_cellular + ELK")] KAFKA[("Kafka<br/>im_message / vote / arena topics")] TankaServerAPI["TankaServerAPI<br/>/internal/assistant/post-data"] WEB --> MONGO WEB --> PG WEB --> REDIS WEB --> MILVUS WEB --> ES WEB --> KAFKA WEB --> TankaServerAPI BEAT --> REDIS BEAT --> MONGO WORKER --> REDIS WORKER --> MONGO WORKER --> PG WORKER --> ES WORKER --> TankaServerAPI KAFKACON --> KAFKA KAFKACON --> REDIS KAFKACON -->|dispatch eval tasks| WORKER WEB -.broker enqueue.-> REDIS REDIS -.broker dequeue.-> WORKER BEAT -.beat schedule.-> REDIS
三进程 + 一守护进程,启动优先级递增;箭头为运行时依赖,虚线为 Celery broker 链路。

三进程 + 一守护进程(由 supervisord.conf 生产 / supervisord.local.conf 本地定义,启动优先级递增):

  1. web_api(priority=100,先启动)

    uvicorn FastAPI,生产端口 80、本地 3080,承载所有同步 HTTP / SSE 接口。引导链:web/main.py 导入时 os.environ["DEBUG"]="1"set_env() 写入 API key → Apollo EnvProxy 加载 web/config/{config_env}_config.yaml(默认 test)→ FastAPI(lifespan=...) 启动期初始化数据库 + 预热 Redis 池 → 注册路由与 monkey patch。
  2. celery_beat(priority=200)

    RedBeat 调度器,把 schedule 存到 Redis,用 redbeat:lock 分布式锁做多实例故障切换(锁超时调优为 30s/150s)。驱动每小时 flow card 定时任务与 WaitTask 的 TIME_POINT/TIME_CRON。
  3. celery_worker(priority=300)

    线程池 concurrency=16,订阅双队列 celery(核心任务)+ wait_task(异步任务),prefetch=1 防止任务囤积。
  4. kafka_consumer(独立守护)

    必须单实例——其 Dispatcher 在内存里做防抖(debounce),不可分布式。订阅 im_message / vote / arena 等 topic,匹配 watch_target 后派发评估任务给 worker。

基础设施:PostgreSQL(读写分离,存聊天历史 + langgraph checkpoint);5 个独立 MongoDB shard(Habitat / AI Memory / Workbench / ThirdParty / TankaLink);Milvus(向量);Elasticsearch(全文 + memory_cellular + ELK 日志);Redis(按逻辑 db 分区 + RedBeat 锁);Kafka(事件源);外部 TankaServerAPI 后端(写操作与唤醒回调目标 /internal/assistant/post-data)。

Gotcha startup_database 在 async lifespan 中调用同步 PostgreSQL init_session,若阻塞事件循环会拖慢启动。5 个 shard 注册各自 try/except,任一 shard 失败启动仍继续;beat 进程忽略 mongo 错误。
§ 02

分层架构

自顶向下:View → Middleware → Controller → Agent 运行时,专门子系统与横切库挂在主干两侧。

图 02 · 进程内分层与依赖
graph TD subgraph web_api["web_api (uvicorn FastAPI)"] V["View Layer<br/>web/src/view/*"] MW["HTTP/Lifespan Middleware<br/>web/src/middleware/*"] CTRL["Controllers<br/>controller/tanka|relay|apodex"] AGENT["Agent Runtime (Flow)<br/>controller/tanka/flow/agent.py"] SEARCH["Search/Retrieval<br/>controller/tanka/search_tools"] OUTPUT["Business Output Generators<br/>insight/output_chat + flow/pitch_deck"] SKILL["Skills & MCP<br/>flow/middleware/skill_discovery + tanka_platform"] APODEX["Apodex DDD Services<br/>controller/apodex/services"] LIB["Cross-cutting Libs<br/>web/src/lib (event_outbox, redis_pool, session_event_delivery)"] DAO["DAO Layer<br/>apodex/daos + tanka_ai_toolkit DAOs"] end subgraph celery_worker["celery_worker (threads, dual-queue)"] WTASK["WaitTask Tasks<br/>celery_app/wait_task/tasks"] CTASKS["Core Tasks<br/>celery_app/tasks (api, zoom, flow_card)"] KAFKA["Kafka Consumer + Dispatcher<br/>celery_app/wait_task/kafka_consumer"] end subgraph celery_beat["celery_beat (RedBeat)"] BEAT["Beat Scheduler<br/>celery_app/celery_config + wait_task/scheduler"] end subgraph infra["Shared Infrastructure"] PG[("PostgreSQL")] MONGO[("MongoDB x5 shards")] MILVUS[("Milvus")] ES[("Elasticsearch")] REDIS[("Redis (multi-db)")] KQ[("Kafka")] LLM["LLM Providers<br/>Claude / Gemini / OpenRouter"] TankaServerAPI["TankaServerAPI"] end V --> MW MW --> CTRL CTRL --> AGENT CTRL --> OUTPUT CTRL --> APODEX AGENT --> SKILL AGENT --> SEARCH AGENT --> LLM OUTPUT --> SEARCH OUTPUT --> LLM SEARCH --> MILVUS SEARCH --> ES SEARCH --> LLM SKILL --> DAO APODEX --> DAO CTRL --> LIB AGENT --> LIB DAO --> MONGO DAO --> PG LIB --> REDIS AGENT --> PG AGENT --> MONGO AGENT --> REDIS LIB --> TankaServerAPI WTASK --> LLM WTASK --> REDIS WTASK --> MONGO WTASK --> LIB CTASKS --> LLM CTASKS --> ES KAFKA --> KQ KAFKA --> WTASK BEAT --> REDIS BEAT --> WTASK CTRL --> KQ
主干路径 View → Middleware → Controller → Agent;Skills、Search、Output、Apodex 为挂载式子系统。

自顶向下读法:请求从 View 层web/src/view/,全部前缀 /api,聚合 tanka/relay/chat_analysis/apodex 子路由)进入;中间件层给每个请求挂上携带 trace_id 与用户身份的 Session ContextVar,写 ELK 日志,再交给 Controller。主干路径是 Tanka Agent 运行时(Flow):从 PostgreSQL 按 room_id 加载 langgraph checkpoint,构建 ContextSchema,跑 astream 循环;每个回合经过 Phase A-G 的工具调用生命周期。围绕核心运行时挂着几个专门 Controller:Skills & MCP检索业务产出生成器Apodex(自成一体的 DDD 限界上下文簇)。横切库web/src/lib)提供事件 outbox、Redis 池、会话事件投递。DAO 层结构性强制 orgId 多租户。Celery 侧的 worker/beat 处理后台与定时工作,Kafka 消费者把外部事件派发给 worker。

§ 03

一次请求的生命周期

以最典型的 Tanka 流式对话 POST /api/tanka/work/stream_chat 为例,走通 Phase A-G。

图 03 · stream_chat 时序(Phase A-G)
sequenceDiagram participant Client participant View as View Layer (flow.py) participant MW as Register Middleware (Session/trace_id) participant Ctrl as stream_chat_tanka() participant Graph as langgraph astream loop participant B as Phase B Middlewares (Platform/Resolve/Precheck/Permission) participant LLM as LLM (Claude/Gemini) participant Tool as tanka_api tool participant Adapter as TankaStreamAdapter (SSE) Client->>View: POST /api/tanka/work/stream_chat (OutputChatRequest) View->>MW: request enters MW->>MW: attach Session(trace_id, user_id), log to ELK MW->>Ctrl: dispatch handler Ctrl->>Ctrl: load checkpoint (Postgres, thread_id=room_id), build ContextSchema Ctrl->>Graph: agent.astream(stream_mode=[messages,custom,values]) loop Agent turn (Phase A-G) Graph->>LLM: invoke model (abefore_model: compression) LLM-->>Graph: AIMessage with tool_call(s) Graph->>B: aafter_model (reverse-append order) B->>B: Platform classify, ActionResolve card_data, Precheck dry_run alt Permission requires HITL B-->>Graph: raise GraphInterrupt(hitl_requests) Graph->>Adapter: stream PENDING card Adapter-->>Client: data: {PENDING card} SSE Client->>Ctrl: resume_values (approve/edit/reject) Ctrl->>Graph: Command(resume=resume_values) else always_allow grant hit B->>B: auto-approve, write _pending_injected end Graph->>Tool: tanka_api(method, path, body) Tool->>Tool: merge injected params, POST to TankaServerAPI Tool-->>Graph: ToolMessage (XML ActionResult) Graph->>Adapter: StreamData (CONTENT / TASK_PLAN nodes) Adapter-->>Client: data: {event chunk} + HEARTBEAT (30s) SSE end Graph->>Ctrl: final state Ctrl->>Ctrl: save checkpoint, TurnSummary outline.md Adapter-->>Client: data: [DONE] Ctrl->>MW: response complete, log to ELK
一次中断对应一次恢复;决策数量不匹配会抛 ValueError

逐步走读(详见 docs/tool_call_lifecycle/overview.md最重要的一份文档):

  1. 进入与日志

    请求经 web/src/middleware/register.py 挂上 Session(trace_id, user_id),按 endpoint 模式写 ELK;polling 路径(如 /work/watcher/tasks/)被静默以减噪。
  2. 状态加载

    stream_chat_tanka()agent.py:2749+)按 thread_id = room_id 从 PostgreSQL 加载 checkpoint,构建 ContextSchema,启动 agent.astream(stream_mode=['messages','custom','values'])
  3. Phase A — 模型出招

    进 LLM 前 abefore_model 跑压缩中间件(Claude 走 prompt caching;Gemini 走摘要)。模型在 AIMessage 中产出一个或多个 tool_call
  4. Phase B — 四阶段中间件(在 aafter_model 内执行)

    TankaPlatform(B.1):守卫、校验、动作分类,写 _pending_confirmations
    ActionResolve(B.2):跑字段解析器(解析 @mention、邮箱、联系人),产出 card_data + processed_body + injected_params_resolve_results
    Precheck(B.3):本地规则(如群最少成员数)+ 可选远程 dry_run,不合规直接拒绝、不弹卡。
    Permission(B.4):先匹配 always_allow 跨回合授权(命中则自动放行、出 SUCCESS 卡);否则 raise GraphInterrupt(hitl_requests)
    关键陷阱:中间件 append() 顺序是执行顺序的逆序(langgraph 由外到内)。agent.py:1984-1989 注释里把意图顺序 Platform→Resolve→Precheck→Permission 映射成逆序 append。
  5. Phase C-D — HITL

    GraphInterrupt 被外层 stream 循环捕获,PENDING 卡经 SSE 推给前端;用户点 approve/edit/reject/always_allow,前端回 resume_valuesCommand(resume=...) 从中断点恢复。一次中断对应一次恢复,决策数量不匹配会抛 ValueError
  6. Phase E-F — 执行

    决策收敛后,langgraph 工具节点调 tanka_api(method, path, body),合并 _pending_injected,POST 到 TankaServerAPI,返回 XML ActionResultToolMessage
  7. Phase G — 回到模型

    ToolMessage 追加进消息序列,模型据此继续(更多 tool_call、文本或结束)。
  8. 收尾

    循环结束保存 checkpoint,TurnSummaryMiddleware(栈中最后执行)写 outline.md + 原始 chunk 到 /workspace/turn_history/;流以 data: [DONE] 结束,响应完成再写一次 ELK。
心跳与超时 全程每 30s 发一个 HEARTBEAT,只有心跳无数据时 120 分钟超时。TankaStreamAdapterstream_adapter.py)把 super_agent_v2 的 StreamData 归一化成 Tanka SSE 协议(CONTENT / TASK_PLAN / OPTION / LINK_GUIDE)。
§ 04

领域模型

本仓库承载两套可清晰分离的领域语言:通用的 Agent 运行时模型(描述任何对话「怎么跑」)与领域专属的 Apodex IMP 章程模型(描述某一个工具「做什么」)。理解代码库 = 把这两种语言分开,同时看清它们唯一的接缝。

6a · 运行时领域

图 04a · 运行时领域类图
classDiagram direction LR class Session { +str room_id +str session_id +str org_id +str user_id +str trace_id +str parent_room_id +str status } class Turn { +str request_id +HumanMessage user_message +list~AIMessage~ responses } class Agent { +str kind "tanka|relay|work" +str scenario +CustomAgentState state +ContextSchema context } class CustomAgentState { +list grants "always_allow" +list action_log +dict _pending_confirmations +dict _resolve_results +dict _pending_injected } class Checkpoint { +str thread_id "= room_id" +bytes state "Postgres-backed" } class Message { +str role +str content } class ToolCall { +str tc_id +str tool_name +dict args } class Tool { +str name "tanka_api|manage_skills|search" +ParamSpec params +ConfirmationSpec confirmation } class Skill { +str short_id +str store_id +str skill_name "SKILL.md" +str status +int version } class Middleware { +str phase "B.1-B.4 / abefore_model / aafter_model" +int append_order } class HITLRequest { +dict card_data +list always_allow_options +str decision_type } class Flow { +str dispatch_id +str role "ROOT|DERIVED" +str status } class WaitTask { +str task_id +TriggerType trigger +WaitTaskStatus status +list watch_targets +int ttl } class WatchTarget { +str target_id "room|email|arena|vote" +str event_type } class LifecycleEvent { +str event_id +str event_type +str delivery_mode +str wake_policy +str resume_text } class Workspace { +str mount "/workspace" +str backend "MongoBackend(VFS)" } Session "1" o-- "many" Turn : contains Session "1" --> "1" Checkpoint : persisted as Session "1" --> "1" Agent : runs Session "1" o-- "many" WaitTask : schedules Session "1" --> "1" Workspace : owns VFS Session "1" o-- "many" Flow : ROOT dispatches DERIVED Turn "1" o-- "many" Message : produces Turn "1" o-- "many" ToolCall : emits Agent "1" --> "1" CustomAgentState : holds Agent "1" o-- "many" Middleware : assembles stack Agent "1" o-- "many" Tool : registers Agent "1" o-- "many" Skill : discovers/mounts ToolCall "1" --> "1" Tool : invokes ToolCall "1" ..> "0..1" HITLRequest : may interrupt Middleware "1" ..> "many" ToolCall : intercepts (Phase A-G) WaitTask "1" o-- "many" WatchTarget : monitors WaitTask "1" ..> "1" LifecycleEvent : fires on trigger LifecycleEvent "1" --> "1" Session : wakes via resume_text Flow "1" --> "1" Workspace : isolated .flow VFS
核心是 Session(以 room_id 为键 = checkpointer thread_id),展开为一串 Turn。

核心是 Session——以 room_id 为键的对话/工作实例,room_id 同时就是 langgraph checkpointer 的 thread_id。一个 Session 展开为一串 Turn(一条用户消息 + Agent 直到停下的回应,由客户端 request_id 划界,是 turn_summary、压缩阈值、唤醒事件的边界)。回合内 Agent(tanka / relay / 统一 work_agent 变体)跑一个 langgraph StateGraph,行为由有序的 Middleware 栈塑形(再次强调:append 顺序是执行的逆序)。状态——权限授权、action_log、pending confirmation——以 CustomAgentState 形式 checkpoint 到 Postgres;瞬态信号(终止、限流)放 Redis。两个扩展面挂在 Agent 上:SkillSKILL.md SOP 模板,挂载进 MongoBackend Workspace VFS,有 short_id 与发布后的 store_id)与 Tool(几乎全部经单一 tanka_api 路由汇聚,第三方经 MCP JSON-RPC)。异步性建模为 WaitTask:等事件(chat/email/arena/vote,经单实例 Kafka 消费者摄入)或等时间(TIME_POINT/TIME_CRON,经 RedBeat);触发达 COMPLETE 时把 LifecycleEvent 发到 Redis 事件 outbox(真相源),并尽力唤醒 Session,把 Resume Text(system-reminder XML)注入 prompt。Flow 会话(DERIVED 子房间)让父 Agent 扇出有界并行子任务。

6b · Apodex IMP 章程域

图 04b · Apodex 限界上下文与聚合
graph TB subgraph IMP["Impact Ledger Context (Core)"] ImpRecord["ImpRecord (AR)<br/>append-only<br/>idempotencyKey<br/>type x sourceType"] EmployeeProfile["EmployeeProfile (AR)<br/>level L1-L10 / band A-C<br/>isChair / companyRole<br/>homeDeptIds / BG"] end subgraph ATTR["Contribution Attribution Context (Core)"] Mission["Mission (AR)"] Version["Version (AR)<br/>vImpBase / vImpMultiplier<br/>vImpFinal / roleCoefficients<br/>coreOwnerId"] Idea["Idea (AR)<br/>alpha / beta / B<br/>contributors / ideaImp"] end subgraph BASE["Base Task Mgmt Context (Supporting)"] BaseImpTask["BaseImpTask (AR)<br/>assigned->submitted->paid"] end subgraph COMP["Compliance & Incumbency Context (Supporting)"] IncumbencyRecord["IncumbencyRecord (AR)"] QuarterlyCheckIn["QuarterlyCheckIn (AR)"] Mentorship["Mentorship (AR)"] end subgraph PEOPLE["People Operations Context (Supporting)"] Promotion["Promotion (AR)<br/>decisionSnapshot<br/>3yr rolling IMP gate"] BandAdjustment["BandAdjustment (AR)"] Incident["Incident (AR)<br/>S/A/B level"] PipCase["PipCase (AR)<br/>open/recovered/exit"] OwnerRestriction["OwnerRestriction (AR)"] end subgraph RES["Research Execution Context (Supporting)"] DataReview["DataReview (AR)<br/>hard-power gate"] BenchmarkResult["BenchmarkResult (AR)"] BenchmarkReview["BenchmarkReview (AR)<br/>1/org/year"] InfraMilestone["InfraMilestone (AR)<br/>independent Bonus IMP"] end subgraph ID["Identity Context (Generic)"] DepartmentProfile["DepartmentProfile (AR)<br/>leadId / shadowLeadId<br/>applicationTrack / independent"] end subgraph GOV["Governance Context (Generic)"] TaskForce["TaskForce (AR)"] CharterVersion["CharterVersion (AR)<br/>draft/active/inactive<br/>dual-signed"] CharterAddendum["CharterAddendum (AR)<br/>Lead->Admin workflow"] end Reminder["Reminder (AR)<br/>cross-context"] IM_employee[("IM.employee<br/>(external FK)")] IM_department[("IM.department<br/>(external FK)")] BaseImpTask -->|"BaseTaskApproved -> record_base_task_imp"| ImpRecord Version -->|"VersionRetroed -> batch records"| ImpRecord Idea -->|"ideaImp -> bonus,idea record"| ImpRecord Incident -->|"IncidentDeducted -> deduction"| ImpRecord InfraMilestone -->|"bonus,milestone"| ImpRecord Mentorship -->|"base,mentor / bonus,mentor-milestone"| ImpRecord Mission -->|owns| Version Version -->|"V_IMP_for_ideas split"| Idea Idea -->|contributors| EmployeeProfile Promotion -->|"reads 3yr rolling"| ImpRecord Promotion -->|"writes level+1"| EmployeeProfile BandAdjustment -->|updates band| EmployeeProfile Incident -.->|"S unresolved freezes"| Promotion QuarterlyCheckIn -->|"2 fails -> trigger"| PipCase IncumbencyRecord --> QuarterlyCheckIn DataReview -->|"reads independent + leadId"| DepartmentProfile BenchmarkReview --> BenchmarkResult EmployeeProfile -.->|extends, FK| IM_employee DepartmentProfile -.->|extends, FK| IM_department EmployeeProfile -->|homeDeptIds| DepartmentProfile CharterVersion --> CharterAddendum TaskForce -.-> Mission style ImpRecord fill:#ffe0b2 style EmployeeProfile fill:#ffe0b2 style Version fill:#c8e6c9 style Idea fill:#c8e6c9 style Mission fill:#c8e6c9
8 个限界上下文组织 23 个聚合,围绕两个核心:Impact Ledger 与 Contribution Attribution。

Apodex 是一套教科书式 DDD 设计,完整地活在一个工具背后。8 个限界上下文组织 23 个聚合,围绕两个核心:Impact LedgerImpRecord + EmployeeProfile)和 Contribution Attribution(Mission/Version/Idea)。IMP(Impact Point)是唯一的价值单位,每次计分都是一条仅追加(append-only)ImpRecord,带幂等键 sourceType:sourceId:action,且必须落在 8 个合法 type×sourceType 对之一(验证器强制正交性)。归因链是模型的数学主干:Version 在 kickoff 锁定 vImpBase,retro 时按层级乘数算出 vImpFinal,扣减 enablement,再按 Idea(α×β×B 权重)分配,最后用角色系数分到贡献者。Version 一旦 retroed,其所有计分字段冻结以供审计。Supporting 上下文(Base Task、Compliance/Incumbency、People Operations、Research Execution)和 Generic 上下文(Identity、Governance)消费 IMP 做闸门——晋升查 3 年滚动阈值,S 级事故冻结晋升,连续两季不达标触发 PIP。

关键陷阱 append-only 是服务层ImpactLedgerService)强制的,不是 MongoDB schema 约束。直接调 DAO 的 update_one/delete_one 会绕过守卫——所有写入必须走服务层。修正通过反向调整记录(reverse adjustment)实现,绝不更新。

6c · 两个模型如何共存

二者占据同一 FastAPI + Celery 部署的不同层,只在一个接缝相遇:Agent 把 Apodex 当工具调用。运行时模型是通用执行底座,对 HR 语义一无所知;它的词汇(Session/Turn/Agent/Tool/Skill/Middleware/WaitTask/Flow/Checkpoint/LifecycleEvent)描述任何对话「怎么跑」。Apodex 模型是「某个工具做什么」。Apodex 以两种方式暴露:(1) apodex_action Agent 工具——ApodexActionMiddleware{object, action, params} 派发给应用服务;(2) REST /api/apodex/*。两条路径中,运行时都从 Session 上下文注入 userId/orgId,派发器自动注入 xxxBy 审计字段。两模型不共享聚合边界:运行时的 Session/Agent 状态在 Postgres checkpoint 与 Redis;Apodex 聚合在自己的 Mongo collection,受服务层不变式约束(append-only、幂等、type×sourceType 正交、orgId 租户隔离)。唯一的概念重叠是 orgId——运行时注入,Apodex 的 BaseDAO 在每次查询/插入时防御性强制。把运行时当作宿主平台,Apodex 当作只能经服务层门面访问的客体子系统

§ 05

核心子系统逐一详解

十个子系统,每个含职责、关键模块、入口、依赖、边界(owns / doesNotOwn)与给新人的提醒。点击展开。

§ 06

模块边界速查表

每个模块拥有什么、不拥有什么、与谁对话——划清职责边界,避免越权耦合。

模块owns(拥有)doesNotOwn(不拥有)talksTo(对话对象)
View Layer web/src/viewHTTP 路由、Pydantic 解析、SSE/JSON 序列化、Bearer 解析(Apodex)、/api 聚合LLM 推理、业务逻辑、持久化、任务调度Controllers、中间件 Session
Middleware & 横切 middleware liblifespan DB 初始化、Session+trace_id、ELK、http/anthropic patch、Redis 池、event_outbox、env/Apolloendpoint 业务处理器、Agent 推理、DAO 查询全进程;PG/Mongo/Milvus/ES/Redis 初始化;TankaServerAPI 唤醒
Tanka Agent Runtime controller/tanka/flow回合生命周期、langgraph StateGraph、Phase A-G 栈、HITL/权限中断、flow 会话、turn summary、压缩、StreamAdaptersuper_agent_v2 内部、网关路由、原始消息摄入、Celery 任务体super_agent_v2、PG checkpointer、Skills、Search、TankaServerAPI、Redis
Skills & MCP skill_discovery + tanka_platform技能发现/生命周期/市场、RouteRegistry + tanka_api、Confirmation/ParamSpec、字段解析、MCP 客户端、技能 SSELLM 模型节点、HITL 中断机制、MongoBackend 存储、OAuth 绑定MongoBackend、PG skill_metadata、Mongo 技能集合、第三方 MCP、TenantRegistry
Search/Retrieval/Tenant search_tools + tenant5 节点检索图、tanka+web 检索、语义重排、租户识别链、TenantRegistry YAMLmemory_cellular 内部、Serper、topic 合成、Celery 调度memory_cellular(ES/Milvus)、agentic_client、Serper、Gemini、output_chat
Business Output insight/output_chat + flow/pitch_deck制品编排、类型图、v4 SubAgent 生成器、WriteCraft、Plan/RelevantContent 持久化、StreamWriter核心 WorkAgent、super_agent_v2、检索内部、checkpointingsuper_agent_v2、MongoDB、S3、Browserless、Gemini/Claude
Apodex controller/apodex23 聚合、append-only IMP 账本、ImpactLedgerService、17 应用服务、ActionMiddleware、orgId 隔离、幂等IM.employee/department 源实体(只读 FK)、Celery 编排、auth 校验、通知投递tanka_ai_toolkit.db.mongodb(AI Memory)、employee_dao、Agent(apodex_action)、REST
Relay + WaitTask controller/relay + celery_app/wait_taskRelay 流式、WaitTask 创建/仓储/调度、Kafka 消费派发、触发评估、唤醒、outbox 发布WatchExecutor 外部服务、用户上下文解析、langgraph checkpointer、TankaServerAPI APIKafka、Redis db7/db0、MongoDB、RedBeat、TankaServerAPI 唤醒、super_agent_v2
Celery + Operations celery_app + admin + testsCelery/Beat/RedBeat 配置、双队列 worker、核心任务、admin 面板/CLI、pytestweb_api 路由、LLM eval 内部、业务 Agent、消息存储Redis broker、MongoDB、ES(work_request)、Kafka 消费者、Langfuse
Persistence/DAO apodex/daos + tanka_ai_toolkitBaseDAO orgId 强制、类型化 CRUD、ImpRecordDAO append-only、各 Repository、连接助手服务层不变式、业务编排、Agent 运行时PG、MongoDB×5、Milvus、ES、Redis
§ 07

数据与存储

七类存储各司其职:关系型放历史与 checkpoint,文档型分 5 shard,向量、全文、缓存、事件源各就各位。

存储承载内容
PostgreSQL(读写分离)聊天历史(chat_history,SQLModel ORM);langgraph checkpointAsyncPostgresSaver,键 thread_id=room_id,存 messages/grants/action_log/pending interrupts);PG 加密字段用 pg.encryption_key 口令;PostgreSQL skill_metadata 镜像
MongoDB(5 shard)Habitat:用户/房间/域数据;AI Memory(default):AI 记忆/brain、flow_sessions、apodex 全部集合(imp_records 等 26 个 collection)、Plan/OutputData/preferences、技能集合;WorkbenchThirdPartyTankaLink
Milvus向量 embedding(语义检索;vote/calendar searcher 的 a_retrieve_docs
Elasticsearch全文索引 + memory_cellular(BM25/关键词)后端;ELK 日志(trace_id 关联);work_request_* 监控索引
Redis(多 db)db3 = outbox/flow_session;db2 = watcher;db7 = wait_task(wait:task:*/wait:session:*/wait:events:*,7d/24h TTL);db4 = business;celery broker + result backend;RedBeat schedule(redbeat:{task})+ redbeat:lock;终止信号(300s TTL)
Kafkaim_message / vote / arena 等 topic,事件源 → 单实例消费者
S3制品图片(路演 slide、原型线框)
SQLitechat-analysis 工具上传的 LangChain 对话 JSON(仅该调试工具)
§ 08

横切关注点

贯穿全进程的基础设施约定:配置、日志、可观测、事件、幂等、多租户、流式、monkey patch、分布式调度。

配置 / Apollo
EnvProxy 惰性读 YAML(web/config/*_config.yaml),web 与 celery 共享,支持 Apollo 守护进程热更新;safe_read_cfg 带默认值防缺段崩溃。web 用 config_env 选环境,celery 用 CELERY_CONFIG 切 test。
日志 / ELK
register.py 每请求写 ELK 记录(id/trace_id/function/input_params/nlp_name='a025_tanka_memory_output');多进程用 TankaQueueHandler;polling 路径静默。trace_id 是贯穿 web/worker/beat 的唯一关联键,经 before_task_publish 信号传到 Celery 任务 header。
Langfuse
LLM 可观测/追踪(聊天上下文核验,见 docs/verify_chat_context_via_langfuse.md)。
事件 outbox
web/src/lib/event_outbox,事务性事件日志(Redis 主 + Mongo 兜底的 CompositeEventBus),publish/peek/ack 语义,30 天 TTL;是任务完成的真相源,WakeupCaller HTTP 仅 best-effort。
幂等性
Apodex ImpRecord 与 EventOutbox 用幂等键 + append-only 语义;修正为反向调整、绝不更新;ImpRecord 幂等键格式 sourceType:sourceId:action,唯一约束防重放重复计分。
多租户 orgId
BaseDAO 在每次 Mongo 查询自动注入/强制 orgId(结构性防护),insert 时静默覆盖;运行时从 Session 上下文注入;TenantRegistry tag 驱动技能可见性与场景路由。
SSE 流式 + 心跳
stream_chat_with_heartbeat 每 30s 发 HEARTBEAT(120 分钟超时),覆盖 Tanka/Relay/insights;TankaStreamAdapter 归一化 StreamData 到线协议。
import 期 monkey patch
http_monkey_patch(按 hostname 白名单记录出站 HTTP)、langchain_anthropic_patch(多 SystemMessage:第 2+ 个转 HumanMessage);DEBUG 在 FastAPI 实例化前设置。
RedBeat 分布式调度
Redis 后端 + redbeat:lock(30s/150s 调优)做多实例故障切换;单实例 Kafka 消费者必需(内存防抖不可分布)。
§ 09

通用语言术语表

团队的 ubiquitous language。读懂这些词,就读懂了代码里的命名与边界。

§ 10

新人上手路线图

从「跑起来」到「会改东西」,分阶段推进。文档约定见 docs/MODULE_LAYOUT.md:每个模块有 README + design + specs 三件套。

阶段 0跑起来约 1 小时
  1. docs/MODULE_LAYOUT.md 理解文档约定;读 supervisord.local.conf 看本地三进程定义。
  2. 配置:确认 web/config/test_config.yaml 存在,config_env 默认 test;celery 侧确认 CELERY_CONFIGcelery_config_test.py
  3. 启动顺序:web_api(3080)→ celery_beat → celery_worker →(如需事件)kafka_consumer。
  4. 冒烟:curl localhost:3080/api/health 应返回 200;按 onboardingNotes 的 Debugging Checklist 验 Redis/Mongo 连通性。
注意super_agent/ super_agent_v2/ 软链接在本机失效(指向 /Users/admin/...)。本地无法读其实现,相关行为请在部署环境验证。
阶段 1理解主干 · 按顺序读约 2 小时
  1. web/main.py — 应用工厂与引导链(DEBUG → set_env → Apollo → lifespan → 路由 → patch)。
  2. web/src/middleware/__init__.py + register.py — 生命周期 DB 初始化、Session/trace_id、ELK。
  3. web/src/view/__init__.pyweb/src/view/tanka/flow.py — 路由聚合与主流式入口。
  4. docs/tool_call_lifecycle/overview.md(最重要) — Phase A-G 全流程。
  5. agent.py:先看 stream_chat_tanka()(2749+),再看 create_synergy_ai_agent()(1956-2303),重点读 1984-1989 的中间件逆序注释。
  6. docs/permission/README.md(always-allow 三级模型)+ docs/turn_summary_middleware/01_overview_and_motivation.md
阶段 2分头深入 · 按你接到的方向选

技能 / 工具

docs/skill-api.md(短/长 ID 二元性、SSE 契约)→ docs/skill-discovery-integration-guide.mdroute_registry.pyskill_system_middleware.py

检索

search_tools/graph.py(图结构)→ schema.pynodes/query_rewrite_and_route.pynodes/evaluate_results.pytenant/identification.py

业务产出

insight/output_chat/flow.py(800+ 行,按 TopicTemplateCode 路由)→ 对应 memory_result/{type}/graph.pynodes/prompts/

Apodex

docs/ddd/strategic-design.mdservices/impact_ledger_service.pymodels/imp_record.py + models/version.pymiddleware/apodex_action/middleware.py。Apodex 的三层心智模型(DB 存事实 / Skill 存规则 / Agent 经服务执行)务必先建立。

异步 / 调度

docs/watcher_api_integration.mddocs/wait_task_e2e_test_plan.mdrelay/wait_task/service.pykafka_consumer/dispatcher.pywait_task/trigger.pywait_task/tasks/common.py

阶段 3当 X 坏了去哪看
现象排查起点
某条用户消息没正确进 Agentflow.pyorganize_messages(附件/memo/room 处理)→ messages_to_send
某个 tool_call 行为异常Agent 循环 → aafter_model 四中间件(B.1-4)→ state 变更;检查 append 逆序
HITL 卡不弹 / 恢复失败permission_middleware.pyGraphInterrupt → 卡流 → 前端 resume → Phase E/F;注意决策数量必须匹配
上下文超长 / 漏压缩abefore_model 压缩中间件;确认 Claude/Gemini 选对了 Compression 变体
检索结果差search_tools/prompts/(query_rewrite/strategy/evaluate)+ SearchDebugger 步骤
IMP 计分重复/错误一律走 ImpactLedgerService,检查 idempotencyKey 格式与 type×sourceType;勿直调 DAO
后台任务不唤醒Redis db7 wait:events:{sid};确认 Kafka 消费者单实例;Agent 是否轮询 /work/watcher/events
内存随请求增长(OOM)web/src/lib/redis_pool.py 的 contextvar 捕获问题;确认池在启动期干净上下文预热
跨租户数据串了检查相关 BaseDAO 子类的 orgId 过滤;这是结构性多租户边界
分布式日志关联trace_id 串 web/worker/beat 的 ELK 记录(docs/how_to_read_elk_log.md
阶段 4改东西的约定
  • 加路由:建 Pydantic 模型 → @router.post/get → 调 controller service → 返回 JSONResponse/StreamingResponse;流式用 stream_chat_with_heartbeat 包装。
  • 加 tanka_api 路由:在 route_registry.py 注册(Path/ParamSpec/ConfirmationSpec/handler)→ 写 handler → 加测试;OPTIONS 自动生成文档。
  • 加技能:在 /work_agent/skills/<name>/SKILL.md + 辅助文档创建,SkillFileScanner 自动发现,可见性在 skill_discovery 的 yaml 控制。
  • 加制品类型:① 新 TopicTemplateCode 枚举值 ② 新 memory_result/{type}/graph.py + schema.py + nodes/ + prompts/flow.py 路由加 elif 分支(无插件发现机制)。
  • 加 Apodex 操作:服务层 + DAO 层成对出现,写入绝不绕过服务;ImpRecord 写入必带正确格式幂等键;retro 操作原子化(批量 insert 或全失败)。
  • 测试:pytest markers(@pytest.mark.mechanism 慢 broker 测试、@pytest.mark.e2e@pytest.mark.apodex/policy/action_card);Apodex 测试自动用 mongomock(确认 conftest setdefault('APODEX_USE_REAL_MONGO','0'))。