/*
 * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2011 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2007-2017 Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2009      Institut National de Recherche en Informatique
 *                         et Automatique. All rights reserved.
 * Copyright (c) 2011-2012 Los Alamos National Security, LLC.
 * Copyright (c) 2013-2018 Intel, Inc.  All rights reserved.
 * Copyright (c) 2014-2018 Research Organization for Information Science
 *                         and Technology (RIST).  All rights reserved.
 * Copyright (c) 2016-2021 IBM Corporation.  All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 *
 */

#include "orte_config.h"
#include "orte/constants.h"

#ifdef HAVE_SYS_WAIT_H
/* header operand restored - it was lost in extraction; the guard macro
 * makes the intended header unambiguous */
#include <sys/wait.h>
#endif
#ifdef HAVE_SYS_TIME_H
/* header operand restored (see guard macro) */
#include <sys/time.h>
#endif /* HAVE_SYS_TIME_H */
/* restored: bare #include had no operand - presumably <ctype.h> as in
 * comparable ORTE sources; TODO confirm against upstream */
#include <ctype.h>

#include "opal/hash_string.h"
#include "opal/util/argv.h"
#include "opal/util/opal_environ.h"
#include "opal/class/opal_pointer_array.h"
#include "opal/dss/dss.h"
#include "opal/mca/hwloc/hwloc-internal.h"
#include "opal/mca/pmix/pmix.h"

#include "orte/util/dash_host/dash_host.h"
#include "orte/util/session_dir.h"
#include "orte/util/show_help.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/mca/ess/ess.h"
#include "orte/mca/iof/base/base.h"
#include "orte/mca/odls/base/base.h"
#include "orte/mca/ras/base/base.h"
#include "orte/mca/regx/regx.h"
#include "orte/mca/rmaps/rmaps.h"
#include "orte/mca/rmaps/base/base.h"
#include "orte/mca/rml/rml.h"
#include "orte/mca/rml/rml_types.h"
#include "orte/mca/routed/routed.h"
#include "orte/mca/grpcomm/base/base.h"
#if OPAL_ENABLE_FT_CR == 1
#include "orte/mca/snapc/base/base.h"
#endif
#include "orte/mca/filem/filem.h"
#include "orte/mca/filem/base/base.h"
#include "orte/mca/grpcomm/base/base.h"
#include "orte/mca/rml/base/rml_contact.h"
#include "orte/mca/rtc/rtc.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/runtime.h"
#include "orte/runtime/orte_locks.h"
#include "orte/runtime/orte_quit.h"
#include "orte/util/compress.h"
#include "orte/util/name_fns.h"
#include "orte/util/pre_condition_transports.h"
#include "orte/util/proc_info.h"
#include "orte/util/threads.h"

#include "orte/mca/state/state.h"
#include "orte/mca/state/base/base.h"
#include "orte/util/hostfile/hostfile.h"
#include "orte/mca/odls/odls_types.h"
#include "orte/mca/plm/base/plm_private.h"
#include "orte/mca/plm/base/base.h"

/* Set node->slots according to the orte_set_slots directive: one of the
 * keywords "cores", "sockets", "numas", or "hwthreads" (counted from the
 * node's hwloc topology when one is available), or else a literal number
 * parsed with strtol. In every case the node is marked as having had its
 * slots explicitly "given" so later defaulting logic leaves it alone.
 * NOTE(review): the keyword branches silently leave node->slots unchanged
 * when no topology is present - presumably intentional; confirm. */
void orte_plm_base_set_slots(orte_node_t *node)
{
    if (0 == strncmp(orte_set_slots, "cores", strlen(orte_set_slots))) {
        if (NULL != node->topology && NULL != node->topology->topo) {
            node->slots = opal_hwloc_base_get_nbobjs_by_type(node->topology->topo,
                                                             HWLOC_OBJ_CORE, 0,
                                                             OPAL_HWLOC_LOGICAL);
        }
    } else if (0 == strncmp(orte_set_slots, "sockets", strlen(orte_set_slots))) {
        if (NULL != node->topology && NULL != node->topology->topo) {
            if (0 == (node->slots = opal_hwloc_base_get_nbobjs_by_type(node->topology->topo,
                                                                       HWLOC_OBJ_SOCKET, 0,
                                                                       OPAL_HWLOC_LOGICAL))) {
                /* some systems don't report sockets - in this case,
                 * use numanodes */
                node->slots = opal_hwloc_base_get_nbobjs_by_type(node->topology->topo,
                                                                 HWLOC_OBJ_NODE, 0,
                                                                 OPAL_HWLOC_LOGICAL);
            }
        }
    } else if (0 == strncmp(orte_set_slots, "numas", strlen(orte_set_slots))) {
        if (NULL != node->topology && NULL != node->topology->topo) {
            node->slots = opal_hwloc_base_get_nbobjs_by_type(node->topology->topo,
                                                             HWLOC_OBJ_NODE, 0,
                                                             OPAL_HWLOC_LOGICAL);
        }
    } else if (0 == strncmp(orte_set_slots, "hwthreads", strlen(orte_set_slots))) {
        if (NULL != node->topology && NULL != node->topology->topo) {
            node->slots = opal_hwloc_base_get_nbobjs_by_type(node->topology->topo,
                                                             HWLOC_OBJ_PU, 0,
                                                             OPAL_HWLOC_LOGICAL);
        }
    } else {
        /* must be a number */
        node->slots = strtol(orte_set_slots, NULL, 10);
    }
    /* mark the node as having its slots "given" */
    ORTE_FLAG_SET(node, ORTE_NODE_FLAG_SLOTS_GIVEN);
}

/* State-machine callback fired when all daemons have reported in.
 * Propagates topology in no-launch mode, applies slot defaults on
 * unmanaged allocations, refreshes the routing plan, and advances the
 * job to VM_READY. Takes ownership of the caddy and releases it. */
