/* Author: Kris Burney & Jake Drahos
 *
 * BlueTooth socket program for passing vrpn data to quad.
 */

#define _GNU_SOURCE
#define _BSD_SOURCE
burneykb's avatar
burneykb committed
//system includes
Jake Drahos's avatar
Jake Drahos committed
#include <err.h>
burneykb's avatar
burneykb committed
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <sys/socket.h>
Jake Drahos's avatar
Jake Drahos committed
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
Jake Drahos's avatar
Jake Drahos committed
#include <sys/stat.h>
burneykb's avatar
burneykb committed
#include <bluetooth/bluetooth.h>
#include <bluetooth/rfcomm.h>
#include <pthread.h>
#include <errno.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <netinet/tcp.h>
burneykb's avatar
burneykb committed

//user created includes
#include "communication.h"
burneykb's avatar
burneykb committed
#include "vrpn_tracker.hpp"
#include "type_def.h"
#include "logger.h"
burneykb's avatar
burneykb committed
#include "packet.h"
#include "respcontrol.h"
Jake Drahos's avatar
Jake Drahos committed
#include "config.h"
burneykb's avatar
burneykb committed

#define QUAD_BT_ADDR  "00:06:66:64:61:D6"
#define QUAD_BT_CHANNEL  0x01
#define CMD_MAX_LENGTH 1024
#define MAX_HASH_SIZE 50
burneykb's avatar
burneykb committed

/* Backend-internal command magics */
#define TD_MAGIC "TRACKERDATA"

burneykb's avatar
burneykb committed
// function prototypes
void readAndPrint(void);
void sendVrpnPacket(struct ucart_vrpn_TrackerData *);
void sendStartPacket(void);
void getVRPNPacket(struct ucart_vrpn_TrackerData *);
void printVrpnData(struct ucart_vrpn_TrackerData *);
int connectToZybo();
int safe_fd_set(int , fd_set* , int* );
int safe_fd_clr(int , fd_set* , int* );
burneykb's avatar
burneykb committed
static void safe_close_fd(int fd, pthread_mutex_t *mutexLock);
burneykb's avatar
burneykb committed

static void cb(struct ucart_vrpn_TrackerData *);
static int new_client(int fd);
Jake Drahos's avatar
Jake Drahos committed
/* Return index of client, or -1 */
static ssize_t get_client_index(int fd);
/* Returns pointer to client buffer, or NULL if fd is not a known client */
static char * get_client_buffer(int fd);
/* Return pointer to client pending responses, or NULL if fd is not a known client */
static int * get_client_pend_responses(int fd);
/* Return the (non-negative) slot index used if successful, -1 otherwise */
static int clientAddPendResponses(int fd, unsigned char *packet);
/* Returns -1 on error */
burneykb's avatar
burneykb committed
static int remove_client(int fd);
/* Receive data from client */
static void client_recv(int fd);
/* Receive data from quad */
static void quad_recv();
/* Checks to see if socket has disconnected. Returns 1 on disconnect, else returns 0 */
static int wasDisconnected(int fd);
burneykb's avatar
burneykb committed

/* Thread-safe wrappers */
Jake Drahos's avatar
Jake Drahos committed
pthread_mutex_t quadSocketMutex;
static ssize_t writeQuad(const char * buf, size_t count);
static ssize_t readQuad(char * buf, size_t count);

/* Functions for recording Latencies */
void findTimeDiff(int respID);
int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y);
//time stamp checking
static unsigned int currMessageID = 0;
Jake Drahos's avatar
Jake Drahos committed
struct timeval timeArr[MAX_HASH_SIZE];
burneykb's avatar
burneykb committed
// global variables
static volatile int keepRunning = 1;
const char *TRACKER_IP = "UAV@192.168.0.120:3883";
Jake Drahos's avatar
Jake Drahos committed
static int zyboSocket;
Jake Drahos's avatar
Jake Drahos committed
static int backendSocket;
burneykb's avatar
burneykb committed
struct ucart_vrpn_tracker * tracker = NULL;
const char *logHeader = "";//"#\n#\tDefault log header\n#\tEverything after '#'`s will be printed as is in the processed logs.\n#\n\0";

