This is emulate.c in view mode; [Download] [Up]
/* * PCN Abstract Machine Emulator * Authors: Steve Tuecke and Ian Foster * Argonne National Laboratory * * Please see the DISCLAIMER file in the top level directory of the * distribution regarding the provisions under which this software * is distributed. * * emulate.c - main emulation loop */ #include "pcn.h" static int equal(); static void wait_for_process(); static int_t run(); static int_t run_call_tuple(); static void save_arguments(); static void schedule_process(); #ifdef DEBUG static void test_f_regs(); #endif static cell_t * program_counter; /* Program counter */ static cell_t * failure_label; /* Failure label */ static int_t timeslice_counter; /* Reductions left in timeslice */ static instruction_t * instr; /* Current instruction */ static bool_t queue_check = 0;/* TRUE is global susp queue has */ /* been scheduled but has resulted */ /* in no successful reductions. */ static u_int_t next_gsq_reschedule; /* Next reduction on which to */ /* reschedule the global */ /* suspension queue. */ #ifdef PDB static bool_t gc_on_next_reduction = FALSE; #endif /* Scratch variables */ static proc_record_t *proc_record; static int_t tag1, tag2, size; static u_char_t *charp1, *charp2; static int_t *intp1, *intp2; static u_int_t *uintp1, *uintp2; static double_t *doublep1, *doublep2; static int_t i1, i2; static cell_t *cp1, *cp2, *cp3; static double_t d1, d2; static bool_t need_to_gc; static void (*f)(); #ifdef STREAMS static stream_t *streamp1, *streamp2; #endif /* STREAMS */ #ifdef DEBUG static char *s1; #endif /* DEBUG */ #ifdef GAUGE static gauge_timer start_time, stop_time; #endif /* GAUGE */ void init_em_vars() { #ifdef PDB_HOST if (_pdb_enter_immediately) _pdb_enter(FALSE); #endif /* PDB_HOST */ _p_reduction = 1; next_gsq_reschedule = _p_reduction + _p_gsq_interval; _p_structure_ptr = _p_structure_start_ptr = (cell_t *) NULL; _p_foreign_ptr = _p_f_reg; schedule_process(); } /* init_em_vars() */ /* * _p_emulate() * * Main PCN abstract machine emulation loop */ void _p_emulate() { init_em_vars(); while (1) { instr = (instruction_t *) program_counter; #ifdef DEBUG if (EmDebug(7)) { _p_print_instruction(instr); } #endif /* DEBUG */ switch(instr->I_OPCODE) { case I_FORK: proc_record =_p_alloc_proc_record(instr->I_FORK_ARITY);/*GC_ALERT*/ proc_record->proc = (proc_header_t *) (instr->I_FORK_PROC); #ifdef PDB proc_record->instance = _pdb_get_next_instance(); proc_record->reduction = _p_reduction; proc_record->called_by = _p_current_proc->proc; #endif /* PDB */ _p_structure_start_ptr = (cell_t *) proc_record; _p_structure_ptr = (cell_t *) proc_record->args; EnqueueProcess(proc_record, _p_active_qf, _p_active_qb); PDB_EnqueueProcess(proc_record); program_counter += SIZE_FORK; break; case I_RECURSE: #ifdef GAUGE IncrementCounter(instr->I_RECURSE_COUNTER, 1); #endif /* GAUGE */ #ifdef PDB _p_current_proc->reduction = _p_reduction; #endif /* PDB */ _p_reduction++; need_to_gc = (NeedToGC() #ifdef PDB || gc_on_next_reduction #endif /* PDB */ ); #ifdef PDB_HOST if (_p_reduction == _pdb_reduction_break || (((proc_header_t *) instr->I_RECURSE_PROC)->break_num > 0)) _pdb_breakout = TRUE; #endif /* PDB_HOST */ if (timeslice_counter == 0 || need_to_gc #ifdef PDB_HOST || _pdb_breakout #endif /* PDB_HOST */ ) { #ifdef PDB _p_current_proc->instance = _pdb_get_next_instance(); _p_current_proc->reduction = _p_reduction; _p_current_proc->called_by = _p_current_proc->proc; #endif /* PDB */ _p_current_proc->proc = (proc_header_t *)instr->I_RECURSE_PROC; save_arguments(instr->I_RECURSE_ARITY); /* GC_ALERT */ #ifdef PDB_HOST if (_pdb_breakout && timeslice_counter != 0 && !need_to_gc) { EnqueueProcessAtFront(_p_current_proc, _p_active_qf, _p_active_qb); } else { #endif /* PDB_HOST */ EnqueueProcess(_p_current_proc, _p_active_qf, _p_active_qb); #ifdef PDB_HOST } #endif /* PDB_HOST */ PDB_EnqueueProcess(_p_current_proc); _p_current_proc = (proc_record_t *) NULL; if (need_to_gc) { DoGC(); #ifdef PDB gc_on_next_reduction = FALSE; #endif /* PDB */ } TryMSG(); if (_p_globsusp_qf != (proc_record_t *) NULL && _p_reduction > next_gsq_reschedule) { /* Reschedule global suspension queue */ AppendProcessQueue(_p_active_qf, _p_active_qb, _p_globsusp_qf, _p_globsusp_qb); next_gsq_reschedule = _p_reduction + _p_gsq_interval; } schedule_process(); } else { timeslice_counter--; #ifdef PDB _p_current_proc->instance = _pdb_get_next_instance(); _p_current_proc->reduction = _p_reduction; _p_current_proc->called_by = _p_current_proc->proc; #endif /* PDB */ _p_current_proc->proc = (proc_header_t *)instr->I_RECURSE_PROC; program_counter = (cell_t *) _p_current_proc->proc->code; #ifdef DYNAMIC_PAM_LOADING if (program_counter == (cell_t *) NULL) { char buf[256]; sprintf(buf, "_p_emulate(): i_recurse: Procedure %s:%s() has not been loaded, so I cannot execute it.", _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); _p_fatal_error(buf); } #endif /* DYNAMIC_PAM_LOADING */ } _p_suspension_var = (cell_t *) NULL; #ifdef DEBUG if (EmDebug(4)) _p_print_proc("Schedule process from recurse: ", _p_current_proc->proc, (_p_em_dl >= 5)); #endif /* DEBUG */ queue_check = 0; break; case I_HALT: #ifdef GAUGE IncrementCounter(instr->I_HALT_COUNTER, 1); #endif /* GAUGE */ _p_free_proc_record(_p_current_proc); _p_current_proc = (proc_record_t *) NULL; if (NeedToGC() #ifdef PDB || gc_on_next_reduction #endif /* PDB */ ) { DoGC(); /* GC_ALERT */ #ifdef PDB gc_on_next_reduction = FALSE; #endif /* PDB */ } TryMSG(); /* GC_ALERT */ _p_reduction++; queue_check = 0; if (_p_globsusp_qf != (proc_record_t *) NULL) { if (_p_reduction > next_gsq_reschedule) { /* Reschedule global suspension queue */ AppendProcessQueue(_p_active_qf, _p_active_qb, _p_globsusp_qf, _p_globsusp_qb); next_gsq_reschedule = _p_reduction + _p_gsq_interval; } else if (_p_active_qf == (proc_record_t *) NULL) { AppendProcessQueue(_p_active_qf, _p_active_qb, _p_globsusp_qf, _p_globsusp_qb); queue_check = 1; } } if (_p_active_qf != (proc_record_t *) NULL) { schedule_process(); } else { #ifdef DEBUG if (EmDebug(4)) { fprintf(_p_stdout, "(%lu,%lu) halt: Process queue is empty.\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ wait_for_process(); /* GC_ALERT */ schedule_process(); } #ifdef DEBUG if (EmDebug(4)) { _p_print_proc("Schedule process from halt: ", _p_current_proc->proc, (_p_em_dl >= 5)); fflush(_p_stdout); } #endif /* DEBUG */ break; case I_DEFAULT: if (_p_suspension_var == (cell_t *) 0) { /* Continue with default case */ program_counter += SIZE_DEFAULT; } else { /* Suspend this process */ #ifdef GAUGE IncrementCounter(instr->I_DEFAULT_COUNTER, 1); #endif /* GAUGE */ save_arguments(instr->I_DEFAULT_ARITY); /* GC_ALERT */ if (_p_suspension_var == (cell_t *) -1) { /* Suspend this process on the global suspension queue */ #ifdef DEBUG if (EmDebug(6)) { fprintf(_p_stdout, "(%lu,%lu) default: Enqueueing global susp: ", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); _p_print_proc_record("", _p_current_proc); } #endif /* DEBUG */ EnqueueProcess(_p_current_proc, _p_globsusp_qf, _p_globsusp_qb); PDB_EnqueueProcess(_p_current_proc); } else { /* Suspend this process on a variable suspension queue */ #ifdef DEBUG if (EmDebug(6)) { fprintf(_p_stdout, "(%lu,%lu) default: Enqueueing suspension at 0x%lx: ", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) _p_suspension_var); fflush(_p_stdout); _p_print_proc_record("", _p_current_proc); } #endif /* DEBUG */ EnqueueSuspension(_p_current_proc, _p_suspension_var); PDB_EnqueueProcess(_p_current_proc); } _p_current_proc = (proc_record_t *) NULL; TryGC(); /* GC_ALERT */ TryMSG(); /* GC_ALERT */ _p_reduction++; if (_p_active_qf == (proc_record_t *) NULL && _p_globsusp_qf != (proc_record_t *) NULL) { /* * We don't want to spin on the processes in the * global suspension queue. So only allow us * to dequeue _p_globsusp_qf once when * _p_active_qf goes empty and we don't do any * other work. */ if (!queue_check) { AppendProcessQueue(_p_active_qf, _p_active_qb, _p_globsusp_qf, _p_globsusp_qb); queue_check = 1; } } if (_p_active_qf != (proc_record_t *) NULL) { schedule_process(); } else { #ifdef DEBUG if (EmDebug(4)) { fprintf(_p_stdout, "(%lu,%lu) default: Empty process queue. Suspensions remaining.\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); } #endif /* DEBUG */ wait_for_process(); /* GC_ALERT */ schedule_process(); } #ifdef DEBUG if (EmDebug(4)) _p_print_proc("Scheduling process from default: ", _p_current_proc->proc, (_p_em_dl >= 5)); #endif /* DEBUG */ } break; case I_TRY: failure_label = instr->I_TRY_LOCATION; program_counter += SIZE_TRY; break; case I_RUN: i1 = run(_p_a_reg[instr->I_RUN_VT_R], _p_a_reg[instr->I_RUN_TARGET_R], _p_a_reg[instr->I_RUN_MOD_R], _p_a_reg[instr->I_RUN_PROC_R], _p_a_reg[instr->I_RUN_SYNC_R], &cp1); /* GC_ALERT */ if (i1 == 1) { Fail(); } else if (i1 == 2) { SuspendOn(cp1); } program_counter += SIZE_RUN; break; case I_BUILD_STATIC: /* * Build a data structure whose size is known at compiler time. * The valid tags it might receive are: * TUPLE_TAG, INT_TAG, STRING_TAG, DOUBLE_TAG, STREAM_TAG */ tag1 = instr->I_BUILD_STATIC_TAG; size = (int_t) instr->I_BUILD_STATIC_SIZE; TryGCWithSize(_p_size_with_trailer(tag1, size)); /* GC_ALERT */ i1 = _p_size_without_trailer(tag1, size); HeaderCell(_p_heap_ptr, tag1, size); _p_a_reg[instr->I_BUILD_STATIC_DEST_R] = _p_heap_ptr; if (tag1 == TUPLE_TAG) { _p_structure_start_ptr = _p_heap_ptr; _p_structure_ptr = _p_heap_ptr + 1; /* * Zero out tuple argument pointers so that if we * gc before they are all filled in with I_PUT_VALUE, * the gc won't mess up. */ ZeroOutMemory((_p_heap_ptr + 1), (i1 - 1) * CELL_SIZE); } #ifdef STREAMS if (tag1 == STREAM_TAG) { /* * Need to initialize the stream structure so * that its state==0 */ ZeroOutMemory((_p_heap_ptr + 1), (i1 - 1) * CELL_SIZE); } #endif _p_heap_ptr += i1; #ifndef PCN_ALIGN_DOUBLES if (tag1 != TUPLE_TAG || size != 0) { /* * 0 arity tuples do not have a trailer tag is we are not * double word aligning things */ #endif TrailerCell(_p_heap_ptr, i1); _p_heap_ptr++; #ifndef PCN_ALIGN_DOUBLES } #endif program_counter += SIZE_BUILD_STATIC; break; case I_BUILD_DYNAMIC: /* * Build a data structure whose size is known at compiler time. * The valid tags it might receive are: * TUPLE_TAG, INT_TAG, STRING_TAG, DOUBLE_TAG, STREAM_TAG */ tag1 = instr->I_BUILD_DYNAMIC_TAG; Dereference((cell_t *), _p_a_reg[instr->I_BUILD_DYNAMIC_SIZE_R], cp1); size = *((int_t *) (cp1 + 1)); if (tag1 == TUPLE_TAG) { TryGCWithSize(TupleSizeWithTrailer(size) + (size * UndefSizeWithTrailer()));/* GC_ALERT */ i1 = _p_size_without_trailer(tag1, size); HeaderCell(_p_heap_ptr, tag1, size); _p_a_reg[instr->I_BUILD_DYNAMIC_DEST_R] = _p_heap_ptr; cp1 = _p_heap_ptr + 1; _p_heap_ptr += i1; #ifndef PCN_ALIGN_DOUBLES if (size != 0) { /* * 0 arity tuples do not have a trailer tag is we are not * double word aligning things */ #endif TrailerCell(_p_heap_ptr, i1); _p_heap_ptr++; #ifndef PCN_ALIGN_DOUBLES } #endif for (i1 = size; i1 > 0; i1--) { BuildUndefNoGC((cell_t), *cp1++); } } else { TryGCWithSize(_p_size_with_trailer(tag1, size)); /* GC_ALERT */ i1 = _p_size_without_trailer(tag1, size); HeaderCell(_p_heap_ptr, tag1, size); _p_a_reg[instr->I_BUILD_DYNAMIC_DEST_R] = _p_heap_ptr; #ifdef STREAMS if (tag1 == STREAM_TAG) { /* * Need to initialize the stream structure so * that its state==0 */ ZeroOutMemory((_p_heap_ptr + 1), (i1 - 1) * CELL_SIZE); } #endif _p_heap_ptr += i1; TrailerCell(_p_heap_ptr, i1); _p_heap_ptr++; } program_counter += SIZE_BUILD_DYNAMIC; break; case I_BUILD_DEF: /* GC_ALERT */ BuildUndef((cell_t *), _p_a_reg[instr->I_BUILD_DEF_DEST_R]); program_counter += SIZE_BUILD_DEF; break; case I_PUT_DATA: /* * The pointer in the put_data instruction points to * a structure that looks like: * * typedef struct static_double_value_struct * { * int_t n_cells; * data_header_t h; * double_t d; * } static_double_value_t; * * Where n_cells is the number of cells consumed by * the header 'h', and the data 'd'. * * Otherwise, his header and data needs to be copied to the heap, * because the header 'h' may not be word aligned (which * is required for tags to be distinguished from * pointers). */ cp1 = (cell_t *) (instr->I_PUT_DATA_PTR); #ifdef CONSTANTS_ALIGNED /* * We are guaranteed that the data_header cell of the * offheap constant is word aligned. Therefore, we just * point to it. */ _p_a_reg[instr->I_PUT_DATA_DEST_R] = (cp1 + 1); #else /* CONSTANTS_ALIGNED */ /* * We are NOT guaranteed that the data_header cell of the * offheap constant is word aligned. Therefore, we must * copy it to the heap. */ i1 = *((int_t *) cp1); #ifdef PCN_ALIGN_DOUBLES i2 = (i1 + 1) % 2; /* Number of pad words needed to align it */ #endif /* PCN_ALIGN_DOUBLES */ #ifdef PCN_ALIGN_DOUBLES TryGCWithSize(i1 + 1 + i2); /* GC_ALERT */ #else /* PCN_ALIGN_DOUBLES */ TryGCWithSize(i1 + 1); /* GC_ALERT */ #endif /* PCN_ALIGN_DOUBLES */ _p_a_reg[instr->I_PUT_DATA_DEST_R] = _p_heap_ptr; memcpy(_p_heap_ptr, (cp1 + 1), (i1 * CELL_SIZE)); #ifdef PCN_ALIGN_DOUBLES if (i2) i1++; #endif /* PCN_ALIGN_DOUBLES */ _p_heap_ptr += i1; TrailerCell(_p_heap_ptr, i1); _p_heap_ptr++; #endif /* CONSTANTS_ALIGNED */ program_counter += SIZE_PUT_DATA; break; case I_PUT_VALUE: *((cell_t **) _p_structure_ptr) = _p_a_reg[instr->I_PUT_VALUE_SRC_R]; _p_structure_ptr++; program_counter += SIZE_PUT_VALUE; break; case I_COPY: _p_a_reg[instr->I_COPY_DEST_R] = _p_a_reg[instr->I_COPY_SRC_R]; program_counter += SIZE_COPY; break; case I_GET_TUPLE: DerefAndCheckSusp((cell_t *), _p_a_reg[instr->I_GET_TUPLE_SRC_R], cp1); if (!IsTuple(cp1)) Fail(); i1 = instr->I_GET_TUPLE_ARITY; if (i1 != ((data_header_t *)cp1)->size) Fail(); _p_a_reg[instr->I_GET_TUPLE_SRC_R] = cp1; /* deref src register */ i2 = instr->I_GET_TUPLE_DEST_R; for (++cp1; i1 > 0; i1--, cp1++) _p_a_reg[i2++] = (cell_t *) (*cp1); program_counter += SIZE_GET_TUPLE; break; case I_EQUAL: DerefAndCheckSusp((cell_t *),_p_a_reg[instr->I_EQUAL_SRC1_R], cp1); DerefAndCheckSusp((cell_t *),_p_a_reg[instr->I_EQUAL_SRC2_R], cp2); _p_a_reg[instr->I_EQUAL_SRC1_R] = cp1; _p_a_reg[instr->I_EQUAL_SRC2_R] = cp2; if ((i1 = equal(cp1, cp2, &cp3)) == 0) { Fail(); } else if (i1 == 1) { SuspendOn(cp3); } else program_counter += SIZE_EQUAL; break; case I_NEQ: DerefAndCheckSusp((cell_t *), _p_a_reg[instr->I_NEQ_SRC1_R], cp1); DerefAndCheckSusp((cell_t *), _p_a_reg[instr->I_NEQ_SRC2_R], cp2); _p_a_reg[instr->I_NEQ_SRC1_R] = cp1; _p_a_reg[instr->I_NEQ_SRC2_R] = cp2; if ((i1 = equal(cp1, cp2, &cp3)) == 2) { Fail(); } else if (i1 == 1) { SuspendOn(cp3); } else program_counter += SIZE_NEQ; break; case I_TYPE: DerefAndCheckSusp((cell_t *), _p_a_reg[instr->I_TYPE_SRC_R], cp1); if (((data_header_t *)cp1)->tag != instr->I_TYPE_TAG) { Fail(); } _p_a_reg[instr->I_TYPE_SRC_R] = cp1; program_counter += SIZE_TYPE; break; case I_LE: Compare(<=); program_counter += SIZE_LE; break; case I_LT: Compare(<); program_counter += SIZE_LT; break; case I_DATA: DerefAndCheckSusp((cell_t *), _p_a_reg[instr->I_DATA_SRC_R], cp1); _p_a_reg[instr->I_DATA_SRC_R] = cp1; program_counter += SIZE_DATA; break; case I_UNKNOWN: Dereference((cell_t *), _p_a_reg[instr->I_UNKNOWN_SRC_R], cp1); _p_a_reg[instr->I_UNKNOWN_SRC_R] = cp1; #ifdef PARALLEL if (IsRref(cp1)) { if (RrefNotRead(cp1)) _p_send_read(cp1); } else #endif /* PARALLEL */ if (!IsUndef(cp1)) { Fail(); } program_counter += SIZE_UNKNOWN; break; case I_DEFINE: Dereference((cell_t *), _p_a_reg[instr->I_DEFINE_TO_R], cp1); #ifdef DEBUG Dereference((cell_t *), _p_a_reg[instr->I_DEFINE_FROM_R], cp2); if (cp1 == cp2) { fprintf(_p_stdout, "(%lu,%lu) Warning in define PAM instruction in %s:%s()\n Creating a circular reference chain which could cause the emulator to hang!\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); #ifdef PDB_HOST _pdb_breakout = TRUE; #endif /* PDB_HOST */ } #else /* DEBUG */ cp2 = _p_a_reg[instr->I_DEFINE_FROM_R]; #endif /* DEBUG */ if (!_p_define(cp1, cp2, TRUE, FALSE, 0, 0)) _p_bad_define("define", _p_current_proc); program_counter += SIZE_DEFINE; break; case I_GET_ELEMENT: /* Dereference((cell_t *), _p_a_reg[instr->I_GET_ELEMENT_SINDEX_R], cp1); Dereference((cell_t *), _p_a_reg[instr->I_GET_ELEMENT_SRC_R], cp2); */ cp1 = _p_a_reg[instr->I_GET_ELEMENT_SINDEX_R]; cp2 = _p_a_reg[instr->I_GET_ELEMENT_SRC_R]; #ifdef DEBUG if (*(cp1+1) >= ((data_header_t *) cp2)->size || *(cp1+1) < 0) { fprintf(_p_stdout, "(%lu,%lu) Warning: get_element bounds violation (size=%lu (0..%lu), get element #%lu) in %s:%s\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) ((data_header_t *) cp2)->size, (unsigned long) (((data_header_t *) cp2)->size - 1), (unsigned long) *((int_t *)(cp1+1)), _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); } #endif /* DEBUG */ if (IsTuple(cp2)) { cp3 = cp2 + 1 + *((int_t *) (cp1 + 1)); Dereference((cell_t *), *cp3, cp3); _p_a_reg[instr->I_GET_ELEMENT_DEST_R] = cp3; } else { if (IsInt(cp2)) { cp3 = cp2 + 1 + *((int_t *) (cp1 + 1)); i1 = *((int_t *) cp3); BuildInt((cell_t *), _p_a_reg[instr->I_GET_ELEMENT_DEST_R], i1); } else if (IsDouble(cp2)) { cp3 = cp2 + 1 + (*((int_t *)(cp1 + 1)) * CELLS_PER_DOUBLE); d1 = *((double_t *) cp3); BuildDouble((cell_t *), _p_a_reg[instr->I_GET_ELEMENT_DEST_R], d1); } else if (IsString(cp2)) { charp1 = (u_char_t *) (cp2 + 1); charp1 += *((int_t *) (cp1 + 1)); i1 = (u_int_t) *charp1; BuildInt((cell_t *), _p_a_reg[instr->I_GET_ELEMENT_DEST_R], i1); } #ifdef DEBUG else { fprintf(_p_stdout, "(%lu,%lu) Internal Error: get_element: Illegal tag found in %s:%s\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); } #endif /* DEBUG */ } program_counter += SIZE_GET_ELEMENT; break; case I_PUT_ELEMENT: /* Dereference((cell_t *), _p_a_reg[instr->I_PUT_ELEMENT_DINDEX_R], cp1); Dereference((cell_t *), _p_a_reg[instr->I_PUT_ELEMENT_DEST_R],cp2); */ cp1 = _p_a_reg[instr->I_PUT_ELEMENT_DINDEX_R]; cp2 = _p_a_reg[instr->I_PUT_ELEMENT_DEST_R]; Dereference((cell_t *), _p_a_reg[instr->I_PUT_ELEMENT_SRC_R], cp3); i1 = *((int_t *) (cp1 + 1)); #ifdef DEBUG if (i1 >= ((data_header_t *) cp2)->size || i1 < 0) { fprintf(_p_stdout, "(%lu,%lu) Warning: put_element bounds violation (size=%lu (0..%lu), put element #%lu) in %s:%s\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) ((data_header_t *) cp2)->size, (unsigned long) (((data_header_t *) cp2)->size - 1), (unsigned long) i1, _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); } #endif /* DEBUG */ if (IsInt(cp2) && IsInt(cp3)) { intp1 = cp2 + 1 + i1; *intp1 = *((int_t *) (cp3 + 1)); } else if (IsDouble(cp2) && IsDouble(cp3)) { doublep1 = ((double_t *) (cp2 + 1)) + i1; *doublep1 = *((double_t *) (cp3 + 1)); } else if (IsString(cp2) && IsString(cp3)) { charp1 = ((u_char_t *) (cp2 + 1)) + i1; *charp1 = *((u_char_t *) (cp3 + 1)); } else if (IsDouble(cp2) && IsInt(cp3)) { doublep1 = ((double_t *) (cp2 + 1)) + i1; *doublep1 = (double_t) *((int_t *) (cp3 + 1)); } else if (IsInt(cp2) && IsDouble(cp3)) { intp1 = cp2 + 1 + i1; *intp1 = (int_t) *((double_t *) (cp3 + 1)); } else if (IsString(cp2) && IsInt(cp3)) { charp1 = ((u_char_t *) (cp2 + 1)) + i1; *charp1 = (u_char_t) *((u_int_t *) (cp3 + 1)); } else if (IsInt(cp2) && IsString(cp3)) { intp1 = cp2 + 1 + i1; *intp1 = (int_t) *((u_char_t *) (cp3 + 1)); } else if (IsString(cp2) && IsDouble(cp3)) { charp1 = ((u_char_t *) (cp2 + 1)) + i1; *charp1 = (u_char_t) *((double_t *) (cp3 + 1)); } else if (IsDouble(cp2) && IsString(cp3)) { doublep1 = ((double_t *) (cp2 + 1)) + i1; *doublep1 = (double_t) *((u_char_t *) (cp3 + 1)); } else if (IsTuple(cp2)) { *((cell_t **)(cp2 + 1 + i1)) = cp3; } #ifdef DEBUG else { fprintf(_p_stdout, "(%lu,%lu) Internal Error: put_element: Illegal tag found in %s:%s\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); } #endif /* DEBUG */ program_counter += SIZE_PUT_ELEMENT; break; case I_ADD: Arith(+); program_counter += SIZE_ADD; break; case I_SUB: Arith(-); program_counter += SIZE_SUB; break; case I_MUL: Arith(*); program_counter += SIZE_MUL; break; case I_DIV: Arith(/); program_counter += SIZE_DIV; break; case I_MOD: cp1 = _p_a_reg[instr->I_ARITH_LSRC_R] + 1; cp2 = _p_a_reg[instr->I_ARITH_RSRC_R] + 1; i1 = *((int_t *) cp1); i2 = *((int_t *) cp2); BuildInt((cell_t *), _p_a_reg[instr->I_ARITH_DEST_R], (i1 % i2)); program_counter += SIZE_MOD; break; case I_LENGTH: DerefAndCheckSusp((cell_t *), _p_a_reg[instr->I_LENGTH_SRC_R],cp1); Dereference((cell_t *), _p_a_reg[instr->I_LENGTH_DEST_R], cp2); cp2++; *((int_t *) cp2) = ((data_header_t *)cp1)->size; program_counter += SIZE_LENGTH; break; case I_COPY_MUT: cp1 = _p_a_reg[instr->I_COPY_MUT_SRC_R]; cp2 = _p_a_reg[instr->I_COPY_MUT_DEST_R]; i1 = ((data_header_t *)cp1)->size; #ifdef GAUGE IncrementCounter(instr->I_COPY_MUT_COUNTER, i1); #endif /* GAUGE */ #ifdef DEBUG if (i1 != ((data_header_t *) cp2)->size || i1 < 0) { fprintf(_p_stdout, "(%lu,%lu) Warning: copy_mut bounds violation (from size=%lu, to size=%lu) in %s:%s\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (unsigned long) i1, (unsigned long) ((data_header_t *) cp2)->size, _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); } #endif /* DEBUG */ tag1 = ((data_header_t *)cp1)->tag; tag2 = ((data_header_t *)cp2)->tag; if (tag1 == INT_TAG && tag2 == INT_TAG) { intp1 = (int_t *) (cp1 + 1); intp2 = (int_t *) (cp2 + 1); for ( ; i1 > 0; i1--) *intp2++ = *intp1++; } else if (tag1 == DOUBLE_TAG && tag2 == DOUBLE_TAG) { doublep1 = (double_t *) (cp1 + 1); doublep2 = (double_t *) (cp2 + 1); for ( ; i1 > 0; i1--) *doublep2++ = *doublep1++; } else if (tag1 == STRING_TAG && tag2 == STRING_TAG) { charp1 = (u_char_t *) (cp1 + 1); charp2 = (u_char_t *) (cp2 + 1); for ( ; i1 > 0; i1--) *charp2++ = *charp1++; } else if (tag1 == INT_TAG && tag2 == DOUBLE_TAG) { intp1 = (int_t *) (cp1 + 1); doublep2 = (double_t *) (cp2 + 1); for ( ; i1 > 0; i1--) *doublep2++ = (double_t) *intp1++; } else if (tag1 == DOUBLE_TAG && tag2 == INT_TAG) { doublep1 = (double_t *) (cp1 + 1); intp2 = (int_t *) (cp2 + 1); for ( ; i1 > 0; i1--) *intp2++ = (int_t) *doublep1++; } else if (tag1 == STRING_TAG && tag2 == INT_TAG) { charp1 = (u_char_t *) (cp1 + 1); uintp2 = (u_int_t *) (cp2 + 1); for ( ; i1 > 0; i1--) *uintp2++ = (u_int_t) *charp1++; } else if (tag1 == INT_TAG && tag2 == STRING_TAG) { uintp1 = (u_int_t *) (cp1 + 1); charp2 = (u_char_t *) (cp2 + 1); for ( ; i1 > 0; i1--) *charp2++ = (u_char_t) *uintp1++; } else if (tag1 == STRING_TAG && tag2 == DOUBLE_TAG) { charp1 = (u_char_t *) (cp1 + 1); doublep2 = (double_t *) (cp2 + 1); for ( ; i1 > 0; i1--) *doublep2++ = (double_t) *charp1++; } else if (tag1 == DOUBLE_TAG && tag2 == STRING_TAG) { doublep1 = (double_t *) (cp1 + 1); charp2 = (u_char_t *) (cp2 + 1); for ( ; i1 > 0; i1--) *charp2++ = (u_char_t) *doublep1++; } #ifdef DEBUG else { fprintf(_p_stdout, "(%lu,%lu) Internal Error: copy_mut: Illegal tag found in %s:%s\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); } #endif /* DEBUG */ program_counter += SIZE_COPY_MUT; break; case I_PUT_FOREIGN: *_p_foreign_ptr++ = (void *) (_p_a_reg[instr->I_PUT_FOREIGN_SRC_R] + 1); program_counter += SIZE_PUT_FOREIGN; break; case I_CALL_FOREIGN: #ifdef GAUGE TIMER(start_time); #endif /* GAUGE */ f = (void (*)()) instr->I_CALL_FOREIGN_FOR; /* the function */ i1 = instr->I_CALL_FOREIGN_N_ARGS; /* number of args */ #ifdef DEBUG if (EmDebug(3)) { s1 = _p_foreign_lookup((u_int_t) f); fprintf(_p_stdout, "(%lu,%lu) FOREIGN: %s/%lu : Calling\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, s1, (unsigned long) i1); fflush(_p_stdout); } test_f_regs(i1, (u_int_t) f, "entering"); #endif /* DEBUG */ #ifdef PDB if (_pdb_gc_after_foreign) { gc_on_next_reduction = TRUE; _pdb_last_called_foreign = (u_int_t) f; } #endif /* PDB */ if (i1 == 0) (*f)(); else if (i1 == 1) (*f)(F_REG_1); else if (i1 <= 2) (*f)(F_REG_2); else if (i1 <= 4) (*f)(F_REG_4); else if (i1 <= 8) (*f)(F_REG_8); else if (i1 <= 16) (*f)(F_REG_16); else if (i1 <= 32) (*f)(F_REG_32); else if (i1 <= 64) (*f)(F_REG_64); #ifdef DEBUG test_f_regs(i1, (u_int_t) f, "exiting"); if (EmDebug(3)) { fprintf(_p_stdout, "(%lu,%lu) FOREIGN: %s/%lu : Done\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, s1, (unsigned long) i1); fflush(_p_stdout); } #endif /* DEBUG */ #ifdef GAUGE TIMER(stop_time); lapse_time(start_time,stop_time,stop_time); cp1 = instr->I_CALL_FOREIGN_TIMER; timer_add(stop_time,*((gauge_timer *) cp1),*((gauge_timer *) cp1)); #endif /* GAUGE */ _p_foreign_ptr = _p_f_reg; program_counter += SIZE_CALL_FOREIGN; break; case I_EXIT: exit_from_pcn(_p_a_reg[instr->I_EXIT_CODE_R]); break; case I_PRINT_TERM: _p_print_term(_p_stdout, _p_a_reg[instr->I_PRINT_TERM_SRC_R]); fflush(_p_stdout); program_counter += SIZE_PRINT_TERM; break; #ifdef STREAMS case I_INIT_SEND: /* R1=connection_array (integer array of size 2) * R2=stream_array (array of 1 or more stream data structures) * R3=index into the stream_array * * R1 and R3 are already dereferenced when we enter (a data * check was already done on them) */ #ifdef DEBUG if (((data_header_t *) _p_a_reg[instr->I_INIT_SEND_CON_ARRAY_R])->tag != INT_TAG) { char s_buff[1024]; sprintf(s_buff, "%s:%s(): init_send: Expected an INT_TAG in R1", _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); _p_fatal_error(s_buff); } #endif /* DEBUG */ intp1 = (int_t *) (_p_a_reg[instr->I_INIT_SEND_CON_ARRAY_R] + 1); streamp1 = _p_get_stream_record(_p_a_reg[instr->I_INIT_SEND_STREAM_R], _p_a_reg[instr->I_INIT_SEND_SINDEX_R]); streamp1->id = i2 = *intp1; /* stream id */ streamp1->u.s.node = i1 = *(intp1 + 1);/* node id */ if (i1 < 0) { /* Error during init_recv() */ streamp1->send = 1; streamp1->open = 0; fprintf(_p_stdout, "(%lu,%lu) Warning: init_send: Illegal node -- probably a failed init_recv\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); } else { streamp1->send = 1; streamp1->state = 0; streamp1->open = 1; if (i1 == _p_my_id) { /* * This is a local stream, so mark both the send * and receive stream records as local streams. */ streamp1->remote = 0; streamp2 = _p_lookup_stream(i2); /* receive side */ if (streamp2->state == 2 && streamp2->remote) { /* * There was a receive posted on this stream already * (before the init_send occured), and the receive * side thinks its a remote stream (the default). * Since we now know it is a local stream, disable * remote receives on this stream. */ _p_disable_stream_recv(i2); } streamp2->remote = 0; } else { /* This is a remote stream */ streamp1->remote = 1; } } program_counter += SIZE_INIT_SEND; break; case I_INIT_RECV: /* R1=connection_array (integer array of size 2) * R2=stream_array (array of 1 or more stream data structures) * R3=index into the stream_array * * R1 and R3 are already dereferenced when we enter (a data * check was just done on R3, and R1 was just created) */ #ifdef DEBUG if (((data_header_t *) _p_a_reg[instr->I_INIT_RECV_CON_ARRAY_R])->tag != INT_TAG) { char s_buff[1024]; sprintf(s_buff, "%s:%s(): init_recv: Expected an INT_TAG in R1", _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); _p_fatal_error(s_buff); } #endif /* DEBUG */ intp1 = (int_t *) (_p_a_reg[instr->I_INIT_RECV_CON_ARRAY_R] + 1); streamp1 = _p_get_stream_record(_p_a_reg[instr->I_INIT_RECV_STREAM_R], _p_a_reg[instr->I_INIT_RECV_SINDEX_R]); streamp1->send = 0; streamp1->state = 0; if (_p_alloc_stream(cp1, i1, &i2)) { *intp1 = i2; /* stream id */ *(intp1 + 1) = _p_my_id; /* node id */ streamp1->open = 1; if (_p_multiprocessing) { /* * Set the stream to be a remote stream initially * The init_send will fix this if it turns out to * be a local stream. */ streamp1->remote = 1; } else { streamp1->remote = 0; } streamp1->id = i2; } else { /* * Failed to allocate stream. This is relayed * to the sr_init_send by setting the node_id to -1 */ *(intp1 + 1) = -1; /* node id */ streamp1->open = 0; } program_counter += SIZE_INIT_RECV; break; case I_CLOSE_STREAM: /* R1=stream_array (array of 1 or more stream data structures) * R2=index into the stream_array * * R2 is already dereferenced when we enter (a data * check was already done on it) */ streamp1 = _p_get_stream_record(_p_a_reg[instr->I_CLOSE_STREAM_STREAM_R], _p_a_reg[instr->I_CLOSE_STREAM_SINDEX_R]); if (!streamp1->open) { fprintf(_p_stdout, "(%lu,%lu) Warning: close_stream: Stream already closed\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); } streamp1->open = 0; if (streamp1->u.s.node == _p_my_id) { streamp2 = _p_lookup_stream(streamp1->id); streamp2->open = 0; } else { _p_send_close_stream(streamp1->u.s.node, streamp1->id); } program_counter += SIZE_CLOSE_STREAM; break; case I_STREAM_SEND: /* R1=stream_array (array of 1 or more stream data structures) * R2=index into the stream_array * R3=array * R4=offset * R5=size * * R2, R3, R4, and R5 are already dereferenced when we * enter (a data check was already done on them) */ streamp1 = _p_get_stream_record(_p_a_reg[instr->I_STREAM_SEND_STREAM_R], _p_a_reg[instr->I_STREAM_SEND_SINDEX_R]); if (streamp1->open) /* Stream is still open */ { if (streamp1->remote) { /* Remote stream */ _p_stream_send(streamp1->id, streamp1->u.s.node, _p_a_reg[instr->I_STREAM_SEND_ARRAY_R], *((u_int_t *) (_p_a_reg[instr->I_STREAM_SEND_OFFSET_R] + 1)), *((u_int_t *) (_p_a_reg[instr->I_STREAM_SEND_SIZE_R] + 1)) ); } else { /* Local stream */ streamp2 = _p_lookup_stream(streamp1->id); if (streamp2->state == 0 || streamp2->state == 1) { /* * Send before recv -- on local stream * Need to queue up the message on local stream queue */ _p_enqueue_local_stream_msg(streamp2, _p_a_reg[instr->I_STREAM_SEND_ARRAY_R], (u_int_t) *(_p_a_reg[instr->I_STREAM_SEND_OFFSET_R] + 1), (u_int_t) *(_p_a_reg[instr->I_STREAM_SEND_SIZE_R] + 1) ); streamp2->state = 1; } else /* streamp2->state == 2 */ { /* * A receive had been posted before any sends. * So copy data from sending array to receiving * array. */ if (IsUnknown(streamp2->u.r.array)) { /* Create array on heap and fill it in */ } else { /* Copy data into mutable array */ u_int_t offset = (u_int_t) *(_p_a_reg[instr->I_STREAM_SEND_OFFSET_R] + 1); u_int_t size = (u_int_t) *(_p_a_reg[instr->I_STREAM_SEND_SIZE_R] + 1); /* Do the copy */ } _p_set_stream_recv_status(streamp2->u.r.status, 0, "stream_send"); streamp2->state = 0; } } } else /* Stream is closed */ { fprintf(_p_stdout, "(%lu,%lu) Warning: stream_send: Trying to send to a closed stream\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); } program_counter += SIZE_STREAM_SEND; break; case I_STREAM_RECV: /* R1=stream_array (array of 1 or more stream data structures) * R2=index into the stream_array * R3=mutable array * R4=offset * R5=size * R6=status * * R2, R4, and R5 are already dereferenced when we enter (a data * check was already done on them) */ streamp1 = _p_get_stream_record(_p_a_reg[instr->I_STREAM_RECV_STREAM_R], _p_a_reg[instr->I_STREAM_RECV_SINDEX_R]); if (streamp1->state == 0) { /* * There is NOT a local message waiting to be delivered * This is could be either a local or remote stream */ if (streamp1->open) { /* * The stream is still open. * This could be either a local or remote stream. * So fill receive info into the stream record * and enable receives if it is a remote stream. */ Dereference((cell_t *), _p_a_reg[instr->I_STREAM_RECV_ARRAY_R], streamp1->u.r.array); streamp1->u.r.offset = (u_int_t) *(_p_a_reg[instr->I_STREAM_RECV_OFFSET_R] + 1); streamp1->u.r.size = (u_int_t) *(_p_a_reg[instr->I_STREAM_RECV_SIZE_R] + 1); Dereference((cell_t *), _p_a_reg[instr->I_STREAM_RECV_STATUS_R], streamp1->u.r.status); streamp1->state = 2; if (streamp1->remote) _p_enable_stream_recv(streamp1->id); } else { /* * The stream has been closed (local or remote). * So, set return status = 1 to indicate a closed stream. */ _p_set_stream_recv_status(_p_a_reg[instr->I_STREAM_RECV_STATUS_R], 1, "stream_recv"); } } else if (streamp1->state == 1) { /* * It is a local stream and there at least one * message waiting. So dequeue a message from the local * stream queue and fill in the receiving array. */ local_stream_msg_t *msg = _p_dequeue_local_stream_msg(streamp1); /* * The array argument could be either a mutable array or * an undefined variable. If it is an array, then fill in * the array elements with the received data. If it is * and undefined variable, then create the appropriate * array on the heap for the received data, and defined * this variable to the array. */ Dereference((cell_t *), _p_a_reg[instr->I_STREAM_RECV_ARRAY_R], cp1); if (IsUnknown(cp1)) { /* Create array on heap and fill it in */ } else { /* Copy data into mutable array */ u_int_t offset = (u_int_t) *(_p_a_reg[instr->I_STREAM_RECV_OFFSET_R] + 1); u_int_t size = (u_int_t) *(_p_a_reg[instr->I_STREAM_RECV_SIZE_R] + 1); Dereference((cell_t *), _p_a_reg[instr->I_STREAM_RECV_STATUS_R], cp2); /* Do the copy */ } _p_free_local_stream_msg(msg); _p_set_stream_recv_status(_p_a_reg[instr->I_STREAM_RECV_STATUS_R], 0, "stream_recv"); if (streamp1->u.lq.queue_head == (local_stream_msg_t *) NULL) streamp1->state == 0; } else { /* We're already waiting for a message */ fprintf(_p_stdout, "(%lu,%lu) Warning: i_stream_recv: stream id %ld: A second receive was posted on this stream before the first receive was fulfilled. Stream receives must be sequentialized.\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, (long) streamp1->id); } program_counter += SIZE_STREAM_RECV; break; #else /* STREAMS */ case I_INIT_SEND: fprintf(_p_stdout, "(%lu,%lu) Warning: i_init_send: Not yet implemented\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); program_counter += SIZE_INIT_SEND; break; case I_INIT_RECV: fprintf(_p_stdout, "(%lu,%lu) Warning: i_init_recv: Not yet implemented\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); program_counter += SIZE_INIT_RECV; break; case I_CLOSE_STREAM: fprintf(_p_stdout, "(%lu,%lu) Warning: i_close_stream: Not yet implemented\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); program_counter += SIZE_CLOSE_STREAM; break; case I_STREAM_SEND: fprintf(_p_stdout, "(%lu,%lu) Warning: i_stream_send: Not yet implemented\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); program_counter += SIZE_STREAM_SEND; break; case I_STREAM_RECV: fprintf(_p_stdout, "(%lu,%lu) Warning: i_stream_recv: Not yet implemented\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); program_counter += SIZE_STREAM_RECV; break; #endif /* STREAMS */ default: { char s_buff[1024]; sprintf(s_buff, "%s:%s(): Illegal instruction", _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); _p_fatal_error(s_buff); } break; } } } /* _p_emulate() */ /* * equal() * * Test for equality of two terms. * * It is assumed that the arguments have already be dereferenced and * checked for suspension before this function is called. * * Return: 0 = not equal * 1 = suspend (encountered a variable) * *rcp = pointer to variable to suspend on * 2 = equal */ static int equal(t1, t2, rcp) cell_t *t1, *t2; cell_t **rcp; { cell_t *cp1, *cp2; int i1, rc; /* Check to see if their tags match */ if (((data_header_t *)t1)->tag != ((data_header_t *)t2)->tag) return (0); if (IsString(t1)) /* Handle Strings */ { /* Treat a comparison between two strings as a strcmp() */ if (strcmp((char *) (t1 + 1), (char *) (t2 + 1)) == 0) return (2); else return (0); } else { i1 = ((data_header_t *)t1)->size; if (i1 != ((data_header_t *)t2)->size) return (0); if (IsTuple(t1)) /* Handle Tuple */ { for ( ; i1 > 0; i1--) { t1++; t2++; /* Dereference the arguments and check for undefined vars */ Dereference((cell_t *), t1, cp1); if (IsUnknown(cp1)) { *rcp = cp1; return (1); } Dereference((cell_t *), t2, cp2); if (IsUnknown(cp2)) { *rcp = cp2; return (1); } if ((rc = equal(cp1, cp2, rcp)) == 0) return (0); else if (rc == 1) return (1); } return (2); } else if (IsInt(t1)) /* Handle Int */ { int_t *ip1 = (int_t *) (t1 + 1); int_t *ip2 = (int_t *) (t2 + 1); for ( ; i1 > 0; i1--, ip1++, ip2++) { if (*ip1 != *ip2) return (0); } return (2); } else if (IsDouble(t1)) /* Handle Double */ { double_t *rp1 = (double_t *) (t1 + 1); double_t *rp2 = (double_t *) (t2 + 1); for ( ; i1 > 0; i1--, rp1++, rp2++) { if (*rp1 != *rp2) return (0); } return (2); } else { char s_buff[1024]; sprintf(s_buff, "%s:%s(): equal(): Illegal tag found", _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); _p_fatal_error(s_buff); return (0); } } } /* equal() */ /* * wait_for_process() * * Wait until we can get an active process. This should be called when * both the active queue (_p_active_qf) and the global suspension * queue (_p_globsusp_qf) are empty. * * This means waiting for characters to become available on the keyboard. * * In multiprocessor version, it should also watch for activity in the * communications component. * * GC_ALERT: This procedure may induce garbage collection */ static void wait_for_process() { #ifdef GAUGE gauge_timer *timer; int size = 0; proc_record_t *proc_record; TIMER(start_time); #endif /* GAUGE */ /* DoGC(); */ #ifdef PDB_HOST if (_pdb_breakout || _pdb_empty_queue_break) { _pdb_enter(FALSE); } #endif /* PDB_HOST */ while (_p_active_qf == (proc_record_t *) NULL) { if (!_p_multiprocessing) { #ifdef PDB_HOST fprintf(_p_stdout, "(%lu) reduction %lu\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fprintf(_p_stdout, "Error: Node %lu: Process queue is empty\n", (unsigned long) _p_my_id); _pdb_enter(FALSE); #else /* PDB_HOST */ _p_fatal_error("Process queue is empty"); #endif /* PDB_HOST */ } #ifdef PARALLEL if (_p_host) { #endif /* PARALLEL */ TryMSGNoSkip(); /* GC_ALERT */ #ifdef PDB_HOST if (_pdb_breakout) _pdb_enter(FALSE); #endif /* PDB_HOST */ #ifdef PARALLEL } else { /* Block waiting for MSG_DEFINE or MSG_VALUE messages */ _p_process_messages(RCV_BLOCK); /* GC_ALERT */ /* * A define or value message could have unsuspended something * on the global suspension queue, so add it to active queue. */ if (_p_globsusp_qf != (proc_record_t *) NULL) { AppendProcessQueue(_p_active_qf, _p_active_qb, _p_globsusp_qf, _p_globsusp_qb); } } #endif /* PARALLEL */ } #ifdef GAUGE TIMER(stop_time); lapse_time(start_time, stop_time, stop_time); for (proc_record = _p_active_qf ; proc_record != (proc_record_t *) NULL ; proc_record = proc_record->next ) { size += 1; } scale_timer(stop_time, size); for (proc_record = _p_active_qf ; proc_record != (proc_record_t *) NULL ; proc_record = proc_record->next ) { timer = (gauge_timer *) (proc_record->proc->idle_timer); timer_add(*timer, stop_time, *timer); } #endif /* GAUGE */ } /* wait_for_process() */ #define RunCheckForSuspension(VarName) \ if (IsUnknown(VarName)) \ { \ *run_suspend_var = VarName; \ return (2); \ } /* * run() * * Perform a metacall. * Lookup the call 'target', and create a process record that contains * the 'vt_tuple' (virtual topology tuple), the call arguments from * 'target', and the synchronization variable 'sync_var'. * * GC_ALERT: This procedure may induce garbage collection * * Return: 0 -- metacall succeeded * 1 -- metacall failed * 2 -- metacall suspended, *run_suspend_var set to pointer to * the variable to suspend on */ static int_t run(vt_tuple, target, caller_module, caller_proc, sync_var, run_suspend_var) cell_t *vt_tuple; cell_t *target; cell_t *caller_module; cell_t *caller_proc; cell_t *sync_var; cell_t **run_suspend_var; { proc_header_t *proc_header; proc_record_t *proc_record; int_t use_vt_arg; int_t i; char_t *caller_module_name; char_t *caller_proc_name; char_t *module_name; char_t *proc_name; int_t current_arg; Dereference((cell_t *), vt_tuple, vt_tuple); Dereference((cell_t *), target, target); Dereference((cell_t *), caller_module, caller_module); Dereference((cell_t *), caller_proc, caller_proc); Dereference((cell_t *), sync_var, sync_var); RunCheckForSuspension(vt_tuple); RunCheckForSuspension(caller_module); RunCheckForSuspension(caller_proc); if (!IsString(caller_module) || !IsString(caller_proc)) { /* Illegal caller module or proc string */ fprintf(_p_stdout, "(%lu,%lu) Warning: Failed to metacall a procedure -- illegal caller module or procedure string\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction); fflush(_p_stdout); return (1); } caller_module_name = (char_t *) (caller_module + 1); caller_proc_name = (char_t *) (caller_proc + 1); if (!IsTuple(vt_tuple)) { /* Illegal vt tuple */ fprintf(_p_stdout, "(%lu,%lu) Warning: Failed to metacall a procedure from %s:%s() -- illegal VT tuple\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, caller_module_name, caller_proc_name); fflush(_p_stdout); return (1); } if (DHCellSize(vt_tuple) == 0) /* Empty vt tuple -- we are not using vts, so ignore this argument */ use_vt_arg = 0; else use_vt_arg = 1; if (IsString(target)) { /* The target is a string, so use it as the procedure name */ proc_name = (char_t *) (target + 1); module_name = caller_module_name; proc_header = _p_proc_lookup(module_name, proc_name); if (proc_header = (proc_header_t *) NULL) { /* Could not find this procedure */ fprintf(_p_stdout, "(%lu,%lu) Warning: Failed to metacall %s:%s() from %s:%s() -- could not find procedure\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, module_name, proc_name, caller_module_name, caller_proc_name); fflush(_p_stdout); return (1); } else if (proc_header->arity != (use_vt_arg + 2)) { /* Arity mismatch */ fprintf(_p_stdout, "(%lu,%lu) Warning: Failed to metacall %s:%s() from %s:%s() -- different number of arguments\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, module_name, proc_name, caller_module_name, caller_proc_name); fflush(_p_stdout); return (1); } else { cell_t *empty_list; #ifdef DEBUG if (EmDebug(2)) { fprintf(_p_stdout, "(%lu,%lu) metacall to %s:%s()\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, module_name, proc_name); fflush(_p_stdout); } #endif /* DEBUG */ /* * GC_ALERT * Once we start filling in the process record arguments, * we must make sure that the proc_record is completely * filled in an queued before another garbage collection is * called. The only things that could cause garbage * collection are the proc_record allocation, and the * creation of the empty list argument (the left sync variable). * So make sure there is enough room on the heap for both * before proceeding with filling in the proc_record arguments. * We also need to preserve the vt_tuple and sync_var if we gc. */ PushGCReference(&vt_tuple); PushGCReference(&sync_var); proc_record = _p_alloc_proc_record(use_vt_arg + 2); /* GC_ALERT */ BuildEmptyList((cell_t *), empty_list); /* GC_ALERT */ PopGCReference(2); proc_record->proc = proc_header; #ifdef PDB proc_record->instance = _pdb_get_next_instance(); proc_record->reduction = _p_reduction; proc_record->called_by = _p_current_proc->proc; #endif /* PDB */ i = 0; if (use_vt_arg) proc_record->args[i++] = vt_tuple; proc_record->args[i++] = empty_list; proc_record->args[i] = sync_var; EnqueueProcess(proc_record, _p_active_qf, _p_active_qb); PDB_EnqueueProcess(proc_record); return (0); } } else { /* The target is a call tuple, so pull it apart and make the call. */ return (run_call_tuple(vt_tuple, use_vt_arg, sync_var, target, caller_module_name, caller_proc_name, (char_t *) NULL, (char_t *) NULL, 0, run_suspend_var, TRUE, &proc_record, ¤t_arg)); } } /* run() */ /* * run_call_tuple() * * Traverse a call tuple that looks like: * {"call",{":",mod,proc},[args],_} * Where: * 'mod' is a string giving the module name, or an empty string * denoting to use the current module * 'proc' is either a string, giving the procedure name, or * another call tuple * 'args' are the arguments to the call * * This needs to traverse this call tuple, looking for a module name, * procedure name, and arguments. * If the tuple is several levels deep, then concatentate * the argument lists together in a depth first manner. Also, * make sure the 'mod' strings match in the multiple levels (or * are empty strings). * * When decending down the call tuples, count the number of arguments * and make sure the argument lists are complete (no undefined list tail) * on the way down. When we bottom out, then create the process * record. Then fill in the arguments to the process record on the * way back up. * * GC_ALERT: This procedure may induce garbage collection * * Return: Same return values as run() */ static int_t run_call_tuple(vt_tuple, use_vt_arg, sync_var, target, caller_module_name, caller_proc_name, module_name, proc_name, n_args, run_suspend_var, top_level, proc_record, current_arg) cell_t *vt_tuple; int_t use_vt_arg; cell_t *sync_var; cell_t *target; char_t *caller_module_name; char_t *caller_proc_name; char_t *module_name; char_t *proc_name; int_t n_args; cell_t **run_suspend_var; bool_t top_level; proc_record_t **proc_record; int_t *current_arg; { cell_t *ct_header, *ct_call, *ct_args, *ct_colon, *ct_mod, *ct_proc; cell_t *args_head, *args_tail; int_t rc; proc_header_t *proc_header; char_t *this_module_name; bool_t bad_call_argument = TRUE; RunCheckForSuspension(target); if (DHCellSize(target) != 4) { /* Illegal call tuple */ fprintf(_p_stdout, "(%lu,%lu) Warning: Failed to metacall a procedure from %s:%s() -- illegal call tuple, wrong size\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, caller_module_name, caller_proc_name); fflush(_p_stdout); return (1); } Dereference((cell_t *), target + 1, ct_header); Dereference((cell_t *), target + 2, ct_call); Dereference((cell_t *), target + 3, ct_args); /* Dereference((cell_t *), target + 4, ct_anno); */ RunCheckForSuspension(ct_header); RunCheckForSuspension(ct_call); RunCheckForSuspension(ct_args); if (!IsString(ct_header) || strcmp("call", (char *) (ct_header + 1)) != 0) { /* Illegal call tuple -- the first argument is not the string "call" */ fprintf(_p_stdout, "(%lu,%lu) Warning: Failed to metacall a procedure from %s:%s() -- illegal call tuple, bad header argument (first argument)\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, caller_module_name, caller_proc_name); fflush(_p_stdout); return (1); } if (IsString(ct_call)) { /* * The second arg of the call tuple is a string, so use it as * the procedure name. */ ct_proc = ct_call; bad_call_argument = FALSE; } else if (IsTuple(ct_call) && DHCellSize(ct_call) == 4) { /* * The second arg of the call tuple is another call tuple. */ ct_proc = ct_call; bad_call_argument = FALSE; } else if (IsTuple(ct_call) && DHCellSize(ct_call) == 3) { /* * The second arg of the call tuple is a {":",mod,proc} tuple. */ Dereference((cell_t *), ct_call + 1, ct_colon); Dereference((cell_t *), ct_call + 2, ct_mod); Dereference((cell_t *), ct_call + 3, ct_proc); RunCheckForSuspension(ct_colon); RunCheckForSuspension(ct_mod); RunCheckForSuspension(ct_proc); if (IsString(ct_colon) && strcmp(":", (char *) (ct_colon + 1)) == 0 && IsString(ct_mod) && (IsString(ct_proc) || IsTuple(ct_proc))) { /* * This is a valid {":",mod,proc} tuple. So use the 'mod' portion. */ this_module_name = (char_t *) (ct_mod + 1); if (*this_module_name != '\0') { if (module_name != (char_t *) NULL) { if (strcmp(module_name, this_module_name) != 0) { /* Encountered two different module names in call tuple */ fprintf(_p_stdout, "(%lu,%lu) Warning: Failed to metacall a procedure from %s:%s() -- encountered two different mdoule names in nested call tuple\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, caller_module_name, caller_proc_name); fflush(_p_stdout); return (1); } } else { module_name = this_module_name; } } bad_call_argument = FALSE; } } if (bad_call_argument) { /* * Illegal call procedure -- the second argument is not * a string, another call tuple, or a {":",mod,proc}. */ fprintf(_p_stdout, "(%lu,%lu) Warning: Failed to metacall a procedure from %s:%s() -- illegal call tuple, bad call argument (second argument)\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, caller_module_name, caller_proc_name); fflush(_p_stdout); return (1); } /* Count the number of arguments, and make sure the list is complete */ args_tail = ct_args; while (1) { RunCheckForSuspension(args_tail); if (!IsTuple(args_tail) || (DHCellSize(args_tail) != 0 && DHCellSize(args_tail) != 2)) { /* Illegal call tuple -- argument list not a list */ fprintf(_p_stdout, "(%lu,%lu) Warning: Failed to metacall a procedure from %s:%s() -- illegal call tuple, bad argument list\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, caller_module_name, caller_proc_name); fflush(_p_stdout); return (1); } if (DHCellSize(args_tail) == 0) { break; } else /* DHCellSize(args_tail) == 2 */ { n_args++; Dereference((cell_t *), args_tail + 2, args_tail); } } if (IsString(ct_proc)) { /* * We have bottomed out in the call tuple recursion * So lookup the procedure, allocate the process record, * fill in the vt_tuple argument if necessary, and fill * in my level's arguments. */ proc_name = (char_t *) (ct_proc + 1); if (module_name == (char_t *) NULL) module_name = caller_module_name; proc_header = _p_proc_lookup(module_name, proc_name); if (proc_header == (proc_header_t *) NULL) { /* Could not find this procedure */ fprintf(_p_stdout, "(%lu,%lu) Warning: Failed to metacall %s:%s() from %s:%s() -- could not find procedure\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, module_name, proc_name, caller_module_name, caller_proc_name); fflush(_p_stdout); return (1); } else if (proc_header->arity != (use_vt_arg + n_args + 2)) { /* Arity mismatch */ fprintf(_p_stdout, "(%lu,%lu) Warning: Failed to metacall %s:%s() from %s:%s() -- different number of arguments\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, module_name, proc_name, caller_module_name, caller_proc_name); fflush(_p_stdout); return (1); } else { #ifdef DEBUG if (EmDebug(2)) { fprintf(_p_stdout, "(%lu,%lu) metacall to %s:%s()\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, module_name, proc_name); fflush(_p_stdout); } #endif /* DEBUG */ /* * GC_ALERT * Once we start filling in the process record arguments, * we must make sure that the proc_record is completely * filled in an queued before another garbage collection is * called. The only things that could cause garbage * collection are the proc_record allocation, and the * creation of the empty list argument (the left sync variable). * So make sure there is enough room on the heap for both * before proceeding with filling in the proc_record arguments. * We also need to preserve the vt_tuple and sync_var if we gc. */ PushGCReference(&vt_tuple); PushGCReference(&sync_var); *proc_record = _p_alloc_proc_record(use_vt_arg + n_args + 2); TryGCWithSize(EmptyListSizeWithTrailer()); PopGCReference(2); (*proc_record)->proc = proc_header; #ifdef PDB (*proc_record)->instance = _pdb_get_next_instance(); (*proc_record)->reduction = _p_reduction; (*proc_record)->called_by = _p_current_proc->proc; #endif /* PDB */ *current_arg = 0; if (use_vt_arg) (*proc_record)->args[(*current_arg)++] = vt_tuple; rc = 0; } } else /* IsTuple(ct_proc) */ { /* This is another call tuple */ PushGCReference(&ct_args); rc = run_call_tuple(vt_tuple, use_vt_arg, sync_var, ct_proc, caller_module_name, caller_proc_name, module_name, proc_name, n_args, run_suspend_var, FALSE, proc_record, current_arg); PopGCReference(1); } if (rc == 0) { /* Fillin the arguments for my level */ args_tail = ct_args; while (1) { /* * We don't need to check for unknown or an illegal list, * since we did this on the way down */ if (DHCellSize(args_tail) == 0) { break; } else /* DHCellSize(args_tail) == 2 */ { Dereference((cell_t *), args_tail + 1, args_head); (*proc_record)->args[(*current_arg)++] = args_head; Dereference((cell_t *), args_tail + 2, args_tail); } } if (top_level) { /* * GC_ALERT - We checked this above, when we allocated * the proc_record. So we know we will not gc here. */ BuildEmptyList((cell_t *), (*proc_record)->args[(*current_arg)++]); (*proc_record)->args[(*current_arg)] = sync_var; EnqueueProcess(*proc_record, _p_active_qf, _p_active_qb); PDB_EnqueueProcess(*proc_record); } } return (rc); } /* run_call_tuple() */ /* * save_arguments() * * Save the first 'n_args' registers into the process record _p_current_proc. * If _p_current_proc is not big enough to hold the arguments, then allocate * a new process record. * * GC_ALERT: This procedure may induce garbage collection */ static void save_arguments(n_args) int_t n_args; { int_t i; if (_p_current_proc->header.size < n_args) { proc_record_t *old_cp = _p_current_proc; _p_first_unused_register = n_args; _p_current_proc = _p_alloc_proc_record(n_args); /* GC_ALERT */ _p_first_unused_register = DEFAULT_FIRST_UNUSED_REGISTER; _p_current_proc->proc = old_cp->proc; #ifdef PDB _p_current_proc->instance = old_cp->instance; _p_current_proc->reduction = old_cp->reduction; _p_current_proc->called_by = old_cp->called_by; #endif _p_free_proc_record(old_cp); } for (i = 0; i < n_args; i++) _p_current_proc->args[i] = _p_a_reg[i]; } /* save_arguments() */ /* * schedule_process() * * Take the first argument off the active process queue and * schedule it for execution. (i.e., set the appropriate registers) */ static void schedule_process() { int_t i; int_t arity; #ifdef PDB_HOST if (_p_active_qf->proc->break_num > 0) _pdb_enter(TRUE); else if (_pdb_breakout || _p_reduction == _pdb_reduction_break) _pdb_enter(FALSE); #endif /* PDB_HOST */ DequeueProcess(_p_current_proc, _p_active_qf, _p_active_qb); PDB_DequeueProcess(_p_current_proc); program_counter = (cell_t *) _p_current_proc->proc->code; #ifdef DYNAMIC_PAM_LOADING if (program_counter == (cell_t *) NULL) { char buf[256]; sprintf(buf, "schedule_process(): Procedure %s:%s() has not been loaded, so I cannot execute it.", _p_current_proc->proc->module_name, _p_current_proc->proc->proc_name); _p_fatal_error(buf); } #endif /* DYNAMIC_PAM_LOADING */ arity = _p_current_proc->proc->arity; /* load the arguments to registers */ for (i = 0; i < arity; i++) _p_a_reg[i] = _p_current_proc->args[i]; timeslice_counter = TIMESLICE; _p_suspension_var = (cell_t *) NULL; } /* schedule_process() */ #ifdef DEBUG /* * test_f_regs() * * Tests the header and trailer cells for all arguments that will * be (or just were) passed to a foreign procedure. Make sure both * the header and trailer cells have valid tags. * * This is called immediately before and after a foreign procedure * is called in order to help catch out of bounds array writes, etc. */ static void test_f_regs(n_args, func, when) int_t n_args; u_int_t func; char_t *when; { int_t i; cell_t *header, *trailer; u_int_t tag, size; char_t *func_name; for (i = 0; i < n_args; i++) { header = ((cell_t *) _p_f_reg[i]) - 1; tag = ((data_header_t *)header)->tag; size = ((data_header_t *)header)->size; if (tag != INT_TAG && tag != DOUBLE_TAG && tag != STRING_TAG) { func_name = _p_foreign_lookup(func); if (tag == TUPLE_TAG) { fprintf(_p_stdout, "(%lu,%lu) Warning while %s foreign procedure %s().\n\tIllegally passing a tuple as argument %lu of %lu to %s().\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, when, func_name, (unsigned long) i + 1, (unsigned long) n_args, func_name); } else { fprintf(_p_stdout, "(%lu,%lu) Warning while %s foreign procedure %s().\n\tThe word immediately before the data structure that is passed as\n\targument %lu of %lu to %s() is corrupt.\n\tThis was likely caused by writing out of the bounds of an array.\n\tThis may cause a subsequent crash of this program.\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, when, func_name, (unsigned long) i + 1, (unsigned long) n_args, func_name); } } else { trailer = header + _p_size_without_trailer(tag, size); if (((data_header_t *)trailer)->tag != TRAILER_TAG) { func_name = _p_foreign_lookup(func); fprintf(_p_stdout, "(%lu,%lu) Warning while %s foreign procedure %s().\n\tThe word immediately after the data structure that is passed as\n\targument %lu of %lu to %s() is corrupt.\n\tThis was likely caused by writing out of the bounds of an array.\n\tThis may cause a subsequent crash of this program.\n", (unsigned long) _p_my_id, (unsigned long) _p_reduction, when, func_name, (unsigned long) i + 1, (unsigned long) n_args, func_name); } } } } /* test_f_regs() */ #endif /* DEBUG */
These are the contents of the former NiCE NeXT User Group NeXTSTEP/OpenStep software archive, currently hosted by Netfuture.ch.