0

0

使用pySerial进行串口通信:数据接收的常见陷阱与最佳实践

花韻仙語

花韻仙語

发布时间:2025-10-27 12:41:03

|

232人浏览过

|

来源于php中文网

原创

使用pySerial进行串口通信:数据接收的常见陷阱与最佳实践

在使用pyserial与串口设备通信时,开发者常遇到发送命令后无法读取到预期数据的问题。这通常是由于设备未启用回显模式,而非pyserial连接失败。本文将深入探讨这一常见误区,并提供通过发送特定触发命令、利用`readline()`等方法有效接收串口数据的专业教程,确保稳定可靠的数据交互。

pySerial串口通信基础与数据接收挑战

Python的pySerial库是进行串口通信的强大工具,广泛应用于与各种硬件设备(如传感器、仪表、微控制器)进行数据交互。然而,初学者在尝试连接设备并发送命令后,可能会发现即使连接成功,也无法通过ser.in_waiting或ser.read()等方法读取到任何数据,这往往令人困惑。

一个常见的误解是,当通过ser.write()发送数据后,设备会自动将接收到的数据“回显”回来。实际上,大多数串口设备并不会默认开启回显模式。它们通常只在接收到特定的命令后,才会执行操作并发送预定义的数据响应。一些终端工具(如Termite、Minicom)可能内置了“本地回显”功能,这会让人误以为设备在回显,但实际上是终端软件在本地显示了你发送的字符。

因此,如果设备没有被编程为回显,或者没有收到触发其响应的特定命令,ser.in_waiting自然会返回0,因为设备端根本没有发送任何数据回来。

建立pySerial连接

在尝试与设备通信之前,首先需要正确配置并打开串口。以下是一个典型的pySerial串口配置示例:

import serial
import time

def setup_serial_connection(port='COM4', baudrate=9600, timeout=1):
    """
    配置并返回一个pySerial串口对象。
    :param port: 串口名称,例如'COM4'或'/dev/ttyUSB0'
    :param baudrate: 波特率
    :param timeout: 读取超时时间(秒)
    :return: 配置好的串口对象
    """
    ser = serial.Serial()
    ser.port = port
    ser.baudrate = baudrate
    ser.bytesize = serial.EIGHTBITS  # 8位数据位
    ser.stopbits = serial.STOPBITS_ONE # 1位停止位
    ser.parity = serial.PARITY_NONE  # 无奇偶校验
    ser.xonxoff = False              # 禁用软件流控制
    ser.rtscts = False               # 禁用硬件流控制 (RTS/CTS)
    ser.dsrdtr = False               # 禁用硬件流控制 (DSR/DTR)
    ser.timeout = timeout            # 读取超时时间

    try:
        ser.open()
        if ser.is_open:
            print(f"成功打开串口: {ser.port}")
            return ser
    except serial.SerialException as e:
        print(f"无法打开串口 {ser.port}: {e}")
        return None

    return None

# 示例使用
# ser_connection = setup_serial_connection(port='COM4', baudrate=9600, timeout=1)
# if ser_connection:
#     # 进行通信
#     ser_connection.close()

注意事项:

  • port:根据您的操作系统设备管理器确定正确的串口号(Windows通常是COMx,Linux通常是/dev/ttyUSBx或/dev/ttySx)。
  • baudrate、bytesize、stopbits、parity:这些参数必须与您的设备制造商提供的串口通信协议完全匹配。
  • rtscts、dsrdtr、xonxoff:这些是流控制设置。根据设备要求启用或禁用,通常默认禁用。如果设备需要硬件流控制,应将rtscts或dsrdtr设置为True。
  • timeout:这是一个关键参数。它定义了read()或readline()操作等待数据的时间。如果设置为0,则非阻塞,立即返回。如果设置为None,则无限等待。对于大多数应用,设置一个合理的非零超时值是推荐的做法。

有效的数据接收策略

鉴于设备不回显的特性,正确的策略是发送一个明确会触发设备响应的命令,然后等待并读取其响应。

