基于树莓派与Flask的智能安防摄像头系统:从硬件连接到Web控制
1. 项目概述与核心价值
如果你手头有一块闲置的树莓派,又恰好对家庭安防或者物联网项目感兴趣,那么把这个小玩意儿改造成一个带Web控制界面的智能安防摄像头,绝对是个既实用又有成就感的项目。这不仅仅是让一个摄像头工作起来,而是构建一个完整的系统:硬件上,它通过PIR传感器感知人体移动;软件上,它运行着一个用Flask搭建的Web服务器,让你能在任何有网络的地方,用手机或电脑的浏览器就能调整监控灵敏度、手动拍照录像。整个系统是典型的物联网三层架构——感知层(摄像头、传感器)、网络层(树莓派的网络连接)、应用层(Flask Web应用)的微型实践。
这个项目的核心价值在于它的完整性与可定制性。不同于市面上封装好的摄像头,你自己搭建的系统,从硬件连接到每一行Python代码,都完全透明可控。你可以决定视频存哪里(本地还是云端?),可以自定义触发逻辑(比如只有晚上才启动移动侦测),甚至可以扩展功能(比如识别到人脸后给你发个微信通知)。对于学习者来说,它能让你一次性串联起Linux操作、Python编程、GPIO控制、Web开发、多线程/进程协同等多个知识点。对于实用主义者,它提供了一个低成本、高自由度的安防解决方案基础框架。接下来,我会带你从零开始,手把手复现并深化这个项目,过程中我会补充大量原教程未提及的细节、避坑指南和性能优化思路。
2. 硬件选型、连接与底层原理
2.1 核心硬件详解与选型建议
原教程提到了树莓派3、Pi Camera、PIR传感器等。我们来深入拆解一下每个部分的选择理由和替代方案。
树莓派型号选择:树莓派3是教程发布时的主流型号。但现在树莓派4甚至5已经普及。对于这个项目,树莓派4B 2GB版本是性价比最高的选择。它提供了更快的CPU和千兆以太网,在处理视频流和运行Web服务器时响应更迅速。如果手头只有树莓派3或Zero 2 W,也完全足够,只是Web界面加载和视频处理可能会稍慢一些。务必确保你的树莓派有稳定的电源供应,推荐使用官方电源或能提供5V/3A输出的优质电源,电压不稳是许多诡异问题的根源。
摄像头模块:官方Pi Camera Module(无论是v1还是v2)是最省心的选择,因为它有专用的CSI接口和成熟的picamera库支持。如果你手头有USB摄像头,也可以使用,但需要改用opencv或fswebcam等库来驱动,这会增加软件配置的复杂性,且通常帧率和稳定性不如CSI摄像头。对于安防场景,建议选择Pi Camera Module v2,它的索尼IMX219传感器在低光环境下表现更好。
PIR(被动红外)运动传感器:这是项目的“眼睛”。常见型号如HC-SR501。你需要理解它的工作原理:它检测特定范围内红外辐射的变化(比如人体体温与环境温度的差异),并输出数字信号(高电平)。模块上通常有两个旋钮:一个调节灵敏度(探测距离),一个调节延时(触发后保持高电平的时间)。教程中通过软件设置“灵敏度”,实际上是在代码里设定需要连续检测到多少次高电平才判定为有效移动,这是一种软件防抖逻辑,与硬件灵敏度旋钮是协同工作的。
连接细节与电路保护:
- 树莓派关机断电:这是铁律。连接任何GPIO设备前,务必拔掉电源。
- 摄像头连接:找到树莓派上那个黑色的CSI排线插座,轻轻提起卡扣,将摄像头排线金属面背对以太网口(或USB口)方向插入,然后按下卡扣锁紧。排线很脆弱,切忌用力弯折。
- PIR传感器连接:需要三根母对母杜邦线。
- VCC-> 树莓派GPIO的5V引脚(如物理引脚2或4)。注意,不是3.3V,HC-SR501通常需要5V供电。
- GND-> 树莓派GPIO的任意GND引脚(如物理引脚6、9、14等)。
- OUT-> 树莓派GPIO的GPIO17(对应物理引脚11)。选择这个引脚没有特殊原因,只是一个可用的通用输入引脚。你可以换成其他GPIO,但代码中的引脚编号需要同步修改。
重要提示:为防止感应器误触发或损坏树莓派,可以在PIR的OUT引脚和树莓派GPIO之间串联一个1kΩ的电阻,这是一个简单的限流保护措施。虽然很多教程省略了这一步,但在实际长期运行的工程中,加上它会更稳妥。
2.2 系统初始化与摄像头启用
硬件连接好后,上电启动树莓派。我假设你已安装好Raspberry Pi OS(原Raspbian),并已通过SSH或直接连接显示器键盘的方式登录。
首先,更新系统并启用摄像头接口:
sudo apt update && sudo apt upgrade -y sudo raspi-config在raspi-config界面中,选择Interface Options->Camera->Yes来启用摄像头驱动。完成后选择Finish并重启。
重启后,验证摄像头是否被系统识别:
vcgencmd get_camera如果一切正常,你会看到supported=1 detected=1。如果detected=0,请检查排线是否插紧,或者尝试在raspi-config中先禁用再重新启用摄像头。
接下来,安装项目所需的Python库。原教程使用了pip,但为了系统一致性,我推荐先安装系统包,再用pip安装额外的库:
# 安装Python3和pip,以及GPIO和摄像头库的系统依赖 sudo apt install python3-dev python3-pip python3-picamera2 python3-rpi.gpio -y # 使用pip3安装Flask及其相关组件 pip3 install flask flask-restful flask-wtf这里注意,我们安装了python3-picamera2。树莓派官方已逐步转向新的picamera2库,它比旧的picamera功能更强大,且支持最新的相机系统。但为了与原教程代码最大程度兼容,后续我们会主要使用picamera,但也会介绍picamera2的替代写法。你可以同时安装旧版库:pip3 install picamera。
3. 软件架构深度解析与Flask应用构建
3.1 项目目录结构与代码组织
清晰的目录结构是项目可维护的基础。在树莓派的家目录(/home/pi)下创建项目文件夹:
mkdir -p ~/iot_security_camera/{templates,static,videos,pictures} cd ~/iot_security_cameratemplates/: 存放Flask的HTML模板文件。static/: 可存放CSS、JavaScript文件(用于美化界面,本项目简化处理,暂不需要)。videos/&pictures/: 分别存放录制的视频和拍摄的照片。
3.2 Flask表单与Web界面设计
原教程的camControl.py定义了表单。我们将其优化并增加注释:
camControl.py
from flask_wtf import FlaskForm from wtforms import IntegerField, SelectField, SubmitField, validators from wtforms.validators import DataRequired, NumberRange class CameraControlForm(FlaskForm): """ 摄像头控制表单类。 videoDuration: 录制时长(秒)。注意:实际录制时,树莓派存储和处理器性能可能限制单次录制长度。 sensitivity: 运动灵敏度阈值。这是一个软件防抖参数,表示PIR传感器需要连续检测到多少次高电平信号(每次检查间隔0.1秒)才触发录制。 值越高,需要持续的移动时间越长,触发越“迟钝”。 action: 手动控制选项。 """ videoDuration = IntegerField( '录制时长(秒)', default=10, validators=[DataRequired(message="请输入录制时长"), NumberRange(min=1, max=300, message="时长应在1-300秒之间")] # 增加合理范围验证,避���设置过长导致存储爆满或进程卡死。 ) sensitivity = IntegerField( '移动侦测灵敏度 (2500-10000)', default=5000, validators=[ DataRequired(message="请输入灵敏度值"), NumberRange(min=2500, max=10000, message="灵敏度值必须在2500到10000之间") ] ) # 解释:假设PIR每0.1秒检查一次。sensitivity=5000意味着需要连续5000*0.1=500秒的高电平?不,这里教程逻辑可能有点问题,我们后面在操作类会修正。 # 更合理的逻辑是:sensitivity表示“在连续检测中,高电平计数达到此值即触发”。我们将其重新定义为“触发阈值计数”,比如设为50,表示连续检测到50次高电平(约5秒)才触发。 action = SelectField( '手动控制', choices=[ ('none', '无操作'), ('start_rec', '开始录制'), ('stop_rec', '停止录制'), ('take_pic', '拍摄照片') ], default='none' ) submit = SubmitField('提交设置')接下来是HTML模板。在templates/index.html中,我们创建一个更清晰的界面:
templates/index.html
<!DOCTYPE html> <html> <head> <title>树莓派安防摄像头控制面板</title> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <style> body { font-family: Arial, sans-serif; margin: 20px; background-color: #f4f4f4; } .container { max-width: 600px; margin: auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); } h1 { color: #333; text-align: center; } .form-group { margin-bottom: 15px; } label { display: block; margin-bottom: 5px; font-weight: bold; } input, select { width: 100%; padding: 8px; box-sizing: border-box; border: 1px solid #ccc; border-radius: 4px; } .btn { background-color: #5cb85c; color: white; padding: 10px 15px; border: none; border-radius: 4px; cursor: pointer; width: 100%; font-size: 16px; } .btn:hover { background-color: #4cae4c; } .status { margin-top: 20px; padding: 10px; background-color: #e7f3fe; border-left: 4px solid #2196F3; } .error { color: red; font-size: 0.9em; } </style> </head> <body> <div class="container"> <h1>📷 安防摄像头远程控制</h1> <p>当前摄像头IP: <strong>{{ request.host }}</strong></p> <form method="POST" action="/"> {{ form.hidden_tag() }} <!-- 生成CSRF令牌,Flask-WTF需要 --> <div class="form-group"> {{ form.videoDuration.label }} {{ form.videoDuration() }} {% for error in form.videoDuration.errors %} <span class="error">[{{ error }}]</span> {% endfor %} <small>设置移动触发后自动录制的时长(1-300秒)。</small> </div> <div class="form-group"> {{ form.sensitivity.label }} {{ form.sensitivity() }} {% for error in form.sensitivity.errors %} <span class="error">[{{ error }}]</span> {% endfor %} <small>数值越小越敏感(更容易触发),数值越大越迟钝(需要更持续的运动)。</small> </div> <div class="form-group"> {{ form.action.label }} {{ form.action() }} </div> <div class="form-group"> {{ form.submit(class="btn") }} </div> </form> <div class="status"> <h3>系统状态</h3> <p>最后触发时间: {{ last_trigger_time if last_trigger_time else '暂无' }}</p> <p>当前模式: {% if is_recording %} <span style="color:red;">● 正在录制</span> {% else %} 待机监控 {% endif %}</p> <p><a href="/videos">查看录像文件</a> | <a href="/pictures">查看照片文件</a></p> <!-- 后续可以在这里添加一个简单的视频流预览 --> </div> </div> </body> </html>这个模板比原教程的更友好,包含了表单验证错误显示、状态提示和文件查看链接。
3.3 Flask主应用与API设计
现在创建主应用文件app.py。我们将整合表单处理、API接口,并增加文件列表查看功能。
app.py
#!/usr/bin/env python3 """ 树莓派安防摄像头 - Flask主服务器应用 运行: python3 app.py 访问: http://树莓派IP地址:5000 """ import os from datetime import datetime from flask import Flask, render_template, request, send_from_directory, jsonify from camControl import CameraControlForm import threading import json # 初始化Flask应用 app = Flask(__name__) app.config['SECRET_KEY'] = 'your-secret-key-please-change-this' # 生产环境务必更换! # 用于在Web界面显示的状态变量(简单示例,生产环境应用数据库或更安全的共享内存方式) system_status = { 'is_recording': False, 'last_trigger': None, 'current_duration': 10, 'current_sensitivity': 5000 } # 用于与摄像头控制进程通信的文件路径 SETTINGS_FILE = 'camera_settings.json' # 使用JSON文件比纯文本更结构化,更易于解析和扩展。 def write_settings_to_file(duration, sensitivity, action): """将新的设置写入JSON文件""" settings = { 'duration': duration, 'sensitivity': sensitivity, 'action': action, 'updated_at': datetime.now().isoformat() } with open(SETTINGS_FILE, 'w') as f: json.dump(settings, f) print(f"[Web Server] 设置已更新: {settings}") def read_settings_from_file(): """从JSON文件读取当前设置""" try: with open(SETTINGS_FILE, 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): # 文件不存在或损坏,返回默认值 return {'duration': 10, 'sensitivity': 5000, 'action': 'none'} @app.route('/', methods=['GET', 'POST']) def index(): """主控制页面""" form = CameraControlForm() if form.validate_on_submit(): # 获取表单数据 duration = form.videoDuration.data sensitivity = form.sensitivity.data action = form.action.data # 更新系统状态(示例) if action == 'start_rec': system_status['is_recording'] = True elif action == 'stop_rec': system_status['is_recording'] = False if action in ['start_rec', 'take_pic']: system_status['last_trigger'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 将设置写入文件,供摄像头控制进程读取 write_settings_to_file(duration, sensitivity, action) # 更新内存中的状态(仅用于Web显示) system_status['current_duration'] = duration system_status['current_sensitivity'] = sensitivity # 重定向到当前页面,防止表单重复提交 return render_template('index.html', form=form, is_recording=system_status['is_recording'], last_trigger_time=system_status['last_trigger']) # GET请求或表单验证失败时,显示页面 current_settings = read_settings_from_file() # 用文件中的值预填表单 form.videoDuration.data = current_settings.get('duration', 10) form.sensitivity.data = current_settings.get('sensitivity', 5000) return render_template('index.html', form=form, is_recording=system_status['is_recording'], last_trigger_time=system_status['last_trigger']) @app.route('/videos') def list_videos(): """列出录像文件""" videos_dir = 'videos' try: files = os.listdir(videos_dir) video_files = [f for f in files if f.endswith(('.h264', '.mp4'))] # 按修改时间排序,最新的在前 video_files.sort(key=lambda x: os.path.getmtime(os.path.join(videos_dir, x)), reverse=True) return '<br>'.join([f'<a href="/videos/{f}">{f}</a>' for f in video_files]) or "暂无录像文件。" except FileNotFoundError: return "录像目录不存在。" @app.route('/videos/<filename>') def download_video(filename): """下载录像文件""" return send_from_directory('videos', filename, as_attachment=True) @app.route('/api/status') def get_status(): """提供一个简单的JSON API接口,用于获取系统状态(可用于未来移动端扩展)""" return jsonify(system_status) if __name__ == '__main__': # 确保设置文件存在 if not os.path.exists(SETTINGS_FILE): write_settings_to_file(10, 5000, 'none') # 在局域网内可访问,关闭调试模式以提高性能 app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)这个app.py做了几项重要改进:
- 使用JSON进行进程间通信:比纯文本更可靠,易于解析和扩展字段。
- 增加了状态反馈:Web页面可以显示是否正在录制、最后触发时间。
- 增加了文件管理:可以直接在Web上查看和下载录制的视频。
- 提供了API接口:为未来开发手机App或与其他智能家居系统集成留出了可能。
- 表单预填充:每次加载页面时,从文件读取当前设置并填充表单,用户体验更好。
4. 摄像头控制逻辑与多进程协同
这是项目的核心大脑。我们需要一个独立于Web服务器的后台进程,它持续做两件事:1. 监控PIR传感器并根据灵敏度设置触发录制;2. 定期检查设置文件,响应来自Web界面的手动控制命令。
4.1 重构摄像头操作类
我们创建一个更健壮、易读的摄像头操作类camera_operator.py。
camera_operator.py
#!/usr/bin/env python3 import time import json import logging from datetime import datetime import threading try: import picamera from picamera import PiCamera CAMERA_LIB = 'picamera' except ImportError: try: from picamera2 import Picamera2 CAMERA_LIB = 'picamera2' except ImportError: CAMERA_LIB = None print("错误:未找到picamera或picamera2库。请运行 'pip3 install picamera' 或 'sudo apt install python3-picamera2'") exit(1) # 配置日志,方便调试和查看运行记录 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class SecurityCamera: """ 安防摄像头核心控制类。 处理与PiCamera的交互、视频录制、拍照以及基于PIR传感器的运动检测逻辑。 """ def __init__(self, settings_file='camera_settings.json'): self.settings_file = settings_file self.current_settings = {'duration': 10, 'sensitivity': 50, 'action': 'none'} self.is_recording = False self.motion_detected_counter = 0 self.sensitivity_threshold = 50 # 默认灵敏度阈值(连续检测次数) self.recording_duration = 10 self.camera = None self._init_camera() self.load_settings() def _init_camera(self): """初始化摄像头硬件""" try: if CAMERA_LIB == 'picamera': self.camera = PiCamera() self.camera.resolution = (1296, 972) # 平衡画质与性能,1080p(1920x1080)对树莓派3可能负担较重 self.camera.framerate = 15 self.camera.rotation = 180 # 如果摄像头图像是倒的,可以旋转180度 logger.info("PiCamera (legacy) 初始化成功。") elif CAMERA_LIB == 'picamera2': self.camera = Picamera2() # Picamera2需要更复杂的配置,这里是一个预览配置示例 video_config = self.camera.create_video_configuration(main={"size": (1280, 720)}, controls={"FrameRate": 20}) self.camera.configure(video_config) logger.info("Picamera2 初始化成功。") else: raise RuntimeError("无可用摄像头库。") # 给摄像头一点时间启动 time.sleep(2) except Exception as e: logger.error(f"摄像头初始化失败: {e}") self.camera = None def load_settings(self): """从JSON文件加载设置""" try: with open(self.settings_file, 'r') as f: new_settings = json.load(f) # 只更新我们关心的字段,避免覆盖内部状态 self.recording_duration = new_settings.get('duration', self.recording_duration) self.sensitivity_threshold = new_settings.get('sensitivity', self.sensitivity_threshold) action = new_settings.get('action', 'none') logger.info(f"加载设置: 时长={self.recording_duration}s, 灵敏度阈值={self.sensitivity_threshold}, 动作={action}") return action except (FileNotFoundError, json.JSONDecodeError, KeyError) as e: logger.warning(f"读取设置文件失败,使用默认值: {e}") return 'none' def start_recording(self, custom_name=None): """开始录制视频""" if self.camera is None or self.is_recording: return False try: if custom_name: filename = custom_name else: # 生成基于时间戳的文件名 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"videos/motion_{timestamp}.h264" if CAMERA_LIB == 'picamera': self.camera.start_recording(filename) elif CAMERA_LIB == 'picamera2': # Picamera2 开始录制的方式不同,此处简化处理 logger.warning("Picamera2 录制功能需单独配置,此处暂用占位符。") # 实际应使用: self.camera.start_and_record_video(filename, duration=duration) pass self.is_recording = True logger.info(f"开始录制: {filename}") return True except Exception as e: logger.error(f"开始录制失败: {e}") return False def stop_recording(self): """停止录制视频""" if self.camera is None or not self.is_recording: return False try: if CAMERA_LIB == 'picamera': self.camera.stop_recording() elif CAMERA_LIB == 'picamera2': # 停止录制的代码 pass self.is_recording = False logger.info("停止录制。") return True except Exception as e: logger.error(f"停止录制失败: {e}") return False def capture_image(self, custom_name=None): """拍摄一张照片""" if self.camera is None or self.is_recording: return False try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if not custom_name: filename = f"pictures/capture_{timestamp}.jpg" else: filename = custom_name if CAMERA_LIB == 'picamera': self.camera.capture(filename, use_video_port=True) # use_video_port更快,适合连拍 elif CAMERA_LIB == 'picamera2': # Picamera2 拍照方式 pass logger.info(f"照片已保存: {filename}") return True except Exception as e: logger.error(f"拍照失败: {e}") return False def check_motion(self, pir_signal): """ 检查PIR信号并应用软件防抖逻辑。 pir_signal: 布尔值,True表示检测到移动(高电平)。 返回: True如果满足条件触发动作,否则False。 """ if pir_signal: self.motion_detected_counter += 1 logger.debug(f"移动检测计数: {self.motion_detected_counter}/{self.sensitivity_threshold}") if self.motion_detected_counter >= self.sensitivity_threshold: self.motion_detected_counter = 0 # 重置计数器 return True else: # 没有检测到信号,重置计数器(防止短暂干扰累积) self.motion_detected_counter = 0 return False def cleanup(self): """清理资源,关闭摄像头""" if self.camera: if self.is_recording: self.stop_recording() self.camera.close() logger.info("摄像头资源已释放。")4.2 主控制循环与进程通信
现在,我们创建主控制脚本main_controller.py。它将SecurityCamera类与GPIO读取、设置文件监控结合起来,形成一个独立的后台服务。
main_controller.py
#!/usr/bin/env python3 """ 安防摄像头主控制进程。 运行: python3 main_controller.py 此进程应作为系统服务或后台进程运行。 """ import RPi.GPIO as GPIO import time import json import logging from datetime import datetime from camera_operator import SecurityCamera # 配置GPIO PIR_PIN = 17 # 对应物理引脚11,BCM编码为GPIO17 GPIO.setmode(GPIO.BCM) # 使用BCM编号,与BOARD编号(11)对应的是GPIO17 GPIO.setup(PIR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # 启用内部下拉电阻,确保引脚稳定 # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('camera_service.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) def main(): logger.info("安防摄像头主控制进程启动。") camera = SecurityCamera() # 上一次检查设置文件的时间 last_settings_check = time.time() settings_check_interval = 2 # 每2秒检查一次设置文件 # 上一次处理手动动作的时间(防抖) last_manual_action_time = 0 manual_action_cooldown = 3 # 手动动作冷却时间(秒) try: while True: current_time = time.time() # 1. 检查并响应来自Web界面的手动控制命令 if current_time - last_settings_check > settings_check_interval: action = camera.load_settings() # 这个方法现在会返回'action'字段的值 last_settings_check = current_time if action != 'none' and (current_time - last_manual_action_time) > manual_action_cooldown: logger.info(f"执行手动动作: {action}") if action == 'start_rec' and not camera.is_recording: camera.start_recording(f"videos/manual_{datetime.now().strftime('%Y%m%d_%H%M%S')}.h264") elif action == 'stop_rec' and camera.is_recording: camera.stop_recording() elif action == 'take_pic' and not camera.is_recording: camera.capture_image() # 执行后,清空动作标志(通过写回文件实现) with open(camera.settings_file, 'r+') as f: settings = json.load(f) settings['action'] = 'none' f.seek(0) json.dump(settings, f) f.truncate() last_manual_action_time = current_time # 2. 读取PIR传感器状态并处理移动侦测 try: # 读取GPIO引脚,True表示检测到移动(高电平) motion_detected = GPIO.input(PIR_PIN) == GPIO.HIGH except Exception as e: logger.error(f"读取GPIO失败: {e}") motion_detected = False # 3. 应用移动侦测逻辑 if camera.check_motion(motion_detected): logger.info(f"移动侦测触发!开始录制{ camera.recording_duration }秒。") if not camera.is_recording: camera.start_recording() # 录制指定时长后自动停止(在另一个线程中处理,避免阻塞主循环) def stop_after_duration(): time.sleep(camera.recording_duration) camera.stop_recording() threading.Thread(target=stop_after_duration, daemon=True).start() # 短暂休眠,降低CPU占用 time.sleep(0.1) # 主循环频率约10Hz except KeyboardInterrupt: logger.info("收到中断信号,正在关闭...") except Exception as e: logger.critical(f"主循环发生未预期错误: {e}") finally: camera.cleanup() GPIO.cleanup() logger.info("进程已安全退出。") if __name__ == '__main__': main()这个主控制循环的设计要点:
- 分离关注点:移动侦测逻辑和手动命令处理逻辑分离,互不干扰。
- 非阻塞录制:使用线程来处理“录制固定时长后自动停止”,这样主循环在录制期间依然能响应手动停止命令或新的移动触发。
- 资源安全:使用
try...finally确保无论程序如何退出,摄像头和GPIO资源都会被正确释放。 - 日志记录:详细的日志对于调试后台运行的服务至关重要。
5. 系统集成、部署与优化
5.1 启动与测试流程
现在,我们有了三个核心文件:app.py(Web服务器),camera_operator.py(摄像头类),main_controller.py(主控制进程)。我们需要让它们协同工作。
首先,确保所有文件都在
~/iot_security_camera目录下,并且目录结构如下:/home/pi/iot_security_camera/ ├── app.py ├── camControl.py ├── camera_operator.py ├── main_controller.py ├── camera_settings.json (运行后自动生成) ├── camera_service.log (运行后自动生成) ├── templates/ │ └── index.html ├── static/ (可选) ├── videos/ (空目录) └── pictures/ (空目录)首次运行前,初始化设置文件:
cd ~/iot_security_camera echo '{"duration": 10, "sensitivity": 50, "action": "none"}' > camera_settings.json在两个不同的终端窗口中分别启动服务:
终端1 - Web服务器:
cd ~/iot_security_camera python3 app.py你应该看到输出类似:
* Running on http://0.0.0.0:5000/。记下你的树莓派IP地址(可以用hostname -I命令查看)。终端2 - 摄像头控制进程:
cd ~/iot_security_camera python3 main_controller.py你会看到日志输出,显示进程启动��并开始监听PIR传感器。
测试:
- 在同一局域网的电脑或手机上,打开浏览器,访问
http://<树莓派IP>:5000。 - 你应该能看到控制面板。尝试修改录制时长和灵敏度,点击提交。
- 观察终端2的日志,看是否打印出
[Web Server] 设置已更新。 - 在PIR传感器前挥手,观察终端2日志是否出现
移动侦测触发!,并检查videos/目录下是否生成.h264文件。 - 在Web界面选择“开始录制”、“拍摄照片”等手动操作,检查是否生效。
- 在同一局域网的电脑或手机上,打开浏览器,访问
5.2 配置为系统服务(开机自启)
为了让项目在树莓派启动后自动运行,我们需要创建systemd服务单元。
创建Web服务器服务文件:
sudo nano /etc/systemd/system/security-camera-web.service写入以下内容:
[Unit] Description=Security Camera Flask Web Server After=network.target [Service] Type=simple User=pi WorkingDirectory=/home/pi/iot_security_camera ExecStart=/usr/bin/python3 /home/pi/iot_security_camera/app.py Restart=on-failure RestartSec=10 [Install] WantedBy=multi-user.target创建摄像头控制服务文件:
sudo nano /etc/systemd/system/security-camera-controller.service写入以下内容:
[Unit] Description=Security Camera Main Controller After=network.target [Service] Type=simple User=pi WorkingDirectory=/home/pi/iot_security_camera ExecStart=/usr/bin/python3 /home/pi/iot_security_camera/main_controller.py Restart=always # 控制进程需要持续运行,任何失败都重启 RestartSec=5 [Install] WantedBy=multi-user.target启用并启动服务:
sudo systemctl daemon-reload sudo systemctl enable security-camera-web.service sudo systemctl enable security-camera-controller.service sudo systemctl start security-camera-web.service sudo systemctl start security-camera-controller.service检查服务状态:
sudo systemctl status security-camera-web.service sudo systemctl status security-camera-controller.service看到
active (running)即表示成功。现在,即使重启树莓派,这两个服务也会自动启动。
5.3 高级优化与功能扩展思路
基础系统搭建完成后,可以考虑以下优化和扩展,让项目更实用、更强大:
视频流直播:在Flask应用中集成
MJPG-streamer或使用picamera2的MJPEG输出,实现实时视频流查看,而不仅仅是事后看录像文件。这需要额外的库和前端代码。视频编码与转换:
.h264文件很多设备无法直接播放。可以在录制完成后,使用ffmpeg或MP4Box自动将其封装为更通用的.mp4格式。可以写一个后台脚本监控videos/目录,自动转换新文件。云存储与通知:将录制的视频和照片自动上传到云存储(如Google Drive, Dropbox, 或国内的阿里云OSS)。可以使用
rclone工具。同时,可以集成邮件或即时通讯工具(如Telegram Bot、Server酱)的API,在触发移动侦测或手动操作时发送通知。更智能的检测:PIR传感器只能检测移动,无法区分是人、宠物还是窗帘晃动。可以结合机器学习,使用
TensorFlow Lite或OpenCV进行简单的图像识别,实现“人形检测”,大幅减少误报。功耗与性能优化:
- 降低分辨率/帧率:对于纯安防监控,720p@15fps通常足够,能显著降低CPU和存储负载。
- 定时任务:使用
cron在夜间或特定时段才启动监控服务。 - 使用硬件编码:确保
picamera使用H.264硬件编码(默认就是),这是最省CPU的方式。
安全性加固:
- 更改Flask密钥:生产环境中务必使用强随机字符串替换
app.config['SECRET_KEY']。 - 添加HTTP认证:使用Flask-HTTPAuth等库为Web界面添加简单的用户名密码认证,防止陌生人访问。
- 使用HTTPS:通过Nginx反向代理并配置SSL证书(例如用Let‘s Encrypt的免费证书),实现加密访问。
- 更改Flask密钥:生产环境中务必使用强随机字符串替换
更友好的Web界面:使用Bootstrap等前端框架美化界面,添加实时视频流预览窗口、历史事件时间线、文件管理(删除、播放)等功能。
这个项目就像一个乐高底座,上述每一个扩展点都是一块新的积木。你可以根据自己的需求和兴趣,不断往上添加,最终打造出一个完全符合你个人需求的、功能强大的智能安防系统。从最简单的移动触发录制,到智能识别、云端备份、手机通知,物联网的魅力就在于这种层层递进、无限可能的创造过程。
