最近参与了一个横向,项目架构与基础代码大概稳定下来了,浅浅记录一下。
 写在前面
实际上这个项目在技术上并不难,甚至比许多工作简单的多,但让人头疼的主要是三点:
- 典型小作坊做派,前端、数据库、识别系统、模型全要做,没有明确的职责分工
- 内网开发,测试也很困难,每天要骑个电瓶车去update
- 需求含混不清,一会说用这个一会说用那个,为此接入数据的逻辑改了n遍
 系统需求与架构
领导需要一个接入监控系统,实施监测各个实验室实验人员操作规范性,并归纳总结的系统,现在已在接入了一百余个监控设备,后续将推广至一千余个设备,其大致架构如下图所示:

按照该架构,重点的内容为监控轮询与帧识别两部分,程序将以设定的轮询间隔依次通过厂商提供的SDK访问摄像头并截图,将截图与包括摄像头基础信息、时间等的元数据交予消息队列并分发给多个识别设备进行图像识别。在试运行版本中,整个体系都在一台一张gpu的设备上运行。图像识别完成后,将识别结果汇总后存入数据库中。
 任务模型训练
甲方提出了识别实验室实验人员不穿白大褂、夜间单人做实验、消防通道占用、废液桶是否盖盖子、通风橱开启是否规范等等。这些需求有的可以直接调用YOLO模型,有的则需要自行训练模型。
以实验人员穿着为例,首先基于YOLOv11模型提取多张监控截图中的人物边框并裁减,使用自己设计的标注工具进行是否穿着实验服装的正负样本标注,约两千余张。使用该数据,基于MobileNetV2模型训练我们需求的分类模型。
MobileNetV2 是 Google 在 2018 年提出的一种轻量级卷积神经网络架构,我们将其迁移至我们的任务上。具体任务流程与要点如下:
- 按照8:2划分数据集
- 加载 ImageNet 上预训练好的权重
- 将MobileNetV2 的最后一层 nn.Sequential替换为输出 2 类
- 任务采用交叉熵损失函数, Adam优化器,训练 epochs=10, batchsize=32, lr=1e-4
 最终,分类模型的Accuracy为0.9850,达到了预期需求。
整体模型首先使用yolo11s识别人,置信阈值设置为0.4,随后使用自己训练的二分类模型进行是否穿白大褂的判别,在后续的测试中,发现其基本可以满足需求,但仍对穿着白色T恤的实验人员有漏检情况,需要后续对模型进行改进。
 Producer模块设计
读取数据库中当日需要访问的摄像头的id,基于摄像头厂商提供的监控rtsp视频流Python SDK,轮询摄像头获取rtsp url。一开始使用opencv链接流并截取帧,但是截取效率和内存占用都很不理想,后来就直接改用了ffmpeg截取,在处理速度和内存占用上都优化了许多。
由于仍存在一定的等待时间,采用线程池提交任务,大概逻辑如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 
 | with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
 futures = [executor.submit(ffmpeg_cut, *task) for task in tasks]
 
 for future in concurrent.futures.as_completed(futures):
 try:
 result = future.result()
 except Exception as e:
 print(f"Task raised an exception: {e}")
 
 | 
在截取完成图像后,保存至本地并将元数据加入消息队列。
 消息队列设计
在单机任务中,任务队列由Python维护,设计一个PhotoMessage类如下所示:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 
 | @dataclassclass PhotoMessage:
 """
 Attributes:
 photo_id: 帧的唯一标识符(默认使用 UUID)
 camera_id: 摄像头的标识符
 room_id: 物理位置的标识符
 photo_path: 帧在文件系统中的路径或内存引用
 timestamp: 帧的捕获时间(如果未提供则自动生成)
 metadata: 附加的处理上下文信息(如模型输出等)
 """
 camera_id: str
 room_id: str
 photo_path: Path
 photo_id: str = field(default_factory=lambda: str(uuid.uuid4()))
 timestamp: datetime = field(default_factory=datetime.now)
 metadata: Dict[str, Any] = field(default_factory=dict)
 
 def to_json(self) -> str:
 
 
 | 
