/*
 * Master/startup task, as far as this copy shows: main() records its own pid
 * as the master pid, fills in the STATUS_TABLE / BUFFER_TABLE / LINK_TABLE
 * bookkeeping, obtains one data-buffer area per link via alloc_data_area(),
 * marks every buffer and link FREE, then calls start_statrate(),
 * start_merge() and start_links() before textually including
 * "handle_signals.c".
 *
 * NOTE(review): this copy of the file is damaged.
 *   - The header names of the seven bare "#include" directives are missing.
 *   - Text is missing inside main() between "for(i=1;i" and
 *     "buffer_table_offset" (most likely everything from a '<' up to a later
 *     '>' was stripped), so the argument parsing and the code that sets up
 *     status_table are not visible.
 * The damaged tokens are kept exactly as found and are flagged where they
 * occur. Restore them from a clean copy rather than guessing.
 */

/* NOTE(review): header names lost in this copy -- see file comment. */
#include
#include
#include
#include /* getenv, exit */
#include
#include
#include
#include "common.h"
#include "message.h"
#include "buffer.h"
#include "stats.h"
#include "netvar.h"

/*
 * Real-time signal aliases used by this task.
 * NOTE(review): the replacement lists are not parenthesised, so an expression
 * such as "SIG_RT1 * 2" would expand to "SIGRTMIN+1 * 2". Consider
 * (SIGRTMIN+1) and (SIGRTMIN+2).
 */
#define SIG_RT1 SIGRTMIN+1
#define SIG_RT2 SIGRTMIN+2
#define SIGWAKEUP SIG_RT1

// this task's process ID and its master process ID
pid_t mypid;
pid_t Masterpid;
// Not assigned anywhere in the visible code; presumably filled in by
// start_merge(), start_statrate() and start_links() -- TODO confirm.
pid_t Mergepid;
pid_t StatRatepid;
pid_t Linkpid[NUMlinks];

// Not referenced in the visible code; presumably set by the (missing)
// command line parsing or used by other files -- TODO confirm.
int verbose = 0;

// Shared Memory Key to use
key_t shmkey = SHMKEY;

// address of shared data area
void * shm_dataarea;

// address of shared buffer area (one entry per link, set in main())
void * shm_bufferarea[NUMlinks];

// Shared tables held in the shared memory segment
STATUS_TABLE *status_table;
BUFFER_TABLE *buffer_table;

// table of data link threads
LINK_TABLE *link_table[NUMlinks];

// Not assigned in the visible code; presumably set up by stats_init() or by
// the included "netvar.c" -- TODO confirm.
int * StatsMem[NUMCOUNTERS];
int * RatesMem[NUMCOUNTERS];
int * MERGE_State;

// some program options
int server_options = 0;
int Tcp_Port = MERGE_TCP_PORT;
int IDs = NUMlinks;
int LINKs = NUMlinks;

// Paths of the helper programs; presumably launched by the start_*()
// functions declared below -- TODO confirm.
char merge_task[PATH_LENGTH] = "./merge64";
char link_task[PATH_LENGTH] = "./link64";
char statrate_task[PATH_LENGTH] = "./statrate64";

// Defined outside the visible text.
// NOTE(review): the empty parameter lists are not prototypes; "(void)" would
// let the compiler check calls, provided the definitions take no arguments.
void start_merge(int);
void start_links();
void start_statrate();
void stats_init();

/*
 * Print the command line usage summary to stderr and terminate the process
 * with exit status 1. Never returns.
 *
 * progname: program name substituted into the usage line.
 *
 * NOTE(review): the usage text shows "- i" with a space, which looks like a
 * typo for "-i". The defaults it quotes (11001, 11000, 4) are hard-coded
 * rather than derived from MERGE_TCP_PORT, SHMKEY and NUMlinks -- confirm
 * that they still agree.
 */
void Usage(char *progname)
{
    fprintf(stderr,"Usage\n%s -p [port] -s [shmkey] - i [IDs]\n",progname);
    fprintf(stderr,"\tport is the base TCP port to use (default 11001)\n");
    fprintf(stderr,"\tshmkey is the base shared memory key to use (default 11000)\n");
    fprintf(stderr,"\tIDs is the number of data streams to handle (default 4)\n");
    exit(1);
}

/*
 * Program entry point.
 *
 * Visible steps, in order:
 *   1. record this process's pid as both mypid and Masterpid;
 *   2. call msg_init(), sig_init() and report_version();
 *   3. reset shmkey and Tcp_Port to their compile-time defaults;
 *   4. parse the command line (body missing from this copy);
 *   5. fill in the status table, obtain one buffer area per link, and mark
 *      all buffers and links FREE;
 *   6. include "netvar.c", call stats_init(), start_statrate(),
 *      start_merge(IDs), publish three NetVar values, call start_links();
 *   7. sleep 5 seconds, include "handle_signals.c", then exit(0).
 *
 * argc, argv: command line; envp is not used in the visible code.
 * Ends the process with exit(0) if control reaches the end of the function.
 */
int main(int argc, char *argv[], char *envp[])
{
    int i, j;
    // Signal and linkNum are not used in the visible code; presumably they
    // are used by the textually included "handle_signals.c" -- TODO confirm.
    int Signal;
    int size;
    int linkNum;

    mypid = getpid();
    Masterpid = mypid;

    msg_init();
    sig_init();

    report_version();

    // Both globals already carry these values from their initialisers.
    shmkey = SHMKEY; /* set default value */
    Tcp_Port = MERGE_TCP_PORT;

    // command line arguments
    if (argc >1) {
        // NOTE(review): DAMAGED TEXT. The remainder of this loop, the end of
        // the "if" block, and whatever code assigns status_table are missing.
        // The tail "buffer_table_offset = sizeof(STATUS_TABLE);" is most
        // likely the end of "status_table->buffer_table_offset = ...", which
        // would place the BUFFER_TABLE directly after the STATUS_TABLE.
        // The next line is kept exactly as found.
        for(i=1;i buffer_table_offset = sizeof(STATUS_TABLE);

    // The LINK_TABLE entries follow the STATUS_TABLE and the BUFFER_TABLE,
    // one after another.
    // NOTE(review): this loop runs to NUMlinks while the later loops run to
    // status_table->num_links (LINKs).
    for (i = 0; i < NUMlinks; i++) {
        status_table->link_table_offset[i] = sizeof(STATUS_TABLE) + sizeof(BUFFER_TABLE) + (i * 
            sizeof(LINK_TABLE));
    }

    status_table->master_pid = mypid;
    status_table->num_links = LINKs;
    status_table->msg_logging_level = msg_logging_level;
    status_table->msg_reporting_level = msg_reporting_level;
    status_table->server_options = server_options;

    // calculate size of Shared Memory buffer Segment required:
    // N_DATA_BUFFERS buffers of (TRANSFER_BUFFER_SIZE * 1024) bytes each,
    // plus one BUFFER_HEADER.
    status_table->data_buffer_size = TRANSFER_BUFFER_SIZE;
    size = ((TRANSFER_BUFFER_SIZE * 1024) * N_DATA_BUFFERS) + sizeof(BUFFER_HEADER);

    // Link i uses key shmkey+1+i.
    // NOTE(review): the result of alloc_data_area() is used below without a
    // visible check; confirm how alloc_data_area() reports failure.
    for (i = 0; i < status_table->num_links; i++) {
        shm_bufferarea[i] = alloc_data_area(shmkey+1+i, size); /* obtain shared memory segment per link for data buffers */
    }

    // initialise buffer table
    buffer_table = (BUFFER_TABLE *) ((char*) status_table + status_table->buffer_table_offset);
    for (i = 0; i < status_table->num_links; i++) {
        // Data buffers start just past the BUFFER_HEADER at the front of the area.
        buffer_table->buffer_baseaddress[i] = (char*) shm_bufferarea[i] + sizeof(BUFFER_HEADER);
        buffer_table->buffers_free[i] = N_DATA_BUFFERS; /* set all initially to free */
        // buffer_status is one flat array: N_DATA_BUFFERS entries per link.
        for (j = 0; j < N_DATA_BUFFERS; j++) {
            buffer_table->buffer_status[i*N_DATA_BUFFERS + j] = FREE; /* set all initially to free */
        }
    }

    // initialise link table: first resolve each entry's address from its offset
    for (i = 0; i < status_table->num_links; i++) {
        link_table[i] = (LINK_TABLE *) ((char *) status_table + status_table->link_table_offset[i]);
    }

    // then reset every entry to an unused state
    for (i = 0; i < status_table->num_links; i++) {
        link_table[i]->link_status = FREE;
        link_table[i]->link_pid = 0;
        link_table[i]->link_index = i;
        link_table[i]->link_buffer_head = link_table[i]->link_buffer_tail = 0; /* initialize buffer Q */
    }

    // The contents of netvar.c are pasted here, inside main().
#include "netvar.c"

    stats_init();

    start_statrate();
    start_merge(IDs);
    // sleep (5);

    // NOTE(review): LinksAvailable is given IDs and LinksInUse is given LINKs;
    // the meaning of the other assignNetInt() arguments is not visible here.
    assignNetInt("NetVar.MERGE.LinksAvailable", 0, 1, IDs);
    assignNetInt("NetVar.MERGE.LinksInUse", 0, 1, LINKs);
    assignNetInt("NetVar.MERGE.LinksAlive", 0, 1, 0);

    start_links();
    sleep (5);

    // The contents of handle_signals.c are pasted here, inside main().
#include "handle_signals.c"

    exit(0);
}