void orte_plm_base_daemons_reported(int fd, short args, void *cbdata)
{
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
    orte_topology_t *t;
    orte_node_t *node;
    int i;

    ORTE_ACQUIRE_OBJECT(caddy);

    /* if we are not launching, then we just assume that all
     * daemons share our topology */
    if (orte_do_not_launch) {
        node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, 0);
        t = node->topology;
        for (i=1; i < orte_node_pool->size; i++) {
            if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
                continue;
            }
            if (NULL == node->topology) {
                node->topology = t;
            }
        }
    }

    /* if this is an unmanaged allocation, then set the default
     * slots on each node as directed or using default */
    if (!orte_managed_allocation) {
        if (NULL != orte_set_slots &&
            0 != strncmp(orte_set_slots, "none", strlen(orte_set_slots))) {
            caddy->jdata->total_slots_alloc = 0;
            for (i=0; i < orte_node_pool->size; i++) {
                if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
                    continue;
                }
                /* only default nodes whose slots were not explicitly set */
                if (!ORTE_FLAG_TEST(node, ORTE_NODE_FLAG_SLOTS_GIVEN)) {
                    OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                                         "%s plm:base:setting slots for node %s by %s",
                                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                                         node->name, orte_set_slots));
                    orte_plm_base_set_slots(node);
                }
                caddy->jdata->total_slots_alloc += node->slots;
            }
        }
    }

    if (orte_display_allocation) {
        orte_ras_base_display_alloc();
    }

    /* ensure we update the routing plan */
    orte_routed.update_routing_plan(NULL);

    /* progress the job */
    caddy->jdata->state = ORTE_JOB_STATE_DAEMONS_REPORTED;
    ORTE_ACTIVATE_JOB_STATE(caddy->jdata, ORTE_JOB_STATE_VM_READY);

    /* cleanup */
    OBJ_RELEASE(caddy);
}

/* State-machine callback: allocation is complete; either skip straight
 * to mapping (no-launch mode) or proceed to launching the daemons. */
void orte_plm_base_allocation_complete(int fd, short args, void *cbdata)
{
    orte_state_caddy_t *caddy =
        (orte_state_caddy_t*)cbdata;

    ORTE_ACQUIRE_OBJECT(caddy);

    /* if we don't want to launch, then we at least want
     * to map so we can see where the procs would have
     * gone - so skip to the mapping state */
    if (orte_do_not_launch) {
        caddy->jdata->state = ORTE_JOB_STATE_ALLOCATION_COMPLETE;
        ORTE_ACTIVATE_JOB_STATE(caddy->jdata, ORTE_JOB_STATE_MAP);
    } else {
        /* move the state machine along */
        caddy->jdata->state = ORTE_JOB_STATE_ALLOCATION_COMPLETE;
        ORTE_ACTIVATE_JOB_STATE(caddy->jdata, ORTE_JOB_STATE_LAUNCH_DAEMONS);
    }

    /* cleanup */
    OBJ_RELEASE(caddy);
}

/* State-machine callback fired when the launcher has spawned the daemons.
 * Deliberately does not advance the job state - we wait for the daemons
 * themselves to report back (see the daemon callback below). */
void orte_plm_base_daemons_launched(int fd, short args, void *cbdata)
{
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;

    ORTE_ACQUIRE_OBJECT(caddy);

    /* do NOT increment the state - we wait for the
     * daemons to report that they have actually
     * started before moving to the right state */
    /* cleanup */
    OBJ_RELEASE(caddy);
}

/* Completion callback for file prepositioning: on success move the job
 * to MAP; on failure force job termination with the reported status. */
static void files_ready(int status, void *cbdata)
{
    orte_job_t *jdata = (orte_job_t*)cbdata;

    if (ORTE_SUCCESS != status) {
        ORTE_FORCED_TERMINATE(status);
    } else {
        ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_MAP);
    }
}

/* State-machine callback: the VM of daemons is up. Kick off asynchronous
 * file prepositioning; files_ready() advances the job when it completes. */
void orte_plm_base_vm_ready(int fd, short args, void *cbdata)
{
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;

    ORTE_ACQUIRE_OBJECT(caddy);

    /* progress the job */
    caddy->jdata->state = ORTE_JOB_STATE_VM_READY;

    /* position any required files */
    if (ORTE_SUCCESS != orte_filem.preposition_files(caddy->jdata, files_ready, caddy->jdata)) {
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
    }

    /* cleanup */
    OBJ_RELEASE(caddy);
}

/* State-machine callback: mapping finished - advance to SYSTEM_PREP. */
void orte_plm_base_mapping_complete(int fd, short args, void *cbdata)
{
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;

    ORTE_ACQUIRE_OBJECT(caddy);

    /* move the state machine along */
    caddy->jdata->state = ORTE_JOB_STATE_MAP_COMPLETE;
    ORTE_ACTIVATE_JOB_STATE(caddy->jdata, ORTE_JOB_STATE_SYSTEM_PREP);

    /* cleanup */
    OBJ_RELEASE(caddy);
}

/* State-machine callback for ORTE_JOB_STATE_INIT: assign a jobid,
 * register the job in the global pool, set recovery defaults, and
 * establish transport keys before moving to INIT_COMPLETE. */
void orte_plm_base_setup_job(int fd, short args, void *cbdata)
{
    int rc;
    int i;
    orte_app_context_t *app;
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
    char *key;
    orte_job_t *parent;
    orte_process_name_t name, *nptr;

    ORTE_ACQUIRE_OBJECT(caddy);

    OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                         "%s plm:base:setup_job",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

    /* bozo check: this handler is only valid for the INIT state */
    if (ORTE_JOB_STATE_INIT != caddy->job_state) {
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }
    /* update job state */
    caddy->jdata->state = caddy->job_state;

    /* start by getting a jobid */
    if (ORTE_JOBID_INVALID == caddy->jdata->jobid) {
        if (ORTE_SUCCESS != (rc = orte_plm_base_create_jobid(caddy->jdata))) {
            ORTE_ERROR_LOG(rc);
            ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
            OBJ_RELEASE(caddy);
            return;
        }
        /* store it on the global job data pool - this is the key
         * step required before we launch the daemons. It allows
         * the orte_rmaps_base_setup_virtual_machine routine to
         * search all apps for any hosts to be used by the vm */
        opal_hash_table_set_value_uint32(orte_job_data, caddy->jdata->jobid, caddy->jdata);
    }

    /* if job recovery is not enabled, set it to default */
    if (!ORTE_FLAG_TEST(caddy->jdata, ORTE_JOB_FLAG_RECOVERABLE) &&
        orte_enable_recovery) {
        ORTE_FLAG_SET(caddy->jdata, ORTE_JOB_FLAG_RECOVERABLE);
    }

    /* setup transport keys in case the MPI layer needs them. If
     * this is a dynamic spawn, then use the same keys as the
     * parent process had so the new/old procs can communicate.
     * Otherwise we can use the jobfam and stepid as unique keys
     * because they are unique values assigned by the RM */
    nptr = &name;
    if (orte_get_attribute(&caddy->jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&nptr, OPAL_NAME)) {
        /* get the parent jdata */
        if (NULL == (parent = orte_get_job_data_object(name.jobid))) {
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
            OBJ_RELEASE(caddy);
            return;
        }
        /* a tool might be the parent calling spawn, so cannot require that
         * a job transport key has been assigned to it */
        key = NULL;
        if (orte_get_attribute(&parent->attributes, ORTE_JOB_TRANSPORT_KEY, (void**)&key, OPAL_STRING) &&
            NULL != key) {
            /* record it */
            orte_set_attribute(&caddy->jdata->attributes, ORTE_JOB_TRANSPORT_KEY, ORTE_ATTR_LOCAL, key, OPAL_STRING);
            /* add the transport key envar to each app */
            for (i=0; i < caddy->jdata->apps->size; i++) {
                if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(caddy->jdata->apps, i))) {
                    continue;
                }
                opal_setenv(OPAL_MCA_PREFIX"orte_precondition_transports", key, true, &app->env);
            }
            free(key);
        } else {
            /* parent had no key - generate fresh ones */
            if (ORTE_SUCCESS != (rc = orte_pre_condition_transports(caddy->jdata, NULL))) {
                ORTE_ERROR_LOG(rc);
                ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
                OBJ_RELEASE(caddy);
                return;
            }
        }
    } else {
        /* this will also record the transport key attribute in the job object, and
         * adds the key envar to each app */
        if (ORTE_SUCCESS != (rc = orte_pre_condition_transports(caddy->jdata, NULL))) {
            ORTE_ERROR_LOG(rc);
            ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
            OBJ_RELEASE(caddy);
            return;
        }
    }

    /* if app recovery is not defined, set apps to defaults */
    for (i=0; i < caddy->jdata->apps->size; i++) {
        if (NULL == (app = (orte_app_context_t*)opal_pointer_array_get_item(caddy->jdata->apps, i))) {
            continue;
        }
        if (!orte_get_attribute(&app->attributes, ORTE_APP_RECOV_DEF, NULL, OPAL_BOOL)) {
            orte_set_attribute(&app->attributes, ORTE_APP_MAX_RESTARTS,
                               ORTE_ATTR_LOCAL, &orte_max_restarts,
                               OPAL_INT32);
        }
    }

    /* set the job state to the next position */
    ORTE_ACTIVATE_JOB_STATE(caddy->jdata, ORTE_JOB_STATE_INIT_COMPLETE);

    /* cleanup */
    OBJ_RELEASE(caddy);
}

