From 0e2672eebb4efd725f0e906f07b3c1bae81dbde7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timo=20Ter=C3=A4s?= Date: Sat, 24 Oct 2015 21:35:15 +0300 Subject: nlplug-findfs: asynchronously fork child processes instead of waiting each child to run end before continuing, fork or queue each command allowing up to CPU count concurrent childs. this enables full use of SMP cores, and allows loading of modules after a blocking command is started; fixing e.g. keyboard driver to load even if crypto disk command is waiting keyboard input. --- nlplug-findfs.c | 267 +++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 185 insertions(+), 82 deletions(-) (limited to 'nlplug-findfs.c') diff --git a/nlplug-findfs.c b/nlplug-findfs.c index 8393152..5f6dc5e 100644 --- a/nlplug-findfs.c +++ b/nlplug-findfs.c @@ -23,6 +23,7 @@ #include #include +#include #include #include #include @@ -68,6 +69,105 @@ static void dbg(const char *fmt, ...) #define envcmp(env, key) (strncmp(env, key "=", strlen(key "=")) == 0) + +static char **clone_array(char *const *const a) +{ + size_t i, s; + char **c, *p; + + if (!a) return 0; + + s = sizeof(char*); + for (i = 0; a[i]; i++) + s += sizeof(char*) + strlen(a[i]) + 1; + c = malloc(s); + p = (char*)(c + i + 1); + for (i = 0; a[i]; i++) { + c[i] = p; + p += sprintf(p, "%s", a[i]) + 1; + } + c[i] = 0; + return c; +} + +struct spawn_task { + struct spawn_task *next; + char **argv, **envp; +}; +struct spawn_manager { + int num_running; + int max_running; + struct spawn_task *first, *last; +}; + +static struct spawn_manager spawnmgr; + +static void spawn_execute(struct spawn_manager *mgr, char **argv, char **envp) +{ + pid_t pid; + + dbg("[%d/%d] running %s", mgr->num_running+1, mgr->max_running, argv[0]); + if (!(pid = fork())) { + if (execve(argv[0], argv, envp ? envp : default_envp) < 0) + err(1, argv[0]); + exit(0); + } + if (pid < 0) + err(1,"fork"); + + mgr->num_running++; +} + +static void spawn_queue(struct spawn_manager *mgr, char **argv, char **envp) +{ + struct spawn_task *task; + + task = malloc(sizeof *task); + if (!task) return; + *task = (struct spawn_task) { + .next = NULL, + .argv = clone_array(argv), + .envp = clone_array(envp), + }; + if (mgr->last) { + mgr->last->next = task; + mgr->last = task; + } else { + mgr->first = mgr->last = task; + } +} + +static void spawn_command(struct spawn_manager *mgr, char **argv, char **envp) +{ + if (!mgr->max_running) + mgr->max_running = sysconf(_SC_NPROCESSORS_ONLN); + if (mgr->num_running < mgr->max_running) + spawn_execute(mgr, argv, envp); + else + spawn_queue(mgr, argv, envp); +} + +static void spawn_reap(struct spawn_manager *mgr, pid_t pid) +{ + mgr->num_running--; + if (mgr->first && mgr->num_running < mgr->max_running) { + struct spawn_task *task = mgr->first; + if (task->next) + mgr->first = task->next; + else + mgr->first = mgr->last = NULL; + spawn_execute(mgr, task->argv, task->envp); + free(task->argv); + free(task->envp); + free(task); + } +} + +static int spawn_active(struct spawn_manager *mgr) +{ + return mgr->num_running || mgr->first; +} + struct uevent { char *buf; size_t bufsize; @@ -99,14 +199,13 @@ struct ueventconf { static void sighandler(int sig) { - switch(sig) { + switch (sig) { case SIGHUP: case SIGINT: case SIGQUIT: case SIGABRT: case SIGTERM: exit(0); - break; default: break; } @@ -119,6 +218,7 @@ static void initsignals(void) signal(SIGQUIT, sighandler); signal(SIGABRT, sighandler); signal(SIGTERM, sighandler); + signal(SIGCHLD, sighandler); signal(SIGPIPE, SIG_IGN); } @@ -156,23 +256,6 @@ static int init_netlink_socket(void) return fd; } -static void run_child(char **argv, char **envp) -{ - pid_t pid; - - if (!(pid = fork())) { - dbg("running %s", argv[0]); - if (execve(argv[0], argv, envp) < 0) - err(1, argv[0]); - exit(0); - } - if (pid < 0) - err(1,"fork"); - - waitpid(pid, NULL, 0); -} - - static int load_kmod(const char *modalias) { static struct kmod_ctx *ctx = NULL; @@ -225,7 +308,7 @@ static void start_mdadm(char *devnode) devnode, NULL }; - run_child(mdadm_argv, default_envp); + spawn_command(&spawnmgr, mdadm_argv, 0); } static void start_lvm2(char *devnode) @@ -235,7 +318,7 @@ static void start_lvm2(char *devnode) "--activate" , "ay", "--noudevsync", "--sysinit", NULL }; - run_child(lvm2_argv, default_envp); + spawn_command(&spawnmgr, lvm2_argv, 0); } static void start_cryptsetup(char *devnode, char *cryptdm) @@ -245,7 +328,7 @@ static void start_cryptsetup(char *devnode, char *cryptdm) devnode, cryptdm ? cryptdm : "crypdm", NULL }; load_kmod("dm-crypt"); - run_child(cryptsetup_argv, default_envp); + spawn_command(&spawnmgr, cryptsetup_argv, 0); } static int is_mounted(const char *devnode) { @@ -511,7 +594,7 @@ static int dispatch_uevent(struct uevent *ev, struct ueventconf *conf) } else if (ev->devname != NULL) { if (conf->program_argv[0] != NULL) { - run_child(conf->program_argv, ev->envp); + spawn_command(&spawnmgr, conf->program_argv, ev->envp); conf->fork_count++; } @@ -634,15 +717,16 @@ static void usage(int rc) int main(int argc, char *argv[]) { - struct pollfd fds[2]; - int numfds = 2; + struct pollfd fds[3]; + int numfds = 3; int r; struct ueventconf conf; int event_count = 0; - size_t total_bytes; + size_t total_bytes = 0; int found = 0, trigger_running = 0; char *program_argv[2] = {0,0}; pthread_t tid; + sigset_t sigchldmask; for (r = 0; environ[r]; r++) { if (envcmp(environ[r], "PATH")) @@ -692,88 +776,107 @@ int main(int argc, char *argv[]) conf.search_device = argv[0]; initsignals(); + sigemptyset(&sigchldmask); + sigaddset(&sigchldmask, SIGCHLD); + sigprocmask(SIG_BLOCK, &sigchldmask, NULL); fds[0].fd = init_netlink_socket(); fds[0].events = POLLIN; - fds[1].fd = eventfd(0, EFD_CLOEXEC); + fds[1].fd = signalfd(-1, &sigchldmask, SFD_NONBLOCK|SFD_CLOEXEC); fds[1].events = POLLIN; - pthread_create(&tid, NULL, trigger_thread, &fds[1].fd); + fds[2].fd = eventfd(0, EFD_CLOEXEC); + fds[2].events = POLLIN; + pthread_create(&tid, NULL, trigger_thread, &fds[2].fd); trigger_running = 1; while (1) { - size_t len; - struct iovec iov; - char cbuf[CMSG_SPACE(sizeof(struct ucred))]; - char buf[16384]; - struct cmsghdr *chdr; - struct ucred *cred; - struct msghdr hdr; - struct sockaddr_nl cnls; - - r = poll(fds, numfds, trigger_running ? -1 : conf.timeout); - if (r == -1) + r = poll(fds, numfds, (spawn_active(&spawnmgr) || trigger_running) ? -1 : conf.timeout); + if (r == -1) { + if (errno == EINTR || errno == ERESTART) + continue; err(1, "poll"); - + } if (r == 0) { dbg("exit due to timeout"); break; } - if (numfds > 1 && fds[1].revents & POLLIN) { - close(fds[1].fd); - fds[1].fd = -1; - numfds--; - trigger_running = 0; - pthread_join(tid, NULL); - } - - if (!(fds[0].revents & POLLIN)) - continue; - - iov.iov_base = &buf; - iov.iov_len = sizeof(buf); - memset(&hdr, 0, sizeof(hdr)); - hdr.msg_iov = &iov; - hdr.msg_iovlen = 1; - hdr.msg_control = cbuf; - hdr.msg_controllen = sizeof(cbuf); - hdr.msg_name = &cnls; - hdr.msg_namelen = sizeof(cnls); - - len = recvmsg(fds[0].fd, &hdr, 0); - if (len < 0) { - if (errno == EINTR) + if (fds[0].revents & POLLIN) { + size_t len; + struct iovec iov; + char cbuf[CMSG_SPACE(sizeof(struct ucred))]; + char buf[16384]; + struct cmsghdr *chdr; + struct ucred *cred; + struct msghdr hdr; + struct sockaddr_nl cnls; + + iov.iov_base = &buf; + iov.iov_len = sizeof(buf); + memset(&hdr, 0, sizeof(hdr)); + hdr.msg_iov = &iov; + hdr.msg_iovlen = 1; + hdr.msg_control = cbuf; + hdr.msg_controllen = sizeof(cbuf); + hdr.msg_name = &cnls; + hdr.msg_namelen = sizeof(cnls); + + len = recvmsg(fds[0].fd, &hdr, 0); + if (len < 0) { + if (errno == EINTR) + continue; + err(1, "recvmsg"); + } + if (len < 32 || len >= sizeof(buf)) continue; - err(1, "recvmsg"); - } - if (len < 32 || len >= sizeof(buf)) - continue; - total_bytes += len; - chdr = CMSG_FIRSTHDR(&hdr); - if (chdr == NULL || chdr->cmsg_type != SCM_CREDENTIALS) - continue; + total_bytes += len; + chdr = CMSG_FIRSTHDR(&hdr); + if (chdr == NULL || chdr->cmsg_type != SCM_CREDENTIALS) + continue; - /* filter out messages that are not from root or kernel */ - cred = (struct ucred *)CMSG_DATA(chdr); - if (cred->uid != 0 || cnls.nl_pid > 0) - continue; + /* filter out messages that are not from root or kernel */ + cred = (struct ucred *)CMSG_DATA(chdr); + if (cred->uid != 0 || cnls.nl_pid > 0) + continue; - event_count++; - found |= process_uevent(buf, len, &conf); + event_count++; + found |= process_uevent(buf, len, &conf); - if ((found & FOUND_DEVICE) - || ((found & FOUND_BOOTREPO) && (found & FOUND_APKOVL))) { - dbg("setting timeout to 0"); - conf.timeout = 0; + if ((found & FOUND_DEVICE) + || ((found & FOUND_BOOTREPO) && + (found & FOUND_APKOVL))) { + dbg("setting timeout to 0"); + conf.timeout = 0; + } } if (fds[0].revents & POLLHUP) { dbg("parent hung up\n"); break; } + + if (fds[1].revents & POLLIN) { + struct signalfd_siginfo fdsi; + pid_t pid; + int status; + + while (read(fds[1].fd, &fdsi, sizeof fdsi) > 0) + ; + while ((pid = waitpid(-1, &status, WNOHANG)) > 0) + spawn_reap(&spawnmgr, pid); + } + + if (fds[2].revents & POLLIN) { + close(fds[2].fd); + fds[2].fd = -1; + fds[2].revents = 0; + numfds--; + trigger_running = 0; + pthread_join(tid, NULL); + } } dbg("modaliases: %i, forks: %i, events: %i, total bufsize: %zu", -- cgit v1.2.3