struct list async_list;
int sigchld_pipe[2];
struct waiter *sigchld_waiter;
+ bool dry_run;
};
/* Internal data type for process handling
return container_of(process, struct process_info, process);
}
+struct process *procinfo_get_process(struct process_info *procinfo)
+{
+ return &procinfo->process;
+}
+
/* Read as much as possible into the currently-allocated stdout buffer, and
- * possibly realloc it for the next read */
-static int process_read_stdout_once(struct process_info *procinfo)
+ * possibly realloc it for the next read
+ * If the line pointer is not NULL, it is set to the start of the latest
+ * output.
+ *
+ * Returns:
+ * > 0 on success (even though no bytes may have been read)
+ * 0 on EOF (no error, but no more reads can be performed)
+ * < 0 on error
+ **/
+static int process_read_stdout_once(struct process_info *procinfo, char **line)
{
struct process *process = &procinfo->process;
int rc, fd, max_len;
max_len = procinfo->stdout_buf_len - process->stdout_len - 1;
rc = read(fd, process->stdout_buf + process->stdout_len, max_len);
- if (rc <= 0)
+ if (rc == 0)
+ return 0;
+ if (rc < 0) {
+ if (errno == EINTR)
+ return 1;
+ pb_log("%s: read failed: %s\n", __func__, strerror(errno));
return rc;
+ }
+
+ if (line)
+ *line = process->stdout_buf + process->stdout_len;
process->stdout_len += rc;
if (process->stdout_len == procinfo->stdout_buf_len - 1) {
procinfo->stdout_buf_len);
}
- return rc;
+ return 1;
}
static int process_setup_stdout_pipe(struct process_info *procinfo)
{
int rc;
- if (!procinfo->process.keep_stdout)
+ if (!procinfo->process.keep_stdout || procinfo->process.raw_stdout)
return 0;
procinfo->stdout_buf_len = 4096;
static void process_setup_stdout_parent(struct process_info *procinfo)
{
- if (!procinfo->process.keep_stdout)
+ if (!procinfo->process.keep_stdout || procinfo->process.raw_stdout)
return;
close(procinfo->stdout_pipe[1]);
{
int log = fileno(pb_log_get_stream());
+ if (procinfo->process.raw_stdout)
+ return;
+
if (procinfo->process.keep_stdout)
dup2(procinfo->stdout_pipe[1], STDOUT_FILENO);
else
dup2(log, STDOUT_FILENO);
- dup2(log, STDERR_FILENO);
+ if (procinfo->process.keep_stdout && procinfo->process.add_stderr)
+ dup2(procinfo->stdout_pipe[1], STDERR_FILENO);
+ else
+ dup2(log, STDERR_FILENO);
}
static void process_finish_stdout(struct process_info *procinfo)
return 0;
do {
- rc = process_read_stdout_once(procinfo);
+ rc = process_read_stdout_once(procinfo, NULL);
} while (rc > 0);
process_finish_stdout(procinfo);
struct process_info *procinfo = arg;
int rc;
- rc = process_read_stdout_once(procinfo);
+ rc = process_read_stdout_once(procinfo, NULL);
+
+ /* if we're going to signal to the waitset that we're done (ie, non-zero
+ * return value), then the waiters will remove us, so we drop the
+ * reference */
+ if (rc < 0) {
+ talloc_unlink(procset, procinfo);
+ procinfo->stdout_waiter = NULL;
+ rc = -1;
+ } else {
+ rc = 0;
+ }
+
+ return rc;
+}
+
+int process_stdout_custom(struct process_info *procinfo, char **line)
+{
+ int rc;
+
+ rc = process_read_stdout_once(procinfo, line);
/* if we're going to signal to the waitset that we're done (ie, non-zero
* return value), then the waiters will remove us, so we drop the
return 0;
}
-struct procset *process_init(void *ctx, struct waitset *set)
+struct procset *process_init(void *ctx, struct waitset *set, bool dry_run)
{
struct sigaction sa;
int rc;
procset = talloc(ctx, struct procset);
procset->waitset = set;
+ procset->dry_run = dry_run;
list_init(&procset->async_list);
rc = pipe(procset->sigchld_pipe);
if (pid == 0) {
process_setup_stdout_child(procinfo);
+ if (procset->dry_run)
+ exit(EXIT_SUCCESS);
execvp(process->path, (char * const *)process->argv);
exit(EXIT_FAILURE);
}
process_read_stdout(procinfo);
- rc = waitpid(process->pid, &process->exit_status, 0);
- if (rc == -1) {
+ for (;;) {
+ rc = waitpid(process->pid, &process->exit_status, 0);
+ if (rc >= 0)
+ break;
+ if (errno == EINTR)
+ continue;
+
pb_log("%s: waitpid failed: %s\n", __func__, strerror(errno));
return rc;
}
int process_run_async(struct process *process)
{
struct process_info *procinfo = get_info(process);
+ waiter_cb stdout_cb;
int rc;
rc = process_run_common(procinfo);
return rc;
if (process->keep_stdout) {
+ stdout_cb = process->stdout_cb ?: process_stdout_cb;
procinfo->stdout_waiter = waiter_register_io(procset->waitset,
procinfo->stdout_pipe[0],
- WAIT_IN, process_stdout_cb,
- procinfo);
+ WAIT_IN, stdout_cb, procinfo);
talloc_reference(procset, procinfo);
}
void process_stop_async(struct process *process)
{
+ /* Avoid signalling an old pid */
+ if (process->cancelled)
+ return;
+
+ pb_debug("process: sending SIGTERM to pid %d\n", process->pid);
kill(process->pid, SIGTERM);
+ process->cancelled = true;
+}
+
+void process_stop_async_all(void)
+{
+ struct process_info *procinfo;
+ struct process *process = NULL;
+
+ pb_debug("process: cancelling all async jobs\n");
+
+ list_for_each_entry(&procset->async_list, procinfo, async_list) {
+ process = &procinfo->process;
+ /* Ignore the process completion - callbacks may use stale data */
+ process->exit_cb = NULL;
+ process->stdout_cb = NULL;
+ process_stop_async(process);
+ }
}
int process_run_simple_argv(void *ctx, const char *argv[])
return rc;
}
+
+bool process_exit_ok(struct process *process)
+{
+ return WIFEXITED(process->exit_status) &&
+ WEXITSTATUS(process->exit_status) == 0;
+}