/* State-machine callback: INIT_COMPLETE - simply advance to ALLOCATE. */
void orte_plm_base_setup_job_complete(int fd, short args, void *cbdata)
{
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;

    ORTE_ACQUIRE_OBJECT(caddy);

    /* nothing to do here but move along */
    ORTE_ACTIVATE_JOB_STATE(caddy->jdata, ORTE_JOB_STATE_ALLOCATE);
    OBJ_RELEASE(caddy);
}

/* State-machine callback for SYSTEM_PREP: wire up IOF for proxy spawns
 * and resolve coprocessor/host relationships before LAUNCH_APPS. */
void orte_plm_base_complete_setup(int fd, short args, void *cbdata)
{
    orte_job_t *jdata, *jdatorted;
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
    orte_node_t *node;
    uint32_t h;
    orte_vpid_t *vptr;
    int i, rc;
    char *serial_number;
    orte_process_name_t requestor, *rptr;

    ORTE_ACQUIRE_OBJECT(caddy);

    opal_output_verbose(5, orte_plm_base_framework.framework_output,
                        "%s complete_setup on job %s",
                        ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                        ORTE_JOBID_PRINT(caddy->jdata->jobid));

    /* bozo check */
    if (ORTE_JOB_STATE_SYSTEM_PREP != caddy->job_state) {
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }
    /* update job state */
    caddy->jdata->state = caddy->job_state;

    /* get the orted job data object */
    if (NULL == (jdatorted = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid))) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }

    /* convenience */
    jdata = caddy->jdata;

    /* If this job is being started by me, then there is nothing
     * further we need to do as any user directives (e.g., to tie
     * off IO to /dev/null) will have been included in the launch
     * message and the IOF knows how to handle any default situation.
     * However, if this is a proxy spawn request, then the spawner
     * might be a tool that wants IO forwarded to it.
       If that's the
     * situation, then the job object will contain an attribute
     * indicating that request */
    if (orte_get_attribute(&jdata->attributes, ORTE_JOB_FWDIO_TO_TOOL, NULL, OPAL_BOOL)) {
        /* send a message to our IOF containing the requested pull */
        rptr = &requestor;
        if (orte_get_attribute(&jdata->attributes, ORTE_JOB_LAUNCH_PROXY, (void**)&rptr, OPAL_NAME)) {
            ORTE_IOF_PROXY_PULL(jdata, rptr);
        } else {
            ORTE_IOF_PROXY_PULL(jdata, &jdata->originator);
        }
        /* the tool will PUSH its stdin, so nothing we need to do here
         * about stdin */
    }

    /* if coprocessors were detected, now is the time to
     * identify who is attached to what host - this info
     * will be shipped to the daemons in the nidmap. Someday,
     * there may be a direct way for daemons on coprocessors
     * to detect their hosts - but not today.
     */
    if (orte_coprocessors_detected) {
        /* cycle thru the nodes looking for coprocessors */
        for (i=0; i < orte_node_pool->size; i++) {
            if (NULL == (node = (orte_node_t*)opal_pointer_array_get_item(orte_node_pool, i))) {
                continue;
            }
            /* if we don't have a serial number, then we are not a coprocessor */
            serial_number = NULL;
            if (!orte_get_attribute(&node->attributes, ORTE_NODE_SERIAL_NUMBER, (void**)&serial_number, OPAL_STRING)) {
                continue;
            }
            if (NULL != serial_number) {
                /* if we have a serial number, then we are a coprocessor - so
                 * compute our hash and lookup our hostid */
                OPAL_HASH_STR(serial_number, h);
                free(serial_number);
                if (OPAL_SUCCESS != (rc = opal_hash_table_get_value_uint32(orte_coprocessors, h,
                                                                           (void**)&vptr))) {
                    ORTE_ERROR_LOG(rc);
                    break;
                }
                orte_set_attribute(&node->attributes, ORTE_NODE_HOSTID, ORTE_ATTR_LOCAL, vptr, ORTE_VPID);
            }
        }
    }
    /* done with the coprocessor mapping at this time */
    if (NULL != orte_coprocessors) {
        OBJ_RELEASE(orte_coprocessors);
    }

    /* set the job state to the next position */
    ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_LAUNCH_APPS);

    /* cleanup */
    OBJ_RELEASE(caddy);
}

