0%

系统设计:一个面向本地部署的多视觉检测任务系统

对先前工作的优化、思考。

前言

先前为了交差研究生导师的横向,设计了个简单的、部署在单机上的实验室隐患检测系统,见上文,并且训练了一些简单的识别模型挂载,也稳健运行了半年,最近对其进行了面向设备集群的拓展,大大增强了系统的稳健性。到这里也就大致收手了,我也被踢出了横向群,在此做个记录。

开始接触这个项目应该是25年的6月,那时候对老师布置的任务还算是有热情,一开始拿到需求就积极的跑数据、设计训练模型、上机测试。很讽刺的是,因为我的用功,老师成了甩手掌柜,对技术上的事情置若罔闻,对和甲方交接乐此不疲,并且热衷于讨论展示界面这种远离核心的工作。讨论也就罢了,偏偏他很有自己的想法,又偏偏他自己的想法和甲方很多时候不能达成一致,以至于一段时间负责数据看板的同门夹在两个人中间,做的最多的事情就是画设计案。

项目合同签署是30万,分三期交付,距离现在也是半年前的事情了,最终我们一届干活的三个人每人拿到了五千块报酬。说实话,已经超乎我的心里预期了,毕竟做好了一毛不拔的准备,只是暗自替甲方感到不值,这样基本由一个外行学生胡乱搭起来的项目,是否值三十万还有待商榷。并且,对这个项目的未来我也感到悲观,不过以后的事情也与我无关了。

系统设计需求

该系统设计需要满足以下需求:

  • 需要监测学校数十座大楼,上百个实验室,上千个(乃至更多)摄像头拍摄下出现的违规行为(如人员衣着与数量规范性,操作规范性,物品摆放规范性等)。
  • 需要可快速插拔视觉检测模型。
  • 需要保证系统上线的稳定性。

架构总览

设计将系统分为五大平面:

平面 职责 核心组件
控制平面 设备管理、分片调度、热加载 Redis 注册中心、ShardManager、HotReloadManager
采集平面 多协议视频帧采集 CaptureWorker 集群、RTSP/ONVIF/GB28181 插件
消息平面 采集-推理解耦 Kafka(分布式)/ LIFO Queue(单机)
推理平面 多模型并行推理 InferenceWorker、Fan-out/Fan-in 引擎、DetectorRegistry
可观测性平面 指标、健康、日志 Prometheus、HealthAggregator、JSON Logger

控制平面

控制平面的任务是使得多个 CaptureWorker 节点能够自动注册、自动分配摄像头、自动检测故障并完成摄像头迁移。

第一个问题就是,假设有 1000 个摄像头,3 个 Worker 节点,谁负责采集哪些摄像头?某个 Worker 挂了怎么办?

模块文件结构

1
2
3
4
5
6
7
8
app/infra/cluster/
├── __init__.py # 导出所有公共类
├── device_registry.py # 设备注册中心 — 管理摄像头和 Worker 的注册
├── consistent_hash.py # 一致性哈希环 — 纯算法实现,无外部依赖
├── shard_manager.py # 分片管理器 — 基于一致性哈希分配摄像头
├── worker_heartbeat.py # 心跳上报线程 — 后台守护线程
├── worker_watcher.py # 故障检测器 — 监控 Worker 存活状态
└── models.py # 数据模型 — WorkerInfo, CameraInfo 等

设备注册中心(Redis)

为什么选择 Redis 作为注册中心?

在分布式系统中,注册中心是所有节点共享状态的地方。常见选型对比:

方案 优点 缺点 适用场景
Redis 极低延迟(亚毫秒)、原生 TTL 过期、原生分布式锁、部署简单 非强一致性(AP 模型) 中小规模集群(<100 节点)
ZooKeeper 强一致性(CP 模型)、原生 Watcher 运维复杂、Java 生态 大规模集群、强一致性需求
etcd 强一致性、K8s 原生 写入性能不如 Redis K8s 生态
Consul 内置健康检查、服务发现 额外引入组件 微服务架构

选择 Redis 的理由

  1. TTL 自动过期天然适合心跳检测(Key 过期 = Worker 下线)
  2. SETNX / SET NX EX 原生支持分布式锁
  3. 亚毫秒级读写,满足高频心跳上报需求
  4. 对于实验室场景(几十个摄像头、几个 Worker),Redis 的 AP 模型完全够用

数据模型设计思路

在动手写代码之前,先想清楚注册中心需要管理哪些实体、每个实体需要携带什么信息。

Worker 节点信息(WorkerInfo)
每个 CaptureWorker 在注册时需要告诉注册中心自己的身份和能力。我们定义了一个 WorkerInfo 数据类,包含以下字段:

  • worker_id:全局唯一标识,例如 "cw-1"。在容器化部署中通常用 hostname 自动生成。
  • host / port:网络地址,用于健康检查和管理接口。
  • status:生命周期状态,有四种——STARTING(初始化中)、ALIVE(正常工作)、DRAINING(正在优雅关停,不再接受新摄像头)、DEAD(已停止)。
  • capacity:该 Worker 最多能处理多少路摄像头。这取决于 CPU/GPU 资源,默认设为 64。
  • current_load:当前实际处理的摄像头数量。每次心跳时上报,用于后续做负载感知的分片。
  • gpu_info:GPU 型号(如果有的话),用于调度时优先把需要 GPU 推理的任务分给有 GPU 的节点。
  • started_at / last_heartbeat:时间戳,用于监控和排查问题。

这个数据类需要能序列化为 JSON 字符串存入 Redis,也能从 JSON 反序列化回来。

Worker 的生命周期状态机

1
STARTING ──(注册成功)──→ ALIVE ──(收到关停信号)──→ DRAINING ──(关停完成)──→ DEAD

DRAINING 状态很重要:当我们要下线一个 Worker 时,不是直接杀掉进程,而是先把它标记为 DRAINING,让分片器不再给它分配新摄像头,等它处理完当前帧后再真正退出。这样可以避免正在处理中的帧丢失。

摄像头信息(CameraInfo)
每个摄像头的注册信息包含:

  • camera_id / lab_id:来自 PostgreSQL 数据库的主键和所属实验室。
  • rtsp_url:RTSP 流地址,Worker 用这个地址去拉取视频流。
  • protocol:采集协议(rtsp / onvif / gb28181),不同协议的采集方式不同。
  • enabled:是否启用。管理员可以在后台禁用某个摄像头,禁用后分片器不会把它分配给任何 Worker。
  • assigned_worker:当前被分配给哪个 Worker(冗余字段,主要用于查询方便)。

DeviceRegistry 的设计逻辑

DeviceRegistry 是整个控制平面的核心类,它封装了所有与 Redis 交互的逻辑。理解它的设计,关键是理解它要解决的五个问题:

问题一:Worker 怎么注册?怎么知道它还活着?
我们利用 Redis 的 SETEX 命令(设置值的同时设置过期时间)来实现。每个 Worker 启动时,往 Redis 写入一个 Key worker:{id},值是自己的 JSON 信息,同时设置 TTL 为 15 秒。之后每隔 5 秒,Worker 通过心跳刷新这个 Key 的 TTL。如果 Worker 崩溃了,它不再刷新 TTL,15 秒后 Redis 自动删除这个 Key——这就等于宣告了 Worker 的死亡。

