summaryrefslogtreecommitdiff
path: root/fs/dlm
diff options
context:
space:
mode:
authorAlexander Aring <aahringo@redhat.com>2024-04-02 15:18:10 -0400
committerDavid Teigland <teigland@redhat.com>2024-04-09 11:47:51 -0500
commit92d59adfaf710f34ae7788fa54f0731a7640833b (patch)
tree4de104575a8dfdd25f5f978e72cf0414dffb86b6 /fs/dlm
parent578acf9a87a87531df5b59b3799ccc1256a4bbcc (diff)
dlm: do message processing in softirq context
Move dlm message processing from an ordered workqueue context to an ordered softirq context. Handling dlm messages in softirq will allow requests to be cleared more quickly and efficiently, and should avoid longer queues of incomplete requests. Later patches are expected to run completion/blocking callbacks directly from this message processing context, further reducing context switches required to complete a request. In the longer term, concurrent message processing could be implemented. Signed-off-by: Alexander Aring <aahringo@redhat.com> Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm')
-rw-r--r--fs/dlm/lowcomms.c28
1 files changed, 20 insertions, 8 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c
index 444dc858c4a4..6b8078085e56 100644
--- a/fs/dlm/lowcomms.c
+++ b/fs/dlm/lowcomms.c
@@ -204,6 +204,7 @@ static void process_dlm_messages(struct work_struct *work);
static DECLARE_WORK(process_work, process_dlm_messages);
static DEFINE_SPINLOCK(processqueue_lock);
static bool process_dlm_messages_pending;
+static DECLARE_WAIT_QUEUE_HEAD(processqueue_wq);
static atomic_t processqueue_count;
static LIST_HEAD(processqueue);
@@ -877,7 +878,8 @@ static void process_dlm_messages(struct work_struct *work)
}
list_del(&pentry->list);
- atomic_dec(&processqueue_count);
+ if (atomic_dec_and_test(&processqueue_count))
+ wake_up(&processqueue_wq);
spin_unlock_bh(&processqueue_lock);
for (;;) {
@@ -895,7 +897,8 @@ static void process_dlm_messages(struct work_struct *work)
}
list_del(&pentry->list);
- atomic_dec(&processqueue_count);
+ if (atomic_dec_and_test(&processqueue_count))
+ wake_up(&processqueue_wq);
spin_unlock_bh(&processqueue_lock);
}
}
@@ -1511,7 +1514,20 @@ static void process_recv_sockets(struct work_struct *work)
/* CF_RECV_PENDING cleared */
break;
case DLM_IO_FLUSH:
- flush_workqueue(process_workqueue);
+ /* we can't flush the process_workqueue here because a
+ * WQ_MEM_RECLAIM workequeue can occurr a deadlock for a non
+ * WQ_MEM_RECLAIM workqueue such as process_workqueue. Instead
+ * we have a waitqueue to wait until all messages are
+ * processed.
+ *
+ * This handling is only necessary to backoff the sender and
+ * not queue all messages from the socket layer into DLM
+ * processqueue. When DLM is capable to parse multiple messages
+ * on an e.g. per socket basis this handling can might be
+ * removed. Especially in a message burst we are too slow to
+ * process messages and the queue will fill up memory.
+ */
+ wait_event(processqueue_wq, !atomic_read(&processqueue_count));
fallthrough;
case DLM_IO_RESCHED:
cond_resched();
@@ -1701,11 +1717,7 @@ static int work_start(void)
return -ENOMEM;
}
- /* ordered dlm message process queue,
- * should be converted to a tasklet
- */
- process_workqueue = alloc_ordered_workqueue("dlm_process",
- WQ_HIGHPRI | WQ_MEM_RECLAIM);
+ process_workqueue = alloc_workqueue("dlm_process", WQ_HIGHPRI | WQ_BH, 0);
if (!process_workqueue) {
log_print("can't start dlm_process");
destroy_workqueue(io_workqueue);