1. 发送触发命令并使用 readline() 读取

许多设备在接收到特定命令后会返回以换行符(\n)或回车符(\r)结尾的行数据。在这种情况下,ser.readline()是最高效的读取方法。

星火作家大神
星火作家大神

星火作家大神是一款面向作家的AI写作工具

下载
import serial
import time

# 假设我们已经通过 setup_serial_connection 函数获取了 ser 对象
# 例如:
# ser = setup_serial_connection(port='COM4', baudrate=9600, timeout=1)

# 模拟一个已打开的串口对象
class MockSerial:
    def __init__(self):
        self.is_open = True
        self.buffer = b''
        self.timeout = 1
        self.port = 'COM_MOCK'

    def open(self):
        self.is_open = True
    def close(self):
        self.is_open = False
    def write(self, data):
        print(f"MockSerial: 发送数据: {data.decode('utf-8').strip()}")
        # 模拟设备响应:如果收到'K',则返回'0309\n'
        if data == b'K':
            self.buffer += b'0309\n'
        # 模拟其他命令响应
        elif data == b'GET_TEMP':
            self.buffer += b'25.5C\n'
        # 模拟设备处理时间
        time.sleep(0.1)

    def readline(self):
        start_time = time.time()
        while b'\n' not in self.buffer:
            if self.timeout is not None and (time.time() - start_time > self.timeout):
                return b'' # 超时返回空字节串
            time.sleep(0.01) # 等待数据

        line_end_index = self.buffer.find(b'\n')
        if line_end_index != -1:
            line = self.buffer[:line_end_index + 1]
            self.buffer = self.buffer[line_end_index + 1:]
            return line
        return b'' # 不应该发生

    @property
    def in_waiting(self):
        return len(self.buffer)

# 替换为实际的串口对象
ser = MockSerial() # 在实际应用中,这里会是 setup_serial_connection() 的返回值

if ser and ser.is_open:
    try:
        # 发送触发命令。根据设备协议,这可能是'K'、'READ'、'GET_DATA'等
        command = b'K' # 假设'K'命令会触发设备返回型号
        ser.write(command)
        time.sleep(0.1) # 给予设备一点时间处理命令并发送响应

        # 检查是否有数据等待,但这不是主要读取方式
        buffer_size = ser.in_waiting
        print(f"串口等待中的字节数 (在readline之前): {buffer_size}")

        # 使用 readline() 读取设备响应
        print("开始读取设备响应...")
        response_lines = []
        while True:
            line = ser.readline()
            if not line: # 如果readline返回空字节串,表示超时或没有更多数据
                break
            try:
                # 尝试以UTF-8解码,并去除首尾空白符(包括换行符)
                decoded_line = line.decode('utf-8').strip()
                response_lines.append(decoded_line)
                print(f"接收到: {decoded_line}")
            except UnicodeDecodeError:
                # 如果解码失败,打印原始十六进制数据以供调试
                print(f"解码失败,原始数据(hex): {line.hex()}")
            except Exception as e:
                print(f"处理数据时发生错误: {e}")
                break

        if response_lines:
            print("\n所有接收到的响应:")
            for resp in response_lines:
                print(resp)
        else:
            print("未从设备接收到任何响应。请检查命令和设备状态。")

    except serial.SerialException as e:
        print(f"串口通信错误: {e}")
    finally:
        if ser.is_open:
            ser.close()
            print("串口已关闭。")
else:
    print("串口未成功打开,无法进行通信。")

代码解析:

  • ser.write(command): 发送编码后的字节串命令。
  • time.sleep(0.1): 在发送命令后短暂暂停,给设备留出处理命令和准备响应的时间。这个延迟时间需要根据设备的响应速度进行调整。
  • ser.readline(): 尝试从串口读取一行数据,直到遇到换行符\n或超时。
  • if not line: break: 如果readline()在超时时间内没有读到任何数据,它会返回一个空字节串b'',此时应退出循环。
  • line.decode('utf-8').strip(): 将接收到的字节串解码为字符串,并移除前后的空白符(包括\n)。如果设备使用不同的编码,需要修改'utf-8'。
  • UnicodeDecodeError处理:如果解码失败,说明接收到的数据可能不是预期的文本格式。此时打印其十六进制表示有助于调试。