这个设计的精妙之处在于,我们不需要主动去删除一个死掉的 Worker,Redis 的 TTL 机制帮我们自动完成了这件事。 这比传统的主动探测方式简单得多。

问题二:怎么知道当前有哪些 Worker 还活着?
查询存活 Worker 的逻辑非常直观:扫描 Redis 中所有以 worker: 开头的 Key,能扫到的就是活着的(因为死掉的 Key 已经被 TTL 自动删除了)。

这里有一个重要的实现细节,我们用 SCAN 命令而不是 KEYS 命令来扫描。KEYS worker:* 虽然简单,但它会一次性遍历 Redis 中的所有 Key,在 Key 数量很大时会阻塞 Redis 主线程,导致其他请求超时。SCAN 是游标式的,每次只返回一小批结果,不会阻塞。

扫描到 Key 列表后,用 MGET 批量获取所有 Key 的值(比逐个 GET 快得多),然后反序列化为 WorkerInfo 对象返回。

问题三:摄像头信息怎么管理?
摄像头的数据源是 PostgreSQL 数据库,但分片器在计算分配时不能每次都去查数据库(太慢了)。所以我们把摄像头信息定期同步到 Redis 中,作为一个缓存副本。

摄像头信息存在两个 Redis 数据结构中:

  • cameras:all(Hash 类型):存储每个摄像头的完整信息,Key 是 camera_id,Value 是 JSON。用于需要查看摄像头详情的场景。
  • cameras:enabled(Set 类型):只存储启用的摄像头 ID 集合。

同步逻辑是,每隔 5 分钟,从 PostgreSQL 读取全量摄像头列表,然后用 Redis Pipeline(事务性批量操作)一次性更新到 Redis。Pipeline 的好处是把几十条 Redis 命令打包成一次网络往返,而且是原子性的——要么全部成功,要么全部失败。

同步时还会处理数据库中已删除但 Redis 中还存在的摄像头,把它们从 Redis 中清理掉,并且从分配表中移除。

问题四:分片结果怎么存储?Worker 怎么知道自己该采集哪些摄像头?
分片结果存在 camera_assignment 这个 Hash 中,Key 是 camera_id,Value 是 worker_id。例如:

1
camera_assignment: {"1": "cw-1", "2": "cw-1", "3": "cw-2", "5": "cw-3"}

每次重分片时,我们用 Pipeline 原子性地完成三步操作:删除旧分配 → 写入新分配 → 递增版本号。这三步在一个 Redis 事务中完成。

每个 Worker 通过轮询 shard_version 来感知分片变化。Worker 本地记住上次看到的版本号,每隔 5 秒去 Redis 读一次 shard_version,如果发现版本号变了,就重新拉取 camera_assignment 中属于自己的那部分。这种轮询版本号的方式比 Pub/Sub 更可靠。

问题五:多个组件同时检测到 Worker 变化怎么办?
假设 Worker-2 宕机了,WorkerWatcher 检测到了,同时另一个监控组件也检测到了,它们都想触发重分片。如果两个重分片操作并发执行,就会出现竞态条件:A 计算出的分配结果被 B 覆盖,或者反过来。

解决方案是分布式互斥锁。在执行重分片之前,必须先获取 lock:reshard 这把锁。锁的实现利用了 Redis 的 SET key value NX EX timeout 命令——只有当 Key 不存在时才能设置成功(NX = Not eXists),同时设置一个超时时间防止死锁(万一持有锁的进程崩溃了,锁会在 30 秒后自动释放)。

获取锁失败意味着有其他组件正在重分片,当前操作直接跳过即可——反正那个正在执行的重分片会处理好一切。

除了上述五个核心问题,注册中心还提供了一个 Pub/Sub 通知机制。每当 Worker 注册或注销时,会往 event:worker_change 频道发布一条消息。这不是必须的(WorkerWatcher 的轮询机制已经能检测到变化),但它能让响应更快——不用等到下一次轮询周期,而是立即收到通知。

Key 模式 Redis 类型 用途 TTL 读写频率
worker:{id} String (JSON) Worker 注册信息 15s 写: 每5s/Worker,读: 重分片时
cameras:all Hash 全量摄像头详情 永久 写: 每5min同步,读: 重分片时
cameras:enabled Set 启用的摄像头 ID 集合 永久 写: 摄像头变更时,读: 重分片时
camera_assignment Hash 摄像头→Worker 映射 永久 写: 重分片时,读: Worker 启动/版本变化
shard_version String 分片版本号 永久 写: 重分片时 INCR,读: Worker 每5s轮询
lock:reshard String 分布式互斥锁 30s 写: 重分片时,读: 重分片时
event:worker_change Pub/Sub Worker 拓扑变更通知 写: Worker 上下线时

一致性哈希分片器

问题背景:为什么需要一致性哈希?

假设有 100 个摄像头和 3 个 Worker,最简单的分配方式是取模

1
2
3
camera_id % 3 = 0 → Worker-0
camera_id % 3 = 1 → Worker-1
camera_id % 3 = 2 → Worker-2

问题:当 Worker-2 宕机,变成 2 个 Worker 时:

1
2
3
4
5
6
7
8
9
# 之前: camera_id % 3
camera 1 → Worker-1, camera 2 → Worker-2, camera 3 → Worker-0
camera 4 → Worker-1, camera 5 → Worker-2, camera 6 → Worker-0
...

# 之后: camera_id % 2
camera 1 → Worker-1, camera 2 → Worker-0, camera 3 → Worker-1
camera 4 → Worker-0, camera 5 → Worker-1, camera 6 → Worker-0
...

几乎所有摄像头都被重新分配了! 这意味着所有 Worker 都要断开当前连接、重新建立 RTSP 连接,造成大面积服务中断。

一致性哈希原理

一致性哈希的核心思想是把 Worker 和摄像头都映射到同一个哈希环(0 ~ 2³²-1)上:
当 Worker-2 宕机时

  • 只有原本分配给 Worker-2 的摄像头需要重新分配(顺时针找到下一个 Worker)
  • Worker-1 和 Worker-3 的摄像头完全不受影响
  • 迁移量 ≈ K/N(K=总摄像头数,N=Worker数),而非 ~100%

虚拟节点:解决数据倾斜

如果只有 3 个物理节点,它们在哈希环上的位置可能很不均匀,导致某个 Worker 分到 70% 的摄像头。

解决方案是,每个物理节点映射 150 个虚拟节点到哈希环上:

2.2.4 一致性哈希环的实现逻辑

一致性哈希环的实现是一个纯算法模块,不依赖 Redis 或任何外部服务,可以独立进行单元测试。它的内部维护三个数据结构:

  1. _ring 字典:哈希值 → 物理节点 ID 的映射。例如 {28374: "worker-1", 91025: "worker-2", ...}
  2. _sorted_keys 有序列表:所有虚拟节点的哈希值,按从小到大排列。这是为了支持二分查找。
  3. _nodes 集合:当前环上所有物理节点的 ID,用于去重和遍历。

