#include //socket, inet_aton #include //socket, inet_aton #include //socket #include //htons, ntohs, htonl, ntohl, inet_aton #include //signal #include //std::thread #include //std::unordered_set #include //std::shared_mutex #include //close /*PROTOTYPES*/ void send_thread(); void receive_thread(); void bail(int code); void sig_handler(int signum); void usage(char* name); /*DEFINES*/ //Magic Number used to distinguish our UDP Packets (First 8 Bytes of Payload) #define MAGIC 0xDEADBEEFCAFEFEED //Size Limit of set (This implicitly is the limit of in transit messages expected to return) [0 means no limit] #define MSG_LIMIT 100 //Payload size of UDP messages in Bytes [Has to be >= 16 Bytes] #define PAYLOAD_SIZE 100 //Print extra debug messages #define DEBUG true /*GLOBAL VARIABLES*/ long port = 0; int sock_fd = -1; volatile sig_atomic_t done = false; std::unordered_set set; std::shared_mutex mutex1; std::shared_mutex mutex2; struct sockaddr_in src_addr, dest_addr; long pkt_cnt = 0; const struct timespec ms = {0, 10^6}; // 1ms const struct timespec us = {0, 10^3}; // 1us const struct timespec sec = {1, 0}; // 1sec int main(int argc, char **argv){ /*Parse Arguments*/ if (argc < 4) { fprintf(stderr, "Insufficient arguments\n"); usage(argv[0]); } //Source Address in_addr_t src_ip_addr = inet_addr(argv[1]); if (src_ip_addr == INADDR_NONE) { fprintf(stderr, "Invalid SRC IP Address\n"); usage(argv[0]); } //Destination Address in_addr_t dest_ip_addr = inet_addr(argv[2]); if (dest_ip_addr == INADDR_NONE) { fprintf(stderr, "Invalid DEST IP Address\n"); usage(argv[0]); } //Port Number errno = 0; port = strtol(argv[3], NULL, 10); if (errno != 0) { fprintf(stderr, "Invalid Port Number\n"); usage(argv[0]); } /*MISC MAIN THREAD*/ //Register Signal Handling signal(SIGHUP, sig_handler); signal(SIGINT, sig_handler); signal(SIGQUIT, sig_handler); signal(SIGABRT, sig_handler); /*SOCKET OPERATIONS*/ src_addr = {AF_INET, htons(port), {src_ip_addr}}; dest_addr = {AF_INET, htons(port), {dest_ip_addr}}; //Open Socket sock_fd = socket(AF_INET, SOCK_DGRAM, 0); //Bind Socket if (bind(sock_fd, (const struct sockaddr*) &src_addr, sizeof(src_addr)) < 0) { perror("Bind failed"); bail(EXIT_FAILURE); } //Set Receive Timeout const struct timeval tv = {.tv_usec = 1000}; //1 ms setsockopt(sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); /*MAIN STUFF*/ //Start Threads std::thread t1(send_thread); std::thread t2(receive_thread); printf("Entering Main loop\n"); //MAIN LOOP while(!done){ //Sleep nanosleep(&sec, NULL); //Wait 1 sec //Read and reset Packet Counter mutex2.lock(); long cnt = pkt_cnt; pkt_cnt = 0; mutex2.unlock(); //Print Results printf("Throughput: %ld\n", cnt); //Debug Messages if(DEBUG){ mutex1.lock(); cnt = set.size(); mutex1.unlock(); printf("Current set size: %ld\n", cnt); } } printf("Main loop exited. Waiting for threads...\n"); //Wait for Threads t1.join(); t2.join(); bail(EXIT_SUCCESS); } void send_thread(){ uint64_t id = 0; char buffer[PAYLOAD_SIZE]; uint64_t* p = (uint64_t*) buffer; long size; //Magic Number p[0] = MAGIC; while(!done){ mutex1.lock(); set.insert(id); size = set.size(); mutex1.unlock(); p[1] = id; if (sendto(sock_fd, buffer, PAYLOAD_SIZE, 0, (const struct sockaddr*) &dest_addr, sizeof(dest_addr)) < 0) { perror("sendto failed"); mutex1.lock(); set.erase(id); mutex1.unlock(); continue; } id++; //Reached Message Limit while (!done && MSG_LIMIT != 0 && size >= MSG_LIMIT){ //Wait nanosleep(&us, NULL); //Wait 1 us mutex1.lock(); size = set.size(); mutex1.unlock(); } } } void receive_thread(){ char buffer[PAYLOAD_SIZE]; uint64_t* p = (uint64_t*) buffer; socklen_t addrlen; struct sockaddr_in src; while(!done){ //Receive if (recvfrom(sock_fd, buffer, PAYLOAD_SIZE, MSG_TRUNC, (struct sockaddr*) &src, &addrlen) < 0){ // Error if (errno != EAGAIN && errno != EWOULDBLOCK) { perror("rcvfrom() error"); } continue; } //Packet not relevant if (addrlen != sizeof(dest_addr) && src.sin_family != dest_addr.sin_family && src.sin_addr.s_addr != dest_addr.sin_addr.s_addr && src.sin_port != dest_addr.sin_port && p[0] != MAGIC){ continue; } mutex1.lock(); auto it = set.find(p[1]); if (it != set.end()){ set.erase(it); mutex1.unlock(); mutex2.lock(); pkt_cnt++; mutex2.unlock(); } else { mutex1.unlock(); fprintf(stderr, "ID not in set.\n"); } } } void bail(int code){ //Close Sockets if (sock_fd != -1) { close(sock_fd); } //DONE exit(code); } void sig_handler(int signum){ fprintf(stderr, "Received Signal %d.\n Exiting...\n", signum); done = true; } void usage(char* name){ printf("USAGE: %s SRC_ADDRESS DEST_ADDRESS PORT\n", name); printf("SRC_ADDRESS The local source IPv4 Address from which to send and listen to (In x.x.x.x format)\n"); printf("DEST_ADDRESS The remote destination IPv4 Address to which to send to (In x.x.x.x format)\n"); printf("PORT The port number used for both Addresses\n"); bail(EXIT_SUCCESS); }