rtps-fpga/sw/standalone/udp_server.cpp

219 lines
6.0 KiB
C++

#include <sys/socket.h> //socket, inet_aton
#include <netinet/in.h> //socket, inet_aton
#include <netinet/udp.h> //socket
#include <arpa/inet.h> //htons, ntohs, htonl, ntohl, inet_aton
#include <signal.h> //signal
#include <thread> //std::thread
#include <unordered_set> //std::unordered_set
#include <shared_mutex> //std::shared_mutex
#include <unistd.h> //close
/*PROTOTYPES*/
void send_thread();
void receive_thread();
void bail(int code);
void sig_handler(int signum);
void usage(char* name);
/*DEFINES*/
//Magic Number used to distinguish our UDP Packets (First 8 Bytes of Payload)
#define MAGIC 0xDEADBEEFCAFEFEED
//Size Limit of set (This implicitly is the limit of in transit messages expected to return) [0 means no limit]
#define MSG_LIMIT 100
//Payload size of UDP messages in Bytes [Has to be >= 16 Bytes]
#define PAYLOAD_SIZE 100
//Print extra debug messages
#define DEBUG true
/*GLOBAL VARIABLES*/
long port = 0;
int sock_fd = -1;
volatile sig_atomic_t done = false;
std::unordered_set<uint64_t> set;
std::shared_mutex mutex1;
std::shared_mutex mutex2;
struct sockaddr_in src_addr, dest_addr;
long pkt_cnt = 0;
const struct timespec ms = {0, 10^6}; // 1ms
const struct timespec us = {0, 10^3}; // 1us
const struct timespec sec = {1, 0}; // 1sec
int main(int argc, char **argv){
/*Parse Arguments*/
if (argc < 4) {
fprintf(stderr, "Insufficient arguments\n");
usage(argv[0]);
}
//Source Address
in_addr_t src_ip_addr = inet_addr(argv[1]);
if (src_ip_addr == INADDR_NONE) {
fprintf(stderr, "Invalid SRC IP Address\n");
usage(argv[0]);
}
//Destination Address
in_addr_t dest_ip_addr = inet_addr(argv[2]);
if (dest_ip_addr == INADDR_NONE) {
fprintf(stderr, "Invalid DEST IP Address\n");
usage(argv[0]);
}
//Port Number
errno = 0;
port = strtol(argv[3], NULL, 10);
if (errno != 0) {
fprintf(stderr, "Invalid Port Number\n");
usage(argv[0]);
}
/*MISC MAIN THREAD*/
//Register Signal Handling
signal(SIGHUP, sig_handler);
signal(SIGINT, sig_handler);
signal(SIGQUIT, sig_handler);
signal(SIGABRT, sig_handler);
/*SOCKET OPERATIONS*/
src_addr = {AF_INET, htons(port), {src_ip_addr}};
dest_addr = {AF_INET, htons(port), {dest_ip_addr}};
//Open Socket
sock_fd = socket(AF_INET, SOCK_DGRAM, 0);
//Bind Socket
if (bind(sock_fd, (const struct sockaddr*) &src_addr, sizeof(src_addr)) < 0) {
perror("Bind failed");
bail(EXIT_FAILURE);
}
//Set Receive Timeout
const struct timeval tv = {.tv_usec = 1000}; //1 ms
setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
/*MAIN STUFF*/
//Start Threads
std::thread t1(send_thread);
std::thread t2(receive_thread);
printf("Entering Main loop\n");
//MAIN LOOP
while(!done){
//Sleep
nanosleep(&sec, NULL); //Wait 1 sec
//Read and reset Packet Counter
mutex2.lock();
long cnt = pkt_cnt;
pkt_cnt = 0;
mutex2.unlock();
//Print Results
printf("Throughput: %ld\n", cnt);
//Debug Messages
if(DEBUG){
mutex1.lock();
cnt = set.size();
mutex1.unlock();
printf("Current set size: %ld\n", cnt);
}
}
printf("Main loop exited. Waiting for threads...\n");
//Wait for Threads
t1.join();
t2.join();
bail(EXIT_SUCCESS);
}
void send_thread(){
uint64_t id = 0;
char buffer[PAYLOAD_SIZE];
uint64_t* p = (uint64_t*) buffer;
long size;
//Magic Number
p[0] = MAGIC;
while(!done){
mutex1.lock();
set.insert(id);
size = set.size();
mutex1.unlock();
p[1] = id;
if (sendto(sock_fd, buffer, PAYLOAD_SIZE, 0, (const struct sockaddr*) &dest_addr, sizeof(dest_addr)) < 0) {
perror("sendto failed");
mutex1.lock();
set.erase(id);
mutex1.unlock();
continue;
}
id++;
//Reached Message Limit
while (!done && MSG_LIMIT != 0 && size >= MSG_LIMIT){
//Wait
nanosleep(&us, NULL); //Wait 1 us
mutex1.lock();
size = set.size();
mutex1.unlock();
}
}
}
void receive_thread(){
char buffer[PAYLOAD_SIZE];
uint64_t* p = (uint64_t*) buffer;
socklen_t addrlen;
struct sockaddr_in src;
while(!done){
//Receive
if (recvfrom(sock_fd, buffer, PAYLOAD_SIZE, MSG_TRUNC, (struct sockaddr*) &src, &addrlen) < 0){
// Error
if (errno != EAGAIN && errno != EWOULDBLOCK) {
perror("rcvfrom() error");
}
continue;
}
//Packet not relevant
if (addrlen != sizeof(dest_addr) && src.sin_family != dest_addr.sin_family && src.sin_addr.s_addr != dest_addr.sin_addr.s_addr && src.sin_port != dest_addr.sin_port && p[0] != MAGIC){
continue;
}
mutex1.lock();
auto it = set.find(p[1]);
if (it != set.end()){
set.erase(it);
mutex1.unlock();
mutex2.lock();
pkt_cnt++;
mutex2.unlock();
} else {
mutex1.unlock();
fprintf(stderr, "ID not in set.\n");
}
}
}
void bail(int code){
//Close Sockets
if (sock_fd != -1) {
close(sock_fd);
}
//DONE
exit(code);
}
void sig_handler(int signum){
fprintf(stderr, "Received Signal %d.\n Exiting...\n", signum);
done = true;
}
void usage(char* name){
printf("USAGE: %s SRC_ADDRESS DEST_ADDRESS PORT\n", name);
printf("SRC_ADDRESS The local source IPv4 Address from which to send and listen to (In x.x.x.x format)\n");
printf("DEST_ADDRESS The remote destination IPv4 Address to which to send to (In x.x.x.x format)\n");
printf("PORT The port number used for both Addresses\n");
bail(EXIT_SUCCESS);
}