#define MAX_CLIENTS 32
#define CLIENT_BUFFER_SIZE 1024
#define CLIENT_MAX_PENDING_RESPONSES 5
static char client_buffers[MAX_CLIENTS][CLIENT_BUFFER_SIZE];
static int client_fds[MAX_CLIENTS];
static int client_pending_responses[MAX_CLIENTS][CLIENT_MAX_PENDING_RESPONSES];
fd_set rfds_master;
int max_fd = 0;
burneykb's avatar
burneykb committed
pthread_mutex_t quadResponseMutex, cliInputMutex ;
unsigned char *commandBuf;
burneykb's avatar
burneykb committed
int newQuadResponse = 0, newCliInput = 0;

// Structures to be used throughout
modular_structs_t structs;
burneykb's avatar
burneykb committed
// Callback to be ran whenever the tracker receives data.
// Currently doing much more than it should. It will be slimmed down 
// 		in the future.
static void cb(struct ucart_vrpn_TrackerData * td) {
		//updateLogFile(td);
burneykb's avatar
burneykb committed
	}
burneykb's avatar
burneykb committed
}

int main(int argc, char **argv)
{
Jake Drahos's avatar
Jake Drahos committed
	FD_ZERO(&rfds_master);
Jake Drahos's avatar
Jake Drahos committed

	/* 
	 * Create backend listening socket
	 */
	/* Determine socket path */
	char * backend_socket_path = DEFAULT_SOCKET;
	if (getenv(SOCKET_ENV)) {
		backend_socket_path = getenv(SOCKET_ENV);
	}
	/* Unlink if it exists */
	unlink(backend_socket_path);

	/* Create socket */
	mode_t old_umask = umask(0111);
Jake Drahos's avatar
Jake Drahos committed
	backendSocket = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0);
	if (backendSocket < 0) {
		err(-1, "socket");
	}

	/* Create sockaddr and bind */
	struct sockaddr_un sa;
	sa.sun_family = AF_UNIX;
	strncpy(sa.sun_path, backend_socket_path, 107);
	sa.sun_path[107] = '\0';
	if (bind(backendSocket, (struct sockaddr *) &sa, sizeof(sa))) {
		err(-1, "bind");
	}
Jake Drahos's avatar
Jake Drahos committed
	umask(old_umask);
Jake Drahos's avatar
Jake Drahos committed

	/* Listen */
	if (listen(backendSocket, 16)) {
		err(-1, "listen");
	}

	/* Add to socket set */
Jake Drahos's avatar
Jake Drahos committed
	safe_fd_set(backendSocket, &rfds_master, &max_fd);
