This is gauge.c in view mode; [Download] [Up]
/*
* PCN Abstract Machine Emulator
* Authors: Steve Tuecke and Ian Foster
* Argonne National Laboratory
*
* Please see the DISCLAIMER file in the top level directory of the
* distribution regarding the provisions under which this software
* is distributed.
*
* gauge.c - Gauge support routines.
*/
#include "pcn.h"
#ifdef GAUGE
#if !defined(s2010) && ((defined(sun) && !defined(os3)) || defined(next040) || defined(symmetry) || defined(rs6000))
#include <sys/param.h>
#endif
#ifndef MAXHOSTNAMELEN
/* max length of the host name */
#define MAXHOSTNAMELEN 31
#endif
#ifndef DEFAULT_GAUGE_FILE
/* file name for gauge output file */
#define DEFAULT_GAUGE_FILE "profile.cnt"
#endif
struct gauge_stats _p_gauge_stats = {{0,0},{0,0},{0,0},0,0,0,0,0,0,0};
static int_t snapshots_taken;
static char_t hostname[MAXHOSTNAMELEN+1];
static int_t pid;
static FILE * gauge_tmp_fp;
static gauge_timer start_time;
#define MAX_SNAPSHOT_NAME_LENGTH 63
#define MAX_GAUGE_MSG_SIZE (((MAX_SNAPSHOT_NAME_LENGTH+1) / CELL_SIZE) + 16)
static cell_t gauge_receive_buf[MAX_GAUGE_MSG_SIZE];
/*
* The gauge_request list is used to queue requests for
* gauge actions (snapshots, etc), while handling other requests.
*/
#define GAUGE_REQUEST_SNAPSHOT 1
#define GAUGE_REQUEST_RESET 2
#define GAUGE_REQUEST_RETURN_PROFILES 3
#define GAUGE_COMPLETED_REQUEST 4
typedef struct gauge_request_struct
{
struct gauge_request_struct *next;
int_t type;
int_t from_node;
char_t snapshot_name[MAX_SNAPSHOT_NAME_LENGTH+1];
} gauge_request_t;
static gauge_request_t *first_request = (gauge_request_t *) NULL;
static gauge_request_t *last_request = (gauge_request_t *) NULL;
static void enqueue_request();
#ifdef encore
unsigned *_p_encore_timer;
#endif
#if defined(PCN_USE_BSD_TIME) || defined(symmetry)
unsigned long _p_unix_base_time;
#endif
#if !defined(s2010) && !defined(ipsc860) && !defined(delta)
extern int gethostname();
#else
#define USE_HOSTNAME
#define HOSTNAME PCN_ARCH
#endif
static void enqueue_request();
static void host_handle_requests();
static void host_do_snapshot();
static void host_do_reset();
#ifdef PARALLEL
static void wait_for_completed();
static void process_and_enqueue_gauge_msg();
static void send_completed_request();
static void node_handle_requests();
#endif /* PARALLEL */
static void do_reset();
static void do_snapshot();
static void timer_secs();
static void get_times();
static void print_statistics();
/*
* _p_init_gauge_tmp_file()
*
* Initialize the _p_gauge_tmp_file variable on the host so that it
* can be distributed to the nodes.
*
* This file (with the node number concatenated) will be used to
* hold the temporary snapshot files that each node will use
* to hold the shapshots until they can be collected at the end
* of the run.
*/
void _p_init_gauge_tmp_file()
{
if (_p_gauge_file[0] != '\0')
_p_gauge = TRUE;
if (_p_host &&_p_gauge)
{
#ifndef GAUGE_PRIMITIVES_DEFINED
fprintf(_p_stdout, "Gauge's timing primities are not available\n");
fflush(_p_stdout);
_p_gauge = FALSE;
#else /* GAUGE_PRIMITIVES_DEFINED */
#ifdef USE_HOSTNAME
strcpy(hostname, HOSTNAME);
#else
gethostname(hostname, MAXHOSTNAMELEN);
#endif
pid = (int_t) getpid();
#if defined(ipsc860) || defined(delta)
/* File name lengths are a problem */
sprintf(_p_gauge_tmp_file, "%ld_cnt", (long) pid);
#else
sprintf(_p_gauge_tmp_file, "pcncnt_%s_%ld", hostname, (long) pid);
#endif
#endif /* GAUGE_PRIMITIVES_DEFINED */
}
else
{
_p_gauge_tmp_file[0] = '\0';
}
} /* _p_init_gauge_tmp_file() */
void _p_init_gauge()
{
if (_p_gauge)
{
char tmp_filename[MAX_PATH_LENGTH];
#ifdef symmetry
/* initialize the microsecond clock */
struct timeval tval;
struct timezone zone;
usclk_init();
gettimeofday(&tval,&zone);
_p_unix_base_time = tval.tv_sec;
#endif /* symmetry */
#ifdef encore
_p_encore_timer = mapfrcounter();
#endif /* encore */
#ifdef PCN_USE_BSD_TIME
struct timeval tval;
struct timezone zone;
gettimeofday(&tval,&zone);
_p_unix_base_time = tval.tv_sec;
#endif /* PCN_USE_BSD_TIME */
TIMER(start_time);
snapshots_taken = 0;
if (_p_tmp_dir[0] == '\0')
strcpy(_p_tmp_dir, PCN_TMP_DIR);
strcpy(tmp_filename, _p_gauge_tmp_file);
#if defined(ipsc860) || defined(delta)
/* File name lengths are a problem */
sprintf(_p_gauge_tmp_file, "%s/%ld_%s", _p_tmp_dir,
(long) _p_my_id, tmp_filename);
#else
sprintf(_p_gauge_tmp_file, "%s/%s_%ld", _p_tmp_dir, tmp_filename,
(long) _p_my_id);
#endif
if ((gauge_tmp_fp = fopen(_p_gauge_tmp_file, "w+")) == (FILE *) NULL)
{
char_t buf[MAX_PATH_LENGTH + 128];
sprintf(buf,
"_p_init_gauge(): Failed to open Gauge temporary file for writing: %s",
_p_gauge_tmp_file);
_p_fatal_error(buf);
}
if (_p_host)
{
if (_p_gauge_file[0] == '\0')
{
strcpy(_p_gauge_file, DEFAULT_GAUGE_FILE);
}
else
{
/* Make sure it has a .cnt suffix. If not, then add one. */
int_t i = strlen(_p_gauge_file);
if (i < 4 || strcmp(_p_gauge_file + i - 4, ".cnt") != 0)
{
strcat(_p_gauge_file, ".cnt");
}
}
}
#ifdef USE_HOSTNAME
strcpy(hostname, HOSTNAME);
#else
gethostname(hostname, MAXHOSTNAMELEN);
#endif
}
} /* _p_init_gauge() */
/*
* enqueue_request()
*
* Allocate room for a gauge_request and enqueue it on the end of the
* request list.
*/
static void enqueue_request(request_type, from_node, snapshot_name)
int_t request_type;
int_t from_node;
char_t *snapshot_name;
{
gauge_request_t *new_request;
char_t *s;
new_request = (gauge_request_t *) malloc (sizeof(gauge_request_t));
if (new_request == (gauge_request_t *) NULL)
_p_malloc_error();
new_request->next = (gauge_request_t *) NULL;
new_request->type = request_type;
new_request->from_node = from_node;
/* Truncate the snapshot name as necessary */
strncpy(new_request->snapshot_name, snapshot_name,
MAX_SNAPSHOT_NAME_LENGTH);
new_request->snapshot_name[MAX_SNAPSHOT_NAME_LENGTH] = '\0';
if (first_request == (gauge_request_t *) NULL)
{
first_request = last_request = new_request;
}
else
{
last_request->next = new_request;
last_request = new_request;
}
} /* enqueue_request() */
/*
* host_handle_requests()
*
* Loop through the request queue until it is empty, handling all
* gauge requests. If new requests come in in the mean time,
* queue them up so that they also get handled before exiting this
* procedure.
*/
static void host_handle_requests()
{
bool_t got_initiate_exit = FALSE;
int_t exit_code;
gauge_request_t *request;
while (first_request != (gauge_request_t *) NULL && !got_initiate_exit)
{
request = first_request;
first_request = first_request->next;
switch (request->type)
{
case GAUGE_REQUEST_SNAPSHOT:
host_do_snapshot(request, &got_initiate_exit, &exit_code);
break;
case GAUGE_REQUEST_RESET:
host_do_reset(request, &got_initiate_exit, &exit_code);
break;
default:
_p_fatal_error("host_handle_requests(): Illegal request type");
break;
}
free(request);
}
if (got_initiate_exit)
{
_p_do_exit(exit_code);
}
} /* host_handle_requests() */
/*
* host_do_snapshot()
*
* Cause a snapshot on all nodes.
*/
static void host_do_snapshot(request, got_initiate_exit, exit_code)
gauge_request_t *request;
bool_t *got_initiate_exit;
int_t *exit_code;
{
int_t snapshot_num;
char_t *snapshot_name = request->snapshot_name;
#ifdef PARALLEL
int_t node;
int_t msg_buf_size;
cell_t *msg_buf;
int_t from_node = request->from_node;
cell_t *save_heap_ptr = _p_heap_ptr;
cell_t *save_heap_hard_top = _p_heap_hard_top;
#endif /* PARALLEL */
snapshot_num = snapshots_taken;
#ifdef PARALLEL
_p_heap_ptr = gauge_receive_buf;
_p_heap_hard_top = _p_heap_ptr + MAX_GAUGE_MSG_SIZE;
msg_buf_size = 3 + StringSizeToCells(strlen(snapshot_name) + 1);
/*
* Send out a request for a snapshot to each node.
*/
for (node = 1; node < _p_nodes; node++)
{
msg_buf = _p_alloc_msg_buffer(msg_buf_size);
*((int_t *)(msg_buf)) = GAUGE_REQUEST_SNAPSHOT;
*((int_t *)(msg_buf + 1)) = from_node;
*((int_t *)(msg_buf + 2)) = snapshot_num;
strcpy((char *) (msg_buf + 3), snapshot_name);
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout,
"(%lu,%lu) host_do_snapshot(): Sending snapshot request to %lu\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) node);
fflush(_p_stdout);
}
#endif /* DEBUG */
_p_msg_send(msg_buf, node, msg_buf_size, MSG_GAUGE);
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) host_do_snapshot(): Sent\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
}
#endif /* PARALLEL */
do_snapshot(snapshot_num, snapshot_name);
#ifdef PARALLEL
wait_for_completed(got_initiate_exit, exit_code);
_p_heap_ptr = save_heap_ptr;
_p_heap_hard_top = save_heap_hard_top;
#endif /* PARALLEL */
} /* host_do_snapshot() */
/*
* host_do_reset()
*
* Cause a snapshot on all nodes.
*/
static void host_do_reset(request, got_initiate_exit, exit_code)
gauge_request_t *request;
bool_t *got_initiate_exit;
int_t *exit_code;
{
#ifdef PARALLEL
int_t node;
cell_t *msg_buf;
int_t from_node = request->from_node;
cell_t *save_heap_ptr = _p_heap_ptr;
cell_t *save_heap_hard_top = _p_heap_hard_top;
#endif /* PARALLEL */
#ifdef PARALLEL
_p_heap_ptr = gauge_receive_buf;
_p_heap_hard_top = _p_heap_ptr + MAX_GAUGE_MSG_SIZE;
/*
* Send out a request for a reset to each node.
*/
for (node = 1; node < _p_nodes; node++)
{
msg_buf = _p_alloc_msg_buffer(2);
*((int_t *)(msg_buf)) = GAUGE_REQUEST_RESET;
*((int_t *)(msg_buf + 1)) = from_node;
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout,
"(%lu,%lu) host_do_reset(): Sending reset request to %lu\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction,
(unsigned long) node);
fflush(_p_stdout);
}
#endif /* DEBUG */
_p_msg_send(msg_buf, node, 2, MSG_GAUGE);
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) host_do_reset(): Sent\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
}
#endif /* PARALLEL */
do_reset();
#ifdef PARALLEL
wait_for_completed(got_initiate_exit, exit_code);
_p_heap_ptr = save_heap_ptr;
_p_heap_hard_top = save_heap_hard_top;
#endif /* PARALLEL */
} /* host_do_reset() */
#ifdef PARALLEL
/*
* wait_for_completed()
*
* Wait for a GAUGE_COMPLETED_REQUEST message from _p_nodes-1 nodes.
* Enqueue any other gauge requests that we get in the mean time.
* If we get an MSG_INITIATE_EXIT, then set *got_initiate_exit to TRUE
* and set *exit_code to the exit code.
*/
static void wait_for_completed(got_initiate_exit, exit_code)
bool_t *got_initiate_exit;
int_t *exit_code;
{
int_t i;
int_t msg_type, msg_size, msg_node;
i = 1;
while (i < _p_nodes)
{
_p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE);
switch(msg_type)
{
case MSG_GAUGE:
if (*((int_t *)(_p_heap_ptr)) == GAUGE_COMPLETED_REQUEST)
i++;
else
process_and_enqueue_gauge_msg(msg_size, msg_node);
break;
case MSG_INITIATE_EXIT:
if (!got_initiate_exit)
{
*got_initiate_exit = TRUE;
*exit_code = *((int_t *)(_p_heap_ptr));
}
break;
default:
_p_fatal_error("wait_for_completed(): Received unexpected message type");
break;
}
}
} /* wait_for_completed() */
#endif /* PARALLEL */
#ifdef PARALLEL
/*
* process_and_enqueue_gauge_msg()
*
* Figure out what kind of request the gauge message at _p_heap_ptr is
* and enqueue it.
*/
static void process_and_enqueue_gauge_msg(size, node)
int_t size;
int_t node;
{
switch (*((int_t *)(_p_heap_ptr)))
{
case GAUGE_REQUEST_SNAPSHOT:
enqueue_request(GAUGE_REQUEST_SNAPSHOT, node,
(char_t *) (_p_heap_ptr + 1));
break;
case GAUGE_REQUEST_RESET:
enqueue_request(GAUGE_REQUEST_RESET, node, "");
break;
default:
_p_fatal_error("process_and_enqueue_gauge_msg(): Invalid gauge message type");
break;
}
} /* process_and_enqueue_gauge_msg() */
#endif /* PARALLEL */
#ifdef PARALLEL
/*
* _p_process_gauge_msg()
*
* Process the MSG_GAUGE message that is at _p_heap_ptr. This
* is called by parallel.c:_p_process_messages().
*
* If I'm the host, then put the message on the gauge_request queue
* and then call the queue handler.
*
* If I'm a node, then handle the request.
*/
void _p_process_gauge_msg(size, node)
int_t size;
int_t node;
{
if (_p_host)
{
process_and_enqueue_gauge_msg();
host_handle_requests();
}
else
{
switch (*((int_t *)(_p_heap_ptr)))
{
case GAUGE_REQUEST_SNAPSHOT:
do_snapshot(*((int_t *)(_p_heap_ptr + 2)),
(char_t *) (_p_heap_ptr + 3));
break;
case GAUGE_REQUEST_RESET:
do_reset();
break;
default:
_p_fatal_error("process_and_enqueue_gauge_msg(): Invalid gauge message type");
break;
}
send_completed_request();
}
} /* _p_process_gauge_msg() */
#endif /* PARALLEL */
#ifdef PARALLEL
/*
* send_completed_request()
*
* Send a GAUGE_COMPLETED_REQUEST message to the host.
*/
static void send_completed_request()
{
cell_t *msg_buf;
msg_buf = _p_alloc_msg_buffer(1);
*((int_t *)(msg_buf)) = GAUGE_COMPLETED_REQUEST;
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout,
"(%lu,%lu) send_completed_request(): Sending to host\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
_p_msg_send(msg_buf, _p_host_id, 1, MSG_GAUGE);
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) send_completed_request(): Sent\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
} /* send_completed_request() */
#endif /* PARALLEL */
#ifdef PARALLEL
/*
* node_handle_requests()
*
* Handle all gauge requests that come to my node, until we
* receive a request that originated at my node. Since a node
* can only have one outstanding gauge request at a time,
* we know we've handled the request that caused this procedure
* to get called if we handle a request that originated from this node.
*/
static void node_handle_requests()
{
bool_t done = FALSE;
cell_t *save_heap_ptr = _p_heap_ptr;
cell_t *save_heap_hard_top = _p_heap_hard_top;
int_t msg_type, msg_size, msg_node;
bool_t got_exit = FALSE;
int_t exit_code;
_p_heap_ptr = gauge_receive_buf;
_p_heap_hard_top = _p_heap_ptr + MAX_GAUGE_MSG_SIZE;
while (!done && !got_exit)
{
_p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE);
switch (msg_type)
{
case MSG_GAUGE:
switch (*((int_t *)(_p_heap_ptr)))
{
case GAUGE_REQUEST_SNAPSHOT:
do_snapshot(*((int_t *)(_p_heap_ptr + 2)),
(char_t *) (_p_heap_ptr + 3));
break;
case GAUGE_REQUEST_RESET:
do_reset();
break;
default:
_p_fatal_error("node_handle_requests(): Illegal request type");
break;
}
send_completed_request();
if (*((int_t *)(_p_heap_ptr + 1)) == _p_my_id)
done = TRUE;
break;
case MSG_EXIT:
got_exit = TRUE;
exit_code = *((int_t *)(_p_heap_ptr));
break;
default:
_p_fatal_error("node_handle_requests(): Illegal message type");
break;
}
}
_p_heap_ptr = save_heap_ptr;
_p_heap_hard_top = save_heap_hard_top;
if (got_exit)
{
_p_node_handle_exit(exit_code, FALSE);
}
} /* node_handle_requests() */
#endif /* PARALLEL */
/*
* do_reset()
*
* Reset all counters and timers to 0.
*/
static void do_reset()
{
int_t i;
proc_header_t *proc_header;
for (i = 0; i <= _p_exported_table_size; i++)
{
for (proc_header = _p_exported_table[i];
proc_header != (proc_header_t *) NULL;
proc_header = proc_header->next)
{
ZeroOutMemory(proc_header->counters,
(proc_header->n_counters * sizeof(int_t)));
ZeroOutMemory(proc_header->timers,
(proc_header->n_timers * sizeof(int_t) * 2));
}
}
#if defined(PCN_USE_BSD_TIME) || defined(symmetry)
{
struct timeval tval;
struct timezone zone;
gettimeofday(&tval,&zone);
_p_unix_base_time = tval.tv_sec;
}
#endif /* PCN_USE_BSD_TIME */
ZeroOutMemory(&_p_gauge_stats, sizeof(_p_gauge_stats));
} /* do_reset() */
/*
* do_snapshot()
*
* Do a snapshot into my temporary file.
*/
static void do_snapshot(snapshot_num, snapshot_name)
int_t snapshot_num;
char_t *snapshot_name;
{
int_t i;
proc_header_t *proc_header;
int_t n_counters, n_timers;
gauge_timer end_time;
if (snapshot_num != snapshots_taken)
{
_p_fatal_error("do_snapshot(): snapshot_num != snapshots_taken");
}
TIMER(end_time);
timer_sub(start_time, end_time, _p_gauge_stats.run_time);
for (i = 0; i <= _p_exported_table_size; i++)
{
for (proc_header = _p_exported_table[i];
proc_header != (proc_header_t *) NULL;
proc_header = proc_header->next)
{
if (proc_header->model != (char_t **) NULL)
{
n_counters = proc_header->n_counters;
n_timers = proc_header->n_timers;
if (fprintf(gauge_tmp_fp, "K %s %ld %ld %s:%s/%ld %ld ",
(char *) snapshot_name, (long) snapshot_num,
(long) _p_my_id,
(char *) proc_header->module_name,
(char *) proc_header->proc_name,
(long) proc_header->arity,
(long) n_counters) == EOF)
{
_p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file");
}
if (fwrite((char *) proc_header->counters, CELL_SIZE,
n_counters, gauge_tmp_fp) != n_counters)
{
_p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file");
}
if (fprintf(gauge_tmp_fp, "\nT %s %ld %ld %s:%s/%ld %ld ",
(char *) snapshot_name, (long) snapshot_num,
(long) _p_my_id,
(char *) proc_header->module_name,
(char *) proc_header->proc_name,
(long) proc_header->arity,
(long) n_timers) == EOF)
{
_p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file");
}
if (fwrite((char *) proc_header->timers, CELL_SIZE * 2,
n_timers, gauge_tmp_fp) != n_timers)
{
_p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file");
}
if (fprintf(gauge_tmp_fp, "\n") == EOF)
{
_p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file");
}
}
}
}
if (fprintf(gauge_tmp_fp,
"P %s %ld %lu %lu %lu %lu %lu %lu %lu %lu %lu %lu %lu %lu\n",
(char *) snapshot_name,
(long) snapshot_num,
(unsigned long) _p_my_id,
(unsigned long) LOW_WORD(_p_gauge_stats.run_time),
(unsigned long) HI_WORD(_p_gauge_stats.run_time),
(unsigned long) LOW_WORD(_p_gauge_stats.gc_time),
(unsigned long) HI_WORD(_p_gauge_stats.gc_time),
(unsigned long) _p_gauge_stats.gc_calls,
(unsigned long) _p_gauge_stats.ssmallmsgs,
(unsigned long) _p_gauge_stats.sbigmsgs,
(unsigned long) _p_gauge_stats.sbigmsgslen,
(unsigned long) _p_gauge_stats.ssmallmsgs,
(unsigned long) _p_gauge_stats.sbigmsgs,
(unsigned long) _p_gauge_stats.sbigmsgslen) == EOF)
{
_p_fatal_error("do_snapshot(): Failed to write to temporary Gauge profile file");
}
snapshots_taken++;
} /* do_snapshot() */
#define OUTPUT_ERROR() \
_p_fatal_error("Failed to write to Gauge profile file");
/*
* _p_host_final_profile()
*
* Take a final profile snapshot, and coalesce the node profiles
*/
void _p_host_final_profile()
{
FILE *cnt_fp;
char_t lasthost[MAXHOSTNAMELEN+1];
int_t lastnode, i, j, n_hostnames, buf_size;
long host_count_position, current_position;
proc_header_t *proc_header;
bool_t done;
u_int_t sec, usec;
char_t **m;
#ifdef PARALLEL
int_t msg_type, msg_size, msg_node;
#endif /* PARALLEL */
if (!_p_gauge)
return;
do_snapshot(snapshots_taken, "end");
timer_secs(&(_p_gauge_stats.run_time) ,&sec, &usec);
fprintf(_p_stdout, "Program completed in %lu.%lu seconds.\n",
(unsigned long) sec, (unsigned long) usec);
fprintf(_p_stdout, "Writing Gauge profile file: %s\n", _p_gauge_file);
fflush(_p_stdout);
if ((cnt_fp = fopen(_p_gauge_file, "w+")) == (FILE *) NULL)
{
char_t buf[MAX_PATH_LENGTH + 128];
sprintf(buf,
"_p_host_final_profile(): Failed to open Gauge profile file for writing: %s",
_p_gauge_file);
_p_fatal_error(buf);
}
/*
* Write out the header informaton of the .cnt file
*/
if (fprintf(cnt_fp, "S profile %s\n", _p_gauge_file) == EOF)
OUTPUT_ERROR();
if (fprintf(cnt_fp, "V 0005\n") == EOF)
OUTPUT_ERROR();
if (fprintf(cnt_fp, "N %ld %ld %ld ", (long) _p_nodes,
(long) 0, (long) _p_nodes - 1) == EOF)
OUTPUT_ERROR();
/*
* Save the position and make space for the count of the
* number of different hosts.
*/
host_count_position = ftell(cnt_fp);
if (fprintf(cnt_fp, " ") == EOF)
OUTPUT_ERROR();
/*
* Get the hostname from each node, and coalesce adjacent
* names into ranges if they are the same.
*/
strcpy(lasthost, hostname);
lastnode = 0;
n_hostnames = 0;
i = 1;
#ifdef PARALLEL
while (i < _p_nodes)
{
/*
* Send empty MSG_GAUGE message to node i, to prompt him to send
* his hostname back.
*/
_p_msg_send((cell_t *) NULL, i, 0, MSG_GAUGE);
_p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE);
if (msg_type != MSG_GAUGE)
_p_fatal_error("_p_host_final_profile(): Expected a MSG_GAUGE message");
if (strcmp(lasthost, (char_t *) _p_heap_ptr) != 0)
{
if (fprintf(cnt_fp, " %s %ld %ld",
lasthost, (long) lastnode, (long) i - 1) == EOF)
OUTPUT_ERROR();
lastnode = i;
strcpy(lasthost, (char_t *) _p_heap_ptr);
n_hostnames++;
}
i++;
}
#endif /* PARALLEL */
if (fprintf(cnt_fp, " %s %ld %ld",
lasthost, (long) lastnode, (long) i - 1) == EOF)
OUTPUT_ERROR();
n_hostnames++;
/*
* Go back and fill in the number of distinct hostnames that were
* written
*/
current_position = ftell(cnt_fp);
fseek(cnt_fp, host_count_position, 0);
if (fprintf(cnt_fp, "%ld", n_hostnames) == EOF)
OUTPUT_ERROR();
fseek(cnt_fp, current_position, 0);
/* Finish off the N line */
if (fprintf(cnt_fp, "\n") == EOF)
OUTPUT_ERROR();
/*
* Now dump out the model information for each procedure
*/
for (i = 0; i <= _p_exported_table_size; i++)
{
for (proc_header = _p_exported_table[i];
proc_header != (proc_header_t *) NULL;
proc_header = proc_header->next)
{
if (proc_header->model != (char_t **) NULL)
{
for (m = proc_header->model, j = 0;
m[j] != (char_t *) NULL;
j++)
{
if (fprintf(cnt_fp, "%s", m[j]) == EOF)
OUTPUT_ERROR();
}
}
}
}
/*
* Write out the profile header
*/
if (fprintf(cnt_fp, "S counter %lu %lu %ld %ld %ld []\n",
(unsigned long) LOW_WORD(_p_gauge_stats.run_time),
(unsigned long) HI_WORD(_p_gauge_stats.run_time),
(long) 0, (long) _p_nodes - 1, (long) snapshots_taken) == EOF)
OUTPUT_ERROR();
/*
* Now dump the host's profile data
*/
fseek(gauge_tmp_fp, (long) 0, 0);
i = MIN(8192, ((char_t *) _p_heap_real_top) - ((char_t *) _p_heap_ptr));
while ((buf_size = fread((char *) _p_heap_ptr, 1, i, gauge_tmp_fp)) != 0)
{
if (fwrite((char *) _p_heap_ptr, 1, buf_size, cnt_fp) != buf_size)
OUTPUT_ERROR();
}
fclose(gauge_tmp_fp);
remove(_p_gauge_tmp_file);
#ifdef PARALLEL
/*
* Send empty MSG_GAUGE message to node i, to prompt him to send
* his profile information.
*/
for (i = 1; i < _p_nodes; i++)
{
_p_msg_send((cell_t *) NULL, i, 0, MSG_GAUGE);
done = FALSE;
while (!done)
{
_p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE);
if (msg_type != MSG_GAUGE)
_p_fatal_error("_p_host_final_profile(): Expected a MSG_GAUGE message");
if (msg_size == 0)
{
done = TRUE;
}
else
{
buf_size = *((int_t *)(_p_heap_ptr));
if (fwrite((char *) (_p_heap_ptr + 1), buf_size, 1,
cnt_fp) != 1)
OUTPUT_ERROR();
}
}
}
#endif /* PARALLEL */
/*
* And some trailers...
*/
if (fprintf(cnt_fp, "E counter\n") == EOF)
OUTPUT_ERROR();
if (fprintf(cnt_fp, "E profile %s\n", _p_gauge_file) == EOF)
OUTPUT_ERROR();
fclose(cnt_fp);
fprintf(_p_stdout, "Done writing: %s\n", _p_gauge_file);
fflush(_p_stdout);
} /* _p_host_final_profile() */
#ifdef PARALLEL
/*
* _p_node_final_profile()
*
* Take a final profile snapshot, and coalesce the node profiles
*/
void _p_node_final_profile()
{
int_t msg_type, msg_size, msg_node;
cell_t *msg_buf;
int_t msg_buf_size;
bool_t done;
int_t buf_size;
if (!_p_gauge)
return;
do_snapshot(snapshots_taken, "end");
/*
* Wait for an empty MSG_GAUGE message, prompting me for my hostname
*/
_p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE);
if (msg_type != MSG_GAUGE)
_p_fatal_error("_p_node_final_profile(): Expected a MSG_GAUGE message");
msg_buf_size = StringSizeToCells(strlen(hostname) + 1);
msg_buf = _p_alloc_msg_buffer(msg_buf_size);
strcpy((char *) msg_buf, hostname);
_p_msg_send(msg_buf, _p_host_id, msg_buf_size, MSG_GAUGE);
/*
* Now wait for an empty MSG_GAUGE message, prompting me for my
* profile data. Send the profile data back in chunks.
*/
_p_msg_receive(&msg_node, &msg_size, &msg_type, RCV_GAUGE);
if (msg_type != MSG_GAUGE)
_p_fatal_error("_p_node_final_profile(): Expected a MSG_GAUGE message");
fseek(gauge_tmp_fp, (long) 0, 0);
done = FALSE;
while (!done)
{
msg_buf = _p_alloc_msg_buffer(_p_default_msg_buffer_size);
buf_size = fread((char *) (msg_buf + 1), 1,
(_p_default_msg_buffer_size - 1) * CELL_SIZE,
gauge_tmp_fp);
if (buf_size == 0)
{
_p_msg_send(msg_buf, _p_host_id, 0, MSG_GAUGE);
done = TRUE;
}
else
{
*((int_t *) msg_buf) = buf_size;
_p_msg_send(msg_buf, _p_host_id, _p_default_msg_buffer_size,
MSG_GAUGE);
}
}
fclose(gauge_tmp_fp);
remove(_p_gauge_tmp_file);
} /* _p_node_final_profile() */
#endif /* PARALLEL */
/*
* timer_secs()
*
* Convert a timer value into seconds and micro-seconds.
*/
static void timer_secs(timer, secs, usecs)
gauge_timer *timer;
u_int_t *secs, *usecs;
{
double scale, tmp;
unsigned long utmp = 0xFFFFFFFF;
scale = (((double) utmp) + 1) / TICKS_PER_SEC;
*secs = (u_int_t) ((((double) HI_WORD(*timer)) * scale)
+ (((double) LOW_WORD(*timer)) / TICKS_PER_SEC));
tmp = (((double) HI_WORD(*timer) * ((double) utmp + 1))
- ((double) *secs * TICKS_PER_SEC));
scale = (1000000.0 / TICKS_PER_SEC);
*usecs = (u_int_t) ((tmp + ((double) LOW_WORD(*timer))) * scale);
} /* timer_secs() */
/*
* get_times()
*
* Convert a timer value into minutes, seconds, and milli-seconds.
*/
static void get_times(stats, min, sec, msec)
struct gauge_stats *stats;
u_int_t *min, *sec, *msec;
{
timer_secs(&(stats->run_time), sec, msec);
*min = (u_int_t) *sec / 60;
*sec %= 60;
*msec /= 1000;
} /* get_times() */
/*
* print_statistics()
*
* Print out the values in the '_p_gauge_stats' structure.
*/
static void print_statistics()
{
u_int_t secs, usecs;
timer_secs(&(_p_gauge_stats.run_time), &secs, &usecs);
fprintf(_p_stdout, " Run time: %lu.%lu seconds\n",
(unsigned long) secs, (unsigned long) usecs);
timer_secs(&(_p_gauge_stats.idle_time), &secs, &usecs);
fprintf(_p_stdout, " Idle time: %lu.%lu seconds\n",
(unsigned long) secs, (unsigned long) usecs);
fprintf(_p_stdout, " GC calls %lu\n",
(unsigned long) _p_gauge_stats.gc_calls);
timer_secs(&(_p_gauge_stats.gc_time), &secs, &usecs);
fprintf(_p_stdout, " GC Time %lu.%lu\n",
(unsigned long) secs, (unsigned long) usecs);
fprintf(_p_stdout, " Messages Sent: %lu %lu/%lu\n",
(unsigned long) _p_gauge_stats.ssmallmsgs,
(unsigned long) _p_gauge_stats.sbigmsgs,
(unsigned long) _p_gauge_stats.sbigmsgslen);
fprintf(_p_stdout, " Messages Received %lu %lu/%lu\n",
(unsigned long) _p_gauge_stats.rsmallmsgs,
(unsigned long) _p_gauge_stats.rbigmsgs,
(unsigned long) _p_gauge_stats.rbigmsgslen);
fflush(_p_stdout);
} /* print_statistics() */
#endif /* GAUGE */
/*
* profile_reset()
*
* This is the entry point from user code for invoking a profile reset
* on all nodes.
*
* If I'm a node, send a request to the host, and then
* handle all gauge requests up to and including my request.
*
* If I'm the host, then go ahead and handle this request.
*/
void profile_reset()
{
#ifdef GAUGE
if (!_p_gauge)
return;
#ifdef PARALLEL
if (_p_host)
{
#endif /* PARALLEL */
enqueue_request(GAUGE_REQUEST_RESET, _p_my_id, "");
host_handle_requests();
#ifdef PARALLEL
}
else
{
cell_t *msg_buf;
msg_buf = _p_alloc_msg_buffer(1);
*((int_t *)(msg_buf)) = GAUGE_REQUEST_RESET;
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout,
"(%lu,%lu) profile_reset(): Sending request to host\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
_p_msg_send(msg_buf, _p_host_id, 1, MSG_GAUGE);
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) profile_reset(): Sent\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
node_handle_requests();
}
#endif /* PARALLEL */
#endif /* GAUGE */
} /* profile_reset() */
/*
* profile_snapshot()
*
* This is the entry point from user code for invoking a profile snapshot
* on all nodes.
*
* If I'm a node, send a request to the host, and then
* handle all gauge requests up to and including my request.
*
* If I'm the host, then go ahead and handle this request.
*/
void profile_snapshot(snapshot_name)
char_t *snapshot_name;
{
#ifdef GAUGE
if (!_p_gauge)
return;
#ifdef PARALLEL
if (_p_host)
{
#endif /* PARALLEL */
enqueue_request(GAUGE_REQUEST_SNAPSHOT, _p_my_id, snapshot_name);
host_handle_requests();
#ifdef PARALLEL
}
else
{
int_t msg_buf_size;
cell_t *msg_buf;
int_t snapshot_len;
char_t *s;
snapshot_len = MIN(strlen(snapshot_name),
MAX_SNAPSHOT_NAME_LENGTH);
msg_buf_size = 1 + StringSizeToCells(snapshot_len + 1);
msg_buf = _p_alloc_msg_buffer(msg_buf_size);
*((int_t *)(msg_buf)) = GAUGE_REQUEST_SNAPSHOT;
/* Truncate the snapshot name if need be */
s = (char_t *) (msg_buf + 1);
strncpy(s, snapshot_name, snapshot_len);
*(s + snapshot_len) = '\0';
#ifdef DEBUG
if (ParDebug(5))
{
fprintf(_p_stdout,
"(%lu,%lu) profile_snapshot(): Sending request to host\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
_p_msg_send(msg_buf, _p_host_id, msg_buf_size, MSG_GAUGE);
#ifdef DEBUG
if (ParDebug(7))
{
fprintf(_p_stdout, "(%lu,%lu) profile_snapshot(): Sent\n",
(unsigned long) _p_my_id, (unsigned long) _p_reduction);
fflush(_p_stdout);
}
#endif /* DEBUG */
node_handle_requests();
}
#endif /* PARALLEL */
#endif /* GAUGE */
} /* profile_snapshot() */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.