/*
 * Bluetooth socket program for passing VRPN data to the quad.
 */
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <bluetooth/bluetooth.h>
#include <bluetooth/rfcomm.h>
#include <pthread.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <netinet/tcp.h>
#include "packet.h"
#include "param.h"
#include "source.h"
#include "output.h"
#include "bitwise.h"
#define QUAD_BT_ADDR "00:06:66:64:61:D6"
#define QUAD_BT_CHANNEL 0x01
/* Backend-internal command magics */
#define TD_MAGIC "TRACKERDATA"
// function prototypes
void readAndPrint(void);
void sendVrpnPacket(struct ucart_vrpn_TrackerData *);
void sendStartPacket(void);
void getVRPNPacket(struct ucart_vrpn_TrackerData *);
void printVrpnData(struct ucart_vrpn_TrackerData *);
int connectToZybo();
int safe_fd_set(int , fd_set* , int* );
int safe_fd_clr(int , fd_set* , int* );
static void safe_close_fd(int fd, pthread_mutex_t *mutexLock);
static void cb(struct ucart_vrpn_TrackerData *);
/* Return index of client, or -1 */
static ssize_t get_client_index(int fd);
/* Returns pointer to client buffer, or NULL if fd is not a known client */
static char * get_client_buffer(int fd);
/* Return pointer to client pending responses, or NULL if fd is not a known client */
static int * get_client_pend_responses(int fd);
/* Return the non-negative index of the slot that was filled, -1 otherwise */
static int clientAddPendResponses(int fd, uint16_t packet_id);
/* Return the non-negative index of the slot that was cleared, -1 otherwise */
static int clientRemovePendResponses(int fd, uint16_t packet_id);
/* Receive data from client */
static void client_recv(int fd);
/* Checks to see if socket has disconnected. Returns 1 on disconnect, else returns 0 */
static int wasDisconnected(int fd);
/* handle controller responses from quad to frontend */
static void handleResponse(struct metadata *m, uint8_t * data);
/* Create new dynamic logfile name */
char * create_log_name(char * buffer, size_t max);
static ssize_t writeQuad(const uint8_t * buf, size_t count);
static ssize_t readQuad(char * buf, size_t count);
/* Functions for recording Latencies */
void findTimeDiff(int respID);
int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y);
//time stamp checking
static unsigned int currMessageID = 0;
// global variables
static volatile int keepRunning = 1;
const char *TRACKER_IP = "UAV@192.168.0.120:3883";
#define MAX_CLIENTS 32
#define CLIENT_BUFFER_SIZE 1024
#define CLIENT_MAX_PENDING_RESPONSES 15
static char client_buffers[MAX_CLIENTS][CLIENT_BUFFER_SIZE];
static int client_fds[MAX_CLIENTS];
static int client_pending_responses[MAX_CLIENTS][CLIENT_MAX_PENDING_RESPONSES];
static char user_specified_log_name[256] = "";
int newQuadResponse = 0, newCliInput = 0;
// Callback to be run whenever the tracker receives data.
// It currently does much more than it should; it will be slimmed down
// in the future.
static void cb(struct ucart_vrpn_TrackerData * td) {
static int count = 0;
count++;
int activity;
/*
* Create backend listening socket
*/
/* Determine socket path */
char * backend_socket_path = DEFAULT_SOCKET;
if (getenv(SOCKET_ENV)) {
backend_socket_path = getenv(SOCKET_ENV);
}
/* Unlink if it exists */
unlink(backend_socket_path);
/* Create socket */
backendSocket = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (backendSocket < 0) {
err(-1, "socket");
}
/* Create sockaddr and bind */
struct sockaddr_un sa;
sa.sun_family = AF_UNIX;
strncpy(sa.sun_path, backend_socket_path, 107);
sa.sun_path[107] = '\0';
if (bind(backendSocket, (struct sockaddr *) &sa, sizeof(sa))) {
err(-1, "bind");
}
/* Listen */
if (listen(backendSocket, 16)) {
err(-1, "listen");
}
/* Add to socket set */
/* Initialize client buffers */
for (int i = 0; i < MAX_CLIENTS; i++) {
client_fds[i] = -1;
client_buffers[i][0] = '\n';
for(int j = 0; j < CLIENT_MAX_PENDING_RESPONSES; j++) {
client_pending_responses[i][j] = -1;
}
if (pthread_mutex_lock(&quadSocketMutex)) {
err(-2, "pthrtead_mutex_lock (%s:%d):", __FILE__, __LINE__);
}
if ((zyboSocket = connectToZybo()) < 0)
{
if (pthread_mutex_unlock(&quadSocketMutex)) {
err(-2, "pthrtead_mutex_unlock (%s:%d):", __FILE__, __LINE__);
}
// watch for input from stdin (fd 0) to see when it has input
if (!getenv(NOQUAD_ENV)) {
// watch for input from the zybo socket
safe_fd_set(zyboSocket, &rfds_master, &max_fd);
strncat(user_specified_log_name, argv[1], strlen(argv[1]));
char log_file[256];
create_log_name(log_file, 256);
printf("Creating log file '%s'...\n",log_file);
quadlog_file = fopen(log_file, "a");
if(!getenv(NOVRPN_ENV)){
// create vrpnTracker instance
tracker = ucart_vrpn_tracker_createInstance(TRACKER_IP);
// this function will be called whenever tracker receives data
ucart_vrpn_tracker_addCallback(tracker, cb);
}
struct timeval timeout = {
.tv_sec = 1,
.tv_usec = 0
};
activity = select(max_fd+1, &rfds, NULL, NULL, NULL);
if(activity == -1) {
perror("select() ");
} else if (activity) {
if (FD_ISSET(fd, &rfds)) {
/**
* Ignore stdin from the backend
*/
} else if (fd == zyboSocket) {
} else if (fd == backendSocket) {
int new_fd = 0;
new_fd = accept(backendSocket, NULL, NULL);
if (new_fd < 0) {
warn("accept");
} else {
} else if (get_client_index(fd) > -1) {
/* It is a socket to a frontend */
}
}
}
ucart_vrpn_tracker_freeInstance(tracker);
uint8_t packet[64];
struct metadata m;
m.msg_type = BEGINUPDATE_ID;
m.data_len = 0;
m.msg_id = currMessageID++;
if ((psize = EncodePacket(packet, 64, &m, NULL)) < 0) {
warnx("Big problems. sendStartPacket");
printf("Start Packet sent...\n");
}
void sendVrpnPacket(struct ucart_vrpn_TrackerData *info) {
uint8_t packet[64];
struct metadata m;
uint8_t data[128];
struct position_update u;
u.id = currMessageID;
u.x = info->x;
u.y = info->y;
u.z = info->z;
u.pitch = info->pitch;
u.roll = info->roll;
u.yaw = info->yaw;
if (EncodeUpdate(&m, data, 128, &u) < 0) {
warnx("Big problems. sendVrpnPacket . EncodeUpdate");
warnx("Big problems. sendVrpnPacket. EncodePacket");
}
/*
 * Fetch the latest tracker sample into *td.
 *
 * Calls ucart_vrpn_tracker_getData() on the global tracker. When that call
 * reports a negative status, an error is printed via perror() and the
 * global keepRunning flag is cleared.
 */
void getVRPNPacket(struct ucart_vrpn_TrackerData *td) {
    const int rc = ucart_vrpn_tracker_getData(tracker, td);

    if (rc >= 0) {
        return;
    }

    perror("Error receiving VRPN data from tracker...");
    keepRunning = 0;
}
/*
 * Print one tracker sample on stdout as a single line: the frame rate,
 * the x/y/z position, then pitch/roll/yaw.
 */
void printVrpnData(struct ucart_vrpn_TrackerData * td) {
    fprintf(stdout,
            "FPS: %lf Pos (xyz): (%lf %lf %lf) Att (pry): (%lf %lf %lf)\n",
            td->fps,
            td->x, td->y, td->z,
            td->pitch, td->roll, td->yaw);
}
int connectToZybo() {
int sock;
if (getenv(NOQUAD_ENV)) {
return 0;
}
if (!getenv(QUAD_WIFI_ENV)) {
printf("Using BT Settings\n");
struct sockaddr_rc addr;
// allocate a socket
sock = socket(AF_BLUETOOTH, SOCK_STREAM, BTPROTO_RFCOMM);
//set the connection params ie. who to connect to
addr.rc_family = AF_BLUETOOTH;
addr.rc_channel = (uint8_t) QUAD_BT_CHANNEL;
str2ba( QUAD_BT_ADDR, &addr.rc_bdaddr );
printf("Attempting to connect to zybo. Please be patient...\n");
// blocking call to connect to socket sock ie. zybo board
status = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
} else {
struct sockaddr_in addr;
addr.sin_family = AF_INET;
/* Quick and Dirty */
if (getenv(QUAD_IP_ENV)) {
if (!inet_aton(getenv(QUAD_IP_ENV), &addr.sin_addr)) {
QUAD_IP_ENV, getenv(QUAD_IP_ENV));
return -1;
}
} else {
if (!inet_aton(QUAD_IP_DEFAULT, &addr.sin_addr)) {
if (getenv(QUAD_PORT_ENV)) {
/* Quick 'n dirty, oh yeah! */
addr.sin_port = htons(atoi(getenv(QUAD_PORT_ENV)));
addr.sin_port = htons(QUAD_PORT_DEFAULT);
sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock < 0) {
perror("socket");
return -1;
}
printf("Connecting to Quad @ %s:%u\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
status = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
int flag = 1;
int result = setsockopt(sock, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */
TCP_NODELAY, /* name of option */
(char *) &flag, /* the cast is historical cruft */
sizeof(int)); /* length of option value */
// connection failed
if(status < 0)
{
close(sock);
else
{
printf("connection successful!...\n");
return sock;
}
}
/*
 * Add fd to *fds and raise *max_fd when fd is larger than the current value.
 *
 * fd      descriptor to add; must lie in [0, FD_SETSIZE)
 * fds     set to modify (must not be NULL)
 * max_fd  running maximum used as the select() nfds hint (must not be NULL)
 *
 * Returns 0 on success, or -1 without touching *fds or *max_fd when fd is
 * out of range. FD_SET() on such a descriptor is undefined behaviour, so it
 * is rejected here instead.
 */
int safe_fd_set(int fd, fd_set* fds, int* max_fd) {
    assert(max_fd != NULL);
    assert(fds != NULL);

    if (fd < 0 || fd >= FD_SETSIZE) {
        return -1;
    }

    FD_SET(fd, fds);
    if (fd > *max_fd) {
        *max_fd = fd;
    }
    return 0;
}
/*
 * Remove fd from *fds and lower *max_fd by one when fd was the maximum.
 *
 * fd      descriptor to remove; must lie in [0, FD_SETSIZE)
 * fds     set to modify (must not be NULL)
 * max_fd  running maximum used as the select() nfds hint (must not be NULL)
 *
 * Returns 0 on success, or -1 without touching *fds or *max_fd when fd is
 * out of range (FD_CLR() on such a descriptor is undefined behaviour).
 *
 * After the decrement *max_fd is only an upper bound: the descriptor it
 * names is not necessarily a member of the set.
 */
int safe_fd_clr(int fd, fd_set* fds, int* max_fd) {
    assert(max_fd != NULL);
    assert(fds != NULL);

    if (fd < 0 || fd >= FD_SETSIZE) {
        return -1;
    }

    FD_CLR(fd, fds);
    if (fd == *max_fd) {
        (*max_fd)--;
    }
    return 0;
}
static ssize_t writeQuad(const uint8_t * buf, size_t count) {
if (pthread_mutex_lock(&quadSocketMutex)) {
err(-2, "pthrtead_mutex_lock (%s:%d):", __FILE__, __LINE__);
}
if (pthread_mutex_unlock(&quadSocketMutex)) {
err(-2, "pthrtead_mutex_unlock (%s:%d):", __FILE__, __LINE__);
}
return retval;
}
/*
 * Read up to count bytes from the quad socket into buf while holding
 * quadSocketMutex.
 *
 * When the NOQUAD_ENV environment variable is set, nothing is read and
 * count is returned; buf is left untouched in that case.
 *
 * Returns the result of read() on zyboSocket. The errno value left by
 * read() is preserved across the mutex unlock so callers can inspect it.
 * A failing mutex operation terminates the process through err().
 */
static ssize_t readQuad(char * buf, size_t count) {
    ssize_t retval;
    int rc;
    int read_errno;

    if (getenv(NOQUAD_ENV)) {
        return count;
    }

    /* pthread functions return the error number instead of setting errno;
     * copy it over so err() prints the real cause. */
    rc = pthread_mutex_lock(&quadSocketMutex);
    if (rc != 0) {
        errno = rc;
        err(-2, "pthrtead_mutex_lock (%s:%d):", __FILE__, __LINE__);
    }

    retval = read(zyboSocket, buf, count);
    read_errno = errno;

    rc = pthread_mutex_unlock(&quadSocketMutex);
    if (rc != 0) {
        errno = rc;
        err(-2, "pthrtead_mutex_unlock (%s:%d):", __FILE__, __LINE__);
    }

    errno = read_errno;
    return retval;
}
static int new_client(int fd) {
ssize_t new_slot = -1;
for (ssize_t i = 0; i < MAX_CLIENTS; i++) {
if (client_fds[i] < 0) {
new_slot = i;
break;
}
}
if (new_slot == -1) {
warnx("Ran out of room! Consider increasing MAX_CLIENTS!");
client_fds[new_slot] = fd;
client_buffers[new_slot][0] = '\0';
/*
 * Look up the slot in client_fds that holds fd.
 *
 * Returns the slot index, or -1 when fd is not a registered client.
 * Negative descriptors are rejected up front: a slot holding a negative
 * value is treated as free by new_client(), so without this guard a lookup
 * of -1 would report a free slot as if it belonged to a client.
 */
static ssize_t get_client_index(int fd) {
    if (fd < 0) {
        return -1;
    }
    for (ssize_t i = 0; i < MAX_CLIENTS; i++) {
        if (client_fds[i] == fd) {
            return i;
        }
    }
    return -1;
}
/*
 * Return the receive buffer that belongs to client fd, or NULL when
 * get_client_index() does not find fd.
 */
static char * get_client_buffer(int fd) {
    const ssize_t idx = get_client_index(fd);

    return (idx == -1) ? NULL : client_buffers[idx];
}
/*
 * Return the pending-response table that belongs to client fd, or NULL
 * when get_client_index() does not find fd.
 */
static int * get_client_pend_responses(int fd) {
    const ssize_t idx = get_client_index(fd);

    if (idx != -1) {
        return client_pending_responses[idx];
    }
    return NULL;
}
/*
 * Record packet_id as awaiting a response for client fd.
 *
 * The id is stored in the first entry of the client's pending table that
 * holds -1 (the "free" marker).
 *
 * Returns the index of the entry used (0 or greater), or -1 when fd is not
 * a registered client or its table has no free entry.
 */
static int clientAddPendResponses(int fd, uint16_t packet_id) {
    int *pendingResponses = get_client_pend_responses(fd);

    /* Unknown client: there is no table to write into. */
    if (pendingResponses == NULL) {
        return -1;
    }

    for (int i = 0; i < CLIENT_MAX_PENDING_RESPONSES; i++) {
        if (pendingResponses[i] == -1) {
            pendingResponses[i] = packet_id;
            return i;
        }
    }
    return -1;
}
/*
 * Clear the pending-response entry of client fd that matches packet_id.
 *
 * The first matching entry is reset to -1 (the "free" marker).
 *
 * Returns the index of the entry cleared (0 or greater), or -1 when fd is
 * not a registered client or no entry holds packet_id.
 */
static int clientRemovePendResponses(int fd, uint16_t packet_id) {
    int *pendingResponses = get_client_pend_responses(fd);

    /* Unknown client: there is no table to search. */
    if (pendingResponses == NULL) {
        return -1;
    }

    for (int i = 0; i < CLIENT_MAX_PENDING_RESPONSES; i++) {
        if (pendingResponses[i] == packet_id) {
            pendingResponses[i] = -1;
            return i;
        }
    }
    return -1;
}
ssize_t slot = get_client_index(fd);
if(slot == -1)
char *clientBuffer = get_client_buffer(fd);
if(clientBuffer == NULL)
return -1;
int *pendingResponses = get_client_pend_responses(fd);
if(pendingResponses == NULL)
return -1;
for(int i = 0; i < CLIENT_MAX_PENDING_RESPONSES; i++) {
pendingResponses[i] = -1;
}
}
/*
 * Close fd while holding *mutexLock.
 *
 * fd         descriptor to close
 * mutexLock  mutex taken before and released after the close() call
 *
 * A failing close() is reported with perror() and otherwise ignored.
 * A failing mutex operation terminates the process through err().
 */
static void safe_close_fd(int fd, pthread_mutex_t *mutexLock) {
    int rc;

    /* pthread functions return the error number instead of setting errno;
     * copy it over so err() prints the real cause. */
    rc = pthread_mutex_lock(mutexLock);
    if (rc != 0) {
        errno = rc;
        err(-2, "pthrtead_mutex_lock (%s:%d):", __FILE__, __LINE__);
    }

    if (close(fd) < 0) {
        perror("close");
    }

    rc = pthread_mutex_unlock(mutexLock);
    if (rc != 0) {
        errno = rc;
        err(-2, "pthrtead_mutex_unlock (%s:%d):", __FILE__, __LINE__);
    }
}
ssize_t r;
int index = 0;
buffer = get_client_buffer(fd);
len_pre = strlen(buffer);
cursor = buffer + len_pre;
r = read(fd, cursor, CLIENT_BUFFER_SIZE - len_pre - 1);
if (r < 0) {
warn("read (fd: %d)", fd);
}
buffer[len_pre + r] = '\0';
/* Parse buffer and handle commands */
while (1) {
/* not using strtok because reasons */
size_t len = strlen(buffer);
ssize_t newline = -1;
for (size_t i = 0; i < len; i++) {
if (buffer[i] == '\n') {
newline = i;
break;
}
}
/* No newline found. End parsing */
if (newline == -1) {
break;
}
buffer[newline] = '\0';
printf("Client(%d) : '%s'\n",fd, buffer);
char * first_word;
char * tmp = strdup(buffer);
// printf("tmpbuff = '%s'\n", tmp);
first_word = strtok(tmp, " ");
// printf("first word = '%s'\n", first_word);
ssize_t msg_type, i;
for (i = 0; i < MAX_TYPE_ID; ++i) {
if ((msg_type = findCommand(first_word)) != -1)
break;
}
if (msg_type == -1) {
/* buffer was not a quad command, handling internally to
* backend instead of forwarding to quad
*/
if (strncmp(buffer, TD_MAGIC, strlen(TD_MAGIC)) == 0) {
/* Request for tracker data */
struct ucart_vrpn_TrackerData td;
if (tracker == NULL) {
char * dummy = TD_MAGIC " 1.0 2.0 3.0 4.0 5.0 6.0\n";
write(fd, dummy, strlen(dummy));
} else if (ucart_vrpn_tracker_getData(tracker, &td)) {
write(fd, TD_MAGIC " ERROR\n", strlen(TD_MAGIC " ERROR\n"));
} else {
/* more than sufficient buffer */
char buffer[2048];
/* Map VRPN XYZ to Height Lat Long (right now it's
* a guess). Format is Height Lat Long P R Y */
if (snprintf(buffer,
2048,
TD_MAGIC " %lf %lf %lf %lf %lf %lf\n",
td.z,
td.pitch,
td.roll,
td.yaw) >= 2048) {
/* Output longer than buffer */
warnx("Increase format buffer size, output was too long!");
write(fd, TD_MAGIC " ERROR\n", strlen(TD_MAGIC " ERROR\n"));
}
uint8_t *data = malloc(sizeof(*data) * 128);
ssize_t result;
ssize_t psize;
printf(" found a msg_type of %ld\n", msg_type);
switch (msg_type) {
case SETPARAM_ID:
result = EncodeSetParam(&m, data, 128, buffer);
break;
case GETPARAM_ID:
result = EncodeGetParam(&m, data, 128, buffer);
break;
case SETSOURCE_ID:
result = EncodeSetSource(&m, data, 128, buffer);
break;
case GETSOURCE_ID:
result = EncodeGetSource(&m, data, 128, buffer);
break;
case GETOUTPUT_ID:
result = EncodeGetOutput(&m, data, 128, buffer);
break;
case GETNODES_ID:
result = EncodeGetNodes(&m, data, 128, buffer);
break;
case ADDNODE_ID:
result = EncodeAddNode(&m, data, 128, buffer);
break;
default:
result = -1;
break;
}
if (result < 0) {
warnx("Big problems. client_recv. EncodeMetaData");
m.msg_id = currMessageID++;
if ((psize = EncodePacket(packet, 64, &m, data)) < 0) {
warnx("Big problems. client_recv. EncodePacket");
return;
}
/* Only add the client to the pending responses if it was a getparam command */
if (m.msg_type == GETPARAM_ID || m.msg_type == GETOUTPUT_ID ||
m.msg_type == GETSOURCE_ID || m.msg_type == GETNODES_ID || m.msg_type == ADDNODE_ID) {
if (clientAddPendResponses(fd, BytesTo16(packet[ID_L], packet[ID_H])) == -1) {
warnx("Ran out of room! Consider increasing CLIENT_MAX_PENDING_RESPONSES!\n");
printf("packetToQuad = '");
for(int i = 0; i < (int)psize; ++i) {
printf(" %.2x ", packet[i]);
}
printf("'\n");
size_t restLen = (strlen(rest) == 0) ? 1 : strlen(rest);
/* Delete parsed data and move the rest to the left */
static unsigned char respBuf[CMD_MAX_LENGTH];
uint8_t data[CMD_MAX_LENGTH];
CMD_MAX_LENGTH - respBufLen);
printf("packetFromQuad = '");
for(int i = 0; i < (int)respBufLen; ++i) {
printf(" %.2x ", respBuf[i]);
}
printf("'\n");
datalen = DecodePacket(&m, data, CMD_MAX_LENGTH, respBuf, respBufLen);
warnx("No start Byte");
for (size_t i = 0; i < respBufLen; ++i) {
if (respBuf[i] == BEGIN_CHAR) {
memmove(respBuf, respBuf + i, respBufLen - i);
respBufLen -=i;
return;
}
}
respBufLen = 0;
return;
}
if (datalen == -5) {
warnx("Chechsum mismatch");
for (size_t i = 0; i < respBufLen; ++i) {
if (respBuf[i] == BEGIN_CHAR) {
memmove(respBuf, respBuf + i, respBufLen - i);
respBufLen -=i;
return;
}
}
respBufLen = 0;
return;
}
if (datalen < 0){
/* Not enough data yet. We need to wait for more */
return;
}
packetlen = PacketSize(&m);
memmove(respBuf, respBuf + packetlen, respBufLen - packetlen);
respBufLen -= packetlen;
switch (m.msg_type) {
case LOG_ID:
/* something like this */
printf("(Quad) : Log found\n");
fwrite((char *) data, sizeof(char), m.data_len, quadlog_file);
case RESPSOURCE_ID:
case RESPOUTPUT_ID:
case RESPNODES_ID:
case RESPADDNODE_ID:
handleResponse(&m, data);
case LOG_END_ID:
fclose(quadlog_file);
char log_file[256];
create_log_name(log_file, 256);
printf("New log file created: '%s'\n", log_file);
quadlog_file = fopen(log_file, "a");
break;
printf("(Backend): message type %d ignored from quad\n", m.msg_type);
static void handleResponse(struct metadata *m, uint8_t * data)
ssize_t result;
char *buffer = malloc(sizeof(*buffer) * 128);
switch (m->msg_type) {
case RESPPARAM_ID:
result = DecodeResponseParam(buffer, 128, m, data);
break;
case RESPSOURCE_ID:
result = DecodeResponseSource(buffer, 128, m, data);
break;
case RESPOUTPUT_ID:
result = DecodeResponseOutput(buffer, 128, m, data);
break;
case RESPNODES_ID:
result = DecodeResponseGetNodes(buffer, 128, m, data);
break;
case RESPADDNODE_ID:
result = DecodeResponseAddNode(buffer, 128, m, data);
break;
default:
result = -1;
}
if (result < 0) {
warnx("DecodeResponse error");
return;
}
if (get_client_index(fd) < 0) {
clientRemovePendResponses(fd, m->msg_id);
write(fd, buffer, result);
static int wasDisconnected(int fd) {
char buff;
if(recv(fd, &buff, 1, MSG_PEEK | MSG_DONTWAIT) == 0)
{
remove_client(fd);
safe_fd_clr(fd, &rfds_master, &max_fd);
printf("fd %d has disconnect and was removed\n", fd);
/*
 * Compute *result = *x - *y.
 *
 * result  receives the difference
 * x       minuend
 * y       subtrahend
 *
 * Returns 1 when the difference is negative (x's seconds are smaller than
 * y's after the carry adjustment), 0 otherwise.
 *
 * The carry adjustment is done on a private copy of *y, so neither input
 * is modified. All inputs are read before *result is written, so result
 * may alias x or y.
 */
int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y) {
    struct timeval sub = *y;
    int negative;

    /* Borrow whole seconds so that x->tv_usec - sub.tv_usec is not negative. */
    if (x->tv_usec < sub.tv_usec) {
        int nsec = (sub.tv_usec - x->tv_usec) / 1000000 + 1;
        sub.tv_usec -= 1000000 * nsec;
        sub.tv_sec += nsec;
    }
    /* Push surplus microseconds back into the seconds field. */
    if (x->tv_usec - sub.tv_usec > 1000000) {
        int nsec = (x->tv_usec - sub.tv_usec) / 1000000;
        sub.tv_usec += 1000000 * nsec;
        sub.tv_sec -= nsec;
    }

    negative = x->tv_sec < sub.tv_sec;

    {
        /* Read both fields of x before storing, in case result aliases x. */
        struct timeval diff;
        diff.tv_sec = x->tv_sec - sub.tv_sec;
        diff.tv_usec = x->tv_usec - sub.tv_usec;
        *result = diff;
    }

    return negative;
}
/*
 * Print the time elapsed since the timestamp stored for respID.
 *
 * The start time is read from timeArr[respID % MAX_HASH_SIZE] and compared
 * with the current time from gettimeofday(). The difference is printed in
 * milliseconds and includes the whole-second part, so latencies of one
 * second or more are reported correctly.
 *
 * NOTE(review): respID is used unchecked in the index expression; confirm
 * that callers never pass a negative id.
 */
void findTimeDiff(int respID) {
    struct timeval result, tend;
    long elapsed_ms;

    gettimeofday(&tend, NULL);
    timeval_subtract(&result, &tend, &timeArr[respID%MAX_HASH_SIZE]);

    elapsed_ms = (long) result.tv_sec * 1000L + (long) result.tv_usec / 1000L;
    printf("(BackEnd): Elapsed time = %ld ms\n", elapsed_ms);
}
/*
 * Build the path of the next log file into buffer.
 *
 * buffer  destination for the NUL-terminated path
 * max     size of buffer in bytes; the output is truncated to fit
 *
 * The path has the form "logs/<stem>_<n>.txt". <stem> is
 * user_specified_log_name when that is non-empty, otherwise the current
 * local time formatted with "%Y-%m-%e_%-l:%M" (left unchanged so file names
 * stay the same). <n> is a counter that is incremented on every call that
 * formats a name.
 *
 * Returns buffer. When buffer is NULL or max is 0 nothing is written and
 * the counter is not advanced.
 */
char * create_log_name(char * buffer, size_t max) {
    static const char * prefix = "logs";
    static size_t num_logs = 0;
    static const char * format_string = "%Y-%m-%e_%-l:%M";
    char c_time_string[256] = "";
    const char * stem = user_specified_log_name;
    int written;

    if (buffer == NULL || max == 0) {
        return buffer;
    }

    if (strcmp(user_specified_log_name, "") == 0) {
        time_t curr_time = time(NULL);
        struct tm * tmp = localtime(&curr_time);

        /* localtime() may fail and strftime() leaves the array unspecified
         * when it returns 0; fall back to an empty stem in both cases. */
        if (tmp == NULL ||
            strftime(c_time_string, sizeof c_time_string, format_string, tmp) == 0) {
            c_time_string[0] = '\0';
        }
        stem = c_time_string;
    }

    /* Bounded by max: the unbounded sprintf could overrun the caller's buffer. */
    written = snprintf(buffer, max, "%s/%s_%zu.txt", prefix, stem, num_logs++);
    if (written < 0) {
        buffer[0] = '\0';
    } else if ((size_t) written >= max) {
        fprintf(stderr, "create_log_name: log file name truncated to '%s'\n", buffer);
    }

    return buffer;
}