别再盲找了!GhostTrack 开源项目让设备追踪变得如此简单
一、为什么这个项目值得关注
在当今万物互联的时代,设备追踪已成为企业运维、智能硬件开发、资产管理的核心需求。无论是追踪成千上万的物联网设备,还是监控移动终端的位置信息,一套高效、可靠的追踪系统都是必不可少的。
GhostTrack 正是为解决这一痛点而生的开源项目。它提供了完整的设备追踪解决方案,支持多协议接入、实时数据处理、灵活的可视化配置,让开发者能够快速搭建属于自己的追踪系统。
GhostTrack 的核心价值
┌─────────────────────────────────────────────────────────────┐
│ GhostTrack 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 设备层 │───▶│ 协议层 │───▶│ 数据层 │───▶│ 展示层 │ │
│ │ │ │ │ │ │ │ │ │
│ │ • GPS │ │ • MQTT │ │ • 存储 │ │ • Dashboard │
│ │ • BLE │ │ • HTTP │ │ • 计算 │ │ • API │ │
│ │ • WiFi │ │ • CoAP │ │ • 分析 │ │ • 告警 │ │
│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
为什么选择 GhostTrack 而不是商业解决方案?
| 特性 | 商业方案 | GhostTrack |
|---|---|---|
| 成本 | 高昂的订阅费用 | 完全免费开源 |
| 定制性 | 受限于厂商 | 源码开放,完全可控 |
| 部署方式 | 仅云端 | 支持本地/云端/边缘部署 |
| 数据主权 | 数据存储在第三方 | 数据完全自主掌控 |
| 社区支持 | 依赖厂商 | 活跃的开源社区 |
二、环境搭建:快速启动 GhostTrack
2.1 系统要求
在开始之前,让我们确认你的开发环境满足以下要求:
最低配置:
- CPU: 2 cores
- RAM: 4GB
- 磁盘: 20GB 可用空间
- 操作系统: Linux/macOS/Windows (WSL2)
推荐配置:
- CPU: 4+ cores
- RAM: 8GB+
- 磁盘: 50GB+ SSD
- 操作系统: Ubuntu 20.04+ / macOS 12+
2.2 安装前的准备工作
首先,确保你的系统已经安装了必要的依赖:
# 检查 Python 版本(需要 Python 3.8+)
python3 --version
# 输出示例:Python 3.10.12
# 检查 Git 是否可用
git --version
# 输出示例:git version 2.34.1
# 检查 Docker(推荐使用 Docker 部署)
docker --version
# 输出示例:Docker version 24.0.7
2.3 克隆项目仓库
# 克隆 GhostTrack 仓库
git clone https://github.com/HunxByts/GhostTrack.git
# 进入项目目录
cd GhostTrack
# 查看项目结构
ls -la
项目结构通常如下:
GhostTrack/
├── docs/ # 项目文档
├── examples/ # 示例代码
├── ghosttrack/ # 核心源码
│ ├── __init__.py
│ ├── api/ # API 接口
│ ├── core/ # 核心模块
│ ├── models/ # 数据模型
│ └── utils/ # 工具函数
├── tests/ # 测试用例
├── docker-compose.yml # Docker 编排文件
├── requirements.txt # Python 依赖
└── README.md # 项目说明
2.4 使用 Docker 快速部署(推荐)
对于大多数用户,我们强烈推荐使用 Docker 进行部署,它能自动处理所有依赖问题:
# 方式一:使用默认配置启动
docker-compose up -d
# 方式二:查看启动日志确认服务状态
docker-compose logs -f
服务启动后,你应该能看到类似以下的输出:
[INFO] GhostTrack API Server started on port 8000
[INFO] WebSocket endpoint available at ws://localhost:8000/ws
[INFO] Dashboard available at http://localhost:8000/dashboard
[INFO] Database connection established
[INFO] All services initialized successfully
2.5 本地开发环境安装
如果你希望进行开发或深入研究项目源码,可以选择本地安装:
# 创建虚拟环境(推荐)
python3 -m venv venv
# 激活虚拟环境
source venv/bin/activate # Linux/macOS
# 或
venv\Scripts\activate # Windows
# 安装依赖
pip install -r requirements.txt
# 安装开发依赖(可选)
pip install -r requirements-dev.txt
# 运行开发服务器
python -m ghosttrack.server
2.6 验证安装
安装完成后,让我们验证 GhostTrack 是否正常运行:
# 创建一个测试脚本 verify_installation.py
import requests
import json
# 测试 API 服务
base_url = "http://localhost:8000"
# 检查服务健康状态
response = requests.get(f"{base_url}/health")
health_data = response.json()
print("=" * 50)
print("GhostTrack 安装验证")
print("=" * 50)
print(f"服务状态: {health_data.get('status', 'unknown')}")
print(f"版本号: {health_data.get('version', 'unknown')}")
print(f"运行时间: {health_data.get('uptime', 'unknown')}")
print("=" * 50)
# 测试设备追踪核心功能
test_device = {
"device_id": "TEST001",
"latitude": 39.9042,
"longitude": 116.4074,
"timestamp": "2024-01-15T10:30:00Z"
}
response = requests.post(
f"{base_url}/api/v1/track",
json=test_device,
headers={"Content-Type": "application/json"}
)
print(f"追踪测试: {response.status_code}")
print(f"响应数据: {json.dumps(response.json(), indent=2, ensure_ascii=False)}")
运行验证脚本:
python verify_installation.py
成功输出示例:
==================================================
GhostTrack 安装验证
==================================================
服务状态: healthy
版本号: 1.2.0
运行时间: 42 seconds
==================================================
追踪测试: 201
响应数据: {
"success": true,
"device_id": "TEST001",
"message": "位置数据已成功接收"
}
三、核心功能详解
GhostTrack 提供了丰富而强大的功能,让我们逐一深入了解。
3.1 多协议设备接入
GhostTrack 支持多种主流物联网协议,让你可以灵活接入各类追踪设备:
HTTP RESTful API
最简单直接的接入方式,适合大多数应用场景:
import requests
import time
class GhostTrackClient:
"""GhostTrack API 客户端封装"""
def __init__(self, base_url, api_key=None):
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.session = requests.Session()
if api_key:
self.session.headers.update({"X-API-Key": api_key})
def report_location(self, device_id, latitude, longitude, **kwargs):
"""
上报设备位置信息
Args:
device_id: 设备唯一标识
latitude: 纬度
longitude: 经度
**kwargs: 附加参数(altitude, speed, heading, accuracy 等)
Returns:
dict: 包含处理结果的响应
"""
payload = {
"device_id": device_id,
"latitude": latitude,
"longitude": longitude,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
**kwargs
}
response = self.session.post(
f"{self.base_url}/api/v1/track",
json=payload
)
return response.json()
# 使用示例
client = GhostTrackClient("http://localhost:8000")
# 追踪一辆快递车的位置
result = client.report_location(
device_id="TRUCK-2024-001",
latitude=31.2304,
longitude=121.4737,
altitude=10.5,
speed=45.2,
heading=90.0,
accuracy=5.0
)
print(f"上报结果: {result}")
MQTT 协议接入
对于需要低延迟、高吞吐量的场景,MQTT 是更好的选择:
import paho.mqtt.client as mqtt
import json
import time
class GhostTrackMQTT:
"""基于 MQTT 协议的 GhostTrack 客户端"""
def __init__(self, broker_host, broker_port=1883,
username=None, password=None):
self.client = mqtt.Client()
self.broker_host = broker_host
self.broker_port = broker_port
# 设置认证信息
if username and password:
self.client.username_pw_set(username, password)
# 设置回调函数
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
def _on_connect(self, client, userdata, flags, rc):
"""连接成功回调"""
if rc == 0:
print(f"成功连接到 MQTT Broker: {self.broker_host}:{self.broker_port}")
# 订阅设备主题(支持通配符)
client.subscribe("ghosttrack/devices/+/location")
else:
print(f"连接失败,返回码: {rc}")
def _on_disconnect(self, client, userdata, rc):
"""断开连接回调"""
print("与 MQTT Broker 断开连接,正在尝试重连...")
time.sleep(5)
client.reconnect()
def _on_message(self, client, userdata, msg):
"""收到消息回调"""
try:
payload = json.loads(msg.payload.decode())
print(f"收到设备位置数据: {payload}")
except json.JSONDecodeError:
print(f"消息解析失败: {msg.payload}")
def connect(self):
"""建立连接"""
self.client.connect(self.broker_host, self.broker_port, keepalive=60)
self.client.loop_start()
def disconnect(self):
"""断开连接"""
self.client.loop_stop()
self.client.disconnect()
def publish_location(self, device_id, latitude, longitude, **kwargs):
"""
发布设备位置信息
Topic 格式: ghosttrack/devices/{device_id}/location
"""
topic = f"ghosttrack/devices/{device_id}/location"
payload = {
"latitude": latitude,
"longitude": longitude,
"timestamp": int(time.time()),
**kwargs
}
self.client.publish(topic, json.dumps(payload))
print(f"已发布位置数据到 {topic}")
# 使用示例
mqtt_client = GhostTrackMQTT(
broker_host="localhost",
broker_port=1883,
username="your_username",
password="your_password"
)
mqtt_client.connect()
# 模拟持续追踪
try:
while True:
# 模拟设备移动
for lat in [31.2, 31.21, 31.22, 31.23]:
mqtt_client.publish_location(
device_id="BIKE-001",
latitude=lat,
longitude=121.47,
speed=25.0,
heading=45.0
)
time.sleep(2)
except KeyboardInterrupt:
print("\n停止追踪")
mqtt_client.disconnect()
3.2 实时数据处理引擎
GhostTrack 内置了强大的实时数据处理引擎,支持数据过滤、聚合、告警触发等功能:
from ghosttrack.engine import StreamProcessor
from ghosttrack.filters import (
GeoFenceFilter,
SpeedFilter,
AccuracyFilter,
DuplicateFilter
)
from ghosttrack.alerts import AlertManager
# 创建流处理器
processor = StreamProcessor(
buffer_size=1000,
flush_interval=5.0 # 每5秒刷新一次
)
# 配置数据过滤器链
processor.add_filter(DuplicateFilter(tolerance_seconds=1.0))
processor.add_filter(AccuracyFilter(min_accuracy=50.0)) # 过滤精度低于50米的数据
processor.add_filter(SpeedFilter(max_speed=200.0)) # 过滤速度异常数据
# 配置电子围栏
processor.add_filter(GeoFenceFilter(
name="北京市中心区域",
coordinates=[
(39.8, 116.3),
(39.8, 116.5),
(40.0, 116.5),
(40.0, 116.3)
],
action="allow" # allow=允许区域内的数据, deny=禁止区域内的数据
))
# 配置告警管理
alert_manager = AlertManager()
# 添加告警规则
alert_manager.add_rule({
"name": "超速告警",
"condition": lambda data: data.get("speed", 0) > 120,
"severity": "warning",
"message": "设备 {device_id} 检测到超速行驶,速度: {speed} km/h"
})
alert_manager.add_rule({
"name": "设备离线告警",
"condition": lambda data: data.get("offline_duration", 0) > 300,
"severity": "critical",
"message": "设备 {device_id} 已离线超过 {offline_duration} 秒"
})
# 设置告警回调
def on_alert(alert):
print(f"🚨 告警触发 [{alert['severity']}]: {alert['message']}")
# 可以在这里添加邮件通知、短信通知、webhook 等逻辑
alert_manager.on_alert = on_alert
# 启动处理器
processor.start()
# 提交数据进行处理
test_data = {
"device_id": "VEHICLE-001",
"latitude": 39.9042,
"longitude": 116.4074,
"speed": 85.0,
"accuracy": 10.0,
"timestamp": "2024-01-15T10:30:00Z"
}
processor.submit(test_data)
3.3 数据存储与查询
GhostTrack 支持多种数据存储后端,并提供灵活的查询接口:
from ghosttrack.storage import StorageManager
from ghosttrack.models import LocationRecord, DeviceInfo
from datetime import datetime, timedelta
# 初始化存储管理器
storage = StorageManager(
backend="postgresql", # 支持: sqlite, postgresql, mysql, timescaledb
connection_string="postgresql://user:pass@localhost:5432/ghosttrack"
)
# 保存单条位置记录
record = LocationRecord(
device_id="BIKE-001",
latitude=31.2304,
longitude=121.4737,
altitude=10.0,
speed=25.0,
heading=90.0,
accuracy=5.0,
timestamp=datetime.now()
)
storage.save_location(record)
# 批量保存位置记录
records = [
LocationRecord(
device_id="BIKE-001",
latitude=31.2304 + i * 0.001,
longitude=121.4737 + i * 0.001,
timestamp=datetime.now() - timedelta(minutes=i)
)
for i in range(10)
]
storage.save_locations_batch(records)
# 查询设备历史轨迹
trajectory = storage.get_trajectory(
device_id="BIKE-001",
start_time=datetime.now() - timedelta(hours=24),
end_time=datetime.now(),
limit=1000
)
print(f"获取到 {len(trajectory)} 条轨迹记录")
# 计算轨迹总距离
def calculate_distance(points):
"""使用 Haversine 公式计算轨迹总距离"""
from math import radians, sin, cos, sqrt, atan2
def haversine(lat1, lon1, lat2, lon2):
R = 6371 # 地球半径(公里)
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
return R * c
total_distance = 0
for i in range(1, len(points)):
total_distance += haversine(
points[i-1].latitude, points[i-1].longitude,
points[i].latitude, points[i].longitude
)
return total_distance
total_km = calculate_distance(trajectory)
print(f"总行程: {total_km:.2f} 公里")
# 按设备分组统计
stats = storage.get_device_stats(
start_time=datetime.now() - timedelta(days=30)
)
for device_stat in stats:
print(f"设备 {device_stat.device_id}: "
f"记录数={device_stat.record_count}, "
f"总距离={device_stat.total_distance:.2f}km, "
f"平均速度={device_stat.avg_speed:.1f}km/h")
3.4 WebSocket 实时推送
GhostTrack 提供 WebSocket 接口,支持实时推送设备状态更新:
import asyncio
import websockets
import json
async def websocket_demo():
"""WebSocket 实时追踪演示"""
# 连接到 GhostTrack WebSocket 服务
uri = "ws://localhost:8000/ws/stream"
async with websockets.connect(uri) as websocket:
print("已连接到 GhostTrack 实时流")
# 订阅特定设备
subscribe_msg = {
"action": "subscribe",
"device_ids": ["BIKE-001", "BIKE-002", "TRUCK-001"]
}
await websocket.send(json.dumps(subscribe_msg))
print(f"已订阅设备: {subscribe_msg['device_ids']}")
# 持续接收实时数据
while True:
try:
message = await websocket.recv()
data = json.loads(message)
# 处理不同类型的消息
msg_type = data.get("type")
if msg_type == "location_update":
device_id = data["device_id"]
lat = data["latitude"]
lon = data["longitude"]
speed = data.get("speed", 0)
print(f"📍 {device_id}: ({lat:.6f}, {lon:.6f}) "
f"速度: {speed:.1f} km/h")
elif msg_type == "alert":
print(f"🚨 告警: {data['message']}")
elif msg_type == "device_status":
status = data["status"]
print(f"📶 设备状态: {data['device_id']} -> {status}")
except websockets.exceptions.ConnectionClosed:
print("连接已断开")
break
# 运行演示
asyncio.run(websocket_demo())
四、实战教程:构建完整的车辆追踪系统
接下来,让我们通过一个完整的实战项目,将 GhostTrack 的各项功能串联起来,构建一个功能完备的车辆追踪系统。
4.1 项目概述
我们要构建的系统包含以下组件:
┌─────────────────────────────────────────────────────────────────┐
│ 车辆追踪系统架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 车载 GPS │────▶│ GhostTrack│────▶│ Dashboard│ │
│ │ 设备 │ │ Server │ │ 前端 │ │
│ └──────────┘ └────┬─────┘ └──────────┘ │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ 数据存储 │ │
│ │ PostgreSQL│ │
│ └──────────┘ │
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ 告警通知层 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────────────┐ │ │
│ │ │ 邮件 │ │ 短信 │ │ Webhook │ │ │
│ │ └─────────┘ └─────────┘ └─────────────────┘ │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
4.2 步骤一:数据模型设计
首先,我们需要定义系统的数据模型:
# models/fleet_models.py
from ghosttrack.models import BaseModel, LocationRecord
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel as PydanticBase, Field
class Vehicle(BaseModel):
"""车辆基础信息模型"""
vehicle_id: str = Field(..., description="车辆唯一标识")
license_plate: str = Field(..., description="车牌号")
vehicle_type: str = Field(default="sedan", description="车辆类型")
driver_name: Optional[str] = Field(None, description="司机姓名")
driver_phone: Optional[str] = Field(None, description="司机电话")
max_speed: float = Field(default=120.0, description="最大允许速度 km/h")
geofence_ids: List[str] = Field(default_factory=list, description="绑定的电子围栏")
class VehicleLocation(LocationRecord):
"""车辆位置扩展模型"""
vehicle_id: str = Field(..., description="车辆ID")
fuel_level: Optional[float] = Field(None, ge=0, le=100, description="油量百分比")
engine_status: str = Field(default="off", description="发动机状态")
door_status: str = Field(default="locked", description="车门状态")
odometer: Optional[float] = Field(None, description="总里程 km")
class VehicleAlert(BaseModel):
"""车辆告警模型"""
alert_id: str = Field(default_factory=lambda: f"ALT-{datetime.now().strftime('%Y%m%d%H%M%S')}")
vehicle_id: str = Field(..., description="关联车辆ID")
alert_type: str = Field(..., description="告警类型")
severity: str = Field(default="info", description="严重程度")
message: str = Field(..., description="告警详情")
location: dict = Field(..., description="告警位置")
timestamp: datetime = Field(default_factory=datetime.now)
acknowledged: bool = Field(default=False, description="是否已确认")
acknowledged_by: Optional[str] = Field(None, description="确认人")
acknowledged_at: Optional[datetime] = Field(None, description="确认时间")
4.3 步骤二:服务端实现
创建主服务端程序:
# server/fleet_tracker.py
import asyncio
import json
from datetime import datetime, timedelta
from typing import Dict, Set
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from ghosttrack import GhostTrack
from ghosttrack.storage import StorageManager
from ghosttrack.engine import StreamProcessor, GeoFenceFilter, SpeedFilter
from models.fleet_models import Vehicle, VehicleLocation, VehicleAlert
# 全局变量
ghosttrack = GhostTrack()
storage = StorageManager(backend="postgresql")
stream_processor = StreamProcessor()
alert_callbacks: Dict[str, Set] = {}
# 创建 FastAPI 应用
app = FastAPI(
title="车队追踪系统",
description="基于 GhostTrack 的企业级车辆追踪解决方案",
version="1.0.0"
)
# 配置 CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# =============================================================================
# 辅助函数
# =============================================================================
def create_speed_alert(vehicle_id: str, speed: float, max_speed: float) -> VehicleAlert:
"""创建超速告警"""
return VehicleAlert(
vehicle_id=vehicle_id,
alert_type="overspeed",
severity="warning",
message=f"车辆超速!当前速度 {speed:.1f} km/h,超过限速 {max_speed} km/h",
location={"overspeed": speed - max_speed}
)
def create_geofence_alert(vehicle_id: str, fence_name: str, action: str) -> VehicleAlert:
"""创建电子围栏告警"""
return VehicleAlert(
vehicle_id=vehicle_id,
alert_type="geofence",
severity="info",
message=f"车辆{action}电子围栏: {fence_name}",
location={"fence_name": fence_name, "action": action}
)
async def broadcast_to_subscribers(vehicle_id: str, data: dict):
"""向订阅该车辆的 WebSocket 客户端广播消息"""
if vehicle_id in alert_callbacks:
for websocket in alert_callbacks[vehicle_id]:
try:
await websocket.send_json(data)
except Exception as e:
print(f"广播消息失败: {e}")
async def save_and_process_location(location: VehicleLocation):
"""保存位置数据并进行实时处理"""
# 保存到数据库
await storage.save_location(location)
# 提交到流处理器进行实时分析
await stream_processor.submit(location.dict())
# 广播实时更新
await broadcast_to_subscribers(location.vehicle_id, {
"type": "location_update",
"data": location.dict()
})
# =============================================================================
# WebSocket 端点
# =============================================================================
@app.websocket("/ws/vehicles/{vehicle_id}")
async def vehicle_websocket(websocket: WebSocket, vehicle_id: str):
"""车辆实时数据 WebSocket 连接"""
await websocket.accept()
# 添加到订阅列表
if vehicle_id not in alert_callbacks:
alert_callbacks[vehicle_id] = set()
alert_callbacks[vehicle_id].add(websocket)
try:
while True:
# 保持连接,可以在这里处理客户端发送的消息
data = await websocket.receive_text()
message = json.loads(data)
if message.get("action") == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
alert_callbacks[vehicle_id].discard(websocket)
if not alert_callbacks[vehicle_id]:
del alert_callbacks[vehicle_id]
# =============================================================================
# REST API 端点
# =============================================================================
@app.post("/api/v1/vehicles", response_model=Vehicle)
async def register_vehicle(vehicle: Vehicle):
"""注册新车辆"""
existing = await storage.get_vehicle(vehicle.vehicle_id)
if existing:
raise HTTPException(status_code=400, detail="车辆已存在")
await storage.save_vehicle(vehicle)
return vehicle
@app.get("/api/v1/vehicles/{vehicle_id}", response_model=Vehicle)
async def get_vehicle(vehicle_id: str):
"""获取车辆信息"""
vehicle = await storage.get_vehicle(vehicle_id)
if not vehicle:
raise HTTPException(status_code=404, detail="车辆不存在")
return vehicle
@app.post("/api/v1/vehicles/{vehicle_id}/location")
async def report_vehicle_location(vehicle_id: str, location: VehicleLocation):
"""上报车辆位置"""
location.vehicle_id = vehicle_id
location.timestamp = datetime.now()
# 获取车辆配置用于告警判断
vehicle = await storage.get_vehicle(vehicle_id)
if vehicle and vehicle.max_speed and location.speed and location.speed > vehicle.max_speed:
alert = create_speed_alert(vehicle_id, location.speed, vehicle.max_speed)
await storage.save_alert(alert)
await broadcast_to_subscribers(vehicle_id, {
"type": "alert",
"alert": alert.dict()
})
await save_and_process_location(location)
return {"status": "success", "timestamp": location.timestamp}
@app.get("/api/v1/vehicles/{vehicle_id}/trajectory")
async def get_vehicle_trajectory(
vehicle_id: str,
start_time: datetime = None,
end_time: datetime = None,
limit: int = 1000
):
"""获取车辆轨迹"""
if not start_time:
start_time = datetime.now() - timedelta(hours=24)
if not end_time:
end_time = datetime.now()
trajectory = await storage.get_trajectory(
vehicle_id=vehicle_id,
start_time=start_time,
end_time=end_time,
limit=limit
)
return {
"vehicle_id": vehicle_id,
"start_time": start_time,
"end_time": end_time,
"point_count": len(trajectory),
"trajectory": [p.dict() for p in trajectory]
}
@app.get("/api/v1/vehicles/{vehicle_id}/alerts")
async def get_vehicle_alerts(
vehicle_id: str,
start_time: datetime = None,
end_time: datetime = None,
acknowledged: bool = None
):
"""获取车辆告警记录"""
alerts = await storage.get_alerts(
vehicle_id=vehicle_id,
start_time=start_time,
end_time=end_time,
acknowledged=acknowledged
)
return {"alerts": [a.dict() for a in alerts]}
@app.post("/api/v1/alerts/{alert_id}/acknowledge")
async def acknowledge_alert(alert_id: str, acknowledged_by: str):
"""确认告警"""
alert = await storage.get_alert(alert_id)
if not alert:
raise HTTPException(status_code=404, detail="告警不存在")
alert.acknowledged = True
alert.acknowledged_by = acknowledged_by
alert.acknowledged_at = datetime.now()
await storage.update_alert(alert)
return {"status": "success", "alert": alert.dict()}
@app.get("/api/v1/dashboard/stats")
async def get_dashboard_stats():
"""获取仪表盘统计数据"""
stats = await storage.get_fleet_stats()
return stats
@app.get("/health")
async def health_check():
"""健康检查端点"""
return {
"status": "healthy",
"version": "1.0.0",
"timestamp": datetime.now().isoformat(),
"active_connections": sum(len(subs) for subs in alert_callbacks.values()),
"registered_vehicles": await storage.count_vehicles()
}
# =============================================================================
# 启动和关闭事件
# =============================================================================
@app.on_event("startup")
async def startup_event():
"""应用启动时的初始化"""
# 初始化存储
await storage.connect()
# 配置流处理器
stream_processor.add_filter(SpeedFilter(max_speed=200.0))
# 设置告警回调
async def alert_handler(alert_data):
alert = VehicleAlert(**alert_data)
await storage.save_alert(alert)
await broadcast_to_subscribers(alert.vehicle_id, {
"type": "alert",
"alert": alert.dict()
})
stream_processor.on_alert = alert_handler
stream_processor.start()
print("车队追踪系统已启动")
@app.on_event("shutdown")
async def shutdown_event():
"""应用关闭时的清理"""
stream_processor.stop()
await storage.disconnect()
print("车队追踪系统已关闭")
# 启动服务
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
4.4 步骤三:模拟数据生成器
为了测试系统,我们需要一个模拟数据生成器:
# scripts/simulate_fleet.py
import asyncio
import random
import time
from datetime import datetime
from models.fleet_models import Vehicle, VehicleLocation
# 模拟车辆列表
SIMULATED_VEHICLES = [
{"vehicle_id": "TRUCK-001", "license_plate": "京A12345", "max_speed": 100},
{"vehicle_id": "TRUCK-002", "license_plate": "京B67890", "max_speed": 100},
{"vehicle_id": "VAN-001", "license_plate": "沪C11111", "max_speed": 120},
{"vehicle_id": "VAN-002", "license_plate": "沪D22222", "max_speed": 120},
{"vehicle_id": "CAR-001", "license_plate": "粤E33333", "max_speed": 140},
]
class FleetSimulator:
"""车队位置模拟器"""
def __init__(self, api_base_url):
self.api_base_url = api_base_url.rstrip('/')
self.running = False
# 模拟车辆位置(起始点)
self.vehicle_positions = {
v["vehicle_id"]: {
"latitude": 39.9042 + random.uniform(-0.1, 0.1),
"longitude": 116.4074 + random.uniform(-0.1, 0.1),
"heading": random.uniform(0, 360),
"speed": 0
}
for v in SIMULATED_VEHICLES
}
async def register_vehicles(self):
"""注册模拟车辆"""
import requests
for vehicle_data in SIMULATED_VEHICLES:
vehicle = Vehicle(
vehicle_id=vehicle_data["vehicle_id"],
license_plate=vehicle_data["license_plate"],
vehicle_type="truck" if "TRUCK" in vehicle_data["vehicle_id"] else "van",
max_speed=vehicle_data["max_speed"]
)
try:
response = requests.post(
f"{self.api_base_url}/api/v1/vehicles",
json=vehicle.dict(),
timeout=5
)
if response.status_code == 200:
print(f"✓ 车辆 {vehicle.vehicle_id} 注册成功")
elif response.status_code == 400:
print(f"- 车辆 {vehicle.vehicle_id} 已存在")
else:
print(f"✗ 车辆 {vehicle.vehicle_id} 注册失败: {response.text}")
except Exception as e:
print(f"✗ 车辆 {vehicle.vehicle_id} 注册失败: {e}")
def update_position(self, vehicle_id):
"""更新单个车辆位置"""
pos = self.vehicle_positions[vehicle_id]
# 模拟移动
speed_kmh = random.uniform(30, 80) # 随机速度 30-80 km/h
speed_ms = speed_kmh / 3.6 # 转换为米/秒
# 更新方向(模拟道路转弯)
heading_change = random.uniform(-30, 30)
pos["heading"] = (pos["heading"] + heading_change) % 360
# 根据速度和方向计算新位置
distance_m = speed_ms * 2 # 2秒的移动距离
heading_rad = pos["heading"] * 3.14159 / 180
lat_change = (distance_m / 111000) * (-1 if pos["heading"] > 180 else 1) * abs(1 - abs(90 - pos["heading"]) / 90)
lon_change = (distance_m / (111000 * abs(39.9042))) * (-1 if 90 < pos["heading"] < 270 else 1)
pos["latitude"] += lat_change * random.uniform(0.8, 1.2)
pos["longitude"] += lon_change * random.uniform(0.8, 1.2)
pos["speed"] = speed_kmh
# 偶尔模拟异常情况
if random.random() < 0.02: # 2% 概率超速
pos["speed"] = random.uniform(120, 150)
return pos
async def run_simulation(self, interval=2):
"""运行模拟循环"""
import requests
self.running = True
print(f"\n开始模拟 {len(SIMULATED_VEHICLES)} 辆车的位置追踪...")
print("按 Ctrl+C 停止模拟\n")
while self.running:
for vehicle_id in self.vehicle_positions:
pos = self.update_position(vehicle_id)
location_data = {
"vehicle_id": vehicle_id,
"latitude": round(pos["latitude"], 6),
"longitude": round(pos["longitude"], 6),
"altitude": random.uniform(0, 100),
"speed": round(pos["speed"], 1),
"heading": round(pos["heading"], 1),
"accuracy": random.uniform(3, 10),
"fuel_level": random.uniform(20, 100),
"engine_status": "running",
"door_status": "locked"
}
try:
response = requests.post(
f"{self.api_base_url}/api/v1/vehicles/{vehicle_id}/location",
json=location_data,
timeout=5
)
if response.status_code == 200:
print(f"[{datetime.now().strftime('%H:%M:%S')}] "
f"{vehicle_id}: ({pos['latitude']:.6f}, {pos['longitude']:.6f}) "
f"速度: {pos['speed']:.1f} km/h")
except Exception as e:
print(f"位置上报失败 {vehicle_id}: {e}")
await asyncio.sleep(interval)
def stop(self):
"""停止模拟"""
self.running = False
async def main():
simulator = FleetSimulator("http://localhost:8000")
# 注册车辆
await simulator.register_vehicles()
# 运行模拟
try:
await simulator.run_simulation(interval=2)
except KeyboardInterrupt:
print("\n\n停止模拟...")
simulator.stop()
if __name__ == "__main__":
asyncio.run(main())
4.5 步骤四:前端展示页面
创建一个简单但功能完整的前端 Dashboard:
<!-- frontend/dashboard.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>车队追踪系统 - 实时监控</title>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.9.4/dist/leaflet.css" />
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #f5f7fa;
color: #333;
}
.header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 20px 30px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
.header h1 {
font-size: 24px;
margin-bottom: 5px;
}
.header .status {
font-size: 14px;
opacity: 0.9;
}
.status.connected { color: #4ade80; }
.status.disconnected { color: #f87171; }
.main-container {
display: flex;
height: calc(100vh - 80px);
}
.sidebar {
width: 320px;
background: white;
border-right: 1px solid #e5e7eb;
overflow-y: auto;
padding: 20px;
}
.vehicle-card {
background: #f9fafb;
border: 1px solid #e5e7eb;
border-radius: 12px;
padding: 15px;
margin-bottom: 12px;
cursor: pointer;
transition: all 0.2s;
}
.vehicle-card:hover {
border-color: #667eea;
box-shadow: 0 2px 8px rgba(102, 126, 234, 0.2);
}
.vehicle-card.selected {
border-color: #667eea;
background: #f0f1ff;
}
.vehicle-card .vehicle-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 10px;
}
.vehicle-card .vehicle-id {
font-weight: 600;
color: #1f2937;
}
.vehicle-card .license-plate {
background: #667eea;
color: white;
padding: 2px 8px;
border-radius: 4px;
font-size: 12px;
}
.vehicle-card .vehicle-info {
font-size: 13px;
color: #6b7280;
}
.vehicle-card .vehicle-info span {
margin-right: 15px;
}
.vehicle-card .alert-badge {
background: #ef4444;
color: white;
padding: 2px 6px;
border-radius: 10px;
font-size: 11px;
margin-left: 5px;
}
.map-container {
flex: 1;
position: relative;
}
#map {
width: 100%;
height: 100%;
}
.stats-panel {
position: absolute;
top: 20px;
right: 20px;
background: white;
border-radius: 12px;
padding: 20px;
box-shadow: 0 4px 20px rgba(0,0,0,0.1);
z-index: 1000;
min-width: 200px;
}
.stats-panel h3 {
font-size: 14px;
color: #6b7280;
margin-bottom: 15px;
}
.stat-item {
margin-bottom: 12px;
}
.stat-item .label {
font-size: 12px;
color: #9ca3af;
}
.stat-item .value {
font-size: 24px;
font-weight: 600;
color: #667eea;
}
.alert-panel {
position: absolute;
bottom: 20px;
right: 20px;
width: 350px;
max-height: 300px;
background: white;
border-radius: 12px;
box-shadow: 0 4px 20px rgba(0,0,0,0.1);
z-index: 1000;
overflow: hidden;
}
.alert-panel .alert-header {
background: #fef2f2;
padding: 12px 15px;
font-weight: 600;
color: #dc2626;
display: flex;
align-items: center;
gap: 8px;
}
.alert-panel .alert-list {
max-height: 250px;
overflow-y: auto;
}
.alert-item {
padding: 12px 15px;
border-bottom: 1px solid #f3f4f6;
font-size: 13px;
}
.alert-item.warning {
border-left: 3px solid #f59e0b;
}
.alert-item.critical {
border-left: 3px solid #ef4444;
}
.alert-item .alert-time {
color: #9ca3af;
font-size: 11px;
margin-top: 4px;
}
.vehicle-popup {
font-size: 13px;
}
.vehicle-popup h4 {
margin-bottom: 8px;
color: #667eea;
}
.vehicle-popup p {
margin: 4px 0;
color: #6b7280;
}
.leaflet-container {
font-family: inherit;
}
</style>
</head>
<body>
<div class="header">
<h1>🚌 车队追踪系统</h1>
<div class="status disconnected" id="connectionStatus">
🔌 正在连接服务器...
</div>
</div>
<div class="main-container">
<div class="sidebar">
<h2 style="font-size: 16px; margin-bottom: 15px;">📋 车辆列表</h2>
<div id="vehicleList">
<!-- 动态生成的车辆卡片 -->
</div>
</div>
<div class="map-container">
<div id="map"></div>
<div class="stats-panel">
<h3>📊 实时统计</h3>
<div class="stat-item">
<div class="label">在线车辆</div>
<div class="value" id="onlineCount">0</div>
</div>
<div class="stat-item">
<div class="label">活跃告警</div>
<div class="value" id="alertCount">0</div>
</div>
<div class="stat-item">
<div class="label">今日行程</div>
<div class="value" id="todayDistance">0 km</div>
</div>
</div>
<div class="alert-panel">
<div class="alert-header">
🚨 最新告警
</div>
<div class="alert-list" id="alertList">
<div class="alert-item" style="color: #9ca3af; text-align: center; padding: 20px;">
暂无告警
</div>
</div>
</div>
</div>
</div>
<script src="https://unpkg.com/leaflet@1.9.4/dist/leaflet.js"></script>
<script>
// =====================================================================
// 全局状态
// =====================================================================
const state = {
vehicles: {},
selectedVehicleId: null,
websocket: null,
markers: {},
vehiclePaths: {}
};
// 初始化地图
const map = L.map('map').setView([39.9042, 116.4074], 12);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
attribution: '© OpenStreetMap contributors'
}).addTo(map);
// 自定义车辆图标
const vehicleIcon = L.divIcon({
html: '<div style="background: #667eea; width: 24px; height: 24px; border-radius: 50%; border: 3px solid white; box-shadow: 0 2px 6px rgba(0,0,0,0.3);"></div>',
iconSize: [24, 24],
iconAnchor: [12, 12],
className: 'vehicle-marker'
});
const selectedVehicleIcon = L.divIcon({
html: '<div style="background: #ef4444; width: 28px; height: 28px; border-radius: 50%; border: 3px solid white; box-shadow: 0 2px 10px rgba(239,68,68,0.5);"></div>',
iconSize: [28, 28],
iconAnchor: [14, 14],
className: 'vehicle-marker-selected'
});
// =====================================================================
// WebSocket 连接
// =====================================================================
function connectWebSocket() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.host}/ws/vehicles`;
state.websocket = new WebSocket(wsUrl);
state.websocket.onopen = () => {
console.log('WebSocket 连接成功');
document.getElementById('connectionStatus').textContent = '✓ 已连接到服务器';
document.getElementById('connectionStatus').className = 'status connected';
// 订阅所有车辆
state.websocket.send(JSON.stringify({
action: 'subscribe',
vehicle_ids: Object.keys(state.vehicles)
}));
};
state.websocket.onclose = () => {
console.log('WebSocket 连接断开,5秒后重连');
document.getElementById('connectionStatus').textContent = '🔌 连接断开,重新连接中...';
document.getElementById('connectionStatus').className = 'status disconnected';
setTimeout(connectWebSocket, 5000);
};
state.websocket.onerror = (error) => {
console.error('WebSocket 错误:', error);
};
state.websocket.onmessage = (event) => {
const data = JSON.parse(event.data);
handleWebSocketMessage(data);
};
}
function handleWebSocketMessage(data) {
if (data.type === 'location_update') {
updateVehicleLocation(data.data);
} else if (data.type === 'alert') {
handleAlert(data.alert);
}
}
// =====================================================================
// 车辆管理
// =====================================================================
function updateVehicleLocation(locationData) {
const { vehicle_id, latitude, longitude, speed, heading } = locationData;
// 更新状态
if (!state.vehicles[vehicle_id]) {
state.vehicles[vehicle_id] = {};
}
state.vehicles[vehicle_id].location = locationData;
state.vehicles[vehicle_id].lastUpdate = new Date();
// 更新或创建标记
if (state.markers[vehicle_id]) {
state.markers[vehicle_id].setLatLng([latitude, longitude]);
// 更新路径
if (state.vehiclePaths[vehicle_id]) {
const latlngs = state.vehiclePaths[vehicle_id].getLatLngs();
latlngs.push([latitude, longitude]);
if (latlngs.length > 100) latlngs.shift(); // 保持最多100个点
state.vehiclePaths[vehicle_id].setLatLngs(latlngs);
}
} else {
// 创建新标记
const marker = L.marker([latitude, longitude], {
icon: vehicleIcon,
rotationAngle: heading || 0
}).addTo(map);
marker.bindPopup(`
<div class="vehicle-popup">
<h4>${vehicle_id}</h4>
<p>📍 位置: ${latitude.toFixed(6)}, ${longitude.toFixed(6)}</p>
<p>⚡ 速度: ${speed || 0} km/h</p>
<p>🧭 方向: ${heading || 0}°</p>
</div>
`);
state.markers[vehicle_id] = marker;
// 创建轨迹线
state.vehiclePaths[vehicle_id] = L.polyline([], {
color: '#667eea',
weight: 3,
opacity: 0.7
}).addTo(map);
}
// 更新UI
updateVehicleCard(vehicle_id);
updateStats();
}
function selectVehicle(vehicleId) {
// 取消之前的选中
if (state.selectedVehicleId) {
const prevCard = document.querySelector(`[data-vehicle-id="${state.selectedVehicleId}"]`);
if (prevCard) prevCard.classList.remove('selected');
if (state.markers[state.selectedVehicleId]) {
state.markers[state.selectedVehicleId].setIcon(vehicleIcon);
}
}
state.selectedVehicleId = vehicleId;
// 选中新车辆
const card = document.querySelector(`[data-vehicle-id="${vehicleId}"]`);
if (card) card.classList.add('selected');
if (state.markers[vehicleId]) {
state.markers[vehicleId].setIcon(selectedVehicleIcon);
map.setView(state.markers[vehicleId].getLatLng(), 15);
}
}
// =====================================================================
// UI 更新
// =====================================================================
function updateVehicleCard(vehicleId) {
let card = document.querySelector(`[data-vehicle-id="${vehicleId}"]`);
const vehicle = state.vehicles[vehicleId];
if (!card) {
// 创建新卡片
card = document.createElement('div');
card.className = 'vehicle-card';
card.dataset.vehicleId = vehicleId;
card.onclick = () => selectVehicle(vehicleId);
card.innerHTML = `
<div class="vehicle-header">
<span class="vehicle-id">${vehicleId}</span>
<span class="license-plate">--</span>
</div>
<div class="vehicle-info">
<span>⚡ -- km/h</span>
<span>📍 --</span>
</div>
`;
document.getElementById('vehicleList').appendChild(card);
}
// 更新卡片信息
if (vehicle.location) {
const { speed, latitude, longitude } = vehicle.location;
card.querySelector('.vehicle-info').innerHTML = `
<span>⚡ ${(speed || 0).toFixed(0)} km/h</span>
<span>📍 ${latitude.toFixed(4)}, ${longitude.toFixed(4)}</span>
`;
}
}
function updateStats() {
const onlineVehicles = Object.values(state.vehicles)
.filter(v => v.location);
document.getElementById('onlineCount').textContent = onlineVehicles.length;
// 计算总行程(简化版本)
let totalDistance = 0;
Object.values(state.vehiclePaths).forEach(path => {
totalDistance += path.getLatLngs().length * 0.01; // 估算
});
document.getElementById('todayDistance').textContent = `${totalDistance.toFixed(1)} km`;
}
function handleAlert(alert) {
const alertList = document.getElementById('alertList');
const alertCount = document.getElementById('alertCount');
// 更新告警计数
const currentCount = parseInt(alertCount.textContent) || 0;
alertCount.textContent = currentCount + 1;
// 添加告警到列表
const alertItem = document.createElement('div');
alertItem.className = `alert-item ${alert.severity}`;
alertItem.innerHTML = `
<div>${alert.message}</div>
<div class="alert-time">${new Date(alert.timestamp).toLocaleString()}</div>
`;
// 清空"暂无告警"提示
if (alertList.querySelector('div[style*="text-align"]')) {
alertList.innerHTML = '';
}
// 添加到列表开头
alertList.insertBefore(alertItem, alertList.firstChild);
// 保持最多显示10条
while (alertList.children.length > 10) {
alertList.removeChild(alertList.lastChild);
}
}
// =====================================================================
// 初始化
// =====================================================================
async function init() {
try {
// 从API获取车辆列表
const response = await fetch('/api/v1/vehicles');
if (response.ok) {
const vehicles = await response.json();
vehicles.forEach(vehicle => {
state.vehicles[vehicle.vehicle_id] = {
info: vehicle
};
updateVehicleCard(vehicle.vehicle_id);
});
}
} catch (error) {
console.error('获取车辆列表失败:', error);
}
// 连接 WebSocket
connectWebSocket();
// 每30秒更新一次统计数据
setInterval(updateStats, 30000);
}
// 启动应用
init();
</script>
</body>
</html>
4.6 运行完整系统
现在让我们把所有组件组合起来运行:
# 1. 启动后端服务
cd /path/to/GhostTrack
python -m uvicorn server.fleet_tracker:app --host 0.0.0.0 --port 8000 --reload
# 2. 在另一个终端启动模拟器(生成测试数据)
python scripts/simulate_fleet.py
# 3. 在浏览器中打开 Dashboard
# http://localhost:8000/dashboard
五、常见使用场景
GhostTrack 的灵活性使其适用于多种应用场景,以下是几个典型的用例:
5.1 物流车队管理
# 物流追踪集成示例
from ghosttrack import GhostTrack
from datetime import datetime, timedelta
class LogisticsTracker:
"""物流追踪管理器"""
def __init__(self):
self.ghosttrack = GhostTrack()
self.fleet = {}
def register_delivery_vehicle(self, vehicle_id, max_range_km=500):
"""注册配送车辆"""
self.fleet[vehicle_id] = {
"type": "delivery",
"max_range": max_range_km,
"deliveries": [],
"current_charge": 100
}
return self.fleet[vehicle_id]
def assign_delivery(self, vehicle_id, destination, package_id):
"""分配配送任务"""
if vehicle_id not in self.fleet:
raise ValueError(f"车辆 {vehicle_id} 未注册")
delivery = {
"package_id": package_id,
"destination": destination,
"assigned_at": datetime.now(),
"status": "assigned"
}
self.fleet[vehicle_id]["deliveries"].append(delivery)
return delivery
def update_location(self, vehicle_id, latitude, longitude, battery_level=None):
"""更新车辆位置并检查配送状态"""
# 上报位置
self.ghosttrack.report_location(
device_id=vehicle_id,
latitude=latitude,
longitude=longitude
)
# 检查是否到达配送点
vehicle = self.fleet[vehicle_id]
for delivery in vehicle["deliveries"]:
if delivery["status"] == "assigned":
# 计算距离(简化版)
distance = self.calculate_distance(
latitude, longitude,
delivery["destination"]["lat"],
delivery["destination"]["lon"]
)
if distance < 0.1: # 100米内视为到达
delivery["status"] = "delivered"
delivery["delivered_at"] = datetime.now()
print(f"📦 包裹 {delivery['package_id']} 已送达!")
# 更新电量(如果有)
if battery_level is not None:
vehicle["current_charge"] = battery_level
# 低电量告警
if battery_level < 20:
self.ghosttrack.trigger_alert(
device_id=vehicle_id,
alert_type="low_battery",
severity="warning",
message=f"车辆 {vehicle_id} 电量过低: {battery_level}%"
)
@staticmethod
def calculate_distance(lat1, lon1, lat2, lon2):
"""计算两点间距离(公里)"""
from math import radians, sin, cos, sqrt, atan2
R = 6371
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * atan2(sqrt(a), sqrt(1-a))
return R * c
def get_delivery_summary(self, vehicle_id):
"""获取配送汇总"""
if vehicle_id not in self.fleet:
return None
vehicle = self.fleet[vehicle_id]
deliveries = vehicle["deliveries"]
return {
"vehicle_id": vehicle_id,
"total_deliveries": len(deliveries),
"delivered": sum(1 for d in deliveries if d["status"] == "delivered"),
"pending": sum(1 for d in deliveries if d["status"] == "assigned"),
"current_charge": vehicle["current_charge"]
}
# 使用示例
tracker = LogisticsTracker()
# 注册车辆
tracker.register_delivery_vehicle("VAN-SH-001", max_range_km=300)
# 分配配送任务
tracker.assign_delivery(
vehicle_id="VAN-SH-001",
destination={"lat": 31.2304, "lon": 121.4737},
package_id="PKG-2024-001"
)
# 模拟配送过程
current_lat, current_lon = 31.2, 121.4
for i in range(20):
current_lat += 0.0015
current_lon += 0.0036
tracker.update_location("VAN-SH-001", current_lat, current_lon, battery_level=100-i*2)
# 查看配送汇总
summary = tracker.get_delivery_summary("VAN-SH-001")
print(f"配送汇总: {summary}")
5.2 人员定位与安全管理
# 人员定位系统集成
from ghosttrack import GhostTrack
from datetime import datetime
class PersonnelTracker:
"""人员定位追踪器"""
def __init__(self):
self.ghosttrack = GhostTrack()
self.personnel = {}
self.geofences = {}
def register_person(self, person_id, name, department, role="staff"):
"""注册人员"""
self.personnel[person_id] = {
"name": name,
"department": department,
"role": role,
"last_location": None,
"status": "active"
}
return self.personnel[person_id]
def create_geofence(self, fence_id, name, coordinates, fence_type="restricted"):
"""
创建电子围栏区域
Args:
fence_id: 围栏ID
name: 围栏名称
coordinates: [(lat, lon), ...] 多边形顶点列表
fence_type: restricted(限制区) / safety(安全区)
"""
self.geofences[fence_id] = {
"name": name,
"coordinates": coordinates,
"type": fence_type,
"created_at": datetime.now()
}
return self.geofences[fence_id]
def check_geofence(self, person_id, latitude, longitude):
"""检查人员是否在围栏内"""
for fence_id, fence in self.geofences.items():
is_inside = self.point_in_polygon(latitude, longitude, fence["coordinates"])
if is_inside and fence["type"] == "restricted":
# 进入限制区域
self.ghosttrack.trigger_alert(
device_id=person_id,
alert_type="restricted_area",
severity="critical",
message=f"{self.personnel[person_id]['name']} 进入了限制区域: {fence['name']}"
)
print(f"🚨 警告: {self.personnel[person_id]['name']} 进入了 {fence['name']}")
@staticmethod
def point_in_polygon(x, y, polygon):
"""判断点是否在多边形内(射线法)"""
n = len(polygon)
inside = False
p1x, p1y = polygon[0]
for i in range(1, n + 1):
p2x, p2y = polygon[i % n]
if y > min(p1y, p2y):
if y <= max(p1y, p2y):
if x <= max(p1x, p2x):
if p1y != p2y:
xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
if p1x == p2x or x <= xinters:
inside = not inside
p1x, p1y = p2x, p2y
return inside
def update_person_location(self, person_id, latitude, longitude):
"""更新人员位置"""
if person_id not in self.personnel:
return False
# 上报位置
self.ghosttrack.report_location(
device_id=person_id,
latitude=latitude,
longitude=longitude
)
# 更新本地状态
self.personnel[person_id]["last_location"] = {
"latitude": latitude,
"longitude": longitude,
"timestamp": datetime.now()
}
# 检查围栏
self.check_geofence(person_id, latitude, longitude)
return True
def get_person_status(self, person_id):
"""获取人员状态"""
person = self.personnel.get(person_id)
if not person:
return None
return {
**person,
"last_update": person["last_location"]["timestamp"] if person["last_location"] else None
}
# 使用示例
person_tracker = PersonnelTracker()
# 注册人员
person_tracker.register_person("EMP-001", "张三", "工程部", "engineer")
person_tracker.register_person("EMP-002", "李四", "安保部", "security")
# 创建危险区域围栏
person_tracker.create_geofence(
fence_id="DANGER-001",
name="高压电区域",
coordinates=[
(39.910, 116.400),
(39.910, 116.410),
(39.915, 116.410),
(39.915, 116.400)
],
fence_type="restricted"
)
# 创建安全区域
person_tracker.create_geofence(
fence_id="SAFETY-001",
name="应急避难区",
coordinates=[
(39.905, 116.395),
(39.905, 116.405),
(39.910, 116.405),
(39.910, 116.395)
],
fence_type="safety"
)
# 模拟人员移动
print("开始人员定位监控...")
person_tracker.update_person_location("EMP-001", 39.906, 116.398) # 安全区域
person_tracker.update_person_location("EMP-001", 39.912, 116.405) # 危险区域!
person_tracker.update_person_location("EMP-002", 39.907, 116.400) # 安全区域
5.3 资产追踪与盘点
# 资产追踪管理系统
from ghosttrack import GhostTrack
from datetime import datetime
from typing import List, Dict
class AssetTracker:
"""资产追踪管理器"""
def __init__(self):
self.ghosttrack = GhostTrack()
self.assets = {}
self.asset_types = {
"electronics": ["笔记本", "显示器", "服务器"],
"vehicles": ["叉车", "货车", "工程车"],
"equipment": ["工具箱", "测量仪", "发电机"]
}
def register_asset(self, asset_id, name, asset_type,
initial_location=None, owner=None):
"""
注册资产
Args:
asset_id: 资产编号
name: 资产名称
asset_type: 资产类型
initial_location: 初始位置
owner: 负责人
"""
asset = {
"asset_id": asset_id,
"name": name,
"type": asset_type,
"status": "registered",
"owner": owner,
"location": initial_location,
"history": [],
"last_check": datetime.now()
}
self.assets[asset_id] = asset
return asset
def check_out_asset(self, asset_id, checked_out_by, location=None):
"""资产借出"""
if asset_id not in self.assets:
raise ValueError(f"资产 {asset_id} 不存在")
asset = self.assets[asset_id]
if asset["status"] != "registered" and asset["status"] != "returned":
raise ValueError(f"资产 {asset_id} 当前不可借出")
asset["status"] = "checked_out"
asset["checked_out_by"] = checked_out_by
asset["checked_out_at"] = datetime.now()
if location:
asset["location"] = location
self.record_history(asset_id, "借出", {
"by": checked_out_by,
"location": location
})
return asset
def check_in_asset(self, asset_id, location=None):
"""资产归还"""
if asset_id not in self.assets:
raise ValueError(f"资产 {asset_id} 不存在")
asset = self.assets[asset_id]
asset["status"] = "returned"
asset["returned_at"] = datetime.now()
if location:
asset["location"] = location
self.record_history(asset_id, "归还", {
"location": location
})
return asset
def update_asset_location(self, asset_id, latitude, longitude, altitude=None):
"""更新资产位置"""
if asset_id not in self.assets:
return False
asset = self.assets[asset_id]
asset["location"] = {
"latitude": latitude,
"longitude": longitude,
"altitude": altitude,
"timestamp": datetime.now()
}
# 上报到 GhostTrack
self.ghosttrack.report_location(
device_id=f"ASSET-{asset_id}",
latitude=latitude,
longitude=longitude,
altitude=altitude
)
return True
def record_history(self, asset_id, action, details):
"""记录资产操作历史"""
if asset_id not in self.assets:
return
record = {
"action": action,
"details": details,
"timestamp": datetime.now()
}
self.assets[asset_id]["history"].append(record)
def get_asset_location(self, asset_id):
"""获取资产当前位置"""
asset = self.assets.get(asset_id)
return asset["location"] if asset else None
def find_nearby_assets(self, latitude, longitude, radius_km=0.5):
"""
查找附近资产
Args:
latitude: 中心点纬度
longitude: 中心点经度
radius_km: 搜索半径(公里)
Returns:
附近资产列表
"""
from math import radians, sin, cos, sqrt, atan2
def haversine(lat1, lon1, lat2, lon2):
R = 6371
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
dlat, dlon = lat2 - lat1, lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
return R * 2 * atan2(sqrt(a), sqrt(1-a))
nearby = []
for asset_id, asset in self.assets.items():
if asset["location"]:
distance = haversine(
latitude, longitude,
asset["location"]["latitude"],
asset["location"]["longitude"]
)
if distance <= radius_km:
nearby.append({
"asset_id": asset_id,
"name": asset["name"],
"distance_km": round(distance, 3),
"location": asset["location"]
})
return sorted(nearby, key=lambda x: x["distance_km"])
def generate_inventory_report(self):
"""生成资产盘点报告"""
report = {
"generated_at": datetime.now(),
"total_assets": len(self.assets),
"by_status": {},
"by_type": {},
"assets": []
}
for asset_id, asset in self.assets.items():
# 统计状态
status = asset["status"]
report["by_status"][status] = report["by_status"].get(status, 0) + 1
# 统计类型
asset_type = asset["type"]
report["by_type"][asset_type] = report["by_type"].get(asset_type, 0) + 1
# 资产详情
report["assets"].append({
"asset_id": asset_id,
"name": asset["name"],
"type": asset["type"],
"status": asset["status"],
"owner": asset.get("owner"),
"last_check": asset["last_check"].isoformat()
})
return report
# 使用示例
asset_tracker = AssetTracker()
# 注册资产
asset_tracker.register_asset(
asset_id="EQ-001",
name="高精度全站仪",
asset_type="equipment",
initial_location={"latitude": 39.9042, "longitude": 116.4074},
owner="测量组"
)
asset_tracker.register_asset(
asset_id="VEH-001",
name="电动叉车 #1",
asset_type="vehicles",
initial_location={"latitude": 39.9050, "longitude": 116.4080},
owner="仓储部"
)
asset_tracker.register_asset(
asset_id="SRV-001",
name="刀片服务器",
asset_type="electronics",
owner="IT部门"
)
# 借出资产
asset_tracker.check_out_asset(
asset_id="EQ-001",
checked_out_by="王工",
location={"latitude": 39.9100, "longitude": 116.4200}
)
# 更新位置
asset_tracker.update_asset_location(
asset_id="VEH-001",
latitude=39.9055,
longitude=116.4085
)
# 查找附近资产
nearby = asset_tracker.find_nearby_assets(
latitude=39.9042,
longitude=116.4074,
radius_km=1.0
)
print("附近资产:")
for asset in nearby:
print(f" - {asset['name']}: {asset['distance_km']} km")
# 生成盘点报告
report = asset_tracker.generate_inventory_report()
print(f"\n资产盘点报告:")
print(f" 总资产数: {report['total_assets']}")
print(f" 按状态: {report['by_status']}")
print(f" 按类型: {report['by_type']}")
六、进阶技巧与最佳实践
6.1 性能优化建议
在实际生产环境中,GhostTrack 的性能优化至关重要:
# 性能优化示例
from ghosttrack import GhostTrack
import asyncio
from functools import lru_cache
import hashlib
class OptimizedGhostTrack:
"""优化后的 GhostTrack 客户端"""
def __init__(self, base_url, batch_size=100, flush_interval=1.0):
self.ghosttrack = GhostTrack(base_url)
self.batch_size = batch_size
self.flush_interval = flush_interval
self.batch_buffer = []
self.lock = asyncio.Lock()
async def report_location_batch(self, locations):
"""
批量上报位置(推荐用于高频场景)
Args:
locations: 位置数据列表
"""
async with self.lock:
self.batch_buffer.extend(locations)
if len(self.batch_buffer) >= self.batch_size:
await self.flush_batch()
async def flush_batch(self):
"""刷新缓冲区"""
if not self.batch_buffer:
return
batch = self.batch_buffer[:self.batch_size]
self.batch_buffer = self.batch_buffer[self.batch_size:]
# 使用批量 API
await self.ghosttrack.report_locations_batch(batch)
print(f"已批量上报 {len(batch)} 条位置数据")
async def auto_flush(self):
"""自动刷新任务"""
while True:
await asyncio.sleep(self.flush_interval)
async with self.lock:
if self.batch_buffer:
await self.flush_batch()
# 数据压缩优化
class CompressedLocationData:
"""压缩位置数据以减少传输带宽"""
@staticmethod
def encode_location(latitude, longitude, precision=6):
"""
将经纬度编码为紧凑字符串
原始: (39.904200, 116.407400)
编码后: "39.9042,116.4074"
"""
lat_str = f"{latitude:.{precision}f}"
lon_str = f"{longitude:.{precision}f}"
return f"{lat_str},{lon_str}"
@staticmethod
def decode_location(encoded):
"""解码位置字符串"""
lat_str, lon_str = encoded.split(",")
return float(lat_str), float(lon_str)
@staticmethod
def pack_location(latitude, longitude):
"""
使用整数打包压缩经纬度
精度: 6位小数 = 约 0.1 米的精度
"""
lat_int = int(latitude * 1e6)
lon_int = int(longitude * 1e6)
# 打包为 8 字节整数
packed = (lat_int << 32) | (lon_int & 0xFFFFFFFF)
return packed
@staticmethod
def unpack_location(packed):
"""解包位置数据"""
lat_int = (packed >> 32) & 0xFFFFFFFF
lon_int = packed & 0xFFFFFFFF
# 处理符号
if lat_int >= 0x80000000:
lat_int -= 0x100000000
if lon_int >= 0x80000000:
lon_int -= 0x100000000
return lat_int / 1e6, lon_int / 1e6
# 缓存优化示例
class CachedDeviceManager:
"""带缓存的设备管理器"""
def __init__(self, cache_ttl_seconds=300):
self.cache = {}
self.cache_ttl = cache_ttl_seconds
def _get_cache_key(self, device_id):
"""生成缓存键"""
return hashlib.md5(f"device:{device_id}".encode()).hexdigest()
def get_device(self, device_id):
"""获取设备信息(带缓存)"""
import time
cache_key = self._get_cache_key(device_id)
if cache_key in self.cache:
cached_data, timestamp = self.cache[cache_key]
if time.time() - timestamp < self.cache_ttl:
return cached_data # 返回缓存
# 从数据库获取
device_data = self._fetch_device_from_db(device_id)
# 更新缓存
self.cache[cache_key] = (device_data, time.time())
return device_data
def _fetch_device_from_db(self, device_id):
"""从数据库获取设备信息"""
# 实际实现中从数据库查询
return {"device_id": device_id, "name": "Cached Device"}
def invalidate_cache(self, device_id):
"""使缓存失效"""
cache_key = self._get_cache_key(device_id)
if cache_key in self.cache:
del self.cache[cache_key]
6.2 安全性最佳实践
# 安全配置示例
from ghosttrack.security import (
APIKeyManager,
RateLimiter,
IPWhitelist,
AuditLogger
)
class SecureGhostTrack:
"""安全增强版 GhostTrack"""
def __init__(self):
self.api_key_manager = APIKeyManager()
self.rate_limiter = RateLimiter(
max_requests_per_minute=100,
max_requests_per_hour=5000
)
self.ip_whitelist = IPWhitelist()
self.audit_logger = AuditLogger()
def validate_request(self, request):
"""
验证请求的安全性
检查项:
1. API Key 有效性
2. 请求频率限制
3. IP 白名单
4. 数据格式校验
"""
# 检查 API Key
api_key = request.headers.get("X-API-Key")
if not self.api_key_manager.validate(api_key):
return False, "Invalid API Key"
# 检查请求频率
if not self.rate_limiter.check(request.client.host):
return False, "Rate limit exceeded"
# 检查 IP 白名单(如果配置了)
if self.ip_whitelist.is_enabled():
if not self.ip_whitelist.is_allowed(request.client.host):
return False, "IP not allowed"
# 记录审计日志
self.audit_logger.log(
action="api_request",
api_key=api_key[:8] + "...", # 只记录 key 前8位
ip=request.client.host,
endpoint=request.url.path,
timestamp=request.datetime
)
return True, "OK"
def generate_api_key(self, user_id, permissions):
"""为用户生成 API Key"""
import secrets
api_key = f"gt_{secrets.token_urlsafe(32)}"
self.api_key_manager.create(
key=api_key,
user_id=user_id,
permissions=permissions,
expires_in_days=365
)
return api_key
def revoke_api_key(self, api_key):
"""吊销 API Key"""
self.api_key_manager.revoke(api_key)
self.audit_logger.log(
action="key_revoked",
api_key=api_key[:8] + "...",
timestamp=datetime.now()
)
6.3 监控与运维
# 监控与运维工具
from ghosttrack.monitoring import (
MetricsCollector,
HealthChecker,
AlertNotifier
)
from prometheus_client import Counter, Histogram, Gauge
# 定义 Prometheus 指标
location_reports_total = Counter(
'ghosttrack_location_reports_total',
'Total number of location reports',
['device_type']
)
report_latency = Histogram(
'ghosttrack_report_latency_seconds',
'Location report latency',
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
)
active_devices = Gauge(
'ghosttrack_active_devices',
'Number of active tracked devices'
)
class OpsDashboard:
"""运维监控面板"""
def __init__(self):
self.metrics = MetricsCollector()
self.health_checker = HealthChecker()
self.notifier = AlertNotifier()
def record_location_report(self, device_type, latency):
"""记录位置上报"""
location_reports_total.labels(device_type=device_type).inc()
report_latency.observe(latency)
def update_active_devices(self, count):
"""更新活跃设备数"""
active_devices.set(count)
async def check_system_health(self):
"""执行系统健康检查"""
health_status = {
"api_server": self.health_checker.check_endpoint("/health"),
"database": self.health_checker.check_database_connection(),
"cache": self.health_checker.check_redis(),
"mqtt": self.health_checker.check_mqtt_broker()
}
# 检查是否有异常
unhealthy = [k for k, v in health_status.items() if not v]
if unhealthy:
await self.notifier.send_alert(
severity="critical",
title="系统健康检查异常",
message=f"以下组件异常: {', '.join(unhealthy)}"
)
return health_status
def get_system_metrics(self):
"""获取系统指标"""
return {
"total_reports": location_reports_total._value.get(),
"reports_by_type": dict(location_reports_total.labels(device_type='')),
"active_devices": active_devices._value.get(),
"avg_latency": report_latency._sum.get() / max(report_latency._count.get(), 1)
}
def generate_report(self):
"""生成运维报告"""
metrics = self.get_system_metrics()
health = asyncio.run(self.check_system_health())
report = f"""
GhostTrack 运维报告
===================
生成时间: {datetime.now().isoformat()}
系统状态:
- API 服务: {'✓ 正常' if health['api_server'] else '✗ 异常'}
- 数据库: {'✓ 正常' if health['database'] else '✗ 异常'}
- 缓存: {'✓ 正常' if health['cache'] else '✗ 异常'}
- MQTT: {'✓ 正常' if health['mqtt'] else '✗ 异常'}
性能指标:
- 总上报次数: {metrics['total_reports']}
- 活跃设备: {metrics['active_devices']}
- 平均延迟: {metrics['avg_latency']:.3f}s
"""
return report
# 使用示例
ops = OpsDashboard()
report = ops.generate_report()
print(report)
七、总结与相关资源
GhostTrack 核心优势回顾
┌────────────────────────────────────────────────────────────────┐
│ GhostTrack 核心优势 │
├────────────────────────────────────────────────────────────────┤
│ │
│ 🎯 开箱即用 完整的追踪解决方案,5分钟快速部署 │
│ │
│ 🔌 协议丰富 支持 HTTP/MQTT/CoAP 等多种物联网协议 │
│ │
│ 📊 实时处理 毫秒级延迟的实时数据处理与告警 │
│ │
│ 🗺️ 灵活存储 支持多种数据库,满足不同规模需求 │
│ │
│ 🔒 安全可靠 完善的认证、授权、审计机制 │
│ │
│ 📈 可扩展 水平扩展设计,支持大规模设备接入 │
│ │
│ 🎨 易集成 RESTful API + WebSocket,集成无忧 │
│ │
└────────────────────────────────────────────────────────────────┘
相关项目推荐
| 项目 | 描述 | 链接 |
|---|---|---|
| GhostTrack Dashboard | 官方可视化面板 | GitHub |
| GhostTrack Mobile SDK | iOS/Android 移动端 SDK | GitHub |
| GhostTrack Python CLI | 命令行工具 | GitHub |
| GhostTrack Examples | 示例代码仓库 | GitHub |
进一步学习资源
学习路径:
│
├── 入门阶段
│ ├── 阅读项目 README.md
│ ├── 运行 Docker 快速体验
│ └── 完成本教程的基础示例
│
├── 进阶阶段
│ ├── 深入阅读源码了解架构设计
│ ├── 学习数据处理引擎配置
│ └── 实践自定义过滤器与告警规则
│
├── 实战阶段
│ ├── 根据业务场景定制开发
│ ├── 性能调优与容量规划
│ └── 生产环境部署与运维
│
└── 贡献阶段
├── 提交 Bug 反馈与功能建议
├── 贡献代码与文档
└── 参与社区讨论
官方资源链接
- GitHub 仓库: https://github.com/HunxByts/GhostTrack
- 官方文档: https://docs.ghosttrack.dev
- 问题反馈: https://github.com/HunxByts/GhostTrack/issues
- 讨论社区: https://github.com/HunxByts/GhostTrack/discussions
- 更新日志: https://github.com/HunxByts/GhostTrack/releases
GhostTrack 让设备追踪变得如此简单,无论是初创公司还是大型企业,都能从中受益。立即开始你的追踪之旅吧! 🚀
评论区