2. 使用 read() 读取特定长度或所有可用数据

如果设备不以行结尾符响应,或者你需要读取固定长度的数据块,可以使用ser.read(size)。

# ... (串口初始化代码同上) ...

# 假设 ser 已经是一个打开的串口对象
# ser = setup_serial_connection(port='COM4', baudrate=9600, timeout=1)

# 使用 MockSerial 模拟
ser = MockSerial()
ser.timeout = 0.5 # 确保read有超时

if ser and ser.is_open:
    try:
        command = b'GET_DATA_BLOCK' # 假设这个命令会触发设备发送一个固定长度的数据块
        ser.write(command)
        time.sleep(0.1)

        # 假设我们知道设备会返回10个字节的数据
        expected_data_length = 10
        received_data = b''

        # 循环读取直到达到预期长度或超时
        while len(received_data) < expected_data_length:
            chunk = ser.read(expected_data_length - len(received_data))
            if not chunk: # 超时或没有更多数据
                break
            received_data += chunk
            print(f"已接收 {len(received_data)} / {expected_data_length} 字节")

        if received_data:
            print(f"\n接收到的原始数据 (字节): {received_data}")
            try:
                print(f"解码后的数据: {received_data.decode('ascii').strip()}")
            except UnicodeDecodeError:
                print(f"解码失败,原始数据(hex): {received_data.hex()}")
        else:
            print("未从设备接收到任何数据。")

    except serial.SerialException as e:
        print(f"串口通信错误: {e}")
    finally:
        if ser.is_open:
            ser.close()
            print("串口已关闭。")

ser.read(size)的特点:

  • size:指定要读取的最大字节数。
  • 如果size个字节在超时时间内未完全读取到,read()会返回所有已读取到的字节。
  • 如果超时时间内没有数据可用,read()会返回空字节串b''。

3. 处理连续数据流

如果设备持续发送数据,你可能需要一个循环来不断读取。

# ... (串口初始化代码同上) ...
# ser = setup_serial_connection(port='COM4', baudrate=9600, timeout=0.1) # 较短的超时

# 使用 MockSerial 模拟,模拟持续发送数据
class ContinuousMockSerial(MockSerial):
    def __init__(self):
        super().__init__()
        self.counter = 0
    def write(self, data):
        print(f"MockSerial: 发送数据: {data.decode('utf-8').strip()}")
        # 模拟设备收到'START'后开始持续发送数据
        if data == b'START':
            self.start_time = time.time()
            self.sending = True
        elif data == b'STOP':
            self.sending = False
    def readline(self):
        if hasattr(self, 'sending') and self.sending and (time.time() - self.start_time) > 0.1:
            self.buffer += f"Data_Point_{self.counter}\n".encode('utf-8')
            self.counter += 1
            self.start_time = time.time() # 重置发送时间
        return super().readline()

ser = ContinuousMockSerial()
ser.timeout = 0.5 # 确保readline有超时

if ser and ser.is_open:
    print("开始监听连续数据...")
    try:
        ser.write(b'START') # 告知设备开始发送数据
        for i in range(10): # 循环读取10次
            line = ser.readline()
            if line:
                try:
                    print(f"接收到: {line.decode('utf-8').strip()}")
                except UnicodeDecodeError:
                    print(f"解码失败,原始数据(hex): {line.hex()}")
            else:
                print("未接收到数据,可能设备停止发送或超时。")
            time.sleep(0.2) # 模拟处理间隔

        ser.write(b'STOP') # 告知设备停止发送数据
        print("停止监听。")

    except serial.SerialException as e:
        print(f"串口通信错误: {e}")
    finally:
        if ser.is_open:
            ser.close()
            print("串口已关闭。")

