diff options
-rw-r--r-- | include/linux/ceph/libceph.h | 1 | ||||
-rw-r--r-- | include/linux/ceph/messenger.h | 5 | ||||
-rw-r--r-- | net/ceph/ceph_common.c | 7 | ||||
-rw-r--r-- | net/ceph/messenger.c | 4 | ||||
-rw-r--r-- | net/ceph/messenger_v1.c | 54 | ||||
-rw-r--r-- | net/ceph/messenger_v2.c | 250 |
6 files changed, 251 insertions, 70 deletions
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 6a89ea410e43..edf62eaa6285 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -35,6 +35,7 @@ #define CEPH_OPT_TCP_NODELAY (1<<4) /* TCP_NODELAY on TCP sockets */ #define CEPH_OPT_NOMSGSIGN (1<<5) /* don't sign msgs (msgr1) */ #define CEPH_OPT_ABORT_ON_FULL (1<<6) /* abort w/ ENOSPC when full */ +#define CEPH_OPT_RXBOUNCE (1<<7) /* double-buffer read data */ #define CEPH_OPT_DEFAULT (CEPH_OPT_TCP_NODELAY) diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index ff99ce094cfa..e7f2fb2fc207 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -383,6 +383,10 @@ struct ceph_connection_v2_info { struct ceph_gcm_nonce in_gcm_nonce; struct ceph_gcm_nonce out_gcm_nonce; + struct page **in_enc_pages; + int in_enc_page_cnt; + int in_enc_resid; + int in_enc_i; struct page **out_enc_pages; int out_enc_page_cnt; int out_enc_resid; @@ -457,6 +461,7 @@ struct ceph_connection { struct ceph_msg *out_msg; /* sending message (== tail of out_sent) */ + struct page *bounce_page; u32 in_front_crc, in_middle_crc, in_data_crc; /* calculated crc */ struct timespec64 last_keepalive_ack; /* keepalive2 ack stamp */ diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index ecc400a0b7bb..4c6441536d55 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -246,6 +246,7 @@ enum { Opt_cephx_sign_messages, Opt_tcp_nodelay, Opt_abort_on_full, + Opt_rxbounce, }; enum { @@ -295,6 +296,7 @@ static const struct fs_parameter_spec ceph_parameters[] = { fsparam_u32 ("osdkeepalive", Opt_osdkeepalivetimeout), fsparam_enum ("read_from_replica", Opt_read_from_replica, ceph_param_read_from_replica), + fsparam_flag ("rxbounce", Opt_rxbounce), fsparam_enum ("ms_mode", Opt_ms_mode, ceph_param_ms_mode), fsparam_string ("secret", Opt_secret), @@ -584,6 +586,9 @@ int ceph_parse_param(struct fs_parameter *param, struct ceph_options *opt, case Opt_abort_on_full: opt->flags |= CEPH_OPT_ABORT_ON_FULL; break; + case Opt_rxbounce: + opt->flags |= CEPH_OPT_RXBOUNCE; + break; default: BUG(); @@ -660,6 +665,8 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client, seq_puts(m, "notcp_nodelay,"); if (show_all && (opt->flags & CEPH_OPT_ABORT_ON_FULL)) seq_puts(m, "abort_on_full,"); + if (opt->flags & CEPH_OPT_RXBOUNCE) + seq_puts(m, "rxbounce,"); if (opt->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT) seq_printf(m, "mount_timeout=%d,", diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 45eba2dcb67a..d3bb656308b4 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -515,6 +515,10 @@ static void ceph_con_reset_protocol(struct ceph_connection *con) ceph_msg_put(con->out_msg); con->out_msg = NULL; } + if (con->bounce_page) { + __free_page(con->bounce_page); + con->bounce_page = NULL; + } if (ceph_msgr2(from_msgr(con->msgr))) ceph_con_v2_reset_protocol(con); diff --git a/net/ceph/messenger_v1.c b/net/ceph/messenger_v1.c index 2cb5ffdf071a..6b014eca3a13 100644 --- a/net/ceph/messenger_v1.c +++ b/net/ceph/messenger_v1.c @@ -992,8 +992,7 @@ static int read_partial_message_section(struct ceph_connection *con, static int read_partial_msg_data(struct ceph_connection *con) { - struct ceph_msg *msg = con->in_msg; - struct ceph_msg_data_cursor *cursor = &msg->cursor; + struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); struct page *page; size_t page_offset; @@ -1001,9 +1000,6 @@ static int read_partial_msg_data(struct ceph_connection *con) u32 crc = 0; int ret; - if (!msg->num_data_items) - return -EIO; - if (do_datacrc) crc = con->in_data_crc; while (cursor->total_resid) { @@ -1031,6 +1027,46 @@ static int read_partial_msg_data(struct ceph_connection *con) return 1; /* must return > 0 to indicate success */ } +static int read_partial_msg_data_bounce(struct ceph_connection *con) +{ + struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor; + struct page *page; + size_t off, len; + u32 crc; + int ret; + + if (unlikely(!con->bounce_page)) { + con->bounce_page = alloc_page(GFP_NOIO); + if (!con->bounce_page) { + pr_err("failed to allocate bounce page\n"); + return -ENOMEM; + } + } + + crc = con->in_data_crc; + while (cursor->total_resid) { + if (!cursor->resid) { + ceph_msg_data_advance(cursor, 0); + continue; + } + + page = ceph_msg_data_next(cursor, &off, &len, NULL); + ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len); + if (ret <= 0) { + con->in_data_crc = crc; + return ret; + } + + crc = crc32c(crc, page_address(con->bounce_page), ret); + memcpy_to_page(page, off, page_address(con->bounce_page), ret); + + ceph_msg_data_advance(cursor, ret); + } + con->in_data_crc = crc; + + return 1; /* must return > 0 to indicate success */ +} + /* * read (part of) a message. */ @@ -1141,7 +1177,13 @@ static int read_partial_message(struct ceph_connection *con) /* (page) data */ if (data_len) { - ret = read_partial_msg_data(con); + if (!m->num_data_items) + return -EIO; + + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) + ret = read_partial_msg_data_bounce(con); + else + ret = read_partial_msg_data(con); if (ret <= 0) return ret; } diff --git a/net/ceph/messenger_v2.c b/net/ceph/messenger_v2.c index c4099b641b38..c81379f93ad5 100644 --- a/net/ceph/messenger_v2.c +++ b/net/ceph/messenger_v2.c @@ -57,8 +57,9 @@ #define IN_S_HANDLE_CONTROL_REMAINDER 3 #define IN_S_PREPARE_READ_DATA 4 #define IN_S_PREPARE_READ_DATA_CONT 5 -#define IN_S_HANDLE_EPILOGUE 6 -#define IN_S_FINISH_SKIP 7 +#define IN_S_PREPARE_READ_ENC_PAGE 6 +#define IN_S_HANDLE_EPILOGUE 7 +#define IN_S_FINISH_SKIP 8 #define OUT_S_QUEUE_DATA 1 #define OUT_S_QUEUE_DATA_CONT 2 @@ -1032,22 +1033,41 @@ static int decrypt_control_remainder(struct ceph_connection *con) padded_len(rem_len) + CEPH_GCM_TAG_LEN); } -static int decrypt_message(struct ceph_connection *con) +static int decrypt_tail(struct ceph_connection *con) { + struct sg_table enc_sgt = {}; struct sg_table sgt = {}; + int tail_len; int ret; + tail_len = tail_onwire_len(con->in_msg, true); + ret = sg_alloc_table_from_pages(&enc_sgt, con->v2.in_enc_pages, + con->v2.in_enc_page_cnt, 0, tail_len, + GFP_NOIO); + if (ret) + goto out; + ret = setup_message_sgs(&sgt, con->in_msg, FRONT_PAD(con->v2.in_buf), MIDDLE_PAD(con->v2.in_buf), DATA_PAD(con->v2.in_buf), con->v2.in_buf, true); if (ret) goto out; - ret = gcm_crypt(con, false, sgt.sgl, sgt.sgl, - tail_onwire_len(con->in_msg, true)); + dout("%s con %p msg %p enc_page_cnt %d sg_cnt %d\n", __func__, con, + con->in_msg, con->v2.in_enc_page_cnt, sgt.orig_nents); + ret = gcm_crypt(con, false, enc_sgt.sgl, sgt.sgl, tail_len); + if (ret) + goto out; + + WARN_ON(!con->v2.in_enc_page_cnt); + ceph_release_page_vector(con->v2.in_enc_pages, + con->v2.in_enc_page_cnt); + con->v2.in_enc_pages = NULL; + con->v2.in_enc_page_cnt = 0; out: sg_free_table(&sgt); + sg_free_table(&enc_sgt); return ret; } @@ -1733,54 +1753,157 @@ static int prepare_read_control_remainder(struct ceph_connection *con) return 0; } -static void prepare_read_data(struct ceph_connection *con) +static int prepare_read_data(struct ceph_connection *con) { struct bio_vec bv; - if (!con_secure(con)) - con->in_data_crc = -1; + con->in_data_crc = -1; ceph_msg_data_cursor_init(&con->v2.in_cursor, con->in_msg, data_len(con->in_msg)); get_bvec_at(&con->v2.in_cursor, &bv); - set_in_bvec(con, &bv); + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + if (unlikely(!con->bounce_page)) { + con->bounce_page = alloc_page(GFP_NOIO); + if (!con->bounce_page) { + pr_err("failed to allocate bounce page\n"); + return -ENOMEM; + } + } + + bv.bv_page = con->bounce_page; + bv.bv_offset = 0; + set_in_bvec(con, &bv); + } else { + set_in_bvec(con, &bv); + } con->v2.in_state = IN_S_PREPARE_READ_DATA_CONT; + return 0; } static void prepare_read_data_cont(struct ceph_connection *con) { struct bio_vec bv; - if (!con_secure(con)) + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + con->in_data_crc = crc32c(con->in_data_crc, + page_address(con->bounce_page), + con->v2.in_bvec.bv_len); + + get_bvec_at(&con->v2.in_cursor, &bv); + memcpy_to_page(bv.bv_page, bv.bv_offset, + page_address(con->bounce_page), + con->v2.in_bvec.bv_len); + } else { con->in_data_crc = ceph_crc32c_page(con->in_data_crc, con->v2.in_bvec.bv_page, con->v2.in_bvec.bv_offset, con->v2.in_bvec.bv_len); + } ceph_msg_data_advance(&con->v2.in_cursor, con->v2.in_bvec.bv_len); if (con->v2.in_cursor.total_resid) { get_bvec_at(&con->v2.in_cursor, &bv); - set_in_bvec(con, &bv); + if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) { + bv.bv_page = con->bounce_page; + bv.bv_offset = 0; + set_in_bvec(con, &bv); + } else { + set_in_bvec(con, &bv); + } WARN_ON(con->v2.in_state != IN_S_PREPARE_READ_DATA_CONT); return; } /* - * We've read all data. Prepare to read data padding (if any) - * and epilogue. + * We've read all data. Prepare to read epilogue. */ reset_in_kvecs(con); - if (con_secure(con)) { - if (need_padding(data_len(con->in_msg))) - add_in_kvec(con, DATA_PAD(con->v2.in_buf), - padding_len(data_len(con->in_msg))); - add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_SECURE_LEN); + add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN); + con->v2.in_state = IN_S_HANDLE_EPILOGUE; +} + +static int prepare_read_tail_plain(struct ceph_connection *con) +{ + struct ceph_msg *msg = con->in_msg; + + if (!front_len(msg) && !middle_len(msg)) { + WARN_ON(!data_len(msg)); + return prepare_read_data(con); + } + + reset_in_kvecs(con); + if (front_len(msg)) { + add_in_kvec(con, msg->front.iov_base, front_len(msg)); + WARN_ON(msg->front.iov_len != front_len(msg)); + } + if (middle_len(msg)) { + add_in_kvec(con, msg->middle->vec.iov_base, middle_len(msg)); + WARN_ON(msg->middle->vec.iov_len != middle_len(msg)); + } + + if (data_len(msg)) { + con->v2.in_state = IN_S_PREPARE_READ_DATA; } else { add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN); + con->v2.in_state = IN_S_HANDLE_EPILOGUE; } + return 0; +} + +static void prepare_read_enc_page(struct ceph_connection *con) +{ + struct bio_vec bv; + + dout("%s con %p i %d resid %d\n", __func__, con, con->v2.in_enc_i, + con->v2.in_enc_resid); + WARN_ON(!con->v2.in_enc_resid); + + bv.bv_page = con->v2.in_enc_pages[con->v2.in_enc_i]; + bv.bv_offset = 0; + bv.bv_len = min(con->v2.in_enc_resid, (int)PAGE_SIZE); + + set_in_bvec(con, &bv); + con->v2.in_enc_i++; + con->v2.in_enc_resid -= bv.bv_len; + + if (con->v2.in_enc_resid) { + con->v2.in_state = IN_S_PREPARE_READ_ENC_PAGE; + return; + } + + /* + * We are set to read the last piece of ciphertext (ending + * with epilogue) + auth tag. + */ + WARN_ON(con->v2.in_enc_i != con->v2.in_enc_page_cnt); con->v2.in_state = IN_S_HANDLE_EPILOGUE; } +static int prepare_read_tail_secure(struct ceph_connection *con) +{ + struct page **enc_pages; + int enc_page_cnt; + int tail_len; + + tail_len = tail_onwire_len(con->in_msg, true); + WARN_ON(!tail_len); + + enc_page_cnt = calc_pages_for(0, tail_len); + enc_pages = ceph_alloc_page_vector(enc_page_cnt, GFP_NOIO); + if (IS_ERR(enc_pages)) + return PTR_ERR(enc_pages); + + WARN_ON(con->v2.in_enc_pages || con->v2.in_enc_page_cnt); + con->v2.in_enc_pages = enc_pages; + con->v2.in_enc_page_cnt = enc_page_cnt; + con->v2.in_enc_resid = tail_len; + con->v2.in_enc_i = 0; + + prepare_read_enc_page(con); + return 0; +} + static void __finish_skip(struct ceph_connection *con) { con->in_seq++; @@ -2589,47 +2712,26 @@ static int __handle_control(struct ceph_connection *con, void *p) } msg = con->in_msg; /* set in process_message_header() */ - if (!front_len(msg) && !middle_len(msg)) { - if (!data_len(msg)) - return process_message(con); - - prepare_read_data(con); - return 0; - } - - reset_in_kvecs(con); if (front_len(msg)) { WARN_ON(front_len(msg) > msg->front_alloc_len); - add_in_kvec(con, msg->front.iov_base, front_len(msg)); msg->front.iov_len = front_len(msg); - - if (con_secure(con) && need_padding(front_len(msg))) - add_in_kvec(con, FRONT_PAD(con->v2.in_buf), - padding_len(front_len(msg))); } else { msg->front.iov_len = 0; } if (middle_len(msg)) { WARN_ON(middle_len(msg) > msg->middle->alloc_len); - add_in_kvec(con, msg->middle->vec.iov_base, middle_len(msg)); msg->middle->vec.iov_len = middle_len(msg); - - if (con_secure(con) && need_padding(middle_len(msg))) - add_in_kvec(con, MIDDLE_PAD(con->v2.in_buf), - padding_len(middle_len(msg))); } else if (msg->middle) { msg->middle->vec.iov_len = 0; } - if (data_len(msg)) { - con->v2.in_state = IN_S_PREPARE_READ_DATA; - } else { - add_in_kvec(con, con->v2.in_buf, - con_secure(con) ? CEPH_EPILOGUE_SECURE_LEN : - CEPH_EPILOGUE_PLAIN_LEN); - con->v2.in_state = IN_S_HANDLE_EPILOGUE; - } - return 0; + if (!front_len(msg) && !middle_len(msg) && !data_len(msg)) + return process_message(con); + + if (con_secure(con)) + return prepare_read_tail_secure(con); + + return prepare_read_tail_plain(con); } static int handle_preamble(struct ceph_connection *con) @@ -2717,7 +2819,7 @@ static int handle_epilogue(struct ceph_connection *con) int ret; if (con_secure(con)) { - ret = decrypt_message(con); + ret = decrypt_tail(con); if (ret) { if (ret == -EBADMSG) con->error_msg = "integrity error, bad epilogue auth tag"; @@ -2785,13 +2887,16 @@ static int populate_in_iter(struct ceph_connection *con) ret = handle_control_remainder(con); break; case IN_S_PREPARE_READ_DATA: - prepare_read_data(con); - ret = 0; + ret = prepare_read_data(con); break; case IN_S_PREPARE_READ_DATA_CONT: prepare_read_data_cont(con); ret = 0; break; + case IN_S_PREPARE_READ_ENC_PAGE: + prepare_read_enc_page(con); + ret = 0; + break; case IN_S_HANDLE_EPILOGUE: ret = handle_epilogue(con); break; @@ -3326,20 +3431,16 @@ void ceph_con_v2_revoke(struct ceph_connection *con) static void revoke_at_prepare_read_data(struct ceph_connection *con) { - int remaining; /* data + [data padding] + epilogue */ + int remaining; int resid; + WARN_ON(con_secure(con)); WARN_ON(!data_len(con->in_msg)); WARN_ON(!iov_iter_is_kvec(&con->v2.in_iter)); resid = iov_iter_count(&con->v2.in_iter); WARN_ON(!resid); - if (con_secure(con)) - remaining = padded_len(data_len(con->in_msg)) + - CEPH_EPILOGUE_SECURE_LEN; - else - remaining = data_len(con->in_msg) + CEPH_EPILOGUE_PLAIN_LEN; - + remaining = data_len(con->in_msg) + CEPH_EPILOGUE_PLAIN_LEN; dout("%s con %p resid %d remaining %d\n", __func__, con, resid, remaining); con->v2.in_iter.count -= resid; @@ -3350,8 +3451,9 @@ static void revoke_at_prepare_read_data(struct ceph_connection *con) static void revoke_at_prepare_read_data_cont(struct ceph_connection *con) { int recved, resid; /* current piece of data */ - int remaining; /* [data padding] + epilogue */ + int remaining; + WARN_ON(con_secure(con)); WARN_ON(!data_len(con->in_msg)); WARN_ON(!iov_iter_is_bvec(&con->v2.in_iter)); resid = iov_iter_count(&con->v2.in_iter); @@ -3363,12 +3465,7 @@ static void revoke_at_prepare_read_data_cont(struct ceph_connection *con) ceph_msg_data_advance(&con->v2.in_cursor, recved); WARN_ON(resid > con->v2.in_cursor.total_resid); - if (con_secure(con)) - remaining = padding_len(data_len(con->in_msg)) + - CEPH_EPILOGUE_SECURE_LEN; - else - remaining = CEPH_EPILOGUE_PLAIN_LEN; - + remaining = CEPH_EPILOGUE_PLAIN_LEN; dout("%s con %p total_resid %zu remaining %d\n", __func__, con, con->v2.in_cursor.total_resid, remaining); con->v2.in_iter.count -= resid; @@ -3376,11 +3473,26 @@ static void revoke_at_prepare_read_data_cont(struct ceph_connection *con) con->v2.in_state = IN_S_FINISH_SKIP; } +static void revoke_at_prepare_read_enc_page(struct ceph_connection *con) +{ + int resid; /* current enc page (not necessarily data) */ + + WARN_ON(!con_secure(con)); + WARN_ON(!iov_iter_is_bvec(&con->v2.in_iter)); + resid = iov_iter_count(&con->v2.in_iter); + WARN_ON(!resid || resid > con->v2.in_bvec.bv_len); + + dout("%s con %p resid %d enc_resid %d\n", __func__, con, resid, + con->v2.in_enc_resid); + con->v2.in_iter.count -= resid; + set_in_skip(con, resid + con->v2.in_enc_resid); + con->v2.in_state = IN_S_FINISH_SKIP; +} + static void revoke_at_handle_epilogue(struct ceph_connection *con) { int resid; - WARN_ON(!iov_iter_is_kvec(&con->v2.in_iter)); resid = iov_iter_count(&con->v2.in_iter); WARN_ON(!resid); @@ -3399,6 +3511,9 @@ void ceph_con_v2_revoke_incoming(struct ceph_connection *con) case IN_S_PREPARE_READ_DATA_CONT: revoke_at_prepare_read_data_cont(con); break; + case IN_S_PREPARE_READ_ENC_PAGE: + revoke_at_prepare_read_enc_page(con); + break; case IN_S_HANDLE_EPILOGUE: revoke_at_handle_epilogue(con); break; @@ -3432,6 +3547,13 @@ void ceph_con_v2_reset_protocol(struct ceph_connection *con) clear_out_sign_kvecs(con); free_conn_bufs(con); + if (con->v2.in_enc_pages) { + WARN_ON(!con->v2.in_enc_page_cnt); + ceph_release_page_vector(con->v2.in_enc_pages, + con->v2.in_enc_page_cnt); + con->v2.in_enc_pages = NULL; + con->v2.in_enc_page_cnt = 0; + } if (con->v2.out_enc_pages) { WARN_ON(!con->v2.out_enc_page_cnt); ceph_release_page_vector(con->v2.out_enc_pages, |