diff options
author | Ilya Dryomov <idryomov@gmail.com> | 2020-11-12 15:48:06 +0100 |
---|---|---|
committer | Ilya Dryomov <idryomov@gmail.com> | 2020-12-14 23:21:50 +0100 |
commit | 2f713615ddd9d805b6c5e79c52e0e11af99d2bf1 (patch) | |
tree | 1a2940cfdb53882ace9f27167d594828cdbf6658 /net/ceph/messenger.c | |
parent | 566050e17e53db283d4e26b73b4b50556f97ce7b (diff) |
libceph: move msgr1 protocol implementation to its own file
A pure move, no other changes.
Note that ceph_tcp_recv{msg,page}() and ceph_tcp_send{msg,page}()
helpers are also moved. msgr2 will bring its own, more efficient,
variants based on iov_iter. Switching msgr1 to them was considered
but decided against to avoid subtle regressions.
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r-- | net/ceph/messenger.c | 1495 |
1 files changed, 0 insertions, 1495 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 4ca7d9b594c7..544cfdbe52d6 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -137,12 +137,6 @@ bool ceph_con_flag_test_and_set(struct ceph_connection *con, static struct kmem_cache *ceph_msg_cache; -/* static tag bytes (protocol control messages) */ -static char tag_msg = CEPH_MSGR_TAG_MSG; -static char tag_ack = CEPH_MSGR_TAG_ACK; -static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; -static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; - #ifdef CONFIG_LOCKDEP static struct lock_class_key socket_class; #endif @@ -478,96 +472,6 @@ int ceph_tcp_connect(struct ceph_connection *con) } /* - * If @buf is NULL, discard up to @len bytes. - */ -static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) -{ - struct kvec iov = {buf, len}; - struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; - int r; - - if (!buf) - msg.msg_flags |= MSG_TRUNC; - - iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, len); - r = sock_recvmsg(sock, &msg, msg.msg_flags); - if (r == -EAGAIN) - r = 0; - return r; -} - -static int ceph_tcp_recvpage(struct socket *sock, struct page *page, - int page_offset, size_t length) -{ - struct bio_vec bvec = { - .bv_page = page, - .bv_offset = page_offset, - .bv_len = length - }; - struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; - int r; - - BUG_ON(page_offset + length > PAGE_SIZE); - iov_iter_bvec(&msg.msg_iter, READ, &bvec, 1, length); - r = sock_recvmsg(sock, &msg, msg.msg_flags); - if (r == -EAGAIN) - r = 0; - return r; -} - -/* - * write something. @more is true if caller will be sending more data - * shortly. - */ -static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, - size_t kvlen, size_t len, bool more) -{ - struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; - int r; - - if (more) - msg.msg_flags |= MSG_MORE; - else - msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ - - r = kernel_sendmsg(sock, &msg, iov, kvlen, len); - if (r == -EAGAIN) - r = 0; - return r; -} - -/* - * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST - */ -static int ceph_tcp_sendpage(struct socket *sock, struct page *page, - int offset, size_t size, int more) -{ - ssize_t (*sendpage)(struct socket *sock, struct page *page, - int offset, size_t size, int flags); - int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more; - int ret; - - /* - * sendpage cannot properly handle pages with page_count == 0, - * we need to fall back to sendmsg if that's the case. - * - * Same goes for slab pages: skb_can_coalesce() allows - * coalescing neighboring slab objects into a single frag which - * triggers one of hardened usercopy checks. - */ - if (sendpage_ok(page)) - sendpage = sock->ops->sendpage; - else - sendpage = sock_no_sendpage; - - ret = sendpage(sock, page, offset, size, flags); - if (ret == -EAGAIN) - ret = 0; - - return ret; -} - -/* * Shutdown/close the socket for the given connection. */ int ceph_con_close_socket(struct ceph_connection *con) @@ -593,11 +497,6 @@ int ceph_con_close_socket(struct ceph_connection *con) return rc; } -void ceph_con_v1_reset_protocol(struct ceph_connection *con) -{ - con->out_skip = 0; -} - static void ceph_con_reset_protocol(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); @@ -636,12 +535,6 @@ static void ceph_msg_remove_list(struct list_head *head) } } -void ceph_con_v1_reset_session(struct ceph_connection *con) -{ - con->connect_seq = 0; - con->peer_global_seq = 0; -} - void ceph_con_reset_session(struct ceph_connection *con) { dout("%s con %p\n", __func__, con); @@ -702,11 +595,6 @@ void ceph_con_open(struct ceph_connection *con, } EXPORT_SYMBOL(ceph_con_open); -bool ceph_con_v1_opened(struct ceph_connection *con) -{ - return con->connect_seq; -} - /* * return true if this connection ever successfully opened */ @@ -739,7 +627,6 @@ void ceph_con_init(struct ceph_connection *con, void *private, } EXPORT_SYMBOL(ceph_con_init); - /* * We maintain a global counter to order connection attempts. Get * a unique seq greater than @gt. @@ -805,50 +692,6 @@ void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq) } } -static void con_out_kvec_reset(struct ceph_connection *con) -{ - BUG_ON(con->out_skip); - - con->out_kvec_left = 0; - con->out_kvec_bytes = 0; - con->out_kvec_cur = &con->out_kvec[0]; -} - -static void con_out_kvec_add(struct ceph_connection *con, - size_t size, void *data) -{ - int index = con->out_kvec_left; - - BUG_ON(con->out_skip); - BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); - - con->out_kvec[index].iov_len = size; - con->out_kvec[index].iov_base = data; - con->out_kvec_left++; - con->out_kvec_bytes += size; -} - -/* - * Chop off a kvec from the end. Return residual number of bytes for - * that kvec, i.e. how many bytes would have been written if the kvec - * hadn't been nuked. - */ -static int con_out_kvec_skip(struct ceph_connection *con) -{ - int off = con->out_kvec_cur - con->out_kvec; - int skip = 0; - - if (con->out_kvec_bytes > 0) { - skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len; - BUG_ON(con->out_kvec_bytes < skip); - BUG_ON(!con->out_kvec_left); - con->out_kvec_bytes -= skip; - con->out_kvec_left--; - } - - return skip; -} - #ifdef CONFIG_BLOCK /* @@ -1260,307 +1103,6 @@ void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor, size_t bytes) cursor->need_crc = new_piece; } -static size_t sizeof_footer(struct ceph_connection *con) -{ - return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ? - sizeof(struct ceph_msg_footer) : - sizeof(struct ceph_msg_footer_old); -} - -static void prepare_message_data(struct ceph_msg *msg, u32 data_len) -{ - /* Initialize data cursor */ - - ceph_msg_data_cursor_init(&msg->cursor, msg, data_len); -} - -/* - * Prepare footer for currently outgoing message, and finish things - * off. Assumes out_kvec* are already valid.. we just add on to the end. - */ -static void prepare_write_message_footer(struct ceph_connection *con) -{ - struct ceph_msg *m = con->out_msg; - - m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; - - dout("prepare_write_message_footer %p\n", con); - con_out_kvec_add(con, sizeof_footer(con), &m->footer); - if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { - if (con->ops->sign_message) - con->ops->sign_message(m); - else - m->footer.sig = 0; - } else { - m->old_footer.flags = m->footer.flags; - } - con->out_more = m->more_to_follow; - con->out_msg_done = true; -} - -/* - * Prepare headers for the next outgoing message. - */ -static void prepare_write_message(struct ceph_connection *con) -{ - struct ceph_msg *m; - u32 crc; - - con_out_kvec_reset(con); - con->out_msg_done = false; - - /* Sneak an ack in there first? If we can get it into the same - * TCP packet that's a good thing. */ - if (con->in_seq > con->in_seq_acked) { - con->in_seq_acked = con->in_seq; - con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); - con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - con_out_kvec_add(con, sizeof (con->out_temp_ack), - &con->out_temp_ack); - } - - ceph_con_get_out_msg(con); - m = con->out_msg; - - dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n", - m, con->out_seq, le16_to_cpu(m->hdr.type), - le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), - m->data_length); - WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len)); - WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len)); - - /* tag + hdr + front + middle */ - con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); - con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr); - con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); - - if (m->middle) - con_out_kvec_add(con, m->middle->vec.iov_len, - m->middle->vec.iov_base); - - /* fill in hdr crc and finalize hdr */ - crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); - con->out_msg->hdr.crc = cpu_to_le32(crc); - memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr)); - - /* fill in front and middle crc, footer */ - crc = crc32c(0, m->front.iov_base, m->front.iov_len); - con->out_msg->footer.front_crc = cpu_to_le32(crc); - if (m->middle) { - crc = crc32c(0, m->middle->vec.iov_base, - m->middle->vec.iov_len); - con->out_msg->footer.middle_crc = cpu_to_le32(crc); - } else - con->out_msg->footer.middle_crc = 0; - dout("%s front_crc %u middle_crc %u\n", __func__, - le32_to_cpu(con->out_msg->footer.front_crc), - le32_to_cpu(con->out_msg->footer.middle_crc)); - con->out_msg->footer.flags = 0; - - /* is there a data payload? */ - con->out_msg->footer.data_crc = 0; - if (m->data_length) { - prepare_message_data(con->out_msg, m->data_length); - con->out_more = 1; /* data + footer will follow */ - } else { - /* no, queue up footer too and be done */ - prepare_write_message_footer(con); - } - - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -/* - * Prepare an ack. - */ -static void prepare_write_ack(struct ceph_connection *con) -{ - dout("prepare_write_ack %p %llu -> %llu\n", con, - con->in_seq_acked, con->in_seq); - con->in_seq_acked = con->in_seq; - - con_out_kvec_reset(con); - - con_out_kvec_add(con, sizeof (tag_ack), &tag_ack); - - con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - con_out_kvec_add(con, sizeof (con->out_temp_ack), - &con->out_temp_ack); - - con->out_more = 1; /* more will follow.. eventually.. */ - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -/* - * Prepare to share the seq during handshake - */ -static void prepare_write_seq(struct ceph_connection *con) -{ - dout("prepare_write_seq %p %llu -> %llu\n", con, - con->in_seq_acked, con->in_seq); - con->in_seq_acked = con->in_seq; - - con_out_kvec_reset(con); - - con->out_temp_ack = cpu_to_le64(con->in_seq_acked); - con_out_kvec_add(con, sizeof (con->out_temp_ack), - &con->out_temp_ack); - - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -/* - * Prepare to write keepalive byte. - */ -static void prepare_write_keepalive(struct ceph_connection *con) -{ - dout("prepare_write_keepalive %p\n", con); - con_out_kvec_reset(con); - if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { - struct timespec64 now; - - ktime_get_real_ts64(&now); - con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); - ceph_encode_timespec64(&con->out_temp_keepalive2, &now); - con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), - &con->out_temp_keepalive2); - } else { - con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); - } - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -/* - * Connection negotiation. - */ - -static int get_connect_authorizer(struct ceph_connection *con) -{ - struct ceph_auth_handshake *auth; - int auth_proto; - - if (!con->ops->get_authorizer) { - con->auth = NULL; - con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; - con->out_connect.authorizer_len = 0; - return 0; - } - - auth = con->ops->get_authorizer(con, &auth_proto, con->auth_retry); - if (IS_ERR(auth)) - return PTR_ERR(auth); - - con->auth = auth; - con->out_connect.authorizer_protocol = cpu_to_le32(auth_proto); - con->out_connect.authorizer_len = cpu_to_le32(auth->authorizer_buf_len); - return 0; -} - -/* - * We connected to a peer and are saying hello. - */ -static void prepare_write_banner(struct ceph_connection *con) -{ - con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER); - con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr), - &con->msgr->my_enc_addr); - - con->out_more = 0; - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -static void __prepare_write_connect(struct ceph_connection *con) -{ - con_out_kvec_add(con, sizeof(con->out_connect), &con->out_connect); - if (con->auth) - con_out_kvec_add(con, con->auth->authorizer_buf_len, - con->auth->authorizer_buf); - - con->out_more = 0; - ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING); -} - -static int prepare_write_connect(struct ceph_connection *con) -{ - unsigned int global_seq = ceph_get_global_seq(con->msgr, 0); - int proto; - int ret; - - switch (con->peer_name.type) { - case CEPH_ENTITY_TYPE_MON: - proto = CEPH_MONC_PROTOCOL; - break; - case CEPH_ENTITY_TYPE_OSD: - proto = CEPH_OSDC_PROTOCOL; - break; - case CEPH_ENTITY_TYPE_MDS: - proto = CEPH_MDSC_PROTOCOL; - break; - default: - BUG(); - } - - dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con, - con->connect_seq, global_seq, proto); - - con->out_connect.features = - cpu_to_le64(from_msgr(con->msgr)->supported_features); - con->out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT); - con->out_connect.connect_seq = cpu_to_le32(con->connect_seq); - con->out_connect.global_seq = cpu_to_le32(global_seq); - con->out_connect.protocol_version = cpu_to_le32(proto); - con->out_connect.flags = 0; - - ret = get_connect_authorizer(con); - if (ret) - return ret; - - __prepare_write_connect(con); - return 0; -} - -/* - * write as much of pending kvecs to the socket as we can. - * 1 -> done - * 0 -> socket full, but more to do - * <0 -> error - */ -static int write_partial_kvec(struct ceph_connection *con) -{ - int ret; - - dout("write_partial_kvec %p %d left\n", con, con->out_kvec_bytes); - while (con->out_kvec_bytes > 0) { - ret = ceph_tcp_sendmsg(con->sock, con->out_kvec_cur, - con->out_kvec_left, con->out_kvec_bytes, - con->out_more); - if (ret <= 0) - goto out; - con->out_kvec_bytes -= ret; - if (con->out_kvec_bytes == 0) - break; /* done */ - - /* account for full iov entries consumed */ - while (ret >= con->out_kvec_cur->iov_len) { - BUG_ON(!con->out_kvec_left); - ret -= con->out_kvec_cur->iov_len; - con->out_kvec_cur++; - con->out_kvec_left--; - } - /* and for a partially-consumed entry */ - if (ret) { - con->out_kvec_cur->iov_len -= ret; - con->out_kvec_cur->iov_base += ret; - } - } - con->out_kvec_left = 0; - ret = 1; -out: - dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, - con->out_kvec_bytes, con->out_kvec_left, ret); - return ret; /* done! */ -} - u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, unsigned int length) { @@ -1573,256 +1115,6 @@ u32 ceph_crc32c_page(u32 crc, struct page *page, unsigned int page_offset, return crc; } -/* - * Write as much message data payload as we can. If we finish, queue - * up the footer. - * 1 -> done, footer is now queued in out_kvec[]. - * 0 -> socket full, but more to do - * <0 -> error - */ -static int write_partial_message_data(struct ceph_connection *con) -{ - struct ceph_msg *msg = con->out_msg; - struct ceph_msg_data_cursor *cursor = &msg->cursor; - bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); - int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; - u32 crc; - - dout("%s %p msg %p\n", __func__, con, msg); - - if (!msg->num_data_items) - return -EINVAL; - - /* - * Iterate through each page that contains data to be - * written, and send as much as possible for each. - * - * If we are calculating the data crc (the default), we will - * need to map the page. If we have no pages, they have - * been revoked, so use the zero page. - */ - crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0; - while (cursor->total_resid) { - struct page *page; - size_t page_offset; - size_t length; - int ret; - - if (!cursor->resid) { - ceph_msg_data_advance(cursor, 0); - continue; - } - - page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); - if (length == cursor->total_resid) - more = MSG_MORE; - ret = ceph_tcp_sendpage(con->sock, page, page_offset, length, - more); - if (ret <= 0) { - if (do_datacrc) - msg->footer.data_crc = cpu_to_le32(crc); - - return ret; - } - if (do_datacrc && cursor->need_crc) - crc = ceph_crc32c_page(crc, page, page_offset, length); - ceph_msg_data_advance(cursor, (size_t)ret); - } - - dout("%s %p msg %p done\n", __func__, con, msg); - - /* prepare and queue up footer, too */ - if (do_datacrc) - msg->footer.data_crc = cpu_to_le32(crc); - else - msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; - con_out_kvec_reset(con); - prepare_write_message_footer(con); - - return 1; /* must return > 0 to indicate success */ -} - -/* - * write some zeros - */ -static int write_partial_skip(struct ceph_connection *con) -{ - int more = MSG_MORE | MSG_SENDPAGE_NOTLAST; - int ret; - - dout("%s %p %d left\n", __func__, con, con->out_skip); - while (con->out_skip > 0) { - size_t size = min(con->out_skip, (int) PAGE_SIZE); - - if (size == con->out_skip) - more = MSG_MORE; - ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size, - more); - if (ret <= 0) - goto out; - con->out_skip -= ret; - } - ret = 1; -out: - return ret; -} - -/* - * Prepare to read connection handshake, or an ack. - */ -static void prepare_read_banner(struct ceph_connection *con) -{ - dout("prepare_read_banner %p\n", con); - con->in_base_pos = 0; -} - -static void prepare_read_connect(struct ceph_connection *con) -{ - dout("prepare_read_connect %p\n", con); - con->in_base_pos = 0; -} - -static void prepare_read_ack(struct ceph_connection *con) -{ - dout("prepare_read_ack %p\n", con); - con->in_base_pos = 0; -} - -static void prepare_read_seq(struct ceph_connection *con) -{ - dout("prepare_read_seq %p\n", con); - con->in_base_pos = 0; - con->in_tag = CEPH_MSGR_TAG_SEQ; -} - -static void prepare_read_tag(struct ceph_connection *con) -{ - dout("prepare_read_tag %p\n", con); - con->in_base_pos = 0; - con->in_tag = CEPH_MSGR_TAG_READY; -} - -static void prepare_read_keepalive_ack(struct ceph_connection *con) -{ - dout("prepare_read_keepalive_ack %p\n", con); - con->in_base_pos = 0; -} - -/* - * Prepare to read a message. - */ -static int prepare_read_message(struct ceph_connection *con) -{ - dout("prepare_read_message %p\n", con); - BUG_ON(con->in_msg != NULL); - con->in_base_pos = 0; - con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0; - return 0; -} - - -static int read_partial(struct ceph_connection *con, - int end, int size, void *object) -{ - while (con->in_base_pos < end) { - int left = end - con->in_base_pos; - int have = size - left; - int ret = ceph_tcp_recvmsg(con->sock, object + have, left); - if (ret <= 0) - return ret; - con->in_base_pos += ret; - } - return 1; -} - - -/* - * Read all or part of the connect-side handshake on a new connection - */ -static int read_partial_banner(struct ceph_connection *con) -{ - int size; - int end; - int ret; - - dout("read_partial_banner %p at %d\n", con, con->in_base_pos); - - /* peer's banner */ - size = strlen(CEPH_BANNER); - end = size; - ret = read_partial(con, end, size, con->in_banner); - if (ret <= 0) - goto out; - - size = sizeof (con->actual_peer_addr); - end += size; - ret = read_partial(con, end, size, &con->actual_peer_addr); - if (ret <= 0) - goto out; - ceph_decode_banner_addr(&con->actual_peer_addr); - - size = sizeof (con->peer_addr_for_me); - end += size; - ret = read_partial(con, end, size, &con->peer_addr_for_me); - if (ret <= 0) - goto out; - ceph_decode_banner_addr(&con->peer_addr_for_me); - -out: - return ret; -} - -static int read_partial_connect(struct ceph_connection *con) -{ - int size; - int end; - int ret; - - dout("read_partial_connect %p at %d\n", con, con->in_base_pos); - - size = sizeof (con->in_reply); - end = size; - ret = read_partial(con, end, size, &con->in_reply); - if (ret <= 0) - goto out; - - if (con->auth) { - size = le32_to_cpu(con->in_reply.authorizer_len); - if (size > con->auth->authorizer_reply_buf_len) { - pr_err("authorizer reply too big: %d > %zu\n", size, - con->auth->authorizer_reply_buf_len); - ret = -EINVAL; - goto out; - } - - end += size; - ret = read_partial(con, end, size, - con->auth->authorizer_reply_buf); - if (ret <= 0) - goto out; - } - - dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n", - con, (int)con->in_reply.tag, - le32_to_cpu(con->in_reply.connect_seq), - le32_to_cpu(con->in_reply.global_seq)); -out: - return ret; -} - -/* - * Verify the hello banner looks okay. - */ -static int verify_hello(struct ceph_connection *con) -{ - if (memcmp(con->in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) { - pr_err("connect to %s got bad banner\n", - ceph_pr_addr(&con->peer_addr)); - con->error_msg = "protocol error, bad banner"; - return -1; - } - return 0; -} bool ceph_addr_is_blank(const struct ceph_entity_addr *addr) { @@ -2032,492 +1324,6 @@ bad: return ret; } -static int process_banner(struct ceph_connection *con) -{ - struct ceph_entity_addr *my_addr = &con->msgr->inst.addr; - - dout("process_banner on %p\n", con); - - if (verify_hello(con) < 0) - return -1; - - /* - * Make sure the other end is who we wanted. note that the other - * end may not yet know their ip address, so if it's 0.0.0.0, give - * them the benefit of the doubt. - */ - if (memcmp(&con->peer_addr, &con->actual_peer_addr, - sizeof(con->peer_addr)) != 0 && - !(ceph_addr_is_blank(&con->actual_peer_addr) && - con->actual_peer_addr.nonce == con->peer_addr.nonce)) { - pr_warn("wrong peer, want %s/%u, got %s/%u\n", - ceph_pr_addr(&con->peer_addr), - le32_to_cpu(con->peer_addr.nonce), - ceph_pr_addr(&con->actual_peer_addr), - le32_to_cpu(con->actual_peer_addr.nonce)); - con->error_msg = "wrong peer at address"; - return -1; - } - - /* - * did we learn our address? - */ - if (ceph_addr_is_blank(my_addr)) { - memcpy(&my_addr->in_addr, - &con->peer_addr_for_me.in_addr, - sizeof(con->peer_addr_for_me.in_addr)); - ceph_addr_set_port(my_addr, 0); - ceph_encode_my_addr(con->msgr); - dout("process_banner learned my addr is %s\n", - ceph_pr_addr(my_addr)); - } - - return 0; -} - -static int process_connect(struct ceph_connection *con) -{ - u64 sup_feat = from_msgr(con->msgr)->supported_features; - u64 req_feat = from_msgr(con->msgr)->required_features; - u64 server_feat = le64_to_cpu(con->in_reply.features); - int ret; - - dout("process_connect on %p tag %d\n", con, (int)con->in_tag); - - if (con->auth) { - int len = le32_to_cpu(con->in_reply.authorizer_len); - - /* - * Any connection that defines ->get_authorizer() - * should also define ->add_authorizer_challenge() and - * ->verify_authorizer_reply(). - * - * See get_connect_authorizer(). - */ - if (con->in_reply.tag == CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) { - ret = con->ops->add_authorizer_challenge( - con, con->auth->authorizer_reply_buf, len); - if (ret < 0) - return ret; - - con_out_kvec_reset(con); - __prepare_write_connect(con); - prepare_read_connect(con); - return 0; - } - - if (len) { - ret = con->ops->verify_authorizer_reply(con); - if (ret < 0) { - con->error_msg = "bad authorize reply"; - return ret; - } - } - } - - switch (con->in_reply.tag) { - case CEPH_MSGR_TAG_FEATURES: - pr_err("%s%lld %s feature set mismatch," - " my %llx < server's %llx, missing %llx\n", - ENTITY_NAME(con->peer_name), - ceph_pr_addr(&con->peer_addr), - sup_feat, server_feat, server_feat & ~sup_feat); - con->error_msg = "missing required protocol features"; - return -1; - - case CEPH_MSGR_TAG_BADPROTOVER: - pr_err("%s%lld %s protocol version mismatch," - " my %d != server's %d\n", - ENTITY_NAME(con->peer_name), - ceph_pr_addr(&con->peer_addr), - le32_to_cpu(con->out_connect.protocol_version), - le32_to_cpu(con->in_reply.protocol_version)); - con->error_msg = "protocol version mismatch"; - return -1; - - case CEPH_MSGR_TAG_BADAUTHORIZER: - con->auth_retry++; - dout("process_connect %p got BADAUTHORIZER attempt %d\n", con, - con->auth_retry); - if (con->auth_retry == 2) { - con->error_msg = "connect authorization failure"; - return -1; - } - con_out_kvec_reset(con); - ret = prepare_write_connect(con); - if (ret < 0) - return ret; - prepare_read_connect(con); - break; - - case CEPH_MSGR_TAG_RESETSESSION: - /* - * If we connected with a large connect_seq but the peer - * has no record of a session with us (no connection, or - * connect_seq == 0), they will send RESETSESION to indicate - * that they must have reset their session, and may have - * dropped messages. - */ - dout("process_connect got RESET peer seq %u\n", - le32_to_cpu(con->in_reply.connect_seq)); - pr_info("%s%lld %s session reset\n", - ENTITY_NAME(con->peer_name), - ceph_pr_addr(&con->peer_addr)); - ceph_con_reset_session(con); - con_out_kvec_reset(con); - ret = prepare_write_connect(con); - if (ret < 0) - return ret; - prepare_read_connect(con); - - /* Tell ceph about it. */ - mutex_unlock(&con->mutex); - if (con->ops->peer_reset) - con->ops->peer_reset(con); - mutex_lock(&con->mutex); - if (con->state != CEPH_CON_S_V1_CONNECT_MSG) - return -EAGAIN; - break; - - case CEPH_MSGR_TAG_RETRY_SESSION: - /* - * If we sent a smaller connect_seq than the peer has, try - * again with a larger value. - */ - dout("process_connect got RETRY_SESSION my seq %u, peer %u\n", - le32_to_cpu(con->out_connect.connect_seq), - le32_to_cpu(con->in_reply.connect_seq)); - con->connect_seq = le32_to_cpu(con->in_reply.connect_seq); - con_out_kvec_reset(con); - ret = prepare_write_connect(con); - if (ret < 0) - return ret; - prepare_read_connect(con); - break; - - case CEPH_MSGR_TAG_RETRY_GLOBAL: - /* - * If we sent a smaller global_seq than the peer has, try - * again with a larger value. - */ - dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n", - con->peer_global_seq, - le32_to_cpu(con->in_reply.global_seq)); - ceph_get_global_seq(con->msgr, - le32_to_cpu(con->in_reply.global_seq)); - con_out_kvec_reset(con); - ret = prepare_write_connect(con); - if (ret < 0) - return ret; - prepare_read_connect(con); - break; - - case CEPH_MSGR_TAG_SEQ: - case CEPH_MSGR_TAG_READY: - if (req_feat & ~server_feat) { - pr_err("%s%lld %s protocol feature mismatch," - " my required %llx > server's %llx, need %llx\n", - ENTITY_NAME(con->peer_name), - ceph_pr_addr(&con->peer_addr), - req_feat, server_feat, req_feat & ~server_feat); - con->error_msg = "missing required protocol features"; - return -1; - } - - WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG); - con->state = CEPH_CON_S_OPEN; - con->auth_retry = 0; /* we authenticated; clear flag */ - con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); - con->connect_seq++; - con->peer_features = server_feat; - dout("process_connect got READY gseq %d cseq %d (%d)\n", - con->peer_global_seq, - le32_to_cpu(con->in_reply.connect_seq), - con->connect_seq); - WARN_ON(con->connect_seq != - le32_to_cpu(con->in_reply.connect_seq)); - - if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) - ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX); - - con->delay = 0; /* reset backoff memory */ - - if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { - prepare_write_seq(con); - prepare_read_seq(con); - } else { - prepare_read_tag(con); - } - break; - - case CEPH_MSGR_TAG_WAIT: - /* - * If there is a connection race (we are opening - * connections to each other), one of us may just have - * to WAIT. This shouldn't happen if we are the - * client. - */ - con->error_msg = "protocol error, got WAIT as client"; - return -1; - - default: - con->error_msg = "protocol error, garbage tag during connect"; - return -1; - } - return 0; -} - - -/* - * read (part of) an ack - */ -static int read_partial_ack(struct ceph_connection *con) -{ - int size = sizeof (con->in_temp_ack); - int end = size; - - return read_partial(con, end, size, &con->in_temp_ack); -} - -/* - * We can finally discard anything that's been acked. - */ -static void process_ack(struct ceph_connection *con) -{ - u64 ack = le64_to_cpu(con->in_temp_ack); - - if (con->in_tag == CEPH_MSGR_TAG_ACK) - ceph_con_discard_sent(con, ack); - else - ceph_con_discard_requeued(con, ack); - - prepare_read_tag(con); -} - - -static int read_partial_message_section(struct ceph_connection *con, - struct kvec *section, - unsigned int sec_len, u32 *crc) -{ - int ret, left; - - BUG_ON(!section); - - while (section->iov_len < sec_len) { - BUG_ON(section->iov_base == NULL); - left = sec_len - section->iov_len; - ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base + - section->iov_len, left); - if (ret <= 0) - return ret; - section->iov_len += ret; - } - if (section->iov_len == sec_len) - *crc = crc32c(0, section->iov_base, section->iov_len); - - return 1; -} - -static int read_partial_msg_data(struct ceph_connection *con) -{ - struct ceph_msg *msg = con->in_msg; - struct ceph_msg_data_cursor *cursor = &msg->cursor; - bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); - struct page *page; - size_t page_offset; - size_t length; - u32 crc = 0; - int ret; - - if (!msg->num_data_items) - return -EIO; - - if (do_datacrc) - crc = con->in_data_crc; - while (cursor->total_resid) { - if (!cursor->resid) { - ceph_msg_data_advance(cursor, 0); - continue; - } - - page = ceph_msg_data_next(cursor, &page_offset, &length, NULL); - ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); - if (ret <= 0) { - if (do_datacrc) - con->in_data_crc = crc; - - return ret; - } - - if (do_datacrc) - crc = ceph_crc32c_page(crc, page, page_offset, ret); - ceph_msg_data_advance(cursor, (size_t)ret); - } - if (do_datacrc) - con->in_data_crc = crc; - - return 1; /* must return > 0 to indicate success */ -} - -/* - * read (part of) a message. - */ -static int read_partial_message(struct ceph_connection *con) -{ - struct ceph_msg *m = con->in_msg; - int size; - int end; - int ret; - unsigned int front_len, middle_len, data_len; - bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); - bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); - u64 seq; - u32 crc; - - dout("read_partial_message con %p msg %p\n", con, m); - - /* header */ - size = sizeof (con->in_hdr); - end = size; - ret = read_partial(con, end, size, &con->in_hdr); - if (ret <= 0) - return ret; - - crc = crc32c(0, &con->in_hdr, offsetof(struct ceph_msg_header, crc)); - if (cpu_to_le32(crc) != con->in_hdr.crc) { - pr_err("read_partial_message bad hdr crc %u != expected %u\n", - crc, con->in_hdr.crc); - return -EBADMSG; - } - - front_len = le32_to_cpu(con->in_hdr.front_len); - if (front_len > CEPH_MSG_MAX_FRONT_LEN) - return -EIO; - middle_len = le32_to_cpu(con->in_hdr.middle_len); - if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN) - return -EIO; - data_len = le32_to_cpu(con->in_hdr.data_len); - if (data_len > CEPH_MSG_MAX_DATA_LEN) - return -EIO; - - /* verify seq# */ - seq = le64_to_cpu(con->in_hdr.seq); - if ((s64)seq - (s64)con->in_seq < 1) { - pr_info("skipping %s%lld %s seq %lld expected %lld\n", - ENTITY_NAME(con->peer_name), - ceph_pr_addr(&con->peer_addr), - seq, con->in_seq + 1); - con->in_base_pos = -front_len - middle_len - data_len - - sizeof_footer(con); - con->in_tag = CEPH_MSGR_TAG_READY; - return 1; - } else if ((s64)seq - (s64)con->in_seq > 1) { - pr_err("read_partial_message bad seq %lld expected %lld\n", - seq, con->in_seq + 1); - con->error_msg = "bad message sequence # for incoming message"; - return -EBADE; - } - - /* allocate message? */ - if (!con->in_msg) { - int skip = 0; - - dout("got hdr type %d front %d data %d\n", con->in_hdr.type, - front_len, data_len); - ret = ceph_con_in_msg_alloc(con, &con->in_hdr, &skip); - if (ret < 0) - return ret; - - BUG_ON(!con->in_msg ^ skip); - if (skip) { - /* skip this message */ - dout("alloc_msg said skip message\n"); - con->in_base_pos = -front_len - middle_len - data_len - - sizeof_footer(con); - con->in_tag = CEPH_MSGR_TAG_READY; - con->in_seq++; - return 1; - } - - BUG_ON(!con->in_msg); - BUG_ON(con->in_msg->con != con); - m = con->in_msg; - m->front.iov_len = 0; /* haven't read it yet */ - if (m->middle) - m->middle->vec.iov_len = 0; - - /* prepare for data payload, if any */ - - if (data_len) - prepare_message_data(con->in_msg, data_len); - } - - /* front */ - ret = read_partial_message_section(con, &m->front, front_len, - &con->in_front_crc); - if (ret <= 0) - return ret; - - /* middle */ - if (m->middle) { - ret = read_partial_message_section(con, &m->middle->vec, - middle_len, - &con->in_middle_crc); - if (ret <= 0) - return ret; - } - - /* (page) data */ - if (data_len) { - ret = read_partial_msg_data(con); - if (ret <= 0) - return ret; - } - - /* footer */ - size = sizeof_footer(con); - end += size; - ret = read_partial(con, end, size, &m->footer); - if (ret <= 0) - return ret; - - if (!need_sign) { - m->footer.flags = m->old_footer.flags; - m->footer.sig = 0; - } - - dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", - m, front_len, m->footer.front_crc, middle_len, - m->footer.middle_crc, data_len, m->footer.data_crc); - - /* crc ok? */ - if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) { - pr_err("read_partial_message %p front crc %u != exp. %u\n", - m, con->in_front_crc, m->footer.front_crc); - return -EBADMSG; - } - if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) { - pr_err("read_partial_message %p middle crc %u != exp %u\n", - m, con->in_middle_crc, m->footer.middle_crc); - return -EBADMSG; - } - if (do_datacrc && - (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 && - con->in_data_crc != le32_to_cpu(m->footer.data_crc)) { - pr_err("read_partial_message %p data crc %u != exp. %u\n", m, - con->in_data_crc, le32_to_cpu(m->footer.data_crc)); - return -EBADMSG; - } - - if (need_sign && con->ops->check_message_signature && - con->ops->check_message_signature(m)) { - pr_err("read_partial_message %p signature check failed\n", m); - return -EBADMSG; - } - - return 1; /* done! */ -} - /* * Process message. This happens in the worker thread. The callback should * be careful not to do anything that waits on other incoming messages or it @@ -2551,263 +1357,6 @@ void ceph_con_process_message(struct ceph_connection *con) mutex_lock(&con->mutex); } -static int read_keepalive_ack(struct ceph_connection *con) -{ - struct ceph_timespec ceph_ts; - size_t size = sizeof(ceph_ts); - int ret = read_partial(con, size, size, &ceph_ts); - if (ret <= 0) - return ret; - ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts); - prepare_read_tag(con); - return 1; -} - -/* - * Write something to the socket. Called in a worker thread when the - * socket appears to be writeable and we have something ready to send. - */ -int ceph_con_v1_try_write(struct ceph_connection *con) -{ - int ret = 1; - - dout("try_write start %p state %d\n", con, con->state); - if (con->state != CEPH_CON_S_PREOPEN && - con->state != CEPH_CON_S_V1_BANNER && - con->state != CEPH_CON_S_V1_CONNECT_MSG && - con->state != CEPH_CON_S_OPEN) - return 0; - - /* open the socket first? */ - if (con->state == CEPH_CON_S_PREOPEN) { - BUG_ON(con->sock); - con->state = CEPH_CON_S_V1_BANNER; - - con_out_kvec_reset(con); - prepare_write_banner(con); - prepare_read_banner(con); - - BUG_ON(con->in_msg); - con->in_tag = CEPH_MSGR_TAG_READY; - dout("try_write initiating connect on %p new state %d\n", - con, con->state); - ret = ceph_tcp_connect(con); - if (ret < 0) { - con->error_msg = "connect error"; - goto out; - } - } - -more: - dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); - BUG_ON(!con->sock); - - /* kvec data queued? */ - if (con->out_kvec_left) { - ret = write_partial_kvec(con); - if (ret <= 0) - goto out; - } - if (con->out_skip) { - ret = write_partial_skip(con); - if (ret <= 0) - goto out; - } - - /* msg pages? */ - if (con->out_msg) { - if (con->out_msg_done) { - ceph_msg_put(con->out_msg); - con->out_msg = NULL; /* we're done with this one */ - goto do_next; - } - - ret = write_partial_message_data(con); - if (ret == 1) - goto more; /* we need to send the footer, too! */ - if (ret == 0) - goto out; - if (ret < 0) { - dout("try_write write_partial_message_data err %d\n", - ret); - goto out; - } - } - -do_next: - if (con->state == CEPH_CON_S_OPEN) { - if (ceph_con_flag_test_and_clear(con, - CEPH_CON_F_KEEPALIVE_PENDING)) { - prepare_write_keepalive(con); - goto more; - } - /* is anything else pending? */ - if (!list_empty(&con->out_queue)) { - prepare_write_message(con); - goto more; - } - if (con->in_seq > con->in_seq_acked) { - prepare_write_ack(con); - goto more; - } - } - - /* Nothing to do! */ - ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); - dout("try_write nothing else to write.\n"); - ret = 0; -out: - dout("try_write done on %p ret %d\n", con, ret); - return ret; -} - -/* - * Read what we can from the socket. - */ -int ceph_con_v1_try_read(struct ceph_connection *con) -{ - int ret = -1; - -more: - dout("try_read start %p state %d\n", con, con->state); - if (con->state != CEPH_CON_S_V1_BANNER && - con->state != CEPH_CON_S_V1_CONNECT_MSG && - con->state != CEPH_CON_S_OPEN) - return 0; - - BUG_ON(!con->sock); - - dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, - con->in_base_pos); - - if (con->state == CEPH_CON_S_V1_BANNER) { - ret = read_partial_banner(con); - if (ret <= 0) - goto out; - ret = process_banner(con); - if (ret < 0) - goto out; - - con->state = CEPH_CON_S_V1_CONNECT_MSG; - - /* - * Received banner is good, exchange connection info. - * Do not reset out_kvec, as sending our banner raced - * with receiving peer banner after connect completed. - */ - ret = prepare_write_connect(con); - if (ret < 0) - goto out; - prepare_read_connect(con); - - /* Send connection info before awaiting response */ - goto out; - } - - if (con->state == CEPH_CON_S_V1_CONNECT_MSG) { - ret = read_partial_connect(con); - if (ret <= 0) - goto out; - ret = process_connect(con); - if (ret < 0) - goto out; - goto more; - } - - WARN_ON(con->state != CEPH_CON_S_OPEN); - - if (con->in_base_pos < 0) { - /* - * skipping + discarding content. - */ - ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); - if (ret <= 0) - goto out; - dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); - con->in_base_pos += ret; - if (con->in_base_pos) - goto more; - } - if (con->in_tag == CEPH_MSGR_TAG_READY) { - /* - * what's next? - */ - ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); - if (ret <= 0) - goto out; - dout("try_read got tag %d\n", (int)con->in_tag); - switch (con->in_tag) { - case CEPH_MSGR_TAG_MSG: - prepare_read_message(con); - break; - case CEPH_MSGR_TAG_ACK: - prepare_read_ack(con); - break; - case CEPH_MSGR_TAG_KEEPALIVE2_ACK: - prepare_read_keepalive_ack(con); - break; - case CEPH_MSGR_TAG_CLOSE: - ceph_con_close_socket(con); - con->state = CEPH_CON_S_CLOSED; - goto out; - default: - goto bad_tag; - } - } - if (con->in_tag == CEPH_MSGR_TAG_MSG) { - ret = read_partial_message(con); - if (ret <= 0) { - switch (ret) { - case -EBADMSG: - con->error_msg = "bad crc/signature"; - fallthrough; - case -EBADE: - ret = -EIO; - break; - case -EIO: - con->error_msg = "io error"; - break; - } - goto out; - } - if (con->in_tag == CEPH_MSGR_TAG_READY) - goto more; - ceph_con_process_message(con); - if (con->state == CEPH_CON_S_OPEN) - prepare_read_tag(con); - goto more; - } - if (con->in_tag == CEPH_MSGR_TAG_ACK || - con->in_tag == CEPH_MSGR_TAG_SEQ) { - /* - * the final handshake seq exchange is semantically - * equivalent to an ACK - */ - ret = read_partial_ack(con); - if (ret <= 0) - goto out; - process_ack(con); - goto more; - } - if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { - ret = read_keepalive_ack(con); - if (ret <= 0) - goto out; - goto more; - } - -out: - dout("try_read done on %p ret %d\n", con, ret); - return ret; - -bad_tag: - pr_err("try_read bad con->in_tag = %d\n", (int)con->in_tag); - con->error_msg = "protocol error, garbage tag"; - ret = -1; - goto out; -} - - /* * Atomically queue work on a connection after the specified delay. * Bump @con reference to avoid races with connection teardown. @@ -3026,7 +1575,6 @@ static void con_fault(struct ceph_connection *con) } } - void ceph_messenger_reset_nonce(struct ceph_messenger *msgr) { u32 nonce = le32_to_cpu(msgr->inst.addr.nonce) + 1000000; @@ -3131,29 +1679,6 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) } EXPORT_SYMBOL(ceph_con_send); -void ceph_con_v1_revoke(struct ceph_connection *con) -{ - struct ceph_msg *msg = con->out_msg; - - WARN_ON(con->out_skip); - /* footer */ - if (con->out_msg_done) { - con->out_skip += con_out_kvec_skip(con); - } else { - WARN_ON(!msg->data_length); - con->out_skip += sizeof_footer(con); - } - /* data, middle, front */ - if (msg->data_length) - con->out_skip += msg->cursor.total_resid; - if (msg->middle) - con->out_skip += con_out_kvec_skip(con); - con->out_skip += con_out_kvec_skip(con); - - dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con, - con->out_kvec_bytes, con->out_skip); -} - /* * Revoke a message that was previously queued for send */ @@ -3191,26 +1716,6 @@ void ceph_msg_revoke(struct ceph_msg *msg) mutex_unlock(&con->mutex); } -void ceph_con_v1_revoke_incoming(struct ceph_connection *con) -{ - unsigned int front_len = le32_to_cpu(con->in_hdr.front_len); - unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len); - unsigned int data_len = le32_to_cpu(con->in_hdr.data_len); - - /* skip rest of message */ - con->in_base_pos = con->in_base_pos - - sizeof(struct ceph_msg_header) - - front_len - - middle_len - - data_len - - sizeof(struct ceph_msg_footer); - - con->in_tag = CEPH_MSGR_TAG_READY; - con->in_seq++; - - dout("%s con %p in_base_pos %d\n", __func__, con, con->in_base_pos); -} - /* * Revoke a message that we may be reading data into */ |