python-can
python-can 支持的接口类型
| 接口类型 | 平台 | 说明 |
|---|---|---|
socketcan | Linux | Linux 内核原生支持,推荐使用 |
pcan | Windows/Linux | PEAK-System PCAN 硬件 |
vector | Windows | Vector 硬件(如 VN1610) |
kvaser | Windows/Linux | Kvaser CAN 硬件 |
virtual | 全平台 | 虚拟接口,用于测试模拟 |
serial / slcan | 全平台 | 串口转 CAN 适配器 |
更多支持的接口类型请参考 python-can 官方文档。
环境准备
Windows 驱动
- 下载并安装驱动
- 安装后找到驱动提供的
PCANBasic.dll,这是 python-can 调用硬件的桥梁
SocketCAN
连接硬件后,系统会将其识别为网络接口(如 can0)。
bash
# 配置波特率并启用接口
sudo ip link set can0 type can bitrate 500000
sudo ip link set can0 up安装 Python 库
bash
pip install python-can快速上手
以下是一个完整的 使用 pcan 最小可运行示例,展示基本的发送和接收:
python
import can
import time
# 创建总线(Windows)
bus = can.interface.Bus(bustype='pcan', channel='PCAN_USBBUS1', bitrate=500000)
# 发送一条消息
msg = can.Message(arbitration_id=0x123, data=[1, 2, 3, 4, 5, 6, 7, 8], is_extended_id=False)
bus.send(msg)
print(f"已发送: ID=0x{msg.arbitration_id:X}, Data={list(msg.data)}")
# 阻塞接收一条消息(超时 2 秒)
received = bus.recv(timeout=2.0)
if received:
print(f"已接收: ID=0x{received.arbitration_id:X}, Data={list(received.data)}")
else:
print("超时,未收到消息")
# 清理
bus.shutdown()基础:实例化 Bus 对象
这是唯一需要区分操作系统的地方。建议使用统一的初始化函数:
python
import can
import platform
def init_can_bus(channel='PCAN_USBBUS1', bitrate=500000):
"""
统一初始化入口
Windows 默认使用 PCAN 接口
Linux 默认使用 SocketCAN 接口 (PCAN 硬件会被映射为 can0)
"""
if platform.system() == "Windows":
return can.interface.Bus(
bustype='pcan',
channel=channel,
bitrate=bitrate
)
else:
# Linux 下需先用 ip link 配置好波特率
return can.interface.Bus(
bustype='socketcan',
channel='can0'
)
# 使用示例
bus = init_can_bus()发送数据
Message 对象
can.Message 是报文的封装类,发送前需先创建消息对象。
| 属性 | 类型 | 说明 |
|---|---|---|
arbitration_id | int | 报文 ID(0x000 ~ 0x7FF 标准帧,0x00000000 ~ 0x1FFFFFFF 扩展帧) |
data | bytearray | 数据负载(CAN 2.0 最多 8 字节,CAN FD 最多 64 字节) |
is_extended_id | bool | 是否为扩展帧(29-bit ID) |
is_remote_frame | bool | 是否为远程帧 |
is_fd | bool | 是否为 CAN FD 帧 |
bitrate_switch | bool | CAN FD 加速段(BRS) |
error_state_indicator | bool | CAN FD 错误状态指示(ESI) |
dlc | int | 数据长度码 |
timestamp | float | 接收时间戳(秒) |
channel | str | 接收通道 |
创建消息示例:
python
# 标准 CAN 2.0 帧
msg1 = can.Message(
arbitration_id=0x123,
data=[0x11, 0x22, 0x33, 0x44],
is_extended_id=False
)
# 扩展帧(29-bit ID)
msg2 = can.Message(
arbitration_id=0x18FFABCD,
data=[0xA0, 0xB0, 0xC0],
is_extended_id=True
)
# 远程帧
msg3 = can.Message(
arbitration_id=0x456,
is_remote_frame=True,
dlc=8
)发送标准 CAN 报文
python
def send_standard_msg(bus, arb_id=0x123, data=None):
"""发送标准 CAN 2.0 报文"""
if data is None:
data = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88]
msg = can.Message(
arbitration_id=arb_id,
data=data,
is_extended_id=False
)
try:
bus.send(msg)
print(f"发送成功: ID=0x{arb_id:X}, Data={list(data)}")
except can.CanError as e:
print(f"发送失败: {e}")发送 CAN FD 报文
CAN FD 支持更长数据(最多 64 字节)和更高传输速率。
python
def send_fd_msg(bus, arb_id=0xABC, data=None, bitrate_switch=True):
"""发送 CAN FD 报文"""
if data is None:
data = list(range(64)) # 64 字节数据
msg = can.Message(
arbitration_id=arb_id,
data=data,
is_fd=True,
bitrate_switch=bitrate_switch # 开启加速段
)
try:
bus.send(msg)
print(f"FD 发送成功: ID=0x{arb_id:X}, 长度={len(data)}字节")
except can.CanError as e:
print(f"FD 发送失败: {e}")
# 使用前需初始化 FD 总线
# bus_fd = can.interface.Bus(bustype='pcan', channel='PCAN_USBBUS1', fd=True, bitrate=500000)
# send_fd_msg(bus_fd)周期性发送
使用 CyclicSendTask 自动周期性发送报文:
python
def start_cyclic_send(bus, arb_id=0x123, data=None, period=0.1):
"""
启动周期性发送任务
:param period: 发送周期(秒)
"""
if data is None:
data = [0x01, 0x02, 0x03, 0x04]
msg = can.Message(
arbitration_id=arb_id,
data=data,
is_extended_id=False
)
task = bus.send_periodic(msg, period)
print(f"已启动周期发送: ID=0x{arb_id:X}, 周期={period}s")
return task
# 使用示例
# task = start_cyclic_send(bus, period=0.1) # 每 100ms 发送一次
# time.sleep(5) # 持续 5 秒
# task.stop() # 停止周期发送接收数据
阻塞式接收
最简单的接收方式,程序会阻塞直到收到消息或超时:
python
def receive_blocking(bus, timeout=1.0):
"""阻塞式接收单条消息"""
msg = bus.recv(timeout=timeout)
if msg:
print(f"收到: ID=0x{msg.arbitration_id:X}, Data={list(msg.data)}, Time={msg.timestamp:.6f}")
return msg
else:
print("超时,未收到消息")
return None
# 持续接收
# while True:
# receive_blocking(bus)Notifier 非阻塞接收
推荐用于实际应用,不会阻塞主线程:
python
class CANReceiver(can.Listener):
def on_message_received(self, msg):
print(f"收到报文: ID=0x{msg.arbitration_id:X} Data={msg.data.hex().upper()} Time={msg.timestamp:.6f}")
def start_listening(bus):
listener = CANReceiver()
notifier = can.Notifier(bus, [listener])
return notifier
# 使用示例
# notifier = start_listening(bus)
# try:
# while True:
# time.sleep(1)
# except KeyboardInterrupt:
# notifier.stop()
# bus.shutdown()消息过滤器
只接收特定 ID 的报文,减少无关消息干扰:
python
def setup_filter(bus, can_ids):
"""
设置接收过滤器
:param can_ids: 要接收的 ID 列表,如 [0x123, 0x456]
"""
# 方式 1:单个 ID 过滤
filters = [{"can_id": can_id, "can_mask": 0x7FF, "extended": False} for can_id in can_ids]
# 方式 2:ID 范围过滤(接收 0x100~0x1FF)
# filters = [{"can_id": 0x100, "can_mask": 0x700, "extended": False}]
bus.set_filters(filters)
print(f"已设置过滤器: {can_ids}")
# 使用示例
# setup_filter(bus, [0x123, 0x456]) # 只接收 0x123 和 0x456过滤器掩码原理:
can_id:基准 IDcan_mask:掩码,1 表示该位需要匹配,0 表示忽略- 报文 ID 与
can_id在掩码为 1 的位上相等时通过
示例:can_id=0x123, can_mask=0x7FF(全匹配)
只接收 ID 恰好为 0x123 的报文
示例:can_id=0x100, can_mask=0x700(高 3 位匹配)
接收 ID 范围 0x100~0x1FF 的所有报文常见问题与调试建议
| 场景 | 常见报错/问题 | 解决方法 |
|---|---|---|
| Windows | The PCANBasic.dll could not be found. | 确保驱动已安装,或手动将 dll 放入 Python 脚本目录。 |
| Windows | PCAN_ERROR_ILLHW (硬件未找到) | 检查 USB 连接;确保 PCAN-View 软件没有独占打开该通道。 |
| Linux | 收到数据为空 | 检查是否执行了 sudo ip link set can0 up type can bitrate 500000。 |
| Linux | socket: Network is down | 先执行 sudo ip link set can0 up 启用接口。 |
| 通用 | PCAN_ERROR_BUSOFF | 检查终端电阻(120Ω)是否连接,或波特率是否与总线一致。 |
| 通用 | 发送成功但收不到 | 检查 CAN H/L 接线是否正确,确认总线有其他节点应答。 |
| 通用 | 收到 Error Frame | 使用 can.Logger 记录原始帧分析,或用示波器/逻辑分析仪检查信号。 |
调试技巧
python
import logging
# 开启调试日志
logging.basicConfig(level=logging.DEBUG)
# 记录所有收发消息到文件
logger = can.Logger("can_log.asc") # .asc 格式可用 CANoe/CANalyzer 打开
notifier = can.Notifier(bus, [logger])
# 打印总线信息
print(bus.channel_info)参考资源
- python-can 官方文档
- 接口文档:
- Windows: PCAN 接口
- Linux: SocketCAN 接口 | SocketCAN FD 接口