#include "common.h"
#include "message.h"
#include "merge.h"

/*
 * NOTE(review): the header names of the next four include directives were
 * missing from this copy of the file.  They have been reconstructed from what
 * the code below uses (sprintf/printf/fflush, memcpy, pid_t, ntohl) - confirm
 * against the original source.
 */
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <arpa/inet.h>

#include "stats.h"
#include "transfer.h"
#include "buffer.h"

extern pid_t mypid;
extern int verbose;
extern int LinkNum;
extern int * StatsMem[NUMCOUNTERS];

extern int *LinkStatus;      /* &link_table[LinkNum]->link_status */
extern int *LinkState;       /* &link_table[LinkNum]->link_state */
extern int *LinkCtrl;        /* &link_table[LinkNum]->link_ctrl */
extern int *LinkActive;      /* &link_table[LinkNum]->link_isActive */

unsigned long long SYNC100 = 0;      /* full timestamp of the last SYNC100 item seen */
unsigned long long WRTimeF = 0;      /* assembled WR timestamp (see processDATAbuffer) */
unsigned long long timestampf = 0;   /* full timestamp of the item being processed */
unsigned int WRTime0019 = 0;
unsigned int WRTime2847 = 0;
unsigned int WRTime4863 = 0;

extern int BlockSequence[];
extern char * data_buffer[];
extern char * input_buffer;

extern STATUS_TABLE *status_table;
extern LINK_TABLE *link_table[];
extern BUFFER_TABLE *buffer_table;
extern void * shm_bufferarea[];

unsigned int MyCounter = 0;
unsigned int LastCounter = 0;
unsigned int LinkCounter = 0;
unsigned int EventNum = 0;

unsigned long long LastSync = 0LL;
unsigned long long LastTimeStamp = 0LL;   /* Time of the last item accepted for merging */

int FoundFirstSync = FALSE;
int FullTStamp = FALSE;

char spinner[4] = "-\\|/";
int spin = 0;

int data_restructure(char *, char *, int, int *);
int datacopy(NewItDesc_Ptr, ItDesc_Ptr, int);
int processDATAbuffer(ItDesc_Ptr, NewItDesc_Ptr, int);
void byteswop32(char*, int);

/*
    link_server.c calls receiveDATAbuffer
    handle_buffer.c contains receiveDATAbuffer
    receiveDATAbuffer calls data_restructure & store_buffer
    data_restructure calls datacopy
    datacopy calls processDATAbuffer
*/

/* Number of 32 bit words written by report_word_dump, and words per output row. */
enum { DUMP_WORDS = 64, DUMP_WORDS_PER_ROW = 8 };

/*
 * Timestamps at or below this value are treated as belonging to the 2^28
 * period after the one recorded in SYNC100 (see extend_with_sync100).
 * NOTE(review): presumably this covers items that arrive just after the low
 * 28 bits wrap but before the next SYNC100 item - confirm.
 */
enum { SYNC_ROLLOVER_WINDOW = 0xa0 };

/*
 * Report DUMP_WORDS consecutive 32 bit words starting at 'words' as hex via
 * report_message(MSG_DEBUG): one message per word, a "\n" message after every
 * DUMP_WORDS_PER_ROW words and a final "\n" message.
 *
 * The dump length is fixed; the caller must make sure that DUMP_WORDS words
 * are readable at 'words'.
 */
static void report_word_dump(const int *words)
{
    int j;

    for (j = 0; j < DUMP_WORDS; ) {
        sprintf(message_buffer, " 0x%08x", (unsigned int) words[j]);
        report_message(MSG_DEBUG);
        j++;
        if ((j % DUMP_WORDS_PER_ROW) == 0) {
            sprintf(message_buffer, "\n");
            report_message(MSG_DEBUG);
        }
    }
    sprintf(message_buffer, "\n");
    report_message(MSG_DEBUG);
}

/*
 * Add 'amount' to statistics counter 'counter', both in the global slot and
 * in the slot belonging to the current link (LinkNum).
 */
