python读取串口数据并保存(pyserial读取串口数据)
python3.5 读取串口中的数据怎么解码
1、安装串口库
2、采用默认通信参数
import serial
t = serial.Serial('com1',9600)
n = t.write('you are my world')print t.portstrprint n
str = t.read(n)print str
3、可以自己设置参数
import timeimport serialser = serial.Serial( #下面这些参数根据情况修改 port='COM1', baudrate=9600, parity=serial.PARITY_ODD, stopbits=serial.STOPBITS_TWO, bytesize=serial.SEVENBITS)data = ''while ser.inWaiting() 0: data += ser.read(1)if data != '': print data
注意:Python与多个串口通信的时候,要确定端口号。如果有时间的话,可以自己写一个查询所有端口的信息,不想这么麻烦的话,下载个串口助手,自己看端口信息,在py文件中修改serial.Serial()里面的端口号。
传感器数据进入python后后怎么直接计算机
传感器数据进入Python后,可以使用Python的数学计算库,如NumPy、SciPy等,对数据进行处理和计算。以下是一些基本的数据处理和计算步骤:
1. 读取数据。首先,需要从传感器采集的数据文件或数据库中读取数据,并将其存储为Python中的数组或矩阵类型。
2. 数据清洗和预处理。在进行计算之前,需要对数据进行清洗和预处理,例如去除异常值、缺失值填充等。
3. 计算数据统计量。可以使用NumPy库中的函数计算数据的平均值、中位数、方差、标准差等统计量。
4. 数据可视化。可以使用Python的可视化库,如Matplotlib、Seaborn等,将数据可视化,以更好地理解和分析数据。
5. 应用机器学习算法。如果需要进行预测或分类等任务,可以使用Python中的机器学习库,如Scikit-Learn、TensorFlow等,对数据进行处理和建模。
总的来说,Python是一种强大而灵活的工具,可以用于处理各种类型的数据,并进行复杂的计算和分析。需要根据具体的数据类型和分析任务,选择合适的Python库和技术,以实现对传感器数据的有效处理和分析。
串口接收数据怎样存储在数组中?
Dim Buffer as Variant 表示声明了一个Buffer变量,但是这个Buffer变量的类型不确定,可以是Long、Integer、Double、Object、String以及数组等等,Buffer变量的类型由编译器自行判断与转换。
Dim Buffer(100) as Variant 表示声明了一个数组Buffer,该数组有100个成员,数组的每一个成员都没有指定具体的数据类型,由编译器自行判断与转换。
Dim receive(100) as Byte 表示声明了一个数组receive,该数组有100个成员,数组的每一个成员都是Byte型。
Dim Buffer() as Byte 表示声明了一个数组Buffer,该数组成员数目未知,数组的每一个成员都是Byte型。在确定数组成员数量之前是不能通过Buffer(xx)的这种方式访问Buffer里的成员的。
Dim Buffer() as Byte
.......
'接受
........
Buffer = Mscomm.Input
这种方法实际上是把Mscomm接收缓存里的所有数据读出来并保存到Buffer数组中,Buffer数组的大小由Mscomm接收缓存实际缓存的数据量决定,因为Mscomm的接收缓存里实际的数据量是不确定的,所以不能使用
Dim Receive(100) as Byte
.......
接受数据
Receive = Mscomm.Input
这样的方式。
Dim Receive(100) as Byte
.......
接受数据
Receive(i) = Mscomm.Input
这样是可以的,这样实际上是一次从Mscomm的接收缓存里读一个字节的数据,你这样做不行估计是因为数据传输速率和RThreshold设定的问题,因为赋值语句运行的时间要远比串口传输速率快很多倍,
所以你必须等Mscomm的接收缓存里有100个字节以上的数据时你才能通过循环用Mscomm.Input依次读出100个字节的数据,否则就肯定会出错,所以如果你把RThreshold设定为100,也就是每收到100个字节的数据触发一次OnComm事件就可以通过Receive(i) = Mscomm.Input把数据读出来。
python - serial communication(串口通信)
由于测试工作的需要,在C端产品上经常使用串口进行通信,而测试脚本大部分时候又采用python编写,于是就不得不了解并熟悉python下的串口通信实现方法了,整理如下以备随时使用:
一、说明
pyserial封装了python环境下对串口的访问,其兼容各种平台,并有统一的操作接口。通过python属性访问串口设置,并可对串口的各种配置参数(如串口名,波特率、停止校验位、流控、超时等等)做修改,再进行串口通信的类与接口封装后,非常方便地被调用和移植。
二、模块安装
pip insatll pyserial
三、初始化与参数说明
import serial
ser = serial.Serial('COM3', 115200, timeout=0.5, ....................)
下面看看 serial.Serial 原生类
四、不同平台下初始化
ser=serial.Serial("/dev/ttyUSB0",9600,timeout=0.5)#使用USB连接串行口ser=serial.Serial("/dev/ttyAMA0",9600,timeout=0.5)#使用树莓派的GPIO口连接串行口ser=serial.Serial(1,9600,timeout=0.5)#winsows系统使用COM1口连接串行口ser=serial.Serial("COM1",9600,timeout=0.5)#winsows系统使用COM1口连接串行口ser=serial.Serial("/dev/ttyS1",9600,timeout=0.5)#Linux系统使用COM1口连接串行口
五、串口属性
ser.name?#串口名称
ser.port?#端口号
ser.baudrate #波特率
ser.bytesize #字节大小
ser.parity #校验位N-无校验,E-偶校验,O-奇校验
ser.stopbits #停止位
ser.timeout #读超时设置
ser.writeTimeout #写超时
ser.xonxoff #软件流控
ser.rtscts #硬件流控
ser.dsrdtr #硬件流控
ser.interCharTimeout #字符间隔超时?
六、串口常用方法
isOpen():查看端口是否被打开。
open() :打开端口‘。
close():关闭端口。
read(size=1):从端口读字节数据。默认1个字节。
read_all():从端口接收全部数据。
write(data):向端口写数据。
readline():读一行数据。
readlines():读多行数据。
in_waiting():返回输入缓存中的字节数。
out_waiting():返回输出缓存中的字节数。
flush():等待所有数据写出。
flushInput():丢弃接收缓存中的所有数据。
flushOutput():终止当前写操作,并丢弃发送缓存中的数据。
sendBreadk(duration=0.25):发送BREAK条件,并于duration时间之后返回IDLE
setBreak(level=True):根据level设置break条件。
setRTS(level=True):设置请求发送(RTS)的控制信号
setDTR(level=True):设置数据终端准备就绪的控制信号
七、类与接口封装
import time
import serial
import serial.tools.list_ports
# 串口操作类
class serialCommunication(object):
????def __init__(self, port, bps, timeout):# 可配置更多参数
? ? ? ? port_list =self.show_usable_com()
????????if len(port_list) 0:
????????????if portnot in port_list:
????????????????self.port = port_list[0]
????????????else:
????????????????self.port = port
????????else:
????????????print("no usable serial, please plugin your serial board")
????????????return
? ? ? ? self.bps = bps
????????self.timeout = timeout
????????try:
????????????# 初始化串口,并得到串口对象,根据需要可拓展更多参数
? ? ? ? ? ? self.ser = serial.Serial(self.port, self.bps, 8, 'N', 1, timeout=self.timeout, write_timeout=self.timeout)
????????except Exception as e:# 抛出异常
? ? ? ? ? ? print("Exception={}".format(e))
????# 显示可用串口列表
? ? @staticmethod
? ? def show_usable_com():
????????serialport_list = []
????????portInfo_list =list(serial.tools.list_ports.comports())
????????if len(portInfo_list) =0:
????????????print("can not find any serial port!")
????????else:
????????????print(portInfo_list)
????????????for i in range(len(portInfo_list)):
????????????????plist =list(portInfo_list[i])
????????????????print(plist)
????????????????serialport_list.append(plist[0])
????????????print(serialport_list)
????????????return serialport_list
# 输出串口基本信息
? ? def serial_infor(self):
????????print(self.ser.name)# 设备名字
? ? ? ? print(self.ser.port)# 读或者写端口
? ? ? ? print(self.ser.baudrate)# 波特率
? ? ? ? print(self.ser.bytesize)# 字节大小
? ? ? ? print(self.ser.parity)# 校验位
? ? ? ? print(self.ser.stopbits)# 停止位
? ? ? ? print(self.ser.timeout)# 读超时设置
? ? ? ? print(self.ser.writeTimeout)# 写超时
? ? ? ? print(self.ser.xonxoff)# 软件流控
? ? ? ? print(self.ser.rtscts)# 软件流控
? ? ? ? print(self.ser.dsrdtr)# 硬件流控
? ? ? ? print(self.ser.interCharTimeout)# 字符间隔超时
? ? # 打开串口
????def serial_open(self):
????????try:
????????????if not self.ser.isOpen():
????????????????self.ser.open()
????????except Exception as e:# 抛出异常
? ? ? ? ????print("serial_open Exception={}".format(e))
????????????self.ser.close()
????# 读取指定大小的数据
? ? # 从串口读size个字节。如果指定超时,则可能在超时后返回较少的字节;如果没有指定超时,则会一直等到收完指定的字节数。
? ? def serial_read_with_size(self, size):
????????try:
????????????self.serial_open()
????????????return self.ser.read(size).decode("utf-8")
????????except Exception as e:
????????????print("serial_read_all Exception={}".format(e))
????????????self.ser.close()
????# 读取当前串口缓存中的所有数据
? ? def serial_read_data(self):
????????try:
????????????self.serial_open()
????????????datalen =self.ser.inWaiting()
? ? ? ? ? ? if datalen ==0:
????????????????return None
? ? ? ? ? ? return self.ser.read(datalen).decode("utf-8")
????????except Exception as e:
????????????print("serial_read_data Exception={}".format(e))
????????????self.ser.close()
????# 读串口全部数据,注意timeout的设置
? ? # 在设定的timeout时间范围内,如果读取的字节数据是有效的(就是非空)那就直接返回,
? ? # 否则一直会等到设定的timeout时间并返回这段时间所读的全部字节数据。
? ? def serial_read_all(self):
????????try:
????????????self.serial_open()
????????????return self.ser.read_all().decode("utf-8")
????????except Exception as e:
????????????print("serial_read_all Exception={}".format(e))
????????????self.ser.close()
????# 读一行数据
? ? # 使用readline()时应该注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。
? ? # 如果没有超时,readline会报异常。
? ? def serial_read_line(self):
????????try:
????????????self.serial_open()
????????????return self.ser.readline().decode("utf-8")
????????except Exception as e:
????????????print("serial_read_line Exception={}".format(e))
? ? ? ? ? ? self.ser.close()
????# 读多行数据,返回行列表
? ? def serial_read_lines(self):
????????try:
????????????self.serial_open()
????????????return self.ser.readlines().decode("utf-8")
????????except Exception as e:
????????????print("serial_read_lines Exception={}".format(e))
????????????self.ser.close()
????# 写数据
? ? def serial_write_data(self, data):
????????try:
????????????self.serial_open()
????????????self.ser.flushOutput()
????????????data_len =self.ser.write(data.encode('utf-8'))
????????????return data_len
????????except Exception as e:
????????????print("serial_write_data Exception={}".format(e))
????????????return 0
? ? # 写行数据,注意参数差异
? ? def serial_write_lines(self, lines):
????????self.ser.writelines(lines)
????# 清除串口缓存
????def serial_clean(self):
????????try:
????????????if self.ser.isOpen():
????????????????self.ser.flush()
????????except Exception as e:
????????????print("serial_clean Exception={}".format(e))
????# 关闭串口
? ? def serial_close(self):
????????try:
????????????if self.ser.isOpen():
????????????????self.ser.close()
????????except Exception as e:
????????????print("serial_clean Exception={}".format(e))
if __name__ =='__main__':
????testSerial = serialCommunication("COM10", 1500000, 0.5)
????testSerial.serial_open()
????testSerial.serial_infor()
????testSerial.serial_write_data("ifconfig eth0\n")
????time.sleep(0.1)
????data = testSerial.serial_read_all()
????print(data)
????testSerial.serial_close()
八、其他
1)ser.VERSION表示pyserial版本;?另外,ser.name表示设备名称
2)端口设置可以被读入字典,也可从字典加载设置:
? ? getSettingDict():返回当前串口设置的字典
? ? applySettingDict(d):应用字典到串口设置
3)?Readline()是读一行,以/n结束,要是没有/n就一直读,阻塞。注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。
4)serial.read_all 与?serial.read_all()区别
? ??serial.read_all:读取串口所有的参数信息
????serial.read_all():超时时间内从串口读取的所有数据
5) 异常信息
? ? exception serial.SerialException
? ? exception serial.SerialTimeoutException
从数据库里python获取数据存到本地数据库
python项目中从接口获取数据并存入本地数据库
首先用postman测试接口
根据请求方式将数据存入数据库中
首先用postman测试接口
通过url,选择相应的请求方式,头部,数据格式,点击send看能否获取数据
根据请求方式将数据存入数据库中
下面是post请求方式def get() URL = '' HEADERS = {'Content-Type': 'application/json'} JSON = {} response = request.post(URL,headers=HEADERS,json=JSON) #json.loads()用于将str类型的数据转成dict jsondata = json.load(response.txt) for i in jsondata: date1 = i[data] type1 = i[type] ... #拼接sql语句 sql="" conn=MySQLdb.connect(host="localhost",user="root",passwd="sa",db="mytable") cursor=conn.cursor() ursor.execute(sql)