From 9280be24dc9c7aaee230de3ed33f8357386de9a2 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 14 Oct 2014 10:33:35 +0800 Subject: ceph: fix file lock interruption When a lock operation is interrupted, current code sends a unlock request to MDS to undo the lock operation. This method does not work as expected because the unlock request can drop locks that have already been acquired. The fix is use the newly introduced CEPH_LOCK_FCNTL_INTR/CEPH_LOCK_FLOCK_INTR requests to interrupt blocked file lock request. These requests do not drop locks that have alread been acquired, they only interrupt blocked file lock request. Signed-off-by: Yan, Zheng --- fs/ceph/locks.c | 64 ++++++++++++++++++++++++++++++++++++++++++++-------- fs/ceph/mds_client.c | 2 ++ fs/ceph/mds_client.h | 6 +++++ 3 files changed, 62 insertions(+), 10 deletions(-) (limited to 'fs') diff --git a/fs/ceph/locks.c b/fs/ceph/locks.c index fbc39c47bacd..c35c5c614e38 100644 --- a/fs/ceph/locks.c +++ b/fs/ceph/locks.c @@ -9,6 +9,8 @@ #include static u64 lock_secret; +static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req); static inline u64 secure_addr(void *addr) { @@ -40,6 +42,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file, u64 length = 0; u64 owner; + if (operation != CEPH_MDS_OP_SETFILELOCK || cmd == CEPH_LOCK_UNLOCK) + wait = 0; + req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS); if (IS_ERR(req)) return PTR_ERR(req); @@ -68,6 +73,9 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file, req->r_args.filelock_change.length = cpu_to_le64(length); req->r_args.filelock_change.wait = wait; + if (wait) + req->r_wait_for_completion = ceph_lock_wait_for_completion; + err = ceph_mdsc_do_request(mdsc, inode, req); if (operation == CEPH_MDS_OP_GETFILELOCK) { @@ -96,6 +104,52 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file, return err; } +static int ceph_lock_wait_for_completion(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req) +{ + struct ceph_mds_request *intr_req; + struct inode *inode = req->r_inode; + int err, lock_type; + + BUG_ON(req->r_op != CEPH_MDS_OP_SETFILELOCK); + if (req->r_args.filelock_change.rule == CEPH_LOCK_FCNTL) + lock_type = CEPH_LOCK_FCNTL_INTR; + else if (req->r_args.filelock_change.rule == CEPH_LOCK_FLOCK) + lock_type = CEPH_LOCK_FLOCK_INTR; + else + BUG_ON(1); + BUG_ON(req->r_args.filelock_change.type == CEPH_LOCK_UNLOCK); + + err = wait_for_completion_interruptible(&req->r_completion); + if (!err) + return 0; + + dout("ceph_lock_wait_for_completion: request %llu was interrupted\n", + req->r_tid); + + intr_req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETFILELOCK, + USE_AUTH_MDS); + if (IS_ERR(intr_req)) + return PTR_ERR(intr_req); + + intr_req->r_inode = inode; + ihold(inode); + intr_req->r_num_caps = 1; + + intr_req->r_args.filelock_change = req->r_args.filelock_change; + intr_req->r_args.filelock_change.rule = lock_type; + intr_req->r_args.filelock_change.type = CEPH_LOCK_UNLOCK; + + err = ceph_mdsc_do_request(mdsc, inode, intr_req); + ceph_mdsc_put_request(intr_req); + + if (err && err != -ERESTARTSYS) + return err; + + wait_for_completion(&req->r_completion); + return 0; +} + /** * Attempt to set an fcntl lock. * For now, this just goes away to the server. Later it may be more awesome. @@ -143,11 +197,6 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl) err); } } - - } else if (err == -ERESTARTSYS) { - dout("undoing lock\n"); - ceph_lock_message(CEPH_LOCK_FCNTL, op, file, - CEPH_LOCK_UNLOCK, 0, fl); } return err; } @@ -186,11 +235,6 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl) file, CEPH_LOCK_UNLOCK, 0, fl); dout("got %d on flock_lock_file_wait, undid lock", err); } - } else if (err == -ERESTARTSYS) { - dout("undoing lock\n"); - ceph_lock_message(CEPH_LOCK_FLOCK, - CEPH_MDS_OP_SETFILELOCK, - file, CEPH_LOCK_UNLOCK, 0, fl); } return err; } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index a92d3f5c6c12..5a47ed760e6d 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2208,6 +2208,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, &req->r_completion, req->r_timeout); if (err == 0) err = -EIO; + } else if (req->r_wait_for_completion) { + err = req->r_wait_for_completion(mdsc, req); } else { err = wait_for_completion_killable(&req->r_completion); } diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 3288359353e9..230bda791d4f 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -166,6 +166,11 @@ struct ceph_mds_client; */ typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc, struct ceph_mds_request *req); +/* + * wait for request completion callback + */ +typedef int (*ceph_mds_request_wait_callback_t) (struct ceph_mds_client *mdsc, + struct ceph_mds_request *req); /* * an in-flight mds request @@ -239,6 +244,7 @@ struct ceph_mds_request { struct completion r_completion; struct completion r_safe_completion; ceph_mds_request_callback_t r_callback; + ceph_mds_request_wait_callback_t r_wait_for_completion; struct list_head r_unsafe_item; /* per-session unsafe list item */ bool r_got_unsafe, r_got_safe, r_got_result; -- cgit v1.2.3-70-g09d2 From 70db4f3629b3476cf506be869ef9d15688d2d44a Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 21 Oct 2014 18:09:56 -0700 Subject: ceph: introduce a new inode flag indicating if cached dentries are ordered After creating/deleting/renaming file, offsets of sibling dentries may change. So we can not use cached dentries to satisfy readdir. But we can still use the cached dentries to conclude -ENOENT for lookup. This patch introduces a new inode flag indicating if child dentries are ordered. The flag is set at the same time marking a directory complete. After creating/deleting/renaming file, we clear the flag on directory inode. This prevents ceph_readdir() from using cached dentries to satisfy readdir syscall. Signed-off-by: Yan, Zheng --- fs/ceph/dir.c | 23 +++++++++++++++-------- fs/ceph/inode.c | 13 ++++++++----- fs/ceph/super.h | 38 ++++++++++++++++++++++++++++++++------ 3 files changed, 55 insertions(+), 19 deletions(-) (limited to 'fs') diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index e6d63f8f98c0..652619950fa9 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -183,7 +183,7 @@ more: spin_unlock(&parent->d_lock); /* make sure a dentry wasn't dropped while we didn't have parent lock */ - if (!ceph_dir_is_complete(dir)) { + if (!ceph_dir_is_complete_ordered(dir)) { dout(" lost dir complete on %p; falling back to mds\n", dir); dput(dentry); err = -EAGAIN; @@ -261,10 +261,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) /* always start with . and .. */ if (ctx->pos == 0) { - /* note dir version at start of readdir so we can tell - * if any dentries get dropped */ - fi->dir_release_count = atomic_read(&ci->i_release_count); - dout("readdir off 0 -> '.'\n"); if (!dir_emit(ctx, ".", 1, ceph_translate_ino(inode->i_sb, inode->i_ino), @@ -289,7 +285,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) if ((ctx->pos == 2 || fi->dentry) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && ceph_snap(inode) != CEPH_SNAPDIR && - __ceph_dir_is_complete(ci) && + __ceph_dir_is_complete_ordered(ci) && __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { u32 shared_gen = ci->i_shared_gen; spin_unlock(&ci->i_ceph_lock); @@ -312,6 +308,13 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) /* proceed with a normal readdir */ + if (ctx->pos == 2) { + /* note dir version at start of readdir so we can tell + * if any dentries get dropped */ + fi->dir_release_count = atomic_read(&ci->i_release_count); + fi->dir_ordered_count = ci->i_ordered_count; + } + more: /* do we have the correct frag content buffered? */ if (fi->frag != frag || fi->last_readdir == NULL) { @@ -446,8 +449,12 @@ more: */ spin_lock(&ci->i_ceph_lock); if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { - dout(" marking %p complete\n", inode); - __ceph_dir_set_complete(ci, fi->dir_release_count); + if (ci->i_ordered_count == fi->dir_ordered_count) + dout(" marking %p complete and ordered\n", inode); + else + dout(" marking %p complete\n", inode); + __ceph_dir_set_complete(ci, fi->dir_release_count, + fi->dir_ordered_count); } spin_unlock(&ci->i_ceph_lock); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 7b6139004401..72607c17e6fd 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -389,6 +389,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_version = 0; ci->i_time_warp_seq = 0; ci->i_ceph_flags = 0; + ci->i_ordered_count = 0; atomic_set(&ci->i_release_count, 1); atomic_set(&ci->i_complete_count, 0); ci->i_symlink = NULL; @@ -845,7 +846,8 @@ static int fill_inode(struct inode *inode, (issued & CEPH_CAP_FILE_EXCL) == 0 && !__ceph_dir_is_complete(ci)) { dout(" marking %p complete (empty)\n", inode); - __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count)); + __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count), + ci->i_ordered_count); } /* were we issued a capability? */ @@ -1206,8 +1208,8 @@ retry_lookup: ceph_invalidate_dentry_lease(dn); /* d_move screws up sibling dentries' offsets */ - ceph_dir_clear_complete(dir); - ceph_dir_clear_complete(olddir); + ceph_dir_clear_ordered(dir); + ceph_dir_clear_ordered(olddir); dout("dn %p gets new offset %lld\n", req->r_old_dentry, ceph_dentry(req->r_old_dentry)->offset); @@ -1219,6 +1221,7 @@ retry_lookup: if (!rinfo->head->is_target) { dout("fill_trace null dentry\n"); if (dn->d_inode) { + ceph_dir_clear_ordered(dir); dout("d_delete %p\n", dn); d_delete(dn); } else { @@ -1235,7 +1238,7 @@ retry_lookup: /* attach proper inode */ if (!dn->d_inode) { - ceph_dir_clear_complete(dir); + ceph_dir_clear_ordered(dir); ihold(in); dn = splice_dentry(dn, in, &have_lease); if (IS_ERR(dn)) { @@ -1265,7 +1268,7 @@ retry_lookup: BUG_ON(!dir); BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR); dout(" linking snapped dir %p to dn %p\n", in, dn); - ceph_dir_clear_complete(dir); + ceph_dir_clear_ordered(dir); ihold(in); dn = splice_dentry(dn, in, NULL); if (IS_ERR(dn)) { diff --git a/fs/ceph/super.h b/fs/ceph/super.h index b82f507979b8..aca22879b41f 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -256,6 +256,7 @@ struct ceph_inode_info { u32 i_time_warp_seq; unsigned i_ceph_flags; + int i_ordered_count; atomic_t i_release_count; atomic_t i_complete_count; @@ -434,14 +435,19 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, /* * Ceph inode. */ -#define CEPH_I_NODELAY 4 /* do not delay cap release */ -#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ -#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ +#define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */ +#define CEPH_I_NODELAY 4 /* do not delay cap release */ +#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ +#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, - int release_count) + int release_count, int ordered_count) { atomic_set(&ci->i_complete_count, release_count); + if (ci->i_ordered_count == ordered_count) + ci->i_ceph_flags |= CEPH_I_DIR_ORDERED; + else + ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; } static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci) @@ -455,16 +461,35 @@ static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci) atomic_read(&ci->i_release_count); } +static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci) +{ + return __ceph_dir_is_complete(ci) && + (ci->i_ceph_flags & CEPH_I_DIR_ORDERED); +} + static inline void ceph_dir_clear_complete(struct inode *inode) { __ceph_dir_clear_complete(ceph_inode(inode)); } -static inline bool ceph_dir_is_complete(struct inode *inode) +static inline void ceph_dir_clear_ordered(struct inode *inode) { - return __ceph_dir_is_complete(ceph_inode(inode)); + struct ceph_inode_info *ci = ceph_inode(inode); + spin_lock(&ci->i_ceph_lock); + ci->i_ordered_count++; + ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; + spin_unlock(&ci->i_ceph_lock); } +static inline bool ceph_dir_is_complete_ordered(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + bool ret; + spin_lock(&ci->i_ceph_lock); + ret = __ceph_dir_is_complete_ordered(ci); + spin_unlock(&ci->i_ceph_lock); + return ret; +} /* find a specific frag @f */ extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci, @@ -580,6 +605,7 @@ struct ceph_file_info { char *last_name; /* last entry in previous chunk */ struct dentry *dentry; /* next dentry (for dcache readdir) */ int dir_release_count; + int dir_ordered_count; /* used for -o dirstat read() on directory thing */ char *dir_info; -- cgit v1.2.3-70-g09d2 From e96a650a8174e20112b400e72e0b2429aa66de20 Mon Sep 17 00:00:00 2001 From: SF Markus Elfring Date: Sun, 2 Nov 2014 15:20:59 +0100 Subject: ceph, rbd: delete unnecessary checks before two function calls The functions ceph_put_snap_context() and iput() test whether their argument is NULL and then return immediately. Thus the test around the call is not needed. This issue was detected by using the Coccinelle software. Signed-off-by: Markus Elfring [idryomov@redhat.com: squashed rbd.c hunk, changelog] Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 3 +-- fs/ceph/caps.c | 3 +-- fs/ceph/mds_client.c | 6 ++---- fs/ceph/snap.c | 9 +++------ 4 files changed, 7 insertions(+), 14 deletions(-) (limited to 'fs') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 27b71a0b72d0..3c34ab55537c 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -3405,8 +3405,7 @@ err_rq: if (result) rbd_warn(rbd_dev, "%s %llx at %llx result %d", obj_op_name(op_type), length, offset, result); - if (snapc) - ceph_put_snap_context(snapc); + ceph_put_snap_context(snapc); blk_end_request_all(rq, result); } diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index cefca661464b..eb1bf1f35e0e 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -3137,8 +3137,7 @@ flush_cap_releases: done: mutex_unlock(&session->s_mutex); done_unlocked: - if (inode) - iput(inode); + iput(inode); return; bad: diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 5a47ed760e6d..fc74617069a3 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -524,8 +524,7 @@ void ceph_mdsc_release_request(struct kref *kref) } if (req->r_locked_dir) ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); - if (req->r_target_inode) - iput(req->r_target_inode); + iput(req->r_target_inode); if (req->r_dentry) dput(req->r_dentry); if (req->r_old_dentry) @@ -1066,8 +1065,7 @@ out: session->s_cap_iterator = NULL; spin_unlock(&session->s_cap_lock); - if (last_inode) - iput(last_inode); + iput(last_inode); if (old_cap) ceph_put_cap(session->s_mdsc, old_cap); diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index f01645a27752..c1cc993225e3 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -365,8 +365,7 @@ static int build_snap_context(struct ceph_snap_realm *realm) realm->ino, realm, snapc, snapc->seq, (unsigned int) snapc->num_snaps); - if (realm->cached_context) - ceph_put_snap_context(realm->cached_context); + ceph_put_snap_context(realm->cached_context); realm->cached_context = snapc; return 0; @@ -590,15 +589,13 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm) if (!inode) continue; spin_unlock(&realm->inodes_with_caps_lock); - if (lastinode) - iput(lastinode); + iput(lastinode); lastinode = inode; ceph_queue_cap_snap(ci); spin_lock(&realm->inodes_with_caps_lock); } spin_unlock(&realm->inodes_with_caps_lock); - if (lastinode) - iput(lastinode); + iput(lastinode); list_for_each_entry(child, &realm->children, child_item) { dout("queue_realm_cap_snaps %p %llx queue child %p %llx\n", -- cgit v1.2.3-70-g09d2 From 33d07337962c7bbd2fd5cf7f1106735c9507fbe2 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 4 Nov 2014 16:33:37 +0800 Subject: libceph: message signature support Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 16 +++++++++++ include/linux/ceph/auth.h | 26 +++++++++++++++++ include/linux/ceph/ceph_features.h | 1 + include/linux/ceph/messenger.h | 9 +++++- include/linux/ceph/msgr.h | 8 ++++++ net/ceph/auth_x.c | 58 ++++++++++++++++++++++++++++++++++++++ net/ceph/messenger.c | 32 +++++++++++++++++++-- net/ceph/osd_client.c | 16 +++++++++++ 8 files changed, 162 insertions(+), 4 deletions(-) (limited to 'fs') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index fc74617069a3..9f00853f6d42 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3744,6 +3744,20 @@ static struct ceph_msg *mds_alloc_msg(struct ceph_connection *con, return msg; } +static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) +{ + struct ceph_mds_session *s = con->private; + struct ceph_auth_handshake *auth = &s->s_auth; + return ceph_auth_sign_message(auth, msg); +} + +static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) +{ + struct ceph_mds_session *s = con->private; + struct ceph_auth_handshake *auth = &s->s_auth; + return ceph_auth_check_message_signature(auth, msg); +} + static const struct ceph_connection_operations mds_con_ops = { .get = con_get, .put = con_put, @@ -3753,6 +3767,8 @@ static const struct ceph_connection_operations mds_con_ops = { .invalidate_authorizer = invalidate_authorizer, .peer_reset = peer_reset, .alloc_msg = mds_alloc_msg, + .sign_message = sign_message, + .check_message_signature = check_message_signature, }; /* eof */ diff --git a/include/linux/ceph/auth.h b/include/linux/ceph/auth.h index 5f3386844134..260d78b587c4 100644 --- a/include/linux/ceph/auth.h +++ b/include/linux/ceph/auth.h @@ -13,6 +13,7 @@ struct ceph_auth_client; struct ceph_authorizer; +struct ceph_msg; struct ceph_auth_handshake { struct ceph_authorizer *authorizer; @@ -20,6 +21,10 @@ struct ceph_auth_handshake { size_t authorizer_buf_len; void *authorizer_reply_buf; size_t authorizer_reply_buf_len; + int (*sign_message)(struct ceph_auth_handshake *auth, + struct ceph_msg *msg); + int (*check_message_signature)(struct ceph_auth_handshake *auth, + struct ceph_msg *msg); }; struct ceph_auth_client_ops { @@ -66,6 +71,11 @@ struct ceph_auth_client_ops { void (*reset)(struct ceph_auth_client *ac); void (*destroy)(struct ceph_auth_client *ac); + + int (*sign_message)(struct ceph_auth_handshake *auth, + struct ceph_msg *msg); + int (*check_message_signature)(struct ceph_auth_handshake *auth, + struct ceph_msg *msg); }; struct ceph_auth_client { @@ -113,4 +123,20 @@ extern int ceph_auth_verify_authorizer_reply(struct ceph_auth_client *ac, extern void ceph_auth_invalidate_authorizer(struct ceph_auth_client *ac, int peer_type); +static inline int ceph_auth_sign_message(struct ceph_auth_handshake *auth, + struct ceph_msg *msg) +{ + if (auth->sign_message) + return auth->sign_message(auth, msg); + return 0; +} + +static inline +int ceph_auth_check_message_signature(struct ceph_auth_handshake *auth, + struct ceph_msg *msg) +{ + if (auth->check_message_signature) + return auth->check_message_signature(auth, msg); + return 0; +} #endif diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h index d12659ce550d..71e05bbf8ceb 100644 --- a/include/linux/ceph/ceph_features.h +++ b/include/linux/ceph/ceph_features.h @@ -84,6 +84,7 @@ static inline u64 ceph_sanitize_features(u64 features) CEPH_FEATURE_PGPOOL3 | \ CEPH_FEATURE_OSDENC | \ CEPH_FEATURE_CRUSH_TUNABLES | \ + CEPH_FEATURE_MSG_AUTH | \ CEPH_FEATURE_CRUSH_TUNABLES2 | \ CEPH_FEATURE_REPLY_CREATE_INODE | \ CEPH_FEATURE_OSDHASHPSPOOL | \ diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 40ae58e3e9db..d9d396c16503 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -42,6 +42,10 @@ struct ceph_connection_operations { struct ceph_msg * (*alloc_msg) (struct ceph_connection *con, struct ceph_msg_header *hdr, int *skip); + int (*sign_message) (struct ceph_connection *con, struct ceph_msg *msg); + + int (*check_message_signature) (struct ceph_connection *con, + struct ceph_msg *msg); }; /* use format string %s%d */ @@ -142,7 +146,10 @@ struct ceph_msg_data_cursor { */ struct ceph_msg { struct ceph_msg_header hdr; /* header */ - struct ceph_msg_footer footer; /* footer */ + union { + struct ceph_msg_footer footer; /* footer */ + struct ceph_msg_footer_old old_footer; /* old format footer */ + }; struct kvec front; /* unaligned blobs of message */ struct ceph_buffer *middle; diff --git a/include/linux/ceph/msgr.h b/include/linux/ceph/msgr.h index 3d94a73b5f30..cac4b28ac1c0 100644 --- a/include/linux/ceph/msgr.h +++ b/include/linux/ceph/msgr.h @@ -164,13 +164,21 @@ struct ceph_msg_header { /* * follows data payload */ +struct ceph_msg_footer_old { + __le32 front_crc, middle_crc, data_crc; + __u8 flags; +} __attribute__ ((packed)); + struct ceph_msg_footer { __le32 front_crc, middle_crc, data_crc; + // sig holds the 64 bits of the digital signature for the message PLR + __le64 sig; __u8 flags; } __attribute__ ((packed)); #define CEPH_MSG_FOOTER_COMPLETE (1<<0) /* msg wasn't aborted */ #define CEPH_MSG_FOOTER_NOCRC (1<<1) /* no data crc */ +#define CEPH_MSG_FOOTER_SIGNED (1<<2) /* msg was signed */ #endif diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c index 77f3885c16bc..15845814a0f2 100644 --- a/net/ceph/auth_x.c +++ b/net/ceph/auth_x.c @@ -8,6 +8,7 @@ #include #include +#include #include "crypto.h" #include "auth_x.h" @@ -567,6 +568,8 @@ static int ceph_x_create_authorizer( auth->authorizer_buf_len = au->buf->vec.iov_len; auth->authorizer_reply_buf = au->reply_buf; auth->authorizer_reply_buf_len = sizeof (au->reply_buf); + auth->sign_message = ac->ops->sign_message; + auth->check_message_signature = ac->ops->check_message_signature; return 0; } @@ -667,6 +670,59 @@ static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac, memset(&th->validity, 0, sizeof(th->validity)); } +static int calcu_signature(struct ceph_x_authorizer *au, + struct ceph_msg *msg, __le64 *sig) +{ + int ret; + char tmp_enc[40]; + __le32 tmp[5] = { + 16u, msg->hdr.crc, msg->footer.front_crc, + msg->footer.middle_crc, msg->footer.data_crc, + }; + ret = ceph_x_encrypt(&au->session_key, &tmp, sizeof(tmp), + tmp_enc, sizeof(tmp_enc)); + if (ret < 0) + return ret; + *sig = *(__le64*)(tmp_enc + 4); + return 0; +} + +static int ceph_x_sign_message(struct ceph_auth_handshake *auth, + struct ceph_msg *msg) +{ + int ret; + if (!auth->authorizer) + return 0; + ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, + msg, &msg->footer.sig); + if (ret < 0) + return ret; + msg->footer.flags |= CEPH_MSG_FOOTER_SIGNED; + return 0; +} + +static int ceph_x_check_message_signature(struct ceph_auth_handshake *auth, + struct ceph_msg *msg) +{ + __le64 sig_check; + int ret; + + if (!auth->authorizer) + return 0; + ret = calcu_signature((struct ceph_x_authorizer *)auth->authorizer, + msg, &sig_check); + if (ret < 0) + return ret; + if (sig_check == msg->footer.sig) + return 0; + if (msg->footer.flags & CEPH_MSG_FOOTER_SIGNED) + dout("ceph_x_check_message_signature %p has signature %llx " + "expect %llx\n", msg, msg->footer.sig, sig_check); + else + dout("ceph_x_check_message_signature %p sender did not set " + "CEPH_MSG_FOOTER_SIGNED\n", msg); + return -EBADMSG; +} static const struct ceph_auth_client_ops ceph_x_ops = { .name = "x", @@ -681,6 +737,8 @@ static const struct ceph_auth_client_ops ceph_x_ops = { .invalidate_authorizer = ceph_x_invalidate_authorizer, .reset = ceph_x_reset, .destroy = ceph_x_destroy, + .sign_message = ceph_x_sign_message, + .check_message_signature = ceph_x_check_message_signature, }; diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 863d07ab2129..33a2f201e460 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -1196,8 +1196,18 @@ static void prepare_write_message_footer(struct ceph_connection *con) dout("prepare_write_message_footer %p\n", con); con->out_kvec_is_msg = true; con->out_kvec[v].iov_base = &m->footer; - con->out_kvec[v].iov_len = sizeof(m->footer); - con->out_kvec_bytes += sizeof(m->footer); + if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { + if (con->ops->sign_message) + con->ops->sign_message(con, m); + else + m->footer.sig = 0; + con->out_kvec[v].iov_len = sizeof(m->footer); + con->out_kvec_bytes += sizeof(m->footer); + } else { + m->old_footer.flags = m->footer.flags; + con->out_kvec[v].iov_len = sizeof(m->old_footer); + con->out_kvec_bytes += sizeof(m->old_footer); + } con->out_kvec_left++; con->out_more = m->more_to_follow; con->out_msg_done = true; @@ -2249,6 +2259,7 @@ static int read_partial_message(struct ceph_connection *con) int ret; unsigned int front_len, middle_len, data_len; bool do_datacrc = !con->msgr->nocrc; + bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH); u64 seq; u32 crc; @@ -2361,12 +2372,21 @@ static int read_partial_message(struct ceph_connection *con) } /* footer */ - size = sizeof (m->footer); + if (need_sign) + size = sizeof(m->footer); + else + size = sizeof(m->old_footer); + end += size; ret = read_partial(con, end, size, &m->footer); if (ret <= 0) return ret; + if (!need_sign) { + m->footer.flags = m->old_footer.flags; + m->footer.sig = 0; + } + dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n", m, front_len, m->footer.front_crc, middle_len, m->footer.middle_crc, data_len, m->footer.data_crc); @@ -2390,6 +2410,12 @@ static int read_partial_message(struct ceph_connection *con) return -EBADMSG; } + if (need_sign && con->ops->check_message_signature && + con->ops->check_message_signature(con, m)) { + pr_err("read_partial_message %p signature check failed\n", m); + return -EBADMSG; + } + return 1; /* done! */ } diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 6f164289bde8..1f6c4055adaf 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2920,6 +2920,20 @@ static int invalidate_authorizer(struct ceph_connection *con) return ceph_monc_validate_auth(&osdc->client->monc); } +static int sign_message(struct ceph_connection *con, struct ceph_msg *msg) +{ + struct ceph_osd *o = con->private; + struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_sign_message(auth, msg); +} + +static int check_message_signature(struct ceph_connection *con, struct ceph_msg *msg) +{ + struct ceph_osd *o = con->private; + struct ceph_auth_handshake *auth = &o->o_auth; + return ceph_auth_check_message_signature(auth, msg); +} + static const struct ceph_connection_operations osd_con_ops = { .get = get_osd_con, .put = put_osd_con, @@ -2928,5 +2942,7 @@ static const struct ceph_connection_operations osd_con_ops = { .verify_authorizer_reply = verify_authorizer_reply, .invalidate_authorizer = invalidate_authorizer, .alloc_msg = alloc_msg, + .sign_message = sign_message, + .check_message_signature = check_message_signature, .fault = osd_reset, }; -- cgit v1.2.3-70-g09d2 From 7cfa0313d0dc52e4da9f196f8ad5bfdf266fc1fb Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 30 Oct 2014 17:15:26 +0000 Subject: ceph: message versioning fixes There were two places we were assigning version in host byte order instead of network byte order. Also in MSG_CLIENT_SESSION we weren't setting compat_version in the header to reflect continued compatability with older MDSs. Fixes: http://tracker.ceph.com/issues/9945 Signed-off-by: John Spray Reviewed-by: Sage Weil --- fs/ceph/mds_client.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'fs') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 9f00853f6d42..387a48993d34 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -860,8 +860,11 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6 /* * Serialize client metadata into waiting buffer space, using * the format that userspace expects for map + * + * ClientSession messages with metadata are v2 */ - msg->hdr.version = 2; /* ClientSession messages with metadata are v2 */ + msg->hdr.version = cpu_to_le16(2); + msg->hdr.compat_version = cpu_to_le16(1); /* The write pointer, following the session_head structure */ p = msg->front.iov_base + sizeof(*h); @@ -1872,7 +1875,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, goto out_free2; } - msg->hdr.version = 2; + msg->hdr.version = cpu_to_le16(2); msg->hdr.tid = cpu_to_le64(req->r_tid); head = msg->front.iov_base; -- cgit v1.2.3-70-g09d2 From 97c85a828f36bbfffe9d77b977b65a5872b6cad4 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 6 Nov 2014 15:09:41 +0800 Subject: ceph: introduce global empty snap context Current snaphost code does not properly handle moving inode from one empty snap realm to another empty snap realm. After changing inode's snap realm, some dirty pages' snap context can be not equal to inode's i_head_snap. This can trigger BUG() in ceph_put_wrbuffer_cap_refs() The fix is introduce a global empty snap context for all empty snap realm. This avoids triggering the BUG() for filesystem with no snapshot. Fixes: http://tracker.ceph.com/issues/9928 Signed-off-by: Yan, Zheng Reviewed-by: Ilya Dryomov --- fs/ceph/snap.c | 26 +++++++++++++++++++++++++- fs/ceph/super.c | 10 ++++++++-- fs/ceph/super.h | 2 ++ 3 files changed, 35 insertions(+), 3 deletions(-) (limited to 'fs') diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index c1cc993225e3..f64576e72b79 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -288,6 +288,9 @@ static int cmpu64_rev(const void *a, const void *b) return 0; } + +static struct ceph_snap_context *empty_snapc; + /* * build the snap context for a given realm. */ @@ -328,6 +331,12 @@ static int build_snap_context(struct ceph_snap_realm *realm) return 0; } + if (num == 0 && realm->seq == empty_snapc->seq) { + ceph_get_snap_context(empty_snapc); + snapc = empty_snapc; + goto done; + } + /* alloc new snap context */ err = -ENOMEM; if (num > (SIZE_MAX - sizeof(*snapc)) / sizeof(u64)) @@ -365,6 +374,7 @@ static int build_snap_context(struct ceph_snap_realm *realm) realm->ino, realm, snapc, snapc->seq, (unsigned int) snapc->num_snaps); +done: ceph_put_snap_context(realm->cached_context); realm->cached_context = snapc; return 0; @@ -465,6 +475,9 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) cap_snap. lucky us. */ dout("queue_cap_snap %p already pending\n", inode); kfree(capsnap); + } else if (ci->i_snap_realm->cached_context == empty_snapc) { + dout("queue_cap_snap %p empty snapc\n", inode); + kfree(capsnap); } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { struct ceph_snap_context *snapc = ci->i_head_snapc; @@ -925,5 +938,16 @@ out: return; } +int __init ceph_snap_init(void) +{ + empty_snapc = ceph_create_snap_context(0, GFP_NOFS); + if (!empty_snapc) + return -ENOMEM; + empty_snapc->seq = 1; + return 0; +} - +void ceph_snap_exit(void) +{ + ceph_put_snap_context(empty_snapc); +} diff --git a/fs/ceph/super.c b/fs/ceph/super.c index f6e12377335c..3b5c1e3335db 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -1028,15 +1028,20 @@ static int __init init_ceph(void) ceph_flock_init(); ceph_xattr_init(); + ret = ceph_snap_init(); + if (ret) + goto out_xattr; ret = register_filesystem(&ceph_fs_type); if (ret) - goto out_icache; + goto out_snap; pr_info("loaded (mds proto %d)\n", CEPH_MDSC_PROTOCOL); return 0; -out_icache: +out_snap: + ceph_snap_exit(); +out_xattr: ceph_xattr_exit(); destroy_caches(); out: @@ -1047,6 +1052,7 @@ static void __exit exit_ceph(void) { dout("exit_ceph\n"); unregister_filesystem(&ceph_fs_type); + ceph_snap_exit(); ceph_xattr_exit(); destroy_caches(); } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index aca22879b41f..fc1c8255dead 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -699,6 +699,8 @@ extern void ceph_queue_cap_snap(struct ceph_inode_info *ci); extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci, struct ceph_cap_snap *capsnap); extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc); +extern int ceph_snap_init(void); +extern void ceph_snap_exit(void); /* * a cap_snap is "pending" if it is still awaiting an in-progress -- cgit v1.2.3-70-g09d2 From ca3995ad13b5e8bb08be850d08fe1e2abffc0206 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 13 Nov 2014 20:42:35 +0300 Subject: ceph: remove unused stringification macros These were used to report git versions a long time ago. Signed-off-by: Ilya Dryomov --- fs/ceph/super.c | 3 --- 1 file changed, 3 deletions(-) (limited to 'fs') diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 3b5c1e3335db..989e86cc1e73 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -1017,9 +1017,6 @@ static struct file_system_type ceph_fs_type = { }; MODULE_ALIAS_FS("ceph"); -#define _STRINGIFY(x) #x -#define STRINGIFY(x) _STRINGIFY(x) - static int __init init_ceph(void) { int ret = init_caches(); -- cgit v1.2.3-70-g09d2 From 715e4cd405cfd67bd058e410b3e599bab2072645 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 13 Nov 2014 14:40:37 +0800 Subject: libceph: specify position of extent operation allow specifying position of extent operation in multi-operations osd request. This is required for cephfs to convert inline data to normal data (compare xattr, then write object). Signed-off-by: Yan, Zheng Reviewed-by: Ilya Dryomov --- fs/ceph/addr.c | 9 ++++++--- fs/ceph/file.c | 8 +++++--- include/linux/ceph/osd_client.h | 3 ++- net/ceph/osd_client.c | 19 ++++++------------- 4 files changed, 19 insertions(+), 20 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 18c06bbaf136..f2c7aa878aa4 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -319,7 +319,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) off, len); vino = ceph_vino(inode); req = ceph_osdc_new_request(osdc, &ci->i_layout, vino, off, &len, - 1, CEPH_OSD_OP_READ, + 0, 1, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq, ci->i_truncate_size, false); @@ -750,7 +750,6 @@ retry: last_snapc = snapc; while (!done && index <= end) { - int num_ops = do_sync ? 2 : 1; unsigned i; int first; pgoff_t next; @@ -850,7 +849,8 @@ get_more_pages: len = wsize; req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, - offset, &len, num_ops, + offset, &len, 0, + do_sync ? 2 : 1, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, @@ -862,6 +862,9 @@ get_more_pages: break; } + if (do_sync) + osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); + req->r_callback = writepages_finish; req->r_inode = inode; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index d7e0da8366e6..c03ac4c4bcd1 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -598,7 +598,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) snapc = ci->i_snap_realm->cached_context; vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, + vino, pos, &len, 0, 2,/*include a 'startsync' command*/ CEPH_OSD_OP_WRITE, flags, snapc, ci->i_truncate_seq, @@ -609,6 +609,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) break; } + osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); + n = iov_iter_get_pages_alloc(from, &pages, len, &start); if (unlikely(n < 0)) { ret = n; @@ -713,7 +715,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) snapc = ci->i_snap_realm->cached_context; vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, 1, + vino, pos, &len, 0, 1, CEPH_OSD_OP_WRITE, flags, snapc, ci->i_truncate_seq, ci->i_truncate_size, @@ -1111,7 +1113,7 @@ static int ceph_zero_partial_object(struct inode *inode, req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, ceph_vino(inode), offset, length, - 1, op, + 0, 1, op, CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, NULL, 0, 0, false); diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index bcf9c2180ea5..5d86416d35f2 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -328,7 +328,8 @@ extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, struct ceph_file_layout *layout, struct ceph_vino vino, u64 offset, u64 *len, - int num_ops, int opcode, int flags, + unsigned int which, int num_ops, + int opcode, int flags, struct ceph_snap_context *snapc, u32 truncate_seq, u64 truncate_size, bool use_mempool); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index af1b3ee195e9..53299c7b0ca4 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -753,7 +753,8 @@ static u64 osd_req_encode_op(struct ceph_osd_request *req, struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, struct ceph_file_layout *layout, struct ceph_vino vino, - u64 off, u64 *plen, int num_ops, + u64 off, u64 *plen, + unsigned int which, int num_ops, int opcode, int flags, struct ceph_snap_context *snapc, u32 truncate_seq, @@ -785,7 +786,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, } if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) { - osd_req_op_init(req, 0, opcode); + osd_req_op_init(req, which, opcode); } else { u32 object_size = le32_to_cpu(layout->fl_object_size); u32 object_base = off - objoff; @@ -798,17 +799,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, truncate_size = object_size; } } - - osd_req_op_extent_init(req, 0, opcode, objoff, objlen, + osd_req_op_extent_init(req, which, opcode, objoff, objlen, truncate_size, truncate_seq); } - /* - * A second op in the ops array means the caller wants to - * also issue a include a 'startsync' command so that the - * osd will flush data quickly. - */ - if (num_ops > 1) - osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout); @@ -2675,7 +2668,7 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc, dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino, vino.snap, off, *plen); - req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 1, + req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1, CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, NULL, truncate_seq, truncate_size, false); @@ -2718,7 +2711,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, int page_align = off & ~PAGE_MASK; BUG_ON(vino.snap != CEPH_NOSNAP); /* snapshots aren't writeable */ - req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 1, + req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, snapc, truncate_seq, truncate_size, -- cgit v1.2.3-70-g09d2 From fb01d1f8b0343f1b19be878cee89d089f06e9f38 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 21:29:55 +0800 Subject: ceph: parse inline data in MClientReply and MClientCaps Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 34 +++++++++++++++++++++++----------- fs/ceph/mds_client.c | 10 ++++++++++ fs/ceph/mds_client.h | 3 +++ 3 files changed, 36 insertions(+), 11 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index eb1bf1f35e0e..b88ae601f309 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2383,6 +2383,8 @@ static void invalidate_aliases(struct inode *inode) static void handle_cap_grant(struct ceph_mds_client *mdsc, struct inode *inode, struct ceph_mds_caps *grant, void *snaptrace, int snaptrace_len, + u64 inline_version, + void *inline_data, int inline_len, struct ceph_buffer *xattr_buf, struct ceph_mds_session *session, struct ceph_cap *cap, int issued) @@ -2996,11 +2998,12 @@ void ceph_handle_caps(struct ceph_mds_session *session, u64 cap_id; u64 size, max_size; u64 tid; + u64 inline_version = 0; + void *inline_data = NULL; + u32 inline_len = 0; void *snaptrace; size_t snaptrace_len; - void *flock; - void *end; - u32 flock_len; + void *p, *end; dout("handle_caps from mds%d\n", mds); @@ -3021,30 +3024,37 @@ void ceph_handle_caps(struct ceph_mds_session *session, snaptrace = h + 1; snaptrace_len = le32_to_cpu(h->snap_trace_len); + p = snaptrace + snaptrace_len; if (le16_to_cpu(msg->hdr.version) >= 2) { - void *p = snaptrace + snaptrace_len; + u32 flock_len; ceph_decode_32_safe(&p, end, flock_len, bad); if (p + flock_len > end) goto bad; - flock = p; - } else { - flock = NULL; - flock_len = 0; + p += flock_len; } if (le16_to_cpu(msg->hdr.version) >= 3) { if (op == CEPH_CAP_OP_IMPORT) { - void *p = flock + flock_len; if (p + sizeof(*peer) > end) goto bad; peer = p; + p += sizeof(*peer); } else if (op == CEPH_CAP_OP_EXPORT) { /* recorded in unused fields */ peer = (void *)&h->size; } } + if (le16_to_cpu(msg->hdr.version) >= 4) { + ceph_decode_64_safe(&p, end, inline_version, bad); + ceph_decode_32_safe(&p, end, inline_len, bad); + if (p + inline_len > end) + goto bad; + inline_data = p; + p += inline_len; + } + /* lookup ino */ inode = ceph_find_inode(sb, vino); ci = ceph_inode(inode); @@ -3085,6 +3095,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, handle_cap_import(mdsc, inode, h, peer, session, &cap, &issued); handle_cap_grant(mdsc, inode, h, snaptrace, snaptrace_len, + inline_version, inline_data, inline_len, msg->middle, session, cap, issued); goto done_unlocked; } @@ -3105,8 +3116,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, case CEPH_CAP_OP_GRANT: __ceph_caps_issued(ci, &issued); issued |= __ceph_caps_dirty(ci); - handle_cap_grant(mdsc, inode, h, NULL, 0, msg->middle, - session, cap, issued); + handle_cap_grant(mdsc, inode, h, NULL, 0, + inline_version, inline_data, inline_len, + msg->middle, session, cap, issued); goto done_unlocked; case CEPH_CAP_OP_FLUSH_ACK: diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 387a48993d34..d2171f4a6980 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -89,6 +89,16 @@ static int parse_reply_info_in(void **p, void *end, ceph_decode_need(p, end, info->xattr_len, bad); info->xattr_data = *p; *p += info->xattr_len; + + if (features & CEPH_FEATURE_MDS_INLINE_DATA) { + ceph_decode_64_safe(p, end, info->inline_version, bad); + ceph_decode_32_safe(p, end, info->inline_len, bad); + ceph_decode_need(p, end, info->inline_len, bad); + info->inline_data = *p; + *p += info->inline_len; + } else + info->inline_version = CEPH_INLINE_NONE; + return 0; bad: return err; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 230bda791d4f..01f5e4cfd684 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -41,6 +41,9 @@ struct ceph_mds_reply_info_in { char *symlink; u32 xattr_len; char *xattr_data; + u64 inline_version; + u32 inline_len; + char *inline_data; }; /* -- cgit v1.2.3-70-g09d2 From 31c542a199d79f0f402c2f3e04229464510d47ec Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 21:41:55 +0800 Subject: ceph: add inline data to pagecache Request reply and cap message can contain inline data. add inline data to the page cache if there is Fc cap. Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 43 +++++++++++++++++++++++++++++++++++++++++++ fs/ceph/caps.c | 11 +++++++++++ fs/ceph/inode.c | 16 ++++++++++++++++ fs/ceph/super.h | 5 ++++- fs/ceph/super.h.rej | 10 ++++++++++ include/linux/ceph/ceph_fs.h | 1 + 6 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 fs/ceph/super.h.rej (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index f2c7aa878aa4..4a3f55f27ab4 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1318,6 +1318,49 @@ out: return ret; } +void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, + char *data, size_t len) +{ + struct address_space *mapping = inode->i_mapping; + struct page *page; + + if (locked_page) { + page = locked_page; + } else { + if (i_size_read(inode) == 0) + return; + page = find_or_create_page(mapping, 0, + mapping_gfp_mask(mapping) & ~__GFP_FS); + if (!page) + return; + if (PageUptodate(page)) { + unlock_page(page); + page_cache_release(page); + return; + } + } + + dout("fill_inline_data %p %llx.%llx len %lu locked_page %p\n", + inode, ceph_vinop(inode), len, locked_page); + + if (len > 0) { + void *kaddr = kmap_atomic(page); + memcpy(kaddr, data, len); + kunmap_atomic(kaddr); + } + + if (page != locked_page) { + if (len < PAGE_CACHE_SIZE) + zero_user_segment(page, len, PAGE_CACHE_SIZE); + else + flush_dcache_page(page); + + SetPageUptodate(page); + unlock_page(page); + page_cache_release(page); + } +} + static struct vm_operations_struct ceph_vmops = { .fault = ceph_filemap_fault, .page_mkwrite = ceph_page_mkwrite, diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index b88ae601f309..6372eb9ce491 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2405,6 +2405,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, bool queue_invalidate = false; bool queue_revalidate = false; bool deleted_inode = false; + bool fill_inline = false; dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", inode, cap, mds, seq, ceph_cap_string(newcaps)); @@ -2578,6 +2579,13 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, } BUG_ON(cap->issued & ~cap->implemented); + if (inline_version > 0 && inline_version >= ci->i_inline_version) { + ci->i_inline_version = inline_version; + if (ci->i_inline_version != CEPH_INLINE_NONE && + (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO))) + fill_inline = true; + } + spin_unlock(&ci->i_ceph_lock); if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { @@ -2591,6 +2599,9 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, wake = true; } + if (fill_inline) + ceph_fill_inline_data(inode, NULL, inline_data, inline_len); + if (queue_trunc) { ceph_queue_vmtruncate(inode); ceph_queue_revalidate(inode); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 72607c17e6fd..feea6a8f88ae 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -387,6 +387,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) spin_lock_init(&ci->i_ceph_lock); ci->i_version = 0; + ci->i_inline_version = 0; ci->i_time_warp_seq = 0; ci->i_ceph_flags = 0; ci->i_ordered_count = 0; @@ -676,6 +677,7 @@ static int fill_inode(struct inode *inode, bool wake = false; bool queue_trunc = false; bool new_version = false; + bool fill_inline = false; dout("fill_inode %p ino %llx.%llx v %llu had %llu\n", inode, ceph_vinop(inode), le64_to_cpu(info->version), @@ -875,8 +877,22 @@ static int fill_inode(struct inode *inode, ceph_vinop(inode)); __ceph_get_fmode(ci, cap_fmode); } + + if (iinfo->inline_version > 0 && + iinfo->inline_version >= ci->i_inline_version) { + int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; + ci->i_inline_version = iinfo->inline_version; + if (ci->i_inline_version != CEPH_INLINE_NONE && + (le32_to_cpu(info->cap.caps) & cache_caps)) + fill_inline = true; + } + spin_unlock(&ci->i_ceph_lock); + if (fill_inline) + ceph_fill_inline_data(inode, NULL, + iinfo->inline_data, iinfo->inline_len); + if (wake) wake_up_all(&ci->i_cap_wq); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index fc1c8255dead..b0de9162758b 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -253,6 +253,7 @@ struct ceph_inode_info { spinlock_t i_ceph_lock; u64 i_version; + u64 i_inline_version; u32 i_time_warp_seq; unsigned i_ceph_flags; @@ -858,7 +859,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn, int mds, int drop, int unless); extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, - int *got, loff_t endoff); + loff_t endoff, int *got, struct page **pinned_page); /* for counting open files by mode */ static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode) @@ -880,6 +881,8 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry, struct file *file, unsigned flags, umode_t mode, int *opened); extern int ceph_release(struct inode *inode, struct file *filp); +extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, + char *data, size_t len); /* dir.c */ extern const struct file_operations ceph_dir_fops; diff --git a/fs/ceph/super.h.rej b/fs/ceph/super.h.rej new file mode 100644 index 000000000000..88fe3dfadb29 --- /dev/null +++ b/fs/ceph/super.h.rej @@ -0,0 +1,10 @@ +--- fs/ceph/super.h ++++ fs/ceph/super.h +@@ -254,6 +255,7 @@ + spinlock_t i_ceph_lock; + + u64 i_version; ++ u64 i_inline_version; + u32 i_time_warp_seq; + + unsigned i_ceph_flags; diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 31d8b98b7f96..2d4acfa2c7b7 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -552,6 +552,7 @@ struct ceph_filelock { int ceph_flags_to_mode(int flags); +#define CEPH_INLINE_NONE ((__u64)-1) /* capability bits */ #define CEPH_CAP_PIN 1 /* no specific capabilities beyond the pin */ -- cgit v1.2.3-70-g09d2 From 01deead041e03c9a6b4e1b2dd165dee4cced6112 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 21:56:29 +0800 Subject: ceph: use getattr request to fetch inline data Add a new parameter 'locked_page' to ceph_do_getattr(). If inline data in getattr reply will be copied to the page. Signed-off-by: Yan, Zheng --- fs/ceph/inode.c | 34 +++++++++++++++++++++++++--------- fs/ceph/mds_client.h | 1 + fs/ceph/super.h | 7 ++++++- include/linux/ceph/ceph_fs.h | 2 ++ 4 files changed, 34 insertions(+), 10 deletions(-) (limited to 'fs') diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index feea6a8f88ae..0bdabd1bff8a 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -659,7 +659,7 @@ void ceph_fill_file_time(struct inode *inode, int issued, * Populate an inode based on info from mds. May be called on new or * existing inodes. */ -static int fill_inode(struct inode *inode, +static int fill_inode(struct inode *inode, struct page *locked_page, struct ceph_mds_reply_info_in *iinfo, struct ceph_mds_reply_dirfrag *dirinfo, struct ceph_mds_session *session, @@ -883,14 +883,15 @@ static int fill_inode(struct inode *inode, int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; ci->i_inline_version = iinfo->inline_version; if (ci->i_inline_version != CEPH_INLINE_NONE && - (le32_to_cpu(info->cap.caps) & cache_caps)) + (locked_page || + (le32_to_cpu(info->cap.caps) & cache_caps))) fill_inline = true; } spin_unlock(&ci->i_ceph_lock); if (fill_inline) - ceph_fill_inline_data(inode, NULL, + ceph_fill_inline_data(inode, locked_page, iinfo->inline_data, iinfo->inline_len); if (wake) @@ -1080,7 +1081,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, struct inode *dir = req->r_locked_dir; if (dir) { - err = fill_inode(dir, &rinfo->diri, rinfo->dirfrag, + err = fill_inode(dir, NULL, + &rinfo->diri, rinfo->dirfrag, session, req->r_request_started, -1, &req->r_caps_reservation); if (err < 0) @@ -1150,7 +1152,7 @@ retry_lookup: } req->r_target_inode = in; - err = fill_inode(in, &rinfo->targeti, NULL, + err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL, session, req->r_request_started, (!req->r_aborted && rinfo->head->result == 0) ? req->r_fmode : -1, @@ -1321,7 +1323,7 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, dout("new_inode badness got %d\n", err); continue; } - rc = fill_inode(in, &rinfo->dir_in[i], NULL, session, + rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, req->r_request_started, -1, &req->r_caps_reservation); if (rc < 0) { @@ -1437,7 +1439,7 @@ retry_lookup: } } - if (fill_inode(in, &rinfo->dir_in[i], NULL, session, + if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, req->r_request_started, -1, &req->r_caps_reservation) < 0) { pr_err("fill_inode badness on %p\n", in); @@ -1920,7 +1922,8 @@ out_put: * Verify that we have a lease on the given mask. If not, * do a getattr against an mds. */ -int ceph_do_getattr(struct inode *inode, int mask, bool force) +int __ceph_do_getattr(struct inode *inode, struct page *locked_page, + int mask, bool force) { struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; @@ -1932,7 +1935,8 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force) return 0; } - dout("do_getattr inode %p mask %s mode 0%o\n", inode, ceph_cap_string(mask), inode->i_mode); + dout("do_getattr inode %p mask %s mode 0%o\n", + inode, ceph_cap_string(mask), inode->i_mode); if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) return 0; @@ -1943,7 +1947,19 @@ int ceph_do_getattr(struct inode *inode, int mask, bool force) ihold(inode); req->r_num_caps = 1; req->r_args.getattr.mask = cpu_to_le32(mask); + req->r_locked_page = locked_page; err = ceph_mdsc_do_request(mdsc, NULL, req); + if (locked_page && err == 0) { + u64 inline_version = req->r_reply_info.targeti.inline_version; + if (inline_version == 0) { + /* the reply is supposed to contain inline data */ + err = -EINVAL; + } else if (inline_version == CEPH_INLINE_NONE) { + err = -ENODATA; + } else { + err = req->r_reply_info.targeti.inline_len; + } + } ceph_mdsc_put_request(req); dout("do_getattr result=%d\n", err); return err; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 01f5e4cfd684..e2817d00f7d9 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -223,6 +223,7 @@ struct ceph_mds_request { int r_request_release_offset; struct ceph_msg *r_reply; struct ceph_mds_reply_info_parsed r_reply_info; + struct page *r_locked_page; int r_err; bool r_aborted; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index b0de9162758b..6d56fae863ca 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -744,7 +744,12 @@ extern void ceph_queue_vmtruncate(struct inode *inode); extern void ceph_queue_invalidate(struct inode *inode); extern void ceph_queue_writeback(struct inode *inode); -extern int ceph_do_getattr(struct inode *inode, int mask, bool force); +extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page, + int mask, bool force); +static inline int ceph_do_getattr(struct inode *inode, int mask, bool force) +{ + return __ceph_do_getattr(inode, NULL, mask, force); +} extern int ceph_permission(struct inode *inode, int mask); extern int ceph_setattr(struct dentry *dentry, struct iattr *attr); extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry, diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 2d4acfa2c7b7..c0dadaac26e3 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -617,6 +617,8 @@ int ceph_flags_to_mode(int flags); CEPH_CAP_LINK_SHARED | \ CEPH_CAP_FILE_SHARED | \ CEPH_CAP_XATTR_SHARED) +#define CEPH_STAT_CAP_INLINE_DATA (CEPH_CAP_FILE_SHARED | \ + CEPH_CAP_FILE_RD) #define CEPH_CAP_ANY_SHARED (CEPH_CAP_AUTH_SHARED | \ CEPH_CAP_LINK_SHARED | \ -- cgit v1.2.3-70-g09d2 From 3738daa68a5121ad7dd0318bca931e2a6afb0e8c Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 22:10:07 +0800 Subject: ceph: fetch inline data when getting Fcr cap refs we can't use getattr to fetch inline data after getting Fcr caps, because it can cause deadlock. The solution is try bringing inline data to page cache when not holding any cap, and hope the inline data page is still there after getting the Fcr caps. If the page is still there, pin it in page cache for later IO. Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 9 +++++++-- fs/ceph/caps.c | 60 +++++++++++++++++++++++++++++++++++++++++++++------------- fs/ceph/file.c | 12 +++++++++--- 3 files changed, 63 insertions(+), 18 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 4a3f55f27ab4..5d2b88e3ff0b 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1207,6 +1207,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) struct inode *inode = file_inode(vma->vm_file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_file_info *fi = vma->vm_file->private_data; + struct page *pinned_page = NULL; loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT; int want, got, ret; @@ -1218,7 +1219,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) want = CEPH_CAP_FILE_CACHE; while (1) { got = 0; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, + &got, &pinned_page); if (ret == 0) break; if (ret != -ERESTARTSYS) { @@ -1233,6 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); + if (pinned_page) + page_cache_release(pinned_page); ceph_put_cap_refs(ci, got); return ret; @@ -1266,7 +1270,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) want = CEPH_CAP_FILE_BUFFER; while (1) { got = 0; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len, + &got, NULL); if (ret == 0) break; if (ret != -ERESTARTSYS) { diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 6372eb9ce491..795afe304871 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2057,15 +2057,17 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) * requested from the MDS. */ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, - int *got, loff_t endoff, int *check_max, int *err) + loff_t endoff, int *got, struct page **pinned_page, + int *check_max, int *err) { struct inode *inode = &ci->vfs_inode; int ret = 0; - int have, implemented; + int have, implemented, _got = 0; int file_wanted; dout("get_cap_refs %p need %s want %s\n", inode, ceph_cap_string(need), ceph_cap_string(want)); +again: spin_lock(&ci->i_ceph_lock); /* make sure file is actually open */ @@ -2075,7 +2077,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, ceph_cap_string(need), ceph_cap_string(file_wanted)); *err = -EBADF; ret = 1; - goto out; + goto out_unlock; } /* finish pending truncate */ @@ -2095,7 +2097,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, *check_max = 1; ret = 1; } - goto out; + goto out_unlock; } /* * If a sync write is in progress, we must wait, so that we @@ -2103,7 +2105,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, */ if (__ceph_have_pending_cap_snap(ci)) { dout("get_cap_refs %p cap_snap_pending\n", inode); - goto out; + goto out_unlock; } } @@ -2120,18 +2122,50 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, inode, ceph_cap_string(have), ceph_cap_string(not), ceph_cap_string(revoking)); if ((revoking & not) == 0) { - *got = need | (have & want); - __take_cap_refs(ci, *got); + _got = need | (have & want); + __take_cap_refs(ci, _got); ret = 1; } } else { dout("get_cap_refs %p have %s needed %s\n", inode, ceph_cap_string(have), ceph_cap_string(need)); } -out: +out_unlock: spin_unlock(&ci->i_ceph_lock); + + if (ci->i_inline_version != CEPH_INLINE_NONE && + (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && + i_size_read(inode) > 0) { + int ret1; + struct page *page = find_get_page(inode->i_mapping, 0); + if (page) { + if (PageUptodate(page)) { + *pinned_page = page; + goto out; + } + page_cache_release(page); + } + /* + * drop cap refs first because getattr while holding + * caps refs can cause deadlock. + */ + ceph_put_cap_refs(ci, _got); + _got = 0; + + /* getattr request will bring inline data into page cache */ + ret1 = __ceph_do_getattr(inode, NULL, + CEPH_STAT_CAP_INLINE_DATA, true); + if (ret1 >= 0) { + ret = 0; + goto again; + } + *err = ret1; + ret = 1; + } +out: dout("get_cap_refs %p ret %d got %s\n", inode, - ret, ceph_cap_string(*got)); + ret, ceph_cap_string(_got)); + *got = _got; return ret; } @@ -2168,8 +2202,8 @@ static void check_max_size(struct inode *inode, loff_t endoff) * due to a small max_size, make sure we check_max_size (and possibly * ask the mds) so we don't get hung up indefinitely. */ -int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got, - loff_t endoff) +int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, + loff_t endoff, int *got, struct page **pinned_page) { int check_max, ret, err; @@ -2179,8 +2213,8 @@ retry: check_max = 0; err = 0; ret = wait_event_interruptible(ci->i_cap_wq, - try_get_cap_refs(ci, need, want, - got, endoff, + try_get_cap_refs(ci, need, want, endoff, + got, pinned_page, &check_max, &err)); if (err) ret = err; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index c03ac4c4bcd1..861b9954a63a 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -805,6 +805,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) size_t len = iocb->ki_nbytes; struct inode *inode = file_inode(filp); struct ceph_inode_info *ci = ceph_inode(inode); + struct page *pinned_page = NULL; ssize_t ret; int want, got = 0; int checkeof = 0, read = 0; @@ -817,7 +818,7 @@ again: want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; else want = CEPH_CAP_FILE_CACHE; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page); if (ret < 0) return ret; @@ -840,6 +841,10 @@ again: } dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); + if (pinned_page) { + page_cache_release(pinned_page); + pinned_page = NULL; + } ceph_put_cap_refs(ci, got); if (checkeof && ret >= 0) { @@ -924,7 +929,8 @@ retry_snap: else want = CEPH_CAP_FILE_BUFFER; got = 0; - err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count); + err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count, + &got, NULL); if (err < 0) goto out; @@ -1225,7 +1231,7 @@ static long ceph_fallocate(struct file *file, int mode, else want = CEPH_CAP_FILE_BUFFER; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL); if (ret < 0) goto unlock; -- cgit v1.2.3-70-g09d2 From 83701246aee8f83b4b42483051b439fbe96ed47d Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 22:36:18 +0800 Subject: ceph: sync read inline data we can't use getattr to fetch inline data while holding Fr cap, because it can cause deadlock. If we need to sync read inline data, drop cap refs first, then use getattr to fetch inline data. Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++++----- fs/ceph/file.c | 63 ++++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 116 insertions(+), 13 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 5d2b88e3ff0b..13413d7440d6 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -192,17 +192,30 @@ static int readpage_nounlock(struct file *filp, struct page *page) struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; int err = 0; + u64 off = page_offset(page); u64 len = PAGE_CACHE_SIZE; - err = ceph_readpage_from_fscache(inode, page); + if (off >= i_size_read(inode)) { + zero_user_segment(page, err, PAGE_CACHE_SIZE); + SetPageUptodate(page); + return 0; + } + /* + * Uptodate inline data should have been added into page cache + * while getting Fcr caps. + */ + if (ci->i_inline_version != CEPH_INLINE_NONE) + return -EINVAL; + + err = ceph_readpage_from_fscache(inode, page); if (err == 0) goto out; dout("readpage inode %p file %p page %p index %lu\n", inode, filp, page, page->index); err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout, - (u64) page_offset(page), &len, + off, &len, ci->i_truncate_seq, ci->i_truncate_size, &page, 1, 0); if (err == -ENOENT) @@ -384,6 +397,9 @@ static int ceph_readpages(struct file *file, struct address_space *mapping, int rc = 0; int max = 0; + if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE) + return -EINVAL; + rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list, &nr_pages); @@ -1219,8 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) want = CEPH_CAP_FILE_CACHE; while (1) { got = 0; - ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, - &got, &pinned_page); + ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, + -1, &got, &pinned_page); if (ret == 0) break; if (ret != -ERESTARTSYS) { @@ -1231,7 +1247,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) dout("filemap_fault %p %llu~%zd got cap refs on %s\n", inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got)); - ret = filemap_fault(vma, vmf); + if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) || + ci->i_inline_version == CEPH_INLINE_NONE) + ret = filemap_fault(vma, vmf); + else + ret = -EAGAIN; dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n", inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret); @@ -1239,6 +1259,42 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf) page_cache_release(pinned_page); ceph_put_cap_refs(ci, got); + if (ret != -EAGAIN) + return ret; + + /* read inline data */ + if (off >= PAGE_CACHE_SIZE) { + /* does not support inline data > PAGE_SIZE */ + ret = VM_FAULT_SIGBUS; + } else { + int ret1; + struct address_space *mapping = inode->i_mapping; + struct page *page = find_or_create_page(mapping, 0, + mapping_gfp_mask(mapping) & + ~__GFP_FS); + if (!page) { + ret = VM_FAULT_OOM; + goto out; + } + ret1 = __ceph_do_getattr(inode, page, + CEPH_STAT_CAP_INLINE_DATA, true); + if (ret1 < 0 || off >= i_size_read(inode)) { + unlock_page(page); + page_cache_release(page); + ret = VM_FAULT_SIGBUS; + goto out; + } + if (ret1 < PAGE_CACHE_SIZE) + zero_user_segment(page, ret1, PAGE_CACHE_SIZE); + else + flush_dcache_page(page); + SetPageUptodate(page); + vmf->page = page; + ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED; + } +out: + dout("filemap_fault %p %llu~%zd read inline data ret %d\n", + inode, off, (size_t)PAGE_CACHE_SIZE, ret); return ret; } diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 861b9954a63a..5b092bda9284 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -333,6 +333,11 @@ int ceph_release(struct inode *inode, struct file *file) return 0; } +enum { + CHECK_EOF = 1, + READ_INLINE = 2, +}; + /* * Read a range of bytes striped over one or more objects. Iterate over * objects we stripe over. (That's not atomic, but good enough for now.) @@ -412,7 +417,7 @@ more: ret = read; /* did we bounce off eof? */ if (pos + left > inode->i_size) - *checkeof = 1; + *checkeof = CHECK_EOF; } dout("striped_read returns %d\n", ret); @@ -808,7 +813,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to) struct page *pinned_page = NULL; ssize_t ret; int want, got = 0; - int checkeof = 0, read = 0; + int retry_op = 0, read = 0; again: dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n", @@ -830,8 +835,12 @@ again: inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, ceph_cap_string(got)); - /* hmm, this isn't really async... */ - ret = ceph_sync_read(iocb, to, &checkeof); + if (ci->i_inline_version == CEPH_INLINE_NONE) { + /* hmm, this isn't really async... */ + ret = ceph_sync_read(iocb, to, &retry_op); + } else { + retry_op = READ_INLINE; + } } else { dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n", inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, @@ -846,12 +855,50 @@ again: pinned_page = NULL; } ceph_put_cap_refs(ci, got); + if (retry_op && ret >= 0) { + int statret; + struct page *page = NULL; + loff_t i_size; + if (retry_op == READ_INLINE) { + page = __page_cache_alloc(GFP_NOFS); + if (!page) + return -ENOMEM; + } + + statret = __ceph_do_getattr(inode, page, + CEPH_STAT_CAP_INLINE_DATA, !!page); + if (statret < 0) { + __free_page(page); + if (statret == -ENODATA) { + BUG_ON(retry_op != READ_INLINE); + goto again; + } + return statret; + } - if (checkeof && ret >= 0) { - int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false); + i_size = i_size_read(inode); + if (retry_op == READ_INLINE) { + /* does not support inline data > PAGE_SIZE */ + if (i_size > PAGE_CACHE_SIZE) { + ret = -EIO; + } else if (iocb->ki_pos < i_size) { + loff_t end = min_t(loff_t, i_size, + iocb->ki_pos + len); + if (statret < end) + zero_user_segment(page, statret, end); + ret = copy_page_to_iter(page, + iocb->ki_pos & ~PAGE_MASK, + end - iocb->ki_pos, to); + iocb->ki_pos += ret; + } else { + ret = 0; + } + __free_pages(page, 0); + return ret; + } /* hit EOF or hole? */ - if (statret == 0 && iocb->ki_pos < inode->i_size && + if (retry_op == CHECK_EOF && iocb->ki_pos < i_size && ret < len) { dout("sync_read hit hole, ppos %lld < size %lld" ", reading more\n", iocb->ki_pos, @@ -859,7 +906,7 @@ again: read += ret; len -= ret; - checkeof = 0; + retry_op = 0; goto again; } } -- cgit v1.2.3-70-g09d2 From 28127bdd2f843e996f24b51a70a0592c7ec5c763 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 22:38:29 +0800 Subject: ceph: convert inline data to normal data before data write Before any data write, convert inline data to normal data and set i_inline_version to CEPH_INLINE_NONE. The OSD request that saves inline data to object contains 3 operations (CMPXATTR, WRITE and SETXATTR). It compares a xattr named 'inline_version' to prevent old data overwrites newer data. Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 148 +++++++++++++++++++++++++++++++++++++++++++++++++++++++- fs/ceph/file.c | 14 ++++++ fs/ceph/super.h | 2 +- 3 files changed, 161 insertions(+), 3 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 13413d7440d6..70a3b441261b 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1313,6 +1313,19 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) size_t len; int want, got, ret; + if (ci->i_inline_version != CEPH_INLINE_NONE) { + struct page *locked_page = NULL; + if (off == 0) { + lock_page(page); + locked_page = page; + } + ret = ceph_uninline_data(vma->vm_file, locked_page); + if (locked_page) + unlock_page(locked_page); + if (ret < 0) + return VM_FAULT_SIGBUS; + } + if (off + PAGE_CACHE_SIZE <= size) len = PAGE_CACHE_SIZE; else @@ -1361,11 +1374,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ret = VM_FAULT_SIGBUS; } out: - if (ret != VM_FAULT_LOCKED) { + if (ret != VM_FAULT_LOCKED) unlock_page(page); - } else { + if (ret == VM_FAULT_LOCKED || + ci->i_inline_version != CEPH_INLINE_NONE) { int dirty; spin_lock(&ci->i_ceph_lock); + ci->i_inline_version = CEPH_INLINE_NONE; dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); spin_unlock(&ci->i_ceph_lock); if (dirty) @@ -1422,6 +1437,135 @@ void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, } } +int ceph_uninline_data(struct file *filp, struct page *locked_page) +{ + struct inode *inode = file_inode(filp); + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_osd_request *req; + struct page *page = NULL; + u64 len, inline_version; + int err = 0; + bool from_pagecache = false; + + spin_lock(&ci->i_ceph_lock); + inline_version = ci->i_inline_version; + spin_unlock(&ci->i_ceph_lock); + + dout("uninline_data %p %llx.%llx inline_version %llu\n", + inode, ceph_vinop(inode), inline_version); + + if (inline_version == 1 || /* initial version, no data */ + inline_version == CEPH_INLINE_NONE) + goto out; + + if (locked_page) { + page = locked_page; + WARN_ON(!PageUptodate(page)); + } else if (ceph_caps_issued(ci) & + (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) { + page = find_get_page(inode->i_mapping, 0); + if (page) { + if (PageUptodate(page)) { + from_pagecache = true; + lock_page(page); + } else { + page_cache_release(page); + page = NULL; + } + } + } + + if (page) { + len = i_size_read(inode); + if (len > PAGE_CACHE_SIZE) + len = PAGE_CACHE_SIZE; + } else { + page = __page_cache_alloc(GFP_NOFS); + if (!page) { + err = -ENOMEM; + goto out; + } + err = __ceph_do_getattr(inode, page, + CEPH_STAT_CAP_INLINE_DATA, true); + if (err < 0) { + /* no inline data */ + if (err == -ENODATA) + err = 0; + goto out; + } + len = err; + } + + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + ceph_vino(inode), 0, &len, 0, 1, + CEPH_OSD_OP_CREATE, + CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, + ci->i_snap_realm->cached_context, + 0, 0, false); + if (IS_ERR(req)) { + err = PTR_ERR(req); + goto out; + } + + ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime); + err = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!err) + err = ceph_osdc_wait_request(&fsc->client->osdc, req); + ceph_osdc_put_request(req); + if (err < 0) + goto out; + + req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, + ceph_vino(inode), 0, &len, 1, 3, + CEPH_OSD_OP_WRITE, + CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, + ci->i_snap_realm->cached_context, + ci->i_truncate_seq, ci->i_truncate_size, + false); + if (IS_ERR(req)) { + err = PTR_ERR(req); + goto out; + } + + osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false); + + err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR, + "inline_version", &inline_version, + sizeof(inline_version), + CEPH_OSD_CMPXATTR_OP_GT, + CEPH_OSD_CMPXATTR_MODE_U64); + if (err) + goto out_put; + + err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR, + "inline_version", &inline_version, + sizeof(inline_version), 0, 0); + if (err) + goto out_put; + + ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime); + err = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (!err) + err = ceph_osdc_wait_request(&fsc->client->osdc, req); +out_put: + ceph_osdc_put_request(req); + if (err == -ECANCELED) + err = 0; +out: + if (page && page != locked_page) { + if (from_pagecache) { + unlock_page(page); + page_cache_release(page); + } else + __free_pages(page, 0); + } + + dout("uninline_data %p %llx.%llx inline_version %llu = %d\n", + inode, ceph_vinop(inode), inline_version, err); + return err; +} + static struct vm_operations_struct ceph_vmops = { .fault = ceph_filemap_fault, .page_mkwrite = ceph_page_mkwrite, diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 5b092bda9284..9b5901fefbf8 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -963,6 +963,12 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) if (err) goto out; + if (ci->i_inline_version != CEPH_INLINE_NONE) { + err = ceph_uninline_data(file, NULL); + if (err < 0) + goto out; + } + retry_snap: if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) { err = -ENOSPC; @@ -1024,6 +1030,7 @@ retry_snap: if (written >= 0) { int dirty; spin_lock(&ci->i_ceph_lock); + ci->i_inline_version = CEPH_INLINE_NONE; dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); spin_unlock(&ci->i_ceph_lock); if (dirty) @@ -1269,6 +1276,12 @@ static long ceph_fallocate(struct file *file, int mode, goto unlock; } + if (ci->i_inline_version != CEPH_INLINE_NONE) { + ret = ceph_uninline_data(file, NULL); + if (ret < 0) + goto unlock; + } + size = i_size_read(inode); if (!(mode & FALLOC_FL_KEEP_SIZE)) endoff = offset + length; @@ -1295,6 +1308,7 @@ static long ceph_fallocate(struct file *file, int mode, if (!ret) { spin_lock(&ci->i_ceph_lock); + ci->i_inline_version = CEPH_INLINE_NONE; dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); spin_unlock(&ci->i_ceph_lock); if (dirty) diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 6d56fae863ca..8197a3cf750b 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -888,7 +888,7 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry, extern int ceph_release(struct inode *inode, struct file *filp); extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, char *data, size_t len); - +int ceph_uninline_data(struct file *filp, struct page *locked_page); /* dir.c */ extern const struct file_operations ceph_dir_fops; extern const struct inode_operations ceph_dir_iops; -- cgit v1.2.3-70-g09d2 From e20d258d73a8d565b729b6fc0290e060daabd8b8 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 14 Nov 2014 22:39:13 +0800 Subject: ceph: flush inline version After converting inline data to normal data, client need to flush the new i_inline_version (CEPH_INLINE_NONE) to MDS. This commit makes cap messages (sent to MDS) contain inline_version and inline_data. Client always converts inline data to normal data before data write, so the inline data length part is always zero. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 24 ++++++++++++++++++++---- fs/ceph/snap.c | 2 ++ fs/ceph/super.h | 1 + 3 files changed, 23 insertions(+), 4 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 795afe304871..b93c631c6c87 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -975,10 +975,12 @@ static int send_cap_msg(struct ceph_mds_session *session, kuid_t uid, kgid_t gid, umode_t mode, u64 xattr_version, struct ceph_buffer *xattrs_buf, - u64 follows) + u64 follows, bool inline_data) { struct ceph_mds_caps *fc; struct ceph_msg *msg; + void *p; + size_t extra_len; dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" " seq %u/%u mseq %u follows %lld size %llu/%llu" @@ -988,7 +990,10 @@ static int send_cap_msg(struct ceph_mds_session *session, seq, issue_seq, mseq, follows, size, max_size, xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false); + /* flock buffer size + inline version + inline data size */ + extra_len = 4 + 8 + 4; + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len, + GFP_NOFS, false); if (!msg) return -ENOMEM; @@ -1020,6 +1025,14 @@ static int send_cap_msg(struct ceph_mds_session *session, fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid)); fc->mode = cpu_to_le32(mode); + p = fc + 1; + /* flock buffer size */ + ceph_encode_32(&p, 0); + /* inline version */ + ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE); + /* inline data size */ + ceph_encode_32(&p, 0); + fc->xattr_version = cpu_to_le64(xattr_version); if (xattrs_buf) { msg->middle = ceph_buffer_get(xattrs_buf); @@ -1126,6 +1139,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, u64 flush_tid = 0; int i; int ret; + bool inline_data; held = cap->issued | cap->implemented; revoking = cap->implemented & ~cap->issued; @@ -1209,13 +1223,15 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, xattr_version = ci->i_xattrs.version; } + inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + spin_unlock(&ci->i_ceph_lock); ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, size, max_size, &mtime, &atime, time_warp_seq, uid, gid, mode, xattr_version, xattr_blob, - follows); + follows, inline_data); if (ret < 0) { dout("error sending cap msg, must requeue %p\n", inode); delayed = 1; @@ -1336,7 +1352,7 @@ retry: capsnap->time_warp_seq, capsnap->uid, capsnap->gid, capsnap->mode, capsnap->xattr_version, capsnap->xattr_blob, - capsnap->follows); + capsnap->follows, capsnap->inline_data); next_follows = capsnap->follows + 1; ceph_put_cap_snap(capsnap); diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index f64576e72b79..ce35fbd4ba5d 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -516,6 +516,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) capsnap->xattr_version = 0; } + capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + /* dirty page count moved from _head to this cap_snap; all subsequent writes page dirties occur _after_ this snapshot. */ diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 8197a3cf750b..e1aa32d0759d 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -161,6 +161,7 @@ struct ceph_cap_snap { u64 time_warp_seq; int writing; /* a sync write is still in progress */ int dirty_pages; /* dirty pages awaiting writeback */ + bool inline_data; }; static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) -- cgit v1.2.3-70-g09d2 From 65a22662bfe1a84d72b9bbd9146b6782b9e53478 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 17 Nov 2014 10:01:03 +0800 Subject: ceph: support inline data feature Signed-off-by: Yan, Zheng --- fs/ceph/super.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'fs') diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 989e86cc1e73..50f06cddc94b 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -515,7 +515,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, struct ceph_fs_client *fsc; const u64 supported_features = CEPH_FEATURE_FLOCK | - CEPH_FEATURE_DIRLAYOUTHASH; + CEPH_FEATURE_DIRLAYOUTHASH | + CEPH_FEATURE_MDS_INLINE_DATA; const u64 required_features = 0; int page_count; size_t size; -- cgit v1.2.3-70-g09d2 From 021b77bee210843bed1ea91b5cad58235ff9c8e5 Mon Sep 17 00:00:00 2001 From: Dan Carpenter Date: Fri, 28 Nov 2014 11:33:34 +0300 Subject: ceph: do_sync is never initialized Probably this code was syncing a lot more often then intended because the do_sync variable wasn't set to zero. Cc: stable@vger.kernel.org # v3.11+ Fixes: c62988ec0910 ('ceph: avoid meaningless calling ceph_caps_revoking if sync_mode == WB_SYNC_ALL.') Signed-off-by: Dan Carpenter Signed-off-by: Ilya Dryomov --- fs/ceph/addr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 70a3b441261b..f5013d92a7e6 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -689,7 +689,7 @@ static int ceph_writepages_start(struct address_space *mapping, int rc = 0; unsigned wsize = 1 << inode->i_blkbits; struct ceph_osd_request *req = NULL; - int do_sync; + int do_sync = 0; u64 truncate_size, snap_size; u32 truncate_seq; -- cgit v1.2.3-70-g09d2 From 275dd19ea4e84c34f985ba097f9cddb539f54a50 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 10 Dec 2014 16:17:31 +0800 Subject: ceph: fix mksnap crash mksnap reply only contain 'target', does not contain 'dentry'. So it's wrong to use req->r_reply_info.head->is_dentry to detect traceless reply. Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil --- fs/ceph/dir.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'fs') diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 652619950fa9..fcfd0abc8c38 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -812,7 +812,9 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) acls.pagelist = NULL; } err = ceph_mdsc_do_request(mdsc, dir, req); - if (!err && !req->r_reply_info.head->is_dentry) + if (!err && + !req->r_reply_info.head->is_target && + !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); ceph_mdsc_put_request(req); out: -- cgit v1.2.3-70-g09d2 From 0aeff37abada9f8c08d2b10481a43d3ae406c823 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 17 Dec 2014 21:26:47 +0800 Subject: ceph: fix setting empty extended attribute make sure 'value' is not null. otherwise __ceph_setxattr will remove the extended attribute. Signed-off-by: Yan, Zheng Reviewed-by: Sage Weil --- fs/ceph/xattr.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'fs') diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 678b0d2bbbc4..5a492caf34cb 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -854,7 +854,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name, struct ceph_pagelist *pagelist = NULL; int err; - if (value) { + if (size > 0) { /* copy value into pagelist */ pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS); if (!pagelist) @@ -864,7 +864,7 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name, err = ceph_pagelist_append(pagelist, value, size); if (err) goto out; - } else { + } else if (!value) { flags |= CEPH_XATTR_REMOVE; } @@ -1001,6 +1001,9 @@ int ceph_setxattr(struct dentry *dentry, const char *name, if (!strncmp(name, XATTR_SYSTEM_PREFIX, XATTR_SYSTEM_PREFIX_LEN)) return generic_setxattr(dentry, name, value, size, flags); + if (size == 0) + value = ""; /* empty EA, do not remove */ + return __ceph_setxattr(dentry, name, value, size, flags); } -- cgit v1.2.3-70-g09d2