#include //printf #include //open #include //open #include //open #include //close, getopt #include //perror #include //exit #include //mmap #include //signal #include //assert #include //socket, inet_aton #include //socket, inet_aton #include //socket #include //htons, ntohs, htonl, ntohl, inet_aton #include //nanosleep #include //bool #include //Multithreading #include "hps_0.h" /* DEVELOPER NOTE This is an attempt to make udp_forwarder multithreaded, but for some reason I cannot make it faster than the single thread variant. Initially I thought the slow down was due to reading and writing to/from the FPGA happening at the same time, since there is only one AXI master (meaning that only one operation can happen at any time) and mixing of read and write operations somehow interferes with each other. But adding mutexes makes it actually slower! After much experimenting this are the results I got: * Single Thread Variant 16k packets/s * Multi Thread Variant 7.1k packets/s * Multi Thread Variant with no sleep 13.6k packets/s * Multi Thread Variant with mutexes 6.9k packets/s * Multi Thread Variant with mutexes and no sleep 12.1k packets/s */ /*PROTOTYPES*/ void usage(char* name); void bail(int code); void sig_handler(int signum); void* send_thread(void* vargp); /*DEFINES*/ #define LW_FPGA_SLAVES_BASE 0xFF200000 #define LW_FPGA_SLAVES_END 0xFF3FFFFF #define LW_FPGA_SLAVES_SPAN 0x00200000 #define BUF_LENGTH 65536 #define NUM_SOCKETS 5 #define PORT_CONFIG_PB 7400 #define PORT_CONFIG_DG 250 #define PORT_CONFIG_PG 2 #define PORT_CONFIG_D0 0 #define PORT_CONFIG_D1 10 #define PORT_CONFIG_D2 1 #define PORT_CONFIG_D3 11 /*GLOBAL VARIABLES*/ //NOTE: use void* Pointer to avoid pointer arithmetic during offset addition volatile sig_atomic_t done = false; void* lw_bridge = NULL; int mem_fd = -1; int sock_fd[NUM_SOCKETS]; uint8_t endian = -1; //0 Big Endian, 1 Little Endian in_addr_t src_addr; long src_did; bool quiet = false; struct sockaddr_in bind_addr[NUM_SOCKETS]; uint32_t* ADDR_0; uint32_t* ADDR_1; uint32_t* ADDR_2; pthread_mutex_t mutex; /*CONSTANTS*/ const struct timespec ms = {0, 10^6}; // 1ms const struct timespec us = {0, 10^3}; // 1us const struct timespec sec = {1, 0}; // 1sec int main(int argc, char** argv) { int opt; while ((opt = getopt(argc, argv, "q")) != -1) { switch (opt) { case 'q': quiet = true; break; default: /* invalid option */ fprintf(stderr, "Invalid Option\n"); usage(argv[0]); break; } } if (argc-optind < 2) { fprintf(stderr, "Insufficient arguments\n"); usage(argv[0]); } src_addr = inet_addr(argv[optind]); if (src_addr == INADDR_NONE) { fprintf(stderr, "Invalid IP Address\n"); usage(argv[0]); } errno = 0; src_did = strtol(argv[optind+1], NULL, 10); if (errno != 0) { fprintf(stderr, "Invalid Domain ID\n"); usage(argv[0]); } // Initialize Sockets to aid bail function for (int i = 0; i < NUM_SOCKETS; i++) { sock_fd[i] = -1; } //Register Signal Handling signal(SIGHUP, sig_handler); signal(SIGINT, sig_handler); signal(SIGQUIT, sig_handler); signal(SIGABRT, sig_handler); // Check Endianness { int i = 1; endian = (* (char*) &i == 1) ? 1 : 0; printf("ENDIANNESS: %d\n", endian); } if (endian != 1) { fprintf(stderr, "Program is tested on Little Endian Systems\n"); bail(EXIT_FAILURE); } mem_fd = open("/dev/mem", O_RDWR | O_SYNC); if (mem_fd < 0) { perror("Could not open /dev/mem"); bail(EXIT_FAILURE); } lw_bridge = (uint32_t*)mmap(NULL, LW_FPGA_SLAVES_SPAN, PROT_READ | PROT_WRITE, MAP_SHARED, mem_fd, LW_FPGA_SLAVES_BASE); if (lw_bridge == MAP_FAILED) { perror("Could not mmap /dev/mem"); bail(EXIT_FAILURE); } // Polling Address ADDR_0 = (uint32_t*)(lw_bridge + TEST_FPGA_0_BASE); // Read Address ADDR_1 = ADDR_0 + 1; //Pointer Arithmetic // Write Address ADDR_2 = ADDR_0 + 2; //Pointer Arithmetic for (int i = 0; i < NUM_SOCKETS; i++) { sock_fd[i] = socket(AF_INET, SOCK_DGRAM, 0); if (sock_fd[i] < 0) { fprintf(stderr, "Socket %d", i); perror("Could not open socket"); bail(EXIT_FAILURE); } } bind_addr[0] = (struct sockaddr_in){AF_INET, htons(PORT_CONFIG_PB + PORT_CONFIG_D0 + PORT_CONFIG_DG * src_did), {inet_addr("239.255.0.1")}}; bind_addr[1] = (struct sockaddr_in){AF_INET, htons(PORT_CONFIG_PB + PORT_CONFIG_D0 + PORT_CONFIG_DG * src_did), {src_addr}}; bind_addr[2] = (struct sockaddr_in){AF_INET, htons(PORT_CONFIG_PB + PORT_CONFIG_D1 + PORT_CONFIG_DG * src_did), {src_addr}}; bind_addr[3] = (struct sockaddr_in){AF_INET, htons(PORT_CONFIG_PB + PORT_CONFIG_D2 + PORT_CONFIG_DG * src_did), {src_addr}}; bind_addr[4] = (struct sockaddr_in){AF_INET, htons(PORT_CONFIG_PB + PORT_CONFIG_D3 + PORT_CONFIG_DG * src_did), {src_addr}}; printf("bind_addr[0]: %s:%d\n",inet_ntoa(bind_addr[0].sin_addr),ntohs(bind_addr[0].sin_port)); printf("bind_addr[1]: %s:%d\n",inet_ntoa(bind_addr[1].sin_addr),ntohs(bind_addr[1].sin_port)); printf("bind_addr[2]: %s:%d\n",inet_ntoa(bind_addr[2].sin_addr),ntohs(bind_addr[2].sin_port)); printf("bind_addr[3]: %s:%d\n",inet_ntoa(bind_addr[3].sin_addr),ntohs(bind_addr[3].sin_port)); printf("bind_addr[4]: %s:%d\n",inet_ntoa(bind_addr[4].sin_addr),ntohs(bind_addr[4].sin_port)); for (int i = 0; i < NUM_SOCKETS; i++) { if (bind(sock_fd[i], (const struct sockaddr*) &bind_addr[i], sizeof(bind_addr[i])) < 0) { fprintf(stderr, "bind_addr[%d]", i); perror("Bind failed"); bail(EXIT_FAILURE); } } if (pthread_mutex_init(&mutex,NULL) != 0){ perror("pthread_mutex_init failed"); bail(EXIT_FAILURE); } pthread_t thread_id; if (pthread_create(&thread_id, NULL, send_thread, NULL) != 0){ perror("pthread_create failed"); bail(EXIT_FAILURE); } struct sockaddr_in src, dest; char buffer[BUF_LENGTH]; int n, addrlen, j; uint32_t* p; printf("Entering Receive Loop\n"); while (!done) { /*UDP INPUT*/ for (j = 0; j < NUM_SOCKETS; j++) { addrlen = sizeof(src); n = recvfrom(sock_fd[j], buffer, BUF_LENGTH, MSG_DONTWAIT | MSG_TRUNC, (struct sockaddr*) &src, &addrlen); if (n < 0) { // Error if (errno != EAGAIN && errno != EWOULDBLOCK) { perror("rcvfrom() error"); break; } } // Exit Condition else if (n > 0) { break; } } if (n > 0) { //Buffer not 4-Byte aligned if ((n % 4) != 0) { // Add zero byte padding for(int offset = 4 - (n % 4); offset > 0; offset--){ buffer[n+offset-1] = 0; } // Convert n to Word Count n = (n / 4) + 1; } else { // Convert n to Word Count n = n / 4; } //SANITY CHECK if (addrlen != sizeof(src)){ fprintf(stderr, "rcvfrom() returned unexpected addrlen.\n"); continue; } /* while (pthread_mutex_lock(&mutex) != 0){ perror("pthread_mutex_lock failed"); } */ // Write SRC Address *ADDR_2 = src.sin_addr.s_addr; // Write DEST Address *ADDR_2 = bind_addr[j].sin_addr.s_addr; // Write UDP Ports *ADDR_2 = ((bind_addr[j].sin_port << 16) & 0xFFFF0000) | (src.sin_port & 0xFFFF); // Write Packet Length *ADDR_2 = htonl(n); // Write Packet p = (uint32_t*) buffer; for (int i = 0; i < n; i++) { *ADDR_2 = p[i]; } /* while (pthread_mutex_unlock(&mutex) != 0){ perror("pthread_mutex_lock failed"); } */ if (!quiet){ printf("Packet received (%d Words)\n", n); printf(" Src: %s:%d\n",inet_ntoa(bind_addr[j].sin_addr),ntohs(bind_addr[j].sin_port)); } } /* else { nanosleep(&us, NULL); //Wait 1 us } */ } printf("Receive Loop exited.\n"); printf("Waiting for threads...\n"); pthread_join(thread_id, NULL); bail(EXIT_SUCCESS); } void* send_thread(void* vargp){ struct sockaddr_in src, dest; char buffer[BUF_LENGTH]; int n, addrlen, j; uint32_t* p; uint32_t tmp; printf("Entering Send Loop\n"); while (!done) { /*UDP OUTPUT*/ // FPGA has output /* while (pthread_mutex_lock(&mutex) != 0){ perror("pthread_mutex_lock failed"); } */ if (*ADDR_0 != 0) { src.sin_family = AF_INET; dest.sin_family = AF_INET; // Read SRC Address src.sin_addr.s_addr = *ADDR_1; // Read DEST Address dest.sin_addr.s_addr = *ADDR_1; // Read UDP Ports tmp = *ADDR_1; dest.sin_port = ((tmp >> 16) & 0xFFFF); src.sin_port = (tmp & 0xFFFF); // Read Packet Length n = ntohl(*ADDR_1); // Read Packet p = (uint32_t*) buffer; for(int i = 0; i < n; i++) { p[i] = *ADDR_1; } /* while (pthread_mutex_unlock(&mutex) != 0){ perror("pthread_mutex_lock failed"); } */ // Convert n to Byte Count n = n * 4; // Select correct socket j = NUM_SOCKETS; for (int i = 0; i < NUM_SOCKETS; i++) { if (bind_addr[i].sin_addr.s_addr == src.sin_addr.s_addr && bind_addr[i].sin_port == src.sin_port) { j = i; break; } } // No bind_addr Match if (j == NUM_SOCKETS) { fprintf(stderr, "Provided SRC does not have respective bind_addr\n"); fprintf(stderr, "SRC: %s %d\n", inet_ntoa(src.sin_addr), ntohs(src.sin_port)); continue; } n = sendto(sock_fd[j], buffer, n, 0, (const struct sockaddr*) &dest, sizeof(dest)); if (n < 0) { perror("sendto failed"); continue; } if (!quiet) { printf("Packet sent (%d Words)\n", n/4); printf(" Src: %s:%d\n",inet_ntoa(bind_addr[j].sin_addr),ntohs(bind_addr[j].sin_port)); printf(" Dest: %s:%d\n",inet_ntoa(dest.sin_addr),ntohs(dest.sin_port)); } } /* else { while (pthread_mutex_unlock(&mutex) != 0){ perror("pthread_mutex_lock failed"); } } */ /* nanosleep(&us, NULL); //Wait 1 us */ } printf("Send Loop exited.\n"); } void bail(int code){ // Close File Descriptors if (mem_fd != -1){ close(mem_fd); } for (int i = 0; i < NUM_SOCKETS; i++) { if (sock_fd[i] != -1) { close(sock_fd[i]); } } // Unmap /dev/mem if (lw_bridge != NULL){ munmap(lw_bridge, LW_FPGA_SLAVES_SPAN); } exit(code); } void sig_handler(int signum){ fprintf(stderr, "Received Signal %d.\n Exiting...\n", signum); done = true; } void usage(char* name){ printf("USAGE: %s [-q] ADDRESS DOMAIN_ID\n", name); printf("-q Quiet Flag. When specified, no information is printed during the main loops\n"); printf("ADDRESS The SRC IPv4 Address of the FPGA implementation (In x.x.x.x format)\n"); printf(" NOTE: The system has to have a interface with the same address\n"); printf("DOMAIN_ID The Domain ID of the FPGA implementation\n"); bail(EXIT_SUCCESS); }