burneykb's avatar
burneykb committed
	

	/* Initialize client buffers */
	for (int i = 0; i < MAX_CLIENTS; i++) {
		client_fds[i] = -1;
		client_buffers[i][0] = '\n';
		for(int j = 0; j < CLIENT_MAX_PENDING_RESPONSES; j++) {
			client_pending_responses[i][j] = -1;
		}
Jake Drahos's avatar
Jake Drahos committed
	if (pthread_mutex_lock(&quadSocketMutex)) {
		err(-2, "pthrtead_mutex_lock (%s:%d):", __FILE__, __LINE__);
	}

	if ((zyboSocket = connectToZybo()) < 0)
	{
burneykb's avatar
burneykb committed
		perror("Error connecting to Quad...");
		free(commandBuf);
		exit(1);
	}
Jake Drahos's avatar
Jake Drahos committed

	if (pthread_mutex_unlock(&quadSocketMutex)) {
		err(-2, "pthrtead_mutex_unlock (%s:%d):", __FILE__, __LINE__);
	}
burneykb's avatar
burneykb committed
	
	// open the log file
Jake Drahos's avatar
Jake Drahos committed
	if(createLogFile(argc, argv[1]))
	{
		perror("Error creating log file...");
		exit(1);
	}

	// watch for input from stdin (fd 0) to see when it has input
Jake Drahos's avatar
Jake Drahos committed
	safe_fd_set(fileno(stdin), &rfds_master, &max_fd);
burneykb's avatar
burneykb committed
	if (!getenv(NOQUAD_ENV)) {
		// watch for input from the zybo socket
		safe_fd_set(zyboSocket, &rfds_master, &max_fd);

	}
burneykb's avatar
burneykb committed
	// Tell the quad we are ready to send it vrpn data
	sendStartPacket();
burneykb's avatar
burneykb committed

	if(!getenv(NOVRPN_ENV)){
		// create vrpnTracker instance
		tracker = ucart_vrpn_tracker_createInstance(TRACKER_IP);
		// this function will be called whenever tracker receives data
		ucart_vrpn_tracker_addCallback(tracker, cb);	
	}
Jake Drahos's avatar
Jake Drahos committed

	struct timeval timeout = {
		.tv_sec = 1,
		.tv_usec = 0
	};
burneykb's avatar
burneykb committed
	{
Jake Drahos's avatar
Jake Drahos committed
		fd_set rfds;
		rfds = rfds_master;
		activity = select(max_fd+1, &rfds, NULL, NULL, NULL);
		if(activity == -1) {
			perror("select() ");
		} else if (activity) {
Jake Drahos's avatar
Jake Drahos committed
			for(int fd = 0; fd <= max_fd; ++fd) {
burneykb's avatar
burneykb committed
					if(wasDisconnected(fd)){
burneykb's avatar
burneykb committed
					}
					if (fd == fileno(stdin)) {
						/**
						 * Ignore stdin from the backend
						 */
						printf("recieving from quad\n");
						quad_recv();
Jake Drahos's avatar
Jake Drahos committed
					} else if (fd == backendSocket) {
						int new_fd = 0;
						new_fd = accept(backendSocket, NULL, NULL);
						if (new_fd < 0) {
							warn("accept");
						} else {
							printf("Connection\n");
Jake Drahos's avatar
Jake Drahos committed
							if (new_client(new_fd)) {
								printf("Added client\n");
Jake Drahos's avatar
Jake Drahos committed
								safe_fd_set(new_fd, &rfds_master, &max_fd);
Jake Drahos's avatar
Jake Drahos committed
						}
					} else if (get_client_index(fd) > -1) {
						/* It is a socket to a frontend */
						client_recv(fd);
		} else {
Jake Drahos's avatar
Jake Drahos committed
			timeout.tv_sec = 1;
			timeout.tv_usec = 0;
burneykb's avatar
burneykb committed
		}
	}

	ucart_vrpn_tracker_freeInstance(tracker);
burneykb's avatar
burneykb committed
	safe_close_fd(zyboSocket, &quadSocketMutex);

Jake Drahos's avatar
Jake Drahos committed
	closeLogFile();
burneykb's avatar
burneykb committed
	return 0;
}

/*
 * Send a BEGINUPDATE packet (no payload) to the quad.
 *
 * The packet is encoded into a local 64-byte buffer and handed to
 * writeQuad(). If encoding fails a warning is printed and nothing is sent.
 */
void sendStartPacket() {
	uint8_t packet[64];
	struct metadata m;
	m.msg_type = BEGINUPDATE_ID;
	m.data_len = 0;
	m.msg_id = currMessageID++;

	/* Must be signed: the previous size_t made the "< 0" check dead code.
	 * NOTE(review): assumes EncodePacket reports failure with a negative
	 * return value, as the original comparison implies -- confirm. */
	ssize_t psize = EncodePacket(packet, sizeof(packet), &m, NULL);
	if (psize < 0) {
		warnx("Big problems");
		return;
	}

	writeQuad((const char *) packet, (size_t) psize);
}

void sendVrpnPacket(struct ucart_vrpn_TrackerData *info) {
burneykb's avatar
burneykb committed
	uint8_t packet[64];
	struct metadata m;
	uint8_t data[128];

	if (EncodeUpdate(&m, data, 128, info)) {
		warnx("Big problems");
		return;
burneykb's avatar
burneykb committed
	}
burneykb's avatar
burneykb committed
	m.msg_id = currMessageID++;
burneykb's avatar
burneykb committed

burneykb's avatar
burneykb committed
	size_t psize;
	if ((psize = EncodePacket(packet, 64, &m, data)) < 0) {
		warnx("Big problems");
		return;
	}

	writeQuad(packet, psize);
burneykb's avatar
burneykb committed
}

/*
 * Fetch the latest sample from the global tracker into *td.
 * If the tracker call returns a negative status, report it and clear
 * keepRunning.
 */
void getVRPNPacket(struct ucart_vrpn_TrackerData *td) {
	const int rc = ucart_vrpn_tracker_getData(tracker, td);

	if (rc >= 0) {
		return;
	}

	perror("Error receiving VRPN data from tracker...");
	keepRunning = 0;
}

/*
 * Print one tracker sample on stdout as a single line:
 * frame rate, position (x y z) and attitude (pitch roll yaw).
 */
void printVrpnData(struct ucart_vrpn_TrackerData * td) {
	printf("FPS: %lf Pos (xyz): (%lf %lf %lf) Att (pry): (%lf %lf %lf)\n",
		td->fps,
		td->x,
		td->y,
		td->z,
		td->pitch,
		td->roll,
		td->yaw);
}

int connectToZybo() {
	int sock;
	int status = 0;
burneykb's avatar
burneykb committed

burneykb's avatar
burneykb committed
	if (getenv(NOQUAD_ENV)) {
		return 0;
	}

	/* Use bluetooth by default */
burneykb's avatar
burneykb committed
	if (!getenv(QUAD_WIFI_ENV)) {
		printf("Using BT Settings\n");
		struct sockaddr_rc addr;

		// allocate a socket	
		sock = socket(AF_BLUETOOTH, SOCK_STREAM, BTPROTO_RFCOMM);

		//set the connection params ie. who to connect to	
		addr.rc_family = AF_BLUETOOTH;
		addr.rc_channel = (uint8_t) QUAD_BT_CHANNEL;
		str2ba( QUAD_BT_ADDR, &addr.rc_bdaddr );
		
		printf("Attempting to connect to zybo. Please be patient...\n");
		// blocking call to connect to socket sock ie. zybo board
		status = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
	} else {
		printf("Using WIFI settings\n");
		struct sockaddr_in addr;

		addr.sin_family = AF_INET;

		/* Quick and Dirty */
		if (getenv(QUAD_IP_ENV)) {
			 if (!inet_aton(getenv(QUAD_IP_ENV), &addr.sin_addr)) {
				printf("Env var %s invalid IP %s\n",
					     	QUAD_IP_ENV, getenv(QUAD_IP_ENV));
				return -1;
			 }
		} else {
			if (!inet_aton(QUAD_IP_DEFAULT, &addr.sin_addr)) {
				printf("Default IP %s is invalid\n",
					     	QUAD_IP_DEFAULT);
				return -1;
			}
		}
burneykb's avatar
burneykb committed

		if (getenv(QUAD_PORT_ENV)) {
			/* Quick 'n dirty, oh yeah! */
			addr.sin_port = htons(atoi(getenv(QUAD_PORT_ENV)));		
		} else {
			addr.sin_port = htons(QUAD_PORT_DEFAULT);
		sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
		if (sock < 0) {
			perror("socket");
			return -1;
		}
burneykb's avatar
burneykb committed
		printf("Connecting to Quad @ %s:%u\n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));

		status = connect(sock, (struct sockaddr *)&addr, sizeof(addr));
burneykb's avatar
burneykb committed

	// connection failed
	if(status < 0)
	{
		close(sock);
		perror("connect");
burneykb's avatar
burneykb committed
		return -1;
burneykb's avatar
burneykb committed
	else
	{
		// int result = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (int[]){1}, sizeof(int));
		// printf("result = %d\n", result);
burneykb's avatar
burneykb committed
		printf("connection successful!...\n");
		return sock;
	}
}
/* add a fd to fd_set, and update max_fd
 *
 * fd:     descriptor to add
 * fds:    set to add it to
 * max_fd: in/out, raised to fd when fd is larger; must not be NULL
 *
 * Returns 0 on success, or -1 (leaving fds and *max_fd untouched) when fd
 * is negative or >= FD_SETSIZE, because FD_SET on such a value is undefined.
 */
int safe_fd_set(int fd, fd_set* fds, int* max_fd) {
    assert(max_fd != NULL);

    if (fd < 0 || fd >= FD_SETSIZE) {
        return -1;
    }

    FD_SET(fd, fds);
    if (fd > *max_fd) {
        *max_fd = fd;
    }
    return 0;
}

/* clear fd from fds, update max fd if needed
 *
 * fd:     descriptor to remove
 * fds:    set to remove it from
 * max_fd: in/out; when fd was the maximum it is lowered to the highest
 *         descriptor still present in fds (or 0 if none); must not be NULL
 *
 * Returns 0 on success, or -1 (leaving fds and *max_fd untouched) when fd
 * is negative or >= FD_SETSIZE, because FD_CLR on such a value is undefined.
 */
int safe_fd_clr(int fd, fd_set* fds, int* max_fd) {
    assert(max_fd != NULL);

    if (fd < 0 || fd >= FD_SETSIZE) {
        return -1;
    }

    FD_CLR(fd, fds);
    if (fd == *max_fd) {
        /* Walk down to the next descriptor that is actually in the set,
         * so loops bounded by max_fd do not visit dead descriptors. */
        while (*max_fd > 0 && !FD_ISSET(*max_fd, fds)) {
            (*max_fd)--;
        }
    }
    return 0;
}

static ssize_t writeQuad(const char * buf, size_t count) {
	ssize_t retval;
burneykb's avatar
burneykb committed
	if (getenv(NOQUAD_ENV)) {
Jake Drahos's avatar
Jake Drahos committed
		return count;
	}
Jake Drahos's avatar
Jake Drahos committed
	if (pthread_mutex_lock(&quadSocketMutex)) {
		err(-2, "pthrtead_mutex_lock (%s:%d):", __FILE__, __LINE__);
	}
	retval = write(zyboSocket, buf, count);
Jake Drahos's avatar
Jake Drahos committed
	if (pthread_mutex_unlock(&quadSocketMutex)) {
		err(-2, "pthrtead_mutex_unlock (%s:%d):", __FILE__, __LINE__);
	}

	return retval;
}
/*
 * Thread-safe read from the quad socket.
 *
 * buf:   destination buffer
 * count: maximum number of bytes to read
 *
 * Performs a single read() on zyboSocket while holding quadSocketMutex and
 * returns its result (bytes read, 0 on end of stream, -1 on error with
 * errno set). Exits the process via err() if the mutex cannot be locked or
 * unlocked.
 *
 * NOTE(review): when NOQUAD_ENV is set this returns count WITHOUT writing
 * anything to buf, so the caller sees "count bytes read" of whatever was
 * already in the buffer -- confirm that is intended.
 */
static ssize_t readQuad(char * buf, size_t count) {
	if (getenv(NOQUAD_ENV)) {
		return count;
	}

	int rc = pthread_mutex_lock(&quadSocketMutex);
	if (rc) {
		/* pthread functions return the error code instead of setting errno */
		errno = rc;
		err(-2, "pthread_mutex_lock (%s:%d)", __FILE__, __LINE__);
	}

	ssize_t retval = read(zyboSocket, buf, count);
	int saved_errno = errno;

	rc = pthread_mutex_unlock(&quadSocketMutex);
	if (rc) {
		errno = rc;
		err(-2, "pthread_mutex_unlock (%s:%d)", __FILE__, __LINE__);
	}

	/* Keep read()'s errno visible to the caller */
	errno = saved_errno;
	return retval;
}

/*
 * Register fd as a frontend client in the first free slot.
 *
 * A slot is free when its client_fds entry is negative. On success the
 * slot's fd is recorded and its command buffer is reset to the empty
 * string.
 *
 * Returns non-zero (1) when the client was added and 0 when every slot is
 * in use (the caller treats a true result as "Added client").
 */
static int new_client(int fd) {
	ssize_t new_slot = -1;
	for (ssize_t i = 0; i < MAX_CLIENTS; i++) {
		if (client_fds[i] < 0) {
			new_slot = i;
			break;
		}
	}
	if (new_slot == -1) {
		warnx("Ran out of room! Consider increasing MAX_CLIENTS!");
		/* Must not fall through: new_slot is -1 and would index out of bounds */
		return 0;
	}

	client_fds[new_slot] = fd;
	client_buffers[new_slot][0] = '\0';

	return 1;
}

/*
 * Look up the slot that holds fd in client_fds.
 * Returns the slot index, or -1 when fd is not stored in any slot.
 */
static ssize_t get_client_index(int fd) {
	ssize_t slot = 0;

	while (slot < MAX_CLIENTS) {
		if (fd == client_fds[slot]) {
			return slot;
		}
		slot++;
	}

	return -1;
}

/*
 * Return the command buffer belonging to client fd,
 * or NULL when fd is not a registered client.
 */
static char * get_client_buffer(int fd) {
	const ssize_t idx = get_client_index(fd);

	return (idx == -1) ? NULL : client_buffers[idx];
}
/*
 * Return the pending-response array belonging to client fd,
 * or NULL when fd is not a registered client.
 */
static int * get_client_pend_responses(int fd) {
	const ssize_t idx = get_client_index(fd);

	return (idx == -1) ? NULL : client_pending_responses[idx];
}

/*
 * Record that client fd is waiting for a response to packet.
 *
 * The packet ID is assembled from packet[3] (low byte) and packet[4]
 * (high byte) and stored in the first free entry (value -1) of the
 * client's pending-response array.
 *
 * Returns the index of the entry used, or -1 when fd is not a registered
 * client or the client has no free entry left.
 */
static int clientAddPendResponses(int fd, unsigned char *packet) {
	int *pendingResponses = get_client_pend_responses(fd);
	if (pendingResponses == NULL) {
		/* Unknown client: previously dereferenced NULL here */
		return -1;
	}

	int packetID = (packet[4] << 8) | (packet[3]);
	for(int i = 0; i < CLIENT_MAX_PENDING_RESPONSES; i++) {
		if(pendingResponses[i] == -1) {
			pendingResponses[i] = packetID;
			return i;
		}
	}
	return -1;
}

burneykb's avatar
burneykb committed
static int remove_client(int fd) {
	ssize_t slot = get_client_index(fd);
	if(slot == -1)
burneykb's avatar
burneykb committed
		return -1;
	char *clientBuffer = get_client_buffer(fd);
	if(clientBuffer == NULL)
		return -1;
	clientBuffer[0] = '\0';
	int *pendingResponses = get_client_pend_responses(fd);
	if(pendingResponses == NULL)
		return -1;
	for(int i = 0; i < CLIENT_MAX_PENDING_RESPONSES; i++) {
		pendingResponses[i] = -1;
	}
burneykb's avatar
burneykb committed
	client_fds[slot] = -1;
	return 0;
burneykb's avatar
burneykb committed
}

/*
 * Close fd while holding mutexLock, so the close cannot interleave with
 * another thread using the descriptor under the same mutex.
 * Exits the process via err() if the mutex cannot be locked or unlocked.
 */
static void safe_close_fd(int fd, pthread_mutex_t *mutexLock) {
	int rc = pthread_mutex_lock(mutexLock);
	if (rc) {
		/* pthread functions return the error code instead of setting errno */
		errno = rc;
		err(-2, "pthread_mutex_lock (%s:%d)", __FILE__, __LINE__);
	}
	close(fd);
	rc = pthread_mutex_unlock(mutexLock);
	if (rc) {
		errno = rc;
		err(-2, "pthread_mutex_unlock (%s:%d)", __FILE__, __LINE__);
	}
}
/*
 * Read pending data from frontend client fd and process every complete,
 * newline-terminated command accumulated in that client's buffer.
 *
 * For each line:
 *  - formatCommand() is asked to turn it into a quad packet. On success
 *    the packet is (optionally) registered as awaiting a response, dumped
 *    to stdout and forwarded with writeQuad().
 *  - If formatCommand() returns -1 the line is handled by the backend
 *    itself; currently only the TD_MAGIC tracker-data request is known.
 * Any trailing partial line is kept in the buffer for the next call.
 */
static void client_recv(int fd) {
	char * buffer = get_client_buffer(fd);
	if (buffer == NULL) {
		warnx("client_recv: fd %d is not a registered client", fd);
		return;
	}
	size_t len_pre = strlen(buffer);

	if (len_pre >= CLIENT_BUFFER_SIZE - 1) {
		/* Buffer is full and holds no newline (complete lines are always
		 * consumed below). Reading 0 bytes would stall this client
		 * forever, so drop the over-long partial command. */
		warnx("Client(%d) sent an over-long command; discarding buffered data", fd);
		buffer[0] = '\0';
		len_pre = 0;
	}

	ssize_t r = read(fd, buffer + len_pre, CLIENT_BUFFER_SIZE - len_pre - 1);
	if (r < 0) {
		warn("read (fd: %d)", fd);
		/* Do not index the buffer with a negative length */
		return;
	}
	buffer[len_pre + r] = '\0';

	/* Parse buffer and handle commands */
	while (1) {
		char * nl = strchr(buffer, '\n');

		/* No newline found. End parsing */
		if (nl == NULL) {
			break;
		}
		*nl = '\0';
		printf("Client(%d) : '%s'\n",fd, buffer);
		unsigned char packet[1024];
		ssize_t packetSize;
		if((packetSize = formatCommand(buffer, packet)) == -1) {
			/* buffer was not a quad command, handling internally to
			 * backend instead of forwarding to quad
			 */
			if (strncmp(buffer, TD_MAGIC, strlen(TD_MAGIC)) == 0) {
				/* Request for tracker data */
				struct ucart_vrpn_TrackerData td;
				if (tracker == NULL) {
					const char * dummy = TD_MAGIC " 1.0 2.0 3.0 4.0 5.0 6.0\n";
					write(fd, dummy, strlen(dummy));
				} else if (ucart_vrpn_tracker_getData(tracker, &td)) {
					write(fd, TD_MAGIC " ERROR\n", strlen(TD_MAGIC " ERROR\n"));
				} else {
					/* more than sufficient buffer */
					char reply[2048];
					/* Map VRPN XYZ to Height Lat Long (right now it's
					 * a guess). Format is Height Lat Long P R Y */
					if (snprintf(reply,
							sizeof(reply),
							TD_MAGIC " %lf %lf %lf %lf %lf %lf\n",
							td.z,
							td.y,
							td.x,
							td.pitch,
							td.roll,
							td.yaw) >= (int) sizeof(reply)) {

						/* Output longer than buffer */
						warnx("Increase format buffer size, output was too long!");
						write(fd, TD_MAGIC " ERROR\n", strlen(TD_MAGIC " ERROR\n"));
					} else {
						/* Only send the data when it was not truncated */
						write(fd, reply, strlen(reply));
					}
				}
			}
		} else {
			if (packet[1] == 0x02) {
				if (clientAddPendResponses(fd, packet) == -1) {
					warnx("Ran out of room! Consider increasing CLIENT_MAX_PENDING_RESPONSES!\n");
				}
			}
			/* Don't know about this */
			int datalen = (packet[6] << 8) | (packet[5]);
			printf("sending %lf '", getFloat(packet, 7));
			/* Bounded by the packet buffer in case datalen is bogus */
			for(int i = 0; i < datalen + 8 && i < (int) sizeof(packet); ++i) {
				printf(" 0x%.2x", (unsigned) packet[i]);
			}
			printf("'\n");
			writeQuad((char *) packet, packetSize);
		}

		/* Delete parsed data and move the rest (including its
		 * terminator, and nothing beyond it) to the left */
		char * rest = nl + 1;
		memmove(buffer, rest, strlen(rest) + 1);
	}
}
static void quad_recv() {
	/* Check to see which command we are receiving. If it is one that needs to be passed on
		onto  the clients, do so.
	 */
	
	static unsigned char respBuf[2048];
	static size_t respBufLen;

burneykb's avatar
burneykb committed
	struct metadata m;
	uint8_t data[256];
	size_t respLen;

	respLen = readQuad((char *) respBuf + respBufLen, 
			CMD_MAX_LENGTH- respBufLen);
	if (respLen <= 0) {
		perror("ERROR reading from quad...\n");
		return;
	}
	respBufLen += respLen;

burneykb's avatar
burneykb committed
	if (DecodePacket(&m, data, 256, respBuf, respBufLen) < 0) {
		warnx("Packet format error");
		respBufLen = 0;
burneykb's avatar
burneykb committed
	memmove(respBuf, respBuf + datalen + 8, respBufLen - (datalen + 8));
	respBufLen -= PacketSize(&m);
burneykb's avatar
burneykb committed
	switch (m.msg_type) {
		case RESPCONTROL_ID:
			handleRespcontrol;
			break;
		case LOG_ID:
			/* something like this */
			log_write((char *) data, m.data_len);
			break;
burneykb's avatar
burneykb committed
}
burneykb's avatar
burneykb committed
static void handleRespcontrol(struct metadata *m, uint8_t * data)
{
	struct controller_message cm;
	if (DecodeRespcontrol(&cm, m, data) < 0) {
		warnx("Respcontrol error")
burneykb's avatar
burneykb committed
	char buffer[128];
burneykb's avatar
burneykb committed
	char * message = cmToString(RESPCONTROL_ID, cm.id, cm.value_id);
burneykb's avatar
burneykb committed
	size_t len = snprintf(buffer, 128, "%s %f\n", message, cm.value);

	for(int fd = 0; fd <= max_fd; ++fd) {
		if (get_client_index(fd) > -1) {
			write(fd, buffer, len);
/*
 * Probe fd with a non-blocking, non-consuming one-byte recv().
 * A result of 0 is treated as "peer disconnected": the client entry is
 * removed, fd is dropped from rfds_master and 1 is returned.
 * Any other result returns 0 and changes nothing.
 */
static int wasDisconnected(int fd) {
	char probe;
	const ssize_t got = recv(fd, &probe, 1, MSG_PEEK | MSG_DONTWAIT);

	if (got != 0) {
		return 0;
	}

	remove_client(fd);
	safe_fd_clr(fd, &rfds_master, &max_fd);
	printf("fd %d has disconnect and was removed\n", fd);
	return 1;
}

/*
 * Compute *result = *x - *y.
 *
 * *y is modified in place: whole seconds are moved between its tv_sec and
 * tv_usec fields so that the microsecond subtraction needs no borrow.
 *
 * Returns 1 when x's seconds are smaller than the (normalised) y's
 * seconds, i.e. the difference is negative; otherwise 0.
 */
int timeval_subtract (struct timeval *result, struct timeval *x, struct timeval *y) {
  /* Borrow: make y's microseconds no larger than x's. */
  if (y->tv_usec > x->tv_usec) {
    int borrow = (y->tv_usec - x->tv_usec) / 1000000 + 1;
    y->tv_sec += borrow;
    y->tv_usec -= 1000000 * borrow;
  }

  /* Carry: keep the microsecond difference within one second. */
  if (x->tv_usec - y->tv_usec > 1000000) {
    int carry = (x->tv_usec - y->tv_usec) / 1000000;
    y->tv_sec -= carry;
    y->tv_usec += 1000000 * carry;
  }

  /* Field-wise difference against the normalised y. */
  result->tv_usec = x->tv_usec - y->tv_usec;
  result->tv_sec = x->tv_sec - y->tv_sec;

  return (x->tv_sec < y->tv_sec) ? 1 : 0;
}

void findTimeDiff(int respID) {
	struct timeval result, tend;
	gettimeofday(&tend, NULL);
	timeval_subtract(&result, &tend, &timeArr[respID%MAX_HASH_SIZE]);
burneykb's avatar
burneykb committed
	printf("(BackEnd): Elapsed time = %ld ms\n", result.tv_usec/1000);
	// printf("print to log\n");
	// char tmp[8];
	// snprintf(tmp, 8, "%ld \tms\n", result.tv_usec/1000);
	// writeStringToLog(tmp);