/******************************************************************************* * GANILtransfer.c Interface procedures for GANIL specific use */ #ifdef UNIX #include #include #include #include #include #endif #ifdef LYNXOS #include #include #include #include #include #endif #include #include #include #include #include #include #include #include "msg.h" #include "transfer.h" #define VERSION "1.1" #include "TCP.h" #include "TCP_err.h" #define TCPIP_PORT 3001 #define TCPIP_ADDRPORT 3000 #define TIMEOUT 25 /* basic unit - 5 secs */ #define MSGSIZE 12 #define BUFSIZE 0x4000 /* 16 Kbytes */ /*********************/ /* Codes de commande */ /*********************/ #define CMD_HELLO 0 /* Hello : pour se dire Bonjour ... */ #define CMD_INIT 1 /* Init : initialiser la session */ #define CMD_READ 2 /* Read : demande de lecture d'un buffer */ #define CMD_BYE 0xFFFF /* Bye : fermer la session et Au revoir */ #define CMD_KILL 0xFFFFFFFF /* Kill : arret du serveur, exit */ /********************/ /* Codes de reponse */ /********************/ #define REPLY_OK 0 /* OK, commande bien traitee ... */ #define REPLY_FAIL 1 /* Echec dans le traitement de la commande */ #define REPLY_NOREAD 2 /* Bien compris mais pas de donnees lues */ extern int GANILServerRunState; static char MsgBuf[256]; static int verbose = 0; #ifdef LYNXOS #include static struct sigaction sa; static int sigpipe=0; // set when signal handler for SIGPIPE installed #endif static pthread_mutex_t M1; static pthread_cond_t C1; static pthread_mutex_t M2; static pthread_cond_t C2; static int pthreads_started = 0; static int pthread_running = 0; static void * Thread_Send(void *); /* main communication thread */ void signal_block(); // arguments for request passed to thread static char * pthread_blockptr; static int pthread_len; static int NiceValue = 0; static int OverlapMode = 0; /* GANIL buffer header (EBYEDAT) */ typedef struct _GANIL_BUF_HD { char ident[8]; /* identificateur ASCII */ unsigned int num; /* numero de buffer */ unsigned int magic; /* mot magique (0x22061999) */ char free[4]; /* 4 octets libres */ unsigned short stream; /* numero de flux */ unsigned short eventcount; /* nombre d'evenements */ unsigned int checksum; /* checksum */ unsigned int length; /* nombre de mots de 16 bits */ } GANIL_BUF_HD; static int ServerSocket = -1; /* Numero du socket serveur */ static int CloneSocket = -1; /* Numero du socket clone */ static int ByeSocket = -1; /* Numero du socket clone qui arrete la session */ static int AddrSocket = -1; /* for response to IP address request */ static int ConnectionActive = 0; static void * ByeHandler(void *); /* Bye thread function */ static pthread_t ByeThread = -1; /* Bye thread Id */ static pthread_attr_t ByeAttr; /* Bye thread attributes */ static void * AddrHandler(void *); /* Address thread function */ static pthread_t AddrThread = -1; /* Address thread Id */ static pthread_attr_t AddrAttr; /* Address thread attributes */ char IPAddress[80]; /* IP address for the data source */ static char * BufAddr; static unsigned int BufSize = 0; /* Buffer size */ static int ExitLoop = 0; /* flag pour sortir de la boucle d'envoi des buffers */ static void ReportError () { perror(""); sprintf(MsgBuf, "Error %d", errno); fprintf(stderr, "%s\n", MsgBuf); #ifndef SOLARIS msgDefineMessage (0, MSG_C_EGLK, MSG_S_ERROR, NULL, MsgBuf); #endif } /******************************************************************************* sends any pending buffer loops until all data has been sent (or ERROR) there is one thread for each USER */ static void * Thread_Send(void * arg) { char * p; /* address of buffer to send */ int l; /* length of buffer to send */ int rc; int err = 0; int tmp; unsigned int CmdCode = CMD_HELLO; /* Command code */ char Message[MSGSIZE]; int ByteCount; GANIL_BUF_HD * bufhd; int retval; int nicevalue = 0; #ifdef LYNXOS int id, prty1, prty2; #endif if (verbose) printf("Thread_Send running\n"); signal_block(); // block signals handled by the main thread /* initialise connection with server */ tmp = 0; /* Create server socket */ if ((err = TcpOpen(&tmp,0,TCPIP_PORT,0,TIMEOUT)) != 0) { printf("TCP/IP server socket creation failed, error 0x%x\n",err); exit(0); } ServerSocket = tmp; printf("TCP/IP server socket created for Thread_Send\n"); do { if (NiceValue != nicevalue) { // check if nice has changed nicevalue = NiceValue; #ifndef LYNXOS nice(NiceValue); #else id = BUILDPID(getpid(), getstid()); prty1 = getpriority(PRIO_PROCESS, id); (void) setpriority(PRIO_PROCESS, id, prty1 - NiceValue); prty2 = getpriority(PRIO_PROCESS, id); printf("Thread priority changed from %d to %d\n", prty1, prty2); #endif } tmp = ServerSocket; printf("Waiting network connection request\n"); /* Waiting connection on server socket port */ if ((err = TcpOpen(&tmp,0,0,0,0)) != 0) { printf("TCP/IP clone socket creation failed, error 0x%x\n",err); TcpClose(&ServerSocket); ServerSocket = -1; exit(0); } CloneSocket = tmp; if (fcntl(CloneSocket, F_SETFL, O_NONBLOCK) == -1) { printf ("fcntl() returned an error in Thread_Send\n"); } printf("Network connection established\n"); do { /* Waiting command */ if ((err = TcpRead(&CloneSocket,Message,MSGSIZE,&ByteCount,TIMEOUT*5)) != 0) { printf("TcpRead error at location 1: code 0x%x\n",err); } else { CmdCode = *(unsigned int *)Message; CmdCode = (unsigned int)Message[3]; /* for Intel processors */ if (verbose) printf("TcpRead at location 1: received 0x%x\n", CmdCode); switch (CmdCode) { case CMD_HELLO : case CMD_BYE : case CMD_KILL : *((unsigned int *)Message) = REPLY_OK; if ((err=TcpWrite(&CloneSocket,Message,MSGSIZE,&ByteCount,TIMEOUT)) != 0) { printf("TcpWrite error at location 2: code 0x%x\n",err); } else { if (verbose) printf("TcpWrite at location 2: sent REPLY_OK\n"); } break; case CMD_INIT : BufSize = BUFSIZE; *((unsigned int *)Message) = REPLY_OK; if ((err=TcpWrite(&CloneSocket,Message,MSGSIZE,&ByteCount,TIMEOUT)) != 0) { printf("TcpWrite error at location 3: code 0x%x\n",err); } else { if (verbose) printf("TcpWrite at location 3: sent REPLY_OK\n"); printf("Session initialized\n"); } break; case CMD_READ : if (BufSize == 0) { /* INIT command has not been executed yet */ *((unsigned int *)Message) = REPLY_FAIL; if ((err=TcpWrite(&CloneSocket,Message,MSGSIZE,&ByteCount,TIMEOUT)) != 0) { printf("TcpWrite error at location 4: code 0x%x\n",err); } else { if (verbose) printf("TcpWrite at location 4: sent REPLY_FAIL\n"); } } else { /* Create Bye thread */ retval = pthread_attr_init(&ByeAttr); if (retval != 0) perror("thread attribute"); retval = pthread_create(&ByeThread,&ByeAttr,ByeHandler,0); if (retval != 0) { ReportError(); sprintf(MsgBuf, "Client: Error creating ByeHandler: "); fprintf (stderr, "%s\n", MsgBuf); #ifndef SOLARIS msgDefineMessage (0, MSG_C_EGLK, MSG_S_ERROR, NULL, MsgBuf); #endif } ExitLoop = 0; ConnectionActive =1; do { if (verbose) printf("Thread_Send waiting\n"); // wait on the conditional variable C1 for work to be done rc = pthread_mutex_lock(&M1); if (rc != 0) perror("error in pthread_mutex_lock 1: "); while (!pthread_running) { rc = pthread_cond_wait (&C1, &M1); // this will block until signaled if (rc != 0) perror("error in pthread_cond_wait 1: "); } rc = pthread_mutex_unlock(&M1); if (rc != 0) perror("error in pthread_mutex_unlock 1: "); if (verbose) printf("Thread_Send running\n"); // pick up request argunments bufhd = (GANIL_BUF_HD *) pthread_blockptr; l = pthread_len; BufAddr = (char *) (pthread_blockptr - MSGSIZE); if (l == 0) { *((unsigned int *)BufAddr) = REPLY_NOREAD; if ((err=TcpWrite(&CloneSocket,BufAddr,MSGSIZE,&ByteCount,TIMEOUT)) != 0) { printf("TcpWrite error at location 5: code 0x%x\n",err); ConnectionActive =0; } else { if (verbose) printf("TcpWrite at location 5: sent REPLY_NOREAD\n"); } } else { p = BufAddr; *p++ = 0x5a;*p++ = 0x5a;*p++ = 0x5a;*p++ = 0x5a;*p++ = 0x5a;*p++ = 0x5a; *p++ = 0x5a;*p++ = 0x5a;*p++ = 0x5a;*p++ = 0x5a;*p++ = 0x5a;*p++ = 0x5a; *((unsigned int *)BufAddr) = REPLY_OK; if ((err=TcpWrite(&CloneSocket,BufAddr,BUFSIZE,&ByteCount,TIMEOUT*5)) != 0) { printf("TcpWrite error at location 6: code 0x%x\n",err); ConnectionActive =0; } else { if (verbose) printf("TcpWrite at location 6: sent REPLY_OK\n"); } } if (verbose) printf("Thread_Send finishing\n"); // signal the conditional variable C2 that the work has been completed rc = pthread_mutex_lock(&M2); if (rc != 0) perror("error in pthread_mutex_lock 2: "); pthread_running =0; // clear the running flag rc = pthread_cond_signal(&C2); // signal the thread if (rc != 0) perror("error in pthread_cond_signal 2: "); rc = pthread_mutex_unlock(&M2); // allow it to run if (rc != 0) perror("error in pthread_mutex_unlock 2: "); if (ExitLoop) { /* We force the BYE command */ CmdCode = CMD_BYE; } } while ((err == 0) && (CmdCode != CMD_BYE)); ConnectionActive =0; } break; default : printf("Unknown command code 0x%x\n",CmdCode); CmdCode = CMD_BYE; /* We force the BYE command */ break; } } } while ((err == 0) && (CmdCode != CMD_BYE) && (CmdCode != CMD_KILL)); ConnectionActive =0; TcpClose(&CloneSocket); CloneSocket = -1; printf("Network connection closed\n"); BufSize = 0; if (ByeThread != -1) { if ((ByeSocket != -1) && (ByeSocket != ServerSocket)) { TcpClose(&ByeSocket); ByeSocket = -1; } pthread_cancel(ByeThread); ByeThread = -1; } } while (CmdCode != CMD_KILL); ExitLoop = 1; if (ByeThread != -1) { pthread_cancel(ByeThread); ByeThread = -1; } if (ByeSocket != -1) { TcpClose(&ByeSocket); ByeSocket = -1; } if (CloneSocket != -1) { TcpClose(&CloneSocket); CloneSocket = -1; } return ((void *)0); } /*******************************************************************************/ static void do_send_thread(int S, char * p, int l) { int rc; // start the send thread // signal the conditional variable C1 that there is work to be done rc = pthread_mutex_lock(&M1); if (rc != 0) perror("error in pthread_mutex_lock 3: "); pthread_blockptr = p; /* save the thread arguments */ pthread_len = l; pthread_running =1; // set the running flag rc = pthread_cond_signal(&C1); // signal the thread if (rc != 0) perror("error in pthread_cond_signal 3: "); rc = pthread_mutex_unlock(&M1); // allow it to run if (rc != 0) perror("error in pthread_mutex_unlock 3: "); } /******************************************************************************** * GANILtransferTxWait wait for any pending GANILtransfer to complete * */ int GANILtransferTxWait () { int rc; int retval; if (!ConnectionActive) return 0; if (verbose) printf("TransferTxWait waiting\n"); // wait on the conditional variable C2 for completion retval = 0; rc = pthread_mutex_lock(&M2); if (rc != 0) perror("error in pthread_mutex_lock 4: "); while (pthread_running) { rc = pthread_cond_wait(&C2, &M2); // this will block until signaled if (rc != 0) perror("error in pthread_cond_wait 4: "); retval = 1; } rc = pthread_mutex_unlock(&M2); if (rc != 0) perror("error in pthread_mutex_unlock 4: "); if (verbose) printf("TransferTxWait complete\n"); return retval; } /******************************************************************************* */ int GANILtransferState () { return pthread_running; } /******************************************************************************* */ int GANILtransferStatus () { return ConnectionActive; } /******************************************************************************* * GANILtransferInit initialise interface * return OK or ERROR * tsIPStr is a char* to the name of the server - this is not used in the GANIL version */ int GANILtransferInit (char *tsIPStr) { int rs; strcpy(IPAddress,tsIPStr); /* save for return to data destination */ rs = GANILtransferTxRestart(); return rs; } /******************************************************************************* /* the following are defined for compatibility with the MIDAS library */ int GANILtransferBlockSize(int n) {return OK;} int GANILtransferMode(int n) {return OK;} int GANILtransferPort(int n) {return OK;} int GANILtransferSetUser(int n) {return OK;} int GANILtransferTxReset () {return OK;} /******************************************************************************* * GANILtransferSetVerbose allows more debug information */ int GANILtransferSetVerbose(int n) { verbose = (unsigned short) n; return OK; } /******************************************************************************* * GANILtransferUseOverlap enables the overlap mode */ int GANILtransferUseOverlap(int n) { OverlapMode = n; printf ("Setting Overlap Mode %d\n", OverlapMode); return OK; } /******************************************************************************* * GANILtransferNice sets the thread priority */ int GANILtransferNice(int n) { NiceValue = n; printf ("Setting Nice %d\n", NiceValue); return OK; } /******************************************************************************* * tsOpenClient initial function to establish host/client connection */ static int tsOpenClient () { int retval; pthread_t thread; pthread_attr_t attr; int i =0; sprintf(MsgBuf, "TCP GANIL transfer library build %s %s",__DATE__, __TIME__); fprintf (stdout, "%s\n", MsgBuf); #ifndef SOLARIS msgDefineMessage (0, MSG_C_EGLK, MSG_S_INFORMATION, NULL, MsgBuf); #endif /* start an initial thread which will handle the connection with the server */ if (!pthreads_started) { retval = pthread_attr_init(&attr); if (retval != 0) perror("thread attribute"); retval = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // we never use join if (retval != 0) perror("thread attribute"); pthread_running = 0; retval = pthread_create(&thread, &attr, Thread_Send, 0); if (retval != 0) { ReportError(); sprintf(MsgBuf, "Client: Error creating Thread_Send: "); fprintf (stderr, "%s\n", MsgBuf); #ifndef SOLARIS msgDefineMessage (0, MSG_C_EGLK, MSG_S_ERROR, NULL, MsgBuf); #endif return ERROR; } retval = pthread_attr_init(&AddrAttr); if (retval != 0) perror("thread attribute"); retval = pthread_attr_setdetachstate(&AddrAttr, PTHREAD_CREATE_DETACHED); // we never use join if (retval != 0) perror("thread attribute"); retval = pthread_create(&AddrThread, &AddrAttr, AddrHandler, 0); if (retval != 0) { ReportError(); sprintf(MsgBuf, "Client: Error creating AddrHandler: "); fprintf (stderr, "%s\n", MsgBuf); #ifndef SOLARIS msgDefineMessage (0, MSG_C_EGLK, MSG_S_ERROR, NULL, MsgBuf); #endif return ERROR; } pthread_mutex_init(&M1, NULL); pthread_mutex_init(&M2, NULL); pthread_cond_init(&C1, NULL); pthread_cond_init(&C2, NULL); pthread_attr_destroy(&attr); pthreads_started =1; sleep(2); // allows the threads to get running } #ifdef LYNXOS if (!sigpipe) { sa.sa_handler = SIG_IGN; sigemptyset(&sa.sa_mask); sa.sa_flags=0; (void) sigaction(SIGPIPE, &sa, NULL); sigpipe=1; } #endif pthreads_started = 1; return OK; } /******************************************************************************* * GANILtransferClose final function to close host/client connection * */ void GANILtransferClose () { if (pthreads_started != 0) { ExitLoop = 1; /* force the BYE command */ do_send_thread(0,MsgBuf+MSGSIZE,0); } } /******************************************************************************* * GANILtransferTxRestart start or restart the connection */ int GANILtransferTxRestart () { int rs; GANILtransferClose (); rs = tsOpenClient (); return rs; } /******************************************************************************** * GANILtransferTxData Transmit the data * Check if stream is on * Check window */ int GANILtransferTxData (char *data, unsigned long stream, int length) { // length = 0 is used to send a PUSH message /* start the communications thread if it does not exist */ if (pthreads_started == 0) (void)GANILtransferTxRestart(); if (pthreads_started == 0) return ERROR; if (!ConnectionActive) return ERROR; (void) GANILtransferTxWait(); // ensure there is no pending request // a suitable request to be passed to the send thread if (verbose) printf("GANILtransferTxData sending\n"); do_send_thread(0,data,length); if (!OverlapMode) GANILtransferTxWait(); if (verbose) printf("Sent %d bytes to server\n",length); return OK; } /*******************************************************************************/ static void * ByeHandler(void *arg) { int i; int err = 0; int ByteCount; char Message[MSGSIZE]; int tmp; pthread_detach(ByeThread); pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,&i); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,&i); signal_block(); // block signals used by the main thread if (verbose) printf("ByeHandler starting\n"); ByeSocket = -1; tmp = ServerSocket; if ((err = TcpOpen(&tmp,0,0,0,0)) != 0) { printf("Fatal error 0x%x on TcpOpen() in ByeHandler()\n",err); if (CloneSocket != -1) { TcpClose(&CloneSocket); CloneSocket = -1; } exit(0); } if (verbose) printf("at ByeHandler location 1\n"); ByeSocket = tmp; if (fcntl(ByeSocket, F_SETFL, O_NONBLOCK) == -1) { printf ("fcntl() returned an error in ByeHandler\n"); } if ((err = TcpRead(&ByeSocket,Message,MSGSIZE,&ByteCount,TIMEOUT)) != 0) { printf("TcpRead error in AddrHandler at location 1: code 0x%x\n",err); } if (verbose) printf("at ByeHandler location 2\n"); *(unsigned int *)Message = REPLY_OK; if ((err = TcpWrite(&ByeSocket,Message,MSGSIZE,&ByteCount,TIMEOUT)) != 0) { printf("TcpWrite error in ByeHandler at location 2: code 0x%x\n",err); } if (verbose) printf("at ByeHandler location 3\n"); TcpClose(&ByeSocket); ByeSocket = -1; ExitLoop = 1; ByeThread = -1; if (verbose) printf("ByeHandler ending\n"); return ((void *)0); } /*******************************************************************************/ static void * AddrHandler(void *arg) { int i; int err = 0; int ByteCount; char Message[80]; int tmp; char BufW[24], BufR[24]; int j; /* initialise the Address dialogue buffer */ for (j=0; j<24; j++) {BufW[j] = 0;} BufW[0] = 0x1; BufW[1] = 0x9e; BufW[2] = 0xc6; BufW[3] = 0xe2; if (verbose) printf("AddrHandler starting\n"); signal_block(); // block signals handled by the main thread /* initialise connection with server */ tmp = 0; /* Create server socket */ if ((err = TcpOpen(&tmp,NULL,TCPIP_ADDRPORT,0,0)) != 0) { printf("Fatal error 0x%x on TcpOpen() in AddrHandler\n",err); exit(0); } AddrSocket = tmp; printf("TCP/IP server socket created for AddrHandler\n"); for (;;) { tmp = AddrSocket; if ((err =TcpOpen(&tmp,NULL,TCPIP_ADDRPORT,0,0)) != 0) { printf("TcpOpen error in AddrHandler at location 0: code 0x%x\n",err); } if (verbose) printf("at AddrHandler location 1\n"); if (fcntl(tmp, F_SETFL, O_NONBLOCK) == -1) { printf ("fcntl() returned an error in AddrHandler\n"); } if ((err=TcpRead(&tmp,BufR,24,&ByteCount,TIMEOUT)) != 0) { printf("TcpRead error in AddrHandler at location 1: code 0x%x\n",err); } if (verbose) printf("at AddrHandler location 2\n"); if (BufR[15] != 0x10) {printf("AddrHandler received cmd with 0x%x when 0x10 expected\n", (int)BufR[15]);} BufW[15] = 0; //OK if ((err=TcpWrite(&tmp,BufW,24,&ByteCount,TIMEOUT)) != 0) { printf("TcpWrite error in AddrHandler at location 2: code 0x%x\n",err); } if (verbose) printf("at AddrHandler location 3\n"); strcpy (Message,IPAddress); if ((err=TcpWrite(&tmp,Message,80,&ByteCount,TIMEOUT)) != 0) /* send the IP address */ { printf("TcpWrite error in AddrHandler at location 3: code 0x%x\n",err); } if (verbose) printf("at AddrHandler location 4\n"); if ((err=TcpRead(&tmp,BufR,24,&ByteCount,TIMEOUT)) != 0) { printf("TcpRead error in AddrHandler at location 4: code 0x%x\n",err); } if (verbose) printf("at AddrHandler location 5\n"); if (BufR[15] != 2) {printf("AddrHandler received cmd with 0x%x when 0x2 expected\n", (int)BufR[15]);} BufW[15] = 0; // OK if ((err=TcpWrite(&tmp,BufW,24,&ByteCount,TIMEOUT)) != 0) { printf("TcpWrite error in AddrHandler at location 5: code 0x%x\n",err); } if (verbose) printf("at AddrHandler location 6\n"); TcpClose(&tmp); printf("Supplied address %s to GANIL server\n",IPAddress); GANILServerRunState = 1; } return ((void *)0); } /*******************************************************************************/