其中,有如 photo_id 是运行时生成一个唯一值(不能提前固定成某个默认字符串)。如果你写 photo_id: str = str(uuid.uuid4()),那么所有对象都会默认用同一个 UUID(因为默认值只会在类定义时求一次值)。而default_factory 会在每次创建对象时调用 lambda 生成新的值。在 dataclass 中,凡是带副作用的默认值(当前时间、UUID、列表、字典等)都应使用 default_factory,否则会引发不可预测的行为或共享状态。
为了表示队列,设计一个GlobalMessageQueue类如下所示:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 
 | class GlobalMessageQueue():_instance = None
 
 def __new__(cls):
 if cls._instance is None:
 cls._instance = super().__new__(cls)
 cls._instance._queue = queue.LifoQueue(maxsize=10000)
 cls._instance._persist_file = None
 return cls._instance
 
 def put_message(self, msg: PhotoMessage) -> bool:
 """
 存入消息到队列
 :param msg: PhotoMessage实例
 :return: 是否成功入队
 """
 try:
 self._queue.put_nowait(msg)
 return True
 except queue.Full:
 print("WARNING: 消息队列已满,丢弃最早的消息")
 try:
 self._queue.get_nowait()
 self._queue.put_nowait(msg)
 return True
 except queue.Empty:
 return False
 
 def get_message(self) -> Optional[PhotoMessage]:
 """
 非阻塞获取消息
 :return: PhotoMessage实例或None
 """
 try:
 return self._queue.get_nowait()
 except queue.Empty:
 return None
 
 def persist_messages(self, file_path: Path) -> int:
 """
 持久化队列到磁盘
 :param file_path: 存储文件路径
 :return: 持久化的消息数量
 """
 
 
 def load_messages(self, file_path: Path) -> int:
 """
 从磁盘加载消息到队列
 :param file_path: 存储文件路径
 :return: 加载的消息数量
 """
 
 return count
 
 def set_persist_file(self, file_path: Path):
 """设置自动持久化文件路径"""
 
 
 def __del__(self):
 """析构时自动持久化"""
 
 
 def size(self) -> int:
 """
 获取当前队列的大小
 :return: 队列中消息的数量
 """
 
 
 | 
该类 GlobalMessageQueue 实现了线程安全的全局单例消息队列,使用 LifoQueue 存储 PhotoMessage 实例,支持消息的入队、出队、磁盘持久化和加载,并在对象销毁时自动保存队列内容,防止数据丢失。
当然,在多设备并行处理中,可使用Redis作为多设备共享的消息队列。
 Consumer模块设计
首先,我们定义一个父类BaseModel,规范了模型对于图像输入的预处理方式。配置从指定config中读取,规范模型的名称、权重文件位置、预处理方法
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 
 | class BaseModel():def __init__(self, config: Dict[str, Any]):
 self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 self.name = config["name"]
 self.preprocess = config["preprocess"]
 self.pipeline = config["pipeline"]
 
 self._build_preprocess_method()
 
 
 def _build_preprocess_method(self):
 preprocess = self.preprocess
 
 def preprocess_image(self, message: PhotoMessage):
 
 
 return image
 
 self.preprocess_image = MethodType(preprocess_image, self)
 
 def _crop(self, message: PhotoMessage, image: Image.Image):
 
 return image
 
 def _resize(self, message: PhotoMessage, image: Image.Image):
 return image.resize(self.preprocess["size"])
 
 | 
随后继承该类,定义一个IdentificationModel子类,新增一个_loadmodel功能,需要返回一个predict方法。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 
 | class IdentificationModel(BaseModel):def __init__(self, config: Dict[str, Any]):
 super().__init__(config)
 self.predict = self._loadmodel()
 
 def _loadmodel(self):
 predict = ModelLoader.loader(self.name, self.pipeline, self.device)
 return predict
 
 def __str__(self):
 return self.name
 
 | 
由于当前模型都暂时只需要一个predict方法,因此,暂时只使用该子类。可以注意到,IdentificationModel类使用了一个ModelLoader类,该类负责每个具体的识别模型的predict逻辑,返回每个模型中我们需要的predict函数,由于过于冗长,不做展示。
基于上述类,consumer线程可以很轻松的从配置文件中加载模型:
| 12
 3
 
 | self.modellist = []for config in config_list:
 self.modellist.append(IdentificationModel(config))
 
 | 
