#include #include #include #include #include #include #include #include #include "common.h" #include "message.h" #include "buffer.h" #include "transfer.h" #include "merge.h" static int Number = 0; extern int NumRunningLinks; extern STATUS_TABLE *status_table; extern BUFFER_TABLE *buffer_table; extern LINK_TABLE *link_table[NUMlinks]; extern void storage_init (); extern int do_storage (char *, int); extern int do_merge (int *, int, int *, char **, int *); extern int catchup (int , char *, int); BUFFER_HEADER* baseaddress; extern void * shm_bufferarea[NUMlinks]; static int firstLink; static int ActiveLinks[NUMlinks]; static int isActive[NUMlinks]; static int dataLen[NUMlinks]; int bufferNumber[NUMlinks]; static char * dataBuffer[NUMlinks]; extern int BufferSize; static char *output_buffer = NULL; static long TimeNow = 0; long timeWaited; static int NumSources = 0; UINT64 lastTime = 0LL; UINT64 oldestTime = 0LL; static int dataStart[NUMlinks]; int oldestLink; int* Buffer32; void merge_server () { int Signal; int Waiting; int tryAgain; int ID; int i, j, l; int rc; int jj; sprintf (message_buffer,"Starting New Merge Server"); report_message(MSG_INFORMATION); storage_init(); /* initialise link buffer queues */ for (ID=0; IDnum_links; ID++) { link_table[ID]->link_isActive = IDLE; isActive[ID] = IDLE; link_table[ID]->link_status = INUSE; } NumSources = 0; for (ID = 0; ID < status_table->num_links; ID++) { if (link_table[ID]->link_status != INUSE) {continue;} NumSources++; } sprintf (message_buffer,"Working with %d data sources\n",NumSources); report_message(MSG_INFORMATION); tryAgain = FALSE; for (;;) { if (!tryAgain) Signal = wait_for_wakeup (); if (!(tryAgain || Signal == SIGWAKEUP)) {continue;} if (tryAgain) { sprintf (message_buffer,"Trying again"); report_message(MSG_TRACE); } else { sprintf (message_buffer,"Received signal SIGWAKEUP"); report_message(MSG_TRACE); } tryAgain = FALSE; // signal from Link task - a new data buffer is available on one of the links // or trying again- a new data buffer might be available on one of the links Waiting = FALSE; WAITING: NumRunningLinks = 0; for (ID = 0; ID < status_table->num_links; ID++) { if (link_table[ID]->link_status != INUSE) {continue;} NumRunningLinks++; if (isActive[ID] == IS_ACTIVE) { /* already have a buffer for this link */ ActiveLinks[NumRunningLinks] = ID; i = bufferNumber[ID]; } else { i = obtain_data_buffer(ID); if (i >= 0) { j = ID*N_DATA_BUFFERS; if (buffer_table->buffer_status[i+j] == PROCESSING) { char * data_buffer; DATA_HEADER* data_header; baseaddress = (BUFFER_HEADER *) (char*) shm_bufferarea[ID]; data_buffer = (char *) ((char *)baseaddress + sizeof(BUFFER_HEADER) + (i * status_table->data_buffer_size * 1024)); data_header = (DATA_HEADER *)data_buffer; // header_dataLen is correct length of data minus the header dataLen[ID] = data_header->header_dataLen / sizeof (NewItDesc); // skip the header dataBuffer[ID] = &data_buffer[sizeof (HEADER)]; dataStart[ID] = 0; sprintf (message_buffer,"new data buffer: %d %d", ID, dataLen[ID]); report_message(MSG_DEBUG); if (isActive[ID] == IS_DEAD) { rc = catchup (ID, dataBuffer[ID], dataLen[ID]); if (rc == TRUE) { // we caught up isActive[ID] = IS_ACTIVE; } else { // otherwise it's still dead, release it and try again int j = ID*N_DATA_BUFFERS; buffer_table->buffer_status[i+j] = DONE; sprintf (message_buffer,"done with dead data buffer %d %d", ID, i); report_message(MSG_TRACE); release_data_buffer(ID,i); ID--; continue; } } else { isActive[ID] = IS_ACTIVE; } ;// end of if isActive[ID] == IS_DEAD if (isActive[ID] == IS_ACTIVE) { sprintf (message_buffer,"processing new data buffer %d %d %d %d 0x%08x %d", ID, i, j, dataLen[ID], dataBuffer[ID], dataStart[ID]); report_message(MSG_TRACE); /* Buffer32 = (int*)dataBuffer[ID]; for (jj = 0; jj < 64;) { printf( " 0x%08x", Buffer32[dataStart[ID]+jj]); jj++; if ((jj/8)*8 == jj) { printf("\n");} } printf("\n"); */ bufferNumber[ID] = i; ActiveLinks[NumRunningLinks] = ID; } } ;/* end of if (buffer_table->buffer_status[i+j] == PROCESSING */ } else { // No data buffer at present isActive[ID] = IS_WAITING; Waiting = TRUE; goto WAITING; } ;// end of if i>= 0) } ;// end of if (isActive[ID] == IS_ACTIVE } ;// end of for (ID = 0; ID < status_table->num_links; ID++ rc = do_merge (&ID, NumRunningLinks, ActiveLinks, dataBuffer, dataLen); sprintf (message_buffer,"return do_merge %d %d", rc, ID); report_message(MSG_TRACE); for (ID = 0; ID < status_table->num_links; ID++) { if (link_table[ID]->link_status == INUSE) { i = bufferNumber[ID]; j = ID*N_DATA_BUFFERS; if (buffer_table->buffer_status[i+j] == DONE) { sprintf (message_buffer,"done with data buffer %d %d", ID, i); report_message(MSG_TRACE); release_data_buffer(ID,i); isActive[ID] = WAS_ACTIVE; } } } } ;/* end of for (;;) */ }