网站排名alexa,百度经验手机版官网,聊城正规网站建设公司电话,韩国网站 后缀#x1f525;作者简介#xff1a; 一个平凡而乐于分享的小比特#xff0c;中南民族大学通信工程专业研究生#xff0c;研究方向无线联邦学习 #x1f3ac;擅长领域#xff1a;驱动开发#xff0c;嵌入式软件开发#xff0c;BSP开发 ❄️作者主页#xff1a;一个平凡而…作者简介 一个平凡而乐于分享的小比特中南民族大学通信工程专业研究生研究方向无线联邦学习擅长领域驱动开发嵌入式软件开发BSP开发❄️作者主页一个平凡而乐于分享的小比特的个人主页✨收录专栏Python本专栏为记录项目中用到常用python库欢迎大家点赞 收藏 ⭐ 加关注哦PySerial 串口通信教程PySerial 是一个 Python 串口通信库可以用于与各种串口设备如 Arduino、传感器、嵌入式设备等进行通信。1. PySerial 简介PySerial 提供了跨平台的串口通信功能支持Windows、Linux、macOS多种串口设置波特率、数据位、停止位、校验位等同步和异步通信二进制和文本数据2. 安装和基础使用2.1 安装pipinstallpyserial2.2 基础示例importserialimporttime# 基础串口通信示例try:# 打开串口serserial.Serial(COM3,9600,timeout1)print(f串口已打开:{ser.name})# 等待设备初始化time.sleep(2)# 发送数据ser.write(bHello Arduino!\n)print(数据已发送)# 读取数据responseser.readline().decode(utf-8).strip()print(f收到响应:{response})# 关闭串口ser.close()print(串口已关闭)exceptserial.SerialExceptionase:print(f串口错误:{e})exceptExceptionase:print(f其他错误:{e})3. 串口配置和打开3.1 查找可用串口importserialimportserial.tools.list_portsdeflist_serial_ports():列出所有可用的串口portsserial.tools.list_ports.comports()ifnotports:print(没有找到可用的串口)return[]print(可用串口列表:)fori,portinenumerate(ports,1):print(f{i}.{port.device}-{port.description})print(f 硬件ID:{port.hwid})print(f 制造商:{port.manufacturer})print(f 产品:{port.product})print()return[port.deviceforportinports]# 使用示例available_portslist_serial_ports()3.2 串口配置参数importserial# 完整的串口配置serial_config{port:COM3,# 串口号 (Windows: COM3, Linux: /dev/ttyUSB0, macOS: /dev/tty.usbserial)baudrate:9600,# 波特率 (常用: 9600, 115200, 57600, 38400)bytesize:serial.EIGHTBITS,# 数据位 (可选: FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS)parity:serial.PARITY_NONE,# 校验位 (可选: PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE)stopbits:serial.STOPBITS_ONE,# 停止位 (可选: STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO)timeout:1,# 读取超时时间(秒)None表示阻塞读取0表示非阻塞xonxoff:False,# 软件流控制rtscts:False,# 硬件(RTS/CTS)流控制dsrdtr:False,# 硬件(DSR/DTR)流控制write_timeout:2,# 写入超时时间}# 打开串口try:serserial.Serial(**serial_config)print(f串口配置:{ser})# 获取当前配置print(f当前波特率:{ser.baudrate})print(f当前数据位:{ser.bytesize})print(f当前校验位:{ser.parity})print(f当前停止位:{ser.stopbits})ser.close()exceptserial.SerialExceptionase:print(f无法打开串口:{e})3.3 自动检测和连接串口importserialimportserial.tools.list_portsimporttimedefauto_connect_serial(baudrate9600,timeout1): 自动检测并连接串口 返回: serial.Serial 对象 或 None portsserial.tools.list_ports.comports()ifnotports:print(未找到可用串口)returnNoneforportinports:try:print(f尝试连接:{port.device}-{port.description})# 尝试连接serserial.Serial(portport.device,baudratebaudrate,timeouttimeout)# 测试连接time.sleep(2)# 等待设备初始化# 清空缓冲区ser.reset_input_buffer()ser.reset_output_buffer()# 发送测试命令ser.write(bAT\r\n)time.sleep(0.1)# 检查响应ifser.in_waiting0:responseser.read(ser.in_waiting)print(f设备响应:{response})print(f成功连接到:{port.device})returnserexceptserial.SerialException:print(f连接{port.device}失败尝试下一个...)continueprint(所有串口连接尝试失败)returnNone# 使用示例serauto_connect_serial()ifser:# 进行通信操作ser.write(bHello!\n)ser.close()4. 数据读写操作4.1 基本读写操作importserialimporttimedefbasic_read_write_example():基本读写操作示例try:# 打开串口serserial.Serial(COM3,9600,timeout1)print(串口已打开)# 等待设备就绪time.sleep(2)# 清空缓冲区ser.reset_input_buffer()ser.reset_output_buffer()# 方法1: 写入字符串 (需要编码为字节)commandATVER?\r\nser.write(command.encode(utf-8))print(f发送命令:{command})# 方法2: 直接写入字节ser.write(bATID?\r\n)# 读取数据time.sleep(0.5)# 等待响应# 方法1: 读取指定字节数ifser.in_waiting0:dataser.read(ser.in_waiting)print(f原始数据:{data})print(f解码后:{data.decode(utf-8,errorsignore)})# 方法2: 读取一行ser.write(bATNAME?\r\n)lineser.readline().decode(utf-8,errorsignore).strip()print(f读取一行:{line})# 方法3: 读取所有可用数据ser.write(bATHELP\r\n)time.sleep(1)whileser.in_waiting0:dataser.read(ser.in_waiting)print(f收到数据:{data.decode(utf-8,errorsignore)})ser.close()exceptExceptionase:print(f错误:{e})# 运行示例basic_read_write_example()4.2 文本模式通信importserialdeftext_communication_example():文本模式通信示例try:serserial.Serial(COM3,9600,timeout1)# 发送文本命令commands[LED ON\n,GET TEMP\n,SET SPEED 100\n,STATUS\n]forcmdincommands:print(f发送:{cmd.strip()})ser.write(cmd.encode(utf-8))# 读取响应responseser.readline().decode(utf-8,errorsignore).strip()print(f响应:{response})# 处理特定响应ifTEMPinresponse:temp_valueresponse.split(:)[1].strip()print(f温度值:{temp_value}°C)ser.close()exceptExceptionase:print(f错误:{e})# 运行示例text_communication_example()4.3 二进制数据通信importserialimportstructimporttimedefbinary_communication_example():二进制数据通信示例try:serserial.Serial(COM3,115200,timeout1)# 示例1: 发送二进制命令# 假设协议: 起始字节(0xAA) 命令字节 数据长度 数据 校验和# 构建数据包start_byte0xAAcommand0x01# 读取数据命令data[0x10,0x20,0x30]data_lengthlen(data)# 计算校验和checksum(start_bytecommanddata_lengthsum(data))0xFF# 打包数据packetstruct.pack(BBB,start_byte,command,data_length)packetbytes(data)packetstruct.pack(B,checksum)print(f发送数据包:{packet.hex()})ser.write(packet)# 等待响应time.sleep(0.1)# 读取响应ifser.in_waiting10:# 假设响应包长度至少10字节responseser.read(ser.in_waiting)print(f收到响应:{response.hex()})# 解析响应iflen(response)3:# 解析前3个字节start,cmd,lengthstruct.unpack(BBB,response[:3])print(f响应: 起始{hex(start)}, 命令{hex(cmd)}, 长度{length})# 示例2: 发送浮点数float_value3.14159float_bytesstruct.pack(f,float_value)ser.write(bFLOAT)ser.write(float_bytes)print(f发送浮点数:{float_value}-{float_bytes.hex()})ser.close()exceptExceptionase:print(f错误:{e})# 运行示例binary_communication_example()4.4 持续数据读取importserialimportthreadingimporttimeclassSerialMonitor:串口监视器持续读取数据def__init__(self,port,baudrate9600):self.serserial.Serial(port,baudrate,timeout1)self.runningFalseself.threadNonedefstart_monitoring(self):开始监视串口数据ifnotself.running:self.runningTrueself.threadthreading.Thread(targetself._monitor_loop)self.thread.start()print(串口监视器已启动)def_monitor_loop(self):监视循环whileself.running:try:# 检查是否有数据可读ifself.ser.in_waiting0:# 读取一行数据lineself.ser.readline()try:decodedline.decode(utf-8).strip()print(f[串口数据]{decoded})# 这里可以添加数据处理逻辑self._process_data(decoded)exceptUnicodeDecodeError:# 如果是二进制数据以十六进制显示print(f[二进制数据]{line.hex()})# 短暂休眠避免占用过多CPUtime.sleep(0.01)exceptExceptionase:print(f读取数据时出错:{e})time.sleep(1)def_process_data(self,data):处理接收到的数据# 示例处理特定格式的数据ifdata.startswith(TEMP:):tempdata.split(:)[1]print(f检测到温度数据:{temp}°C)elifdata.startswith(HUMI:):humidata.split(:)[1]print(f检测到湿度数据:{humi}%)defsend_command(self,command):发送命令try:ifisinstance(command,str):commandcommand.encode(utf-8)self.ser.write(command)print(f发送命令:{command})exceptExceptionase:print(f发送命令失败:{e})defstop_monitoring(self):停止监视self.runningFalseifself.thread:self.thread.join()self.ser.close()print(串口监视器已停止)# 使用示例defmonitor_example():monitorSerialMonitor(COM3,9600)monitor.start_monitoring()try:# 主线程可以继续执行其他操作foriinrange(5):monitor.send_command(fGET DATA{i}\n)time.sleep(2)# 等待用户输入退出input(按回车键停止监视...)finally:monitor.stop_monitoring()# 运行示例monitor_example()5. 高级功能5.1 串口扫描和自动重连importserialimportserial.tools.list_portsimporttimeimportthreadingclassSmartSerialConnection:智能串口连接管理器def__init__(self,portNone,baudrate9600):self.portport self.baudratebaudrate self.serNoneself.connectedFalseself.reconnect_threadNoneself.reconnect_flagFalsedefconnect(self,portNone):连接到串口ifport:self.portportifnotself.port:print(未指定串口号尝试自动检测...)self.portself._auto_detect_port()ifnotself.port:print(无法找到可用串口)returnFalsetry:self.serserial.Serial(portself.port,baudrateself.baudrate,timeout1,write_timeout1)# 测试连接time.sleep(2)# 等待设备初始化self._test_connection()self.connectedTrueprint(f成功连接到{self.port})returnTrueexceptserial.SerialExceptionase:print(f连接失败:{e})self.connectedFalsereturnFalsedef_auto_detect_port(self):自动检测串口portsserial.tools.list_ports.comports()forportinports:print(f尝试端口:{port.device})try:# 尝试打开端口test_serserial.Serial(port.device,self.baudrate,timeout0.5)time.sleep(1)# 发送测试命令test_ser.write(bAT\r\n)time.sleep(0.1)# 检查响应iftest_ser.in_waiting0:responsetest_ser.read(test_ser.in_waiting)print(f端口{port.device}响应:{response})test_ser.close()returnport.device test_ser.close()except:continuereturnNonedef_test_connection(self):测试连接是否正常ifnotself.ser:returnFalsetry:# 清空缓冲区self.ser.reset_input_buffer()self.ser.reset_output_buffer()# 发送测试命令self.ser.write(bAT\r\n)time.sleep(0.1)# 尝试读取响应ifself.ser.in_waiting0:returnTrueelse:returnFalseexcept:returnFalsedefstart_auto_reconnect(self,interval5):启动自动重连ifself.reconnect_threadandself.reconnect_thread.is_alive():print(自动重连已在运行)returnself.reconnect_flagTrueself.reconnect_threadthreading.Thread(targetself._reconnect_loop,args(interval,),daemonTrue)self.reconnect_thread.start()print(自动重连已启动)def_reconnect_loop(self,interval):重连循环whileself.reconnect_flag:ifnotself.connected:print(尝试重新连接...)self.connect()# 定期检查连接状态ifself.connected:ifnotself._test_connection():print(连接断开)self.connectedFalseifself.ser:self.ser.close()time.sleep(interval)defstop_auto_reconnect(self):停止自动重连self.reconnect_flagFalseifself.reconnect_thread:self.reconnect_thread.join(timeout2)print(自动重连已停止)defsend(self,data):发送数据ifnotself.connectedornotself.ser:print(未连接无法发送数据)returnFalsetry:ifisinstance(data,str):datadata.encode(utf-8)self.ser.write(data)returnTrueexceptExceptionase:print(f发送失败:{e})self.connectedFalsereturnFalsedefreceive(self,timeout1):接收数据ifnotself.connectedornotself.ser:print(未连接无法接收数据)returnNonetry:# 设置临时超时original_timeoutself.ser.timeout self.ser.timeouttimeout dataself.ser.readline()# 恢复原始超时self.ser.timeoutoriginal_timeoutifdata:try:returndata.decode(utf-8).strip()except:returndata.hex()else:returnNoneexceptExceptionase:print(f接收失败:{e})returnNonedefclose(self):关闭连接self.stop_auto_reconnect()self.connectedFalseifself.ser:self.ser.close()print(连接已关闭)# 使用示例defsmart_connection_example():serial_connSmartSerialConnection(baudrate9600)# 尝试连接ifserial_conn.connect(COM3):print(连接成功)# 启动自动重连serial_conn.start_auto_reconnect(interval10)# 发送和接收数据foriinrange(10):serial_conn.send(fTEST{i}\n)responseserial_conn.receive()ifresponse:print(f收到:{response})time.sleep(1)# 停止并关闭serial_conn.close()else:print(连接失败)# 运行示例smart_connection_example()5.2 数据协议解析器importserialimportstructimportcrcmodimporttimeclassSerialProtocol:串口协议处理器def__init__(self,ser):self.serser self.bufferbytearray()# 初始化CRC计算self.crc16crcmod.mkCrcFun(0x18005,revTrue,initCrc0xFFFF,xorOut0x0000)defsend_packet(self,command,dataNone):发送数据包ifdataisNone:databytearray()# 构建数据包packetbytearray()# 起始字节packet.append(0xAA)packet.append(0x55)# 命令字节packet.append(command)# 数据长度packet.append(len(data))# 数据packet.extend(data)# 计算CRCcrcself.crc16(bytes(packet))packet.extend(struct.pack(H,crc))# 发送数据包self.ser.write(packet)print(f发送数据包:{packet.hex()})returnpacketdefreceive_packet(self,timeout1):接收数据包start_timetime.time()whiletime.time()-start_timetimeout:# 读取可用数据ifself.ser.in_waiting0:self.buffer.extend(self.ser.read(self.ser.in_waiting))# 尝试解析数据包packetself._parse_buffer()ifpacket:returnpacketreturnNonedef_parse_buffer(self):解析缓冲区中的数据包# 查找起始字节start_index-1foriinrange(len(self.buffer)-1):ifself.buffer[i]0xAAandself.buffer[i1]0x55:start_indexibreakifstart_index-1:returnNone# 检查数据包长度iflen(self.buffer[start_index:])6:# 最小包长度returnNone# 获取数据长度data_lengthself.buffer[start_index3]total_length4data_length2# 头部4字节 数据 CRC2字节# 检查是否接收到完整数据包iflen(self.buffer[start_index:])total_length:returnNone# 提取完整数据包packetself.buffer[start_index:start_indextotal_length]# 验证CRCcrc_receivedstruct.unpack(H,packet[-2:])[0]crc_calculatedself.crc16(packet[:-2])ifcrc_received!crc_calculated:print(CRC校验失败)# 移除错误的数据包self.bufferself.buffer[start_index1:]returnNone# 解析数据包parsed{command:packet[2],data_length:packet[3],data:packet[4:4data_length],crc:crc_received,raw:packet}# 从缓冲区中移除已处理的数据self.bufferself.buffer[start_indextotal_length:]returnparseddefsend_command_with_response(self,command,dataNone,timeout1,retries3):发送命令并等待响应forattemptinrange(retries):print(f尝试{attempt1}/{retries})# 发送命令self.send_packet(command,data)# 等待响应responseself.receive_packet(timeout)ifresponse:print(f收到响应: 命令{response[command]}, 数据{response[data].hex()})returnresponseprint(未收到响应重试...)time.sleep(0.5)print(所有重试均失败)returnNone# 使用示例defprotocol_example():try:serserial.Serial(COM3,115200,timeout1)protocolSerialProtocol(ser)# 示例1: 发送读取传感器命令responseprotocol.send_command_with_response(command0x01,# 读取传感器命令databytearray([0x00,0x01]),# 传感器地址timeout2,retries3)ifresponse:# 解析传感器数据iflen(response[data])4:temperature,humiditystruct.unpack(HH,response[data][:4])temperaturetemperature/10.0humidityhumidity/10.0print(f温度:{temperature}°C, 湿度:{humidity}%)# 示例2: 发送控制命令control_databytearray([0x01,0x01])# 打开设备protocol.send_command_with_response(command0x02,# 控制命令datacontrol_data)ser.close()exceptExceptionase:print(f错误:{e})# 运行示例protocol_example()6. 错误处理和调试importserialimporttimeimportlogging# 配置日志logging.basicConfig(levellogging.DEBUG,format%(asctime)s - %(name)s - %(levelname)s - %(message)s)loggerlogging.getLogger(__name__)classRobustSerial:健壮的串口通信类包含错误处理和调试功能def__init__(self,port,baudrate9600,**kwargs):self.portport self.baudratebaudrate self.kwargskwargs self.serNoneself.error_count0self.max_errors10self.last_error_time0self.error_cooldown5# 错误冷却时间(秒)defopen(self):打开串口try:logger.info(f尝试打开串口{self.port})self.serserial.Serial(portself.port,baudrateself.baudrate,**self.kwargs)# 检查串口是否真正打开ifnotself.ser.is_open:raiseserial.SerialException(串口未成功打开)logger.info(f串口{self.port}已成功打开)# 清空缓冲区self._flush_buffers()returnTrueexceptserial.SerialExceptionase:logger.error(f打开串口失败:{e})self._handle_error(open,e)returnFalseexceptExceptionase:logger.error(f未知错误:{e})self._handle_error(unknown,e)returnFalsedef_flush_buffers(self):清空输入输出缓冲区try:ifself.ser:self.ser.reset_input_buffer()self.ser.reset_output_buffer()logger.debug(缓冲区已清空)exceptExceptionase:logger.warning(f清空缓冲区失败:{e})defwrite(self,data,retries3):写入数据带重试机制forattemptinrange(retries):try:ifnotself.serornotself.ser.is_open:logger.warning(串口未打开尝试重新打开)ifnotself.open():continueifisinstance(data,str):datadata.encode(utf-8)bytes_writtenself.ser.write(data)logger.debug(f写入{bytes_written}字节:{data[:50]}...)# 确保数据发送完成self.ser.flush()returnbytes_writtenexceptserial.SerialTimeoutException:logger.warning(f写入超时尝试{attempt1}/{retries})time.sleep(0.1)exceptserial.SerialExceptionase:logger.error(f写入失败:{e})self._handle_error(write,e)# 尝试重新打开串口time.sleep(0.5)self.open()exceptExceptionase:logger.error(f未知写入错误:{e})self._handle_error(write_unknown,e)breaklogger.error(f写入失败已重试{retries}次)return0defread(self,sizeNone,timeoutNone):读取数据try:ifnotself.serornotself.ser.is_open:logger.warning(串口未打开)returnNone# 设置临时超时original_timeoutself.ser.timeoutiftimeoutisnotNone:self.ser.timeouttimeoutifsize:dataself.ser.read(size)else:# 读取所有可用数据dataself.ser.read(self.ser.in_waiting)# 恢复原始超时iftimeoutisnotNone:self.ser.timeoutoriginal_timeoutifdata:logger.debug(f读取{len(data)}字节)returndataelse:returnNoneexceptserial.SerialExceptionase:logger.error(f读取失败:{e})self._handle_error(read,e)returnNoneexceptExceptionase:logger.error(f未知读取错误:{e})self._handle_error(read_unknown,e)returnNonedefreadline(self,timeoutNone):读取一行try:ifnotself.serornotself.ser.is_open:logger.warning(串口未打开)returnNone# 设置临时超时original_timeoutself.ser.timeoutiftimeoutisnotNone:self.ser.timeouttimeout lineself.ser.readline()# 恢复原始超时iftimeoutisnotNone:self.ser.timeoutoriginal_timeoutifline:logger.debug(f读取行:{line[:50]}...)returnlineelse:returnNoneexceptserial.SerialExceptionase:logger.error(f读取行失败:{e})self._handle_error(readline,e)returnNonedef_handle_error(self,error_type,error):处理错误current_timetime.time()# 检查是否在冷却期内ifcurrent_time-self.last_error_timeself.error_cooldown:returnself.error_count1self.last_error_timecurrent_time logger.error(f错误类型:{error_type}, 错误:{error})logger.error(f错误计数:{self.error_count}/{self.max_errors})# 如果错误过多可能需要采取特殊措施ifself.error_countself.max_errors:logger.critical(错误过多建议检查硬件连接)self.error_count0# 重置计数器defclose(self):关闭串口try:ifself.serandself.ser.is_open:self.ser.close()logger.info(串口已关闭)exceptExceptionase:logger.error(f关闭串口失败:{e})defget_info(self):获取串口信息ifnotself.ser:return串口未打开info{port:self.ser.port,baudrate:self.ser.baudrate,bytesize:self.ser.bytesize,parity:self.ser.parity,stopbits:self.ser.stopbits,timeout:self.ser.timeout,is_open:self.ser.is_open,in_waiting:self.ser.in_waitingifself.ser.is_openelse0,error_count:self.error_count}returninfo# 调试工具函数defdebug_serial_port(port,baudrate9600):调试串口print(f 串口调试:{port})# 1. 检查端口是否存在importserial.tools.list_ports ports[p.deviceforpinserial.tools.list_ports.comports()]ifportnotinports:print(f警告: 端口{port}不在可用端口列表中)print(f可用端口:{ports})# 2. 尝试打开端口try:serserial.Serial(port,baudrate,timeout1)print(f✓ 端口可以打开)# 3. 获取端口信息print(f端口信息:)print(f 名称:{ser.name})print(f 波特率:{ser.baudrate})print(f 数据位:{ser.bytesize})print(f 校验位:{ser.parity})print(f 停止位:{ser.stopbits})# 4. 测试读写print(\n测试读写功能:)# 清空缓冲区ser.reset_input_buffer()ser.reset_output_buffer()# 发送测试数据test_databAT\r\nser.write(test_data)print(f 发送:{test_data})# 等待响应time.sleep(0.1)# 读取响应ifser.in_waiting0:responseser.read(ser.in_waiting)print(f 接收:{response})print(f 解码:{response.decode(utf-8,errorsignore)})else:print( 警告: 没有收到响应)# 5. 测试不同波特率print(\n测试不同波特率:)baudrates[9600,19200,38400,57600,115200]forbaudinbaudrates:try:ser.baudratebaud ser.write(bAT\r\n)time.sleep(0.1)ifser.in_waiting0:responseser.read(ser.in_waiting)print(f{baud}bps: 收到响应 ({len(response)}字节))else:print(f{baud}bps: 无响应)exceptExceptionase:print(f{baud}bps: 错误 -{e})ser.close()print(\n✓ 调试完成)exceptserial.SerialExceptionase:print(f✗ 无法打开端口:{e})print(\n可能的解决方案:)print(1. 检查端口名称是否正确)print(2. 检查设备是否连接)print(3. 检查是否有其他程序占用该端口)print(4. 检查驱动程序是否安装)print(5. 尝试以管理员权限运行)exceptExceptionase:print(f✗ 未知错误:{e})# 使用示例defdebug_example():# 调试特定端口debug_serial_port(COM3,9600)# 使用健壮的串口类logger.info(开始测试健壮串口类)robust_serRobustSerial(portCOM3,baudrate9600,timeout1,write_timeout1)ifrobust_ser.open():# 发送数据robust_ser.write(Hello Device!\n)# 读取响应responserobust_ser.readline(timeout2)ifresponse:print(f收到响应:{response.decode(utf-8,errorsignore)})# 获取信息inforobust_ser.get_info()print(f串口信息:{info})robust_ser.close()# 运行示例debug_example()7. 实际应用示例7.1 Arduino 通信示例importserialimporttimeimportjsonfromdatetimeimportdatetimeclassArduinoCommunicator:Arduino 串口通信类def__init__(self,port,baudrate9600):self.portport self.baudratebaudrate self.serNoneself.connectedFalsedefconnect(self):连接到 Arduinotry:self.serserial.Serial(self.port,self.baudrate,timeout1)time.sleep(2)# 等待 Arduino 重启self.connectedTrueprint(f已连接到 Arduino ({self.port}))returnTrueexceptExceptionase:print(f连接失败:{e})returnFalsedefsend_command(self,command,valueNone):发送命令到 Arduinoifnotself.connected:print(未连接)returnNonetry:# 构建命令字符串ifvalueisnotNone:cmd_strf{command}:{value}\nelse:cmd_strf{command}\n# 发送命令self.ser.write(cmd_str.encode(utf-8))print(f发送命令:{cmd_str.strip()})# 读取响应responseself.ser.readline().decode(utf-8,errorsignore).strip()ifresponse:print(fArduino 响应:{response})returnresponseexceptExceptionase:print(f通信错误:{e})returnNonedefread_sensors(self):读取所有传感器数据# 发送读取命令self.ser.write(bREAD_ALL\n)# 等待响应time.sleep(0.5)# 读取所有可用数据data_lines[]whileself.ser.in_waiting0:lineself.ser.readline().decode(utf-8,errorsignore).strip()ifline:data_lines.append(line)# 解析传感器数据sensors{}forlineindata_lines:if:inline:key,valueline.split(:,1)try:# 尝试转换为数值sensors[key.strip()]float(value.strip())except:sensors[key.strip()]value.strip()returnsensorsdefcontrol_led(self,led_id,state):控制 LEDstate_strONifstateelseOFFreturnself.send_command(fLED{led_id},state_str)defread_analog(self,pin):读取模拟引脚值returnself.send_command(fANALOG{pin})defset_pwm(self,pin,value):设置 PWM 值 (0-255)returnself.send_command(fPWM{pin},str(value))defclose(self):关闭连接ifself.ser:self.ser.close()self.connectedFalseprint(连接已关闭)# Arduino 数据采集应用defarduino_data_logger():Arduino 数据记录器arduinoArduinoCommunicator(COM3,9600)ifnotarduino.connect():print(无法连接到 Arduino)returntry:# 创建数据文件timestampdatetime.now().strftime(%Y%m%d_%H%M%S)filenamefarduino_data_{timestamp}.csvwithopen(filename,w)asf:# 写入 CSV 头部f.write(timestamp,temperature,humidity,light,analog0,analog1\n)print(f数据将保存到:{filename})print(开始数据采集 (按 CtrlC 停止)...)# 数据采集循环whileTrue:try:# 读取传感器数据sensorsarduino.read_sensors()ifsensors:# 获取时间戳current_timedatetime.now().strftime(%Y-%m-%d %H:%M:%S)# 准备数据行data_row{timestamp:current_time,temperature:sensors.get(TEMP,0),humidity:sensors.get(HUMI,0),light:sensors.get(LIGHT,0),analog0:sensors.get(A0,0),analog1:sensors.get(A1,0)}# 显示数据print(f[{current_time}] f温度:{data_row[temperature]:.1f}°C, f湿度:{data_row[humidity]:.1f}%, f光照:{data_row[light]})# 保存到文件withopen(filename,a)asf:csv_linef{data_row[timestamp]},csv_linef{data_row[temperature]},csv_linef{data_row[humidity]},csv_linef{data_row[light]},csv_linef{data_row[analog0]},csv_linef{data_row[analog1]}\nf.write(csv_line)# 间隔时间time.sleep(2)exceptKeyboardInterrupt:print(\n数据采集已停止)breakexceptExceptionase:print(f数据采集错误:{e})time.sleep(1)finally:arduino.close()print(f数据已保存到:{filename})# 运行示例arduino_data_logger()7.2 GPS 数据解析示例importserialimporttimeimportpynmea2classGPSReader:GPS 数据读取器def__init__(self,port,baudrate9600):self.portport self.baudratebaudrate self.serNoneself.runningFalsedefstart(self):开始读取 GPS 数据try:self.serserial.Serial(self.port,self.baudrate,timeout1)self.runningTrueprint(GPS 接收器已启动)# 读取数据self._read_loop()exceptExceptionase:print(f启动失败:{e})def_read_loop(self):读取循环bufferwhileself.running:try:# 读取数据ifself.ser.in_waiting0:dataself.ser.read(self.ser.in_waiting).decode(utf-8,errorsignore)bufferdata# 处理完整的 NMEA 语句while\ninbuffer:line,bufferbuffer.split(\n,1)lineline.strip()ifline.startswith($):self._parse_nmea(line)time.sleep(0.1)exceptKeyboardInterrupt:print(\n正在停止 GPS 接收器...)self.stop()breakexceptExceptionase:print(f读取错误:{e})time.sleep(1)def_parse_nmea(self,sentence):解析 NMEA 语句try:msgpynmea2.parse(sentence)# 处理不同类型的 NMEA 语句ifisinstance(msg,pynmea2.types.talker.RMC):# 推荐最小定位信息ifmsg.statusA:# 数据有效print(f位置:{msg.latitude:.6f},{msg.longitude:.6f})print(f速度:{msg.spd_over_grnd}节, 航向:{msg.true_course}°)print(f时间:{msg.datetime})elifisinstance(msg,pynmea2.types.talker.GGA):# GPS 定位信息print(f质量:{msg.gps_qual}, 卫星数:{msg.num_sats})print(f海拔:{msg.altitude}{msg.altitude_units})print(f大地水准面高度:{msg.geo_sep}{msg.geo_sep_units})elifisinstance(msg,pynmea2.types.talker.GSA):# 当前卫星信息print(f模式:{msg.mode}, 定位类型:{msg.mode_fix_type})print(fPDOP:{msg.pdop}, HDOP:{msg.hdop}, VDOP:{msg.vdop})exceptpynmea2.ParseErrorase:# 忽略解析错误passexceptExceptionase:print(f解析错误:{e})defstop(self):停止读取self.runningFalseifself.ser:self.ser.close()print(GPS 接收器已停止)# 使用示例defgps_example():GPS 数据读取示例gpsGPSReader(COM4,4800)# 典型 GPS 波特率为 4800try:# 开始读取 GPS 数据gps.start()exceptExceptionase:print(fGPS 示例错误:{e})# 运行示例gps_example()7.3 串口聊天程序importserialimportthreadingimporttimeimportsysclassSerialChat:简单的串口聊天程序def__init__(self,port,baudrate9600):self.portport self.baudratebaudrate self.serNoneself.runningFalseself.receive_threadNonedefstart(self):开始聊天try:# 打开串口self.serserial.Serial(portself.port,baudrateself.baudrate,timeout0.1)self.runningTrue# 启动接收线程self.receive_threadthreading.Thread(targetself._receive_messages)self.receive_thread.daemonTrueself.receive_thread.start()print(f 串口聊天 ({self.port}) )print(输入消息并按回车发送)print(输入 quit 或 exit 退出)print(*40)# 发送循环self._send_loop()exceptExceptionase:print(f启动失败:{e})def_receive_messages(self):接收消息线程bufferwhileself.running:try:ifself.serandself.ser.in_waiting0:dataself.ser.read(self.ser.in_waiting).decode(utf-8,errorsignore)bufferdata# 处理完整行while\ninbuffer:line,bufferbuffer.split(\n,1)lineline.strip()ifline:print(f\n[接收]{line})print([发送] ,end,flushTrue)time.sleep(0.01)exceptExceptionase:ifself.running:# 只有在运行中才报告错误print(f接收错误:{e})time.sleep(0.1)def_send_loop(self):发送消息循环try:whileself.running:# 获取用户输入messageinput([发送] )# 检查退出命令ifmessage.lower()in[quit,exit,q]:print(正在退出...)self.stop()break# 发送消息ifmessageandself.ser:self.ser.write((message\n).encode(utf-8))exceptKeyboardInterrupt:print(\n正在退出...)self.stop()exceptExceptionase:print(f发送错误:{e})self.stop()defstop(self):停止聊天self.runningFalseifself.receive_thread:self.receive_thread.join(timeout1)ifself.ser:self.ser.close()print(聊天已结束)# 使用示例defchat_example():串口聊天示例iflen(sys.argv)1:portsys.argv[1]else:portinput(请输入串口号 (如 COM3): )iflen(sys.argv)2:baudrateint(sys.argv[2])else:baudrateint(input(请输入波特率 (默认 9600): )or9600)chatSerialChat(port,baudrate)chat.start()if__name____main__:chat_example()总结本教程涵盖了 PySerial 的主要功能基础使用安装、打开串口、基本读写串口配置参数设置、自动检测端口数据操作文本和二进制数据通信高级功能协议处理、错误处理、自动重连实际应用Arduino 通信、GPS 解析、聊天程序最佳实践建议错误处理始终使用 try-except 包装串口操作超时设置合理设置 timeout 和 write_timeout缓冲区管理定期清空输入输出缓冲区资源清理确保在程序结束时关闭串口线程安全在多线程环境中使用适当的锁机制常见问题解决无法打开串口检查端口名称、权限、是否被其他程序占用数据乱码检查波特率设置、数据编码通信不稳定检查硬件连接、线缆质量、接地速度慢调整波特率优化读写策略