#include #include #include #include #include #include #include "common.h" #include "message.h" #include "buffer.h" #include "transfer.h" #include "merge.h" extern STATUS_TABLE *status_table; extern BUFFER_TABLE *buffer_table; extern LINK_TABLE *link_table[NUMlinks]; extern int do_storage (char *, int); extern int Location; int oldestLink; extern int BufferSize; // I'm getting the buffer/block sizes wrong & TS can't handle it. // Fudge it for now #define MY_BUFFER_SIZE (BufferSize - 32) extern UINT64 lastTime; extern UINT64 oldestTime; extern ItDesc_Ptr OutputBufferPtr; extern int OutputBufferLen; extern char * OutputBuffer; static int dataStart[NUMlinks]; extern int bufferNumber[NUMlinks]; // returns TRUE if caught up and can be merged or FALSE if not and complete buffer too old int catchup (int id, char *dataBuffer, int dataLen) { int rc = 0; int i; NewItDesc_Ptr ps; sprintf (message_buffer, "catchup - link %d, dataLen %d, lastTime %llx", id, dataLen, lastTime); report_message(MSG_DEBUG); dataStart[id] = 0; ps = (NewItDesc_Ptr)dataBuffer; for (i=0; iTime >= lastTime) break; } if (iTime); report_message(MSG_DEBUG); } else { sprintf (message_buffer, "not caught up - last time in buffer %llx", (ps-1)->Time); report_message(MSG_DEBUG); } // If we've looped thru' the whole buffer and not caught up we start at the start of the next buffer dataStart[id] = (i == dataLen) ? 0 : i; rc = (i == dataLen) ? FALSE : TRUE; sprintf (message_buffer, "catchup (link %d) returns %s", id, rc ? "True" : "False"); report_message(MSG_DEBUG); return rc; } int do_merge (int *ID, int howMany, int *ActiveLinks, char **dataBuffer, int *dataLen) { int id, rc, i, j, done; NewItDesc_Ptr ps, os; sprintf(message_buffer, "do_merge merging %d links", howMany); report_message(MSG_TRACE); for (i=1; i<=howMany; i++) { sprintf(message_buffer, "link %d", ActiveLinks[i]); report_message(MSG_TRACE); } done=FALSE; while (done == FALSE) { os = NULL; oldestTime = 0LL; for (i=1; i<=howMany; i++) { id = ActiveLinks[i]; ps = (NewItDesc_Ptr)dataBuffer[id]; ps += dataStart[id]; sprintf(message_buffer, "%d ps=%llx %llx %llx %llx %d",id, ps, ps->Time, lastTime, oldestTime, dataStart[id]); report_message(MSG_DEBUG); if (ps->Time < oldestTime || oldestTime == 0LL) { sprintf(message_buffer, "%d %llx %d %llx %llx %d %llx",i,ps, id, ps->Time, oldestTime, oldestLink, lastTime); report_message(MSG_DEBUG); os = ps; oldestTime = os->Time; oldestLink = id; } } ;// for (i=1; i<=howMany; i++) - oldest TS now found sprintf(message_buffer, "oldest %d %llx %llx %d %d",oldestLink,os,oldestTime,dataStart[oldestLink],dataLen[oldestLink]); report_message(MSG_DEBUG); if (os != NULL) { // be cautious - be sure an oldest has really been found dataStart[oldestLink]++; if (dataStart[oldestLink] >= dataLen[oldestLink]) { dataStart[oldestLink] = 0; done = TRUE; sprintf(message_buffer, "merge_server (merge done=TRUE) %d", oldestLink); report_message(MSG_DEBUG); i = bufferNumber[oldestLink]; j = oldestLink*N_DATA_BUFFERS; buffer_table->buffer_status[i+j] = DONE; *ID = oldestLink; } if (os->Time < lastTime) { sprintf(message_buffer, "older than previous %llx < %llx", os->Time, lastTime); report_message(MSG_WARNING); } else { // For now just output the data in the standard format sprintf(message_buffer, "merge_server (OP) %d %llx %lx %lx %d", oldestLink, os->Time, os->Data, os->Timestamp, OutputBufferLen); report_message(MSG_DEBUG); lastTime = os->Time; OutputBufferPtr->data = os->Data; OutputBufferPtr->timestamp = os->Timestamp; OutputBufferPtr++; OutputBufferLen += sizeof(ItDesc); if (OutputBufferLen >= (BufferSize - 32 - sizeof(ItDesc))) { // No space for another item - the output buffer is full rc = do_storage (OutputBuffer, OutputBufferLen); OutputBufferPtr = (ItDesc_Ptr)OutputBuffer; OutputBufferLen = 0; } } ;// end of os->Time < lastTime } ;// end of if (os) } // end of while (done == FALSE) loop - one of the input buffers is empty return rc; }