#include #include #include #include #include #include #include #include #include #include #include #include #include "common.h" #include "message.h" #define N_DRIVER_DATALENGTH 16384 // in units of words #include "transfer.h" #include "buffer.h" #define SOCKET_ERROR -1 #define INVALID_SOCKET -1 #define SOCKET int extern int verbose; extern int receiveDATAbuffer (int, int, char *, int); extern int Tcp_Port[]; extern int SocketConnected[]; // =0 not connected; =1 listening; =2 connected; =-1 not in use extern int TransferSize[]; extern int Done[]; extern int ReadEnable[]; extern int ACKneeded []; extern char * input_buffer; extern STATUS_TABLE *status_table; extern LINK_TABLE *link_table[]; extern BUFFER_TABLE *buffer_table; extern int *LinkStatus; /* &link_table[LinkNum]->link_status */ extern int *LinkState; /* &link_table[LinkNum]->link_state */ extern int *LinkCtrl; /* &link_table[LinkNum]->link_ctrl */ extern int *LinkActive; /* &link_table[LinkNum]->link_isActive */ extern void * shm_bufferarea[]; int nfds = 0; SOCKET listen_socket[MAXPORTS]; SOCKET msg_socket[MAXPORTS]; int rc [MAXPORTS]; #define MINSNDBUFSIZE 32 /* units Kbytes */ #define MAXSNDBUFSIZE 4*1024 /* default max attempted */ #define MINRCVBUFSIZE 32 /* units Kbytes */ #define MAXRCVBUFSIZE 4*1024 /* default max attempted */ int * BufferL; int j; /******************************************************************************* same parameter list as send loops until all data has been sent (or ERROR) Note - we only at most send ACKs so there shuld be no need to retry */ int do_send(SOCKET S, char * p, int l, int t) { int rc, done, left; if (verbose > 2) printf("send %d bytes\n", l); done = 0; left = l; while (done < l) { rc = send(S, p+done, left, t); if (verbose > 2) printf("done %d\n", rc); if (rc == SOCKET_ERROR) return rc; done += rc; left -= rc; } if (verbose > 2) printf("Complete\n"); return l; } /*******************************************************************************/ int ReadBlock(int ID, char *Buffer, int size) { int rc; SOCKET S; if (verbose > 2) printf ("ReadBlock # %d %d\n", ID, SocketConnected[ID]); if (verbose > 2) printf("ReadBlock %d %d bytes \n",ID,size); S = msg_socket[ID]; if (verbose > 2) printf ("ReadBlock # %d %d\n", ID, SocketConnected[ID]); rc = recv(S, Buffer, size, 0); if (verbose > 2) printf ("ReadBlock # %d %d\n", ID, SocketConnected[ID]); if (verbose > 2) { BufferL = (int*)Buffer; printf("ReadBlock 0x%llx >\n", BufferL); for (j = 0; j < 64;) { printf(" 0x%08x", BufferL[j]); j++; if ((j/8)*8 == j) printf("\n"); } printf("\n"); } if (rc == SOCKET_ERROR && errno == EWOULDBLOCK) return 0; /* should not happen because of select */ if (rc == SOCKET_ERROR) { sprintf(message_buffer,"thread %d recv() failed: ",ID); report_message(MSG_FAILURE); ReportError(); close(S); sprintf(message_buffer,"thread %d connection terminated",ID); report_message(MSG_INFORMATION); return -1; } if (rc == 0) { sprintf(message_buffer,"thread %d client closed connection",ID); report_message(MSG_INFORMATION); close(S); return 0; } if (verbose > 2) printf("received %d %d bytes\n",ID,rc); if (verbose > 2) printf ("ReadBlock # %d %d\n", ID, SocketConnected[ID]); return rc; } /*******************************************************************************/ int WriteResponse(int ID, char *Buffer, int size) { int rc; SOCKET S; if (verbose > 2) printf("WriteResponse %d %d bytes ",ID,size); S = msg_socket[ID]; rc = do_send(S, Buffer, size, 0); if (rc == SOCKET_ERROR) { sprintf(message_buffer,"thread %d send() failed: ",ID); report_message(MSG_FAILURE); ReportError(); close(S); sprintf(message_buffer,"thread %d connection terminated",ID); report_message(MSG_INFORMATION); return -1; } if (rc == 0) { sprintf(message_buffer,"thread %d client closed connection",ID); report_message(MSG_INFORMATION); close(S); return 0; } if (verbose > 2) printf("complete %d %d\n",ID,rc); return rc; } /*******************************************************************************/ void StartUp_Socket (int ID) { int yes = 1; char * interface = NULL; struct sockaddr_in local; int rcvsize, rcvreq, sndsize, sndreq, temp; socklen_t length; SocketConnected[ID]=0; memset(&local,0,sizeof(local)); // zero structure local.sin_family = AF_INET; local.sin_addr.s_addr = (!interface)?INADDR_ANY:inet_addr(interface); local.sin_port = htons((unsigned short)Tcp_Port[ID]); // Port MUST be in Network Byte Order listen_socket[ID] = socket(AF_INET, SOCK_STREAM, 0); // TCP socket if (listen_socket[ID] == INVALID_SOCKET) { sprintf(message_buffer,"socket() failed: "); report_message(MSG_FAILURE); ReportError(); exit(16); } if (setsockopt(listen_socket[ID],SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == SOCKET_ERROR) { ReportError(); sprintf(message_buffer,"setsockopt() failed for SO_REUSEADDR: "); report_message(MSG_FAILURE); exit(16); } /* optimize TCP receive buffer size */ length=sizeof(rcvsize); if (getsockopt(listen_socket[ID],SOL_SOCKET,SO_RCVBUF,(char *)&rcvsize,&length) == SOCKET_ERROR) { ReportError(); sprintf(message_buffer, "getsockopt() failed: "); report_message(MSG_WARNING); } temp = rcvsize; for (rcvreq = MINRCVBUFSIZE; rcvreq <= MAXRCVBUFSIZE; rcvreq = rcvreq+MINRCVBUFSIZE) { rcvsize = rcvreq*1024; if (setsockopt(listen_socket[ID],SOL_SOCKET,SO_RCVBUF,(char *)&rcvsize,sizeof(rcvsize)) == SOCKET_ERROR) { // ReportError(); // sprintf(message_buffer, "setsockopt() failed for SO_RCVBUF: "); // report_message(MSG_WARNING); break; } } length=sizeof(rcvsize); if (getsockopt(listen_socket[ID],SOL_SOCKET,SO_RCVBUF,(char *)&rcvsize,&length) == SOCKET_ERROR) { ReportError(); sprintf(message_buffer, "getsockopt() failed: "); report_message(MSG_WARNING);; } sprintf(message_buffer, "TCP socket receive buffer was %d - now %d",temp,rcvsize); report_message(MSG_INFORMATION); /* repeat for send buffer which seems to also be needed */ length=sizeof(sndsize); if (getsockopt(listen_socket[ID],SOL_SOCKET,SO_SNDBUF,(char *)&sndsize,&length) == SOCKET_ERROR) { ReportError(); sprintf(message_buffer, "getsockopt() failed: "); report_message(MSG_WARNING); } temp = sndsize; for (sndreq = MINSNDBUFSIZE; sndreq <= MAXSNDBUFSIZE; sndreq = sndreq+MINSNDBUFSIZE) { sndsize = sndreq*1024; if (setsockopt(listen_socket[ID],SOL_SOCKET,SO_SNDBUF,(char *)&sndsize,sizeof(sndsize)) == SOCKET_ERROR) { // ReportError(); // sprintf(message_buffer, "setsockopt() failed for SO_RCVBUF: "); // report_message(MSG_WARNING); break; } } length=sizeof(sndsize); if (getsockopt(listen_socket[ID],SOL_SOCKET,SO_SNDBUF,(char *)&sndsize,&length) == SOCKET_ERROR) { ReportError(); sprintf(message_buffer, "getsockopt() failed: "); report_message(MSG_WARNING);; } sprintf(message_buffer, "TCP socket send buffer was %d - now %d",temp,sndsize); report_message(MSG_INFORMATION); // bind() associates a local address and port combination with the socket just created. if (bind(listen_socket[ID],(struct sockaddr*)&local,sizeof(local) ) == SOCKET_ERROR) { sprintf(message_buffer,"bind() failed: "); report_message(MSG_FAILURE); ReportError(); exit(16); } if (listen(listen_socket[ID],5) == SOCKET_ERROR) { sprintf(message_buffer,"listen() failed: "); report_message(MSG_FAILURE); ReportError(); exit(16); } // set the socket to non-blocking mode fcntl(listen_socket[ID], F_SETFL, O_NONBLOCK); if (listen_socket[ID] > nfds) nfds = listen_socket[ID]; *LinkState = LINK_GOING; sprintf(message_buffer, "Merge Data Link thread %d using TCP port %d.", ID, Tcp_Port[ID]); LogMessage(MSG_INFORMATION); } /*******************************************************************************/ void CloseDown_Socket (int ID) { close(listen_socket[ID]); SocketConnected[ID] = -1; *LinkState == LINK_STOPPED; sprintf(message_buffer, "Merge Data Link thread %d closedown using TCP port %d.", ID, Tcp_Port[ID]); LogMessage(MSG_INFORMATION); } /*******************************************************************************/ void Write_Socket (int ID) { char Ack[sizeof(ACK)]; ACK * ackptr; if (verbose > 2) printf ("Write_Socket %d %d\n", ID, SocketConnected[ID]); // // socket is ready to write - send ack // ackptr = (ACK*) &Ack[0]; ackptr->acq_flags = htons(1); if (rc[ID] < 0) { ackptr->acq_code = htons(1); } else { ackptr->acq_code = htons(0); } if (WriteResponse(ID,Ack,sizeof(Ack)) <= 0) { SocketConnected[ID]=0; return; } ACKneeded[ID]=0; if (verbose > 2) printf("Write_Socket complete %d %d\n", ID, SocketConnected[ID]); } /*******************************************************************************/ /* A reminder of the fields typedef struct block_header { /* format of data block uint16_t hdr_flags; /* see below uint16_t hdr_stream; /* =0 for forced ack request or 1=>MAX_STREAM uint16_t hdr_endian; /* =1 transmitted in host byte order uint16_t hdr_spare; uint32_t hdr_sequence; /* for this stream uint32_t hdr_blocklength; /* total length of this block including the header uint32_t hdr_datalength; /* length of user data in the block uint32_t hdr_offset; /* very large blocks may be fragmented uint32_t hdr_id1; /* for spy to locate header =0x19062002 uint32_t hdr_id2; /* for spy to locate header =0x09592400 } HEADER; */ void Read_Socket (int ID) { HEADER * blockptr; int * dataptr; int len; int endian; int stream; int received; if (verbose > 2) {int j;} if (verbose >= 0) printf ("Read_Socket %d %d 0x%llx 0x%llx 0x%llx \n", ID, SocketConnected[ID], input_buffer, blockptr, dataptr); blockptr = (HEADER*) input_buffer; // Note we always use this local static buffer for reading from the TCP socket dataptr = (int*) input_buffer; if (verbose > 2) printf ("Read_Socket %d %d 0x%llx 0x%llx 0x%llx \n", ID, SocketConnected[ID], input_buffer, blockptr, dataptr); // // socket is ready to read, i.e., there is data on the socket. // len = Done[ID]; if (verbose > 2) printf ("Read_Socket %d %d %d %d\n", ID, SocketConnected[ID], TransferSize[ID], len); if (verbose > 2) printf ("Read_Socket # %d %d\n", ID, SocketConnected[ID]); received = ReadBlock(ID,&input_buffer[len],TransferSize[ID]-len); if (verbose > 2) printf ("Read_Socket (received %d) # %d %d\n", received, ID, SocketConnected[ID]); if (received <= 0) { SocketConnected[ID]=0; return; } Done[ID] += received; if (Done[ID] < TransferSize[ID]) return; if (verbose > 2) printf ("Read_Socket (data block complete - Done %d) # %d %d\n", Done[ID], ID, SocketConnected[ID]); // a data block is now complete stream=ntohs(blockptr->hdr_stream); len=ntohl(blockptr->hdr_datalength); endian=(int)blockptr->hdr_endian; if (verbose > 2) printf ("Read_Socket # %d %d\n", ID, SocketConnected[ID]); if (verbose > 2) { for (j = 0; j < 64;) { printf(" 0x%08x", dataptr[j]); j++; if ((j/8)*8 == j) printf("\n"); } printf("\n"); } if (len >= 0) { if (verbose > 1) printf("thread %d block length %d, data length %d, out buffer %llx\n", ID, ntohl(blockptr->hdr_blocklength), len, &input_buffer[sizeof(HEADER)]); if (verbose > 2) printf ("Read_Socket # %d %d\n", ID, SocketConnected[ID]); rc[ID] = receiveDATAbuffer(ID, endian, &input_buffer[sizeof(HEADER)], len); } else { len = ntohl(blockptr->hdr_blocklength); if (len < 1024 || len > (1025*1024)) { sprintf(message_buffer,"buffer size NOT changed to %d",len); } else { TransferSize[ID] = len; sprintf(message_buffer,"buffer size changed to %d",len); } report_message(MSG_INFORMATION); } if (verbose > 2) printf ("Read_Socket # %d %d\n", ID, SocketConnected[ID]); Done[ID] =0; if ((ntohs(blockptr->hdr_flags) & 2) != 2) { ACKneeded[ID]=1; } if (verbose > 2) printf("Read_Socket complete %d %d\n", ID, SocketConnected[ID]); } /*******************************************************************************/ void Listen_Socket (int ID) { struct sockaddr_in from; unsigned int fromlen; int yes=1; if (verbose > 2) printf ("Listen_Socket %d %d\n", ID, SocketConnected[ID]); // // close the previous client socket. // if (SocketConnected[ID]==2) close(msg_socket[ID]); if (SocketConnected[ID]==2) { sprintf(message_buffer,"thread %d closed existing connection", ID); report_message(MSG_INFORMATION); } SocketConnected[ID]=0; fromlen = sizeof(from); msg_socket[ID] = accept(listen_socket[ID],(struct sockaddr*)&from, &fromlen); if (msg_socket[ID] == INVALID_SOCKET) { sprintf(message_buffer,"thread %d accept() failed: ", ID); report_message(MSG_FAILURE); ReportError(); exit(16); } sprintf(message_buffer,"thread %d accepted connection from %s, port %d", ID, inet_ntoa(from.sin_addr),htons(from.sin_port)); report_message(MSG_INFORMATION); if (msg_socket[ID] > nfds) nfds = msg_socket[ID]; SocketConnected[ID]=2; ACKneeded[ID]=0; TransferSize[ID]=1024; Done[ID]=0; ReadEnable[ID] = 1; } /*******************************************************************************/ void link_init (int ID) { int i; int index; BUFFER_HEADER * baseaddress; baseaddress = (BUFFER_HEADER *) (char*) shm_bufferarea[ID]; baseaddress->buffer_offset = sizeof(BUFFER_HEADER); baseaddress->buffer_number = NBLOCKS; baseaddress->buffer_length = status_table->data_buffer_size * 1024; baseaddress->buffer_next = 0; baseaddress->buffer_max = MAX_BUFFERS; baseaddress->buffer_spare1 = 0; baseaddress->buffer_spare2 = 0; baseaddress->buffer_spare3 = 0; baseaddress->buffer_currentage = 0; for (i = 0; i < MAX_BUFFERS; i++) baseaddress->buffer_status[ID] = 0; for (i = 0; i < MAX_BUFFERS; i++) baseaddress->buffer_age[ID] = 0; SocketConnected[ID]=-1; if (verbose > 3) { BufferL = (int*)baseaddress; printf("link_init %d 0x%llx >\n", ID, BufferL); for (j = 0; j < 64;) { printf(" 0x%08x", BufferL[j]); j++; if ((j/8)*8 == j) printf("\n"); } printf("\n"); } } /*******************************************************************************/ void link_server (int ID) { char *interface= NULL; int i; int r; fd_set readfds, writefds; int Signal; int Waiting = TRUE; struct timeval timeout; verbose = 3; sprintf(message_buffer,"Starting the network interface for thread %d", ID); report_message(MSG_INFORMATION); link_init(ID); // We only accept once connection at a time for each port. As soon as another client connects on a port we // disconnect the first one and start talking to the new client on that port. Waiting = TRUE; for (;;) { sprintf(message_buffer,"Entering server loop for thread %d", ID); report_message(MSG_INFORMATION); printf("link %d report; %d %d %d %d\n", ID, link_table[ID]->link_ctrl, link_table[ID]->link_state, *LinkCtrl, *LinkState); if (Waiting == TRUE) { Signal = wait_for_wakeup (); sprintf (message_buffer,"Received signal %d", Signal); report_message(MSG_TRACE); } if (*LinkCtrl == LINK_GO && *LinkState == LINK_DOINGGO) {StartUp_Socket(ID); Waiting = FALSE;} if (*LinkCtrl == LINK_STOP && *LinkState == LINK_DOINGSTOP) {CloseDown_Socket(ID); Waiting = FALSE;} // // A socket in the listen() state becomes ready to read when a // client connects to it. An accept() will complete without blocking. // Since select sets the sockets that are ready to be read from or // written to, we have to include listen_socket in the fdset each time // through the loop. // if (SocketConnected[ID]==0) { sprintf(message_buffer,"thread %d listening on port %d", ID, Tcp_Port[ID]); report_message(MSG_INFORMATION); SocketConnected[ID] = 1; ReadEnable[ID] =0; ACKneeded[ID] =0; } do { printf ("in loop awaiting connect\n"); FD_ZERO(&readfds); FD_ZERO(&writefds); printf ("connecting #1 %d %d %d\n", SocketConnected[ID] ,ReadEnable[ID], ACKneeded[ID]); if (SocketConnected[ID] >= 1) FD_SET(listen_socket[ID],&readfds); if (SocketConnected[ID] == 2) { if (ReadEnable[ID]) FD_SET(msg_socket[ID],&readfds); if (ACKneeded[ID]) FD_SET(msg_socket[ID],&writefds); /* send ACK: enable write */ } printf ("calling select %d %d %d\n", SocketConnected[ID] ,ReadEnable[ID], ACKneeded[ID]); timeout.tv_sec = 5; timeout.tv_usec = 0; r = select(nfds+1,&readfds,&writefds,NULL,&timeout); printf ("select returned %d %d\n",r,errno); printf ("connecting #2 %d %d %d\n", SocketConnected[ID] ,ReadEnable[ID], ACKneeded[ID]); } while (r == SOCKET_ERROR && errno == EINTR); if (r == SOCKET_ERROR) { sprintf(message_buffer,"select() failed: "); report_message(MSG_FAILURE); ReportError(); } if (r == 0){ sprintf(message_buffer,"Select returned no fds ready"); report_message(MSG_ERROR); } i = ID; if (FD_ISSET(listen_socket[ID], &readfds)) {Listen_Socket(i);} printf ("connecting #3 %d %d %d\n", SocketConnected[ID] ,ReadEnable[ID], ACKneeded[ID]); if (SocketConnected[ID] == 2) { if (ReadEnable[ID] && FD_ISSET(msg_socket[ID], &readfds)) Read_Socket(i); if (ACKneeded[ID] && FD_ISSET(msg_socket[ID], &writefds)) Write_Socket(i); } } /* end of for(;;) loop */ } /*******************************************************************************/