当前位置:首页 > 物联网 > Freak嵌入式

摘要:

本文主要介绍了在使用Python面向对象编程时,如何实现组合关系,同时对比了组合关系和继承关系的优缺点,并讲解了如何通过csv模块来保存Python接收/生成的数据。

文档和代码获取:

本文档主要介绍如何使用 Python 进行面向对象编程,需要读者对 Python 语法和单片机开发具有基本了解。相比其他讲解 Python 面向对象编程的博客或书籍而言,本文档更加详细、侧重于嵌入式上位机应用,以上位机和下位机的常见串口数据收发、数据处理、动态图绘制等为应用实例,同时使用 Sourcetrail代码软件对代码进行可视化阅读便于读者理解。

正文:

前面讲了面向类与对象的继承,知道了继承是一种什么“是”什么的关系。然而类与类之间还有另一种关系,这就是组合。组合是将几个对象收集在一起生成一个新对象的行为。当一个对象是另外一个对象的一部分时,组合通常是不错的选择。

例如,汽车是由发动机、传动装置、启动装置、车前灯、挡风玻璃以及其他部件组成的,发动机又是由活塞、曲柄轴和阀门等组合而成的。汽车是发动机等多个元器件的抽象,而发动机是活塞等元器件的抽象,二者处于不同的层次而又有彼此交互的接口,组合是提供不同抽象层的好办法。汽车对象可以提供司机所需要的接口,同时也能够获取内在组成部分,从而为机械师提供适合操作的深层抽象。当然,如果机械师需要更多信息来诊断问题或调整发动机,这些组成部分也可以进一步被细分。

总的来说,组合就是让不同的类混合并且加入其他类中来增加功能和代码重用性,这种适用于由多个小类组成一个大类的情况,并且不需要对小类进行太多修改。在前面示例中,我们实现了主机的串口收发和绘图功能,在实际应用中,我们往往需要将传感器数据存储到文件中,以便后续的查看和处理,很明显前面的传感器数据为一维的时间序列数据,适合存储为表格类型(即列标题为索引和值),我们通常将该类数据保存为csv格式文件,csv是一种字符串文件的格式,它组织数据的语法就是在字符串之间加分隔符(行与行之间是加换行符,同行字符之间是加逗号分隔),可以用任意的文本编辑器打开(如记事本),也可以用Excel打开,还可以通过Excel把文件另存为csv格式。用csv格式存储数据,读写比较方便,易于实现,文件也会比Excel文件小。但csv文件缺少Excel文件本身的很多功能,如不能嵌入图像和图表,不能生成公式等等。

这里,我们首先定义一个FileIOClass类,其中具有初始化方法、写入传感器数据到文件方法和关闭文件方法,示例代码如下:

import csv# 使用typing模块提供的复合注解功能from typing import List class FileIOClass: def __init__(self,path:str="G:\\Python面向对象编程\\Demo\\file.csv"): ''' 初始化csv文件和列标题 :param path: 文件路径和文件名 ''' self.path   = path # path为输出路径和文件名,newline=''是为了不出现空行 self.csvFile = open(path, "w+", newline='') # rowname为列名,index-索引,data-数据 self.rowname = ['index', 'data'] # 返回一个writer对象,将用户的数据在给定的文件型对象上转换为带分隔符的字符串 self.writer = csv.writer(self.csvFile) # 写入csv文件的列标题 self.writer.writerow( self.rowname)  def WriteFile(self,index:List[int],data:List[int])->None: ''' :param index: 传感器索引列表 :param data:  传感器数据列表 :return: ''' writedatalist = [] for i in range(len(data)): writedatalist.append([index[i],data[i]]) # 将列表中的每个元素将被写入CSV文件的一列中 self.writer.writerow(writedatalist[i])  def CloseFile(self)->None: ''' 关闭文件 :return: None ''' self.csvFile.close()

