From 62522d63f12d180b86d3ecc8e97532aa4ddb77d9 Mon Sep 17 00:00:00 2001 From: Natanael Copa Date: Thu, 9 Sep 2010 21:29:09 +0200 Subject: use unix domain socket instead of fifos --- sircbot.c | 433 +++++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 289 insertions(+), 144 deletions(-) diff --git a/sircbot.c b/sircbot.c index 67dc5a9..b85d7d0 100644 --- a/sircbot.c +++ b/sircbot.c @@ -7,7 +7,8 @@ Intended usage is git hook that sends commits, etc. #define _GNU_SOURCE #include -#include +#include +#include #include #include @@ -29,21 +30,34 @@ Intended usage is git hook that sends commits, etc. struct tq_string { char *value; - TAILQ_ENTRY(tq_string) entries; + struct tq_string *next; }; -typedef TAILQ_HEAD(tq_stringlist, tq_string) TQ_STRINGLIST; - struct sircbot_channel { char *name; - char *fifo; - int fd; + char *socket_path; + int listen_fd; time_t last_data_sent; time_t last_closetime; /* last time we got kicked */ - TQ_STRINGLIST queue_head; /* queue of in-data */ + struct tq_string indata_head; /* queue of in-data */ + int *fd_array; + int fd_array_size; }; +struct sircbot_session { + struct irc_session *sess; + struct sircbot_channel *chan; + int numchan; + time_t last_ping; +}; +struct sircbot_socket_callback { + void *context; + int (*callback)(struct sircbot_session *sb, struct pollfd *fds, + void *ctx); +}; + + static int foreground = 0; static int sigterm = 0; static int flush_rate = 2; @@ -103,59 +117,84 @@ static void log_err(const char *msg) syslog(LOG_ERR, "%s: %s", msg, strerror(errno)); } -struct tq_string *tq_stringlist_add(TQ_STRINGLIST *list, const char *data) +struct tq_string *tq_stringlist_add(struct tq_string *head, const char *data) { + struct tq_string *n = head; struct tq_string *item = malloc(sizeof(struct tq_string)); + if (item == NULL) { log_err("malloc"); return NULL; } + memset(item, 0, sizeof(struct tq_string)); if ((item->value = strdup(data)) == NULL) { log_err("strdup"); return NULL; } - TAILQ_INSERT_TAIL(list, item, entries); + item->next = NULL; + + while (n->next != NULL) + n = n->next; + n->next = item; return item; } - -int init_fifo(struct sircbot_channel *chan) + + +int init_channel_socket(struct sircbot_channel *chan) { - unlink(chan->fifo); - if (mkfifo(chan->fifo, 0666) < 0) - return -1; - chan->fd = open(chan->fifo, O_RDONLY | O_NONBLOCK); - if (chan->fd < 0) + struct sockaddr_un sun; + + unlink(chan->socket_path); + + memset(&sun, 0, sizeof(sun)); + sun.sun_family = AF_UNIX; + strncpy(sun.sun_path, chan->socket_path, sizeof(sun.sun_path)); + chan->listen_fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (chan->listen_fd < 0) return -1; + + fcntl(chan->listen_fd, F_SETFD, FD_CLOEXEC); + if (bind(chan->listen_fd, (struct sockaddr *) &sun, sizeof(sun)) < 0) + goto err_close; + + if (listen(chan->listen_fd, 8) < 0) + goto err_close; + return 0; + +err_close: + log_err(chan->socket_path); + close(chan->listen_fd); + return -1; } -int close_fifo(struct sircbot_channel *chan, time_t closetime) +int close_channel_socket(struct sircbot_channel *chan, time_t closetime) { - close(chan->fd); - chan->fd = -1; + close(chan->listen_fd); + chan->listen_fd = -1; chan->last_closetime = closetime; - return unlink(chan->fifo); + return unlink(chan->socket_path); } -int join_channel(struct sircbot_channel *chan, int numchan, char *name) +int join_channel(struct sircbot_session *sb, char *name) { int i; - for (i = 0; i < numchan; i++) { - if (strcmp(chan[i].name, name) == 0) - return init_fifo(&chan[i]); + for (i = 0; i < sb->numchan; i++) { + if (strcmp(sb->chan[i].name, name) == 0) + return init_channel_socket(&sb->chan[i]); } return 0; } -int kick_channel(struct sircbot_channel *chan, int numchan, char *name) +int kick_channel(struct sircbot_session *sb, char *name) { int i; time_t now = time(NULL); printf("got kicked from %s\n", name); - for (i = 0; i < numchan; i++) { - if (strcmp(chan[i].name, name) == 0) - return close_fifo(&chan[i], now); + for (i = 0; i < sb->numchan; i++) { + if (strcmp(sb->chan[i].name, name) == 0) + return close_channel_socket(&sb->chan[i], now); } return 0; } @@ -193,24 +232,24 @@ int run_hooks(char *user, char *rcpt, char* data) return 0; } -int handle_response(struct irc_session *sess, char *user, char *cmd, char *data, - struct sircbot_channel *chan, int numchan) +int handle_response(struct sircbot_session *sb, char *user, char *cmd, + char *data) { printf("DEBUG: handling response: user=%s, cmd=%s, data=%s\n", user, cmd, data); if (strncmp(cmd, "PING", 4) == 0) { - return irc_send(sess, "PONG", data); + return irc_send(sb->sess, "PONG", data); } else if (strncmp(cmd, "PONG", 4) == 0) { - sess->last_pong = time(NULL); + sb->sess->last_pong = time(NULL); } else if (strncmp(cmd, "JOIN", 4) == 0 && - strncmp(user, sess->nick, strlen(sess->nick)) == 0) { - return join_channel(chan, numchan, data); + strncmp(user, sb->sess->nick, strlen(sb->sess->nick)) == 0) { + return join_channel(sb, data); } else if (strncmp(cmd, "KICK", 4) == 0) { char *p = strchr(data, ' '); if (p) *p = '\0'; - return kick_channel(chan, numchan, data); + return kick_channel(sb, data); } else if (strncmp(cmd, "PRIVMSG", 7) == 0) { char *p = strchr(data, ' '); if (p) { @@ -223,11 +262,9 @@ int handle_response(struct irc_session *sess, char *user, char *cmd, char *data, return 0; } -int parse_line(struct irc_session *sess, char *line, - struct sircbot_channel *chan, int numchan) +int parse_line(struct sircbot_session *sb, char *line) { char *user = NULL, *p, *cmd, *data = NULL; - printf("DEBUG: parsing: '%s'\n", line); if (line[0] == ':') { user = &line[1]; @@ -247,13 +284,11 @@ int parse_line(struct irc_session *sess, char *line, if (*data == ':') data++; } -// printf("DEBUG: user='%s', cmd='%s', data='%s'\n", user, cmd, data); - handle_response(sess, user, cmd, data, chan, numchan); + handle_response(sb, user, cmd, data); return 0; } -int parse_irc_data(char *buf, struct sircbot_channel *chan, int numchan, - struct irc_session *sess) +int parse_irc_data(struct sircbot_session *sb, char *buf) { char *p = buf; while ((p = strsep(&buf, "\n")) != NULL) { @@ -261,140 +296,247 @@ int parse_irc_data(char *buf, struct sircbot_channel *chan, int numchan, if (c != NULL) *c = '\0'; if (*p != '\0') - parse_line(sess, p, chan, numchan); + parse_line(sb, p); } return 0; } -/* init pollfd strucs */ -static void irc_reset_pollfds(struct irc_session *sess, struct pollfd *fds, - struct sircbot_channel *chan, int numchan) +/* callback functions */ +static int irc_server_cb(struct sircbot_session *sb, struct pollfd *fds, + void *ctx) +{ + char buf[4096]; + int r; + struct irc_session *sess = (struct irc_session *) ctx; + + if (fds->revents & POLLHUP) + /* server hang up on us */ + return 0; + + if (fds->revents & POLLERR) { + log_err(sess->server); + return -1; + } + + r = read(fds->fd, buf, sizeof(buf)-1); + if (r < 0) + return -1; + buf[r] = '\0'; + parse_irc_data(sb, buf); + + return 1; +} + +int channel_extend_fd_array(struct sircbot_channel *chan) +{ + int i; + int oldsize = chan->fd_array_size; + + chan->fd_array = realloc(chan->fd_array, (oldsize + 8) * sizeof(int)); + if (chan->fd_array == NULL) + return -1; + chan->fd_array_size += 8; + for (i = oldsize; i < chan->fd_array_size; i++) + chan->fd_array[i] = -1; + return 0; +} + + +void channel_add_connection(struct sircbot_channel *chan, int fd) +{ + int i; + for (i = 0; i < chan->fd_array_size; i++) { + if (chan->fd_array[i] == -1) + break; + } + if (i >= chan->fd_array_size) + if (channel_extend_fd_array(chan) < 0) + return; + chan->fd_array[i] = fd; + printf("DEBUG: new connection (fd=%i) for %s\n", + fd, chan->name); + +} + +static int channel_del_connection(struct sircbot_channel *chan, int fd) { int i; + for (i = 0; i < chan->fd_array_size; i++) + if (chan->fd_array[i] == fd) { + close(fd); + chan->fd_array[i] = -1; + printf("DEBUG: close connection %i for %s\n", fd, chan->name); + return 1; + } + return 0; +} + +static int channel_listener_cb(struct sircbot_session *sb, struct pollfd *fds, + void *ctx) +{ + int fd = 0; + struct sircbot_channel *chan = ctx; + if (fds->revents & POLLERR) { + log_err(chan->socket_path); + return -1; + } + fd = accept(chan->listen_fd, NULL, NULL); + if (fd < 0) { + log_err(chan->socket_path); + return -1; + } + channel_add_connection(chan, fd); + return 1; +} + +static int channel_conn_cb(struct sircbot_session *sb, struct pollfd *fds, + void *ctx) +{ + int r; + char buf[4096]; + char *p, *n; + struct sircbot_channel *chan = ctx; + if (fds->revents & (POLLHUP | POLLERR | POLLNVAL)) + return channel_del_connection(chan, fds->fd); + + if (!(fds->revents & POLLIN)) + return 1; + + r = read(fds->fd, buf, sizeof(buf)-1); + if (r < 0) { + log_err(chan->socket_path); + return -1; + } + buf[r] = '\0'; + p = buf; + while ((n = strchr(p, '\n')) != NULL) { + *n = '\0'; + tq_stringlist_add(&chan->indata_head, p); + p = n + 1; + } + if (*p) + tq_stringlist_add(&chan->indata_head, p); + return 1; +} + +/* init pollfd strucs */ +static int irc_reset_pollfds(struct sircbot_session *sb, struct pollfd *fds, + struct sircbot_socket_callback *cb, int maxfds) +{ + int i, j, n = 0; /* first pollfd struc is the irc session */ - fds[0].fd = sess->fd; - fds[0].events = POLLIN; - fds[0].revents = 0; - /* rest is channel fifos */ - for (i = 1; i < numchan + 1; i++) { - fds[i].fd = chan[i-1].fd; - fds[i].events = POLLIN; - fds[i].revents = 0; + fds[n].fd = sb->sess->fd; + fds[n].events = POLLIN; + fds[n].revents = 0; + cb[n].context = NULL; + cb[n].callback = &irc_server_cb; + n++; + + /* channel socket listeners */ + for (i = 0; i < sb->numchan && n < maxfds; i++) { + if (sb->chan[i].listen_fd < 0) + continue; + fds[n].fd = sb->chan[i].listen_fd; + fds[n].events = POLLIN; + fds[n].revents = 0; + cb[n].callback = &channel_listener_cb; + cb[n].context = &sb->chan[i]; + n++; + /* open channel connections */ + for (j = 0; j < sb->chan[i].fd_array_size && n < maxfds; j++) { + if (sb->chan[i].fd_array[j] == -1) + continue; + fds[n].fd = sb->chan[i].fd_array[j]; + fds[n].events = POLLIN; + fds[n].revents = 0; + cb[n].callback = &channel_conn_cb; + cb[n].context = &sb->chan[i]; + n++; + } } + return n; } static int send_fifo_queue(struct irc_session *sess, struct sircbot_channel *chan, time_t now) { int r; - struct tq_string *item = TAILQ_FIRST(&chan->queue_head); - if (item == NULL || (now - chan->last_data_sent) < flush_rate) + struct tq_string *item; + if (chan->indata_head.next == NULL || (now - chan->last_data_sent) < flush_rate) return 0; /* nothing in queue, or too early to send */ + item = chan->indata_head.next; r = irc_send_chan(sess, chan->name, item->value); chan->last_data_sent = now; - /* remove from FIFO queue */ - TAILQ_REMOVE(&chan->queue_head, item, entries); + chan->indata_head.next = item->next; free(item->value); free(item); return r; } -static int irc_loop(struct irc_session *sess, struct sircbot_channel *chan, - int numchan) + +static void join_channels(struct sircbot_session *sb) { - int i, r, joined = 0; - char buf[4096]; - struct pollfd fds[numchan + 1]; + time_t now = time(NULL); + int i; + /* wait atleast 5 secs before we join a channel */ + for (i = 0; i < sb->numchan; i++) + if ((now - sb->chan[i].last_closetime) > 5 + && sb->chan[i].listen_fd < 0) { + printf("DEBUG: joining %s\n", sb->chan[i].name); + sb->chan[i].last_closetime = now; + irc_send(sb->sess, "JOIN", sb->chan[i].name); + } +} + +static int irc_loop(struct sircbot_session *sb) +{ + int i, r, n, joined = 0; + const int maxconn = sb->numchan * 128; + const int maxfds = 1 + sb->numchan + maxconn; + struct pollfd fds[maxfds]; + struct sircbot_socket_callback cbs[maxfds]; sigset_t sigmask; struct timespec tv; - time_t now, last_ping; + time_t now; sigemptyset(&sigmask); sigaddset(&sigmask, SIGTERM); tv.tv_sec = 1; tv.tv_nsec = 0; while (!sigterm) { - now = time(NULL); - /* wait atleast 5 secs before we join a channel */ - for (i = 0; i < numchan; i++) - if ((now - chan[i].last_closetime) > 5 - && chan[i].fd < 0) { - printf("DEBUG: joining %s\n", chan[i].name); - chan[i].last_closetime = now; - irc_send(sess, "JOIN", chan[i].name); - } - - - irc_reset_pollfds(sess, fds, chan, numchan); - r = ppoll(fds, numchan+1, &tv, &sigmask); -// r = poll(fds, numchan+1, 120000); + join_channels(sb); + n = irc_reset_pollfds(sb, fds, cbs, maxfds); + r = ppoll(fds, n, &tv, &sigmask); if (r < 0) { log_err("ppoll"); return -1; } - now = time(NULL); + now = time(NULL); /* send a ping every 2 min */ - if ((now - last_ping) > 120) { - irc_send_ping(sess); - last_ping = now; + if ((now - sb->last_ping) > 120) { + irc_send_ping(sb->sess); + sb->last_ping = now; } - for (i = 0; i < numchan + 1; i++) { - int j; - - /* send data in queue for this fifo */ - if (i != 0 && - send_fifo_queue(sess, &chan[i-1], now) < 0) - goto ret_err; - - if (fds[i].revents & POLLHUP) { - if (i == 0) - /* server hang up on us */ - return 0; - - /* one of the fifos closed due to - writer process exit. We just - reopen it assuming we are still - joined the channel */ - close_fifo(&chan[i-1], - chan[i-1].last_closetime); - init_fifo(&chan[i-1]); - continue; - } - - if (fds[i].revents & POLLERR) + /* send data in the channel queues */ + for (i = 0; i < sb->numchan; i++) + if (send_fifo_queue(sb->sess, &sb->chan[i], now) < 0) goto ret_err; - if (!(fds[i].revents & POLLIN)) - continue; /* no data available for read */ - - printf("DEBUG: data available from fds[%i]\n", i); - r = read(fds[i].fd, buf, sizeof(buf)-1); - if (r < 0) - goto ret_err; - if (r == 0) - continue; - - printf("DEBUG: read %i bytes\n", r); - - buf[r] = '\0'; - if (i == 0) { - /* data was from IRC server */ - printf("DEBUG: data from server: %s\n", buf); - parse_irc_data(buf, chan, numchan, sess); + for (i = 0; i < n; i++) { + if (fds[i].revents == 0) continue; - } - - /* data was from fifos */ - printf("DEBUG: data from fifo %s: %s\n", chan[i-1].name, buf); - tq_stringlist_add(&chan[i-1].queue_head, buf); + r = cbs[i].callback(sb, &fds[i], cbs[i].context); + if (r <= 0) + return r; } + } return 0; ret_err: - log_err(sess->server); + log_err(sb->sess->server); return -1; } @@ -422,7 +564,7 @@ int main(int argc, char *argv[]) const char *group = "sircbot"; const char *pass = NULL; const char *logfile = "/dev/null"; - struct sircbot_channel *chan; + struct sircbot_session sb; int i, c, port = 6667; while ((c = getopt(argc, argv, "fl:n:p:P:r:s:")) != -1) { @@ -459,16 +601,19 @@ int main(int argc, char *argv[]) usage_exit(1); /* init channel strucs */ - chan = malloc(argc * sizeof(struct sircbot_channel)); - if (chan == NULL) + sb.numchan = argc; + sb.chan = malloc(argc * sizeof(struct sircbot_channel)); + if (sb.chan == NULL) err(1, "malloc"); for (i = 0; i < argc; i++) { - chan[i].fd = -1; - chan[i].name = strdup(argv[i]); - chan[i].last_closetime = 0; - asprintf(&chan[i].fifo, "/var/run/sircbot/%s", argv[i]); - unlink(chan[i].fifo); - TAILQ_INIT(&chan[i].queue_head); + sb.chan[i].listen_fd = -1; + sb.chan[i].name = strdup(argv[i]); + sb.chan[i].last_closetime = 0; + asprintf(&sb.chan[i].socket_path, "/var/run/sircbot/%s", argv[i]); + unlink(sb.chan[i].socket_path); + sb.chan[i].indata_head.next = NULL; + sb.chan[i].fd_array = NULL; + sb.chan[i].fd_array_size = 0; } /* daemonize */ @@ -480,20 +625,20 @@ int main(int argc, char *argv[]) openlog("sircbot",0, LOG_DAEMON); while (1) { - struct irc_session *s = irc_connect(server, port, nick, pass); + sb.sess = irc_connect(server, port, nick, pass); char buf[256]; - if (s == NULL) { + if (sb.sess == NULL) { log_err(server); sleep(10); continue; } - irc_loop(s, chan, argc); - irc_close(s, "bye"); - /* reset fifos */ + irc_loop(&sb); + irc_close(sb.sess, "bye"); + /* reset channel sockets */ for (i = 0; i < argc; i++) - close_fifo(&chan[i], 0); + close_channel_socket(&sb.chan[i], 0); if (sigterm) break; sleep(10); -- cgit v1.2.3