static void bump_counter(int counter, int amount)
{
    *StatsMem[counter] += amount;
    *StatsMem[counter + ((LinkNum + 1) * MAXCOUNTERS)] += amount;
}

/*
 * Build a full timestamp from the timestamp word of a data item and the upper
 * bits (bit 28 upwards) of the last SYNC100 timestamp.  When the item's
 * timestamp lies in [0, SYNC_ROLLOVER_WINDOW] the upper part is incremented
 * by one before being combined.
 *
 * The intermediate values are deliberately kept as int, exactly as the code
 * always did, so the result is unchanged for every input.
 */
static unsigned long long extend_with_sync100(unsigned int timestamp)
{
    int timeL = timestamp;
    int timeH = SYNC100 >> 28;

    if (timeL >= 0 && timeL <= SYNC_ROLLOVER_WINDOW) {
        return ((unsigned long long) (timeH + 1) << 28) + (unsigned long long) timeL;
    }
    return ((unsigned long long) timeH << 28) + (unsigned long long) timeL;
}

/*
 * Turn the transfer header in front of 'ptr' into a data block header, stamp
 * the next slot of the link's shared buffer area with the current age, queue
 * the link's current data buffer and wake the merge process.
 *
 *   ID      link number; indexes shm_bufferarea[] and link_table[]
 *   endian  stored as header_DataEndian
 *   ptr     start of the data; the sizeof(HEADER) bytes before it are
 *           overwritten
 *   l       data length, stored as header_dataLen
 *
 * A non-zero return from queue_data_buffer is reported with MSG_ERROR; the
 * merge process is woken regardless.
 */
void queueDATAbuffer (int ID, int endian, char * ptr, int l)
{
    DATA_HEADER *data_header;
    BUFFER_HEADER *baseaddress;
    int i;
    int rc;

    data_header = (DATA_HEADER *) (ptr - sizeof(HEADER));

    /* construct 24 byte block header by overwriting the 32 byte data transfer header */
    memcpy(data_header->header_id, "EBYEDATA", 8);
    data_header->header_sequence = 0;
    data_header->header_stream = (ID << 8) + 1;
    data_header->header_tape = 0;
    data_header->header_MyEndian = 1;
    data_header->header_DataEndian = endian;
    data_header->header_dataLen = l;

    /* the 8 bytes directly in front of the data are filled with 'A' */
    memcpy(ptr - 8, "AAAAAAAA", 8);

    sprintf(message_buffer, "store_buffer: sending %d bytes\n", l);
    report_message(MSG_TRACE);

    if (verbose > 2) {
        sprintf(message_buffer, "store_buffer #1 %d %p >\n", l, (void *) data_header);
        report_message(MSG_DEBUG);
        report_word_dump((const int *) data_header);
    }

    baseaddress = (BUFFER_HEADER *) (char *) shm_bufferarea[ID];

    /* set the age of the buffer to be written as zero */
    i = baseaddress->buffer_next;
    baseaddress->buffer_age[i] = 0;

    /* set the age of the buffer and increment the age and the next buffer;
       the mask only wraps correctly if NBLOCKS is a power of two */
    baseaddress->buffer_age[i] = baseaddress->buffer_currentage;
    baseaddress->buffer_currentage++;
    baseaddress->buffer_next = ((baseaddress->buffer_next + 1) & (NBLOCKS - 1));

    /* signal MERGE that a buffer is waiting */
    rc = queue_data_buffer(ID, link_table[ID]->link_buffer_index);
    if (rc != 0) {
        sprintf(message_buffer, "error %d in queue_data_buffer\n", rc);
        report_message(MSG_ERROR);
    }
    wakeup_merge();

    /* Disabled alternative: mark the buffer DONE and release it here.
        j = ID*N_DATA_BUFFERS;
        i = link_table[ID]->link_buffer_index;
        buffer_table->buffer_status[j+i] = DONE;
        rc = release_data_buffer(ID, link_table[ID]->link_buffer_index);
    */

    sprintf(message_buffer, "store_buffer #2 %d %d >\n", ID, l);
    report_message(MSG_DEBUG);

    if (verbose > 2) {
        report_word_dump((const int *) baseaddress);
    }
}