/* catch timeout to allow cmds to progress */
static void timer_cb(int fd, short
                     event, void *cbdata)
{
    orte_job_t *jdata = (orte_job_t*)cbdata;
    orte_timer_t *timer=NULL;

    ORTE_ACQUIRE_OBJECT(jdata);

    /* declare launch failed */
    ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_FAILED_TO_START);

    /* free event */
    if (orte_get_attribute(&jdata->attributes, ORTE_JOB_FAILURE_TIMER_EVENT, (void**)&timer, OPAL_PTR)) {
        /* timer is an orte_timer_t object */
        OBJ_RELEASE(timer);
        orte_remove_attribute(&jdata->attributes, ORTE_JOB_FAILURE_TIMER_EVENT);
    }
}

/* State-machine callback for LAUNCH_APPS: build the launch message by
 * packing the add_local_procs command plus the ODLS launch data. The
 * actual broadcast happens later in orte_plm_base_send_launch_msg. */
void orte_plm_base_launch_apps(int fd, short args, void *cbdata)
{
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
    orte_job_t *jdata;
    orte_daemon_cmd_flag_t command;
    int rc;

    ORTE_ACQUIRE_OBJECT(caddy);

    /* convenience */
    jdata = caddy->jdata;

    /* bozo check: only valid in the LAUNCH_APPS state */
    if (ORTE_JOB_STATE_LAUNCH_APPS != caddy->job_state) {
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }
    /* update job state */
    caddy->jdata->state = caddy->job_state;

    OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                         "%s plm:base:launch_apps for job %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_JOBID_PRINT(jdata->jobid)));

    /* pack the appropriate add_local_procs command */
    if (orte_get_attribute(&jdata->attributes, ORTE_JOB_FIXED_DVM, NULL, OPAL_BOOL)) {
        command = ORTE_DAEMON_DVM_ADD_PROCS;
    } else {
        command = ORTE_DAEMON_ADD_LOCAL_PROCS;
    }
    if (ORTE_SUCCESS != (rc = opal_dss.pack(&jdata->launch_msg, &command, 1, ORTE_DAEMON_CMD))) {
        ORTE_ERROR_LOG(rc);
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }

    /* get the local launcher's required data */
    if (ORTE_SUCCESS != (rc = orte_odls.get_add_procs_data(&jdata->launch_msg, jdata->jobid))) {
        ORTE_ERROR_LOG(rc);
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
    }

    OBJ_RELEASE(caddy);
    return;
}

/* State-machine callback: xcast the prepared launch message to all
 * daemons and optionally arm a startup-failure timer. */
void orte_plm_base_send_launch_msg(int fd, short args, void *cbdata)
{
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
    orte_timer_t *timer;
    orte_grpcomm_signature_t *sig;
    orte_job_t *jdata;
    int rc;

    /* convenience */
    jdata = caddy->jdata;
    OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                         "%s plm:base:send launch msg for job %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_JOBID_PRINT(jdata->jobid)));

    /* if we don't want to launch the apps, now is the time to leave */
    if (orte_do_not_launch) {
        bool compressed;
        uint8_t *cmpdata;
        size_t cmplen;
        /* report the size of the launch message */
        compressed = orte_util_compress_block((uint8_t*)jdata->launch_msg.base_ptr,
                                              jdata->launch_msg.bytes_used,
                                              &cmpdata, &cmplen);
        if (compressed) {
            opal_output(0, "LAUNCH MSG RAW SIZE: %d COMPRESSED SIZE: %d",
                        (int)jdata->launch_msg.bytes_used, (int)cmplen);
            free(cmpdata);
        } else {
            opal_output(0, "LAUNCH MSG RAW SIZE: %d", (int)jdata->launch_msg.bytes_used);
        }
        orte_never_launched = true;
        ORTE_FORCED_TERMINATE(0);
        OBJ_RELEASE(caddy);
        return;
    }

    /* goes to all daemons */
    sig = OBJ_NEW(orte_grpcomm_signature_t);
    sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
    sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
    sig->signature[0].vpid = ORTE_VPID_WILDCARD;
    sig->sz = 1;
    if (ORTE_SUCCESS != (rc = orte_grpcomm.xcast(sig, ORTE_RML_TAG_DAEMON, &jdata->launch_msg))) {
        ORTE_ERROR_LOG(rc);
        OBJ_RELEASE(sig);
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }
    /* reset the launch buffer for any subsequent use */
    OBJ_DESTRUCT(&jdata->launch_msg);
    OBJ_CONSTRUCT(&jdata->launch_msg, opal_buffer_t);
    /* maintain accounting */
    OBJ_RELEASE(sig);

    /* track that we automatically are considered to have reported - used
     * only to report launch progress */
    caddy->jdata->num_daemons_reported++;

    /* if requested, setup a timer - if we don't launch within the
     * defined time, then we know things have failed */
    if (0 < orte_startup_timeout) {
        OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                             "%s plm:base:launch defining timeout for job %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_JOBID_PRINT(jdata->jobid)));
        timer = OBJ_NEW(orte_timer_t);
        timer->payload = jdata;
        opal_event_evtimer_set(orte_event_base,
                               timer->ev, timer_cb,
                               jdata);
        opal_event_set_priority(timer->ev, ORTE_ERROR_PRI);
        timer->tv.tv_sec = orte_startup_timeout;
        timer->tv.tv_usec = 0;
        /* stash the timer on the job so post_launch/timer_cb can find it */
        orte_set_attribute(&jdata->attributes, ORTE_JOB_FAILURE_TIMER_EVENT, ORTE_ATTR_LOCAL, timer, OPAL_PTR);
        ORTE_POST_OBJECT(timer);
        opal_event_evtimer_add(timer->ev, &timer->tv);
    }

    /* cleanup */
    OBJ_RELEASE(caddy);
}

/* State-machine callback for RUNNING: cancel the startup timer, wire up
 * stdin via the IOF, and - for dynamic spawns - send the launch response
 * back to the originating process. */