总结与最佳实践

  1. 理解设备协议: 这是最重要的。务必查阅设备制造商提供的通信协议文档,了解其命令格式、响应格式、波特率、数据位、停止位、奇偶校验和流控制设置。
  2. 区分本地回显与设备响应: 终端工具的“本地回显”功能可能会误导你。pySerial不会自动回显你发送的数据。
  3. 发送触发命令: 只有发送了设备能识别并会触发响应的命令,设备才会发送数据。
  4. 合理设置timeout: ser.timeout是控制读取操作阻塞时间的关键。设置为None会导致无限等待,设置为0会立即返回。对于大多数应用,建议设置一个非零的合理值。
  5. 选择合适的读取方法:
    • ser.readline():适用于设备以行(\n或\r结尾)发送数据的情况。
    • ser.read(size):适用于读取固定长度的数据块或任意字节流。
    • ser.read_until(expected=b'\n', size=None):适用于读取直到遇到特定终止符。
  6. 处理编码和解码: 串口通信通常涉及字节串。发送时,字符串需要.encode('utf-8')(或其他编码)转换为字节串;接收时,字节串需要.decode('utf-8')转换为字符串。
  7. 异常处理: 使用try...except serial.SerialException来捕获串口相关的错误,并确保在finally块中关闭串口ser.close()。
  8. 考虑流控制: 如果设备需要硬件或软件流控制,请确保在pySerial中正确配置rtscts、dsrdtr或xonxoff。

遵循这些原则,你将能更有效地使用pySerial与各种串口设备进行稳定可靠的通信。

相关专题

更多
python开发工具
python开发工具

php中文网为大家提供各种python开发工具,好的开发工具,可帮助开发者攻克编程学习中的基础障碍,理解每一行源代码在程序执行时在计算机中的过程。php中文网还为大家带来python相关课程以及相关文章等内容,供大家免费下载使用。

708

2023.06.15

python打包成可执行文件
python打包成可执行文件

本专题为大家带来python打包成可执行文件相关的文章,大家可以免费的下载体验。

625

2023.07.20

python能做什么
python能做什么

python能做的有:可用于开发基于控制台的应用程序、多媒体部分开发、用于开发基于Web的应用程序、使用python处理数据、系统编程等等。本专题为大家提供python相关的各种文章、以及下载和课程。

736

2023.07.25

format在python中的用法
format在python中的用法

Python中的format是一种字符串格式化方法,用于将变量或值插入到字符串中的占位符位置。通过format方法,我们可以动态地构建字符串,使其包含不同值。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

616

2023.07.31

python教程
python教程

Python已成为一门网红语言,即使是在非编程开发者当中,也掀起了一股学习的热潮。本专题为大家带来python教程的相关文章,大家可以免费体验学习。

1234

2023.08.03

python环境变量的配置
python环境变量的配置

Python是一种流行的编程语言,被广泛用于软件开发、数据分析和科学计算等领域。在安装Python之后,我们需要配置环境变量,以便在任何位置都能够访问Python的可执行文件。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

547

2023.08.04

python eval
python eval

eval函数是Python中一个非常强大的函数,它可以将字符串作为Python代码进行执行,实现动态编程的效果。然而,由于其潜在的安全风险和性能问题,需要谨慎使用。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

573

2023.08.04

scratch和python区别
scratch和python区别

scratch和python的区别:1、scratch是一种专为初学者设计的图形化编程语言,python是一种文本编程语言;2、scratch使用的是基于积木的编程语法,python采用更加传统的文本编程语法等等。本专题为大家提供scratch和python相关的文章、下载、课程内容,供大家免费下载体验。

695

2023.08.11

ip地址修改教程大全
ip地址修改教程大全

本专题整合了ip地址修改教程大全,阅读下面的文章自行寻找合适的解决教程。

27

2025.12.26

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
PostgreSQL 教程
PostgreSQL 教程

共48课时 | 6.1万人学习

Git 教程
Git 教程

共21课时 | 2.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号