Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions src/resmom/mom_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,20 +261,36 @@ int task_save(
int fds;
int i;
int TaskID = 0;
char namebuf[MAXPATHLEN];
char portname[MAXPATHLEN];
char namebuf[MAXPATHLEN + 1];
char portname[MAXPATHLEN + 1];
int openflags;

strcpy(namebuf, path_jobs); /* job directory path */
strcat(namebuf, pjob->ji_qs.ji_fileprefix);
if (ptask == NULL)
{
log_err(PBSE_BAD_PARAMETER, __func__, "NULL input pointer");
return(PBSE_BAD_PARAMETER);
}

pjob = ptask->ti_job;

if (pjob == NULL)
{
log_err(PBSE_BAD_PARAMETER, __func__, "NULL pointer to owning job");
return(PBSE_BAD_PARAMETER);
}

strncpy(namebuf, path_jobs, sizeof(namebuf) - 1); /* job directory path */
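strncat(namebuf, pjob->ji_qs.ji_fileprefix, sizeof(namebuf) - 1); /* TODO: revisit strncat's third argument: it limits the number of characters appended, so it should be the space remaining in namebuf, not its total size */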

if (multi_mom)
{
sprintf(portname, "%d", pbs_rm_port);
strcat(namebuf, portname);
/* TODO: do we actually have snprintf available? */
/*snprintf(portname, sizeof(portname), "%d", pbs_rm_port);*/
strncat(namebuf, portname, sizeof(namebuf) - 1);
}

strcat(namebuf, JOB_TASKDIR_SUFFIX);
strncat(namebuf, JOB_TASKDIR_SUFFIX, sizeof(namebuf) - 1);

openflags = O_WRONLY | O_CREAT | O_Sync;

Expand Down Expand Up @@ -1457,6 +1473,11 @@ int check_ms(
}

np = pjob->ji_hosts;
if (pjob->ji_hosts == NULL)
{
log_err(PBSE_BAD_PARAMETER, __func__, "NULL ptr to job host management stuff");
return(PBSE_BAD_PARAMETER);
}
ipaddr_ms = ntohl(((struct sockaddr_in *)(&np->sock_addr))->sin_addr.s_addr);

/* make sure the ip addresses match */
Expand Down Expand Up @@ -2131,7 +2152,9 @@ int im_join_job_as_sister(
return(IM_FAILURE);
}
else
{
return(IM_DONE);
}
}

pjob->ji_numnodes = nodenum; /* XXX */
Expand Down Expand Up @@ -4896,7 +4919,9 @@ void im_request(
case IM_KILL_JOB:
{
if (check_ms(chan, pjob) == FALSE)
{
im_kill_job_as_sister(pjob,event,momport,FALSE);
}
close_conn(chan->sock, FALSE);
svr_conn[chan->sock].cn_stay_open = FALSE;
chan->sock = -1;
Expand Down Expand Up @@ -7090,8 +7115,6 @@ int tm_request(

extern struct connection svr_conn[];

int start_process(task *ptask, char **argv, char **envp);

if (svr_conn[chan->sock].cn_addr != localaddr)
{
sprintf(log_buffer, "non-local connect");
Expand Down
116 changes: 61 additions & 55 deletions src/resmom/mom_comm.h
Original file line number Diff line number Diff line change
@@ -1,136 +1,142 @@
#ifndef _MOM_COMM_H
#define _MOM_COMM_H
#include "license_pbs.h" /* See here for the software license */

#include "pbs_job.h" /* task, hnodent, tm_task_id, job, eventent, fwdevent */
#include "tm_.h" /* tm_event_t */
#include "sys/socket.h" /* sockaddr_in */
#include "resource.h" /* resource */
#include "tcp.h" /* tcp_chan */

int task_save(task *ptask);
/* Forward declarations */
struct job;
struct task;
struct eventent;
struct hnodent;
struct fwdevent;
struct infoent;
struct sockaddr_in;
struct resource;
struct tcp_chan;

int task_save(struct task *ptask);

eventent *event_alloc(int command, hnodent *pnode, tm_event_t event, tm_task_id taskid);
struct eventent *event_alloc(int command, struct hnodent *pnode, tm_event_t event, tm_task_id taskid);

task *pbs_task_create(job *pjob, tm_task_id taskid);
struct task *pbs_task_create(struct job *pjob, tm_task_id taskid);

task *task_find(job *pjob, tm_task_id taskid);
struct task *task_find(struct job *pjob, tm_task_id taskid);

task *task_check(job *pjob, tm_task_id taskid);
struct task *task_check(struct job *pjob, tm_task_id taskid);

int task_recov(job *pjob);
int task_recov(struct job *pjob);

int tm_reply(struct tcp_chan *chan, int com, tm_event_t event);

int im_compose(struct tcp_chan *chan, char *jobid, char *cookie, int command, tm_event_t event, tm_task_id taskid);

int send_sisters(job *pjob, int com, int using_radix);
int send_sisters(struct job *pjob, int com, int using_radix);

hnodent *find_node(job *pjob, int stream, tm_node_id nodeid);
struct hnodent *find_node(struct job *pjob, int stream, tm_node_id nodeid);

void job_start_error(job *pjob, int code, char *nodename);
void job_start_error(struct job *pjob, int code, char *nodename);

void arrayfree(char **array);

void node_bailout(job *pjob, hnodent *np);
void node_bailout(struct job *pjob, struct hnodent *np);

void term_job(job *pjob);
void term_job(struct job *pjob);

void im_eof(int stream, int ret);

int check_ms(struct tcp_chan *chan, job *pjob);
int check_ms(struct tcp_chan *chan, struct job *pjob);

u_long resc_used(job *pjob, const char *name, u_long(*func)(resource *));
u_long resc_used(struct job *pjob, const char *name, u_long(*func)(struct resource *));

infoent *task_findinfo(task *ptask, char *name);
struct infoent *task_findinfo(struct task *ptask, char *name);

void task_saveinfo(task *ptask, char *name, void *info, size_t len);
void task_saveinfo(struct task *ptask, char *name, void *info, size_t len);

char *resc_string(job *pjob);
char *resc_string(struct job *pjob);

int contact_sisters(job *pjob, tm_event_t parent_event, int sister_count, char *radix_hosts, char *radix_ports);
int contact_sisters(struct job *pjob, tm_event_t parent_event, int sister_count, char *radix_hosts, char *radix_ports);

void send_im_error(int err, int reply, job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);
void send_im_error(int err, int reply, struct job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);

int im_join_job_as_sister(struct tcp_chan *chan, char *jobid, struct sockaddr_in *addr, char *cookie, tm_event_t event, int fromtask, int command, int job_radix);

void im_kill_job_as_sister(job *pjob, tm_event_t event, unsigned int momport, int radix);
void im_kill_job_as_sister(struct job *pjob, tm_event_t event, unsigned int momport, int radix);

int im_spawn_task(struct tcp_chan *chan, char *cookie, tm_event_t event, struct sockaddr_in *addr, tm_task_id fromtask, job *pjob);
int im_spawn_task(struct tcp_chan *chan, char *cookie, tm_event_t event, struct sockaddr_in *addr, tm_task_id fromtask, struct job *pjob);

int im_signal_task(struct tcp_chan *chan, job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);
int im_signal_task(struct tcp_chan *chan, struct job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);

int im_obit_task(struct tcp_chan *chan, job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);
int im_obit_task(struct tcp_chan *chan, struct job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);

int im_get_info(struct tcp_chan *chan, job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);
int im_get_info(struct tcp_chan *chan, struct job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);

int im_get_resc_as_sister(struct tcp_chan *chan, job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);
int im_get_resc_as_sister(struct tcp_chan *chan, struct job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);

int get_reply_stream(job *pjob);
int get_reply_stream(struct job *pjob);

int get_radix_reply_stream(job *pjob);
int get_radix_reply_stream(struct job *pjob);

int im_poll_job_as_sister(job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);
int im_poll_job_as_sister(struct job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);

int im_abort_job(job *pjob, struct sockaddr_in *addr, char *cookie, tm_event_t event, tm_task_id fromtask);
int im_abort_job(struct job *pjob, struct sockaddr_in *addr, char *cookie, tm_event_t event, tm_task_id fromtask);

int im_get_tid(job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);
int im_get_tid(struct job *pjob, char *cookie, tm_event_t event, tm_task_id fromtask);

int handle_im_join_job_response(struct tcp_chan *chan, job *pjob, struct sockaddr_in *addr);
int handle_im_join_job_response(struct tcp_chan *chan, struct job *pjob, struct sockaddr_in *addr);

int handle_im_kill_job_response(struct tcp_chan *chan, job *pjob, hnodent *np, int event_com, int nodeidx);
int handle_im_kill_job_response(struct tcp_chan *chan, struct job *pjob, struct hnodent *np, int event_com, int nodeidx);

int handle_im_spawn_task_response(struct tcp_chan *chan, job *pjob, tm_task_id event_task, tm_event_t event);
int handle_im_spawn_task_response(struct tcp_chan *chan, struct job *pjob, tm_task_id event_task, tm_event_t event);

int handle_im_signal_task_response(job *pjob, tm_task_id event_task, tm_event_t event);
int handle_im_signal_task_response(struct job *pjob, tm_task_id event_task, tm_event_t event);

int handle_im_get_tasks_response(struct tcp_chan *chan, job *pjob, tm_task_id event_task, tm_event_t event);
int handle_im_get_tasks_response(struct tcp_chan *chan, struct job *pjob, tm_task_id event_task, tm_event_t event);

int handle_im_obit_task_response(struct tcp_chan *chan, job *pjob, tm_task_id event_task, tm_event_t event);
int handle_im_obit_task_response(struct tcp_chan *chan, struct job *pjob, tm_task_id event_task, tm_event_t event);

int handle_im_get_info_response(struct tcp_chan *chan, job *pjob, tm_task_id event_task, tm_event_t event);
int handle_im_get_info_response(struct tcp_chan *chan, struct job *pjob, tm_task_id event_task, tm_event_t event);

int handle_im_get_resc_response(struct tcp_chan *chan, job *pjob, tm_task_id event_task, tm_event_t event);
int handle_im_get_resc_response(struct tcp_chan *chan, struct job *pjob, tm_task_id event_task, tm_event_t event);

int handle_im_poll_job_response(struct tcp_chan *chan, job *pjob, int nodeidx, hnodent *np);
int handle_im_poll_job_response(struct tcp_chan *chan, struct job *pjob, int nodeidx, struct hnodent *np);

int handle_im_get_tid_response(struct tcp_chan *chan, job *pjob, char *cookie, char **argv, char **envp, fwdevent *efwd);
int handle_im_get_tid_response(struct tcp_chan *chan, struct job *pjob, char *cookie, char **argv, char **envp, struct fwdevent *efwd);

void im_request(struct tcp_chan *chan, int version);

void tm_eof(int fd);

void tm_request_init(job *pjob, task *ptask, int *ret, int event, int prev_error);
void tm_request_init(struct job *pjob, struct task *ptask, int *ret, int event, int prev_error);

int tm_postinfo(char *name, char *info, char *jobid, int fromtask, int prev_error, int event, int *ret, task *ptask, size_t *len);
int tm_postinfo(char *name, char *info, char *jobid, int fromtask, int prev_error, int event, int *ret, struct task *ptask, size_t *len);

int tm_spawn_request(struct tcp_chan *chan, job *pjob, int prev_error, int event, char *cookie, int *reply_ptr, int *ret, tm_task_id fromtask, hnodent *phost, int nodeid);
int tm_spawn_request(struct tcp_chan *chan, struct job *pjob, int prev_error, int event, char *cookie, int *reply_ptr, int *ret, tm_task_id fromtask, struct hnodent *phost, int nodeid);

int tm_tasks_request(struct tcp_chan *chan, job *pjob, int prev_error, int event, char *cookie, int *reply_ptr, int *ret, tm_task_id fromtask, hnodent *phost, int nodeid);
int tm_tasks_request(struct tcp_chan *chan, struct job *pjob, int prev_error, int event, char *cookie, int *reply_ptr, int *ret, tm_task_id fromtask, struct hnodent *phost, int nodeid);

int tm_signal_request(struct tcp_chan *chan, job *pjob, int prev_error, int event, char *cookie, tm_task_id fromtask, int *ret, int *reply_ptr, hnodent *phost, int nodeid);
int tm_signal_request(struct tcp_chan *chan, struct job *pjob, int prev_error, int event, char *cookie, tm_task_id fromtask, int *ret, int *reply_ptr, struct hnodent *phost, int nodeid);

int tm_obit_request(struct tcp_chan *chan, job *pjob, int prev_error, int event, char *cookie, int *reply_ptr, int *ret, tm_task_id fromtask, hnodent *phost, int nodeid);
int tm_obit_request(struct tcp_chan *chan, struct job *pjob, int prev_error, int event, char *cookie, int *reply_ptr, int *ret, tm_task_id fromtask, struct hnodent *phost, int nodeid);

int tm_getinfo_request(struct tcp_chan *chan, job *pjob, int prev_error, int event, char *cookie, int *reply_ptr, int *ret, tm_task_id fromtask, hnodent *phost, int nodeid);
int tm_getinfo_request(struct tcp_chan *chan, struct job *pjob, int prev_error, int event, char *cookie, int *reply_ptr, int *ret, tm_task_id fromtask, struct hnodent *phost, int nodeid);

int tm_resources_request(struct tcp_chan *chan, job *pjob, int prev_error, int event, char *cookie, int *reply_ptr, int *ret, tm_task_id fromtask, hnodent *phost, int nodeid);
int tm_resources_request(struct tcp_chan *chan, struct job *pjob, int prev_error, int event, char *cookie, int *reply_ptr, int *ret, tm_task_id fromtask, struct hnodent *phost, int nodeid);

int tm_request(struct tcp_chan *chan, int version);

/* static int adoptSession(pid_t sid, pid_t pid, char *id, int command, char *cookie); */

char *cat_dirs(char *root, char *base);

char *get_local_script_path(job *pjob, char *base);
char *get_local_script_path(struct job *pjob, char *base);

int get_job_struct(job **pjob, char *jobid, int command, struct tcp_chan *chan, struct sockaddr_in *addr, tm_node_id nodeid);
int get_job_struct(struct job **pjob, char *jobid, int command, struct tcp_chan *chan, struct sockaddr_in *addr, tm_node_id nodeid);

int readit(int sock, int fd);

void demux_wait(int sig);

void fork_demux(job *pjob);
void fork_demux(struct job *pjob);

void send_update_soon();

Expand Down
Loading