并逐个调用模型对图片进行识别:
| 12
 3
 
 | for model in self.modellist:                photo = model.preprocess_image(message)
 output = model.predict(message, photo)
 
 | 
当然,在consumer内部也采用了多线程的设计,此处不再赘述。
 存储模块设计
存储模块负责将识别结果整理后存入Postgresql数据库,识别结果图片存入Minio桶容器,之间通过唯一的uuid相关联,此处不多赘述。
 容器化部署
由于本项目涉及到深度学习环境以及redis、minio、psql等工具,因此,使用docker部署是最快速最安全的方式。但是这里有个大坑!我原本基于dockerfile构建了带cuda驱动、nvcc、python环境以及ffmpeg等的容器,但是发现,由于windows上的docker是机遇wsl实现的,和本机之间存在网络的隔离,在容器内运行的python程序读不到局域网下的rtsp流。本人当即表示烦。
无奈,只能暂时线把主要程序放在宿主机上执行,redis、minio、psql使用docker部署。该项目的docker-compose如下:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 
 | version: '3.8'
 services:
 
 redis:
 image: redis:7
 container_name: labsecurity_redis
 ports:
 - "6379:6379"
 restart: always
 
 minio:
 image: minio/minio
 container_name: labsecurity_minio
 ports:
 - "9000:9000"
 - "9001:9001"
 environment:
 MINIO_ROOT_USER: minioadmin
 MINIO_ROOT_PASSWORD: minioadmin
 command: server /data --console-address ":9001"
 volumes:
 - minio_data:/data
 restart: always
 
 minio-init:
 image: minio/mc
 depends_on:
 - minio
 entrypoint: >
 /bin/sh -c "
 sleep 5;
 mc alias set local http://minio:9000 minioadmin minioadmin;
 mc mb -p local/detection-results;
 mc policy set public local/detection-results;
 exit 0;
 "
 
 postgres:
 image: postgres:17
 container_name: labsecurity_postgres
 environment:
 POSTGRES_DB: labcamera
 POSTGRES_USER: postgres
 POSTGRES_PASSWORD: postgres
 ports:
 - "5434:5432"
 volumes:
 - ./initdb:/docker-entrypoint-initdb.d
 restart: always
 
 volumes:
 pgdata:
 minio_data:
 
 | 
其中,在labsecurity_postgres容器中,我指定了数据库初始化的文件位置,即/initdb,只要在该文件夹下放置.sql,容器初始化时即可根据该sql文件建立数据库,不要太方便。而容器间、宿主机和容器间也可以通过端口相互访问,例如,在其他容器内通过minio:9000,在本机中可以通过http://localhost:9001/访问。
通过 docker compose up -d 可以一键部署容器,通过 docker ps` -a 查看部署情况。此外,按照原计划,深度学习环境也会部署在容器内,这里做个记录,首先,构建 .Dockerfile:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 
 | FROM nvidia/cuda:12.8.0-cudnn-runtime-ubuntu22.04
 RUN apt-get update && apt-get install -y python3 python3-pip
 
 WORKDIR /app
 
 COPY requirements.txt .
 
 RUN pip install torch==2.6.0 torchvision==0.21.0 torchaudio==2.6.0 --index-url https://download.pytorch.org/whl/cu126
 RUN pip install --no-cache-dir -r requirements.txt
 
 COPY ./app /app
 
 CMD ["python3", "main.py"]
 
 | 
可以看到,该文件指定了安装cuda驱动和python,并根据requirements安装好需要的python包。在docker- compose中加上:
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | services:app:
 build: .
 runtime: nvidia
 container_name: labsecurity_app
 depends_on:
 - redis
 - postgres
 - minio
 volumes:
 - ./app:/app
 working_dir: /app
 ports:
 - "8888:8888"
 command: ["tail", "-f", "/dev/null"]
 
 | 
容器能看到本机的 ./app 目录,点击vscode左下方的><,选择Attach to Running Container,进入python容器,就可以愉快的开发了。
 总结
这算是我参与的算是比较完整的横向了,但是也没什么好总结的,懒得总结。