这里,在初始化方法中,我们需要传入文件保存路径。之后创建一个writer对象,将用户的数据在给定的文件型对象上转换为带分隔符的字符串,同时写入csv文件的列标题。在WriteFile方法中传入数据的索引列表用于表示数据的先后顺序,之后是数据列表(这里的类型注解需要使用typing模块提供的复合注解功能),并循环将每个元素将被写入CSV文件的一列中,最后定义了文件的关闭方法。

在主函数中,我们创建FileIOClass对象,写入模拟传感器数据后关闭文件,以下为示例代码和运行效果:

if __name__ == '__main__': path  = "G:\\Python面向对象编程\\Demo\\file.csv" data = [11,42,307,46,55,61,78,80,19,11] index = [count for count in range(len(data))]  file = FileIOClass(path) file.WriteFile(index,data) file.CloseFile()

这里,我们可以直接在MasterClass类的初始化中创建FileIOClass类的实例化对象来实现组合。代码如下:

 # 文件保存路径 self.savepath = "G:\\Python面向对象编程\\Demo\\file.csv" # 创建FileIOClass类的实例化对象 self.fileio = FileIOClass(self.savepath)

通过sourcetrail,我们可以清晰看到类之间的组合与继承关系:

在主程序中,我们在主机接收10次数据后,将数据保存到file.csv中:

if __name__ == "__main__": # 创建数据列表 datalist = [] m = MasterClass(state = MasterClass.IDLE_STATE, port = "COM17", wintitle = "Basic plotting examples", plottitle = "Updating plot", width = 1000, height = 600) m.StartMaster() m.SendSensorCMD(MasterClass.SENDID_CMD) m.RecvSensorID()  # 循环10次接收数据 for i in range(10): m.SendSensorCMD(MasterClass.SENDVALUE_CMD) value = m.RecvSensorValue() datalist.append(value) indexlist = [count for count in range(len(datalist))]  # 写入数据 m.fileio.WriteFile(indexlist,datalist) m.fileio.CloseFile()

如下为运行效果:

目前,整个文件的完整代码如下,可以看到单单是这么一个简单程序就有了三百多行,对于代码查找修改来讲,非常不便。同时我们注意到,几个不同类之间似乎功能并不相同,不应该放到一个文件中。下一节我们将会说如何利用Python中的模块和包来组织我们的代码。

完整代码如下:

