对先前工作的优化、思考。
前言
先前为了交差研究生导师的横向,设计了个简单的、部署在单机上的实验室隐患检测系统,见上文,并且训练了一些简单的识别模型挂载,也稳健运行了半年,最近对其进行了面向设备集群的拓展,大大增强了系统的稳健性。到这里也就大致收手了,我也被踢出了横向群,在此做个记录。
开始接触这个项目应该是25年的6月,那时候对老师布置的任务还算是有热情,一开始拿到需求就积极的跑数据、设计训练模型、上机测试。很讽刺的是,因为我的用功,老师成了甩手掌柜,对技术上的事情置若罔闻,对和甲方交接乐此不疲,并且热衷于讨论展示界面这种远离核心的工作。讨论也就罢了,偏偏他很有自己的想法,又偏偏他自己的想法和甲方很多时候不能达成一致,以至于一段时间负责数据看板的同门夹在两个人中间,做的最多的事情就是画设计案。
项目合同签署是30万,分三期交付,距离现在也是半年前的事情了,最终我们一届干活的三个人每人拿到了五千块报酬。说实话,已经超乎我的心里预期了,毕竟做好了一毛不拔的准备,只是暗自替甲方感到不值,这样基本由一个外行学生胡乱搭起来的项目,是否值三十万还有待商榷。并且,对这个项目的未来我也感到悲观,不过以后的事情也与我无关了。
系统设计需求
该系统设计需要满足以下需求:
- 需要监测学校数十座大楼,上百个实验室,上千个(乃至更多)摄像头拍摄下出现的违规行为(如人员衣着与数量规范性,操作规范性,物品摆放规范性等)。
- 需要可快速插拔视觉检测模型。
- 需要保证系统上线的稳定性。
架构总览
设计将系统分为五大平面:
| 平面 | 职责 | 核心组件 |
|---|---|---|
| 控制平面 | 设备管理、分片调度、热加载 | Redis 注册中心、ShardManager、HotReloadManager |
| 采集平面 | 多协议视频帧采集 | CaptureWorker 集群、RTSP/ONVIF/GB28181 插件 |
| 消息平面 | 采集-推理解耦 | Kafka(分布式)/ LIFO Queue(单机) |
| 推理平面 | 多模型并行推理 | InferenceWorker、Fan-out/Fan-in 引擎、DetectorRegistry |
| 可观测性平面 | 指标、健康、日志 | Prometheus、HealthAggregator、JSON Logger |
控制平面
控制平面的任务是使得多个 CaptureWorker 节点能够自动注册、自动分配摄像头、自动检测故障并完成摄像头迁移。
第一个问题就是,假设有 1000 个摄像头,3 个 Worker 节点,谁负责采集哪些摄像头?某个 Worker 挂了怎么办?
模块文件结构:
1 | app/infra/cluster/ |
设备注册中心(Redis)
为什么选择 Redis 作为注册中心?
在分布式系统中,注册中心是所有节点共享状态的地方。常见选型对比:
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| Redis ✅ | 极低延迟(亚毫秒)、原生 TTL 过期、原生分布式锁、部署简单 | 非强一致性(AP 模型) | 中小规模集群(<100 节点) |
| ZooKeeper | 强一致性(CP 模型)、原生 Watcher | 运维复杂、Java 生态 | 大规模集群、强一致性需求 |
| etcd | 强一致性、K8s 原生 | 写入性能不如 Redis | K8s 生态 |
| Consul | 内置健康检查、服务发现 | 额外引入组件 | 微服务架构 |
选择 Redis 的理由:
- TTL 自动过期天然适合心跳检测(Key 过期 = Worker 下线)
SETNX/SET NX EX原生支持分布式锁- 亚毫秒级读写,满足高频心跳上报需求
- 对于实验室场景(几十个摄像头、几个 Worker),Redis 的 AP 模型完全够用
数据模型设计思路
在动手写代码之前,先想清楚注册中心需要管理哪些实体、每个实体需要携带什么信息。
Worker 节点信息(WorkerInfo)
每个 CaptureWorker 在注册时需要告诉注册中心自己的身份和能力。我们定义了一个 WorkerInfo 数据类,包含以下字段:
worker_id:全局唯一标识,例如"cw-1"。在容器化部署中通常用 hostname 自动生成。host/port:网络地址,用于健康检查和管理接口。status:生命周期状态,有四种——STARTING(初始化中)、ALIVE(正常工作)、DRAINING(正在优雅关停,不再接受新摄像头)、DEAD(已停止)。capacity:该 Worker 最多能处理多少路摄像头。这取决于 CPU/GPU 资源,默认设为 64。current_load:当前实际处理的摄像头数量。每次心跳时上报,用于后续做负载感知的分片。gpu_info:GPU 型号(如果有的话),用于调度时优先把需要 GPU 推理的任务分给有 GPU 的节点。started_at/last_heartbeat:时间戳,用于监控和排查问题。
这个数据类需要能序列化为 JSON 字符串存入 Redis,也能从 JSON 反序列化回来。
Worker 的生命周期状态机:
1 | STARTING ──(注册成功)──→ ALIVE ──(收到关停信号)──→ DRAINING ──(关停完成)──→ DEAD |
DRAINING 状态很重要:当我们要下线一个 Worker 时,不是直接杀掉进程,而是先把它标记为 DRAINING,让分片器不再给它分配新摄像头,等它处理完当前帧后再真正退出。这样可以避免正在处理中的帧丢失。
摄像头信息(CameraInfo)
每个摄像头的注册信息包含:
camera_id/lab_id:来自 PostgreSQL 数据库的主键和所属实验室。rtsp_url:RTSP 流地址,Worker 用这个地址去拉取视频流。protocol:采集协议(rtsp / onvif / gb28181),不同协议的采集方式不同。enabled:是否启用。管理员可以在后台禁用某个摄像头,禁用后分片器不会把它分配给任何 Worker。assigned_worker:当前被分配给哪个 Worker(冗余字段,主要用于查询方便)。
DeviceRegistry 的设计逻辑
DeviceRegistry 是整个控制平面的核心类,它封装了所有与 Redis 交互的逻辑。理解它的设计,关键是理解它要解决的五个问题:
问题一:Worker 怎么注册?怎么知道它还活着?
我们利用 Redis 的 SETEX 命令(设置值的同时设置过期时间)来实现。每个 Worker 启动时,往 Redis 写入一个 Key worker:{id},值是自己的 JSON 信息,同时设置 TTL 为 15 秒。之后每隔 5 秒,Worker 通过心跳刷新这个 Key 的 TTL。如果 Worker 崩溃了,它不再刷新 TTL,15 秒后 Redis 自动删除这个 Key——这就等于宣告了 Worker 的死亡。
这个设计的精妙之处在于,我们不需要主动去删除一个死掉的 Worker,Redis 的 TTL 机制帮我们自动完成了这件事。 这比传统的主动探测方式简单得多。
问题二:怎么知道当前有哪些 Worker 还活着?
查询存活 Worker 的逻辑非常直观:扫描 Redis 中所有以 worker: 开头的 Key,能扫到的就是活着的(因为死掉的 Key 已经被 TTL 自动删除了)。
这里有一个重要的实现细节,我们用 SCAN 命令而不是 KEYS 命令来扫描。KEYS worker:* 虽然简单,但它会一次性遍历 Redis 中的所有 Key,在 Key 数量很大时会阻塞 Redis 主线程,导致其他请求超时。SCAN 是游标式的,每次只返回一小批结果,不会阻塞。
扫描到 Key 列表后,用 MGET 批量获取所有 Key 的值(比逐个 GET 快得多),然后反序列化为 WorkerInfo 对象返回。
问题三:摄像头信息怎么管理?
摄像头的数据源是 PostgreSQL 数据库,但分片器在计算分配时不能每次都去查数据库(太慢了)。所以我们把摄像头信息定期同步到 Redis 中,作为一个缓存副本。
摄像头信息存在两个 Redis 数据结构中:
cameras:all(Hash 类型):存储每个摄像头的完整信息,Key 是 camera_id,Value 是 JSON。用于需要查看摄像头详情的场景。cameras:enabled(Set 类型):只存储启用的摄像头 ID 集合。
同步逻辑是,每隔 5 分钟,从 PostgreSQL 读取全量摄像头列表,然后用 Redis Pipeline(事务性批量操作)一次性更新到 Redis。Pipeline 的好处是把几十条 Redis 命令打包成一次网络往返,而且是原子性的——要么全部成功,要么全部失败。
同步时还会处理数据库中已删除但 Redis 中还存在的摄像头,把它们从 Redis 中清理掉,并且从分配表中移除。
问题四:分片结果怎么存储?Worker 怎么知道自己该采集哪些摄像头?
分片结果存在 camera_assignment 这个 Hash 中,Key 是 camera_id,Value 是 worker_id。例如:
1 | camera_assignment: {"1": "cw-1", "2": "cw-1", "3": "cw-2", "5": "cw-3"} |
每次重分片时,我们用 Pipeline 原子性地完成三步操作:删除旧分配 → 写入新分配 → 递增版本号。这三步在一个 Redis 事务中完成。
每个 Worker 通过轮询 shard_version 来感知分片变化。Worker 本地记住上次看到的版本号,每隔 5 秒去 Redis 读一次 shard_version,如果发现版本号变了,就重新拉取 camera_assignment 中属于自己的那部分。这种轮询版本号的方式比 Pub/Sub 更可靠。
问题五:多个组件同时检测到 Worker 变化怎么办?
假设 Worker-2 宕机了,WorkerWatcher 检测到了,同时另一个监控组件也检测到了,它们都想触发重分片。如果两个重分片操作并发执行,就会出现竞态条件:A 计算出的分配结果被 B 覆盖,或者反过来。
解决方案是分布式互斥锁。在执行重分片之前,必须先获取 lock:reshard 这把锁。锁的实现利用了 Redis 的 SET key value NX EX timeout 命令——只有当 Key 不存在时才能设置成功(NX = Not eXists),同时设置一个超时时间防止死锁(万一持有锁的进程崩溃了,锁会在 30 秒后自动释放)。
获取锁失败意味着有其他组件正在重分片,当前操作直接跳过即可——反正那个正在执行的重分片会处理好一切。
除了上述五个核心问题,注册中心还提供了一个 Pub/Sub 通知机制。每当 Worker 注册或注销时,会往 event:worker_change 频道发布一条消息。这不是必须的(WorkerWatcher 的轮询机制已经能检测到变化),但它能让响应更快——不用等到下一次轮询周期,而是立即收到通知。
| Key 模式 | Redis 类型 | 用途 | TTL | 读写频率 |
|---|---|---|---|---|
worker:{id} |
String (JSON) | Worker 注册信息 | 15s | 写: 每5s/Worker,读: 重分片时 |
cameras:all |
Hash | 全量摄像头详情 | 永久 | 写: 每5min同步,读: 重分片时 |
cameras:enabled |
Set | 启用的摄像头 ID 集合 | 永久 | 写: 摄像头变更时,读: 重分片时 |
camera_assignment |
Hash | 摄像头→Worker 映射 | 永久 | 写: 重分片时,读: Worker 启动/版本变化 |
shard_version |
String | 分片版本号 | 永久 | 写: 重分片时 INCR,读: Worker 每5s轮询 |
lock:reshard |
String | 分布式互斥锁 | 30s | 写: 重分片时,读: 重分片时 |
event:worker_change |
Pub/Sub | Worker 拓扑变更通知 | — | 写: Worker 上下线时 |
一致性哈希分片器
问题背景:为什么需要一致性哈希?
假设有 100 个摄像头和 3 个 Worker,最简单的分配方式是取模:
1 | camera_id % 3 = 0 → Worker-0 |
问题:当 Worker-2 宕机,变成 2 个 Worker 时:
1 | # 之前: camera_id % 3 |
几乎所有摄像头都被重新分配了! 这意味着所有 Worker 都要断开当前连接、重新建立 RTSP 连接,造成大面积服务中断。
一致性哈希原理
一致性哈希的核心思想是把 Worker 和摄像头都映射到同一个哈希环(0 ~ 2³²-1)上:
当 Worker-2 宕机时:
- 只有原本分配给 Worker-2 的摄像头需要重新分配(顺时针找到下一个 Worker)
- Worker-1 和 Worker-3 的摄像头完全不受影响
- 迁移量 ≈ K/N(K=总摄像头数,N=Worker数),而非 ~100%
虚拟节点:解决数据倾斜
如果只有 3 个物理节点,它们在哈希环上的位置可能很不均匀,导致某个 Worker 分到 70% 的摄像头。
解决方案是,每个物理节点映射 150 个虚拟节点到哈希环上:
2.2.4 一致性哈希环的实现逻辑
一致性哈希环的实现是一个纯算法模块,不依赖 Redis 或任何外部服务,可以独立进行单元测试。它的内部维护三个数据结构:
_ring字典:哈希值 → 物理节点 ID 的映射。例如{28374: "worker-1", 91025: "worker-2", ...}。_sorted_keys有序列表:所有虚拟节点的哈希值,按从小到大排列。这是为了支持二分查找。_nodes集合:当前环上所有物理节点的 ID,用于去重和遍历。
添加节点的过程:
当调用 add_node("worker-1") 时,算法会为这个物理节点生成 150 个虚拟节点。每个虚拟节点的 Key 是 "worker-1#vn0"、"worker-1#vn1"、…、"worker-1#vn149"。对每个虚拟节点 Key 做 MD5 哈希,取前 8 个十六进制字符转为整数,得到一个[0, 2³²)范围内的位置。然后把这个位置插入到 _sorted_keys 有序列表中(用 bisect.insort 保持有序),同时在 _ring 字典中记录"这个位置属于 worker-1"。
为什么用 MD5 而不是 Python 内置的 hash() 函数?因为 hash() 在不同 Python 进程中对同一个字符串可能返回不同的值(Python 3.3+ 默认开启了哈希随机化),这会导致不同 Worker 节点计算出不同的分配结果。MD5 是确定性的,同一个输入永远得到同一个输出,而且分布非常均匀。
查找节点的过程:
当调用 get_node("camera_42") 时,算法的步骤是:
- 对
"camera_42"做 MD5 哈希,得到一个位置值(比如 57832) - 在
_sorted_keys有序列表中做二分查找,找到第一个 ≥ 57832 的虚拟节点位置 - 如果找到了(比如位置 60001),就查
_ring[60001]得到物理节点 ID(比如"worker-2") - 如果 57832 比所有虚拟节点的位置都大(超过了列表末尾),就回绕到列表第一个元素——这就是"环"的含义
二分查找的时间复杂度是 O(log(N×V)),其中 N 是物理节点数,V 是虚拟节点数。对于 3 个 Worker × 150 个虚拟节点 = 450 个元素,二分查找只需要约 9 次比较,非常快。
删除节点的过程:
当调用 remove_node("worker-2") 时,算法遍历 worker-2 的 150 个虚拟节点,逐个从 _ring 字典和 _sorted_keys 列表中移除。移除后,原本分配给 worker-2 的摄像头会自动"顺时针滑动"到下一个虚拟节点——这就是一致性哈希的核心特性:只有被删除节点的数据需要迁移,其他节点的数据完全不受影响。
核心查找逻辑的伪代码:
1 | def get_node(self, key: str) -> str: |
ShardManager 的设计逻辑
ShardManager 是一致性哈希环的上层封装,它把纯算法和Redis 存储连接起来。如果说 ConsistentHashRing 是一个计算器,那 ShardManager 就是操作这个计算器的人——它知道从哪里读取输入(Redis 中的存活 Worker 和启用摄像头),计算完后把结果写回哪里(Redis 中的分配表)。
reshard() 的完整执行流程
reshard() 是整个控制平面最核心的操作,它在 Worker 上下线时被触发。整个过程分为 8 步:
第 1 步:获取分布式锁。 调用 Redis 的 SET lock:reshard <token> NX EX 30 命令尝试加锁。如果锁已被其他进程持有(返回失败),说明有人正在重分片,当前操作直接跳过。锁的超时时间设为 30 秒,防止持有者崩溃导致死锁。
第 2 步:读取存活 Worker 列表。 通过 DeviceRegistry.get_alive_workers() 从 Redis 扫描所有 worker:* Key,得到当前存活的 Worker 列表。如果没有任何存活 Worker,清空分配表并返回——没有 Worker 就没法分配。
第 3 步:读取启用的摄像头列表。 通过 DeviceRegistry.get_enabled_camera_ids() 从 Redis 的 cameras:enabled Set 中获取所有启用的摄像头 ID。
第 4 步:构建一致性哈希环。 创建一个新的 ConsistentHashRing 实例,把所有存活 Worker 添加到环上。每个 Worker 会生成 150 个虚拟节点。
第 5 步:计算新的分配。 遍历每个启用的摄像头 ID,调用 ring.get_node(str(cam_id)) 找到它应该归属的 Worker。最终得到一个字典:{camera_id: worker_id}。
第 6 步:计算迁移量。 把新分配和旧分配(从 Redis 读取)做对比,统计有多少摄像头换了 Worker。这个指标很重要——每次迁移意味着一个 RTSP 连接的断开和重建,会有短暂的监控空白。一致性哈希的优势就体现在这里:迁移量约为 K/N(K=总摄像头数,N=Worker 数),而不是接近 100%。
第 7 步:原子写入新分配。 通过 Redis Pipeline 在一个事务中完成三个操作:删除旧的 camera_assignment → 写入新的分配 → 递增 shard_version。事务保证了原子性,不会出现"删了旧的但还没写入新的"这种中间状态。
第 8 步:释放锁。 在 finally 块中释放分布式锁,确保即使中间出错也不会死锁。
Worker 如何获取自己的摄像头列表?
每个 Worker 不需要自己去算哈希环,只需要调用 get_cameras_for_worker(worker_id) 从 Redis 的 camera_assignment Hash 中过滤出属于自己的摄像头 ID 列表即可。这是一个纯读操作,多个 Worker 可以并发调用,不需要加锁。
迁移量统计的意义
ShardManager 会在每次重分片后记录迁移量。这个数字可以用来验证一致性哈希是否正常工作:
- 如果一个 Worker 宕机(3 个变 2 个),迁移量应该约为 K/3 ≈ 33 个(100 个摄像头的情况下)
- 如果迁移量接近 100,说明哈希环的实现有问题
- 如果迁移量为 0,说明拓扑没有真正变化(可能是误触发)
容量预览功能
ShardManager 还提供了一个 preview_reshard() 方法,可以在不实际修改分配的情况下,模拟"如果 Worker 列表是这样的,分配会怎样"。这对容量规划很有用——比如在扩容前先预览一下新增一个 Worker 后的负载分布,确认是否均匀。
Worker 心跳与故障转移
心跳机制设计
心跳是分布式系统中检测节点存活的标准方式。核心思想是:
每个 Worker 定期向 Redis 续期自己的注册 Key(刷新 TTL)。如果 Worker 崩溃,无法续期,Key 自动过期 → 被视为死亡。
1 | 时间线: |
WorkerHeartbeat 的设计逻辑
WorkerHeartbeat 是一个后台守护线程,运行在每个 CaptureWorker 进程内部,负责定期向 Redis 续期自己的注册 Key。它的设计有几个关键的工程决策:
为什么用后台线程而不是协程或独立进程?
心跳是一个非常轻量的操作(一次 Redis SETEX 命令,亚毫秒级),但它必须绝对可靠——如果心跳线程挂了,Worker 会被误判为死亡。用后台守护线程(daemon thread)的好处是:
- 它和主进程共享同一个进程空间,不需要进程间通信
- 设置为 daemon 线程后,主进程退出时它会自动被杀掉,不会变成孤儿线程
- 线程比协程更适合这种"定时执行、不需要高并发"的场景——心跳不需要处理成百上千的并发请求,一个线程足矣
为什么用 threading.Event.wait() 而不是 time.sleep()?
这是一个很重要的细节。time.sleep(5) 会让线程硬睡 5 秒,在这 5 秒内即使你想停止线程也得等它睡完。而 threading.Event.wait(timeout=5) 也是等 5 秒,但如果在等待期间有人调用了 stop_event.set(),它会立即醒来。这意味着当我们要优雅关停 Worker 时,心跳线程可以在毫秒级内响应停止信号,而不是最多等 5 秒。
心跳循环的三个阶段
心跳线程启动后经历三个阶段:
- 初始注册:线程启动时,先调用
register_worker()把自己注册到 Redis。如果这一步失败了(比如 Redis 暂时不可用),不会直接退出,而是进入循环继续尝试。 - 周期续期:每隔 5 秒执行一次续期操作。续期时会做两件事——刷新 TTL和上报当前负载(当前正在处理多少路摄像头)。如果续期发现 Key 已经过期了(返回 False),说明 Worker 曾经被认为死亡过,需要重新做一次完整注册。
- 优雅退出:收到停止信号后,主动调用
deregister_worker()从 Redis 删除自己的 Key。
异常处理策略
心跳循环中的所有操作都被 try-catch 包裹,任何异常都不会导致心跳线程崩溃。这是因为心跳线程一旦崩溃,Worker 就会被误判为死亡,后果很严重。即使 Redis 暂时不可用,心跳线程也会继续运行,等 Redis 恢复后自动恢复心跳。
不过,如果连续失败超过 5 次,会触发一个 CRITICAL 级别的日志告警——这通常意味着 Redis 真的挂了,需要运维介入。
WorkerWatcher 故障检测器的设计逻辑
WorkerWatcher 是控制平面的哨兵,它的唯一职责是,监控 Worker 集合的变化,一旦发现有 Worker 加入或离开,就触发重分片。
检测原理:集合差异比较
WorkerWatcher 的检测逻辑非常简单直观:
- 它在内存中维护一个已知 Worker 集合(
known_workers) - 每隔 5 秒,从 Redis 扫描一次当前存活的 Worker 集合(
current_workers) - 比较两个集合:
- 如果
current_workers == known_workers:无变化,什么都不做 - 如果
current_workers - known_workers不为空:有新 Worker 加入 - 如果
known_workers - current_workers不为空:有 Worker 离开/崩溃
- 如果
- 只要集合发生了变化,就触发
ShardManager.reshard()
这种"轮询 + 集合比较"的方式虽然不如事件驱动那么实时,但胜在简单可靠。
启动阶段的特殊处理
WorkerWatcher 启动时有一个特殊的等待阶段:它会阻塞等待,直到至少有一个 Worker 注册到 Redis。这是因为如果没有任何 Worker,做分片是没有意义的。等到第一个 Worker 出现后,立即执行一次初始分片,把所有摄像头分配给这个 Worker。
之后进入正常的监控循环。当第二个、第三个 Worker 陆续加入时,WorkerWatcher 会检测到集合变化,触发重分片,把摄像头重新均匀分配。
为什么先更新 known_workers 再触发重分片?
在代码中,我们先把 known_workers 更新为 current_workers,然后再调用 reshard()。这个顺序很重要:如果先 reshard 再更新 known_workers,那么在 reshard 执行期间(虽然很快,但不是零时间),下一次轮询可能又检测到变化,导致重复触发。先更新集合可以避免这种不必要的重复。
谁来运行 WorkerWatcher?
在我们的架构中,WorkerWatcher 运行在每个 Worker 节点上(作为后台线程)。你可能会问:如果每个 Worker 都在运行 Watcher,岂不是会同时触发多次重分片?答案是不会,因为 reshard() 内部有分布式锁保护,只有一个 Watcher 能成功获取锁并执行重分片,其他的会直接跳过。
故障转移完整时序
Worker 端如何感知分片变化
重分片发生在控制平面(WorkerWatcher + ShardManager),但最终需要每个 Worker 自己去感知"分配变了"并更新自己的摄像头列表。Worker 端的感知机制是版本号轮询。
每个 Worker 在内存中记住上次看到的 shard_version。在主循环的每一轮开始时,先去 Redis 读一次 shard_version(一次 GET 命令,亚毫秒级)。如果版本号没变,说明分配没变,直接继续采集。如果版本号变了,说明发生了重分片,需要重新拉取自己的摄像头列表。
重新拉取的过程是:从 Redis 的 camera_assignment Hash 中读取所有条目,过滤出 worker_id == 自己 的摄像头 ID 列表。然后和旧列表做集合差异比较:
- 新增的摄像头(在新列表中但不在旧列表中):建立 RTSP 连接,开始采集
- 移除的摄像头(在旧列表中但不在新列表中):断开 RTSP 连接,释放资源
- 不变的摄像头:什么都不做,继续采集
这种"增量更新"的方式避免了每次重分片都断开所有连接再重连——只有真正发生迁移的摄像头才需要重连。
为什么用轮询而不是 Pub/Sub?
Pub/Sub 更实时,但有一个致命缺陷:如果 Worker 在消息发出的瞬间恰好断线了(比如网络抖动),这条消息就永远丢失了,Worker 永远不知道分配变了。而版本号是持久化在 Redis 中的,不会丢失——即使 Worker 断线了 10 分钟,重连后一读版本号就知道需要更新。
完整启动与关停流程
启动流程(8 步)
从零启动一个 CaptureWorker 节点的控制平面,需要按以下顺序执行:
第 1 步:连接 Redis。 创建 Redis 客户端连接,必须设置 decode_responses=True(让 Redis 返回字符串而不是字节),以及合理的超时参数(socket_connect_timeout=5, socket_timeout=5)。连接后先 ping() 验证连通性。
第 2 步:初始化 DeviceRegistry。 创建注册中心实例,它会自动初始化 shard_version Key(如果不存在的话设为 0)。
第 3 步:同步摄像头数据。 从 PostgreSQL 读取全量摄像头列表,通过 sync_cameras_from_db() 批量写入 Redis。这一步确保 Redis 中的摄像头信息是最新的。
第 4 步:初始化 ShardManager。 创建分片管理器,指定每个物理节点 150 个虚拟节点。
第 5 步:构造 WorkerInfo。 用当前机器的 hostname 生成唯一的 worker_id(在容器化部署中,每个容器的 hostname 天然不同)。
第 6 步:启动心跳线程。 创建 WorkerHeartbeat 并启动。心跳线程会立即向 Redis 注册当前 Worker,然后每 5 秒续期一次。这里还传入了一个 load_provider 回调函数,让心跳线程能动态上报当前负载。
第 7 步:启动 WorkerWatcher。 创建故障检测器并启动。它会等待第一个 Worker 出现,然后执行初始分片,之后进入持续监控循环。
第 8 步:启动 CaptureWorker。 开始实际的视频采集工作。Worker 会立即检查 shard_version,拉取自己的摄像头列表,建立 RTSP 连接并开始采集。
关停流程(3 阶段)
优雅关停的顺序和启动相反,分三个阶段:
- 停止采集:先停止 CaptureWorker,让它处理完当前帧后退出。设置 30 秒超时,防止卡死。
- 停止心跳:调用
heartbeat.stop(),心跳线程会主动从 Redis 注销当前 Worker。注销后,其他节点的 WorkerWatcher 会检测到拓扑变化,触发重分片,把当前 Worker 的摄像头迁移给其他 Worker。 - 停止监控:最后停止 WorkerWatcher。
这个顺序确保了:先停止产生新数据(采集),再通知集群自己要下线(心跳注销触发重分片),最后停止监控。如果顺序反了,比如先停心跳再停采集,就会出现"Worker 已经被认为死亡,但它还在往 Kafka 发数据"的混乱状态。
如何验证一致性哈希的正确性
一致性哈希环是纯算法模块,非常适合用单元测试来验证。以下是需要验证的几个关键属性:
1. 空环安全性:当环上没有任何节点时,查询任何 Key 应该返回 None 而不是抛异常。这是边界条件,必须处理。
2. 单节点全覆盖:当环上只有一个节点时,所有 Key 都应该分配给这个节点。这验证了环的基本查找逻辑是否正确。
3. 负载均衡性:用 150 个虚拟节点、3 个物理节点、10000 个 Key 做测试。理想情况下每个节点分到 ~3333 个 Key。验证每个节点的实际分配量与理想值的偏差不超过 10%。如果偏差过大,说明哈希函数分布不均匀或虚拟节点数量不够。
4. 最小迁移量:这是一致性哈希最核心的特性。测试方法是:先用 3 个节点分配 1000 个 Key,记录每个 Key 的归属;然后移除一个节点,重新查询所有 Key 的归属;统计有多少 Key 换了节点。预期结果是:
- 迁移量约为 K/N ≈ 333(只有被移除节点的 Key 迁移)
- 关键验证:原本属于未被移除节点(w1、w3)的 Key,在移除 w2 后应该完全不变。如果有任何一个 w1 或 w3 的 Key 发生了迁移,说明一致性哈希的实现有 bug。
5. 幂等性:重复添加同一个节点不应该产生重复的虚拟节点。验证方法是添加两次同一个节点后,环的大小应该和添加一次相同。
6. 安全删除:删除一个不存在的节点不应该抛异常。这是防御性编程的基本要求。
采集平面 CaptureWorker 集群
在先前版本中,CaptureThread 是单线程生产者,管理所有摄像头的采集:
1 | class CaptureThread(threading.Thread): |
这种设计的瓶颈在于,所有摄像头由一个线程管理,受限于单机 CPU 和网络带宽。
3.2 集群化改造:CaptureWorker
在新的架构设计中,每个 CaptureWorker 是一个独立进程/容器,只负责分配给自己的摄像头子集:
1 | class CaptureWorker: |
其中,PhotoMessage.photo_path 从本地磁盘路径改为 MinIO 对象引用(s3://bucket/key),使得采集节点和推理节点可以在不同机器上。
消息平面 Kafka 分布式消息总线
抽象契约层:MessageBroker
消息平面的核心设计是面向抽象编程。PhotoConsumer 依赖的是 MessageBroker ABC 接口,而非任何具体的队列实现:
1 | # infra/contracts.py |
双适配器:InMemoryBroker ↔ KafkaBroker
系统同时提供两个适配器,通过 Composition Root 一行代码切换:
1 | # ===== 适配器 1:单机内存实现 ===== |
零代码切换:
1 | # main.py — Composition Root |
LIFO 队列与背压控制(单机模式)
在单机模式下,系统使用 LIFO(后进先出)队列而非传统 FIFO,因为在实时视频监控场景中,最新的帧具有最高的信息价值:
1 | class GlobalMessageQueue: |
背压策略:生产者永不阻塞(队列满时淘汰最旧帧),消费者始终处理最新帧,丢弃可追踪(QueueStats.dropped)。
Kafka Topic 设计与消费者组
| Topic | 分区数 | 用途 | 消费者组 |
|---|---|---|---|
frame-captured |
与 CaptureWorker 数量对齐 | 采集帧消息 | inference-workers |
detection-result |
3 | 检测结果 | result-persisters |
notification |
1 | 告警通知 | notifiers |
推理平面 — 并行推理引擎集群
扇出/扇入并行推理引擎
推理引擎是系统的核心,采用 Fan-out/Fan-in 模式将同一帧同时分发至 N 个检测模型并行推理:
1 | class PhotoConsumer(threading.Thread): |
检测器注册表与自动发现
模型注册系统将 注册表模式 + 装饰器 + pkgutil 自动发现 三者结合,实现零侵入扩展:
1 | # === 注册表 === |
添加新检测器只需三步(无需修改任何已有代码):
- 创建检测器类 +
@register_detector装饰器(functionmodel/xxx_detector.py) - 创建 JSON 管线配置(
config/xxxdetecter.json) - 在
config.yaml中注册(model: { xxxdetecter: "xxxdetecter.json" })
声明式 YAML 模板引擎
引入声明式检测器模板:
1 | # config/templates/fire_detector.yaml |
模板引擎架构:
1 | 新增模块: app/core/modelloader/template_engine/ |
热插拔与文件系统监听
1 | class HotReloadManager: |
弹性伸缩策略
| 策略 | 触发条件 | 动作 |
|---|---|---|
| Scale-out | Kafka consumer lag > 1000 持续 5 分钟 | 启动新 InferenceWorker 容器 |
| Scale-in | consumer lag < 100 持续 10 分钟 | 缩减 Worker 副本数 |
| GPU 亲和性 | 模型需要 GPU | 调度到带 nvidia.com/gpu 标签的节点 |
1 | class AutoScaler: |
存储平面
抽象契约层:ResultPersister
1 | class ResultPersister(ABC): |
当前适配器:DBResultPersister(延迟初始化,避免启动时建立数据库连接)
1 | class DBResultPersister(ResultPersister, HealthCheckable): |
冷热分离存储策略
| 表 | 用途 | 存储特性 | 生命周期 |
|---|---|---|---|
t_cameras |
摄像头设备注册表 | 低频更新 | 永久 |
t_results_temp |
当日检测结果 | 热存储,高频写入 | 每日归档 |
t_results |
历史归档 | 冷存储,低频查询 | 永久 |
归档流程:每天 03:00–05:00 归档窗口,ResultsMigrationService 将 t_results_temp → t_results 迁移后清空临时表,避免单表数据量无限增长。
MinIO 对象存储
| 用途 | Bucket | 命名规则 |
|---|---|---|
| 标注图像 | detection-results |
{date}/{camera_id}/{photo_id}.jpg |
| 模型权重 | model-weights |
{model_name}/{version}/best.pt |
在分布式模式下,PhotoMessage.photo_path 从本地路径改为 MinIO 对象引用(s3://bucket/key),使得采集节点和推理节点可以在不同机器上。
可观测性平面
抽象契约层:MetricsCollector
1 | class MetricsCollector(ABC): |
适配器:PrometheusMetrics(直接暴露 /metrics 端点)
采集的指标:
| 指标 | 类型 | 说明 |
|---|---|---|
inference_total |
Counter | 每个模型的推理调用次数 |
inference_errors_total |
Counter | 每个模型的推理错误次数 |
inference_duration_seconds |
Histogram | 每个模型的推理延迟分布 |
frame_events_total |
Counter | 帧事件统计(processed / missing / dropped) |
queue_utilization_pct |
Gauge | 队列利用率百分比 |
健康检查聚合器
采用 最差组件决定整体状态(worst-component-wins) 策略:
1 | class HealthAggregator: |
降级阈值:
| 组件 | 条件 | 状态 |
|---|---|---|
| Broker | 队列利用率 > 90% | degraded |
| Persister | 错误计数 ≥ 10 | unhealthy |
| Metrics | 推理错误率 > 10% | degraded |
| Metrics | 推理错误率 > 50% | unhealthy |
输出示例:
1 | { |
Prometheus + Grafana 监控
Grafana Dashboard 面板:
| 面板 | 数据源 | 展示内容 |
|---|---|---|
| 推理延迟 P50/P95/P99 | Prometheus | 各模型推理延迟分位数 |
| 帧处理吞吐量 | Prometheus | 每秒处理帧数 |
| Kafka Consumer Lag | Prometheus (JMX) | 消费积压量 → 伸缩决策 |
| Worker 存活状态 | Redis Exporter | 各节点心跳状态 |
| 队列利用率 | Prometheus | 内存队列使用百分比 |
结构化 JSON 日志
1 | class JsonFormatter(logging.Formatter): |
在容器环境中自动切换为 JSON 格式(通过环境变量 LABSEC_LOG_FORMAT=json),便于 ELK / Loki 等日志聚合系统解析。
容器化
多阶段 Dockerfile
1 | # Stage 1: Builder — 安装依赖(含编译工具链) |
Builder 阶段的编译工具链(gcc、pip wheel 缓存)不进入最终镜像,有利于缩小体积。
docker-compose
1 | version: "3.8" |
健康探针与自愈
| 端点 | 用途 | 检查内容 | 对应 K8s 探针 |
|---|---|---|---|
GET /healthz |
进程存活 | 进程在运行即返回 200 | Liveness Probe |
GET /readyz |
服务就绪 | 所有组件 healthy 才返回 200 | Readiness Probe |
GET /api/status |
详细状态 | 完整 HealthAggregator 输出 | — |
GET /metrics |
Prometheus 指标 | 所有推理/队列/系统指标 | — |
三阶段优雅关停
1 | def shutdown(main_threads, signum=None): |
数据流全链路
总结
本系统的设计,实现了以下的设计原则:
- 开闭原则 (OCP):新检测器通过文件 + 装饰器 / YAML 模板添加,不修改任何已有代码
- 单一职责 (SRP) : CaptureWorker 只采集,PhotoConsumer 只推理,Persister 只持久化
- 依赖倒置 (DIP): Consumer 依赖
MessageBrokerABC,而非GlobalMessageQueue或KafkaConsumer - 接口隔离 (ISP) :四个独立契约:Broker / Persister / Metrics / Health
- 依赖注入 (DI):适配器在 Composition Root 创建,通过构造函数注入
这里的依赖注入还是非常值得一讲的。我们的系统基于依赖倒置原则设计了 MessageBroker / ResultPersister / MetricsCollector 三大抽象契约层,实现了 InMemoryBroker 与 KafkaBroker 双适配器,通过构造函数注入实现从单机内存队列到 Kafka 分布式消息总线的零代码切换。
当时面临的设计问题是,如何让推理引擎在不同的部署环境(单机 / 分布式)下复用同一套代码?
如果 PhotoConsumer 直接依赖 GlobalMessageQueue(具体类),那么切换到 Kafka 时就必须修改消费者代码。这违反了 依赖倒置原则(DIP):高层模块不应该依赖低层模块,两者都应该依赖抽象。
系统在 infra/contracts.py 中定义了四个 ABC 接口,每个接口严格遵循 接口隔离原则(ISP):
1 | # infra/contracts.py |
而main.py 是整个系统中 唯一知道具体实现类的地方:
1 | # main.py — Composition Root |