添加节点的过程
当调用 add_node("worker-1") 时,算法会为这个物理节点生成 150 个虚拟节点。每个虚拟节点的 Key 是 "worker-1#vn0""worker-1#vn1"、…、"worker-1#vn149"。对每个虚拟节点 Key 做 MD5 哈希,取前 8 个十六进制字符转为整数,得到一个[0, 2³²)范围内的位置。然后把这个位置插入到 _sorted_keys 有序列表中(用 bisect.insort 保持有序),同时在 _ring 字典中记录"这个位置属于 worker-1"。

为什么用 MD5 而不是 Python 内置的 hash() 函数?因为 hash() 在不同 Python 进程中对同一个字符串可能返回不同的值(Python 3.3+ 默认开启了哈希随机化),这会导致不同 Worker 节点计算出不同的分配结果。MD5 是确定性的,同一个输入永远得到同一个输出,而且分布非常均匀。

查找节点的过程
当调用 get_node("camera_42") 时,算法的步骤是:

  1. "camera_42" 做 MD5 哈希,得到一个位置值(比如 57832)
  2. _sorted_keys 有序列表中做二分查找,找到第一个 ≥ 57832 的虚拟节点位置
  3. 如果找到了(比如位置 60001),就查 _ring[60001] 得到物理节点 ID(比如 "worker-2"
  4. 如果 57832 比所有虚拟节点的位置都大(超过了列表末尾),就回绕到列表第一个元素——这就是"环"的含义

二分查找的时间复杂度是 O(log(N×V)),其中 N 是物理节点数,V 是虚拟节点数。对于 3 个 Worker × 150 个虚拟节点 = 450 个元素,二分查找只需要约 9 次比较,非常快。

删除节点的过程
当调用 remove_node("worker-2") 时,算法遍历 worker-2 的 150 个虚拟节点,逐个从 _ring 字典和 _sorted_keys 列表中移除。移除后,原本分配给 worker-2 的摄像头会自动"顺时针滑动"到下一个虚拟节点——这就是一致性哈希的核心特性:只有被删除节点的数据需要迁移,其他节点的数据完全不受影响。

核心查找逻辑的伪代码:

1
2
3
4
5
6
def get_node(self, key: str) -> str:
hash_val = md5(key)[:8] # 哈希到环上的位置
idx = bisect_left(self._sorted_keys, hash_val) # 二分查找
if idx >= len(self._sorted_keys):
idx = 0 # 回绕到环的起点
return self._ring[self._sorted_keys[idx]] # 返回物理节点 ID

ShardManager 的设计逻辑

ShardManager 是一致性哈希环的上层封装,它把纯算法和Redis 存储连接起来。如果说 ConsistentHashRing 是一个计算器,那 ShardManager 就是操作这个计算器的人——它知道从哪里读取输入(Redis 中的存活 Worker 和启用摄像头),计算完后把结果写回哪里(Redis 中的分配表)。

reshard() 的完整执行流程
reshard() 是整个控制平面最核心的操作,它在 Worker 上下线时被触发。整个过程分为 8 步:

第 1 步:获取分布式锁。 调用 Redis 的 SET lock:reshard <token> NX EX 30 命令尝试加锁。如果锁已被其他进程持有(返回失败),说明有人正在重分片,当前操作直接跳过。锁的超时时间设为 30 秒,防止持有者崩溃导致死锁。

第 2 步:读取存活 Worker 列表。 通过 DeviceRegistry.get_alive_workers() 从 Redis 扫描所有 worker:* Key,得到当前存活的 Worker 列表。如果没有任何存活 Worker,清空分配表并返回——没有 Worker 就没法分配。

第 3 步:读取启用的摄像头列表。 通过 DeviceRegistry.get_enabled_camera_ids() 从 Redis 的 cameras:enabled Set 中获取所有启用的摄像头 ID。

第 4 步:构建一致性哈希环。 创建一个新的 ConsistentHashRing 实例,把所有存活 Worker 添加到环上。每个 Worker 会生成 150 个虚拟节点。

第 5 步:计算新的分配。 遍历每个启用的摄像头 ID,调用 ring.get_node(str(cam_id)) 找到它应该归属的 Worker。最终得到一个字典:{camera_id: worker_id}

第 6 步:计算迁移量。 把新分配和旧分配(从 Redis 读取)做对比,统计有多少摄像头换了 Worker。这个指标很重要——每次迁移意味着一个 RTSP 连接的断开和重建,会有短暂的监控空白。一致性哈希的优势就体现在这里:迁移量约为 K/N(K=总摄像头数,N=Worker 数),而不是接近 100%。

第 7 步:原子写入新分配。 通过 Redis Pipeline 在一个事务中完成三个操作:删除旧的 camera_assignment → 写入新的分配 → 递增 shard_version。事务保证了原子性,不会出现"删了旧的但还没写入新的"这种中间状态。

第 8 步:释放锁。finally 块中释放分布式锁,确保即使中间出错也不会死锁。

Worker 如何获取自己的摄像头列表?
每个 Worker 不需要自己去算哈希环,只需要调用 get_cameras_for_worker(worker_id) 从 Redis 的 camera_assignment Hash 中过滤出属于自己的摄像头 ID 列表即可。这是一个纯读操作,多个 Worker 可以并发调用,不需要加锁。

迁移量统计的意义
ShardManager 会在每次重分片后记录迁移量。这个数字可以用来验证一致性哈希是否正常工作:

  • 如果一个 Worker 宕机(3 个变 2 个),迁移量应该约为 K/3 ≈ 33 个(100 个摄像头的情况下)
  • 如果迁移量接近 100,说明哈希环的实现有问题
  • 如果迁移量为 0,说明拓扑没有真正变化(可能是误触发)

容量预览功能
ShardManager 还提供了一个 preview_reshard() 方法,可以在不实际修改分配的情况下,模拟"如果 Worker 列表是这样的,分配会怎样"。这对容量规划很有用——比如在扩容前先预览一下新增一个 Worker 后的负载分布,确认是否均匀。

Worker 心跳与故障转移

心跳机制设计

心跳是分布式系统中检测节点存活的标准方式。核心思想是:

每个 Worker 定期向 Redis 续期自己的注册 Key(刷新 TTL)。如果 Worker 崩溃,无法续期,Key 自动过期 → 被视为死亡。

1
2
3
4
5
6
7
8
9
时间线:
t=0s Worker-1 注册: SETEX worker:cw-1 15 {...} TTL=15s
t=5s Worker-1 心跳: SETEX worker:cw-1 15 {...} TTL 重置为 15s
t=10s Worker-1 心跳: SETEX worker:cw-1 15 {...} TTL 重置为 15s
t=12s Worker-1 崩溃! 不再发送心跳
t=15s (TTL 还剩 10s)
t=20s (TTL 还剩 5s)
t=25s TTL 到期! Redis 自动删除 worker:cw-1
t=25s WorkerWatcher 检测到 worker:cw-1 消失 → 触发 reshard()

WorkerHeartbeat 的设计逻辑

WorkerHeartbeat 是一个后台守护线程,运行在每个 CaptureWorker 进程内部,负责定期向 Redis 续期自己的注册 Key。它的设计有几个关键的工程决策:

为什么用后台线程而不是协程或独立进程?
心跳是一个非常轻量的操作(一次 Redis SETEX 命令,亚毫秒级),但它必须绝对可靠——如果心跳线程挂了,Worker 会被误判为死亡。用后台守护线程(daemon thread)的好处是:

  1. 它和主进程共享同一个进程空间,不需要进程间通信
  2. 设置为 daemon 线程后,主进程退出时它会自动被杀掉,不会变成孤儿线程
  3. 线程比协程更适合这种"定时执行、不需要高并发"的场景——心跳不需要处理成百上千的并发请求,一个线程足矣

为什么用 threading.Event.wait() 而不是 time.sleep()
这是一个很重要的细节。time.sleep(5) 会让线程硬睡 5 秒,在这 5 秒内即使你想停止线程也得等它睡完。而 threading.Event.wait(timeout=5) 也是等 5 秒,但如果在等待期间有人调用了 stop_event.set(),它会立即醒来。这意味着当我们要优雅关停 Worker 时,心跳线程可以在毫秒级内响应停止信号,而不是最多等 5 秒。

心跳循环的三个阶段
心跳线程启动后经历三个阶段:

  1. 初始注册:线程启动时,先调用 register_worker() 把自己注册到 Redis。如果这一步失败了(比如 Redis 暂时不可用),不会直接退出,而是进入循环继续尝试。
  2. 周期续期:每隔 5 秒执行一次续期操作。续期时会做两件事——刷新 TTL和上报当前负载(当前正在处理多少路摄像头)。如果续期发现 Key 已经过期了(返回 False),说明 Worker 曾经被认为死亡过,需要重新做一次完整注册。
  3. 优雅退出:收到停止信号后,主动调用 deregister_worker() 从 Redis 删除自己的 Key。

异常处理策略
心跳循环中的所有操作都被 try-catch 包裹,任何异常都不会导致心跳线程崩溃。这是因为心跳线程一旦崩溃,Worker 就会被误判为死亡,后果很严重。即使 Redis 暂时不可用,心跳线程也会继续运行,等 Redis 恢复后自动恢复心跳。

不过,如果连续失败超过 5 次,会触发一个 CRITICAL 级别的日志告警——这通常意味着 Redis 真的挂了,需要运维介入。

WorkerWatcher 故障检测器的设计逻辑

WorkerWatcher 是控制平面的哨兵,它的唯一职责是,监控 Worker 集合的变化,一旦发现有 Worker 加入或离开,就触发重分片。

检测原理:集合差异比较
WorkerWatcher 的检测逻辑非常简单直观:

  1. 它在内存中维护一个已知 Worker 集合(known_workers
  2. 每隔 5 秒,从 Redis 扫描一次当前存活的 Worker 集合(current_workers
  3. 比较两个集合:
    • 如果 current_workers == known_workers:无变化,什么都不做
    • 如果 current_workers - known_workers 不为空:有新 Worker 加入
    • 如果 known_workers - current_workers 不为空:有 Worker 离开/崩溃
  4. 只要集合发生了变化,就触发 ShardManager.reshard()

这种"轮询 + 集合比较"的方式虽然不如事件驱动那么实时,但胜在简单可靠

启动阶段的特殊处理
WorkerWatcher 启动时有一个特殊的等待阶段:它会阻塞等待,直到至少有一个 Worker 注册到 Redis。这是因为如果没有任何 Worker,做分片是没有意义的。等到第一个 Worker 出现后,立即执行一次初始分片,把所有摄像头分配给这个 Worker。

之后进入正常的监控循环。当第二个、第三个 Worker 陆续加入时,WorkerWatcher 会检测到集合变化,触发重分片,把摄像头重新均匀分配。

为什么先更新 known_workers 再触发重分片?
在代码中,我们先把 known_workers 更新为 current_workers,然后再调用 reshard()。这个顺序很重要:如果先 reshard 再更新 known_workers,那么在 reshard 执行期间(虽然很快,但不是零时间),下一次轮询可能又检测到变化,导致重复触发。先更新集合可以避免这种不必要的重复。

谁来运行 WorkerWatcher?
在我们的架构中,WorkerWatcher 运行在每个 Worker 节点上(作为后台线程)。你可能会问:如果每个 Worker 都在运行 Watcher,岂不是会同时触发多次重分片?答案是不会,因为 reshard() 内部有分布式锁保护,只有一个 Watcher 能成功获取锁并执行重分片,其他的会直接跳过。

故障转移完整时序

Worker 端如何感知分片变化

重分片发生在控制平面(WorkerWatcher + ShardManager),但最终需要每个 Worker 自己去感知"分配变了"并更新自己的摄像头列表。Worker 端的感知机制是版本号轮询

每个 Worker 在内存中记住上次看到的 shard_version。在主循环的每一轮开始时,先去 Redis 读一次 shard_version(一次 GET 命令,亚毫秒级)。如果版本号没变,说明分配没变,直接继续采集。如果版本号变了,说明发生了重分片,需要重新拉取自己的摄像头列表。

重新拉取的过程是:从 Redis 的 camera_assignment Hash 中读取所有条目,过滤出 worker_id == 自己 的摄像头 ID 列表。然后和旧列表做集合差异比较:

  • 新增的摄像头(在新列表中但不在旧列表中):建立 RTSP 连接,开始采集
  • 移除的摄像头(在旧列表中但不在新列表中):断开 RTSP 连接,释放资源
  • 不变的摄像头:什么都不做,继续采集

这种"增量更新"的方式避免了每次重分片都断开所有连接再重连——只有真正发生迁移的摄像头才需要重连。

为什么用轮询而不是 Pub/Sub?
Pub/Sub 更实时,但有一个致命缺陷:如果 Worker 在消息发出的瞬间恰好断线了(比如网络抖动),这条消息就永远丢失了,Worker 永远不知道分配变了。而版本号是持久化在 Redis 中的,不会丢失——即使 Worker 断线了 10 分钟,重连后一读版本号就知道需要更新。

完整启动与关停流程

启动流程(8 步)
从零启动一个 CaptureWorker 节点的控制平面,需要按以下顺序执行:

第 1 步:连接 Redis。 创建 Redis 客户端连接,必须设置 decode_responses=True(让 Redis 返回字符串而不是字节),以及合理的超时参数(socket_connect_timeout=5, socket_timeout=5)。连接后先 ping() 验证连通性。

第 2 步:初始化 DeviceRegistry。 创建注册中心实例,它会自动初始化 shard_version Key(如果不存在的话设为 0)。

第 3 步:同步摄像头数据。 从 PostgreSQL 读取全量摄像头列表,通过 sync_cameras_from_db() 批量写入 Redis。这一步确保 Redis 中的摄像头信息是最新的。

第 4 步:初始化 ShardManager。 创建分片管理器,指定每个物理节点 150 个虚拟节点。

第 5 步:构造 WorkerInfo。 用当前机器的 hostname 生成唯一的 worker_id(在容器化部署中,每个容器的 hostname 天然不同)。

第 6 步:启动心跳线程。 创建 WorkerHeartbeat 并启动。心跳线程会立即向 Redis 注册当前 Worker,然后每 5 秒续期一次。这里还传入了一个 load_provider 回调函数,让心跳线程能动态上报当前负载。

第 7 步:启动 WorkerWatcher。 创建故障检测器并启动。它会等待第一个 Worker 出现,然后执行初始分片,之后进入持续监控循环。

第 8 步:启动 CaptureWorker。 开始实际的视频采集工作。Worker 会立即检查 shard_version,拉取自己的摄像头列表,建立 RTSP 连接并开始采集。

关停流程(3 阶段)
优雅关停的顺序和启动相反,分三个阶段:

  1. 停止采集:先停止 CaptureWorker,让它处理完当前帧后退出。设置 30 秒超时,防止卡死。
  2. 停止心跳:调用 heartbeat.stop(),心跳线程会主动从 Redis 注销当前 Worker。注销后,其他节点的 WorkerWatcher 会检测到拓扑变化,触发重分片,把当前 Worker 的摄像头迁移给其他 Worker。
  3. 停止监控:最后停止 WorkerWatcher。

这个顺序确保了:先停止产生新数据(采集),再通知集群自己要下线(心跳注销触发重分片),最后停止监控。如果顺序反了,比如先停心跳再停采集,就会出现"Worker 已经被认为死亡,但它还在往 Kafka 发数据"的混乱状态。

如何验证一致性哈希的正确性

一致性哈希环是纯算法模块,非常适合用单元测试来验证。以下是需要验证的几个关键属性:

1. 空环安全性:当环上没有任何节点时,查询任何 Key 应该返回 None 而不是抛异常。这是边界条件,必须处理。

2. 单节点全覆盖:当环上只有一个节点时,所有 Key 都应该分配给这个节点。这验证了环的基本查找逻辑是否正确。

3. 负载均衡性:用 150 个虚拟节点、3 个物理节点、10000 个 Key 做测试。理想情况下每个节点分到 ~3333 个 Key。验证每个节点的实际分配量与理想值的偏差不超过 10%。如果偏差过大,说明哈希函数分布不均匀或虚拟节点数量不够。

4. 最小迁移量:这是一致性哈希最核心的特性。测试方法是:先用 3 个节点分配 1000 个 Key,记录每个 Key 的归属;然后移除一个节点,重新查询所有 Key 的归属;统计有多少 Key 换了节点。预期结果是:

  • 迁移量约为 K/N ≈ 333(只有被移除节点的 Key 迁移)
  • 关键验证:原本属于未被移除节点(w1、w3)的 Key,在移除 w2 后应该完全不变。如果有任何一个 w1 或 w3 的 Key 发生了迁移,说明一致性哈希的实现有 bug。

5. 幂等性:重复添加同一个节点不应该产生重复的虚拟节点。验证方法是添加两次同一个节点后,环的大小应该和添加一次相同。

6. 安全删除:删除一个不存在的节点不应该抛异常。这是防御性编程的基本要求。

采集平面 CaptureWorker 集群

在先前版本中,CaptureThread 是单线程生产者,管理所有摄像头的采集:

1
2
3
4
5
6
7
8
9
10
11
12
13
class CaptureThread(threading.Thread):
"""Producer thread: captures frames from all cameras"""

def __init__(self, round_rtsp: int = 600):
self._stop_event = threading.Event() # 线程安全的停止信号
self.round_rtsp = round_rtsp # 最小轮次间隔(秒)
self.executor = ThreadPoolExecutor(max_workers=10) # 并发抓帧

def run(self):
while not self._stop_event.is_set():
self._load_camera_list_with_retry() # 从 DB 加载全量摄像头
self._capture_all_cameras() # 并发抓帧 → 入队
self._interruptible_sleep() # 可中断休眠

这种设计的瓶颈在于,所有摄像头由一个线程管理,受限于单机 CPU 和网络带宽。

3.2 集群化改造:CaptureWorker

在新的架构设计中,每个 CaptureWorker 是一个独立进程/容器,只负责分配给自己的摄像头子集:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class CaptureWorker:
"""Distributed capture worker — manages a shard of cameras"""

def __init__(self, worker_id: str, redis_url: str, kafka_brokers: List[str]):
self._worker_id = worker_id
self._registry = DeviceRegistry(redis_url)
self._shard_mgr = ShardManager(redis_url)
self._broker = KafkaBroker(kafka_brokers, topic="frame-captured")
self._heartbeat = WorkerHeartbeat(worker_id, redis_url)

def run(self):
self._heartbeat.start()
self._registry.register(self._worker_id)

while not self._stop_event.is_set():
# 只获取分配给自己的摄像头
my_cameras = self._shard_mgr.assign_cameras(
self._worker_id, self._registry.get_all_cameras()
)
for camera in my_cameras:
frame_path = self._capture_frame(camera)
if frame_path:
# 上传到 MinIO,发布引用到 Kafka
object_key = self._upload_to_minio(frame_path)
message = PhotoMessage(
camera_id=camera.id,
room_id=camera.lab_id,
photo_path=Path(object_key), # s3://bucket/key
photo_id=str(uuid4()),
timestamp=datetime.utcnow(),
)
self._broker.publish(message)

其中,PhotoMessage.photo_path 从本地磁盘路径改为 MinIO 对象引用s3://bucket/key),使得采集节点和推理节点可以在不同机器上。

消息平面 Kafka 分布式消息总线

抽象契约层:MessageBroker

消息平面的核心设计是面向抽象编程PhotoConsumer 依赖的是 MessageBroker ABC 接口,而非任何具体的队列实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# infra/contracts.py
class MessageBroker(ABC):
"""Message broker abstraction — decouples producer from consumer"""

@abstractmethod
def publish(self, message: PhotoMessage) -> bool: ...

@abstractmethod
def consume(self, timeout: float = 2.0) -> Optional[PhotoMessage]: ...

@abstractmethod
def close(self) -> None: ...

@abstractmethod
def stats(self) -> Dict[str, Any]: ...

双适配器:InMemoryBroker ↔ KafkaBroker

系统同时提供两个适配器,通过 Composition Root 一行代码切换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# ===== 适配器 1:单机内存实现 =====
class InMemoryBroker(MessageBroker, HealthCheckable):
def __init__(self):
self._queue = GlobalMessageQueue() # LIFO 单例

def publish(self, message: PhotoMessage) -> bool:
return self._queue.put_message(message)

def consume(self, timeout: float = 2.0) -> Optional[PhotoMessage]:
return self._queue.get_message_blocking(timeout=timeout)

def health(self) -> Dict[str, Any]:
stats = self._queue.get_stats()
current = stats.get("current_size", 0)
max_size = self._queue._queue.maxsize
return {
"name": "message_broker",
"type": "in_memory_lifo",
"status": "degraded" if current > max_size * 0.9 else "healthy",
"details": {**stats, "utilization_pct": round(current / max_size * 100, 1)},
}


# ===== 适配器 2:Kafka 分布式实现 =====
class KafkaBroker(MessageBroker, HealthCheckable):
def __init__(self, bootstrap_servers: List[str], topic: str = "frame-captured"):
self._producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda m: json.dumps(m.to_dict()).encode("utf-8"),
acks="all", # 所有副本确认
retries=3, # 自动重试
linger_ms=10, # 批量发送优化
)
self._consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id="inference-workers", # 消费者组 → 自动负载均衡
auto_offset_reset="latest", # 新消费者从最新消息开始
enable_auto_commit=True,
value_deserializer=lambda m: PhotoMessage.from_dict(json.loads(m)),
)

def publish(self, message: PhotoMessage) -> bool:
future = self._producer.send(self._topic, value=message)
return future.get(timeout=5) is not None

def consume(self, timeout: float = 2.0) -> Optional[PhotoMessage]:
records = self._consumer.poll(timeout_ms=int(timeout * 1000), max_records=1)
for tp, messages in records.items():
if messages:
return messages[0].value
return None

零代码切换

1
2
3
4
5
6
7
8
9
# main.py — Composition Root
# 单机模式
broker = InMemoryBroker()

# 分布式模式(仅改这一行)
broker = KafkaBroker(bootstrap_servers=["kafka-1:9092", "kafka-2:9092"])

# PhotoConsumer 代码完全不变
consumer = PhotoConsumer(config_list, broker, persister, metrics)

LIFO 队列与背压控制(单机模式)

在单机模式下,系统使用 LIFO(后进先出)队列而非传统 FIFO,因为在实时视频监控场景中,最新的帧具有最高的信息价值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class GlobalMessageQueue:
"""Thread-safe LIFO queue with backpressure and monitoring"""

def __init__(self, maxsize: int = 10000):
self._queue = LifoQueue(maxsize=maxsize)
self._stats = QueueStats()
self._condition = threading.Condition()

def put_message(self, message: PhotoMessage) -> bool:
if self._queue.full():
self._queue.get_nowait() # 淘汰最旧帧
self._stats.increment_dropped()
self._queue.put_nowait(message)
self._stats.increment_enqueued()
with self._condition:
self._condition.notify() # 唤醒阻塞的消费者
return True

def get_message_blocking(self, timeout: float = 2.0) -> Optional[PhotoMessage]:
with self._condition:
if self._queue.empty():
self._condition.wait(timeout=timeout) # 避免 CPU 空转
if not self._queue.empty():
msg = self._queue.get_nowait()
self._stats.increment_dequeued()
return msg
return None

背压策略:生产者永不阻塞(队列满时淘汰最旧帧),消费者始终处理最新帧,丢弃可追踪(QueueStats.dropped)。

Kafka Topic 设计与消费者组

Topic 分区数 用途 消费者组
frame-captured 与 CaptureWorker 数量对齐 采集帧消息 inference-workers
detection-result 3 检测结果 result-persisters
notification 1 告警通知 notifiers

推理平面 — 并行推理引擎集群

扇出/扇入并行推理引擎

推理引擎是系统的核心,采用 Fan-out/Fan-in 模式将同一帧同时分发至 N 个检测模型并行推理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class PhotoConsumer(threading.Thread):
"""Inference engine with fan-out/fan-in parallel execution"""

def __init__(self, config_list, broker: MessageBroker,
persister: ResultPersister, metrics: Optional[MetricsCollector],
max_workers: int = 4):
super().__init__(daemon=True)
self._stop_event = threading.Event()
self.broker = broker # 抽象接口 → InMemory 或 Kafka
self.persister = persister # 抽象接口 → DB 或 Kafka
self.metrics = metrics # 抽象接口 → InMemory 或 Prometheus
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.modellist = [IdentificationModel(config) for config in config_list]

def _process_message(self, message: PhotoMessage):
"""Fan-out → parallel inference → Fan-in → selective persist"""
if not message.photo_path.exists():
self.metrics.record_frame_event("image_missing")
return

# ===== Fan-out: submit all models in parallel =====
futures = {
self.executor.submit(self._run_single_model, model, message): model.name
for model in self.modellist
}

# ===== Fan-in: collect results as they complete =====
for future in as_completed(futures):
model_name = futures[future]
try:
output = future.result()
if output is not None:
self.persister.persist(output) # 只持久化异常结果
except Exception as e:
logger.exception(f"Model {model_name} inference failed: {e}")

self.metrics.record_frame_event("processed")
self._safe_remove_file(message.photo_path)

def _run_single_model(self, model, message):
"""Single model inference with automatic metrics collection"""
start = time.perf_counter()
success = True
try:
photo = model.preprocess_image(message)
output = model.predict(message, photo)
message.mark_processed(model.name)
return output
except Exception:
success = False
raise
finally:
self.metrics.record_inference(model.name, time.perf_counter() - start, success)

检测器注册表与自动发现

模型注册系统将 注册表模式 + 装饰器 + pkgutil 自动发现 三者结合,实现零侵入扩展:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# === 注册表 ===
class DetectorRegistry:
_registry: Dict[str, type] = {}

@classmethod
def register(cls, name: str, detector_cls: type):
if name in cls._registry:
raise ValueError(f"Detector '{name}' is already registered")
cls._registry[name] = detector_cls

@classmethod
def get(cls, name: str) -> Optional[type]:
return cls._registry.get(name)

# === 装饰器 ===
def register_detector(name: str):
def decorator(cls):
DetectorRegistry.register(name, cls)
cls._registered_name = name
return cls
return decorator

# === 自动发现 ===
def _auto_discover_detectors():
package_dir = Path(__file__).resolve().parent
for module_info in pkgutil.iter_modules([str(package_dir)]):
if module_info.name.startswith("_") or module_info.name == "base_detector":
continue
try:
importlib.import_module(f".{module_info.name}", package=__name__)
except ImportError as e:
_logger.warning(f"Could not import detector module '{module_info.name}': {e}")

_auto_discover_detectors() # 包导入时自动执行

添加新检测器只需三步(无需修改任何已有代码):

  • 创建检测器类 + @register_detector 装饰器(functionmodel/xxx_detector.py
  • 创建 JSON 管线配置(config/xxxdetecter.json
  • config.yaml 中注册(model: { xxxdetecter: "xxxdetecter.json" }

声明式 YAML 模板引擎

引入声明式检测器模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# config/templates/fire_detector.yaml
kind: YoloDetector # 模板类型
metadata:
name: "firedetecter"
display_name: "火焰检测器"
alarm_category: "fire"
event_id: 4

model:
type: "pt"
weight_path: "core/modelloader/model/fire_best.pt"
device: "auto" # auto → 有 GPU 用 GPU,否则 CPU

inference:
confidence_threshold: 0.6
iou_threshold: 0.45
target_classes: [0, 1]
class_names: ["fire", "smoke"]

rules:
- condition: "any_class_detected"
params: { classes: ["fire", "smoke"] }
action: "warning"

annotation:
draw_boxes: true
box_color: [255, 0, 0]
label_format: "{class_name} {confidence:.0%}"

模板引擎架构

1
2
3
4
新增模块: app/core/modelloader/template_engine/
├── template_loader.py # 解析 YAML → 生成 predict 函数
├── builtin_templates.py # 预置模板: YoloDetector, ClassificationDetector
└── rule_engine.py # 规则引擎: 条件判断 + 动作触发

热插拔与文件系统监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class HotReloadManager:
"""Watch template directory → auto-register new detectors without restart"""

def __init__(self, registry: DetectorRegistry, consumer: PhotoConsumer):
self._registry = registry
self._consumer = consumer
self._watcher = FileSystemWatcher("core/modelloader/config/templates/")

def on_config_added(self, yaml_path: str):
"""New YAML template detected → parse → register → inject into consumer"""
detector_cls = TemplateLoader.load(yaml_path)
self._registry.register(detector_cls.name, detector_cls)
model = IdentificationModel.from_template(yaml_path)
self._consumer.add_model(model) # Thread-safe: Copy-on-Write
logger.info(f"Hot-loaded detector: {detector_cls.name}")

def on_config_modified(self, yaml_path: str):
"""Existing template modified → reload model weights"""
...

def on_config_removed(self, yaml_path: str):
"""Template removed → unregister detector"""
...

弹性伸缩策略

策略 触发条件 动作
Scale-out Kafka consumer lag > 1000 持续 5 分钟 启动新 InferenceWorker 容器
Scale-in consumer lag < 100 持续 10 分钟 缩减 Worker 副本数
GPU 亲和性 模型需要 GPU 调度到带 nvidia.com/gpu 标签的节点
1
2
3
4
5
6
7
8
9
10
11
12
13
class AutoScaler:
"""Monitor Kafka consumer lag and scale inference workers"""

def check_and_scale(self):
lag = self._get_consumer_lag("inference-workers", "frame-captured")

if lag > self.scale_out_threshold and self._cooldown_expired():
self._scale_out()
logger.info(f"Scaled out: lag={lag}, new_replicas={self._current_replicas}")

elif lag < self.scale_in_threshold and self._sustained_low(minutes=10):
self._scale_in()
logger.info(f"Scaled in: lag={lag}, new_replicas={self._current_replicas}")

存储平面

抽象契约层:ResultPersister

1
2
3
4
5
6
7
8
class ResultPersister(ABC):
"""Result persistence abstraction — decouples inference from storage"""

@abstractmethod
def persist(self, result: DetectionResult) -> Optional[str]: ...

@abstractmethod
def close(self) -> None: ...

当前适配器DBResultPersister(延迟初始化,避免启动时建立数据库连接)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class DBResultPersister(ResultPersister, HealthCheckable):
def __init__(self):
self._service = None # Lazy initialization

def _ensure_service(self):
if self._service is None:
from data.database.service.ResultsTempSaverService import DataSaverService
self._service = DataSaverService()

def persist(self, result: DetectionResult) -> Optional[str]:
self._ensure_service() # 第一次写入时才建立连接
ref_id = self._service.save_detection_result(result)
self._persist_count += 1
return ref_id

冷热分离存储策略

用途 存储特性 生命周期
t_cameras 摄像头设备注册表 低频更新 永久
t_results_temp 当日检测结果 热存储,高频写入 每日归档
t_results 历史归档 冷存储,低频查询 永久

归档流程:每天 03:00–05:00 归档窗口,ResultsMigrationServicet_results_tempt_results 迁移后清空临时表,避免单表数据量无限增长。

MinIO 对象存储

用途 Bucket 命名规则
标注图像 detection-results {date}/{camera_id}/{photo_id}.jpg
模型权重 model-weights {model_name}/{version}/best.pt

在分布式模式下,PhotoMessage.photo_path 从本地路径改为 MinIO 对象引用(s3://bucket/key),使得采集节点和推理节点可以在不同机器上。

可观测性平面

抽象契约层:MetricsCollector

1
2
3
4
5
6
7
8
9
10
11
class MetricsCollector(ABC):
"""Metrics collection abstraction — Prometheus-compatible"""

@abstractmethod
def record_inference(self, model_name: str, duration_seconds: float, success: bool) -> None: ...

@abstractmethod
def record_frame_event(self, event_type: str) -> None: ...

@abstractmethod
def snapshot(self) -> Dict[str, Any]: ...

适配器PrometheusMetrics(直接暴露 /metrics 端点)

采集的指标

指标 类型 说明
inference_total Counter 每个模型的推理调用次数
inference_errors_total Counter 每个模型的推理错误次数
inference_duration_seconds Histogram 每个模型的推理延迟分布
frame_events_total Counter 帧事件统计(processed / missing / dropped)
queue_utilization_pct Gauge 队列利用率百分比

健康检查聚合器

采用 最差组件决定整体状态(worst-component-wins) 策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class HealthAggregator:
_STATUS_PRIORITY = {"healthy": 0, "degraded": 1, "unhealthy": 2}

def check(self) -> Dict[str, Any]:
worst_status = "healthy"
component_reports = []
for component in self._components:
report = component.health()
comp_status = report.get("status", "unknown")
if self._STATUS_PRIORITY.get(comp_status, 99) > \
self._STATUS_PRIORITY.get(worst_status, 0):
worst_status = comp_status
component_reports.append(report)
return {
"status": worst_status,
"version": __version__,
"uptime_seconds": round(time.monotonic() - self._start_time, 1),
"components": component_reports,
}

降级阈值

组件 条件 状态
Broker 队列利用率 > 90% degraded
Persister 错误计数 ≥ 10 unhealthy
Metrics 推理错误率 > 10% degraded
Metrics 推理错误率 > 50% unhealthy

输出示例

1
2
3
4
5
6
7
8
9
10
{
"status": "healthy",
"version": "1.0.0",
"uptime_seconds": 3600.5,
"components": [
{"name": "message_broker", "status": "healthy", "details": {"utilization_pct": 12.3}},
{"name": "result_persister", "status": "healthy", "details": {"persisted_total": 1523}},
{"name": "metrics_collector", "status": "healthy", "details": {"models": {...}}}
]
}

Prometheus + Grafana 监控

Grafana Dashboard 面板

面板 数据源 展示内容
推理延迟 P50/P95/P99 Prometheus 各模型推理延迟分位数
帧处理吞吐量 Prometheus 每秒处理帧数
Kafka Consumer Lag Prometheus (JMX) 消费积压量 → 伸缩决策
Worker 存活状态 Redis Exporter 各节点心跳状态
队列利用率 Prometheus 内存队列使用百分比

结构化 JSON 日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class JsonFormatter(logging.Formatter):
"""Structured JSON log formatter for ELK / Loki ingestion"""

def format(self, record):
return json.dumps({
"timestamp": self.formatTime(record),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"camera_id": getattr(record, "camera_id", None),
"model_name": getattr(record, "model_name", None),
"worker_id": getattr(record, "worker_id", None),
"trace_id": getattr(record, "trace_id", None),
})

在容器环境中自动切换为 JSON 格式(通过环境变量 LABSEC_LOG_FORMAT=json),便于 ELK / Loki 等日志聚合系统解析。

容器化

多阶段 Dockerfile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Stage 1: Builder — 安装依赖(含编译工具链)
FROM python:3.11-slim AS builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# Stage 2: Runtime — 最小化镜像(仅运行时依赖)
FROM nvidia/cuda:11.8.0-runtime-ubuntu22.04
RUN apt-get update && apt-get install -y python3.11 python3-pip ffmpeg \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /app
COPY --from=builder /install /usr/local
COPY app/ .

HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
CMD python3 -c "import urllib.request; urllib.request.urlopen('http://localhost:9527/api/status')"

ENTRYPOINT ["python3", "main.py"]

Builder 阶段的编译工具链(gcc、pip wheel 缓存)不进入最终镜像,有利于缩小体积。

docker-compose

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
version: "3.8"

services:
# ===== 基础设施 =====
postgres:
image: postgres:17
ports: ["5434:5432"]
volumes: [postgres_data:/var/lib/postgresql/data]
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 10s
timeout: 5s
retries: 5

minio:
image: minio/minio
ports: ["9000:9000", "9001:9001"]
volumes: [minio_data:/data]
command: server /data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]

redis:
image: redis:7-alpine
ports: ["6379:6379"]
healthcheck:
test: ["CMD", "redis-cli", "ping"]

kafka:
image: confluentinc/cp-kafka:7.5.0
ports: ["9092:9092"]
environment:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_NUM_PARTITIONS: 6

# ===== 应用服务 =====
labsecurity:
build: .
depends_on:
postgres: { condition: service_healthy }
minio: { condition: service_healthy }
redis: { condition: service_healthy }
kafka: { condition: service_started }
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
restart_policy:
condition: on-failure
delay: 10s
max_attempts: 5
volumes:
- model_weights:/app/core/modelloader/model
environment:
- LABSEC_DB__HOST=postgres
- LABSEC_MINIO__ENDPOINT=minio:9000
- LABSEC_KAFKA__BOOTSTRAP_SERVERS=kafka:9092
- LABSEC_REDIS__URL=redis://redis:6379

# ===== 监控 =====
prometheus:
image: prom/prometheus
ports: ["9090:9090"]
volumes: [./prometheus.yml:/etc/prometheus/prometheus.yml]

grafana:
image: grafana/grafana
ports: ["3000:3000"]
depends_on: [prometheus]

volumes:
postgres_data:
minio_data:
model_weights:

健康探针与自愈

端点 用途 检查内容 对应 K8s 探针
GET /healthz 进程存活 进程在运行即返回 200 Liveness Probe
GET /readyz 服务就绪 所有组件 healthy 才返回 200 Readiness Probe
GET /api/status 详细状态 完整 HealthAggregator 输出
GET /metrics Prometheus 指标 所有推理/队列/系统指标

三阶段优雅关停

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def shutdown(main_threads, signum=None):
"""Graceful shutdown: stop capture → drain queue → stop consumer"""
sig_name = signal.Signals(signum).name if signum else "manual"
logger.info(f"Shutdown initiated (signal={sig_name})")

stream_thread, consumer = main_threads

# Phase 1: 停止采集(不再产生新消息)
stream_thread.stop() # _stop_event.set() → 立即唤醒
stream_thread.join(timeout=30)

# Phase 2: 排空推理(完成所有在途任务)
consumer.stop() # executor.shutdown(wait=True, cancel_futures=False)
consumer.join(timeout=30)

# Phase 3: 报告 + 清理
logger.info(f"Final metrics: {_metrics.snapshot()}")
logger.info(f"Queue stats: {_broker.stats()}")
_broker.close()
logger.info(f"Health: {_health.check()}")

数据流全链路

总结

本系统的设计,实现了以下的设计原则:

  • 开闭原则 (OCP):新检测器通过文件 + 装饰器 / YAML 模板添加,不修改任何已有代码
  • 单一职责 (SRP) : CaptureWorker 只采集,PhotoConsumer 只推理,Persister 只持久化
  • 依赖倒置 (DIP): Consumer 依赖 MessageBroker ABC,而非 GlobalMessageQueueKafkaConsumer
  • 接口隔离 (ISP) :四个独立契约:Broker / Persister / Metrics / Health
  • 依赖注入 (DI):适配器在 Composition Root 创建,通过构造函数注入

这里的依赖注入还是非常值得一讲的。我们的系统基于依赖倒置原则设计了 MessageBroker / ResultPersister / MetricsCollector 三大抽象契约层,实现了 InMemoryBroker 与 KafkaBroker 双适配器,通过构造函数注入实现从单机内存队列到 Kafka 分布式消息总线的零代码切换。

当时面临的设计问题是,如何让推理引擎在不同的部署环境(单机 / 分布式)下复用同一套代码?

如果 PhotoConsumer 直接依赖 GlobalMessageQueue(具体类),那么切换到 Kafka 时就必须修改消费者代码。这违反了 依赖倒置原则(DIP):高层模块不应该依赖低层模块,两者都应该依赖抽象。

系统在 infra/contracts.py 中定义了四个 ABC 接口,每个接口严格遵循 接口隔离原则(ISP)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# infra/contracts.py

class MessageBroker(ABC):
"""消息代理:负责帧消息的发布与消费"""
@abstractmethod
def publish(self, message: PhotoMessage) -> bool: ...
@abstractmethod
def consume(self, timeout: float = 2.0) -> Optional[PhotoMessage]: ...
@abstractmethod
def close(self) -> None: ...
@abstractmethod
def stats(self) -> Dict[str, Any]: ...

class ResultPersister(ABC):
"""结果持久化器:负责检测结果的存储"""
@abstractmethod
def persist(self, result: DetectionResult) -> Optional[str]: ...
@abstractmethod
def close(self) -> None: ...

class MetricsCollector(ABC):
"""指标采集器:负责推理性能指标的记录"""
@abstractmethod
def record_inference(self, model_name: str, duration_seconds: float, success: bool) -> None: ...
@abstractmethod
def record_frame_event(self, event_type: str) -> None: ...
@abstractmethod
def snapshot(self) -> Dict[str, Any]: ...

class HealthCheckable(ABC):
"""健康检查接口:所有可检查组件的统一协议"""
@abstractmethod
def health(self) -> Dict[str, Any]: ...

main.py 是整个系统中 唯一知道具体实现类的地方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# main.py — Composition Root
def _init_infrastructure():
global _broker, _persister, _metrics, _health

_broker = InMemoryBroker() # ← 具体类,仅此处出现
_persister = DBResultPersister() # ← 具体类,仅此处出现
_metrics = InMemoryMetrics() # ← 具体类,仅此处出现

_health = HealthAggregator()
_health.register(_broker)
_health.register(_persister)
_health.register(_metrics)

# 消费者接收的全部是抽象类型
consumer = start_photo_consumer_concurrent(
broker=_broker, # MessageBroker 接口
persister=_persister, # ResultPersister 接口
metrics=_metrics, # MetricsCollector 接口
)