#!/usr/local/bin/python3.6 import sys import time import serial def create_msg(main_type, subtype, msg_id, data): msg = bytes() msg += b'\xBE' msg += main_type.to_bytes(1, 'little') msg += subtype.to_bytes(1, 'little') msg += msg_id.to_bytes(2, 'little') msg += len(data).to_bytes(2, 'little') msg += data checksum = 0 for b in msg: checksum ^= b msg += checksum.to_bytes(1, 'little') return msg def create_test_packet(size=8): data = bytes((i % 256 for i in range(size))) return create_msg(0, 2, 0, data) def read_packet(ser): header = ser.read(7) length = int.from_bytes(header[5:7], byteorder='little') data = ser.read(length) checksum = ser.read() return data def query_received(ser): # Send request query_msg = create_msg(0, 3, 0, b'') ser.write(query_msg) ser.flush() resp = read_packet(ser) received_str = resp[:-1].decode() return tuple(map(int, received_str.split(','))) if __name__ == '__main__': with serial.Serial('/dev/ttyUSB0', 921600, timeout=5) as ser: ser.reset_input_buffer() ser.write(create_test_packet(8)) ser.flush() print(query_received(ser))