void orte_plm_base_post_launch(int fd, short args, void *cbdata)
{
    int32_t rc;
    orte_job_t *jdata;
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;
    orte_process_name_t name;
    orte_timer_t *timer=NULL;
    int ret;
    opal_buffer_t *answer;
    int room, *rmptr;

    ORTE_ACQUIRE_OBJECT(caddy);

    /* convenience */
    jdata = caddy->jdata;

    /* if a timer was defined, cancel it */
    if (orte_get_attribute(&jdata->attributes, ORTE_JOB_FAILURE_TIMER_EVENT, (void**)&timer, OPAL_PTR)) {
        opal_event_evtimer_del(timer->ev);
        OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                             "%s plm:base:launch deleting timeout for job %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_JOBID_PRINT(jdata->jobid)));
        OBJ_RELEASE(timer);
        orte_remove_attribute(&jdata->attributes, ORTE_JOB_FAILURE_TIMER_EVENT);
    }

    if (ORTE_JOB_STATE_RUNNING != caddy->job_state) {
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }
    /* update job state */
    caddy->jdata->state = caddy->job_state;

    /* complete wiring up the iof */
    OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                         "%s plm:base:launch wiring up iof for job %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_JOBID_PRINT(jdata->jobid)));

    /* push stdin - the IOF will know what to do with the specified target */
    name.jobid = jdata->jobid;
    name.vpid = jdata->stdin_target;

    if (ORTE_SUCCESS != (rc = orte_iof.push(&name, ORTE_IOF_STDIN, 0))) {
        ORTE_ERROR_LOG(rc);
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }

    /* if this isn't a dynamic spawn, just cleanup */
    if (ORTE_JOBID_INVALID == jdata->originator.jobid) {
        OPAL_OUTPUT_VERBOSE((5,
                             orte_plm_base_framework.framework_output,
                             "%s plm:base:launch job %s is not a dynamic spawn",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_JOBID_PRINT(jdata->jobid)));
        goto cleanup;
    }

    /* prep the response */
    rc = ORTE_SUCCESS;
    answer = OBJ_NEW(opal_buffer_t);

    /* pack the status */
    if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &rc, 1, OPAL_INT32))) {
        ORTE_ERROR_LOG(ret);
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }

    /* pack the jobid */
    if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &jdata->jobid, 1, ORTE_JOBID))) {
        ORTE_ERROR_LOG(ret);
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }

    /* pack the room number */
    rmptr = &room;
    if (orte_get_attribute(&jdata->attributes, ORTE_JOB_ROOM_NUM, (void**)&rmptr, OPAL_INT)) {
        if (ORTE_SUCCESS != (ret = opal_dss.pack(answer, &room, 1, OPAL_INT))) {
            ORTE_ERROR_LOG(ret);
            ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
            OBJ_RELEASE(caddy);
            return;
        }
    }

    OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                         "%s plm:base:launch sending dyn release of job %s to %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_JOBID_PRINT(jdata->jobid),
                         ORTE_NAME_PRINT(&jdata->originator)));

    /* non-blocking send of the response back to the spawner */
    if (0 > (ret = orte_rml.send_buffer_nb(orte_mgmt_conduit,
                                           &jdata->originator, answer,
                                           ORTE_RML_TAG_LAUNCH_RESP,
                                           orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(ret);
        OBJ_RELEASE(answer);
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }

  cleanup:
    /* cleanup */
    OBJ_RELEASE(caddy);
}

/* State-machine callback fired when the job's procs have registered. */
void orte_plm_base_registered(int fd, short args, void *cbdata)
{
    orte_job_t *jdata;
    orte_state_caddy_t *caddy = (orte_state_caddy_t*)cbdata;

    ORTE_ACQUIRE_OBJECT(caddy);

    /* convenience */
    jdata = caddy->jdata;

    OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                         "%s plm:base:launch %s registered",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_JOBID_PRINT(jdata->jobid)));

    if (ORTE_JOB_STATE_REGISTERED != caddy->job_state) {
        OPAL_OUTPUT_VERBOSE((5,
                             orte_plm_base_framework.framework_output,
                             "%s plm:base:launch job %s not registered - state %s",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             ORTE_JOBID_PRINT(jdata->jobid),
                             orte_job_state_to_str(caddy->job_state)));
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        OBJ_RELEASE(caddy);
        return;
    }
    /* update job state */
    jdata->state = caddy->job_state;

    /* if this wasn't a debugger job, then need to init_after_spawn for debuggers */
    if (!ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) {
        ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_READY_FOR_DEBUGGERS);
    }

    OBJ_RELEASE(caddy);
}

/* daemons callback when they start - need to listen for them */
static bool orted_failed_launch;
static orte_job_t *jdatorted=NULL;

/* callback for topology reports
 *
 * RML receive handler: a daemon on a node with a previously-unknown
 * topology signature sends back its (possibly compressed) hwloc
 * topology. Unpack it, attach it to the matching orte_topology_t entry,
 * and record any coprocessor serial numbers. Progress/failure is then
 * reported exactly as for a normal daemon report. */
