Skip to content
Snippets Groups Projects
Commit 100699bc authored by dawehr's avatar dawehr
Browse files

Created test for TCP.

parent 5f003e8e
No related branches found
No related tags found
No related merge requests found
import socket
import time
import uart_stress_tests
TCP_IP = "192.168.1.1"
TCP_PORT = 8080
send_delay = 0.001
TEST_ITERS = 100000
REPORT_STEP = int(TEST_ITERS / 10)
payload_size = 28
received = []
def read_n(n_to_read):
global received
while len(received) < n_to_read:
try:
just_received = sock.recv(1024)
if len(just_received) == 0:
print("Socket broken")
quit()
received.extend(just_received)
except:
pass
to_ret = bytes(received[0:n_to_read])
received = received[n_to_read:]
return to_ret
def read_packet(raw=False):
header = read_n(7)
length = int.from_bytes(header[5:7], byteorder='little')
data = read_n(length)
checksum = read_n(1)
if raw:
return header + data + checksum
else:
return data
def get_quad_status():
print("Getting quad status...")
query_msg = uart_stress_tests.create_msg(2, 0, b'')
sock.send(query_msg)
resp = read_packet()
return uart_stress_tests.parse_query_response(resp)
print("Got quad status")
if __name__ == '__main__':
# connect
print("Connecting...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
try:
sock.connect((TCP_IP, TCP_PORT))
sock.setblocking(True)
except:
print("Failed to connect")
quit()
print("connected")
initial_status = get_quad_status()
message = uart_stress_tests.create_test_packet(payload_size)
overall_start_time = time.perf_counter()
for i in range(TEST_ITERS):
if i % REPORT_STEP == 0:
print("Sent {} messages".format(i))
start_time = time.perf_counter()
sock.send(message)
# Busy waiting
# to_sleep_for = send_delay - (time.time() - start_time)
# if to_sleep_for > 1e-6:
# time.sleep(to_sleep_for)
while (time.perf_counter() - start_time <= send_delay):
pass
# Reporting
avg_time = (time.perf_counter() - overall_start_time) / TEST_ITERS
print("Average send time was {}".format(avg_time))
after_status = get_quad_status()
diff_messages = after_status[0] - initial_status[0]
diff_payload = after_status[1] - initial_status[1]
print("Sent {} messages, total payload of {}".format(TEST_ITERS, TEST_ITERS*payload_size))
print("Recv {} messages, total payload of {}".format(diff_messages, diff_payload))
print("Lost {} messages, {} bytes".format(TEST_ITERS - diff_messages, (TEST_ITERS*payload_size) - diff_payload))
......@@ -14,7 +14,7 @@ def create_msg(msg_type, msg_id, data):
msg += msg_id.to_bytes(2, 'little')
msg += len(data).to_bytes(2, 'little')
msg += data
checksum = 0
for b in msg:
checksum ^= b
......@@ -35,6 +35,13 @@ def read_packet(ser, raw=False):
else:
return data
def parse_query_response(data):
received_str = data[:-1].decode('ascii')
if len(received_str) == 0:
print("Timed out")
return (-1,-1)
return tuple(map(int, received_str.split(',')))
def query_received(ser):
# Send request
query_msg = create_msg(2, 0, b'')
......@@ -42,11 +49,7 @@ def query_received(ser):
ser.flush()
sleep(0.1)
resp = read_packet(ser)
received_str = resp[:-1].decode('ascii')
if len(received_str) == 0:
print("Timed out")
return (-1,-1)
return tuple(map(int, received_str.split(',')))
return parse_query_response(resp)
def check_test(ser, n_sent, size_sent, old_status):
new_n, new_size = query_received(ser)
......@@ -68,7 +71,7 @@ def test_checksum(ser):
computed_checksum = 0
for i in range(len(raw_data) - 1):
computed_checksum ^= raw_data[i]
valid = computed_checksum == given_checksum
ret_status = query_received(ser)
if not valid:
......@@ -124,27 +127,26 @@ def test_get_set(ser):
resp_cntl = resp[0]
resp_cnst = resp[1]
resp_val = struct.unpack('f', resp[2:6])[0]
if resp_cntl != cntl_id or resp_cnst != const_id or abs(resp_val - to_set) > 1e-5:
print("Failed get/set test. Expected controller " +
print("Failed get/set test. Expected controller " +
str(cntl_id) + ", constant " + str(const_id) + ", value " + str(to_set))
print(" Received " + str(resp_cntl) + ", constant " + str(resp_cnst) + ", value " + str(resp_val))
passed = False
ret_status = query_received(ser)
return passed, ret_status
if __name__ == '__main__':
with serial.Serial('/dev/ttyUSB0', 921600, timeout=5) as ser:
ser.reset_input_buffer()
while (ser.in_waiting != 0):
ser.read()
status = query_received(ser)
passed, status = test_partial_packet(ser, status)
passed, status = test_blast(ser, status)
passed, status = test_blast(ser, status, 150, 80)
passed, status = test_bad_checksum(ser, status)
passed, status = test_checksum(ser)
passed, status = test_get_set(ser)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment