This is gauge.c in view mode; [Download] [Up]
/* * PCN Abstract Machine Emulator * Authors: Steve Tuecke and Ian Foster * Argonne National Laboratory * * Please see the DISCLAIMER file in the top level directory of the * distribution regarding the provisions under which this software * is distributed. * * gauge.c - Gauge support routines. */ #include "pcn.h" #ifdef GAUGE #if !defined(s2010) && ((defined(sun) && !defined(os3)) || defined(next040) || defined(symmetry) || defined(rs6000)) #include <sys/param.h> #endif #ifndef MAXHOSTNAMELEN /* max length of the host name */ #define MAXHOSTNAMELEN 31 #endif #ifndef DEFAULT_GAUGE_FILE /* file name for gauge output file */ #define DEFAULT_GAUGE_FILE "profile.cnt" #endif struct gauge_stats _p_gauge_stats = {{0,0},{0,0},{0,0},0,0,0,0,0,0,0}; static int_t snapshots_taken; static char_t hostname[MAXHOSTNAMELEN+1]; static int_t pid; static FILE * gauge_tmp_fp; static gauge_timer start_time; #define MAX_SNAPSHOT_NAME_LENGTH 63 #define MAX_GAUGE_MSG_SIZE (((MAX_SNAPSHOT_NAME_LENGTH+1) / CELL_SIZE) + 16) static cell_t gauge_receive_buf[MAX_GAUGE_MSG_SIZE]; /* * The gauge_request list is used to queue requests for * gauge actions (snapshots, etc), while handling other requests. */ #define GAUGE_REQUEST_SNAPSHOT 1 #define GAUGE_REQUEST_RESET 2 #define GAUGE_REQUEST_RETURN_PROFILES 3 #define GAUGE_COMPLETED_REQUEST 4 typedef struct gauge_request_struct { struct gauge_request_struct *next; int_t type; int_t from_node; char_t snapshot_name[MAX_SNAPSHOT_NAME_LENGTH+1]; } gauge_request_t; static gauge_request_t *first_request = (gauge_request_t *) NULL; static gauge_request_t *last_request = (gauge_request_t *) NULL; static void enqueue_request(); #ifdef encore unsigned *_p_encore_timer; #endif #if defined(PCN_USE_BSD_TIME) || defined(symmetry) unsigned long _p_unix_base_time; #endif #if !defined(s2010) && !defined(ipsc860) && !defined(delta) extern int gethostname(); #else #define USE_HOSTNAME #define HOSTNAME PCN_ARCH #endif static void enqueue_request(); static void host_handle_requests(); static void host_do_snapshot(); static void host_do_reset(); #ifdef PARALLEL static void wait_for_completed(); static void process_and_enqueue_gauge_msg(); static void send_completed_request(); static void node_handle_requests(); #endif /* PARALLEL */ static void do_reset(); static void do_snapshot(); static void timer_secs(); static void get_times(); static void print_statistics(); /* * _p_init_gauge_tmp_file() * * Initialize the _p_gauge_tmp_file variable on the host so that it * can be distributed to the nodes. * * This file (with the node number concatenated) will be used to * hold the temporary snapshot files that each node will use * to hold the shapshots until they can be collected at the end * of the run. */ void _p_init_gauge_tmp_file() { if (_p_gauge_file[0] != '\0') _p_gauge = TRUE; if (_p_host &&_p_gauge) { #ifndef GAUGE_PRIMITIVES_DEFINED fprintf(_p_stdout, "Gauge's timing primities are not available\n"); fflush(_p_stdout); _p_gauge = FALSE; #else /* GAUGE_PRIMITIVES_DEFINED */ #ifdef USE_HOSTNAME strcpy(hostname, HOSTNAME); #else gethostname(hostname, MAXHOSTNAMELEN); #endif pid = (int_t) getpid(); #if defined(ipsc860) || defined(delta) /* File name lengths are a problem */ sprintf(_p_gauge_tmp_file, "%ld_cnt", (long) pid); #else sprintf(_p_gauge_tmp_file, "pcncnt_%s_%ld", hostname, (long) pid); #endif #endif /* GAUGE_PRIMITIVES_DEFINED */ } else { _p_gauge_tmp_file[0] = '\0'; } } /* _p_init_gauge_tmp_file() */ void _p_init_gauge() { if (_p_gauge) { char tmp_filename[MAX_PATH_LENGTH]; #ifdef symmetry /* initialize the microsecond clock */ struct timeval tval; struct timezone zone; usclk_init(); gettimeofday(&tval,&zone); _p_unix_base_time = tval.tv_sec; #endif /* symmetry */ #ifdef encore _p_encore_timer = mapfrcounter(); #endif /* encore */ #ifdef PCN_USE_BSD_TIME struct timeval tval; struct timezone zone; gettimeofday(&tval,&zone); _p_unix_base_time = tval.tv_sec; #endif /* PCN_USE_BSD_TIME */ TIMER(start_time); snapshots_taken = 0; if (_p_tmp_dir[0] == '\0') strcpy(_p_tmp_dir, PCN_TMP_DIR); strcpy(tmp_filename, _p_gauge_tmp_file); #if defined(ipsc860) || defined(delta) /* File name lengths are a problem */ sprintf(_p_gauge_tmp_file, "%s/%ld_%s", _p_tmp_dir, (long) _p_my_id, tmp_filename); #else sprintf(_p_gauge_tmp_file, "%s/%s_%ld", _p_tmp_dir, tmp_filename, (long) _p_my_id); #endif if ((gauge_tmp_fp = fopen(_p_gauge_tmp_file, "w+")) == (FILE *) NULL) { char_t buf[MAX_PATH_LENGTH + 128]; sprintf(buf, "_p_init_gauge(): Failed to open Gauge temporary file for writing: %s", _p_gauge_tmp_file); _p_fatal_error(buf); } if (_p_host) { if (_p_gauge_file[0] == '\0') { strcpy(_p_gauge_file, DEFAULT_GAUGE_FILE); } else { /* Make sure it has a .cnt suffix. If not, then add one. */ int_t i = strlen(_p_gauge_file); if (i < 4 || strcmp(_p_gauge_file + i - 4, ".cnt") != 0) { strcat(_p_gauge_file, ".cnt"); } } } #ifdef USE_HOSTNAME strcpy(hostname, HOSTNAME); #else gethostname(hostname, MAXHOSTNAMELEN); #endif } } /* _p_init_gauge() */ /* * enqueue_request() * * Allocate room for a gauge_request and enqueue it on the end of the * request list. */ static void enqueue_request(request_type, from_node, snapshot_name) int_t request_type; int_t from_node; char_t *snapshot_name; { gauge_request_t *new_request; char_t *s; new_request = (gauge_request_t *) malloc (sizeof(gauge_request_t)); if (new_request == (gauge_request_t *) NULL) _p_malloc_error(); new_request->next = (gauge_request_t *) NULL; new_request->type = request_type; new_request->from_node = from_node; /* Truncate the snapshot name as necessary */ strncpy(new_request->snapshot_name, snapshot_name, MAX_SNAPSHOT_NAME_LENGTH); new_request->snapshot_name[MAX_SNAPSHOT_NAME_LENGTH] = '\0'; if (first_request == (gauge_request_t *) NULL) { first_request = last_request = new_request; } else { last_request->next = new_request; last_request = new_request; } } /* enqueue_request() */ /* * host_handle_requests() * * Loop through the request queue until it is empty, handling all * gauge requests. If new requests come in in the mean time, * queue them up so that they also get handled before exiting this * procedure. */ static void host_handle_requests() { bool_t got_initiate_exit = FALSE; int_t exit_code; gauge_request_t *request; while (first_request != (gauge_request_t *) NULL && !got_initiate_exit) { request = first_request; first_request = first_request->next; switch (request->type) { case GAUGE_REQUEST_SNAPSHOT: host_do_snapshot(request, &got_initiate_exit, &exit_code); break; case GAUGE_REQUEST_RESET: host_do_reset(request, &got_initiate_exit, &exit_code); break; default: _p_fatal_error("host_handle_requests(): Illegal request type"); break; } free(request); } if (got_initiate_exit) { _p_do_exit(exit_code); } } /* host_handle_requests() */ /* * host_do_snapshot() * * Cause a snapshot on all nodes. */ static void host_do_snapshot(request, got_initiate_exit, exit_code) gauge_request_t *request; bool_t *got_initiate_exit; int_t *exit_code; { int_t snapshot_num; char_t *snapshot_name = request->snapshot_name; #ifdef PARALLEL int_t node; int_t msg_buf_size; cell_t *msg_buf; int_t from_node = request->from_node; cell_t *save_heap_ptr = _p_heap_ptr; cell_t *save_heap_hard_top = _p_heap_hard_top; #endif /* PARALLEL */ snapshot_num = snapshots_taken; #ifdef PARALLEL _p_heap_ptr = gauge_receive_buf; _p_heap_hard_top = _p_heap_ptr + MAX_GAUGE_MSG_SIZE; msg_buf_size = 3 + StringSizeToCells(strlen(snapshot_name) + 1); /* * Send out a request for a snapshot to each node. */ for (node = 1; node < _p_nodes; node++) { msg_buf = _p_alloc_msg_buffer(msg_buf_size); *((int_t *)(msg_buf)) = GAUGE_REQUEST_SNAPSHOT; *((int_t *)(msg_buf + 1)) = from_node; *((int_t *)(msg_buf + 2)) = snapshot_num; strcpy((char *) (msg_buf + 3), snapshot_name); #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) host_do_snapshot(): Sending snapshot request to %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) node); fflush(_p_stdout); } #endif /* DEBUG */ _p_msg_send(msg_buf, node, msg_buf_size, MSG_GAUGE); #ifdef DEBUG if (ParDebug(7)) { fprintf(_p_stdout, "(%lu,%lu) host_do_snapshot(): Sent\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ } #endif /* PARALLEL */ do_snapshot(snapshot_num, snapshot_name); #ifdef PARALLEL wait_for_completed(got_initiate_exit, exit_code); _p_heap_ptr = save_heap_ptr; _p_heap_hard_top = save_heap_hard_top; #endif /* PARALLEL */ } /* host_do_snapshot() */ /* * host_do_reset() * * Cause a snapshot on all nodes. */ static void host_do_reset(request, got_initiate_exit, exit_code) gauge_request_t *request; bool_t *got_initiate_exit; int_t *exit_code; { #ifdef PARALLEL int_t node; cell_t *msg_buf; int_t from_node = request->from_node; cell_t *save_heap_ptr = _p_heap_ptr; cell_t *save_heap_hard_top = _p_heap_hard_top; #endif /* PARALLEL */ #ifdef PARALLEL _p_heap_ptr = gauge_receive_buf; _p_heap_hard_top = _p_heap_ptr + MAX_GAUGE_MSG_SIZE; /* * Send out a request for a reset to each node. */ for (node = 1; node < _p_nodes; node++) { msg_buf = _p_alloc_msg_buffer(2); *((int_t *)(msg_buf)) = GAUGE_REQUEST_RESET; *((int_t *)(msg_buf + 1)) = from_node; #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) host_do_reset(): Sending reset request to %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) node); fflush(_p_stdout); } #endif /* DEBUG */ _p_msg_send(msg_buf, node, 2, MSG_GAUGE); #ifdef DEBUG if (ParDebug(7)) { fprintf(_p_stdout, "(%lu,%lu) host_do_reset(): Sent\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ } #endif /* PARALLEL */ do_reset(); #ifdef PARALLEL wait_for_completed(got_initiate_exit, exit_code); _p_heap_ptr = save_heap_ptr; _p_heap_hard_top = save_heap_hard_top; #endif /* PARALLEL */ } /* host_do_reset() */ #ifdef PARALLEL /* * wait_for_completed() * * Wait for a GAUGE_COMPLETED_REQUEST message from _p_nodes-1 nodes. * Enqueue any other gauge requests that we get in the mean time. * If we get an MSG_INITIATE_EXIT, then set *got_initiate_exit to TRUE * and set *exit_code to the exit code. */ static void wait_for_completed(got_initiate_exit, exit_code) bool_t *got_initiate_exit; int_t *exit_code; { int_t i; int_t msg_type, msg_size, msg_node; i = 1; while (i < _p_nodes) { _p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE); switch(msg_type) { case MSG_GAUGE: if (*((int_t *)(_p_heap_ptr)) == GAUGE_COMPLETED_REQUEST) i++; else process_and_enqueue_gauge_msg(msg_size, msg_node); break; case MSG_INITIATE_EXIT: if (!got_initiate_exit) { *got_initiate_exit = TRUE; *exit_code = *((int_t *)(_p_heap_ptr)); } break; default: _p_fatal_error("wait_for_completed(): Received unexpected message type"); break; } } } /* wait_for_completed() */ #endif /* PARALLEL */ #ifdef PARALLEL /* * process_and_enqueue_gauge_msg() * * Figure out what kind of request the gauge message at _p_heap_ptr is * and enqueue it. */ static void process_and_enqueue_gauge_msg(size, node) int_t size; int_t node; { switch (*((int_t *)(_p_heap_ptr))) { case GAUGE_REQUEST_SNAPSHOT: enqueue_request(GAUGE_REQUEST_SNAPSHOT, node, (char_t *) (_p_heap_ptr + 1)); break; case GAUGE_REQUEST_RESET: enqueue_request(GAUGE_REQUEST_RESET, node, ""); break; default: _p_fatal_error("process_and_enqueue_gauge_msg(): Invalid gauge message type"); break; } } /* process_and_enqueue_gauge_msg() */ #endif /* PARALLEL */ #ifdef PARALLEL /* * _p_process_gauge_msg() * * Process the MSG_GAUGE message that is at _p_heap_ptr. This * is called by parallel.c:_p_process_messages(). * * If I'm the host, then put the message on the gauge_request queue * and then call the queue handler. * * If I'm a node, then handle the request. */ void _p_process_gauge_msg(size, node) int_t size; int_t node; { if (_p_host) { process_and_enqueue_gauge_msg(); host_handle_requests(); } else { switch (*((int_t *)(_p_heap_ptr))) { case GAUGE_REQUEST_SNAPSHOT: do_snapshot(*((int_t *)(_p_heap_ptr + 2)), (char_t *) (_p_heap_ptr + 3)); break; case GAUGE_REQUEST_RESET: do_reset(); break; default: _p_fatal_error("process_and_enqueue_gauge_msg(): Invalid gauge message type"); break; } send_completed_request(); } } /* _p_process_gauge_msg() */ #endif /* PARALLEL */ #ifdef PARALLEL /* * send_completed_request() * * Send a GAUGE_COMPLETED_REQUEST message to the host. */ static void send_completed_request() { cell_t *msg_buf; msg_buf = _p_alloc_msg_buffer(1); *((int_t *)(msg_buf)) = GAUGE_COMPLETED_REQUEST; #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) send_completed_request(): Sending to host\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ _p_msg_send(msg_buf, _p_host_id, 1, MSG_GAUGE); #ifdef DEBUG if (ParDebug(7)) { fprintf(_p_stdout, "(%lu,%lu) send_completed_request(): Sent\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ } /* send_completed_request() */ #endif /* PARALLEL */ #ifdef PARALLEL /* * node_handle_requests() * * Handle all gauge requests that come to my node, until we * receive a request that originated at my node. Since a node * can only have one outstanding gauge request at a time, * we know we've handled the request that caused this procedure * to get called if we handle a request that originated from this node. */ static void node_handle_requests() { bool_t done = FALSE; cell_t *save_heap_ptr = _p_heap_ptr; cell_t *save_heap_hard_top = _p_heap_hard_top; int_t msg_type, msg_size, msg_node; bool_t got_exit = FALSE; int_t exit_code; _p_heap_ptr = gauge_receive_buf; _p_heap_hard_top = _p_heap_ptr + MAX_GAUGE_MSG_SIZE; while (!done && !got_exit) { _p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE); switch (msg_type) { case MSG_GAUGE: switch (*((int_t *)(_p_heap_ptr))) { case GAUGE_REQUEST_SNAPSHOT: do_snapshot(*((int_t *)(_p_heap_ptr + 2)), (char_t *) (_p_heap_ptr + 3)); break; case GAUGE_REQUEST_RESET: do_reset(); break; default: _p_fatal_error("node_handle_requests(): Illegal request type"); break; } send_completed_request(); if (*((int_t *)(_p_heap_ptr + 1)) == _p_my_id) done = TRUE; break; case MSG_EXIT: got_exit = TRUE; exit_code = *((int_t *)(_p_heap_ptr)); break; default: _p_fatal_error("node_handle_requests(): Illegal message type"); break; } } _p_heap_ptr = save_heap_ptr; _p_heap_hard_top = save_heap_hard_top; if (got_exit) { _p_node_handle_exit(exit_code, FALSE); } } /* node_handle_requests() */ #endif /* PARALLEL */ /* * do_reset() * * Reset all counters and timers to 0. */ static void do_reset() { int_t i; proc_header_t *proc_header; for (i = 0; i <= _p_exported_table_size; i++) { for (proc_header = _p_exported_table[i]; proc_header != (proc_header_t *) NULL; proc_header = proc_header->next) { ZeroOutMemory(proc_header->counters, (proc_header->n_counters * sizeof(int_t))); ZeroOutMemory(proc_header->timers, (proc_header->n_timers * sizeof(int_t) * 2)); } } #if defined(PCN_USE_BSD_TIME) || defined(symmetry) { struct timeval tval; struct timezone zone; gettimeofday(&tval,&zone); _p_unix_base_time = tval.tv_sec; } #endif /* PCN_USE_BSD_TIME */ ZeroOutMemory(&_p_gauge_stats, sizeof(_p_gauge_stats)); } /* do_reset() */ /* * do_snapshot() * * Do a snapshot into my temporary file. */ static void do_snapshot(snapshot_num, snapshot_name) int_t snapshot_num; char_t *snapshot_name; { int_t i; proc_header_t *proc_header; int_t n_counters, n_timers; gauge_timer end_time; if (snapshot_num != snapshots_taken) { _p_fatal_error("do_snapshot(): snapshot_num != snapshots_taken"); } TIMER(end_time); timer_sub(start_time, end_time, _p_gauge_stats.run_time); for (i = 0; i <= _p_exported_table_size; i++) { for (proc_header = _p_exported_table[i]; proc_header != (proc_header_t *) NULL; proc_header = proc_header->next) { if (proc_header->model != (char_t **) NULL) { n_counters = proc_header->n_counters; n_timers = proc_header->n_timers; if (fprintf(gauge_tmp_fp, "K %s %ld %ld %s:%s/%ld %ld ", (char *) snapshot_name, (long) snapshot_num, (long) _p_my_id, (char *) proc_header->module_name, (char *) proc_header->proc_name, (long) proc_header->arity, (long) n_counters) == EOF) { _p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file"); } if (fwrite((char *) proc_header->counters, CELL_SIZE, n_counters, gauge_tmp_fp) != n_counters) { _p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file"); } if (fprintf(gauge_tmp_fp, "\nT %s %ld %ld %s:%s/%ld %ld ", (char *) snapshot_name, (long) snapshot_num, (long) _p_my_id, (char *) proc_header->module_name, (char *) proc_header->proc_name, (long) proc_header->arity, (long) n_timers) == EOF) { _p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file"); } if (fwrite((char *) proc_header->timers, CELL_SIZE * 2, n_timers, gauge_tmp_fp) != n_timers) { _p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file"); } if (fprintf(gauge_tmp_fp, "\n") == EOF) { _p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file"); } } } } if (fprintf(gauge_tmp_fp, "P %s %ld %lu %lu %lu %lu %lu %lu %lu %lu %lu %lu %lu %lu\n", (char *) snapshot_name, (long) snapshot_num, (unsigned long) _p_my_id, (unsigned long) LOW_WORD(_p_gauge_stats.run_time), (unsigned long) HI_WORD(_p_gauge_stats.run_time), (unsigned long) LOW_WORD(_p_gauge_stats.gc_time), (unsigned long) HI_WORD(_p_gauge_stats.gc_time), (unsigned long) _p_gauge_stats.gc_calls, (unsigned long) _p_gauge_stats.ssmallmsgs, (unsigned long) _p_gauge_stats.sbigmsgs, (unsigned long) _p_gauge_stats.sbigmsgslen, (unsigned long) _p_gauge_stats.ssmallmsgs, (unsigned long) _p_gauge_stats.sbigmsgs, (unsigned long) _p_gauge_stats.sbigmsgslen) == EOF) { _p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file"); } snapshots_taken++; } /* do_snapshot() */ #define OUTPUT_ERROR() \ _p_fatal_error("Failed to write to Gauge profile file"); /* * _p_host_final_profile() * * Take a final profile snapshot, and coalesce the node profiles */ void _p_host_final_profile() { FILE *cnt_fp; char_t lasthost[MAXHOSTNAMELEN+1]; int_t lastnode, i, j, n_hostnames, buf_size; long host_count_position, current_position; proc_header_t *proc_header; bool_t done; u_int_t sec, usec; char_t **m; #ifdef PARALLEL int_t msg_type, msg_size, msg_node; #endif /* PARALLEL */ if (!_p_gauge) return; do_snapshot(snapshots_taken, "end"); timer_secs(&(_p_gauge_stats.run_time) ,&sec, &usec); fprintf(_p_stdout, "Program completed in %lu.%lu seconds.\n", (unsigned long) sec, (unsigned long) usec); fprintf(_p_stdout, "Writing Gauge profile file: %s\n", _p_gauge_file); fflush(_p_stdout); if ((cnt_fp = fopen(_p_gauge_file, "w+")) == (FILE *) NULL) { char_t buf[MAX_PATH_LENGTH + 128]; sprintf(buf, "_p_host_final_profile(): Failed to open Gauge profile file for writing: %s", _p_gauge_file); _p_fatal_error(buf); } /* * Write out the header informaton of the .cnt file */ if (fprintf(cnt_fp, "S profile %s\n", _p_gauge_file) == EOF) OUTPUT_ERROR(); if (fprintf(cnt_fp, "V 0005\n") == EOF) OUTPUT_ERROR(); if (fprintf(cnt_fp, "N %ld %ld %ld ", (long) _p_nodes, (long) 0, (long) _p_nodes - 1) == EOF) OUTPUT_ERROR(); /* * Save the position and make space for the count of the * number of different hosts. */ host_count_position = ftell(cnt_fp); if (fprintf(cnt_fp, " ") == EOF) OUTPUT_ERROR(); /* * Get the hostname from each node, and coalesce adjacent * names into ranges if they are the same. */ strcpy(lasthost, hostname); lastnode = 0; n_hostnames = 0; i = 1; #ifdef PARALLEL while (i < _p_nodes) { /* * Send empty MSG_GAUGE message to node i, to prompt him to send * his hostname back. */ _p_msg_send((cell_t *) NULL, i, 0, MSG_GAUGE); _p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE); if (msg_type != MSG_GAUGE) _p_fatal_error("_p_host_final_profile(): Expected a MSG_GAUGE message"); if (strcmp(lasthost, (char_t *) _p_heap_ptr) != 0) { if (fprintf(cnt_fp, " %s %ld %ld", lasthost, (long) lastnode, (long) i - 1) == EOF) OUTPUT_ERROR(); lastnode = i; strcpy(lasthost, (char_t *) _p_heap_ptr); n_hostnames++; } i++; } #endif /* PARALLEL */ if (fprintf(cnt_fp, " %s %ld %ld", lasthost, (long) lastnode, (long) i - 1) == EOF) OUTPUT_ERROR(); n_hostnames++; /* * Go back and fill in the number of distinct hostnames that were * written */ current_position = ftell(cnt_fp); fseek(cnt_fp, host_count_position, 0); if (fprintf(cnt_fp, "%ld", n_hostnames) == EOF) OUTPUT_ERROR(); fseek(cnt_fp, current_position, 0); /* Finish off the N line */ if (fprintf(cnt_fp, "\n") == EOF) OUTPUT_ERROR(); /* * Now dump out the model information for each procedure */ for (i = 0; i <= _p_exported_table_size; i++) { for (proc_header = _p_exported_table[i]; proc_header != (proc_header_t *) NULL; proc_header = proc_header->next) { if (proc_header->model != (char_t **) NULL) { for (m = proc_header->model, j = 0; m[j] != (char_t *) NULL; j++) { if (fprintf(cnt_fp, "%s", m[j]) == EOF) OUTPUT_ERROR(); } } } } /* * Write out the profile header */ if (fprintf(cnt_fp, "S counter %lu %lu %ld %ld %ld []\n", (unsigned long) LOW_WORD(_p_gauge_stats.run_time), (unsigned long) HI_WORD(_p_gauge_stats.run_time), (long) 0, (long) _p_nodes - 1, (long) snapshots_taken) == EOF) OUTPUT_ERROR(); /* * Now dump the host's profile data */ fseek(gauge_tmp_fp, (long) 0, 0); i = MIN(8192, ((char_t *) _p_heap_real_top) - ((char_t *) _p_heap_ptr)); while ((buf_size = fread((char *) _p_heap_ptr, 1, i, gauge_tmp_fp)) != 0) { if (fwrite((char *) _p_heap_ptr, 1, buf_size, cnt_fp) != buf_size) OUTPUT_ERROR(); } fclose(gauge_tmp_fp); remove(_p_gauge_tmp_file); #ifdef PARALLEL /* * Send empty MSG_GAUGE message to node i, to prompt him to send * his profile information. */ for (i = 1; i < _p_nodes; i++) { _p_msg_send((cell_t *) NULL, i, 0, MSG_GAUGE); done = FALSE; while (!done) { _p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE); if (msg_type != MSG_GAUGE) _p_fatal_error("_p_host_final_profile(): Expected a MSG_GAUGE message"); if (msg_size == 0) { done = TRUE; } else { buf_size = *((int_t *)(_p_heap_ptr)); if (fwrite((char *) (_p_heap_ptr + 1), buf_size, 1, cnt_fp) != 1) OUTPUT_ERROR(); } } } #endif /* PARALLEL */ /* * And some trailers... */ if (fprintf(cnt_fp, "E counter\n") == EOF) OUTPUT_ERROR(); if (fprintf(cnt_fp, "E profile %s\n", _p_gauge_file) == EOF) OUTPUT_ERROR(); fclose(cnt_fp); fprintf(_p_stdout, "Done writing: %s\n", _p_gauge_file); fflush(_p_stdout); } /* _p_host_final_profile() */ #ifdef PARALLEL /* * _p_node_final_profile() * * Take a final profile snapshot, and coalesce the node profiles */ void _p_node_final_profile() { int_t msg_type, msg_size, msg_node; cell_t *msg_buf; int_t msg_buf_size; bool_t done; int_t buf_size; if (!_p_gauge) return; do_snapshot(snapshots_taken, "end"); /* * Wait for an empty MSG_GAUGE message, prompting me for my hostname */ _p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE); if (msg_type != MSG_GAUGE) _p_fatal_error("_p_node_final_profile(): Expected a MSG_GAUGE message"); msg_buf_size = StringSizeToCells(strlen(hostname) + 1); msg_buf = _p_alloc_msg_buffer(msg_buf_size); strcpy((char *) msg_buf, hostname); _p_msg_send(msg_buf, _p_host_id, msg_buf_size, MSG_GAUGE); /* * Now wait for an empty MSG_GAUGE message, prompting me for my * profile data. Send the profile data back in chunks. */ _p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE); if (msg_type != MSG_GAUGE) _p_fatal_error("_p_node_final_profile(): Expected a MSG_GAUGE message"); fseek(gauge_tmp_fp, (long) 0, 0); done = FALSE; while (!done) { msg_buf = _p_alloc_msg_buffer(_p_default_msg_buffer_size); buf_size = fread((char *) (msg_buf + 1), 1, (_p_default_msg_buffer_size - 1) * CELL_SIZE, gauge_tmp_fp); if (buf_size == 0) { _p_msg_send(msg_buf, _p_host_id, 0, MSG_GAUGE); done = TRUE; } else { *((int_t *) msg_buf) = buf_size; _p_msg_send(msg_buf, _p_host_id, _p_default_msg_buffer_size, MSG_GAUGE); } } fclose(gauge_tmp_fp); remove(_p_gauge_tmp_file); } /* _p_node_final_profile() */ #endif /* PARALLEL */ /* * timer_secs() * * Convert a timer value into seconds and micro-seconds. */ static void timer_secs(timer, secs, usecs) gauge_timer *timer; u_int_t *secs, *usecs; { double scale, tmp; unsigned long utmp = 0xFFFFFFFF; scale = (((double) utmp) + 1) / TICKS_PER_SEC; *secs = (u_int_t) ((((double) HI_WORD(*timer)) * scale) + (((double) LOW_WORD(*timer)) / TICKS_PER_SEC)); tmp = (((double) HI_WORD(*timer) * ((double) utmp + 1)) - ((double) *secs * TICKS_PER_SEC)); scale = (1000000.0 / TICKS_PER_SEC); *usecs = (u_int_t) ((tmp + ((double) LOW_WORD(*timer))) * scale); } /* timer_secs() */ /* * get_times() * * Convert a timer value into minutes, seconds, and milli-seconds. */ static void get_times(stats, min, sec, msec) struct gauge_stats *stats; u_int_t *min, *sec, *msec; { timer_secs(&(stats->run_time), sec, msec); *min = (u_int_t) *sec / 60; *sec %= 60; *msec /= 1000; } /* get_times() */ /* * print_statistics() * * Print out the values in the '_p_gauge_stats' structure. */ static void print_statistics() { u_int_t secs, usecs; timer_secs(&(_p_gauge_stats.run_time), &secs, &usecs); fprintf(_p_stdout, " Run time: %lu.%lu seconds\n", (unsigned long) secs, (unsigned long) usecs); timer_secs(&(_p_gauge_stats.idle_time), &secs, &usecs); fprintf(_p_stdout, " Idle time: %lu.%lu seconds\n", (unsigned long) secs, (unsigned long) usecs); fprintf(_p_stdout, " GC calls %lu\n", (unsigned long) _p_gauge_stats.gc_calls); timer_secs(&(_p_gauge_stats.gc_time), &secs, &usecs); fprintf(_p_stdout, " GC Time %lu.%lu\n", (unsigned long) secs, (unsigned long) usecs); fprintf(_p_stdout, " Messages Sent: %lu %lu/%lu\n", (unsigned long) _p_gauge_stats.ssmallmsgs, (unsigned long) _p_gauge_stats.sbigmsgs, (unsigned long) _p_gauge_stats.sbigmsgslen); fprintf(_p_stdout, " Messages Received %lu %lu/%lu\n", (unsigned long) _p_gauge_stats.rsmallmsgs, (unsigned long) _p_gauge_stats.rbigmsgs, (unsigned long) _p_gauge_stats.rbigmsgslen); fflush(_p_stdout); } /* print_statistics() */ #endif /* GAUGE */ /* * profile_reset() * * This is the entry point from user code for invoking a profile reset * on all nodes. * * If I'm a node, send a request to the host, and then * handle all gauge requests up to and including my request. * * If I'm the host, then go ahead and handle this request. */ void profile_reset() { #ifdef GAUGE if (!_p_gauge) return; #ifdef PARALLEL if (_p_host) { #endif /* PARALLEL */ enqueue_request(GAUGE_REQUEST_RESET, _p_my_id, ""); host_handle_requests(); #ifdef PARALLEL } else { cell_t *msg_buf; msg_buf = _p_alloc_msg_buffer(1); *((int_t *)(msg_buf)) = GAUGE_REQUEST_RESET; #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) profile_reset(): Sending request to host\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ _p_msg_send(msg_buf, _p_host_id, 1, MSG_GAUGE); #ifdef DEBUG if (ParDebug(7)) { fprintf(_p_stdout, "(%lu,%lu) profile_reset(): Sent\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ node_handle_requests(); } #endif /* PARALLEL */ #endif /* GAUGE */ } /* profile_reset() */ /* * profile_snapshot() * * This is the entry point from user code for invoking a profile snapshot * on all nodes. * * If I'm a node, send a request to the host, and then * handle all gauge requests up to and including my request. * * If I'm the host, then go ahead and handle this request. */ void profile_snapshot(snapshot_name) char_t *snapshot_name; { #ifdef GAUGE if (!_p_gauge) return; #ifdef PARALLEL if (_p_host) { #endif /* PARALLEL */ enqueue_request(GAUGE_REQUEST_SNAPSHOT, _p_my_id, snapshot_name); host_handle_requests(); #ifdef PARALLEL } else { int_t msg_buf_size; cell_t *msg_buf; int_t snapshot_len; char_t *s; snapshot_len = MIN(strlen(snapshot_name), MAX_SNAPSHOT_NAME_LENGTH); msg_buf_size = 1 + StringSizeToCells(snapshot_len + 1); msg_buf = _p_alloc_msg_buffer(msg_buf_size); *((int_t *)(msg_buf)) = GAUGE_REQUEST_SNAPSHOT; /* Truncate the snapshot name if need be */ s = (char_t *) (msg_buf + 1); strncpy(s, snapshot_name, snapshot_len); *(s + snapshot_len) = '\0'; #ifdef DEBUG if (ParDebug(5)) { fprintf(_p_stdout, "(%lu,%lu) profile_snapshot(): Sending request to host\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ _p_msg_send(msg_buf, _p_host_id, msg_buf_size, MSG_GAUGE); #ifdef DEBUG if (ParDebug(7)) { fprintf(_p_stdout, "(%lu,%lu) profile_snapshot(): Sent\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ node_handle_requests(); } #endif /* PARALLEL */ #endif /* GAUGE */ } /* profile_snapshot() */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.