Skip to content

python-can

python-can 支持的接口类型

接口类型平台说明
socketcanLinuxLinux 内核原生支持,推荐使用
pcanWindows/LinuxPEAK-System PCAN 硬件
vectorWindowsVector 硬件(如 VN1610)
kvaserWindows/LinuxKvaser CAN 硬件
virtual全平台虚拟接口,用于测试模拟
serial / slcan全平台串口转 CAN 适配器

更多支持的接口类型请参考 python-can 官方文档

环境准备

Windows 驱动

  1. 下载并安装驱动
  2. 安装后找到驱动提供的 PCANBasic.dll,这是 python-can 调用硬件的桥梁

SocketCAN

连接硬件后,系统会将其识别为网络接口(如 can0)。

bash
# 配置波特率并启用接口
sudo ip link set can0 type can bitrate 500000
sudo ip link set can0 up

安装 Python 库

bash
pip install python-can

快速上手

以下是一个完整的 使用 pcan 最小可运行示例,展示基本的发送和接收:

python
import can
import time

# 创建总线(Windows)
bus = can.interface.Bus(bustype='pcan', channel='PCAN_USBBUS1', bitrate=500000)

# 发送一条消息
msg = can.Message(arbitration_id=0x123, data=[1, 2, 3, 4, 5, 6, 7, 8], is_extended_id=False)
bus.send(msg)
print(f"已发送: ID=0x{msg.arbitration_id:X}, Data={list(msg.data)}")

# 阻塞接收一条消息(超时 2 秒)
received = bus.recv(timeout=2.0)
if received:
    print(f"已接收: ID=0x{received.arbitration_id:X}, Data={list(received.data)}")
else:
    print("超时,未收到消息")

# 清理
bus.shutdown()

基础:实例化 Bus 对象

这是唯一需要区分操作系统的地方。建议使用统一的初始化函数:

python
import can
import platform

def init_can_bus(channel='PCAN_USBBUS1', bitrate=500000):
    """
    统一初始化入口
    Windows 默认使用 PCAN 接口
    Linux 默认使用 SocketCAN 接口 (PCAN 硬件会被映射为 can0)
    """
    if platform.system() == "Windows":
        return can.interface.Bus(
            bustype='pcan', 
            channel=channel, 
            bitrate=bitrate
        )
    else:
        # Linux 下需先用 ip link 配置好波特率
        return can.interface.Bus(
            bustype='socketcan', 
            channel='can0' 
        )

# 使用示例
bus = init_can_bus()

发送数据

Message 对象

can.Message 是报文的封装类,发送前需先创建消息对象。

属性类型说明
arbitration_idint报文 ID(0x000 ~ 0x7FF 标准帧,0x00000000 ~ 0x1FFFFFFF 扩展帧)
databytearray数据负载(CAN 2.0 最多 8 字节,CAN FD 最多 64 字节)
is_extended_idbool是否为扩展帧(29-bit ID)
is_remote_framebool是否为远程帧
is_fdbool是否为 CAN FD 帧
bitrate_switchboolCAN FD 加速段(BRS)
error_state_indicatorboolCAN FD 错误状态指示(ESI)
dlcint数据长度码
timestampfloat接收时间戳(秒)
channelstr接收通道

创建消息示例

python
# 标准 CAN 2.0 帧
msg1 = can.Message(
    arbitration_id=0x123,
    data=[0x11, 0x22, 0x33, 0x44],
    is_extended_id=False
)

# 扩展帧(29-bit ID)
msg2 = can.Message(
    arbitration_id=0x18FFABCD,
    data=[0xA0, 0xB0, 0xC0],
    is_extended_id=True
)

# 远程帧
msg3 = can.Message(
    arbitration_id=0x456,
    is_remote_frame=True,
    dlc=8
)

发送标准 CAN 报文

python
def send_standard_msg(bus, arb_id=0x123, data=None):
    """发送标准 CAN 2.0 报文"""
    if data is None:
        data = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88]
    
    msg = can.Message(
        arbitration_id=arb_id,
        data=data,
        is_extended_id=False
    )
    try:
        bus.send(msg)
        print(f"发送成功: ID=0x{arb_id:X}, Data={list(data)}")
    except can.CanError as e:
        print(f"发送失败: {e}")

发送 CAN FD 报文

CAN FD 支持更长数据(最多 64 字节)和更高传输速率。

python
def send_fd_msg(bus, arb_id=0xABC, data=None, bitrate_switch=True):
    """发送 CAN FD 报文"""
    if data is None:
        data = list(range(64))  # 64 字节数据
    
    msg = can.Message(
        arbitration_id=arb_id,
        data=data,
        is_fd=True,
        bitrate_switch=bitrate_switch  # 开启加速段
    )
    try:
        bus.send(msg)
        print(f"FD 发送成功: ID=0x{arb_id:X}, 长度={len(data)}字节")
    except can.CanError as e:
        print(f"FD 发送失败: {e}")

