summaryrefslogtreecommitdiff
path: root/fs/dlm/midcomms.c
diff options
context:
space:
mode:
authorAlexander Aring <aahringo@redhat.com>2020-09-24 10:31:26 -0400
committerDavid Teigland <teigland@redhat.com>2020-09-29 14:00:32 -0500
commit4798cbbfbd00c498339bdcf4cc2429f53eb374ec (patch)
tree97d35239f9ce8c5a3453234629b7a5b54732cb53 /fs/dlm/midcomms.c
parent4e192ee68e5af301470a925b76700d788db35d96 (diff)
fs: dlm: rework receive handling
This patch reworks the current receive handling of dlm. As I tried to change the send handling to fix reorder issues I took a look into the receive handling and simplified it, it works as the following: Each connection has a preallocated receive buffer with a minimum length of 4096. On receive, the upper layer protocol will process all dlm message until there is not enough data anymore. If there exists "leftover" data at the end of the receive buffer because the dlm message wasn't fully received it will be copied to the begin of the preallocated receive buffer. Next receive more data will be appended to the previous "leftover" data and processing will begin again. This will remove a lot of code of the current mechanism. Inside the processing functionality we will ensure with a memmove() that the dlm message should be memory aligned. To have a dlm message always started at the beginning of the buffer will reduce some amount of memmove() calls because src and dest pointers are the same. The cluster attribute "buffer_size" becomes a new meaning, it's now the size of application layer receive buffer size. If this is changed during runtime the receive buffer will be reallocated. It's important that the receive buffer size has at minimum the size of the maximum possible dlm message size otherwise the received message cannot be placed inside the receive buffer size. Signed-off-by: Alexander Aring <aahringo@redhat.com> Signed-off-by: David Teigland <teigland@redhat.com>
Diffstat (limited to 'fs/dlm/midcomms.c')
-rw-r--r--fs/dlm/midcomms.c136
1 files changed, 53 insertions, 83 deletions
diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c
index 921322d133e3..fde3a6afe4be 100644
--- a/fs/dlm/midcomms.c
+++ b/fs/dlm/midcomms.c
@@ -22,114 +22,84 @@
* into packets and sends them to the comms layer.
*/
+#include <asm/unaligned.h>
+
#include "dlm_internal.h"
#include "lowcomms.h"
#include "config.h"
#include "lock.h"
#include "midcomms.h"
-
-static void copy_from_cb(void *dst, const void *base, unsigned offset,
- unsigned len, unsigned limit)
-{
- unsigned copy = len;
-
- if ((copy + offset) > limit)
- copy = limit - offset;
- memcpy(dst, base + offset, copy);
- len -= copy;
- if (len)
- memcpy(dst + copy, base, len);
-}
-
/*
* Called from the low-level comms layer to process a buffer of
* commands.
- *
- * Only complete messages are processed here, any "spare" bytes from
- * the end of a buffer are saved and tacked onto the front of the next
- * message that comes in. I doubt this will happen very often but we
- * need to be able to cope with it and I don't want the task to be waiting
- * for packets to come in when there is useful work to be done.
*/
-int dlm_process_incoming_buffer(int nodeid, const void *base,
- unsigned offset, unsigned len, unsigned limit)
+int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
{
- union {
- unsigned char __buf[DLM_INBUF_LEN];
- /* this is to force proper alignment on some arches */
- union dlm_packet p;
- } __tmp;
- union dlm_packet *p = &__tmp.p;
- int ret = 0;
- int err = 0;
+ const unsigned char *ptr = buf;
+ const struct dlm_header *hd;
uint16_t msglen;
- uint32_t lockspace;
-
- while (len > sizeof(struct dlm_header)) {
-
- /* Copy just the header to check the total length. The
- message may wrap around the end of the buffer back to the
- start, so we need to use a temp buffer and copy_from_cb. */
-
- copy_from_cb(p, base, offset, sizeof(struct dlm_header),
- limit);
-
- msglen = le16_to_cpu(p->header.h_length);
- lockspace = p->header.h_lockspace;
+ int ret = 0;
- err = -EINVAL;
- if (msglen < sizeof(struct dlm_header))
- break;
- if (p->header.h_cmd == DLM_MSG) {
- if (msglen < sizeof(struct dlm_message))
- break;
- } else {
- if (msglen < sizeof(struct dlm_rcom))
- break;
- }
- err = -E2BIG;
- if (msglen > dlm_config.ci_buffer_size) {
- log_print("message size %d from %d too big, buf len %d",
- msglen, nodeid, len);
- break;
+ while (len >= sizeof(struct dlm_header)) {
+ hd = (struct dlm_header *)ptr;
+
+ /* no message should be more than this otherwise we
+ * cannot deliver this message to upper layers
+ */
+ msglen = get_unaligned_le16(&hd->h_length);
+ if (msglen > DEFAULT_BUFFER_SIZE) {
+ log_print("received invalid length header: %u, will abort message parsing",
+ msglen);
+ return -EBADMSG;
}
- err = 0;
-
- /* If only part of the full message is contained in this
- buffer, then do nothing and wait for lowcomms to call
- us again later with more data. We return 0 meaning
- we've consumed none of the input buffer. */
+ /* caller will take care that leftover
+ * will be parsed next call with more data
+ */
if (msglen > len)
break;
- /* Allocate a larger temp buffer if the full message won't fit
- in the buffer on the stack (which should work for most
- ordinary messages). */
-
- if (msglen > sizeof(__tmp) && p == &__tmp.p) {
- p = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS);
- if (p == NULL)
- return ret;
- }
+ switch (hd->h_cmd) {
+ case DLM_MSG:
+ if (msglen < sizeof(struct dlm_message)) {
+ log_print("dlm msg too small: %u, will skip this message",
+ msglen);
+ goto skip;
+ }
- copy_from_cb(p, base, offset, msglen, limit);
+ break;
+ case DLM_RCOM:
+ if (msglen < sizeof(struct dlm_rcom)) {
+ log_print("dlm rcom msg too small: %u, will skip this message",
+ msglen);
+ goto skip;
+ }
- BUG_ON(lockspace != p->header.h_lockspace);
+ break;
+ default:
+ log_print("unsupported h_cmd received: %u, will skip this message",
+ hd->h_cmd);
+ goto skip;
+ }
+ /* for aligned memory access, we just copy current message
+ * to begin of the buffer which contains already parsed buffer
+ * data and should provide align access for upper layers
+ * because the start address of the buffer has a aligned
+ * address. This memmove can be removed when the upperlayer
+ * is capable of unaligned memory access.
+ */
+ memmove(buf, ptr, msglen);
+ dlm_receive_buffer((union dlm_packet *)buf, nodeid);
+
+skip:
ret += msglen;
- offset += msglen;
- offset &= (limit - 1);
len -= msglen;
-
- dlm_receive_buffer(p, nodeid);
+ ptr += msglen;
}
- if (p != &__tmp.p)
- kfree(p);
-
- return err ? err : ret;
+ return ret;
}