/*
 * Handle one received data block for link 'ID'.
 *
 * Updates the buffer and data-volume counters, checks the transfer header
 * located sizeof(HEADER) bytes before 'ptr' (mismatches are reported as
 * warnings only), obtains an output data buffer, converts the block with
 * data_restructure, marks the output buffer DONE and releases it.
 *
 *   ID      link number
 *   endian  data endian flag; any value other than 1 makes data_restructure
 *           byte swap the input
 *   ptr     start of the data inside the received block
 *   len     data length in bytes
 *
 * Returns -1 when no output buffer could be obtained, otherwise 0.
 * NOTE(review): 0 is also returned when the block is rejected for being
 * longer than 64K-32 bytes, so callers cannot tell that case from success.
 *
 * NOTE(review): the data handed to data_restructure is read from the global
 * input_buffer, not from 'ptr'; presumably both refer to the same block.
 */
int receiveDATAbuffer(int ID, int endian, char * ptr, int len)
{
    char *iptr;
    char *optr;
    HEADER *ip;
    unsigned int sequence;
    int index;
    int rc;

    bump_counter(LBUFFERS, 1);                  /* counter for number of data buffers */
    bump_counter(DATALEN, (len + 512) / 1024);  /* counter for data in 1Kbyte units */

    iptr = &input_buffer[sizeof(HEADER)];

    /* check block sequence number */
    ip = (HEADER *) (ptr - sizeof(HEADER));
    sequence = ntohl(ip->hdr_sequence);

    if (sequence == BlockSequence[ID]) {
        BlockSequence[ID]++;    /* good */
    } else {
        /* report the received number in host byte order, like the expected one */
        sprintf(message_buffer, "block sequence error: ID %d, expected %d; received %u",
                ID, BlockSequence[ID], sequence);
        report_message(MSG_WARNING);
        BlockSequence[ID] = sequence + 1;   /* resynchronise on the received number */
    }

    if (ntohl(ip->hdr_datalength) != len) {
        sprintf(message_buffer, "check#1: %u %d", (unsigned int) ntohl(ip->hdr_datalength), len);
        report_message(MSG_WARNING);
    }
    if (len + 32 > ntohl(ip->hdr_blocklength)) {
        sprintf(message_buffer, "check#2: %u %d", (unsigned int) ntohl(ip->hdr_blocklength), len);
        report_message(MSG_WARNING);
    }
    if (ntohl(ip->hdr_id1) != HDR_ID1) {
        sprintf(message_buffer, "check#3: 0x%x", (unsigned int) ntohl(ip->hdr_id1));
        report_message(MSG_WARNING);
    }
    if (ntohl(ip->hdr_id2) != HDR_ID2) {
        sprintf(message_buffer, "check#4: 0x%x", (unsigned int) ntohl(ip->hdr_id2));
        report_message(MSG_WARNING);
    }
    if (len > ((64*1024)-32)) {
        sprintf(message_buffer, "%d: Unexpected block length %d", ID, len);
        report_message(MSG_ERROR);
        return(0);
    }

    if (verbose > 2) {
        sprintf(message_buffer, "data_transfer #1 len=%d %p >\n", len, (void *) ip);
        report_message(MSG_DEBUG);
        report_word_dump((const int *) ip);
    }

    sprintf(message_buffer, "data_transfer #1.1 len=%d >\n", len);
    report_message(MSG_DEBUG);

    index = obtain_data_buffer(ID);     /* get buffer for incoming data */
    link_table[ID]->link_buffer_index = index;
    if (index == -1) {
        return -1;
    }

    data_buffer[ID] = (char *) ((char *) buffer_table->buffer_baseaddress[ID]
                                + (index * status_table->data_buffer_size * 1024));
    optr = &data_buffer[ID][sizeof (HEADER)];   /* leave room for a header in front of the data */

    len = data_restructure(optr, iptr, len, &endian);   /* maybe swap for TDR format */

    if (verbose > 2) {
        sprintf(message_buffer, "data_transfer #2 len=%d %p >\n", len, (void *) optr);
        report_message(MSG_DEBUG);
        report_word_dump((const int *) optr);
    }

    sprintf(message_buffer, "data_transfer #2.1 len=%d >\n", len);
    report_message(MSG_DEBUG);

    buffer_table->buffer_status[(ID * N_DATA_BUFFERS) + link_table[ID]->link_buffer_index] = DONE;

    /* NOTE(review): the return code was never checked here and its meaning is
       not visible in this file - confirm whether a failure should be reported. */
    rc = release_data_buffer(ID, link_table[ID]->link_buffer_index);
    (void) rc;

    /* Disabled: queueDATAbuffer(ID, endian, optr, len); */

    /* progress spinner (at one time conditional on verbose > 1) */
    printf("%c\b", spinner[spin]);
    fflush(NULL);
    if (++spin == 4) {
        spin = 0;
    }

    return 0;
}