# 串口相关库import serialimport serial.tools.list_ports# 队列相关import queueimport random# 日志输出相关库import logging# 曲线作图相关库import pyqtgraph as pgimport numpy as npfrom pyqtgraph.Qt import QtCore# 文件读写相关库import csv# 使用typing模块提供的复合注解功能from typing import Listimport time  # # 设置日志输出级别# logging.basicConfig(level=logging.DEBUG)# 在配置下日志输出目标文件和日志格式LOG_FORMAT="%(asctime)s-%(levelname)s-%(message)s"logging.basicConfig(filename='my.log',level=logging.DEBUG,format=LOG_FORMAT) class SerialClass: # 限定SerialClass对象只能绑定以下属性 __slots__ = ('dev','_SerialClass__devstate') # 初始化 # 使用默认参数 def __init__(self, devport:str     = "COM17", devbaudrate:int = 115200, devbytesize:int = serial.EIGHTBITS, devparity :str = serial.PARITY_NONE, devstopbits:int = serial.STOPBITS_ONE): # 直接传入serial.Serial()类 self.dev             = serial.Serial() self.dev.port        = devport self.dev.baudrate    = devbaudrate self.dev.bytesize    = devbytesize self.dev.parity      = devparity self.dev.stopbits    = devstopbits # 表示串口设备的状态-打开或者关闭 # 初始化时为关闭 self.__devstate      = False  print("SerialClass init") logging.info("SerialClass init")  # 取值方法 @property def devstate(self): return self.__devstate  # 打开串口 def OpenSerial(self): print("SerialClass-OpenSerial") logging.info("SerialClass-OpenSerial") self.dev.open() self.__devstate = True  # 关闭串口 def CloseSerial(self): print("SerialClass-CloseSerial") logging.info("SerialClass-CloseSerial") self.dev.close() self.__devstate = False  # 串口读取 def ReadSerial(self): print("SerialClass-ReadSerial") logging.info("SerialClass-ReadSerial") if self.__devstate: # 阻塞方式读取 # 按行读取 data = self.dev.readline() # 收到为二进制数据 # 用utf-8编码将二进制数据解码为unicode字符串 # 字符串转为int类型 data = int(data.decode('utf-8', 'replace')) return data  # 串口写入 def WriteSerial(self,write_data): print("SerialClass-WriteSerial") logging.info("SerialClass-WriteSerial") if self.__devstate: # 非阻塞方式写入 self.dev.write(write_data.encode()) # 输出换行符 # write的输入参数必须是bytes 格式 # 字符串数据需要encode()函数将其编码为二进制数据,然后才可以顺利发送 # \r\n表示换行回车 self.dev.write('\r\n'.encode())  def RetSerialState(self): if self.dev.isOpen(): self.__devstate = True return True else: self.__devstate = False return False class PlotClass: # 绘图类初始化 def __init__(self,wintitle:str="Basic plotting examples",plottitle:str="Updating plot",width:int=1000,height:int=600): ''' 用于初始化Plot类 :param wintitle:  窗口标题 :param plottitle: 图层标题 :param width:     窗口宽度 :param height:    窗口高度 ''' # Qt应用实例对象 self.app        = None # 窗口对象 self.win        = None # 设置窗口标题 self.title      = wintitle # 设置窗口尺寸 self.width      = width self.height     = height # 传感器数据 self.value      = 0 # 计数变量 self.__count    = 0 # 传感器数据缓存列表 self.valuelist  = [] # 绘图曲线 self.curve      = None # 图层对象 self.plotob     = None # 图层标题 self.plottitle  = plottitle # 定时器对象 self.timer = QtCore.QTimer() # 定时时间 self.time  = 0 # Qt应用和窗口初始化 self.appinit()  print("PLOT INIT SUCCESS") logging.info("PLOT INIT SUCCESS")  # 应用程序初始化 def appinit(self): ''' 用于qt应用程序初始化,添加窗口、曲线和图层 :return: None ''' # 创建一个Qt应用,并返回该应用的实例对象 self.app = pg.mkQApp("Plotting Example") # 生成多面板图形 # show:(bool) 如果为 True,则在创建小部件后立即显示小部件。 # title:(str 或 None)如果指定,则为此小部件设置窗口标题。 self.win = pg.GraphicsLayoutWidget(show=True, title=self.title) # 设置窗口尺寸 self.win.resize(self.width, self.height) # 进行窗口全局设置,setConfigOptions一次性配置多项参数 # antialias启用抗锯齿,useNumba对图像进行加速 pg.setConfigOptions(antialias=True, useNumba=True)  # 添加图层 self.plotob = self.win.addPlot(title=self.plottitle) # 添加曲线 self.curve = self.plotob.plot(pen='y')  # 接收数据 def GetValue(self,value): ''' 用于接收传感器数据,加入缓存列表 :param value: 传感器数据 :return: None ''' self.value = value # 加入数据缓存列表 self.valuelist.append(value) print("PLOT RECV DATA : "+str(self.value)) logging.info("PLOT RECV DATA : "+str(self.value))  # 更新曲线数据 def DataUpdate(self): ''' 用于定时进行曲线更新,这里模拟绘制正弦曲线 :return: None ''' # 模拟绘制正弦曲线 # 计数变量更新 self.__count = self.__count + 0.1 self.value = np.sin(self.__count) self.GetValue(self.value) # 将数据转化为图形 self.curve.setData(self.valuelist)  # 设置定时更新 def SetUpdate(self,time:int = 100): ''' 设置定时更新任务 :param time: 定时的时间 :return: None ''' # 定时器结束,触发DataUpdate方法 self.timer.timeout.connect(self.DataUpdate) # 启动定时器 self.timer.start(time) # 定时时间 self.time = time print("PLOT SET UPDATA") logging.info("PLOT SET UPDATA") # 进入主事件循环并等待 pg.exec() class FileIOClass: def __init__(self,path:str="G:\\Python面向对象编程\\Demo\\file.csv"): ''' 初始化csv文件和列标题 :param path: 文件路径和文件名 ''' self.path   = path # path为输出路径和文件名,newline=''是为了不出现空行 self.csvFile = open(path, "w+", newline='') # rowname为列名,index-索引,data-数据 self.rowname = ['index', 'data'] # 返回一个writer对象,将用户的数据在给定的文件型对象上转换为带分隔符的字符串 self.writer = csv.writer(self.csvFile) # 写入csv文件的列标题 self.writer.writerow(self.rowname)  def WriteFile(self,index:List[int],data:List[int])->None: ''' :param index: 传感器索引列表 :param data:  传感器数据列表 :return: ''' writedatalist = [] for i in range(len(data)): writedatalist.append([index[i],data[i]]) # 将列表中的每个元素将被写入CSV文件的一列中 self.writer.writerow(writedatalist[i])  def CloseFile(self)->None: ''' 关闭文件 :return: None ''' self.csvFile.close() class SensorClass(SerialClass): # 类变量: #   RESPOND_MODE -响应模式-0 #   LOOP_MODE    -循环模式-1 RESPOND_MODE,LOOP_MODE = (0,1) # 类变量: #   START_CMD       - 开启命令      -0 #   STOP_CMD        - 关闭命令      -1 #   SENDID_CMD      - 发送ID命令    -2 #   SENDVALUE_CMD   - 发送数据命令   -3 START_CMD,STOP_CMD,SENDID_CMD,SENDVALUE_CMD = (0,1,2,3)  # 类的初始化 def __init__(self,port:str = "COM11",id:int = 0,state:int = RESPOND_MODE): # 调用父类的初始化方法,super() 函数将父类和子类连接 super().__init__(port) self.sensorvalue = 0 self.sensorid    = id self.sensorstate = state print("Sensor Init") logging.info("Sensor Init")  @staticmethod # 判断传感器ID号是否正确:这里判断ID号是否在0到99之间 def IsTrueID(id:int = 0): if id >= 0 and id <= 99: print("Sensor ID True") return True else: print("Sensor ID False") return False  # 传感器上电初始化 def InitSensor(self): # 传感器上电初始化工作 # 同时输出ID号以及状态 print("Sensor %d Init complete : %d"%(self.sensorid,self.sensorstate)) logging.info("Sensor %d Init complete : %d"%(self.sensorid,self.sensorstate))  # 开启传感器 def StartSensor(self): super().OpenSerial() print("Sensor %d start serial %s "%(self.sensorid,self.dev.port)) logging.info("Sensor %d start serial %s "%(self.sensorid,self.dev.port))  # 停止传感器 def StopSensor(self): super().CloseSerial() print("Sensor %d close serial %s " % (self.sensorid, self.dev.port)) logging.info("Sensor %d close serial %s " % (self.sensorid, self.dev.port))  # 发送传感器ID号 def SendSensorID(self): super().WriteSerial(str(self.sensorid)) print("Sensor %d send id "%self.sensorid) logging.info("Sensor %d send id "%self.sensorid)  # 发送传感器数据 def SendSensorValue(self): # 生成[1, 10]内的随机整数 data = random.randint(1, 10) super().WriteSerial(str(data)) print("Sensor %d send data  %d" % (self.sensorid,data)) logging.info("Sensor %d send data  %d" % (self.sensorid,data))  # 接收主机指令 def RecvMasterCMD(self): cmd = super().ReadSerial() print("Sensor %d recv cmd %d " % (self.sensorid,cmd)) logging.info("Sensor %d recv cmd %d " % (self.sensorid,cmd)) return cmd class MasterClass(SerialClass,PlotClass): # 类变量: #   BUSY_STATE  -忙碌状态-0 #   IDLE_STATE  -空闲状态-1 BUSY_STATE, IDLE_STATE = (0, 1) # 类变量: #   START_CMD       - 开启命令      -0 #   STOP_CMD        - 关闭命令      -1 #   SENDID_CMD      - 发送ID命令    -2 #   SENDVALUE_CMD   - 发送数据命令   -3 START_CMD, STOP_CMD, SENDID_CMD, SENDVALUE_CMD = (0, 1, 2, 3)  # 类的初始化 def __init__(self,state:int = IDLE_STATE,port:str = "COM17",wintitle:str="Basic plotting examples",plottitle:str="Updating plot",width:int=1000,height:int=600): # 分别调用不同父类的__init__方法 SerialClass.__init__(self,port) PlotClass.__init__(self,wintitle,plottitle,width,height) self.valuequeue   = queue.Queue(10) self.__masterstatue = state # 初始化完成的标志量 self.INIT_FLAG = False # 文件保存路径 self.savepath = "G:\\Python面向对象编程\\Demo\\file.csv" # 创建FileIOClass类的实例化对象 self.fileio = FileIOClass(self.savepath) print("MASTER INIT SUCCESSS") logging.info("MASTER INIT SUCCESSS")  @classmethod def MasterInfo(cls): print("Info : "+str(cls))  # 开启主机 def StartMaster(self): super().OpenSerial() print("START MASTER :"+self.dev.port) logging.info("START MASTER :"+self.dev.port)  # 停止主机 def StopMaster(self): super().CloseSerial() print("CLOSE MASTER :" + self.dev.port) logging.info("CLOSE MASTER :" + self.dev.port)  # 接收传感器ID号 def RecvSensorID(self): sensorid = super().ReadSerial() print("MASTER RECIEVE ID : " + str(sensorid)) logging.info("MASTER RECIEVE ID : " + str(sensorid)) return sensorid  # 接收传感器数据 def RecvSensorValue(self): data = super().ReadSerial() print("MASTER RECIEVE DATA : " + str(data)) logging.info("MASTER RECIEVE DATA : " + str(data)) self.valuequeue.put(data) return data  # 主机发送命令 def SendSensorCMD(self,cmd): super().WriteSerial(str(cmd)) print("MASTER SEND CMD : " + str(cmd)) logging.info("MASTER SEND CMD : " + str(cmd))  # 主机返回工作状态- def RetMasterStatue(self): return self.__masterstatue  # 重写父类的DataUpdate方法 def DataUpdate(self): self.SendSensorCMD(self.SENDVALUE_CMD) self.value = self.RecvSensorValue() self.WriteSerial("Recv:"+str(self.value)) self.GetValue(self.value) self.curve.setData(self.valuelist) print("PLOT UPDATA : " + str(self.value)) logging.info("PLOT UPDATA : " + str(self.value)) class DevClass(SerialClass): def __init__(self,port:str = "COM1"): super().__init__(port)  # 开启设备 def StartDev(self): super().OpenSerial() print("START Dev :" + self.dev.port)  def ReadSerial(self,byte_size): if super().RetSerialState(): data = self.dev.read(byte_size) data = int(data.decode('utf-8', 'replace')) return data # 判断串口类对象的串口是否开启def IsSerialConnected(serialclass): return serialclass.RetSerialState() if __name__ == "__main__": # 创建数据列表 datalist = [] m = MasterClass(state = MasterClass.IDLE_STATE, port = "COM17", wintitle = "Basic plotting examples", plottitle = "Updating plot", width = 1000, height = 600) m.StartMaster() m.SendSensorCMD(MasterClass.SENDID_CMD) m.RecvSensorID()  # 循环10次接收数据 for i in range(10): m.SendSensorCMD(MasterClass.SENDVALUE_CMD) value = m.RecvSensorValue() datalist.append(value) indexlist = [count for count in range(len(datalist))]  # 写入数据 m.fileio.WriteFile(indexlist,datalist) m.fileio.CloseFile()


本站声明: 本文章由作者或相关机构授权发布,目的在于传递更多信息,并不代表本站赞同其观点,本站亦不保证或承诺内容真实性等。需要转载请联系该专栏作者,如若文章内容侵犯您的权益,请及时联系本站删除( 邮箱:macysun@21ic.com )。
关闭