Skip to content
Snippets Groups Projects
Commit 7cc8277f authored by dawehr's avatar dawehr
Browse files

Created test for TCP.

parent 2aedb74e
No related branches found
No related tags found
1 merge request!8Controller network
import socket
import time
import uart_stress_tests
TCP_IP = "192.168.1.1"
TCP_PORT = 8080
send_delay = 0.001
TEST_ITERS = 100000
REPORT_STEP = int(TEST_ITERS / 10)
payload_size = 28
received = []
def read_n(n_to_read):
global received
while len(received) < n_to_read:
try:
just_received = sock.recv(1024)
if len(just_received) == 0:
print("Socket broken")
quit()
received.extend(just_received)
except:
pass
to_ret = bytes(received[0:n_to_read])
received = received[n_to_read:]
return to_ret
def read_packet(raw=False):
header = read_n(7)
length = int.from_bytes(header[5:7], byteorder='little')
data = read_n(length)
checksum = read_n(1)
if raw:
return header + data + checksum
else:
return data
def get_quad_status():
print("Getting quad status...")
query_msg = uart_stress_tests.create_msg(2, 0, b'')
sock.send(query_msg)
resp = read_packet()
return uart_stress_tests.parse_query_response(resp)
print("Got quad status")
if __name__ == '__main__':
# connect
print("Connecting...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
try:
sock.connect((TCP_IP, TCP_PORT))
sock.setblocking(True)
except:
print("Failed to connect")
quit()
print("connected")
initial_status = get_quad_status()
message = uart_stress_tests.create_test_packet(payload_size)
overall_start_time = time.perf_counter()
for i in range(TEST_ITERS):
if i % REPORT_STEP == 0:
print("Sent {} messages".format(i))
start_time = time.perf_counter()
sock.send(message)
# Busy waiting
# to_sleep_for = send_delay - (time.time() - start_time)
# if to_sleep_for > 1e-6:
# time.sleep(to_sleep_for)
while (time.perf_counter() - start_time <= send_delay):
pass
# Reporting
avg_time = (time.perf_counter() - overall_start_time) / TEST_ITERS
print("Average send time was {}".format(avg_time))
after_status = get_quad_status()
diff_messages = after_status[0] - initial_status[0]
diff_payload = after_status[1] - initial_status[1]
print("Sent {} messages, total payload of {}".format(TEST_ITERS, TEST_ITERS*payload_size))
print("Recv {} messages, total payload of {}".format(diff_messages, diff_payload))
print("Lost {} messages, {} bytes".format(TEST_ITERS - diff_messages, (TEST_ITERS*payload_size) - diff_payload))
...@@ -14,7 +14,7 @@ def create_msg(msg_type, msg_id, data): ...@@ -14,7 +14,7 @@ def create_msg(msg_type, msg_id, data):
msg += msg_id.to_bytes(2, 'little') msg += msg_id.to_bytes(2, 'little')
msg += len(data).to_bytes(2, 'little') msg += len(data).to_bytes(2, 'little')
msg += data msg += data
checksum = 0 checksum = 0
for b in msg: for b in msg:
checksum ^= b checksum ^= b
...@@ -35,6 +35,13 @@ def read_packet(ser, raw=False): ...@@ -35,6 +35,13 @@ def read_packet(ser, raw=False):
else: else:
return data return data
def parse_query_response(data):
received_str = data[:-1].decode('ascii')
if len(received_str) == 0:
print("Timed out")
return (-1,-1)
return tuple(map(int, received_str.split(',')))
def query_received(ser): def query_received(ser):
# Send request # Send request
query_msg = create_msg(2, 0, b'') query_msg = create_msg(2, 0, b'')
...@@ -42,11 +49,7 @@ def query_received(ser): ...@@ -42,11 +49,7 @@ def query_received(ser):
ser.flush() ser.flush()
sleep(0.1) sleep(0.1)
resp = read_packet(ser) resp = read_packet(ser)
received_str = resp[:-1].decode('ascii') return parse_query_response(resp)
if len(received_str) == 0:
print("Timed out")
return (-1,-1)
return tuple(map(int, received_str.split(',')))
def check_test(ser, n_sent, size_sent, old_status): def check_test(ser, n_sent, size_sent, old_status):
new_n, new_size = query_received(ser) new_n, new_size = query_received(ser)
...@@ -68,7 +71,7 @@ def test_checksum(ser): ...@@ -68,7 +71,7 @@ def test_checksum(ser):
computed_checksum = 0 computed_checksum = 0
for i in range(len(raw_data) - 1): for i in range(len(raw_data) - 1):
computed_checksum ^= raw_data[i] computed_checksum ^= raw_data[i]
valid = computed_checksum == given_checksum valid = computed_checksum == given_checksum
ret_status = query_received(ser) ret_status = query_received(ser)
if not valid: if not valid:
...@@ -124,27 +127,26 @@ def test_get_set(ser): ...@@ -124,27 +127,26 @@ def test_get_set(ser):
resp_cntl = resp[0] resp_cntl = resp[0]
resp_cnst = resp[1] resp_cnst = resp[1]
resp_val = struct.unpack('f', resp[2:6])[0] resp_val = struct.unpack('f', resp[2:6])[0]
if resp_cntl != cntl_id or resp_cnst != const_id or abs(resp_val - to_set) > 1e-5: if resp_cntl != cntl_id or resp_cnst != const_id or abs(resp_val - to_set) > 1e-5:
print("Failed get/set test. Expected controller " + print("Failed get/set test. Expected controller " +
str(cntl_id) + ", constant " + str(const_id) + ", value " + str(to_set)) str(cntl_id) + ", constant " + str(const_id) + ", value " + str(to_set))
print(" Received " + str(resp_cntl) + ", constant " + str(resp_cnst) + ", value " + str(resp_val)) print(" Received " + str(resp_cntl) + ", constant " + str(resp_cnst) + ", value " + str(resp_val))
passed = False passed = False
ret_status = query_received(ser) ret_status = query_received(ser)
return passed, ret_status return passed, ret_status
if __name__ == '__main__': if __name__ == '__main__':
with serial.Serial('/dev/ttyUSB0', 921600, timeout=5) as ser: with serial.Serial('/dev/ttyUSB0', 921600, timeout=5) as ser:
ser.reset_input_buffer() ser.reset_input_buffer()
while (ser.in_waiting != 0): while (ser.in_waiting != 0):
ser.read() ser.read()
status = query_received(ser) status = query_received(ser)
passed, status = test_partial_packet(ser, status) passed, status = test_partial_packet(ser, status)
passed, status = test_blast(ser, status) passed, status = test_blast(ser, status)
passed, status = test_blast(ser, status, 150, 80) passed, status = test_blast(ser, status, 150, 80)
passed, status = test_bad_checksum(ser, status) passed, status = test_bad_checksum(ser, status)
passed, status = test_checksum(ser) passed, status = test_checksum(ser)
passed, status = test_get_set(ser) passed, status = test_get_set(ser)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment