summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorMagnus Karlsson <magnus.karlsson@intel.com>2019-04-16 14:58:08 +0200
committerAlexei Starovoitov <ast@kernel.org>2019-04-16 20:13:10 -0700
commitf63666de2ba9c1c3ac0ec57fc5d3032514ec80f1 (patch)
tree8e8af6ef9982d99779a317e02eb16898ed1d5572 /net
parentd1b7725dfea3e6c1aff2ee94baf3658aba450fbe (diff)
xsk: fix XDP socket ring buffer memory ordering
The ring buffer code of XDP sockets is missing a memory barrier on the consumer side between the load of the data and the write that signals that it is ok for the producer to put new data into the buffer. On architectures that does not guarantee that stores are not reordered with older loads, the producer might put data into the ring before the consumer had the chance to read it. As IA does guarantee this ordering, it would only need a compiler barrier here, but there are no primitives in Linux for this specific case (hinder writes to be ordered before older reads) so I had to add a smp_mb() here which will translate into a run-time synch operation on IA. Added a longish comment in the code explaining what each barrier in the ring implementation accomplishes and what would happen if we removed one of them. Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com> Signed-off-by: Alexei Starovoitov <ast@kernel.org>
Diffstat (limited to 'net')
-rw-r--r--net/xdp/xsk_queue.h56
1 files changed, 52 insertions, 4 deletions
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 610c0bdc0c2b..88b9ae24658d 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -43,6 +43,48 @@ struct xsk_queue {
u64 invalid_descs;
};
+/* The structure of the shared state of the rings are the same as the
+ * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
+ * ring, the kernel is the producer and user space is the consumer. For
+ * the Tx and fill rings, the kernel is the consumer and user space is
+ * the producer.
+ *
+ * producer consumer
+ *
+ * if (LOAD ->consumer) { LOAD ->producer
+ * (A) smp_rmb() (C)
+ * STORE $data LOAD $data
+ * smp_wmb() (B) smp_mb() (D)
+ * STORE ->producer STORE ->consumer
+ * }
+ *
+ * (A) pairs with (D), and (B) pairs with (C).
+ *
+ * Starting with (B), it protects the data from being written after
+ * the producer pointer. If this barrier was missing, the consumer
+ * could observe the producer pointer being set and thus load the data
+ * before the producer has written the new data. The consumer would in
+ * this case load the old data.
+ *
+ * (C) protects the consumer from speculatively loading the data before
+ * the producer pointer actually has been read. If we do not have this
+ * barrier, some architectures could load old data as speculative loads
+ * are not discarded as the CPU does not know there is a dependency
+ * between ->producer and data.
+ *
+ * (A) is a control dependency that separates the load of ->consumer
+ * from the stores of $data. In case ->consumer indicates there is no
+ * room in the buffer to store $data we do not. So no barrier is needed.
+ *
+ * (D) protects the load of the data to be observed to happen after the
+ * store of the consumer pointer. If we did not have this memory
+ * barrier, the producer could observe the consumer pointer being set
+ * and overwrite the data with a new value before the consumer got the
+ * chance to read the old value. The consumer would thus miss reading
+ * the old entry and very likely read the new entry twice, once right
+ * now and again after circling through the ring.
+ */
+
/* Common functions operating for both RXTX and umem queues */
static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
@@ -106,6 +148,7 @@ static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
{
if (q->cons_tail == q->cons_head) {
+ smp_mb(); /* D, matches A */
WRITE_ONCE(q->ring->consumer, q->cons_tail);
q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
@@ -128,10 +171,11 @@ static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
if (xskq_nb_free(q, q->prod_tail, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
ring->desc[q->prod_tail++ & q->ring_mask] = addr;
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
WRITE_ONCE(q->ring->producer, q->prod_tail);
return 0;
@@ -144,6 +188,7 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
return -ENOSPC;
+ /* A, matches D */
ring->desc[q->prod_head++ & q->ring_mask] = addr;
return 0;
}
@@ -152,7 +197,7 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
u32 nb_entries)
{
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
q->prod_tail += nb_entries;
WRITE_ONCE(q->ring->producer, q->prod_tail);
@@ -163,6 +208,7 @@ static inline int xskq_reserve_addr(struct xsk_queue *q)
if (xskq_nb_free(q, q->prod_head, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
q->prod_head++;
return 0;
}
@@ -204,11 +250,12 @@ static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
struct xdp_desc *desc)
{
if (q->cons_tail == q->cons_head) {
+ smp_mb(); /* D, matches A */
WRITE_ONCE(q->ring->consumer, q->cons_tail);
q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
/* Order consumer and data */
- smp_rmb();
+ smp_rmb(); /* C, matches B */
}
return xskq_validate_desc(q, desc);
@@ -228,6 +275,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
if (xskq_nb_free(q, q->prod_head, 1) == 0)
return -ENOSPC;
+ /* A, matches D */
idx = (q->prod_head++) & q->ring_mask;
ring->desc[idx].addr = addr;
ring->desc[idx].len = len;
@@ -238,7 +286,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
/* Order producer and data */
- smp_wmb();
+ smp_wmb(); /* B, matches C */
q->prod_tail = q->prod_head,
WRITE_ONCE(q->ring->producer, q->prod_tail);