# 使用前需初始化 FD 总线
# bus_fd = can.interface.Bus(bustype='pcan', channel='PCAN_USBBUS1', fd=True, bitrate=500000)
# send_fd_msg(bus_fd)

周期性发送

使用 CyclicSendTask 自动周期性发送报文:

python
def start_cyclic_send(bus, arb_id=0x123, data=None, period=0.1):
    """
    启动周期性发送任务
    :param period: 发送周期(秒)
    """
    if data is None:
        data = [0x01, 0x02, 0x03, 0x04]
    
    msg = can.Message(
        arbitration_id=arb_id,
        data=data,
        is_extended_id=False
    )
    
    task = bus.send_periodic(msg, period)
    print(f"已启动周期发送: ID=0x{arb_id:X}, 周期={period}s")
    return task

# 使用示例
# task = start_cyclic_send(bus, period=0.1)  # 每 100ms 发送一次
# time.sleep(5)  # 持续 5 秒
# task.stop()    # 停止周期发送

接收数据

阻塞式接收

最简单的接收方式,程序会阻塞直到收到消息或超时:

python
def receive_blocking(bus, timeout=1.0):
    """阻塞式接收单条消息"""
    msg = bus.recv(timeout=timeout)
    if msg:
        print(f"收到: ID=0x{msg.arbitration_id:X}, Data={list(msg.data)}, Time={msg.timestamp:.6f}")
        return msg
    else:
        print("超时,未收到消息")
        return None

# 持续接收
# while True:
#     receive_blocking(bus)

Notifier 非阻塞接收

推荐用于实际应用,不会阻塞主线程:

python
class CANReceiver(can.Listener):
    def on_message_received(self, msg):
        print(f"收到报文: ID=0x{msg.arbitration_id:X} Data={msg.data.hex().upper()} Time={msg.timestamp:.6f}")

def start_listening(bus):
    listener = CANReceiver()
    notifier = can.Notifier(bus, [listener])
    return notifier

# 使用示例
# notifier = start_listening(bus)
# try:
#     while True:
#         time.sleep(1)
# except KeyboardInterrupt:
#     notifier.stop()
#     bus.shutdown()

消息过滤器

只接收特定 ID 的报文,减少无关消息干扰:

python
def setup_filter(bus, can_ids):
    """
    设置接收过滤器
    :param can_ids: 要接收的 ID 列表,如 [0x123, 0x456]
    """
    # 方式 1:单个 ID 过滤
    filters = [{"can_id": can_id, "can_mask": 0x7FF, "extended": False} for can_id in can_ids]
    
    # 方式 2:ID 范围过滤(接收 0x100~0x1FF)
    # filters = [{"can_id": 0x100, "can_mask": 0x700, "extended": False}]
    
    bus.set_filters(filters)
    print(f"已设置过滤器: {can_ids}")

# 使用示例
# setup_filter(bus, [0x123, 0x456])  # 只接收 0x123 和 0x456

过滤器掩码原理

  • can_id:基准 ID
  • can_mask:掩码,1 表示该位需要匹配,0 表示忽略
  • 报文 ID 与 can_id 在掩码为 1 的位上相等时通过
示例:can_id=0x123, can_mask=0x7FF(全匹配)
      只接收 ID 恰好为 0x123 的报文

示例:can_id=0x100, can_mask=0x700(高 3 位匹配)
      接收 ID 范围 0x100~0x1FF 的所有报文

常见问题与调试建议

场景常见报错/问题解决方法
WindowsThe PCANBasic.dll could not be found.确保驱动已安装,或手动将 dll 放入 Python 脚本目录。
WindowsPCAN_ERROR_ILLHW (硬件未找到)检查 USB 连接;确保 PCAN-View 软件没有独占打开该通道。
Linux收到数据为空检查是否执行了 sudo ip link set can0 up type can bitrate 500000
Linuxsocket: Network is down先执行 sudo ip link set can0 up 启用接口。
通用PCAN_ERROR_BUSOFF检查终端电阻(120Ω)是否连接,或波特率是否与总线一致。
通用发送成功但收不到检查 CAN H/L 接线是否正确,确认总线有其他节点应答。
通用收到 Error Frame使用 can.Logger 记录原始帧分析,或用示波器/逻辑分析仪检查信号。

调试技巧

python
import logging
# 开启调试日志
logging.basicConfig(level=logging.DEBUG)

# 记录所有收发消息到文件
logger = can.Logger("can_log.asc")  # .asc 格式可用 CANoe/CANalyzer 打开
notifier = can.Notifier(bus, [logger])

# 打印总线信息
print(bus.channel_info)

参考资源

驱动智能连接,赋能科技未来