157 lines
6.8 KiB
Diff
157 lines
6.8 KiB
Diff
|
From 706ce3faab879d4cb20906ad6d27e962ab3e4ad0 Mon Sep 17 00:00:00 2001
|
||
|
From: =?UTF-8?q?Zbigniew=20J=C4=99drzejewski-Szmek?= <zbyszek@in.waw.pl>
|
||
|
Date: Fri, 13 Mar 2015 00:02:28 -0400
|
||
|
Subject: [PATCH] journal-remote: process events without delay
|
||
|
|
||
|
journal-remote buffers input, and then parses it handling one journal entry at a time.
|
||
|
It was possible for useful data to be left in the buffer after some entries were
|
||
|
processesed. But all data would be already read from the fd, so there would be
|
||
|
no reason for the event loop to call the handler again. After some new data came in,
|
||
|
the handler would be called again, and would then process the "old" data in the buffer.
|
||
|
|
||
|
Fix this by enabling a handler wherever we process input data and do not exhaust data
|
||
|
from the input buffer (i.e. when EAGAIN was not encountered). The handler runs until
|
||
|
we encounter EAGAIN.
|
||
|
|
||
|
Looping over the input data is done in this roundabout way to allow the event loop
|
||
|
to dispatch other events in the meanwhile. If the loop was inside the handler, a
|
||
|
source which produced data fast enough could completely monopolize the process.
|
||
|
|
||
|
https://bugs.freedesktop.org/show_bug.cgi?id=89516
|
||
|
(cherry picked from commit 043945b93824e33e040954612aaa934cd1a43a1b)
|
||
|
---
|
||
|
src/journal-remote/journal-remote-parse.c | 1 +
|
||
|
src/journal-remote/journal-remote-parse.h | 1 +
|
||
|
src/journal-remote/journal-remote.c | 65 +++++++++++++++++++++++++++----
|
||
|
3 files changed, 59 insertions(+), 8 deletions(-)
|
||
|
|
||
|
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
|
||
|
index 6c096de03a..7e62954351 100644
|
||
|
--- a/src/journal-remote/journal-remote-parse.c
|
||
|
+++ b/src/journal-remote/journal-remote-parse.c
|
||
|
@@ -41,6 +41,7 @@ void source_free(RemoteSource *source) {
|
||
|
writer_unref(source->writer);
|
||
|
|
||
|
sd_event_source_unref(source->event);
|
||
|
+ sd_event_source_unref(source->buffer_event);
|
||
|
|
||
|
free(source);
|
||
|
}
|
||
|
diff --git a/src/journal-remote/journal-remote-parse.h b/src/journal-remote/journal-remote-parse.h
|
||
|
index 22db550913..06a50296a1 100644
|
||
|
--- a/src/journal-remote/journal-remote-parse.h
|
||
|
+++ b/src/journal-remote/journal-remote-parse.h
|
||
|
@@ -54,6 +54,7 @@ typedef struct RemoteSource {
|
||
|
Writer *writer;
|
||
|
|
||
|
sd_event_source *event;
|
||
|
+ sd_event_source *buffer_event;
|
||
|
} RemoteSource;
|
||
|
|
||
|
RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer);
|
||
|
diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c
|
||
|
index d1486e7cda..b7cc6d7172 100644
|
||
|
--- a/src/journal-remote/journal-remote.c
|
||
|
+++ b/src/journal-remote/journal-remote.c
|
||
|
@@ -289,6 +289,8 @@ static int dispatch_raw_source_event(sd_event_source *event,
|
||
|
int fd,
|
||
|
uint32_t revents,
|
||
|
void *userdata);
|
||
|
+static int dispatch_raw_source_until_block(sd_event_source *event,
|
||
|
+ void *userdata);
|
||
|
static int dispatch_blocking_source_event(sd_event_source *event,
|
||
|
void *userdata);
|
||
|
static int dispatch_raw_connection_event(sd_event_source *event,
|
||
|
@@ -376,8 +378,15 @@ static int add_source(RemoteServer *s, int fd, char* name, bool own_name) {
|
||
|
|
||
|
r = sd_event_add_io(s->events, &source->event,
|
||
|
fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
|
||
|
- dispatch_raw_source_event, s);
|
||
|
- if (r == -EPERM) {
|
||
|
+ dispatch_raw_source_event, source);
|
||
|
+ if (r == 0) {
|
||
|
+ /* Add additional source for buffer processing. It will be
|
||
|
+ * enabled later. */
|
||
|
+ r = sd_event_add_defer(s->events, &source->buffer_event,
|
||
|
+ dispatch_raw_source_until_block, source);
|
||
|
+ if (r == 0)
|
||
|
+ sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
|
||
|
+ } else if (r == -EPERM) {
|
||
|
log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
|
||
|
r = sd_event_add_defer(s->events, &source->event,
|
||
|
dispatch_blocking_source_event, source);
|
||
|
@@ -997,15 +1006,18 @@ static void server_destroy(RemoteServer *s) {
|
||
|
**********************************************************************
|
||
|
**********************************************************************/
|
||
|
|
||
|
-static int dispatch_raw_source_event(sd_event_source *event,
|
||
|
- int fd,
|
||
|
- uint32_t revents,
|
||
|
- void *userdata) {
|
||
|
+static int handle_raw_source(sd_event_source *event,
|
||
|
+ int fd,
|
||
|
+ uint32_t revents,
|
||
|
+ RemoteServer *s) {
|
||
|
|
||
|
- RemoteServer *s = userdata;
|
||
|
RemoteSource *source;
|
||
|
int r;
|
||
|
|
||
|
+ /* Returns 1 if there might be more data pending,
|
||
|
+ * 0 if data is currently exhausted, negative on error.
|
||
|
+ */
|
||
|
+
|
||
|
assert(fd >= 0 && fd < (ssize_t) s->sources_size);
|
||
|
source = s->sources[fd];
|
||
|
assert(source->fd == fd);
|
||
|
@@ -1036,11 +1048,48 @@ static int dispatch_raw_source_event(sd_event_source *event,
|
||
|
return 1;
|
||
|
}
|
||
|
|
||
|
+static int dispatch_raw_source_until_block(sd_event_source *event,
|
||
|
+ void *userdata) {
|
||
|
+ RemoteSource *source = userdata;
|
||
|
+ int r;
|
||
|
+
|
||
|
+ /* Make sure event stays around even if source is destroyed */
|
||
|
+ sd_event_source_ref(event);
|
||
|
+
|
||
|
+ r = handle_raw_source(event, source->fd, EPOLLIN, server);
|
||
|
+ if (r != 1)
|
||
|
+ /* No more data for now */
|
||
|
+ sd_event_source_set_enabled(event, SD_EVENT_OFF);
|
||
|
+
|
||
|
+ sd_event_source_unref(event);
|
||
|
+
|
||
|
+ return r;
|
||
|
+}
|
||
|
+
|
||
|
+static int dispatch_raw_source_event(sd_event_source *event,
|
||
|
+ int fd,
|
||
|
+ uint32_t revents,
|
||
|
+ void *userdata) {
|
||
|
+ RemoteSource *source = userdata;
|
||
|
+ int r;
|
||
|
+
|
||
|
+ assert(source->event);
|
||
|
+ assert(source->buffer_event);
|
||
|
+
|
||
|
+ r = handle_raw_source(event, fd, EPOLLIN, server);
|
||
|
+ if (r == 1)
|
||
|
+ /* Might have more data. We need to rerun the handler
|
||
|
+ * until we are sure the buffer is exhausted. */
|
||
|
+ sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
|
||
|
+
|
||
|
+ return r;
|
||
|
+}
|
||
|
+
|
||
|
static int dispatch_blocking_source_event(sd_event_source *event,
|
||
|
void *userdata) {
|
||
|
RemoteSource *source = userdata;
|
||
|
|
||
|
- return dispatch_raw_source_event(event, source->fd, EPOLLIN, server);
|
||
|
+ return handle_raw_source(event, source->fd, EPOLLIN, server);
|
||
|
}
|
||
|
|
||
|
static int accept_connection(const char* type, int fd,
|