void orte_plm_base_daemon_topology(int status, orte_process_name_t* sender,
                                   opal_buffer_t *buffer,
                                   orte_rml_tag_t tag, void *cbdata)
{
    hwloc_topology_t topo;
    int rc, idx;
    char *sig, *coprocessors, **sns;
    orte_proc_t *daemon=NULL;
    orte_topology_t *t, *t2;
    int i;
    uint32_t h;
    orte_job_t *jdata;
    uint8_t flag;
    size_t inlen, cmplen;
    uint8_t *packed_data, *cmpdata;
    opal_buffer_t datbuf, *data;

    OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                         "%s plm:base:daemon_topology recvd for daemon %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(sender)));

    /* get the daemon job, if necessary */
    if (NULL == jdatorted) {
        jdatorted = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
    }
    if (NULL == (daemon = (orte_proc_t*)opal_pointer_array_get_item(jdatorted->procs, sender->vpid))) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        orted_failed_launch = true;
        goto CLEANUP;
    }
    /* NOTE(review): datbuf appears never to be OBJ_DESTRUCTed on any exit
     * path - possible leak of the uncompressed payload; confirm upstream */
    OBJ_CONSTRUCT(&datbuf, opal_buffer_t);
    /* unpack the flag to see if this payload is compressed */
    idx=1;
    if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) {
        ORTE_ERROR_LOG(rc);
        orted_failed_launch = true;
        goto CLEANUP;
    }
    if (flag) {
        /* unpack the data size */
        idx=1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) {
            ORTE_ERROR_LOG(rc);
            orted_failed_launch = true;
            goto CLEANUP;
        }
        /* unpack the unpacked data size */
        idx=1;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) {
            ORTE_ERROR_LOG(rc);
            orted_failed_launch = true;
            goto CLEANUP;
        }
        /* allocate the space */
        packed_data = (uint8_t*)malloc(inlen);
        /* unpack the data blob */
        idx = inlen;
        if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) {
            ORTE_ERROR_LOG(rc);
            orted_failed_launch = true;
            goto CLEANUP;
        }
        /* decompress the data */
        if (orte_util_uncompress_block(&cmpdata, cmplen, packed_data, inlen)) {
            /* the data has been uncompressed */
            opal_dss.load(&datbuf, cmpdata, cmplen);
            data = &datbuf;
        } else {
            data = buffer;
        }
        free(packed_data);
    } else {
        data = buffer;
    }

    /* unpack the topology signature for this node */
    idx=1;
    if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &sig, &idx, OPAL_STRING))) {
        ORTE_ERROR_LOG(rc);
        orted_failed_launch = true;
        goto CLEANUP;
    }
    /* find it in the array */
    t = NULL;
    for (i=0; i < orte_node_topologies->size; i++) {
        if (NULL == (t2 = (orte_topology_t*)opal_pointer_array_get_item(orte_node_topologies, i))) {
            continue;
        }
        /* just check the signature */
        if (0 == strcmp(sig, t2->sig)) {
            t = t2;
            break;
        }
    }
    if (NULL == t) {
        /* should never happen */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        orted_failed_launch = true;
        goto CLEANUP;
    }

    /* unpack the topology */
    idx=1;
    if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) {
        ORTE_ERROR_LOG(rc);
        orted_failed_launch = true;
        goto CLEANUP;
    }
    /* Apply any CPU filters (not preserved by the XML) */
    opal_hwloc_base_filter_cpus(topo);
    /* record the final topology */
    t->topo = topo;

    /* unpack any coprocessors */
    idx=1;
    if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
        ORTE_ERROR_LOG(rc);
        orted_failed_launch = true;
        goto CLEANUP;
    }
    if (NULL != coprocessors) {
        /* init the hash table, if necessary */
        if (NULL == orte_coprocessors) {
            orte_coprocessors = OBJ_NEW(opal_hash_table_t);
            opal_hash_table_init(orte_coprocessors, orte_process_info.num_procs);
        }
        /* separate the serial numbers of the coprocessors
         * on this host */
        sns = opal_argv_split(coprocessors, ',');
        for (idx=0; NULL != sns[idx]; idx++) {
            /* compute the hash */
            OPAL_HASH_STR(sns[idx], h);
            /* mark that this coprocessor is hosted by this node */
            opal_hash_table_set_value_uint32(orte_coprocessors, h, (void*)&daemon->name.vpid);
        }
        opal_argv_free(sns);
        free(coprocessors);
        orte_coprocessors_detected = true;
    }
    /* see if this daemon is on a coprocessor */
    idx=1;
    if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &coprocessors, &idx, OPAL_STRING))) {
        ORTE_ERROR_LOG(rc);
        orted_failed_launch = true;
        goto CLEANUP;
    }
    if (NULL != coprocessors) {
        if (orte_get_attribute(&daemon->node->attributes, ORTE_NODE_SERIAL_NUMBER, NULL, OPAL_STRING)) {
            /* this is not allowed - a coprocessor cannot be host
             * to another coprocessor at this time */
            ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
            orted_failed_launch = true;
            free(coprocessors);
            goto CLEANUP;
        }
        orte_set_attribute(&daemon->node->attributes, ORTE_NODE_SERIAL_NUMBER, ORTE_ATTR_LOCAL, coprocessors, OPAL_STRING);
        free(coprocessors);
        orte_coprocessors_detected = true;
    }

  CLEANUP:
    OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                         "%s plm:base:orted:report_topo launch %s for daemon %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         orted_failed_launch ?
                         "failed" : "completed",
                         ORTE_NAME_PRINT(sender)));

    if (orted_failed_launch) {
        ORTE_ACTIVATE_JOB_STATE(jdatorted, ORTE_JOB_STATE_FAILED_TO_START);
        return;
    } else {
        jdatorted->num_reported++;
        OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output,
                             "%s plm:base:orted_report_launch recvd %d of %d reported daemons",
                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                             jdatorted->num_reported, jdatorted->num_procs));
        if (jdatorted->num_procs == jdatorted->num_reported) {
            bool dvm = true;
            uint32_t key;
            void *nptr;
            jdatorted->state = ORTE_JOB_STATE_DAEMONS_REPORTED;
            /* activate the daemons_reported state for all jobs
             * whose daemons were launched */
            rc = opal_hash_table_get_first_key_uint32(orte_job_data, &key, (void **)&jdata, &nptr);
            while (OPAL_SUCCESS == rc) {
                /* skip the daemon job itself; any other job means this is
                 * not a standalone DVM launch */
                if (ORTE_PROC_MY_NAME->jobid != jdata->jobid) {
                    dvm = false;
                    if (ORTE_JOB_STATE_DAEMONS_LAUNCHED == jdata->state) {
                        ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_REPORTED);
                    }
                }
                rc = opal_hash_table_get_next_key_uint32(orte_job_data, &key, (void **)&jdata, nptr, &nptr);
            }
            if (dvm) {
                /* must be launching a DVM - activate the state */
                ORTE_ACTIVATE_JOB_STATE(jdatorted, ORTE_JOB_STATE_DAEMONS_REPORTED);
            }
        }
    }
}

/* RML receive handler: daemons report in here at startup with their
 * contact info, node name, aliases, and topology signature. Multiple
 * daemon reports may be batched into a single buffer. */
