#include #include #include #include #include #include #include #include #include #include #include #include #include #include "common.h" #include "message.h" #define N_DRIVER_DATALENGTH 16384 // in units of words #include "transfer.h" #include "buffer.h" #define SOCKET_ERROR -1 #define INVALID_SOCKET -1 #define SOCKET int extern int verbose; extern int data_transfer (int, int, char *, int); extern int Tcp_Port[]; extern int SocketConnected[]; // =0 not connected; =1 listening; =2 connected; =-1 not in use extern int TransferSize[]; extern int BlockSequence[]; extern int Done[]; extern int ReadEnable[]; extern int ACKneeded []; extern char * input_buffer; extern STATUS_TABLE *status_table; extern LINK_TABLE *link_table[]; extern BUFFER_TABLE *buffer_table; extern int * LinkState; extern void * shm_bufferarea[]; int nfds = 0; SOCKET listen_socket[MAXPORTS]; SOCKET msg_socket[MAXPORTS]; int rc [MAXPORTS]; #define MINSNDBUFSIZE 32 /* units Kbytes */ #define MAXSNDBUFSIZE 4*1024 /* default max attempted */ #define MINRCVBUFSIZE 32 /* units Kbytes */ #define MAXRCVBUFSIZE 4*1024 /* default max attempted */ int * BufferL; int j; /******************************************************************************* same parameter list as send loops until all data has been sent (or ERROR) Note - we only at most send ACKs so there shuld be no need to retry */ int do_send(SOCKET S, char * p, int l, int t) { int rc, done, left; if (verbose > 2) printf("send %d bytes\n", l); done = 0; left = l; while (done < l) { rc = send(S, p+done, left, t); if (verbose > 2) printf("done %d\n", rc); if (rc == SOCKET_ERROR) return rc; done += rc; left -= rc; } if (verbose > 2) printf("Complete\n"); return l; } /*******************************************************************************/ int ReadBlock(int i, char *Buffer, int size) { int rc; SOCKET S; if (verbose > 2) printf ("ReadBlock # %d %d\n", i, SocketConnected[i]); if (verbose > 2) printf("ReadBlock %d %d bytes \n",i,size); S = msg_socket[i]; if (verbose > 2) printf ("ReadBlock # %d %d\n", i, SocketConnected[i]); rc = recv(S, Buffer, size, 0); if (verbose > 2) printf ("ReadBlock # %d %d\n", i, SocketConnected[i]); if (verbose > 2) { BufferL = (int*)Buffer; printf("ReadBlock 0x%llx >\n", BufferL); for (j = 0; j < 64;) { printf(" 0x%08x", BufferL[j]); j++; if ((j/8)*8 == j) printf("\n"); } printf("\n"); } if (rc == SOCKET_ERROR && errno == EWOULDBLOCK) return 0; /* should not happen because of select */ if (rc == SOCKET_ERROR) { sprintf(message_buffer,"thread %d recv() failed: ",i); report_message(MSG_FAILURE); ReportError(); close(S); sprintf(message_buffer,"thread %d connection terminated",i); report_message(MSG_INFORMATION); return -1; } if (rc == 0) { sprintf(message_buffer,"thread %d client closed connection",i); report_message(MSG_INFORMATION); close(S); return 0; } if (verbose > 2) printf("received %d %d bytes\n",i,rc); if (verbose > 2) printf ("ReadBlock # %d %d\n", i, SocketConnected[i]); return rc; } /*******************************************************************************/ int WriteResponse(int i, char *Buffer, int size) { int rc; SOCKET S; if (verbose > 2) printf("WriteResponse %d %d bytes ",i,size); S = msg_socket[i]; rc = do_send(S, Buffer, size, 0); if (rc == SOCKET_ERROR) { sprintf(message_buffer,"thread %d send() failed: ",i); report_message(MSG_FAILURE); ReportError(); close(S); sprintf(message_buffer,"thread %d connection terminated",i); report_message(MSG_INFORMATION); return -1; } if (rc == 0) { sprintf(message_buffer,"thread %d client closed connection",i); report_message(MSG_INFORMATION); close(S); return 0; } if (verbose > 2) printf("complete %d %d\n",i,rc); return rc; } /*******************************************************************************/ void StartUp_Socket (int i) { int yes = 1; char * interface = NULL; struct sockaddr_in local; int rcvsize, rcvreq, sndsize, sndreq, temp; socklen_t length; SocketConnected[i]=0; memset(&local,0,sizeof(local)); // zero structure local.sin_family = AF_INET; local.sin_addr.s_addr = (!interface)?INADDR_ANY:inet_addr(interface); local.sin_port = htons((unsigned short)Tcp_Port[i]); // Port MUST be in Network Byte Order listen_socket[i] = socket(AF_INET, SOCK_STREAM, 0); // TCP socket if (listen_socket[i] == INVALID_SOCKET) { sprintf(message_buffer,"socket() failed: "); report_message(MSG_FAILURE); ReportError(); exit(16); } if (setsockopt(listen_socket[i],SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == SOCKET_ERROR) { ReportError(); sprintf(message_buffer,"setsockopt() failed for SO_REUSEADDR: "); report_message(MSG_FAILURE); exit(16); } /* optimize TCP receive buffer size */ length=sizeof(rcvsize); if (getsockopt(listen_socket[i],SOL_SOCKET,SO_RCVBUF,(char *)&rcvsize,&length) == SOCKET_ERROR) { ReportError(); sprintf(message_buffer, "getsockopt() failed: "); report_message(MSG_WARNING); } temp = rcvsize; for (rcvreq = MINRCVBUFSIZE; rcvreq <= MAXRCVBUFSIZE; rcvreq = rcvreq+MINRCVBUFSIZE) { rcvsize = rcvreq*1024; if (setsockopt(listen_socket[i],SOL_SOCKET,SO_RCVBUF,(char *)&rcvsize,sizeof(rcvsize)) == SOCKET_ERROR) { // ReportError(); // sprintf(message_buffer, "setsockopt() failed for SO_RCVBUF: "); // report_message(MSG_WARNING); break; } } length=sizeof(rcvsize); if (getsockopt(listen_socket[i],SOL_SOCKET,SO_RCVBUF,(char *)&rcvsize,&length) == SOCKET_ERROR) { ReportError(); sprintf(message_buffer, "getsockopt() failed: "); report_message(MSG_WARNING);; } sprintf(message_buffer, "TCP socket receive buffer was %d - now %d",temp,rcvsize); report_message(MSG_INFORMATION); /* repeat for send buffer which seems to also be needed */ length=sizeof(sndsize); if (getsockopt(listen_socket[i],SOL_SOCKET,SO_SNDBUF,(char *)&sndsize,&length) == SOCKET_ERROR) { ReportError(); sprintf(message_buffer, "getsockopt() failed: "); report_message(MSG_WARNING); } temp = sndsize; for (sndreq = MINSNDBUFSIZE; sndreq <= MAXSNDBUFSIZE; sndreq = sndreq+MINSNDBUFSIZE) { sndsize = sndreq*1024; if (setsockopt(listen_socket[i],SOL_SOCKET,SO_SNDBUF,(char *)&sndsize,sizeof(sndsize)) == SOCKET_ERROR) { // ReportError(); // sprintf(message_buffer, "setsockopt() failed for SO_RCVBUF: "); // report_message(MSG_WARNING); break; } } length=sizeof(sndsize); if (getsockopt(listen_socket[i],SOL_SOCKET,SO_SNDBUF,(char *)&sndsize,&length) == SOCKET_ERROR) { ReportError(); sprintf(message_buffer, "getsockopt() failed: "); report_message(MSG_WARNING);; } sprintf(message_buffer, "TCP socket send buffer was %d - now %d",temp,sndsize); report_message(MSG_INFORMATION); // bind() associates a local address and port combination with the socket just created. if (bind(listen_socket[i],(struct sockaddr*)&local,sizeof(local) ) == SOCKET_ERROR) { sprintf(message_buffer,"bind() failed: "); report_message(MSG_FAILURE); ReportError(); exit(16); } if (listen(listen_socket[i],5) == SOCKET_ERROR) { sprintf(message_buffer,"listen() failed: "); report_message(MSG_FAILURE); ReportError(); exit(16); } // set the socket to non-blocking mode fcntl(listen_socket[i], F_SETFL, O_NONBLOCK); if (listen_socket[i] > nfds) nfds = listen_socket[i]; link_table[i]->link_state == LINK_GOING; sprintf(message_buffer, "Merge Data Link thread %d startup using TCP port %d.", i, Tcp_Port[i]); LogMessage(MSG_INFORMATION); } /*******************************************************************************/ void CloseDown_Socket (int i) { close(listen_socket[i]); SocketConnected[i] = -1; link_table[i]->link_state == LINK_STOPPED; sprintf(message_buffer, "Merge Data Link thread %d closedown using TCP port %d.", i, Tcp_Port[i]); LogMessage(MSG_INFORMATION); } /*******************************************************************************/ void Write_Socket (int i) { char Ack[sizeof(ACK)]; ACK * ackptr; if (verbose > 2) printf ("Write_Socket %d %d\n", i, SocketConnected[i]); // // socket is ready to write - send ack // ackptr = (ACK*) &Ack[0]; ackptr->acq_flags = htons(1); if (rc[i] < 0) { ackptr->acq_code = htons(1); } else { ackptr->acq_code = htons(0); } if (WriteResponse(i,Ack,sizeof(Ack)) <= 0) { SocketConnected[i]=0; return; } ACKneeded[i]=0; if (verbose > 2) printf("Write_Socket complete %d %d\n", i, SocketConnected[i]); } /*******************************************************************************/ /* A reminder of the fields typedef struct block_header { /* format of data block uint16_t hdr_flags; /* see below uint16_t hdr_stream; /* =0 for forced ack request or 1=>MAX_STREAM uint16_t hdr_endian; /* =1 transmitted in host byte order uint16_t hdr_spare; uint32_t hdr_sequence; /* for this stream uint32_t hdr_blocklength; /* total length of this block including the header uint32_t hdr_datalength; /* length of user data in the block uint32_t hdr_offset; /* very large blocks may be fragmented uint32_t hdr_id1; /* for spy to locate header =0x19062002 uint32_t hdr_id2; /* for spy to locate header =0x09592400 } HEADER; */ void Read_Socket (int i) { HEADER * blockptr; int * dataptr; int len; int endian; int stream; int received; if (verbose > 2) { int j; } printf ("Read_Socket %d %d 0x%llx 0x%llx 0x%llx \n", i, SocketConnected[i], input_buffer, blockptr, dataptr); blockptr = (HEADER*) input_buffer; // Note we always use this local static buffer for reading from the TCP socket dataptr = (int*) input_buffer; if (verbose > 2) printf ("Read_Socket %d %d 0x%llx 0x%llx 0x%llx \n", i, SocketConnected[i], input_buffer, blockptr, dataptr); // // socket is ready to read, i.e., there is data on the socket. // len = Done[i]; if (verbose > 2) printf ("Read_Socket %d %d %d %d\n", i, SocketConnected[i], TransferSize[i], len); if (verbose > 2) printf ("Read_Socket # %d %d\n", i, SocketConnected[i]); received = ReadBlock(i,&input_buffer[len],TransferSize[i]-len); if (verbose > 2) printf ("Read_Socket (received %d) # %d %d\n", received, i, SocketConnected[i]); if (received <= 0) { SocketConnected[i]=0; return; } Done[i] += received; if (Done[i] < TransferSize[i]) return; if (verbose > 2) printf ("Read_Socket (data block complete - Done %d) # %d %d\n", Done[i], i, SocketConnected[i]); // a data block is now complete stream=ntohs(blockptr->hdr_stream); len=ntohl(blockptr->hdr_datalength); endian=(int)blockptr->hdr_endian; if (verbose > 2) printf ("Read_Socket # %d %d\n", i, SocketConnected[i]); if (verbose > 2) { for (j = 0; j < 64;) { printf(" 0x%08x", dataptr[j]); j++; if ((j/8)*8 == j) printf("\n"); } printf("\n"); } if (len >= 0) { if (verbose > 1) printf("thread %d block length %d, data length %d, out buffer %llx\n", i, ntohl(blockptr->hdr_blocklength), len, &input_buffer[sizeof(HEADER)]); if (verbose > 2) printf ("Read_Socket # %d %d\n", i, SocketConnected[i]); rc[i] = data_transfer(i, endian, &input_buffer[sizeof(HEADER)], len); } else { len = ntohl(blockptr->hdr_blocklength); if (len < 1024 || len > (1025*1024)) { sprintf(message_buffer,"buffer size NOT changed to %d",len); } else { TransferSize[i] = len; sprintf(message_buffer,"buffer size changed to %d",len); } report_message(MSG_INFORMATION); } if (verbose > 2) printf ("Read_Socket # %d %d\n", i, SocketConnected[i]); Done[i] =0; if ((ntohs(blockptr->hdr_flags) & 2) != 2) { ACKneeded[i]=1; } if (verbose > 2) printf("Read_Socket complete %d %d\n", i, SocketConnected[i]); } /*******************************************************************************/ void Listen_Socket (int i) { struct sockaddr_in from; unsigned int fromlen; int yes=1; if (verbose > 2) printf ("Listen_Socket %d %d\n", i, SocketConnected[i]); // // close the previous client socket. // if (SocketConnected[i]==2) close(msg_socket[i]); if (SocketConnected[i]==2) { sprintf(message_buffer,"thread %d closed existing connection", i); report_message(MSG_INFORMATION); } SocketConnected[i]=0; fromlen = sizeof(from); msg_socket[i] = accept(listen_socket[i],(struct sockaddr*)&from, &fromlen); if (msg_socket[i] == INVALID_SOCKET) { sprintf(message_buffer,"thread %d accept() failed: ", i); report_message(MSG_FAILURE); ReportError(); exit(16); } sprintf(message_buffer,"thread %d accepted connection from %s, port %d", i, inet_ntoa(from.sin_addr),htons(from.sin_port)); report_message(MSG_INFORMATION); if (msg_socket[i] > nfds) nfds = msg_socket[i]; SocketConnected[i]=2; ACKneeded[i]=0; TransferSize[i]=1024; Done[i]=0; ReadEnable[i] = 1; } /*******************************************************************************/ void link_init (int ID) { int i; int index; BUFFER_HEADER * baseaddress; baseaddress = (BUFFER_HEADER *) (char*) shm_bufferarea[ID]; baseaddress->buffer_offset = sizeof(BUFFER_HEADER); baseaddress->buffer_number = NBLOCKS; baseaddress->buffer_length = status_table->data_buffer_size * 1024; baseaddress->buffer_next = 0; baseaddress->buffer_max = MAX_BUFFERS; baseaddress->buffer_spare1 = 0; baseaddress->buffer_spare2 = 0; baseaddress->buffer_spare3 = 0; baseaddress->buffer_currentage = 0; for (i = 0; i < MAX_BUFFERS; i++) baseaddress->buffer_status[i] = 0; for (i = 0; i < MAX_BUFFERS; i++) baseaddress->buffer_age[i] = 0; for (i=0; i 2) { BufferL = (int*)baseaddress; printf("link_init %d 0x%llx >\n", ID, BufferL); for (j = 0; j < 64;) { printf(" 0x%08x", BufferL[j]); j++; if ((j/8)*8 == j) printf("\n"); } printf("\n"); } } /*******************************************************************************/ void link_server (int ID) { char *interface= NULL; int i; int r; fd_set readfds, writefds; int Signal; int Waiting = FALSE; int tryAgain; int linkNum; verbose = 3; struct timeval timeout; timeout.tv_sec = 5; timeout.tv_usec = 0; sprintf(message_buffer,"Starting the network interface for thread %d", ID); report_message(MSG_INFORMATION); link_init(ID); sprintf(message_buffer,"Entering server loop for thread %d", ID); report_message(MSG_INFORMATION); for (;;) { Signal = wait_for_wakeup (); sprintf (message_buffer,"Received signal %d", Signal); report_message(MSG_TRACE); /* if (!(tryAgain || Signal == SIGWAKEUP)) {continue;} if (tryAgain) { sprintf (message_buffer,"Trying again"); report_message(MSG_TRACE); } else { sprintf (message_buffer,"Received signal SIGWAKEUP"); report_message(MSG_TRACE); } */ /* signal from Master task - change in run mode */ printf("link %d report; %d %d\n", ID, link_table[ID]->link_ctrl, link_table[ID]->link_state); if (Tcp_Port[ID]==0) {SocketConnected[ID]=-1;} Waiting = TRUE; if (link_table[ID]->link_ctrl == LINK_GO && link_table[ID]->link_state == LINK_DOINGGO) {StartUp_Socket(ID); Waiting = FALSE;} if (link_table[ID]->link_ctrl == LINK_STOP && link_table[ID]->link_state == LINK_DOINGSTOP) {CloseDown_Socket(ID); Waiting = FALSE;} // We only accept once connection at a time for each port. As soon as another client connects on a port we // disconnect the first one and start talking to the new client on that port. // // A socket in the listen() state becomes ready to read when a // client connects to it. An accept() will complete without blocking. // Since select sets the sockets that are ready to be read from or // written to, we have to include listen_socket in the fdset each time // through the loop. // // for (i=0; i= 1) FD_SET(listen_socket[i],&readfds); if (SocketConnected[i] == 2) { if (ReadEnable[i]) FD_SET(msg_socket[i],&readfds); if (ACKneeded[i]) FD_SET(msg_socket[i],&writefds); /* send ACK: enable write */ } // } printf ("calling select\n"); timeout.tv_sec = 5; timeout.tv_usec = 0; r = select(nfds+1,&readfds,&writefds,NULL,NULL); printf ("select returned %d %d\n",r,errno); // if (r == 0 && errno == EINTR) {goto WAITAGAIN;} } while (r == SOCKET_ERROR && errno == EINTR); if (r == SOCKET_ERROR) { sprintf(message_buffer,"select() failed: "); report_message(MSG_FAILURE); ReportError(); } if (r == 0) { sprintf(message_buffer,"Select returned no fds ready"); report_message(MSG_ERROR); } // for (i=0; i