/*
 * Convert 'l' bytes of input items at 'ip' into output items at 'op' by
 * calling processDATAbuffer, with trace/debug reporting around the call.
 *
 * Returns the number of output bytes, i.e. the item count returned by
 * processDATAbuffer multiplied by sizeof(NewItDesc).
 */
int datacopy(NewItDesc_Ptr op, const ItDesc_Ptr ip, int l)
{
    int len;
    int ret;

    len = l / sizeof (ItDesc);      /* number of input items (comment in source: 64 bits each) */

    sprintf(message_buffer, "%d datacopy.1 < len %d pd= %p ps= %p",
            (int) mypid, len, (void *) op, (void *) ip);
    report_message(MSG_TRACE);

    if (verbose > 2) {
        sprintf(message_buffer, "%d datacopy #1 < len=%d %p \n", (int) mypid, len, (void *) ip);
        report_message(MSG_DEBUG);
        report_word_dump((const int *) ip);
    }

    /*
        process the data buffer (which may be project specific)
        extract the time information
    */
    ret = processDATAbuffer(ip, op, len);   /* returns output length (NewItDesc) */

    if (verbose > 2) {
        sprintf(message_buffer, "%d datacopy #2 > len=%d %p", (int) mypid, ret, (void *) op);
        report_message(MSG_DEBUG);
        report_word_dump((const int *) op);
    }

    sprintf(message_buffer, "%d datacopy.2 > len %d", (int) mypid, ret);
    report_message(MSG_TRACE);

    return (ret * sizeof(NewItDesc));   /* this is bytes */
}

/*
 * Bring the input block into native byte order if necessary and convert it to
 * the internal output format.
 *
 *   op       output buffer
 *   ip       input buffer; byte swapped in place when *pEndian != 1
 *   l        input length in bytes
 *   pEndian  in: endian flag of the input; set to 1 after a swap
 *
 * Returns the output length in bytes as reported by datacopy.
 */
int data_restructure(char* op, char* ip, int l, int *pEndian)
{
    int len;

    if (*pEndian != 1) {
        byteswop32(ip, l);      /* for TDR format */
        *pEndian = 1;
    }

    sprintf(message_buffer, "data_restructure length in: %d (bytes)", l);
    report_message(MSG_DEBUG);

    /* We now have correct data - still in the input buffer - ip,
       copy it to new structured output buffer */
    len = datacopy((NewItDesc_Ptr) op, (ItDesc_Ptr) ip, l);

    sprintf(message_buffer, "data_restructure length out: %d (bytes)", len);
    report_message(MSG_DEBUG);

    return len;
}

/*
 * Data buffer process code for AIDA
 *
 * Walks 'l' input items starting at 'ps'.  Each item is classified by the top
 * two bits of its data word (3 = ADC, 2 = information, 1 = waveform,
 * 0 = bad format), the matching statistics counters are updated, link state
 * and SYNC100/WR bookkeeping are maintained, and a full timestamp is derived.
 *
 * An item is kept in the output at 'pd' only when the link state is GOING, a
 * Code was assigned to it and its Time is not earlier than LastTimeStamp;
 * otherwise it is counted (TSSEQERR or IGNORE1) and its output slot is reused
 * for the next item.
 *
 * Processing stops at the end of the block, at a data word of 0xffffffff, or
 * at the first item with a bad format tag.
 *
 * At most one output item is written per input item; the caller must provide
 * room for 'l' NewItDesc items at 'pd'.
 *
 * Returns the number of items stored at 'pd'.
 */
int processDATAbuffer(ItDesc_Ptr ps, NewItDesc_Ptr pd, int l /*units of ItDesc*/)
{
    ItDesc_Ptr ip;
    NewItDesc_Ptr op;
    int len;
    int module, range, channel;
    unsigned int data;
    unsigned int timestamp;
    unsigned short adc;
    unsigned short icode;
    unsigned int ifield;
    int samples;
    int advance;
    unsigned int TotalItems;
    int j;

    if (verbose > 2) {
        const unsigned int *databuffer = (const unsigned int *) ps;

        len = l*2;      /* units of int */
        sprintf(message_buffer, "processDATAbuffer len=%d\n", len);
        report_message(MSG_DEBUG);
        sprintf(message_buffer, ">");
        report_message(MSG_DEBUG);
        for (j = 0; j < 32; j++) {
            sprintf(message_buffer, "0x%08x ", databuffer[j]);
            report_message(MSG_DEBUG);
        }
        sprintf(message_buffer, "\n");
        report_message(MSG_DEBUG);
    }

    len = l;            /* len in units of unsigned long long (ItDesc) */
    ip = ps;
    op = pd;
    TotalItems = 0;     /* output length (NewItDesc) */

    sprintf(message_buffer, "Block received link %d, length %d\n", LinkNum, len);
    report_message(MSG_TRACE);

    while (len > 0) {
        data = ip->data;
        timestamp = ip->timestamp;
        ip++;
        len--;

        sprintf(message_buffer, "raw data: link %d, H=0x%08x L=0x%08x", LinkNum, data, timestamp);
        report_message(MSG_DEBUG);

        /* pre-fill the output slot; Code stays FALSE unless the item is recognised */
        op->Data = data;
        op->Timestamp = timestamp;
        op->Code = FALSE;
        op->Info = 0;
        op->Time = FALSE;

        if (data == 0xffffffff) {
            break;      /* stop processing this block */
        }

        switch ((data >> 30) & 3) {

        case 3:     /* ADC data word */
            bump_counter(ADCDATA, 1);   /* counter for number of ADC data words */

            range = (data >> 28) & 1;
            module = (data >> 22) & 63;
            channel = (data >> 16) & 63;
            adc = (unsigned short) (data & 0x0000ffff);

            timestampf = extend_with_sync100(timestamp);

            sprintf(message_buffer, "adc data: link %d, 0x%04x range=%d module=%d channel=%d: H=0x%08x L=0x%08x: TS=0x%016llx",
                    LinkNum, adc, range, module, channel, data, timestamp, timestampf);
            report_message(MSG_DEBUG);

            op->Code = PSEUDO_ADC_CODE;
            op->Time = timestampf;
            break;

        case 2:     /* information data word */
            icode = (data >> 20) & 0x0f;    /* information code */
            ifield = data & 0x000fffff;     /* information field */

            switch (icode) {

            case 2:     /* PAUSE timestamp */
                bump_counter(PAUSE, 1);     /* counter for number of PAUSE items */

                timestampf = ((unsigned long long) ifield << 28) | (unsigned long long) timestamp;

                sprintf(message_buffer, "received PAUSE: link %d, ifield=0x%05x: H=0x%08x L=0x%08x: TS=0x%016llx",
                        LinkNum, ifield, data, timestamp, timestampf);
                report_message(MSG_INFORMATION);

                op->Code = PAUSE_CODE;
                op->Time = timestampf;
                *LinkState = PAUSED;
                break;

            case 3:     /* RESUME timestamp */
                bump_counter(RESUME, 1);    /* counter for number of RESUME items */

                timestampf = ((unsigned long long) ifield << 28) | (unsigned long long) timestamp;

                sprintf(message_buffer, "received RESUME: link %d, ifield=0x%05x: H=0x%08x L=0x%08x: TS=0x%016llx",
                        LinkNum, ifield, data, timestamp, timestampf);
                report_message(MSG_INFORMATION);

                op->Code = RESUME_CODE;
                op->Time = timestampf;
                if (*LinkState == PAUSED) {
                    /* Disabled alternative: only go to RESUMED when
                       (timestampf & 0xfffffffffffc0000ull) == (SYNC100 & 0xfffffffffffc0000ull),
                       otherwise go to IDLE. */
                    *LinkState = RESUMED;
                }
                break;

            case 4:     /* SYNC100 timestamp / WR timestamp L */
                bump_counter(SYNC, 1);      /* counter for number of SYNC100 items */

                timestampf = ((unsigned long long) ifield << 28) | (unsigned long long) timestamp;

                WRTime0019 = timestamp;
                /* NOTE(review): WRTime2847 is an unsigned int, so where that is
                   32 bits wide only the low 4 bits of ifield survive this shift
                   (in bits 28..31), and the value is shifted by 28 again when
                   WRTimeF is assembled below.  This looks unintended - confirm
                   before changing; the variable is global and may be read
                   elsewhere. */
                WRTime2847 = (ifield << 28);

                sprintf(message_buffer, "received SYNC100 TS: link %d, ifield=0x%05x: H=0x%08x L=0x%08x: TS=0x%016llx",
                        LinkNum, ifield, data, timestamp, timestampf);
                report_message(MSG_INFORMATION);

                op->Code = SYNC_CODE;
                op->Time = timestampf;

                if (*LinkState == IDLE || *LinkState == RESUMED) {
                    sprintf(message_buffer, "received SYNC100 TS link %d -> GOING: TS=0x%016llx", LinkNum, timestampf);
                    report_message(MSG_INFORMATION);
                    *LinkState = GOING;
                } else if (timestampf - SYNC100 != SYNC_STEP) {
                    sprintf(message_buffer, "received SYNC100 TS link %d with unexpected value: 0x%016llx 0x%016llx 0x%016llx",
                            LinkNum, timestampf, SYNC100, timestampf - SYNC100);
                    report_message(MSG_INFORMATION);
                    bump_counter(SYNCERR1, 1);  /* counter for number of out of step SYNC100 items */
                }

                SYNC100 = timestampf;
                break;

            case 5:     /* WR timestamp H */
                bump_counter(MBS, 1);       /* counter for number of WR H data words */

                timestampf = ((unsigned long long) ifield << 28) | (unsigned long long) timestamp;

                WRTime0019 = timestamp;
                /* NOTE(review): same 32 bit truncation as for WRTime2847 above;
                   with a 32 bit unsigned int the << 48 below then shifts every
                   remaining bit out of the 64 bit result. */
                WRTime4863 = (ifield << 28);

                WRTimeF = ((unsigned long long) WRTime4863 << 48)
                        | ((unsigned long long) WRTime2847 << 28)
                        | (unsigned long long) WRTime0019;

                sprintf(message_buffer, "received WR High TS: link %d, ifield=0x%05x: H=0x%08x L=0x%08x: WR=0x%016llx",
                        LinkNum, ifield, data, timestamp, WRTimeF);
                report_message(MSG_DEBUG);

                op->Code = WR_CODE;
                op->Time = WRTimeF;
                break;

            case 6:     /* FEE64 discriminator data */
                bump_counter(DISC, 1);      /* counter for number of discriminator data words */

                timestampf = extend_with_sync100(timestamp);

                /* the message is formatted but reporting it is disabled */
                sprintf(message_buffer, "received AIDA Disc: link %d, 0x%05x: H=0x%08x L=0x%08x: TS=0x%016llx",
                        LinkNum, ifield, data, timestamp, timestampf);

                op->Code = FEE64_DISC_CODE;
                op->Time = timestampf;
                break;

            case 8:     /* AIDA correlation scaler data */
                bump_counter(ACS, 1);       /* counter for number of correlation scaler data words */

                timestampf = extend_with_sync100(timestamp);

                /* the message is formatted but reporting it is disabled */
                sprintf(message_buffer, "received AIDA CS: link %d, 0x%05x: H=0x%08x L=0x%08x: TS=0x%016llx",
                        LinkNum, ifield, data, timestamp, timestampf);

                op->Code = AIDA_SCALAR_CODE;
                op->Time = timestampf;
                break;

            default:    /* undefined code */
                bump_counter(BADINFO, 1);   /* counter for number of undefined data items */

                sprintf(message_buffer, "received undefined information code: %d", icode);
                report_message(MSG_WARNING);
                break;
            }
            break;

        case 1:     /* wave form data */
            bump_counter(WAVEFORM, 1);

            channel = (data >> 16) & 63;

            timestampf = extend_with_sync100(timestamp);

            /* NOTE(review): this field spans bits 18..31 of the data word and so
               overlaps the channel/module fields and the two type bits tested by
               the switch - confirm the waveform header layout. */
            samples = (data >> 18) & 0x00003fff;    /* in units of 4 samples (64 bits) */

            /* Skip the sample words.  The input pointer is never moved beyond
               the end of the block, while len and the WAVEDATA counters change
               exactly as if every declared sample word had been stepped over
               (len may go negative, which ends the outer loop). */
            advance = (samples < len) ? samples : len;
            ip += advance;
            len -= samples;
            bump_counter(WAVEDATA, samples);

            sprintf(message_buffer, "received waveform: link %d, channel=%d: samples=%d, H=0x%08x L=0x%08x: TS=0x%016llx",
                    LinkNum, channel, samples*4, data, timestamp, timestampf);
            report_message(MSG_DEBUG);

            op->Code = PSEUDO_TRACE_CODE;
            op->Time = timestampf;
            break;

        case 0:     /* bad format */
        default:
            bump_counter(IGNORE2, 1);

            sprintf(message_buffer, "bad event data (%d %d) - (0x%08x 0x%08x)", l/4, len, data, timestamp);
            report_message(MSG_WARNING);

            len = 0;    /* abandon the rest of the block */
            break;
        }   /* end of switch */

        if (*LinkState == GOING && op->Code != FALSE) {
            /*
                potentially add this data item to the queue for merging
                however perform a check that the full timestamp is valid
            */
            if (op->Time < LastTimeStamp) {
                /* invalid time stamp: count it and drop the item */
                bump_counter(TSSEQERR, 1);
            } else {
                LastTimeStamp = op->Time;
                op++;           /* advance output pointer */
                TotalItems++;
            }
        } else {
            /* link not GOING, or item not recognised: count it and drop the item */
            bump_counter(IGNORE1, 1);
        }
    }   /* end of while len > 0 */

    return TotalItems;
}

/*
 * Reverse the byte order of every complete 32 bit word in the first 'l' bytes
 * at 'p', in place.  Up to 3 trailing bytes (when l is not a multiple of 4)
 * are left untouched; nothing is done when l < 4.
 */
void byteswop32(char* p, int l)
{
    int words = l / 4;

    while (words > 0) {
        char b0 = p[0];
        char b1 = p[1];
        char b2 = p[2];
        char b3 = p[3];

        p[0] = b3;
        p[1] = b2;
        p[2] = b1;
        p[3] = b0;

        p += 4;
        words--;
    }
}