Newer
Older
*
* BlueTooth socket program for passing vrpn data to quad.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <bluetooth/bluetooth.h>
#include <bluetooth/rfcomm.h>
#include <pthread.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <netinet/tcp.h>
#include "packet.h"
#include "param.h"
#include "source.h"
#include "output.h"
#include "bitwise.h"
#define QUAD_BT_ADDR "00:06:66:64:61:D6"
#define QUAD_BT_CHANNEL 0x01
/* Backend-internal command magics */
#define TD_MAGIC "TRACKERDATA"
// function prototypes
void readAndPrint(void);
void sendVrpnPacket(struct ucart_vrpn_TrackerData *);
void getVRPNPacket(struct ucart_vrpn_TrackerData *);
void printVrpnData(struct ucart_vrpn_TrackerData *);
int connectToZybo();
int safe_fd_set(int , fd_set* , int* );
int safe_fd_clr(int , fd_set* , int* );
static void safe_close_fd(int fd, pthread_mutex_t *mutexLock);
static void cb(struct ucart_vrpn_TrackerData *);
/* Return index of client, or -1 */
static ssize_t get_client_index(int fd);
/* Returns pointer to client buffer, or -1 */
static char * get_client_buffer(int fd);
/* Return pointer to client pending responses, or -1*/
static int * get_client_pend_responses(int fd);
/* Return positive integer if successful, -1 otherwise */
static int clientAddPendResponses(int fd, uint16_t packet_id);
/* Return positive integer if successful, -1 otherwise */
static int clientRemovePendResponses(int fd, uint16_t packet_id);
/* Receive data from client */
static void client_recv(int fd);
/* Checks to see if socket has disconnected. Returns 1 on disconnect, else returns 0 */
static int wasDisconnected(int fd);
/* handle controller responses from quad to frontend */
static void handleResponse(struct metadata *m, uint8_t * data);
/* Create new dynamic logfile name */
char * create_log_name(char * buffer, size_t max);
static ssize_t writeQuad(const uint8_t * buf, size_t count);
static ssize_t readQuad(char * buf, size_t count);
/* Functions for recording Latencies */
void findTimeDiff(int respID);
int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y);
//time stamp checking
static unsigned int currMessageID = 0;
// global variables
static volatile int keepRunning = 1;
const char *TRACKER_IP = "UAV@192.168.0.120:3883";
#define MAX_CLIENTS 32
#define CLIENT_BUFFER_SIZE 1024
#define CLIENT_MAX_PENDING_RESPONSES 15
static char client_buffers[MAX_CLIENTS][CLIENT_BUFFER_SIZE];
static int client_fds[MAX_CLIENTS];
static int client_pending_responses[MAX_CLIENTS][CLIENT_MAX_PENDING_RESPONSES];
static char user_specified_log_name[256] = "";
int newQuadResponse = 0, newCliInput = 0;
// Callback to be ran whenever the tracker receives data.
// Currently doing much more than it should. It will be slimmed down
// in the future.
static void cb(struct ucart_vrpn_TrackerData * td) {
static int count = 0;
count++;
int activity;
/*
* Create backend listening socket
*/
/* Determine socket path */
char * backend_socket_path = DEFAULT_SOCKET;
if (getenv(SOCKET_ENV)) {
backend_socket_path = getenv(SOCKET_ENV);
}
/* Unlink if it exists */
unlink(backend_socket_path);
/* Create socket */
backendSocket = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (backendSocket < 0) {
err(-1, "socket");
}
printf("backendSocket = %d\n", backendSocket);
/* Create sockaddr and bind */
struct sockaddr_un sa;
sa.sun_family = AF_UNIX;
strncpy(sa.sun_path, backend_socket_path, 107);
sa.sun_path[107] = '\0';
if (bind(backendSocket, (struct sockaddr *) &sa, sizeof(sa))) {
err(-1, "bind");
}
/* Listen */
if (listen(backendSocket, 16)) {
err(-1, "listen");
}
/* Add to socket set */
/* Initialize client buffers */
for (int i = 0; i < MAX_CLIENTS; i++) {
client_fds[i] = -1;
client_buffers[i][0] = '\n';
for(int j = 0; j < CLIENT_MAX_PENDING_RESPONSES; j++) {
client_pending_responses[i][j] = -1;
}
if (pthread_mutex_lock(&quadSocketMutex)) {
err(-2, "pthrtead_mutex_lock (%s:%d):", __FILE__, __LINE__);
}
if ((zyboSocket = connectToZybo()) < 0)
{
printf("zyboSocket = %d\n", zyboSocket);
if (pthread_mutex_unlock(&quadSocketMutex)) {
err(-2, "pthrtead_mutex_unlock (%s:%d):", __FILE__, __LINE__);
}
// watch for input from stdin (fd 0) to see when it has input
if (!getenv(NOQUAD_ENV)) {
// watch for input from the zybo socket
safe_fd_set(zyboSocket, &rfds_master, &max_fd);
strncat(user_specified_log_name, argv[1], strlen(argv[1]));
// create vrpnTracker instance
tracker = ucart_vrpn_tracker_createInstance(TRACKER_IP);
// this function will be called whenever tracker receives data
ucart_vrpn_tracker_addCallback(tracker, cb);
} else {
printf("Ignoring VRPN information...\n");
struct timeval timeout = {
.tv_sec = 1,
.tv_usec = 0
};
activity = select(max_fd+1, &rfds, NULL, NULL, NULL);
if(activity == -1) {
perror("select() ");
} else if (activity) {
if (FD_ISSET(fd, &rfds)) {
/**
* Ignore stdin from the backend
*/
} else if (fd == zyboSocket) {
} else if (fd == backendSocket) {
int new_fd = 0;
new_fd = accept(backendSocket, NULL, NULL);
if (new_fd < 0) {
warn("accept");
} else {
} else if (get_client_index(fd) > -1) {
/* It is a socket to a frontend */
}
}
}
ucart_vrpn_tracker_freeInstance(tracker);
return 0;
}
void sendVrpnPacket(struct ucart_vrpn_TrackerData *info) {
uint8_t packet[64];
struct metadata m;
uint8_t data[128];
struct position_update u;
u.id = currMessageID;
u.x = info->x;
u.y = info->y;
u.z = info->z;
u.pitch = info->pitch;
u.roll = info->roll;
u.yaw = info->yaw;
if (EncodeUpdate(&m, data, 128, &u) < 0) {
warnx("Big problems. sendVrpnPacket . EncodeUpdate");
warnx("Big problems. sendVrpnPacket. EncodePacket");
}
void getVRPNPacket(struct ucart_vrpn_TrackerData *td) {
int status;
if((status = ucart_vrpn_tracker_getData(tracker, td)) < 0)
{
perror("Error receiving VRPN data from tracker...");
keepRunning = 0;
}
}
void printVrpnData(struct ucart_vrpn_TrackerData * td) {
printf("FPS: %lf Pos (xyz): (%lf %lf %lf) Att (pry): (%lf %lf %lf)\n",
td->fps, td->x, td->y, td->z, td->pitch, td->roll, td->yaw);
}
int connectToZybo() {
int sock;
if (getenv(NOQUAD_ENV)) {
return 0;
}
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
/* Use wifi by default */
if (getenv(QUAD_COMM_ENV)) {
/* Determine if we are using bluetooth or local */
if(strcmp(getenv(QUAD_COMM_ENV), "local") == 0) {
printf("Using Local Socket Settings\n");
struct sockaddr_un remote;
char str[100];
if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
perror("socket");
exit(1);
}
remote.sun_family = AF_UNIX;
char * sock_env = getenv(QUAD_LOCAL_SOCKET);
strcpy(remote.sun_path, sock_env ? sock_env : QUAD_DEFAULT_LOCAL_SOCKET);
printf("Attempting to connect to local socket at '%s'. please be patiend.\n", remote.sun_path);
status = connect(sock, (struct sockaddr *)&remote, sizeof(remote));
} else if (strcmp(getenv(QUAD_COMM_ENV), "bluetooth") == 0) {
printf("Using BT Settings\n");
struct sockaddr_rc addr;
// allocate a socket
sock = socket(AF_BLUETOOTH, SOCK_STREAM, BTPROTO_RFCOMM);
//set the connection params ie. who to connect to
addr.rc_family = AF_BLUETOOTH;
addr.rc_channel = (uint8_t) QUAD_BT_CHANNEL;
str2ba( QUAD_BT_ADDR, &addr.rc_bdaddr );
printf("Attempting to connect to zybo. Please be patient...\n");
// blocking call to connect to socket sock ie. zybo board
status = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
}
struct sockaddr_in addr;
addr.sin_family = AF_INET;
/* Quick and Dirty */
if (getenv(QUAD_IP_ENV)) {
if (!inet_aton(getenv(QUAD_IP_ENV), &addr.sin_addr)) {
QUAD_IP_ENV, getenv(QUAD_IP_ENV));
return -1;
}
} else {
if (!inet_aton(QUAD_IP_DEFAULT, &addr.sin_addr)) {
if (getenv(QUAD_PORT_ENV)) {
/* Quick 'n dirty, oh yeah! */
addr.sin_port = htons(atoi(getenv(QUAD_PORT_ENV)));
addr.sin_port = htons(QUAD_PORT_DEFAULT);
sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock < 0) {
perror("socket");
return -1;
}
printf("Connecting to Quad @ %s:%u\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
status = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
int flag = 1;
int result = setsockopt(sock, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */
TCP_NODELAY, /* name of option */
(char *) &flag, /* the cast is historical cruft */
sizeof(int)); /* length of option value */
// connection failed
if(status < 0)
{
close(sock);
else
{
printf("connection successful!...\n");
return sock;
}
}
/* add a fd to fd_set, and update max_fd */
int safe_fd_set(int fd, fd_set* fds, int* max_fd) {
assert(max_fd != NULL);
FD_SET(fd, fds);
if (fd > *max_fd) {
*max_fd = fd;
}
return 0;
}
/* clear fd from fds, update max fd if needed */
int safe_fd_clr(int fd, fd_set* fds, int* max_fd) {
assert(max_fd != NULL);
FD_CLR(fd, fds);
if (fd == *max_fd) {
(*max_fd)--;
}
return 0;
static ssize_t writeQuad(const uint8_t * buf, size_t count) {
if (pthread_mutex_lock(&quadSocketMutex)) {
err(-2, "pthrtead_mutex_lock (%s:%d):", __FILE__, __LINE__);
}
if (pthread_mutex_unlock(&quadSocketMutex)) {
err(-2, "pthrtead_mutex_unlock (%s:%d):", __FILE__, __LINE__);
}
return retval;
}
static ssize_t readQuad(char * buf, size_t count) {
ssize_t retval;
if (getenv(NOQUAD_ENV)) {
return count;
}
if (pthread_mutex_lock(&quadSocketMutex)) {
err(-2, "pthrtead_mutex_lock (%s:%d):", __FILE__, __LINE__);
}
retval = read(zyboSocket, buf, count);
if (pthread_mutex_unlock(&quadSocketMutex)) {
err(-2, "pthrtead_mutex_unlock (%s:%d):", __FILE__, __LINE__);
}
return retval;
}
static int new_client(int fd) {
ssize_t new_slot = -1;
for (ssize_t i = 0; i < MAX_CLIENTS; i++) {
if (client_fds[i] < 0) {
new_slot = i;
break;
}
}
if (new_slot == -1) {
warnx("Ran out of room! Consider increasing MAX_CLIENTS!");
client_fds[new_slot] = fd;
client_buffers[new_slot][0] = '\0';
static ssize_t get_client_index(int fd) {
for (ssize_t i = 0; i < MAX_CLIENTS; i++) {
if (client_fds[i] == fd) {
return i;
}
}
return -1;
}
static char * get_client_buffer(int fd) {
ssize_t slot = get_client_index(fd);
if (slot == -1) {
return NULL;
} else {
return client_buffers[slot];
}
}
static int * get_client_pend_responses(int fd) {
ssize_t slot = get_client_index(fd);
if (slot == -1) {
return NULL;
} else {
return client_pending_responses[slot];
}
}
static int clientAddPendResponses(int fd, uint16_t packet_id) {
int *pendingResponses = get_client_pend_responses(fd);
for(int i = 0; i < CLIENT_MAX_PENDING_RESPONSES; i++) {
if(pendingResponses[i] == -1) {
pendingResponses[i] = packet_id;
return i;
}
}
return -1;
}
static int clientRemovePendResponses(int fd, uint16_t packet_id) {
int *pendingResponses = get_client_pend_responses(fd);
for(int i = 0; i < CLIENT_MAX_PENDING_RESPONSES; i++) {
if(pendingResponses[i] == packet_id) {
pendingResponses[i] = -1;
return i;
}
}
return -1;
}
ssize_t slot = get_client_index(fd);
if(slot == -1)
char *clientBuffer = get_client_buffer(fd);
if(clientBuffer == NULL)
return -1;
int *pendingResponses = get_client_pend_responses(fd);
if(pendingResponses == NULL)
return -1;
for(int i = 0; i < CLIENT_MAX_PENDING_RESPONSES; i++) {
pendingResponses[i] = -1;
}
}
static void safe_close_fd(int fd, pthread_mutex_t *mutexLock) {
if (pthread_mutex_lock(mutexLock)) {
err(-2, "pthrtead_mutex_lock (%s:%d):", __FILE__, __LINE__);
}
close(fd);
if (pthread_mutex_unlock(mutexLock)) {
err(-2, "pthrtead_mutex_unlock (%s:%d):", __FILE__, __LINE__);
}
ssize_t r;
int index = 0;
buffer = get_client_buffer(fd);
len_pre = strlen(buffer);
cursor = buffer + len_pre;
r = read(fd, cursor, CLIENT_BUFFER_SIZE - len_pre - 1);
if (r < 0) {
warn("read (fd: %d)", fd);
}
buffer[len_pre + r] = '\0';
/* Parse buffer and handle commands */
while (1) {
/* not using strtok because reasons */
size_t len = strlen(buffer);
ssize_t newline = -1;
for (size_t i = 0; i < len; i++) {
if (buffer[i] == '\n') {
newline = i;
break;
}
}
/* No newline found. End parsing */
if (newline == -1) {
break;
}
buffer[newline] = '\0';
printf("Client(%d) : '%s'\n",fd, buffer);
char * first_word;
char * tmp = strdup(buffer);
// printf("tmpbuff = '%s'\n", tmp);
first_word = strtok(tmp, " ");
// printf("first word = '%s'\n", first_word);
ssize_t msg_type, i;
for (i = 0; i < MAX_TYPE_ID; ++i) {
if ((msg_type = findCommand(first_word)) != -1)
break;
}
if (msg_type == -1) {
/* buffer was not a quad command, handling internally to
* backend instead of forwarding to quad
*/
if (strncmp(buffer, TD_MAGIC, strlen(TD_MAGIC)) == 0) {
/* Request for tracker data */
struct ucart_vrpn_TrackerData td;
if (tracker == NULL) {
char * dummy = TD_MAGIC " 1.0 2.0 3.0 4.0 5.0 6.0\n";
write(fd, dummy, strlen(dummy));
} else if (ucart_vrpn_tracker_getData(tracker, &td)) {
write(fd, TD_MAGIC " ERROR\n", strlen(TD_MAGIC " ERROR\n"));
} else {
/* more than sufficient buffer */
char buffer[2048];
/* Map VRPN XYZ to Height Lat Long (right now it's
* a guess). Format is Height Lat Long P R Y */
if (snprintf(buffer,
2048,
TD_MAGIC " %lf %lf %lf %lf %lf %lf\n",
td.z,
td.pitch,
td.roll,
td.yaw) >= 2048) {
/* Output longer than buffer */
warnx("Increase format buffer size, output was too long!");
write(fd, TD_MAGIC " ERROR\n", strlen(TD_MAGIC " ERROR\n"));
}
uint8_t *data = malloc(sizeof(*data) * 128);
ssize_t result;
ssize_t psize;
printf(" found a msg_type of %ld\n", msg_type);
switch (msg_type) {
case SETPARAM_ID:
result = EncodeSetParam(&m, data, 128, buffer);
break;
case GETPARAM_ID:
result = EncodeGetParam(&m, data, 128, buffer);
break;
case SETSOURCE_ID:
result = EncodeSetSource(&m, data, 128, buffer);
break;
case GETSOURCE_ID:
result = EncodeGetSource(&m, data, 128, buffer);
break;
case GETOUTPUT_ID:
result = EncodeGetOutput(&m, data, 128, buffer);
break;
case GETNODES_ID:
result = EncodeGetNodes(&m, data, 128, buffer);
break;
case ADDNODE_ID:
result = EncodeAddNode(&m, data, 128, buffer);
break;
default:
result = -1;
break;
}
if (result < 0) {
warnx("Big problems. client_recv. EncodeMetaData");
m.msg_id = currMessageID++;
if ((psize = EncodePacket(packet, 64, &m, data)) < 0) {
warnx("Big problems. client_recv. EncodePacket");
return;
}
/* Only add the client to the pending responses if it was a getparam command */
if (m.msg_type == GETPARAM_ID || m.msg_type == GETOUTPUT_ID ||
m.msg_type == GETSOURCE_ID || m.msg_type == GETNODES_ID || m.msg_type == ADDNODE_ID) {
if (clientAddPendResponses(fd, BytesTo16(packet[ID_L], packet[ID_H])) == -1) {
warnx("Ran out of room! Consider increasing CLIENT_MAX_PENDING_RESPONSES!\n");
}
// printf("packetToQuad = '");
// for(int i = 0; i < (int)psize; ++i) {
// printf(" %.2x ", packet[i]);
// }
// printf("'\n");
size_t restLen = (strlen(rest) == 0) ? 1 : strlen(rest);
/* Delete parsed data and move the rest to the left */
static unsigned char respBuf[CMD_MAX_LENGTH];
static int receiving_logs;
uint8_t data[CMD_MAX_LENGTH];
CMD_MAX_LENGTH - respBufLen);
// printf("packetFromQuad = '");
// for(int i = 0; i < (int)respBufLen; ++i) {
// printf(" %.2x ", respBuf[i]);
// }
// printf("'\n");
datalen = DecodePacket(&m, data, CMD_MAX_LENGTH, respBuf, respBufLen);
warnx("No start Byte");
for (size_t i = 0; i < respBufLen; ++i) {
if (respBuf[i] == BEGIN_CHAR) {
memmove(respBuf, respBuf + i, respBufLen - i);
respBufLen -=i;
return;
}
}
respBufLen = 0;
return;
}
if (datalen == -5) {
warnx("Chechsum mismatch");
for (size_t i = 0; i < respBufLen; ++i) {
if (respBuf[i] == BEGIN_CHAR) {
memmove(respBuf, respBuf + i, respBufLen - i);
respBufLen -=i;
return;
}
}
respBufLen = 0;
return;
}
if (datalen < 0){
/* Not enough data yet. We need to wait for more */
return;
}
packetlen = PacketSize(&m);
memmove(respBuf, respBuf + packetlen, respBufLen - packetlen);
respBufLen -= packetlen;
switch (m.msg_type) {
case DEBUG_ID:
/* in case of debug. Quad send null terminated string in data */
printf(" (Quad) : %s\n", data);
break;
if (!quadlog_file_open) {
char log_file[256];
create_log_name(log_file, 256);
printf("New log file created: '%s'\n", log_file);
quadlog_file = fopen(log_file, "a");
if (!receiving_logs) {
printf("(Quad) : Log found\n");
receiving_logs = 1;
} else {
printf(".");
fflush(0);
}
fwrite((char *) data, sizeof(char), m.data_len, quadlog_file);
break;
case RESPPARAM_ID:
case RESPSOURCE_ID:
case RESPOUTPUT_ID:
case RESPNODES_ID:
case RESPADDNODE_ID:
handleResponse(&m, data);
if (quadlog_file_open) {
fclose(quadlog_file);
quadlog_file_open = 0;
}
printf("\n(Quad) : Log End found\n");
receiving_logs = 0;
printf("(Backend): message type %d ignored from quad\n", m.msg_type);
static void handleResponse(struct metadata *m, uint8_t * data)
ssize_t result = 0;
char *buffer = malloc(sizeof(*buffer) * 128);
if (!buffer) {
warnx("failed immediatly");
return;
}
switch (m->msg_type) {
case RESPPARAM_ID:
result = DecodeResponseParam(buffer, 128, m, data);
break;
case RESPSOURCE_ID:
result = DecodeResponseSource(buffer, 128, m, data);
break;
case RESPOUTPUT_ID:
result = DecodeResponseOutput(buffer, 128, m, data);
break;
case RESPNODES_ID:
result = DecodeResponseGetNodes(&buffer, 128, m, data);
break;
case RESPADDNODE_ID:
result = DecodeResponseAddNode(buffer, 128, m, data);
break;
default:
}
if (result == -2) {
warnx("DecodeResponse error");
return;
} else if (result < 0) {
warnx("DecodeResponse error");
return;
}
// printf("msg to client = '%s'\n", buffer);
for(int fd = 0; fd <= max_fd; ++fd) {
if (get_client_index(fd) > -1) {
clientRemovePendResponses(fd, m->msg_id);
write(fd, buffer, result);
}
}
static int wasDisconnected(int fd) {
char buff;
if(recv(fd, &buff, 1, MSG_PEEK | MSG_DONTWAIT) == 0)
{
remove_client(fd);
safe_fd_clr(fd, &rfds_master, &max_fd);
printf("fd %d has disconnect and was removed\n", fd);
int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y) {
/* Perform the carry for the later subtraction by updating y. */
if (x->tv_usec < y->tv_usec) {
int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1;
y->tv_usec -= 1000000 * nsec;
y->tv_sec += nsec;
}
if (x->tv_usec - y->tv_usec > 1000000) {
int nsec = (x->tv_usec - y->tv_usec) / 1000000;
y->tv_usec += 1000000 * nsec;
y->tv_sec -= nsec;
}
/* Compute the time remaining to wait.
tv_usec is certainly positive. */
result->tv_sec = x->tv_sec - y->tv_sec;
result->tv_usec = x->tv_usec - y->tv_usec;
/* Return 1 if result is negative. */
return x->tv_sec < y->tv_sec;
}
void findTimeDiff(int respID) {
struct timeval result, tend;
gettimeofday(&tend, NULL);
timeval_subtract(&result, &tend, &timeArr[respID%MAX_HASH_SIZE]);
printf("(BackEnd): Elapsed time = %ld ms\n", result.tv_usec/1000);
char * create_log_name(char * buffer, size_t max) {
static const char * prefix = "logs";
static size_t num_logs = 0;
static const char * format_string = "%F_%-l:%M";
time_t curr_time;
char c_time_string[256];
struct tm * tmp;
curr_time = time(NULL);
tmp = localtime(&curr_time);
strftime(c_time_string, 256, format_string, tmp);
if(strcmp(user_specified_log_name, "") == 0) {
sprintf(buffer, "%s/%s_%lu.txt", prefix, c_time_string, num_logs++);
} else {
sprintf(buffer, "%s/%s_%lu.txt", prefix, user_specified_log_name, num_logs++);
}
return buffer;