0%

实验室安全检测系统

最近参与了一个横向,项目架构与基础代码大概稳定下来了,浅浅记录一下。

写在前面

实际上这个项目在技术上并不难,甚至比许多工作简单的多,但让人头疼的主要是三点:

  • 典型小作坊做派,前端、数据库、识别系统、模型全要做,没有明确的职责分工
  • 内网开发,测试也很困难,每天要骑个电瓶车去update
  • 需求含混不清,一会说用这个一会说用那个,为此接入数据的逻辑改了n遍

系统需求与架构

领导需要一个接入监控系统,实施监测各个实验室实验人员操作规范性,并归纳总结的系统,现在已在接入了一百余个监控设备,后续将推广至一千余个设备,其大致架构如下图所示:

按照该架构,重点的内容为监控轮询与帧识别两部分,程序将以设定的轮询间隔依次通过厂商提供的SDK访问摄像头并截图,将截图与包括摄像头基础信息、时间等的元数据交予消息队列并分发给多个识别设备进行图像识别。在试运行版本中,整个体系都在一台一张gpu的设备上运行。图像识别完成后,将识别结果汇总后存入数据库中。

任务模型训练

甲方提出了识别实验室实验人员不穿白大褂、夜间单人做实验、消防通道占用、废液桶是否盖盖子、通风橱开启是否规范等等。这些需求有的可以直接调用YOLO模型,有的则需要自行训练模型。

实验人员穿着为例,首先基于YOLOv11模型提取多张监控截图中的人物边框并裁减,使用自己设计的标注工具进行是否穿着实验服装的正负样本标注,约两千余张。使用该数据,基于MobileNetV2模型训练我们需求的分类模型。

MobileNetV2 是 Google 在 2018 年提出的一种轻量级卷积神经网络架构,我们将其迁移至我们的任务上。具体任务流程与要点如下:

  1. 按照8:2划分数据集
  2. 加载 ImageNet 上预训练好的权重
  3. 将MobileNetV2 的最后一层 nn.Sequential替换为输出 2 类
  4. 任务采用交叉熵损失函数, Adam优化器,训练 epochs=10, batchsize=32, lr=1e-4
    最终,分类模型的Accuracy为0.9850,达到了预期需求。

整体模型首先使用yolo11s识别人,置信阈值设置为0.4,随后使用自己训练的二分类模型进行是否穿白大褂的判别,在后续的测试中,发现其基本可以满足需求,但仍对穿着白色T恤的实验人员有漏检情况,需要后续对模型进行改进。

Producer模块设计

读取数据库中当日需要访问的摄像头的id,基于摄像头厂商提供的监控rtsp视频流Python SDK,轮询摄像头获取rtsp url。一开始使用opencv链接流并截取帧,但是截取效率和内存占用都很不理想,后来就直接改用了ffmpeg截取,在处理速度和内存占用上都优化了许多。

由于仍存在一定的等待时间,采用线程池提交任务,大概逻辑如下:

1
2
3
4
5
6
7
8
9
# 使用线程池提交任务
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(ffmpeg_cut, *task) for task in tasks]

    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result() 
        except Exception as e:
            print(f"Task raised an exception: {e}")

在截取完成图像后,保存至本地并将元数据加入消息队列。

消息队列设计

在单机任务中,任务队列由Python维护,设计一个PhotoMessage类如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@dataclass
class PhotoMessage:
"""
Attributes:
photo_id: 帧的唯一标识符(默认使用 UUID)
camera_id: 摄像头的标识符
room_id: 物理位置的标识符
photo_path: 帧在文件系统中的路径或内存引用
timestamp: 帧的捕获时间(如果未提供则自动生成)
metadata: 附加的处理上下文信息(如模型输出等)
"""
camera_id: str
room_id: str
photo_path: Path
photo_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
metadata: Dict[str, Any] = field(default_factory=dict)

def to_json(self) -> str:
# ……

其中,有如 photo_id 是运行时生成一个唯一值(不能提前固定成某个默认字符串)。如果你写 photo_id: str = str(uuid.uuid4()),那么所有对象都会默认用同一个 UUID(因为默认值只会在类定义时求一次值)。而default_factory 会在每次创建对象时调用 lambda 生成新的值。在 dataclass 中,凡是带副作用的默认值(当前时间、UUID、列表、字典等)都应使用 default_factory,否则会引发不可预测的行为或共享状态。

为了表示队列,设计一个GlobalMessageQueue类如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class GlobalMessageQueue():
_instance = None

def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._queue = queue.LifoQueue(maxsize=10000)
cls._instance._persist_file = None
return cls._instance

def put_message(self, msg: PhotoMessage) -> bool:
"""
存入消息到队列
:param msg: PhotoMessage实例
:return: 是否成功入队
"""
try:
self._queue.put_nowait(msg)
return True
except queue.Full:
print("WARNING: 消息队列已满,丢弃最早的消息")
try:
self._queue.get_nowait() # 丢弃最旧消息
self._queue.put_nowait(msg)
return True
except queue.Empty:
return False

def get_message(self) -> Optional[PhotoMessage]:
"""
非阻塞获取消息
:return: PhotoMessage实例或None
"""
try:
return self._queue.get_nowait()
except queue.Empty:
return None

def persist_messages(self, file_path: Path) -> int:
"""
持久化队列到磁盘
:param file_path: 存储文件路径
:return: 持久化的消息数量
"""
# ……

def load_messages(self, file_path: Path) -> int:
"""
从磁盘加载消息到队列
:param file_path: 存储文件路径
:return: 加载的消息数量
"""
# ……
return count

def set_persist_file(self, file_path: Path):
"""设置自动持久化文件路径"""
# ……

def __del__(self):
"""析构时自动持久化"""
# ……

def size(self) -> int:
"""
获取当前队列的大小
:return: 队列中消息的数量
"""
# ……

该类 GlobalMessageQueue 实现了线程安全的全局单例消息队列,使用 LifoQueue 存储 PhotoMessage 实例,支持消息的入队、出队、磁盘持久化和加载,并在对象销毁时自动保存队列内容,防止数据丢失。

当然,在多设备并行处理中,可使用Redis作为多设备共享的消息队列。

Consumer模块设计

首先,我们定义一个父类BaseModel,规范了模型对于图像输入的预处理方式。配置从指定config中读取,规范模型的名称、权重文件位置、预处理方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class BaseModel():
def __init__(self, config: Dict[str, Any]):
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.name = config["name"]
self.preprocess = config["preprocess"]
self.pipeline = config["pipeline"]

self._build_preprocess_method()

# 预处理类
def _build_preprocess_method(self):
preprocess = self.preprocess

def preprocess_image(self, message: PhotoMessage):
# 调用_crop和_resize的逻辑

return image

self.preprocess_image = MethodType(preprocess_image, self)

def _crop(self, message: PhotoMessage, image: Image.Image):
# ……
return image

def _resize(self, message: PhotoMessage, image: Image.Image):
return image.resize(self.preprocess["size"])

随后继承该类,定义一个IdentificationModel子类,新增一个_loadmodel功能,需要返回一个predict方法。

1
2
3
4
5
6
7
8
9
10
11
class IdentificationModel(BaseModel):
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.predict = self._loadmodel()

def _loadmodel(self):
predict = ModelLoader.loader(self.name, self.pipeline, self.device)
return predict

def __str__(self):
return self.name

由于当前模型都暂时只需要一个predict方法,因此,暂时只使用该子类。可以注意到,IdentificationModel类使用了一个ModelLoader类,该类负责每个具体的识别模型的predict逻辑,返回每个模型中我们需要的predict函数,由于过于冗长,不做展示。

基于上述类,consumer线程可以很轻松的从配置文件中加载模型:

1
2
3
self.modellist = []
for config in config_list:
self.modellist.append(IdentificationModel(config))

并逐个调用模型对图片进行识别:

1
2
3
for model in self.modellist:                
photo = model.preprocess_image(message)
output = model.predict(message, photo)

当然,在consumer内部也采用了多线程的设计,此处不再赘述。

存储模块设计

存储模块负责将识别结果整理后存入Postgresql数据库,识别结果图片存入Minio桶容器,之间通过唯一的uuid相关联,此处不多赘述。

容器化部署

由于本项目涉及到深度学习环境以及redis、minio、psql等工具,因此,使用docker部署是最快速最安全的方式。但是这里有个大坑!我原本基于dockerfile构建了带cuda驱动、nvcc、python环境以及ffmpeg等的容器,但是发现,由于windows上的docker是机遇wsl实现的,和本机之间存在网络的隔离,在容器内运行的python程序读不到局域网下的rtsp流。本人当即表示烦。
无奈,只能暂时线把主要程序放在宿主机上执行,redis、minio、psql使用docker部署。该项目的docker-compose如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
version: '3.8'

services:

redis:
image: redis:7
container_name: labsecurity_redis
ports:
- "6379:6379"
restart: always

minio:
image: minio/minio
container_name: labsecurity_minio
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data --console-address ":9001"
volumes:
- minio_data:/data
restart: always

minio-init:
image: minio/mc
depends_on:
- minio
entrypoint: >
/bin/sh -c "
sleep 5;
mc alias set local http://minio:9000 minioadmin minioadmin;
mc mb -p local/detection-results;
mc policy set public local/detection-results;
exit 0;
"

postgres:
image: postgres:17
container_name: labsecurity_postgres
environment:
POSTGRES_DB: labcamera
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- "5434:5432"
volumes:
- ./initdb:/docker-entrypoint-initdb.d
restart: always

volumes:
pgdata:
minio_data:

其中,在labsecurity_postgres容器中,我指定了数据库初始化的文件位置,即/initdb,只要在该文件夹下放置.sql,容器初始化时即可根据该sql文件建立数据库,不要太方便。而容器间、宿主机和容器间也可以通过端口相互访问,例如,在其他容器内通过minio:9000,在本机中可以通过http://localhost:9001/访问。

通过 docker compose up -d 可以一键部署容器,通过 docker ps` -a 查看部署情况。此外,按照原计划,深度学习环境也会部署在容器内,这里做个记录,首先,构建 .Dockerfile:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
FROM nvidia/cuda:12.8.0-cudnn-runtime-ubuntu22.04

RUN apt-get update && apt-get install -y python3 python3-pip

WORKDIR /app

COPY requirements.txt .

RUN pip install torch==2.6.0 torchvision==0.21.0 torchaudio==2.6.0 --index-url https://download.pytorch.org/whl/cu126
RUN pip install --no-cache-dir -r requirements.txt

COPY ./app /app

CMD ["python3", "main.py"]

可以看到,该文件指定了安装cuda驱动和python,并根据requirements安装好需要的python包。在docker- compose中加上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
services:
app:
build: .
runtime: nvidia
container_name: labsecurity_app
depends_on:
- redis
- postgres
- minio
volumes:
- ./app:/app
working_dir: /app
ports:
- "8888:8888"
command: ["tail", "-f", "/dev/null"]

容器能看到本机的 ./app 目录,点击vscode左下方的><,选择Attach to Running Container,进入python容器,就可以愉快的开发了。

总结

这算是我参与的算是比较完整的横向了,但是也没什么好总结的,懒得总结。