void orte_plm_base_daemon_callback(int status, orte_process_name_t* sender,
                                   opal_buffer_t *buffer,
                                   orte_rml_tag_t tag, void *cbdata)
{
    char *ptr;
    int rc, idx;
    orte_proc_t *daemon=NULL;
    orte_job_t *jdata;
    orte_process_name_t dname;
    opal_buffer_t *relay;
    char *sig;
    orte_topology_t *t;
    hwloc_topology_t topo;
    int i;
    bool found;
    orte_daemon_cmd_flag_t cmd;
    int32_t flag;
    opal_value_t *kv;
    char *myendian;

    /* get the daemon job, if necessary */
    if (NULL == jdatorted) {
        jdatorted = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid);
    }

    /* get my endianness - taken from the trailing ":<endian>" field of
     * my own topology signature (entry 0 of the topology array) */
    t = (orte_topology_t*)opal_pointer_array_get_item(orte_node_topologies, 0);
    if (NULL == t) {
        /* should never happen */
        myendian = "unknown";
    } else {
        myendian = strrchr(t->sig, ':');
        ++myendian;
    }

    /* multiple daemons could be in this buffer, so unpack until we
exhaust the data */ idx = 1; while (OPAL_SUCCESS == (rc = opal_dss.unpack(buffer, &dname, &idx, ORTE_NAME))) { char *nodename = NULL; OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output, "%s plm:base:orted_report_launch from daemon %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&dname))); /* update state and record for this daemon contact info */ if (NULL == (daemon = (orte_proc_t*)opal_pointer_array_get_item(jdatorted->procs, dname.vpid))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); orted_failed_launch = true; goto CLEANUP; } daemon->state = ORTE_PROC_STATE_RUNNING; /* record that this daemon is alive */ ORTE_FLAG_SET(daemon, ORTE_PROC_FLAG_ALIVE); /* unpack the flag indicating the number of connection blobs * in the report */ idx = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT32))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; } for (i=0; i < flag; i++) { idx = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &kv, &idx, OPAL_VALUE))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; } /* store this in a daemon wireup buffer for later distribution */ opal_pmix.store_local(&dname, kv); OBJ_RELEASE(kv); } /* unpack the node name */ idx = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &nodename, &idx, OPAL_STRING))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; } if (!orte_have_fqdn_allocation) { /* remove any domain info */ if (NULL != (ptr = strchr(nodename, '.'))) { *ptr = '\0'; ptr = strdup(nodename); free(nodename); nodename = ptr; } } OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output, "%s plm:base:orted_report_launch from daemon %s on node %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&daemon->name), nodename)); /* mark the daemon as launched */ ORTE_FLAG_SET(daemon->node, ORTE_NODE_FLAG_DAEMON_LAUNCHED); if (orte_retain_aliases) { char *alias, **atmp=NULL; uint8_t naliases, ni; /* first, store the nodename itself as an alias. 
We do * this in case the nodename isn't the same as what we * were given by the allocation. For example, a hostfile * might contain an IP address instead of the value returned * by gethostname, yet the daemon will have returned the latter * and apps may refer to the host by that name */ opal_argv_append_nosize(&atmp, nodename); /* unpack and store the provided aliases */ idx = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &naliases, &idx, OPAL_UINT8))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; } for (ni=0; ni < naliases; ni++) { idx = 1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &alias, &idx, OPAL_STRING))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; } opal_argv_append_nosize(&atmp, alias); free(alias); } if (0 < naliases) { alias = opal_argv_join(atmp, ','); orte_set_attribute(&daemon->node->attributes, ORTE_NODE_ALIAS, ORTE_ATTR_LOCAL, alias, OPAL_STRING); free(alias); } opal_argv_free(atmp); } /* unpack the topology signature for that node */ idx=1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &sig, &idx, OPAL_STRING))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; } OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output, "%s RECEIVED TOPOLOGY SIG %s FROM NODE %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sig, nodename)); /* rank=1 always sends its topology back */ topo = NULL; if (1 == dname.vpid) { uint8_t flag; size_t inlen, cmplen; uint8_t *packed_data, *cmpdata; opal_buffer_t datbuf, *data; OBJ_CONSTRUCT(&datbuf, opal_buffer_t); /* unpack the flag to see if this payload is compressed */ idx=1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &flag, &idx, OPAL_INT8))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; } if (flag) { /* unpack the data size */ idx=1; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &inlen, &idx, OPAL_SIZE))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; } /* unpack the unpacked data size */ idx=1; if 
(ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, &cmplen, &idx, OPAL_SIZE))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; } /* allocate the space */ packed_data = (uint8_t*)malloc(inlen); /* unpack the data blob */ idx = inlen; if (ORTE_SUCCESS != (rc = opal_dss.unpack(buffer, packed_data, &idx, OPAL_UINT8))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; } /* decompress the data */ if (orte_util_uncompress_block(&cmpdata, cmplen, packed_data, inlen)) { /* the data has been uncompressed */ opal_dss.load(&datbuf, cmpdata, cmplen); data = &datbuf; } else { data = buffer; } free(packed_data); } else { data = buffer; } idx=1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(data, &topo, &idx, OPAL_HWLOC_TOPO))) { ORTE_ERROR_LOG(rc); orted_failed_launch = true; goto CLEANUP; } } /* do we already have this topology from some other node? */ found = false; for (i=0; i < orte_node_topologies->size; i++) { if (NULL == (t = (orte_topology_t*)opal_pointer_array_get_item(orte_node_topologies, i))) { continue; } /* just check the signature */ if (0 == strcmp(sig, t->sig)) { OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output, "%s TOPOLOGY ALREADY RECORDED", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); found = true; daemon->node->topology = t; if (NULL != topo) { hwloc_topology_destroy(topo); } free(sig); break; } #if !OPAL_ENABLE_HETEROGENEOUS_SUPPORT else { /* check if the difference is due to the endianness */ ptr = strrchr(sig, ':'); ++ptr; if (0 != strcmp(ptr, myendian)) { /* we don't currently handle multi-endian operations in the * MPI support */ orte_show_help("help-plm-base", "multi-endian", true, nodename, ptr, myendian); orted_failed_launch = true; if (NULL != topo) { hwloc_topology_destroy(topo); } goto CLEANUP; } } #endif } if (!found) { /* nope - save the signature and request the complete topology from that node */ OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output, "%s NEW TOPOLOGY - ADDING", 
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); t = OBJ_NEW(orte_topology_t); t->sig = sig; opal_pointer_array_add(orte_node_topologies, t); daemon->node->topology = t; if (NULL != topo) { /* Apply any CPU filters (not preserved by the XML) */ opal_hwloc_base_filter_cpus(topo); t->topo = topo; } else { /* nope - save the signature and request the complete topology from that node */ OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output, "%s REQUESTING TOPOLOGY FROM %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_NAME_PRINT(&dname))); /* construct the request */ relay = OBJ_NEW(opal_buffer_t); cmd = ORTE_DAEMON_REPORT_TOPOLOGY_CMD; if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &cmd, 1, ORTE_DAEMON_CMD))) { ORTE_ERROR_LOG(rc); OBJ_RELEASE(relay); orted_failed_launch = true; goto CLEANUP; } /* send it */ orte_rml.send_buffer_nb(orte_mgmt_conduit, &dname, relay, ORTE_RML_TAG_DAEMON, orte_rml_send_callback, NULL); /* we will count this node as completed * when we get the full topology back */ if (NULL != nodename) { free(nodename); nodename = NULL; } idx = 1; continue; } } CLEANUP: OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output, "%s plm:base:orted_report_launch %s for daemon %s at contact %s", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), orted_failed_launch ? "failed" : "completed", ORTE_NAME_PRINT(&dname), (NULL == daemon) ? 
"UNKNOWN" : daemon->rml_uri)); if (NULL != nodename) { free(nodename); nodename = NULL; } if (orted_failed_launch) { ORTE_ACTIVATE_JOB_STATE(jdatorted, ORTE_JOB_STATE_FAILED_TO_START); return; } else { jdatorted->num_reported++; OPAL_OUTPUT_VERBOSE((5, orte_plm_base_framework.framework_output, "%s plm:base:orted_report_launch job %s recvd %d of %d reported daemons", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), ORTE_JOBID_PRINT(jdatorted->jobid), jdatorted->num_reported, jdatorted->num_procs)); if (jdatorted->num_procs == jdatorted->num_reported) { bool dvm = true; uint32_t key; void *nptr; jdatorted->state = ORTE_JOB_STATE_DAEMONS_REPORTED; /* activate the daemons_reported state for all jobs * whose daemons were launched */ rc = opal_hash_table_get_first_key_uint32(orte_job_data, &key, (void **)&jdata, &nptr); while (OPAL_SUCCESS == rc) { if (ORTE_PROC_MY_NAME->jobid == jdata->jobid) { goto next; } dvm = false; if (ORTE_JOB_STATE_DAEMONS_LAUNCHED == jdata->state) { ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_DAEMONS_REPORTED); } next: rc = opal_hash_table_get_next_key_uint32(orte_job_data, &key, (void **)&jdata, nptr, &nptr); } if (dvm) { /* must be launching a DVM - activate the state */ ORTE_ACTIVATE_JOB_STATE(jdatorted, ORTE_JOB_STATE_DAEMONS_REPORTED); } } } idx = 1; } if (ORTE_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { ORTE_ERROR_LOG(rc); ORTE_ACTIVATE_JOB_STATE(jdatorted, ORTE_JOB_STATE_FAILED_TO_START); } } void orte_plm_base_daemon_failed(int st, orte_process_name_t* sender, opal_buffer_t *buffer, orte_rml_tag_t tag, void *cbdata) { int status, rc; int32_t n; orte_vpid_t vpid; orte_proc_t *daemon=NULL; /* get the daemon job, if necessary */ if (NULL == jdatorted) { jdatorted = orte_get_job_data_object(ORTE_PROC_MY_NAME->jobid); } /* unpack the daemon that failed */ n=1; if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &vpid, &n, ORTE_VPID))) { ORTE_ERROR_LOG(rc); ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE); goto finish; } /* unpack the exit status */ 
n=1;
    if (OPAL_SUCCESS != (rc = opal_dss.unpack(buffer, &status, &n, OPAL_INT))) {
        ORTE_ERROR_LOG(rc);
        /* the exit status is unknown - fall back to the default code */
        status = ORTE_ERROR_DEFAULT_EXIT_CODE;
        ORTE_UPDATE_EXIT_STATUS(ORTE_ERROR_DEFAULT_EXIT_CODE);
    } else {
        /* extract the exit code using the waitpid status macros */
        ORTE_UPDATE_EXIT_STATUS(WEXITSTATUS(status));
    }

    /* find the daemon and update its state/status */
    if (NULL == (daemon = (orte_proc_t*)opal_pointer_array_get_item(jdatorted->procs, vpid))) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        goto finish;
    }
    daemon->state = ORTE_PROC_STATE_FAILED_TO_START;
    daemon->exit_code = status;

  finish:
    if (NULL == daemon) {
        /* we could not even identify the failed daemon - force termination */
        ORTE_FORCED_TERMINATE(ORTE_ERROR_DEFAULT_EXIT_CODE);
        return;
    }
    ORTE_ACTIVATE_PROC_STATE(&daemon->name, ORTE_PROC_STATE_FAILED_TO_START);
}

