/*
 * REVIEW NOTES (documentation only; the code text below is unchanged).
 *
 * Purpose: entry point of the merger task. From the statements that are
 * still visible, main():
 *   - records its own pid (getpid) and its parent's pid (getppid) in
 *     mypid and Masterpid;
 *   - calls msg_init(), sig_init() and report_version();
 *   - resets shmkey to SHMKEY and then starts scanning argv (see the
 *     damage note below);
 *   - fills in the offsets held in status_table: the buffer table is
 *     placed at sizeof(STATUS_TABLE), and link table i (for every
 *     i below NUMlinks) at sizeof(STATUS_TABLE) + sizeof(BUFFER_TABLE)
 *     + i * sizeof(LINK_TABLE);
 *   - stores Masterpid in status_table->master_pid;
 *   - formats "Merging up to %d data sources" into message_buffer and
 *     calls report_message(MSG_INFORMATION+3);
 *   - copies msg_logging_level, msg_reporting_level and server_options
 *     out of status_table into the globals of the same names;
 *   - for each i below status_table->num_links, calls
 *     alloc_data_area(shmkey+1+i, size) with
 *     size = TRANSFER_BUFFER_SIZE * 1024 * N_DATA_BUFFERS
 *            + sizeof(BUFFER_HEADER)
 *     and keeps the result in shm_bufferarea[i];
 *   - derives buffer_table and link_table[i] by adding the stored
 *     offsets to the status_table base address;
 *   - calls stats_init(), then merge_server(), then exit(0).
 *
 * Usage(progname) prints a three-line usage text to stderr and calls
 * exit(1); it does not return.
 *
 * NOTE(review): this copy of the file is damaged and will not build as is.
 *   - The whole file sits on two physical lines, so every line comment
 *     now swallows the remainder of its physical line.
 *   - The targets of the system #include directives are missing; only
 *     the quoted project headers survive.
 *   - The text between "for(i=1;i" and "buffer_table_offset" is missing.
 *     That span presumably held the rest of the argument loop and the
 *     code that assigns status_table and the first value of size; none
 *     of it is visible here. Restore from a clean copy; do not guess.
 *
 * NOTE(review), to confirm once a clean copy is available:
 *   - The usage text prints "- i [IDs]" with a space after the dash;
 *     this looks like a typo for "-i", but the option parsing is not
 *     visible, so verify before changing the string.
 *   - The assignment of status_table->num_links is commented out, yet
 *     the field is read for the report message and as a loop bound;
 *     presumably another process sets it. The local IDs, j and index
 *     are not used in the visible part of main.
 *   - num_links indexes shm_bufferarea and link_table, which both have
 *     NUMlinks elements; no range check is visible.
 *   - The results of alloc_data_area() are not checked in the visible
 *     code, and sprintf() writes into message_buffer without a bound;
 *     the buffer size is declared elsewhere.
 */
/* Merger link task enable incoming TCP port (link_server.c) passes data to proc receiveDATAbuffer (handle_buffer.c) passes data to processDATAbuffer (handle_buffer.c) calls queueDATAbuffer (handle_buffer.c) passes data to merger */ #include #include #include /* getenv, exit */ #include #include #include #include #include "common.h" #include "message.h" #include "buffer.h" #include "transfer.h" #include "stats.h" #include "netvar.h" // this tasks process ID and its master process ID pid_t mypid; pid_t Masterpid; pid_t Mergepid; pid_t Linkpid[NUMlinks]; int verbose = 0; // Shared Memory Key to use key_t shmkey = SHMKEY; // address of shared data area void * shm_dataarea; // address of shared buffer area void * shm_bufferarea[NUMlinks]; // Shared tables held in the shared memory segment STATUS_TABLE *status_table; BUFFER_TABLE *buffer_table; // table of data link threads LINK_TABLE *link_table[NUMlinks]; int * StatsMem[NUMCOUNTERS]; int * RatesMem[NUMCOUNTERS]; // some program options int data_buffer_size = TRANSFER_BUFFER_SIZE; int server_options = 0; int NumRunningLinks = 0; void merge_server(); void MERGE_InitStats(); void stats_init(); void Usage(char *progname) { fprintf(stderr,"Usage\n%s -p [port] - i [IDs]\n",progname); fprintf(stderr,"\tport is the base TCP port to use (default 11001)\n"); fprintf(stderr,"\tIDs is the number of data streams to handle (default 2)\n"); exit(1); } int main(int argc, char *argv[], char *envp[]) { int i, j; int size; int index; int IDs = NUMlinks; mypid = getpid(); Masterpid = getppid(); msg_init(); sig_init(); report_version(); shmkey = SHMKEY; /* set default value */ // command line arguments if (argc >1) { for(i=1;i buffer_table_offset = sizeof(STATUS_TABLE); for (i = 0; i < NUMlinks; i++) { status_table->link_table_offset[i] = sizeof(STATUS_TABLE) + sizeof(BUFFER_TABLE) + (i * sizeof(LINK_TABLE)); } status_table->master_pid = Masterpid; // status_table->num_links = IDs; sprintf(message_buffer, "Merging up to %d data sources", 
status_table->num_links); report_message(MSG_INFORMATION+3); msg_logging_level = status_table->msg_logging_level; msg_reporting_level = status_table->msg_reporting_level; server_options = status_table->server_options; // map shared memory segment containing data buffers size = ((TRANSFER_BUFFER_SIZE * 1024) * N_DATA_BUFFERS) + sizeof(BUFFER_HEADER); for (i = 0; i < status_table->num_links; i++) { shm_bufferarea[i] = alloc_data_area(shmkey+1+i, size); /* obtain shared memory segment per link for data buffers */ } // initialise buffer table buffer_table = (BUFFER_TABLE *) ((char*) status_table + status_table->buffer_table_offset); /* for (i = 0; i < status_table->num_links; i++) { buffer_table->buffer_baseaddress[i] = (char*) shm_bufferarea[i] + sizeof(BUFFER_HEADER); } */ // initialise link table for (i = 0; i < status_table->num_links; i++) { link_table[i] = (LINK_TABLE *) ((char *) status_table + status_table->link_table_offset[i]); } /* lastly statistics array */ stats_init(); // start execution merge_server(); exit(0); }