/*
 * Split the configured launch agent (orte_launch_agent) on spaces and
 * append each word to the caller's argv.  Returns the index (relative
 * to the words appended) at which the literal word "orted" appears, or
 * 0 if the agent is a single word / "orted" was not found.
 */
int orte_plm_base_setup_orted_cmd(int *argc, char ***argv)
{
    int i, loc;
    char **tmpv;

    /* set default location to be 0, indicating that
     * only a single word is in the cmd */
    loc = 0;
    /* split the command apart in case it is multi-word */
    tmpv = opal_argv_split(orte_launch_agent, ' ');
    for (i = 0; NULL != tmpv && NULL != tmpv[i]; ++i) {
        if (0 == strcmp(tmpv[i], "orted")) {
            loc = i;
        }
        opal_argv_append(argc, argv, tmpv[i]);
    }
    opal_argv_free(tmpv);

    return loc;
}

/* pass all options as MCA params so anything we pickup
 * from the environment can be checked for duplicates */
int orte_plm_base_orted_append_basic_args(int *argc, char ***argv,
                                          char *ess,
                                          int *proc_vpid_index)
{
    char *param = NULL;
    const char **tmp_value, **tmp_value2;
    int loc_id;
    char *tmp_force = NULL;
    int i, j, cnt, rc;
    orte_job_t *jdata;
    unsigned long num_procs;
    bool ignore;

    /* check for debug flags - each becomes an "-mca <name> 1" triplet */
    if (orte_debug_flag) {
        opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID);
        opal_argv_append(argc, argv, "orte_debug");
        opal_argv_append(argc, argv, "1");
    }
    if (orte_debug_daemons_flag) {
        opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID);
        opal_argv_append(argc, argv, "orte_debug_daemons");
        opal_argv_append(argc, argv, "1");
    }
    if (orte_debug_daemons_file_flag) {
        opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID);
opal_argv_append(argc, argv, "orte_debug_daemons_file"); opal_argv_append(argc, argv, "1"); } if (orte_leave_session_attached) { opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID); opal_argv_append(argc, argv, "orte_leave_session_attached"); opal_argv_append(argc, argv, "1"); } if (orted_spin_flag) { opal_argv_append(argc, argv, "--spin"); } if (opal_hwloc_report_bindings) { opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID); opal_argv_append(argc, argv, "orte_report_bindings"); opal_argv_append(argc, argv, "1"); } if (orte_map_stddiag_to_stderr) { opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID); opal_argv_append(argc, argv, "orte_map_stddiag_to_stderr"); opal_argv_append(argc, argv, "1"); } else if (orte_map_stddiag_to_stdout) { opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID); opal_argv_append(argc, argv, "orte_map_stddiag_to_stdout"); opal_argv_append(argc, argv, "1"); } /* the following is not an mca param */ if (NULL != getenv("ORTE_TEST_ORTED_SUICIDE")) { opal_argv_append(argc, argv, "--test-suicide"); } /* tell the orted what ESS component to use */ if (NULL != ess) { opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID); opal_argv_append(argc, argv, "ess"); opal_argv_append(argc, argv, ess); } /* pass the daemon jobid */ opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID); opal_argv_append(argc, argv, "ess_base_jobid"); if (ORTE_SUCCESS != (rc = orte_util_convert_jobid_to_string(¶m, ORTE_PROC_MY_NAME->jobid))) { ORTE_ERROR_LOG(rc); return rc; } opal_argv_append(argc, argv, param); free(param); /* setup to pass the vpid */ if (NULL != proc_vpid_index) { opal_argv_append(argc, argv, "-"OPAL_MCA_CMD_LINE_ID); opal_argv_append(argc, argv, "ess_base_vpid"); *proc_vpid_index = *argc; opal_argv_append(argc, argv, "