From 30c71233a1d4b5c23ad6652847285bf6b57086e2 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Thu, 15 Dec 2016 08:37:56 -0500 Subject: ceph: clean up unsafe d_parent access in __choose_mds __choose_mds exists to pick an MDS to use when issuing a call. Doing that typically involves picking an inode and using the authoritative MDS for it. In most cases, that's pretty straightforward, as we are using an inode to which we hold a reference (usually represented by r_dentry or r_inode in the request). In the case of a snapshotted directory however, we need to fetch the non-snapped parent, which involves walking back up the parents in the tree. The dentries in the snapshot dir are effectively frozen but the overall parent is _not_, and could vanish if a concurrent rename were to occur. Clean this code up and take special care to ensure the validity of the entries we're working with. First, try to use the inode in r_locked_dir if one exists. If not and all we have is r_dentry, then we have to walk back up the tree. Use the rcu_read_lock for this so we can ensure that any d_parent we find won't go away, and take extra care to deal with the possibility that the dentries could go negative. Change get_nonsnap_parent to return an inode, and take a reference to that inode before returning (if any). Change all of the other places where we set "inode" in __choose_mds to also take a reference, and then call iput on that inode before exiting the function. Link: http://tracker.ceph.com/issues/18148 Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/mds_client.c | 64 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index c9d2e553a6c4..377ac34ddbb3 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -667,6 +667,27 @@ static void __unregister_request(struct ceph_mds_client *mdsc, ceph_mdsc_put_request(req); } +/* + * Walk back up the dentry tree until we hit a dentry representing a + * non-snapshot inode. We do this using the rcu_read_lock (which must be held + * when calling this) to ensure that the objects won't disappear while we're + * working with them. Once we hit a candidate dentry, we attempt to take a + * reference to it, and return that as the result. + */ +static struct inode *get_nonsnap_parent(struct dentry *dentry) { struct inode + *inode = NULL; + + while (dentry && !IS_ROOT(dentry)) { + inode = d_inode_rcu(dentry); + if (!inode || ceph_snap(inode) == CEPH_NOSNAP) + break; + dentry = dentry->d_parent; + } + if (inode) + inode = igrab(inode); + return inode; +} + /* * Choose mds to send request to next. If there is a hint set in the * request (e.g., due to a prior forward hint from the mds), use that. @@ -675,19 +696,6 @@ static void __unregister_request(struct ceph_mds_client *mdsc, * * Called under mdsc->mutex. */ -static struct dentry *get_nonsnap_parent(struct dentry *dentry) -{ - /* - * we don't need to worry about protecting the d_parent access - * here because we never renaming inside the snapped namespace - * except to resplice to another snapdir, and either the old or new - * result is a valid result. - */ - while (!IS_ROOT(dentry) && ceph_snap(d_inode(dentry)) != CEPH_NOSNAP) - dentry = dentry->d_parent; - return dentry; -} - static int __choose_mds(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) { @@ -717,30 +725,39 @@ static int __choose_mds(struct ceph_mds_client *mdsc, inode = NULL; if (req->r_inode) { inode = req->r_inode; + ihold(inode); } else if (req->r_dentry) { /* ignore race with rename; old or new d_parent is okay */ - struct dentry *parent = req->r_dentry->d_parent; - struct inode *dir = d_inode(parent); + struct dentry *parent; + struct inode *dir; + + rcu_read_lock(); + parent = req->r_dentry->d_parent; + dir = req->r_locked_dir ? : d_inode_rcu(parent); - if (dir->i_sb != mdsc->fsc->sb) { - /* not this fs! */ + if (!dir || dir->i_sb != mdsc->fsc->sb) { + /* not this fs or parent went negative */ inode = d_inode(req->r_dentry); + if (inode) + ihold(inode); } else if (ceph_snap(dir) != CEPH_NOSNAP) { /* direct snapped/virtual snapdir requests * based on parent dir inode */ - struct dentry *dn = get_nonsnap_parent(parent); - inode = d_inode(dn); + inode = get_nonsnap_parent(parent); dout("__choose_mds using nonsnap parent %p\n", inode); } else { /* dentry target */ inode = d_inode(req->r_dentry); if (!inode || mode == USE_AUTH_MDS) { /* dir + name */ - inode = dir; + inode = igrab(dir); hash = ceph_dentry_hash(dir, req->r_dentry); is_hash = true; + } else { + ihold(inode); } } + rcu_read_unlock(); } dout("__choose_mds %p is_hash=%d (%d) mode %d\n", inode, (int)is_hash, @@ -769,7 +786,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, (int)r, frag.ndist); if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= CEPH_MDS_STATE_ACTIVE) - return mds; + goto out; } /* since this file/dir wasn't known to be @@ -784,7 +801,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, inode, ceph_vinop(inode), frag.frag, mds); if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >= CEPH_MDS_STATE_ACTIVE) - return mds; + goto out; } } } @@ -797,6 +814,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); if (!cap) { spin_unlock(&ci->i_ceph_lock); + iput(inode); goto random; } mds = cap->session->s_mds; @@ -804,6 +822,8 @@ static int __choose_mds(struct ceph_mds_client *mdsc, inode, ceph_vinop(inode), mds, cap == ci->i_auth_cap ? "auth " : "", cap); spin_unlock(&ci->i_ceph_lock); +out: + iput(inode); return mds; random: -- cgit v1.2.3-70-g09d2 From c6b0b656ca24ede6657abb4a2cd910fa9c1879ba Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Thu, 15 Dec 2016 08:37:57 -0500 Subject: ceph: clean up unsafe d_parent accesses in build_dentry_path While we hold a reference to the dentry when build_dentry_path is called, we could end up racing with a rename that changes d_parent. Handle that situation correctly, by using the rcu_read_lock to ensure that the parent dentry and inode stick around long enough to safely check ceph_snap and ceph_ino. Link: http://tracker.ceph.com/issues/18148 Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/mds_client.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 377ac34ddbb3..7c2eb28c24f6 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1800,13 +1800,18 @@ static int build_dentry_path(struct dentry *dentry, int *pfreepath) { char *path; + struct inode *dir; - if (ceph_snap(d_inode(dentry->d_parent)) == CEPH_NOSNAP) { - *pino = ceph_ino(d_inode(dentry->d_parent)); + rcu_read_lock(); + dir = d_inode_rcu(dentry->d_parent); + if (dir && ceph_snap(dir) == CEPH_NOSNAP) { + *pino = ceph_ino(dir); + rcu_read_unlock(); *ppath = dentry->d_name.name; *ppathlen = dentry->d_name.len; return 0; } + rcu_read_unlock(); path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1); if (IS_ERR(path)) return PTR_ERR(path); -- cgit v1.2.3-70-g09d2 From fd36a71762f3b0fcb9741ed24021afabec7e0c45 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Thu, 15 Dec 2016 08:37:58 -0500 Subject: ceph: pass parent dir ino info to build_dentry_path In the event that we have a parent inode reference in the request, we can use that instead of mucking about in the dcache. Pass any parent inode info we have down to build_dentry_path so it can make use of it. Link: http://tracker.ceph.com/issues/18148 Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/mds_client.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 7c2eb28c24f6..c41c6aab3232 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1795,15 +1795,15 @@ retry: return path; } -static int build_dentry_path(struct dentry *dentry, +static int build_dentry_path(struct dentry *dentry, struct inode *dir, const char **ppath, int *ppathlen, u64 *pino, int *pfreepath) { char *path; - struct inode *dir; rcu_read_lock(); - dir = d_inode_rcu(dentry->d_parent); + if (!dir) + dir = d_inode_rcu(dentry->d_parent); if (dir && ceph_snap(dir) == CEPH_NOSNAP) { *pino = ceph_ino(dir); rcu_read_unlock(); @@ -1847,8 +1847,8 @@ static int build_inode_path(struct inode *inode, * an explicit ino+path. */ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, - const char *rpath, u64 rino, - const char **ppath, int *pathlen, + struct inode *rdiri, const char *rpath, + u64 rino, const char **ppath, int *pathlen, u64 *ino, int *freepath) { int r = 0; @@ -1858,7 +1858,8 @@ static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry, dout(" inode %p %llx.%llx\n", rinode, ceph_ino(rinode), ceph_snap(rinode)); } else if (rdentry) { - r = build_dentry_path(rdentry, ppath, pathlen, ino, freepath); + r = build_dentry_path(rdentry, rdiri, ppath, pathlen, ino, + freepath); dout(" dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen, *ppath); } else if (rpath || rino) { @@ -1891,7 +1892,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, int ret; ret = set_request_path_attr(req->r_inode, req->r_dentry, - req->r_path1, req->r_ino1.ino, + req->r_locked_dir, req->r_path1, req->r_ino1.ino, &path1, &pathlen1, &ino1, &freepath1); if (ret < 0) { msg = ERR_PTR(ret); @@ -1899,6 +1900,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, } ret = set_request_path_attr(NULL, req->r_old_dentry, + req->r_old_dentry_dir, req->r_path2, req->r_ino2.ino, &path2, &pathlen2, &ino2, &freepath2); if (ret < 0) { -- cgit v1.2.3-70-g09d2 From adf0d68701c7f3e50f21308c76f41e60956a6832 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Thu, 15 Dec 2016 08:37:58 -0500 Subject: ceph: fix unsafe dcache access in ceph_encode_dentry_release Accessing d_parent requires some sort of locking or it could vanish out from under us. Since we take the d_lock anyway, use that to fetch d_parent and take a reference to it, and then use that reference to call ceph_encode_inode_release. Link: http://tracker.ceph.com/issues/18148 Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/caps.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 94fd76d04683..d1b4c543cab1 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -3926,7 +3926,7 @@ int ceph_encode_inode_release(void **p, struct inode *inode, int ceph_encode_dentry_release(void **p, struct dentry *dentry, int mds, int drop, int unless) { - struct inode *dir = d_inode(dentry->d_parent); + struct dentry *parent; struct ceph_mds_request_release *rel = *p; struct ceph_dentry_info *di = ceph_dentry(dentry); int force = 0; @@ -3941,9 +3941,12 @@ int ceph_encode_dentry_release(void **p, struct dentry *dentry, spin_lock(&dentry->d_lock); if (di->lease_session && di->lease_session->s_mds == mds) force = 1; + parent = dget(dentry->d_parent); spin_unlock(&dentry->d_lock); - ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force); + ret = ceph_encode_inode_release(p, d_inode(parent), mds, drop, + unless, force); + dput(parent); spin_lock(&dentry->d_lock); if (ret && di->lease_session && di->lease_session->s_mds == mds) { -- cgit v1.2.3-70-g09d2 From ca6c8ae0f7930dad7e10664e3b5bc657dd75be60 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Thu, 15 Dec 2016 08:37:59 -0500 Subject: ceph: pass parent inode info to ceph_encode_dentry_release if we have it If we have a parent inode reference already, then we don't need to go back up the directory tree to find one. Link: http://tracker.ceph.com/issues/18148 Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/caps.c | 11 +++++++---- fs/ceph/mds_client.c | 7 +++++-- fs/ceph/super.h | 1 + 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index d1b4c543cab1..4951ab96ffc8 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -3924,9 +3924,10 @@ int ceph_encode_inode_release(void **p, struct inode *inode, } int ceph_encode_dentry_release(void **p, struct dentry *dentry, + struct inode *dir, int mds, int drop, int unless) { - struct dentry *parent; + struct dentry *parent = NULL; struct ceph_mds_request_release *rel = *p; struct ceph_dentry_info *di = ceph_dentry(dentry); int force = 0; @@ -3941,11 +3942,13 @@ int ceph_encode_dentry_release(void **p, struct dentry *dentry, spin_lock(&dentry->d_lock); if (di->lease_session && di->lease_session->s_mds == mds) force = 1; - parent = dget(dentry->d_parent); + if (!dir) { + parent = dget(dentry->d_parent); + dir = d_inode(parent); + } spin_unlock(&dentry->d_lock); - ret = ceph_encode_inode_release(p, d_inode(parent), mds, drop, - unless, force); + ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force); dput(parent); spin_lock(&dentry->d_lock); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index c41c6aab3232..1b78457c992b 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1954,10 +1954,13 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, mds, req->r_inode_drop, req->r_inode_unless, 0); if (req->r_dentry_drop) releases += ceph_encode_dentry_release(&p, req->r_dentry, - mds, req->r_dentry_drop, req->r_dentry_unless); + req->r_locked_dir, mds, req->r_dentry_drop, + req->r_dentry_unless); if (req->r_old_dentry_drop) releases += ceph_encode_dentry_release(&p, req->r_old_dentry, - mds, req->r_old_dentry_drop, req->r_old_dentry_unless); + req->r_old_dentry_dir, mds, + req->r_old_dentry_drop, + req->r_old_dentry_unless); if (req->r_old_inode_drop) releases += ceph_encode_inode_release(&p, d_inode(req->r_old_dentry), diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 3373b61faefd..a80a915ca247 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -904,6 +904,7 @@ extern void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc); extern int ceph_encode_inode_release(void **p, struct inode *inode, int mds, int drop, int unless, int force); extern int ceph_encode_dentry_release(void **p, struct dentry *dn, + struct inode *dir, int mds, int drop, int unless); extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, -- cgit v1.2.3-70-g09d2 From 52953d55917e45fdf62d5e6cb37c61c32dc8d373 Mon Sep 17 00:00:00 2001 From: Seraphime Kirkovski Date: Mon, 26 Dec 2016 10:26:34 +0100 Subject: ceph: cleanup ACCESS_ONCE -> READ_ONCE This removes the uses of ACCESS_ONCE in favor of READ_ONCE Signed-off-by: Seraphime Kirkovski Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 4 ++-- fs/ceph/caps.c | 2 +- fs/ceph/dir.c | 2 +- fs/ceph/inode.c | 2 +- fs/ceph/mds_client.c | 10 +++++----- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index e4b066cd912a..39852567d66e 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -771,7 +771,7 @@ static int ceph_writepages_start(struct address_space *mapping, wbc->sync_mode == WB_SYNC_NONE ? "NONE" : (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD")); - if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { + if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { if (ci->i_wrbuffer_ref > 0) { pr_warn_ratelimited( "writepage_start %p %lld forced umount\n", @@ -1194,7 +1194,7 @@ static int ceph_update_writeable_page(struct file *file, int r; struct ceph_snap_context *snapc, *oldest; - if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { + if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { dout(" page %p forced umount\n", page); unlock_page(page); return -EIO; diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 4951ab96ffc8..e3a2a3f32568 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2477,7 +2477,7 @@ again: if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) { int mds_wanted; - if (ACCESS_ONCE(mdsc->fsc->mount_state) == + if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { dout("get_cap_refs %p forced umount\n", inode); *err = -EIO; diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 8ab1fdf0bd49..d4385563b70a 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -1194,7 +1194,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) struct inode *dir; if (flags & LOOKUP_RCU) { - parent = ACCESS_ONCE(dentry->d_parent); + parent = READ_ONCE(dentry->d_parent); dir = d_inode_rcu(parent); if (!dir) return -ECHILD; diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 5e659d054b40..4926265f4223 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1720,7 +1720,7 @@ static void ceph_invalidate_work(struct work_struct *work) mutex_lock(&ci->i_truncate_mutex); - if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { + if (READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n", inode, ceph_ino(inode)); mapping_set_error(inode->i_mapping, -EIO); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 1b78457c992b..176512960b14 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1166,7 +1166,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; if (ci->i_wrbuffer_ref > 0 && - ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) + READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) invalidate = true; while (!list_empty(&ci->i_cap_flush_list)) { @@ -2126,12 +2126,12 @@ static int __do_request(struct ceph_mds_client *mdsc, err = -EIO; goto finish; } - if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { + if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { dout("do_request forced umount\n"); err = -EIO; goto finish; } - if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { + if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_MOUNTING) { if (mdsc->mdsmap_err) { err = mdsc->mdsmap_err; dout("do_request mdsmap err %d\n", err); @@ -3586,7 +3586,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) { u64 want_tid, want_flush; - if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) + if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) return; dout("sync\n"); @@ -3617,7 +3617,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) */ static bool done_closing_sessions(struct ceph_mds_client *mdsc, int skipped) { - if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) + if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) return true; return atomic_read(&mdsc->num_sessions) <= skipped; } -- cgit v1.2.3-70-g09d2 From 0fbc5360bf433ffeccbb94ef806d637049899741 Mon Sep 17 00:00:00 2001 From: Colin Ian King Date: Thu, 29 Dec 2016 20:19:32 +0000 Subject: ceph: fix spelling mistake: "enabing" -> "enabling" trivial fix to spelling mistake in debug message Signed-off-by: Colin Ian King Signed-off-by: Yan, Zheng --- fs/ceph/cache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c index 5bc5d37b1217..4e7421caf380 100644 --- a/fs/ceph/cache.c +++ b/fs/ceph/cache.c @@ -234,7 +234,7 @@ void ceph_fscache_file_set_cookie(struct inode *inode, struct file *filp) fscache_enable_cookie(ci->fscache, ceph_fscache_can_enable, inode); if (fscache_cookie_enabled(ci->fscache)) { - dout("fscache_file_set_cookie %p %p enabing cache\n", + dout("fscache_file_set_cookie %p %p enabling cache\n", inode, filp); } } -- cgit v1.2.3-70-g09d2 From 7c94ba27903c3c46c5829e96efb7c2a8ab7510e9 Mon Sep 17 00:00:00 2001 From: Andreas Gerstmayr Date: Tue, 10 Jan 2017 14:17:56 +0100 Subject: ceph: set io_pages bdi hint This patch sets the io_pages bdi hint based on the rsize mount option. Without this patch large buffered reads (request size > max readahead) are processed sequentially in chunks of the readahead size (i.e. read requests are sent out up to the readahead size, then the do_generic_file_read() function waits until the first page is received). With this patch read requests are sent out at once up to the size specified in the rsize mount option (default: 64 MB). Signed-off-by: Andreas Gerstmayr Acked-by: Jeff Layton Signed-off-by: Yan, Zheng --- Documentation/filesystems/ceph.txt | 5 ++--- fs/ceph/super.c | 8 ++++++++ fs/ceph/super.h | 4 ++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt index f5306ee40ea9..0b302a11718a 100644 --- a/Documentation/filesystems/ceph.txt +++ b/Documentation/filesystems/ceph.txt @@ -98,11 +98,10 @@ Mount Options size. rsize=X - Specify the maximum read size in bytes. By default there is no - maximum. + Specify the maximum read size in bytes. Default: 64 MB. rasize=X - Specify the maximum readahead. + Specify the maximum readahead. Default: 8 MB. mount_timeout=X Specify the timeout value for mount (in seconds), in the case diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 6bd20d707bfd..a0a0b6d02f89 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -952,6 +952,14 @@ static int ceph_register_bdi(struct super_block *sb, fsc->backing_dev_info.ra_pages = VM_MAX_READAHEAD * 1024 / PAGE_SIZE; + if (fsc->mount_options->rsize > fsc->mount_options->rasize && + fsc->mount_options->rsize >= PAGE_SIZE) + fsc->backing_dev_info.io_pages = + (fsc->mount_options->rsize + PAGE_SIZE - 1) + >> PAGE_SHIFT; + else if (fsc->mount_options->rsize == 0) + fsc->backing_dev_info.io_pages = ULONG_MAX; + err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld", atomic_long_inc_return(&bdi_seq)); if (!err) diff --git a/fs/ceph/super.h b/fs/ceph/super.h index a80a915ca247..f3f9215abf8e 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -45,8 +45,8 @@ #define ceph_test_mount_opt(fsc, opt) \ (!!((fsc)->mount_options->flags & CEPH_MOUNT_OPT_##opt)) -#define CEPH_RSIZE_DEFAULT 0 /* max read size */ -#define CEPH_RASIZE_DEFAULT (8192*1024) /* readahead */ +#define CEPH_RSIZE_DEFAULT (64*1024*1024) /* max read size */ +#define CEPH_RASIZE_DEFAULT (8192*1024) /* max readahead */ #define CEPH_MAX_READDIR_DEFAULT 1024 #define CEPH_MAX_READDIR_BYTES_DEFAULT (512*1024) #define CEPH_SNAPDIRNAME_DEFAULT ".snap" -- cgit v1.2.3-70-g09d2 From eb65b919b914e65485fd8134912066f4fafc4f1e Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 12 Jan 2017 17:18:00 +0800 Subject: ceph: avoid updating mds_wanted too frequently user space may open/close single file frequently. It's not good to send a clientcaps message to mds for each open/close syscall. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index e3a2a3f32568..d941c48e8bff 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1184,6 +1184,13 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, delayed = 1; } ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH); + if (want & ~cap->mds_wanted) { + /* user space may open/close single file frequently. + * This avoids droping mds_wanted immediately after + * requesting new mds_wanted. + */ + __cap_set_timeouts(mdsc, ci); + } cap->issued &= retain; /* drop bits we don't want */ if (cap->implemented & ~cap->issued) { @@ -2485,15 +2492,14 @@ again: goto out_unlock; } mds_wanted = __ceph_caps_mds_wanted(ci); - if ((mds_wanted & need) != need) { + if (need & ~(mds_wanted & need)) { dout("get_cap_refs %p caps were dropped" " (session killed?)\n", inode); *err = -ESTALE; ret = 1; goto out_unlock; } - if ((mds_wanted & file_wanted) == - (file_wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR))) + if (!(file_wanted & ~mds_wanted)) ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED; } -- cgit v1.2.3-70-g09d2 From d24cdcd3e40a6825135498e11c20c7976b9bf545 Mon Sep 17 00:00:00 2001 From: Arnd Bergmann Date: Mon, 16 Jan 2017 12:06:09 +0100 Subject: libceph: use BUG() instead of BUG_ON(1) I ran into this compile warning, which is the result of BUG_ON(1) not always leading to the compiler treating the code path as unreachable: include/linux/ceph/osdmap.h: In function 'ceph_can_shift_osds': include/linux/ceph/osdmap.h:62:1: error: control reaches end of non-void function [-Werror=return-type] Using BUG() here avoids the warning. Signed-off-by: Arnd Bergmann Signed-off-by: Ilya Dryomov --- include/linux/ceph/osdmap.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 9a9041784dcf..412906609954 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -57,7 +57,7 @@ static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool) case CEPH_POOL_TYPE_EC: return false; default: - BUG_ON(1); + BUG(); } } -- cgit v1.2.3-70-g09d2 From 7fea24c6d4a553c59937ae4fef95c730a88125cb Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 16 Jan 2017 14:35:17 +0100 Subject: libceph: include linux/sched.h into crypto.c directly Currently crypto.c gets linux/sched.h indirectly through linux/slab.h from linux/kasan.h. Include it directly for memalloc_noio_*() inlines. Signed-off-by: Ilya Dryomov --- net/ceph/crypto.c | 1 + 1 file changed, 1 insertion(+) diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c index 292e33bd916e..85747b7f91a9 100644 --- a/net/ceph/crypto.c +++ b/net/ceph/crypto.c @@ -3,6 +3,7 @@ #include #include +#include #include #include #include -- cgit v1.2.3-70-g09d2 From 24c149ad6914d349d8b64749f20f3f8ea5031fe0 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Thu, 12 Jan 2017 14:42:40 -0500 Subject: ceph: fix bogus endianness change in ceph_ioctl_set_layout sparse says: fs/ceph/ioctl.c:100:28: warning: cast to restricted __le64 preferred_osd is a __s64 so we don't need to do any conversion. Also, just remove the cast in ceph_ioctl_get_layout as it's not needed. Signed-off-by: Jeff Layton Reviewed-by: Sage Weil Signed-off-by: Ilya Dryomov --- fs/ceph/ioctl.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index 7d752d53353a..4c9c72f26eb9 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -25,7 +25,7 @@ static long ceph_ioctl_get_layout(struct file *file, void __user *arg) l.stripe_count = ci->i_layout.stripe_count; l.object_size = ci->i_layout.object_size; l.data_pool = ci->i_layout.pool_id; - l.preferred_osd = (s32)-1; + l.preferred_osd = -1; if (copy_to_user(arg, &l, sizeof(l))) return -EFAULT; } @@ -97,7 +97,7 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg) nl.data_pool = ci->i_layout.pool_id; /* this is obsolete, and always -1 */ - nl.preferred_osd = le64_to_cpu(-1); + nl.preferred_osd = -1; err = __validate_layout(mdsc, &nl); if (err) -- cgit v1.2.3-70-g09d2 From d641df819db8b80198fd85d9de91137e8a823b07 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 19 Jan 2017 11:21:29 +0800 Subject: ceph: update readpages osd request according to size of pages add_to_page_cache_lru() can fails, so the actual pages to read can be smaller than the initial size of osd request. We need to update osd request size in that case. Signed-off-by: Yan, Zheng Reviewed-by: Jeff Layton --- fs/ceph/addr.c | 1 + net/ceph/osd_client.c | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 39852567d66e..4547bbf80e4f 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -391,6 +391,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) nr_pages = i; if (nr_pages > 0) { len = nr_pages << PAGE_SHIFT; + osd_req_op_extent_update(req, 0, len); break; } goto out_pages; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 842f049abb86..3a2417bb6ff0 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -672,7 +672,8 @@ void osd_req_op_extent_update(struct ceph_osd_request *osd_req, BUG_ON(length > previous); op->extent.length = length; - op->indata_len -= previous - length; + if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL) + op->indata_len -= previous - length; } EXPORT_SYMBOL(osd_req_op_extent_update); -- cgit v1.2.3-70-g09d2 From 00f06cba53f53f3f7be8ac4f9ba2c2f6a94bca6f Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 24 Jan 2017 10:02:32 +0800 Subject: ceph: make sure flushing inode in proper session's cap_flushing list when flushing inode's auth cap changes, we need to move it into the new auth cap session's cap_flushing list Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index d941c48e8bff..ed8c7addce91 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -3410,6 +3410,7 @@ retry: tcap->implemented |= issued; if (cap == ci->i_auth_cap) ci->i_auth_cap = tcap; + if (!list_empty(&ci->i_cap_flush_list) && ci->i_auth_cap == tcap) { spin_lock(&mdsc->cap_dirty_lock); @@ -3423,9 +3424,18 @@ retry: } else if (tsession) { /* add placeholder for the export tagert */ int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0; + tcap = new_cap; ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0, t_seq - 1, t_mseq, (u64)-1, flag, &new_cap); + if (!list_empty(&ci->i_cap_flush_list) && + ci->i_auth_cap == tcap) { + spin_lock(&mdsc->cap_dirty_lock); + list_move_tail(&ci->i_flushing_item, + &tcap->session->s_cap_flushing); + spin_unlock(&mdsc->cap_dirty_lock); + } + __ceph_remove_cap(cap, false); goto out_unlock; } -- cgit v1.2.3-70-g09d2 From c1944fedd8c492ce1c1a99ca9064dcc7bafa80e9 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sun, 29 Jan 2017 22:15:47 +0800 Subject: ceph: avoid calling ceph_renew_caps() infinitely __ceph_caps_mds_wanted() ignores caps from stale session. So the return value of __ceph_caps_mds_wanted() can keep the same across ceph_renew_caps(). This causes try_get_cap_refs() to keep calling ceph_renew_caps(). The fix is ignore the session valid check for the try_get_cap_refs() case. If session is stale, just let the caps requester sleep. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 6 +++--- fs/ceph/file.c | 2 +- fs/ceph/super.h | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index ed8c7addce91..3c2dfd72e5b2 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -867,7 +867,7 @@ int __ceph_caps_file_wanted(struct ceph_inode_info *ci) /* * Return caps we have registered with the MDS(s) as 'wanted'. */ -int __ceph_caps_mds_wanted(struct ceph_inode_info *ci) +int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check) { struct ceph_cap *cap; struct rb_node *p; @@ -875,7 +875,7 @@ int __ceph_caps_mds_wanted(struct ceph_inode_info *ci) for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) { cap = rb_entry(p, struct ceph_cap, ci_node); - if (!__cap_is_valid(cap)) + if (check && !__cap_is_valid(cap)) continue; if (cap == ci->i_auth_cap) mds_wanted |= cap->mds_wanted; @@ -2491,7 +2491,7 @@ again: ret = 1; goto out_unlock; } - mds_wanted = __ceph_caps_mds_wanted(ci); + mds_wanted = __ceph_caps_mds_wanted(ci, false); if (need & ~(mds_wanted & need)) { dout("get_cap_refs %p caps were dropped" " (session killed?)\n", inode); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 045d30d26624..beb24f8bb917 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -283,7 +283,7 @@ int ceph_open(struct inode *inode, struct file *file) spin_lock(&ci->i_ceph_lock); if (__ceph_is_any_real_caps(ci) && (((fmode & CEPH_FILE_MODE_WR) == 0) || ci->i_auth_cap)) { - int mds_wanted = __ceph_caps_mds_wanted(ci); + int mds_wanted = __ceph_caps_mds_wanted(ci, true); int issued = __ceph_caps_issued(ci, NULL); dout("open %p fmode %d want %s issued %s using existing\n", diff --git a/fs/ceph/super.h b/fs/ceph/super.h index f3f9215abf8e..6477264bfc7e 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -602,7 +602,7 @@ static inline int __ceph_caps_wanted(struct ceph_inode_info *ci) } /* what the mds thinks we want */ -extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci); +extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check); extern void ceph_caps_init(struct ceph_mds_client *mdsc); extern void ceph_caps_finalize(struct ceph_mds_client *mdsc); -- cgit v1.2.3-70-g09d2 From 6fffaef954c432f6ddb42ec1064087e8ed704ca8 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Tue, 31 Jan 2017 10:55:38 -0500 Subject: ceph: remove "Debugging hook" from ceph_fill_trace Keeping around commented out code is just asking for it to bitrot and makes viewing the code under cscope more confusing. If we really need this, then we can revert this patch and put it under a Kconfig option. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/inode.c | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 4926265f4223..38a54ebf647d 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1120,40 +1120,6 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, dout("fill_trace %p is_dentry %d is_target %d\n", req, rinfo->head->is_dentry, rinfo->head->is_target); -#if 0 - /* - * Debugging hook: - * - * If we resend completed ops to a recovering mds, we get no - * trace. Since that is very rare, pretend this is the case - * to ensure the 'no trace' handlers in the callers behave. - * - * Fill in inodes unconditionally to avoid breaking cap - * invariants. - */ - if (rinfo->head->op & CEPH_MDS_OP_WRITE) { - pr_info("fill_trace faking empty trace on %lld %s\n", - req->r_tid, ceph_mds_op_name(rinfo->head->op)); - if (rinfo->head->is_dentry) { - rinfo->head->is_dentry = 0; - err = fill_inode(req->r_locked_dir, - &rinfo->diri, rinfo->dirfrag, - session, req->r_request_started, -1); - } - if (rinfo->head->is_target) { - rinfo->head->is_target = 0; - ininfo = rinfo->targeti.in; - vino.ino = le64_to_cpu(ininfo->ino); - vino.snap = le64_to_cpu(ininfo->snapid); - in = ceph_get_inode(sb, vino); - err = fill_inode(in, &rinfo->targeti, NULL, - session, req->r_request_started, - req->r_fmode); - iput(in); - } - } -#endif - if (!rinfo->head->is_target && !rinfo->head->is_dentry) { dout("fill_trace reply is empty!\n"); if (rinfo->head->result == 0 && req->r_locked_dir) -- cgit v1.2.3-70-g09d2 From f5a03b080450784e671998921feb62fd3846c953 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Tue, 31 Jan 2017 11:06:13 -0500 Subject: ceph: drop session argument to ceph_fill_trace Just get it from r_session since that's what's always passed in. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/inode.c | 4 ++-- fs/ceph/mds_client.c | 2 +- fs/ceph/super.h | 3 +-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 38a54ebf647d..b18462c64cdd 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1108,9 +1108,9 @@ out: * * Called with snap_rwsem (read). */ -int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req, - struct ceph_mds_session *session) +int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req) { + struct ceph_mds_session *session = req->r_session; struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; struct inode *in = NULL; struct ceph_vino vino; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 176512960b14..4a68067a76b5 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2516,7 +2516,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) /* insert trace into our cache */ mutex_lock(&req->r_fill_mutex); current->journal_info = req; - err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session); + err = ceph_fill_trace(mdsc->fsc->sb, req); if (err == 0) { if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR || req->r_op == CEPH_MDS_OP_LSSNAP)) diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 6477264bfc7e..950170136be9 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -764,8 +764,7 @@ extern void ceph_fill_file_time(struct inode *inode, int issued, u64 time_warp_seq, struct timespec *ctime, struct timespec *mtime, struct timespec *atime); extern int ceph_fill_trace(struct super_block *sb, - struct ceph_mds_request *req, - struct ceph_mds_session *session); + struct ceph_mds_request *req); extern int ceph_readdir_prepopulate(struct ceph_mds_request *req, struct ceph_mds_session *session); -- cgit v1.2.3-70-g09d2 From bc2de10dc4da5036ada3381775bd966f0c21c603 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Wed, 1 Feb 2017 13:49:09 -0500 Subject: ceph: convert bools in ceph_mds_request to a new r_req_flags field Currently, we have a bunch of bool flags in struct ceph_mds_request. We need more flags though, but each bool takes (at least) a byte. Those add up over time. Merge all of the existing bools in this struct into a single unsigned long, and use the set/test/clear_bit macros to manipulate them. These are atomic operations, but that is required here to prevent load/modify/store races. The existing flags are protected by different locks, so we can't rely on them for that purpose. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/debugfs.c | 2 +- fs/ceph/dir.c | 4 ++-- fs/ceph/inode.c | 18 +++++++++--------- fs/ceph/mds_client.c | 47 +++++++++++++++++++++++++---------------------- fs/ceph/mds_client.h | 12 ++++++++---- 5 files changed, 45 insertions(+), 38 deletions(-) diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 39ff678e567f..f2ae393e2c31 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -70,7 +70,7 @@ static int mdsc_show(struct seq_file *s, void *p) seq_printf(s, "%s", ceph_mds_op_name(req->r_op)); - if (req->r_got_unsafe) + if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) seq_puts(s, "\t(unsafe)"); else seq_puts(s, "\t"); diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index d4385563b70a..5fb86d71220e 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -371,7 +371,7 @@ more: /* hints to request -> mds selection code */ req->r_direct_mode = USE_AUTH_MDS; req->r_direct_hash = ceph_frag_value(frag); - req->r_direct_is_hash = true; + __set_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); if (fi->last_name) { req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL); if (!req->r_path2) { @@ -417,7 +417,7 @@ more: fi->frag = frag; fi->last_readdir = req; - if (req->r_did_prepopulate) { + if (test_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags)) { fi->readdir_cache_idx = req->r_readdir_cache_idx; if (fi->readdir_cache_idx < 0) { /* preclude from marking dir ordered */ diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index b18462c64cdd..ebfb156aba89 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1199,8 +1199,8 @@ retry_lookup: err = fill_inode(in, req->r_locked_page, &rinfo->targeti, NULL, session, req->r_request_started, - (!req->r_aborted && rinfo->head->result == 0) ? - req->r_fmode : -1, + (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) && + rinfo->head->result == 0) ? req->r_fmode : -1, &req->r_caps_reservation); if (err < 0) { pr_err("fill_inode badness %p %llx.%llx\n", @@ -1213,8 +1213,8 @@ retry_lookup: * ignore null lease/binding on snapdir ENOENT, or else we * will have trouble splicing in the virtual snapdir later */ - if (rinfo->head->is_dentry && !req->r_aborted && - req->r_locked_dir && + if (rinfo->head->is_dentry && req->r_locked_dir && + !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) && (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name, fsc->mount_options->snapdir_name, req->r_dentry->d_name.len))) { @@ -1317,9 +1317,9 @@ retry_lookup: update_dentry_lease(dn, rinfo->dlease, session, req->r_request_started); dout(" final dn %p\n", dn); - } else if (!req->r_aborted && - (req->r_op == CEPH_MDS_OP_LOOKUPSNAP || - req->r_op == CEPH_MDS_OP_MKSNAP)) { + } else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP || + req->r_op == CEPH_MDS_OP_MKSNAP) && + !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { struct dentry *dn = req->r_dentry; struct inode *dir = req->r_locked_dir; @@ -1444,7 +1444,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, u32 fpos_offset; struct ceph_readdir_cache_control cache_ctl = {}; - if (req->r_aborted) + if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) return readdir_prepopulate_inodes_only(req, session); if (rinfo->hash_order && req->r_path2) { @@ -1598,7 +1598,7 @@ next_item: } out: if (err == 0 && skipped == 0) { - req->r_did_prepopulate = true; + set_bit(CEPH_MDS_R_DID_PREPOPULATE, &req->r_req_flags); req->r_readdir_cache_idx = cache_ctl.index; } ceph_readdir_cache_release(&cache_ctl); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 4a68067a76b5..ccf75a3260e8 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -644,13 +644,15 @@ static void __unregister_request(struct ceph_mds_client *mdsc, erase_request(&mdsc->request_tree, req); - if (req->r_unsafe_dir && req->r_got_unsafe) { + if (req->r_unsafe_dir && + test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); spin_lock(&ci->i_unsafe_lock); list_del_init(&req->r_unsafe_dir_item); spin_unlock(&ci->i_unsafe_lock); } - if (req->r_target_inode && req->r_got_unsafe) { + if (req->r_target_inode && + test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); spin_lock(&ci->i_unsafe_lock); list_del_init(&req->r_unsafe_target_item); @@ -705,7 +707,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, int mode = req->r_direct_mode; int mds = -1; u32 hash = req->r_direct_hash; - bool is_hash = req->r_direct_is_hash; + bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); /* * is there a specific mds we should try? ignore hint if we have @@ -2042,7 +2044,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req, req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts); - if (req->r_got_unsafe) { + if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { void *p; /* * Replay. Do not regenerate message (and rebuild @@ -2091,7 +2093,7 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, rhead = msg->front.iov_base; rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); - if (req->r_got_unsafe) + if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) flags |= CEPH_MDS_FLAG_REPLAY; if (req->r_locked_dir) flags |= CEPH_MDS_FLAG_WANT_DENTRY; @@ -2114,8 +2116,8 @@ static int __do_request(struct ceph_mds_client *mdsc, int mds = -1; int err = 0; - if (req->r_err || req->r_got_result) { - if (req->r_aborted) + if (req->r_err || test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { + if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) __unregister_request(mdsc, req); goto out; } @@ -2245,7 +2247,7 @@ static void kick_requests(struct ceph_mds_client *mdsc, int mds) while (p) { req = rb_entry(p, struct ceph_mds_request, r_node); p = rb_next(p); - if (req->r_got_unsafe) + if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) continue; if (req->r_attempts > 0) continue; /* only new requests */ @@ -2319,7 +2321,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, mutex_lock(&mdsc->mutex); /* only abort if we didn't race with a real reply */ - if (req->r_got_result) { + if (test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)) { err = le32_to_cpu(req->r_reply_info.head->result); } else if (err < 0) { dout("aborted request %lld with %d\n", req->r_tid, err); @@ -2331,7 +2333,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, */ mutex_lock(&req->r_fill_mutex); req->r_err = err; - req->r_aborted = true; + set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); mutex_unlock(&req->r_fill_mutex); if (req->r_locked_dir && @@ -2409,14 +2411,14 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) } /* dup? */ - if ((req->r_got_unsafe && !head->safe) || - (req->r_got_safe && head->safe)) { + if ((test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags) && !head->safe) || + (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags) && head->safe)) { pr_warn("got a dup %s reply on %llu from mds%d\n", head->safe ? "safe" : "unsafe", tid, mds); mutex_unlock(&mdsc->mutex); goto out; } - if (req->r_got_safe) { + if (test_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags)) { pr_warn("got unsafe after safe on %llu from mds%d\n", tid, mds); mutex_unlock(&mdsc->mutex); @@ -2455,10 +2457,10 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) if (head->safe) { - req->r_got_safe = true; + set_bit(CEPH_MDS_R_GOT_SAFE, &req->r_req_flags); __unregister_request(mdsc, req); - if (req->r_got_unsafe) { + if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { /* * We already handled the unsafe response, now do the * cleanup. No need to examine the response; the MDS @@ -2476,7 +2478,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) goto out; } } else { - req->r_got_unsafe = true; + set_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags); list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe); if (req->r_unsafe_dir) { struct ceph_inode_info *ci = @@ -2530,7 +2532,8 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) if (realm) ceph_put_snap_realm(mdsc, realm); - if (err == 0 && req->r_got_unsafe && req->r_target_inode) { + if (err == 0 && req->r_target_inode && + test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) { struct ceph_inode_info *ci = ceph_inode(req->r_target_inode); spin_lock(&ci->i_unsafe_lock); list_add_tail(&req->r_unsafe_target_item, &ci->i_unsafe_iops); @@ -2538,12 +2541,12 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) } out_err: mutex_lock(&mdsc->mutex); - if (!req->r_aborted) { + if (!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { if (err) { req->r_err = err; } else { req->r_reply = ceph_msg_get(msg); - req->r_got_result = true; + set_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags); } } else { dout("reply arrived after request %lld was aborted\n", tid); @@ -2587,7 +2590,7 @@ static void handle_forward(struct ceph_mds_client *mdsc, goto out; /* dup reply? */ } - if (req->r_aborted) { + if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { dout("forward tid %llu aborted, unregistering\n", tid); __unregister_request(mdsc, req); } else if (fwd_seq <= req->r_num_fwd) { @@ -2597,7 +2600,7 @@ static void handle_forward(struct ceph_mds_client *mdsc, /* resend. forward race not possible; mds would drop */ dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds); BUG_ON(req->r_err); - BUG_ON(req->r_got_result); + BUG_ON(test_bit(CEPH_MDS_R_GOT_RESULT, &req->r_req_flags)); req->r_attempts = 0; req->r_num_fwd = fwd_seq; req->r_resend_mds = next_mds; @@ -2762,7 +2765,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc, while (p) { req = rb_entry(p, struct ceph_mds_request, r_node); p = rb_next(p); - if (req->r_got_unsafe) + if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) continue; if (req->r_attempts == 0) continue; /* only old requests */ diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 3c6f77b7bb02..409b0e3c3b7a 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -205,6 +205,14 @@ struct ceph_mds_request { struct inode *r_locked_dir; /* dir (if any) i_mutex locked by vfs */ struct inode *r_target_inode; /* resulting inode */ +#define CEPH_MDS_R_DIRECT_IS_HASH (1) /* r_direct_hash is valid */ +#define CEPH_MDS_R_ABORTED (2) /* call was aborted */ +#define CEPH_MDS_R_GOT_UNSAFE (3) /* got an unsafe reply */ +#define CEPH_MDS_R_GOT_SAFE (4) /* got a safe reply */ +#define CEPH_MDS_R_GOT_RESULT (5) /* got a result */ +#define CEPH_MDS_R_DID_PREPOPULATE (6) /* prepopulated readdir */ + unsigned long r_req_flags; + struct mutex r_fill_mutex; union ceph_mds_request_args r_args; @@ -216,7 +224,6 @@ struct ceph_mds_request { /* for choosing which mds to send this request to */ int r_direct_mode; u32 r_direct_hash; /* choose dir frag based on this dentry hash */ - bool r_direct_is_hash; /* true if r_direct_hash is valid */ /* data payload is used for xattr ops */ struct ceph_pagelist *r_pagelist; @@ -234,7 +241,6 @@ struct ceph_mds_request { struct ceph_mds_reply_info_parsed r_reply_info; struct page *r_locked_page; int r_err; - bool r_aborted; unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */ unsigned long r_started; /* start time to measure timeout against */ @@ -262,9 +268,7 @@ struct ceph_mds_request { ceph_mds_request_callback_t r_callback; ceph_mds_request_wait_callback_t r_wait_for_completion; struct list_head r_unsafe_item; /* per-session unsafe list item */ - bool r_got_unsafe, r_got_safe, r_got_result; - bool r_did_prepopulate; long long r_dir_release_cnt; long long r_dir_ordered_cnt; int r_readdir_cache_idx; -- cgit v1.2.3-70-g09d2 From 3dd69aabcef3d835446a9a1e11d2eab0e6e35e95 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Tue, 31 Jan 2017 10:28:26 -0500 Subject: ceph: add a new flag to indicate whether parent is locked struct ceph_mds_request has an r_locked_dir pointer, which is set to indicate the parent inode and that its i_rwsem is locked. In some critical places, we need to be able to indicate the parent inode to the request handling code, even when its i_rwsem may not be locked. Most of the code that operates on r_locked_dir doesn't require that the i_rwsem be locked. We only really need it to handle manipulation of the dcache. The rest (filling of the inode, updating dentry leases, etc.) already has its own locking. Add a new r_req_flags bit that indicates whether the parent is locked when doing the request, and rename the pointer to "r_parent". For now, all the places that set r_parent also set this flag, but that will change in a later patch. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/dir.c | 21 ++++++++++++++------- fs/ceph/export.c | 3 ++- fs/ceph/file.c | 3 ++- fs/ceph/inode.c | 13 +++++++------ fs/ceph/mds_client.c | 24 ++++++++++++------------ fs/ceph/mds_client.h | 3 ++- 6 files changed, 39 insertions(+), 28 deletions(-) diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 5fb86d71220e..c8562b8a8c7e 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -752,7 +752,8 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, mask |= CEPH_CAP_XATTR_SHARED; req->r_args.getattr.mask = cpu_to_le32(mask); - req->r_locked_dir = dir; + req->r_parent = dir; + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); err = ceph_mdsc_do_request(mdsc, NULL, req); err = ceph_handle_snapdir(req, dentry, err); dentry = ceph_finish_lookup(req, dentry, err); @@ -813,7 +814,8 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, } req->r_dentry = dget(dentry); req->r_num_caps = 2; - req->r_locked_dir = dir; + req->r_parent = dir; + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_args.mknod.mode = cpu_to_le32(mode); req->r_args.mknod.rdev = cpu_to_le32(rdev); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; @@ -864,7 +866,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, ceph_mdsc_put_request(req); goto out; } - req->r_locked_dir = dir; + req->r_parent = dir; + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_dentry_drop = CEPH_CAP_FILE_SHARED; @@ -913,7 +916,8 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) req->r_dentry = dget(dentry); req->r_num_caps = 2; - req->r_locked_dir = dir; + req->r_parent = dir; + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_args.mkdir.mode = cpu_to_le32(mode); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; @@ -957,7 +961,8 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_old_dentry = dget(old_dentry); - req->r_locked_dir = dir; + req->r_parent = dir; + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; /* release LINK_SHARED on source inode (mds will lock it) */ @@ -1023,7 +1028,8 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry) } req->r_dentry = dget(dentry); req->r_num_caps = 2; - req->r_locked_dir = dir; + req->r_parent = dir; + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; req->r_inode_drop = drop_caps_for_unlink(inode); @@ -1066,7 +1072,8 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, req->r_num_caps = 2; req->r_old_dentry = dget(old_dentry); req->r_old_dentry_dir = old_dir; - req->r_locked_dir = new_dir; + req->r_parent = new_dir; + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; req->r_dentry_drop = CEPH_CAP_FILE_SHARED; diff --git a/fs/ceph/export.c b/fs/ceph/export.c index 180bbef760f2..e8f11fa565c5 100644 --- a/fs/ceph/export.c +++ b/fs/ceph/export.c @@ -207,7 +207,8 @@ static int ceph_get_name(struct dentry *parent, char *name, req->r_inode = d_inode(child); ihold(d_inode(child)); req->r_ino2 = ceph_vino(d_inode(parent)); - req->r_locked_dir = d_inode(parent); + req->r_parent = d_inode(parent); + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_num_caps = 2; err = ceph_mdsc_do_request(mdsc, NULL, req); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index beb24f8bb917..baad3b688ada 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -379,7 +379,8 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry, mask |= CEPH_CAP_XATTR_SHARED; req->r_args.open.mask = cpu_to_le32(mask); - req->r_locked_dir = dir; /* caller holds dir->i_mutex */ + req->r_parent = dir; + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); err = ceph_mdsc_do_request(mdsc, (flags & (O_CREAT|O_TRUNC)) ? dir : NULL, req); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index ebfb156aba89..35a8c453bea6 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1122,13 +1122,13 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req) if (!rinfo->head->is_target && !rinfo->head->is_dentry) { dout("fill_trace reply is empty!\n"); - if (rinfo->head->result == 0 && req->r_locked_dir) + if (rinfo->head->result == 0 && req->r_parent) ceph_invalidate_dir_request(req); return 0; } if (rinfo->head->is_dentry) { - struct inode *dir = req->r_locked_dir; + struct inode *dir = req->r_parent; if (dir) { err = fill_inode(dir, NULL, @@ -1213,8 +1213,9 @@ retry_lookup: * ignore null lease/binding on snapdir ENOENT, or else we * will have trouble splicing in the virtual snapdir later */ - if (rinfo->head->is_dentry && req->r_locked_dir && - !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) && + if (rinfo->head->is_dentry && + !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) && + test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) && (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name, fsc->mount_options->snapdir_name, req->r_dentry->d_name.len))) { @@ -1223,7 +1224,7 @@ retry_lookup: * mknod symlink mkdir : null -> new inode * unlink : linked -> null */ - struct inode *dir = req->r_locked_dir; + struct inode *dir = req->r_parent; struct dentry *dn = req->r_dentry; bool have_dir_cap, have_lease; @@ -1321,7 +1322,7 @@ retry_lookup: req->r_op == CEPH_MDS_OP_MKSNAP) && !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { struct dentry *dn = req->r_dentry; - struct inode *dir = req->r_locked_dir; + struct inode *dir = req->r_parent; /* fill out a snapdir LOOKUPSNAP dentry */ BUG_ON(!dn); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index ccf75a3260e8..52521f339745 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -547,8 +547,8 @@ void ceph_mdsc_release_request(struct kref *kref) ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); iput(req->r_inode); } - if (req->r_locked_dir) - ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); + if (req->r_parent) + ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); iput(req->r_target_inode); if (req->r_dentry) dput(req->r_dentry); @@ -735,7 +735,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, rcu_read_lock(); parent = req->r_dentry->d_parent; - dir = req->r_locked_dir ? : d_inode_rcu(parent); + dir = req->r_parent ? : d_inode_rcu(parent); if (!dir || dir->i_sb != mdsc->fsc->sb) { /* not this fs or parent went negative */ @@ -1894,7 +1894,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, int ret; ret = set_request_path_attr(req->r_inode, req->r_dentry, - req->r_locked_dir, req->r_path1, req->r_ino1.ino, + req->r_parent, req->r_path1, req->r_ino1.ino, &path1, &pathlen1, &ino1, &freepath1); if (ret < 0) { msg = ERR_PTR(ret); @@ -1956,7 +1956,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, mds, req->r_inode_drop, req->r_inode_unless, 0); if (req->r_dentry_drop) releases += ceph_encode_dentry_release(&p, req->r_dentry, - req->r_locked_dir, mds, req->r_dentry_drop, + req->r_parent, mds, req->r_dentry_drop, req->r_dentry_unless); if (req->r_old_dentry_drop) releases += ceph_encode_dentry_release(&p, req->r_old_dentry, @@ -2095,14 +2095,14 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) flags |= CEPH_MDS_FLAG_REPLAY; - if (req->r_locked_dir) + if (req->r_parent) flags |= CEPH_MDS_FLAG_WANT_DENTRY; rhead->flags = cpu_to_le32(flags); rhead->num_fwd = req->r_num_fwd; rhead->num_retry = req->r_attempts - 1; rhead->ino = 0; - dout(" r_locked_dir = %p\n", req->r_locked_dir); + dout(" r_parent = %p\n", req->r_parent); return 0; } @@ -2282,11 +2282,11 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, dout("do_request on %p\n", req); - /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */ + /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */ if (req->r_inode) ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); - if (req->r_locked_dir) - ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN); + if (req->r_parent) + ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); if (req->r_old_dentry_dir) ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir), CEPH_CAP_PIN); @@ -2336,7 +2336,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags); mutex_unlock(&req->r_fill_mutex); - if (req->r_locked_dir && + if (req->r_parent && (req->r_op & CEPH_MDS_OP_WRITE)) ceph_invalidate_dir_request(req); } else { @@ -2355,7 +2355,7 @@ out: */ void ceph_invalidate_dir_request(struct ceph_mds_request *req) { - struct inode *inode = req->r_locked_dir; + struct inode *inode = req->r_parent; dout("invalidate_dir_request %p (complete, lease(s))\n", inode); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 409b0e3c3b7a..ac0475a2daa7 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -202,7 +202,7 @@ struct ceph_mds_request { char *r_path1, *r_path2; struct ceph_vino r_ino1, r_ino2; - struct inode *r_locked_dir; /* dir (if any) i_mutex locked by vfs */ + struct inode *r_parent; /* parent dir inode */ struct inode *r_target_inode; /* resulting inode */ #define CEPH_MDS_R_DIRECT_IS_HASH (1) /* r_direct_hash is valid */ @@ -211,6 +211,7 @@ struct ceph_mds_request { #define CEPH_MDS_R_GOT_SAFE (4) /* got a safe reply */ #define CEPH_MDS_R_GOT_RESULT (5) /* got a result */ #define CEPH_MDS_R_DID_PREPOPULATE (6) /* prepopulated readdir */ +#define CEPH_MDS_R_PARENT_LOCKED (7) /* is r_parent->i_rwsem wlocked? */ unsigned long r_req_flags; struct mutex r_fill_mutex; -- cgit v1.2.3-70-g09d2 From 80d025ffede88969f6adf7266fbdedfd5641148a Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Thu, 26 Jan 2017 16:14:18 -0500 Subject: ceph: don't update_dentry_lease unless we actually got one This if block updates the dentry lease even in the case where the MDS didn't grant one. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/inode.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 35a8c453bea6..542e3c6b0f1b 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1286,8 +1286,8 @@ retry_lookup: ceph_dir_clear_ordered(dir); dout("d_delete %p\n", dn); d_delete(dn); - } else { - if (have_lease && d_unhashed(dn)) + } else if (have_lease) { + if (d_unhashed(dn)) d_add(dn, NULL); update_dentry_lease(dn, rinfo->dlease, session, -- cgit v1.2.3-70-g09d2 From f5d55f03973f79a008bf8635452849f8ef589c23 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Fri, 27 Jan 2017 09:13:57 -0500 Subject: ceph: vet the target and parent inodes before updating dentry lease In a later patch, we're going to need to allow ceph_fill_trace to update the dentry's lease when the parent is not locked. This is potentially racy though -- by the time we get around to processing the trace, the parent may have already changed. Change update_dentry_lease to take a ceph_vino pointer and use that to ensure that the dentry's parent still matches it before updating the lease. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/inode.c | 72 ++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 24 deletions(-) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 542e3c6b0f1b..f5111df561e4 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1016,7 +1016,9 @@ out: static void update_dentry_lease(struct dentry *dentry, struct ceph_mds_reply_lease *lease, struct ceph_mds_session *session, - unsigned long from_time) + unsigned long from_time, + struct ceph_vino *tgt_vino, + struct ceph_vino *dir_vino) { struct ceph_dentry_info *di = ceph_dentry(dentry); long unsigned duration = le32_to_cpu(lease->duration_ms); @@ -1024,13 +1026,27 @@ static void update_dentry_lease(struct dentry *dentry, long unsigned half_ttl = from_time + (duration * HZ / 2) / 1000; struct inode *dir; + /* + * Make sure dentry's inode matches tgt_vino. NULL tgt_vino means that + * we expect a negative dentry. + */ + if (!tgt_vino && d_really_is_positive(dentry)) + return; + + if (tgt_vino && (d_really_is_negative(dentry) || + !ceph_ino_compare(d_inode(dentry), tgt_vino))) + return; + spin_lock(&dentry->d_lock); dout("update_dentry_lease %p duration %lu ms ttl %lu\n", dentry, duration, ttl); - /* make lease_rdcache_gen match directory */ dir = d_inode(dentry->d_parent); + /* make sure parent matches dir_vino */ + if (!ceph_ino_compare(dir, dir_vino)) + goto out_unlock; + /* only track leases on regular dentries */ if (ceph_snap(dir) != CEPH_NOSNAP) goto out_unlock; @@ -1113,7 +1129,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req) struct ceph_mds_session *session = req->r_session; struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info; struct inode *in = NULL; - struct ceph_vino vino; + struct ceph_vino tvino, dvino; struct ceph_fs_client *fsc = ceph_sb_to_client(sb); int err = 0; @@ -1154,8 +1170,8 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req) dname.name = rinfo->dname; dname.len = rinfo->dname_len; dname.hash = full_name_hash(parent, dname.name, dname.len); - vino.ino = le64_to_cpu(rinfo->targeti.in->ino); - vino.snap = le64_to_cpu(rinfo->targeti.in->snapid); + tvino.ino = le64_to_cpu(rinfo->targeti.in->ino); + tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid); retry_lookup: dn = d_lookup(parent, &dname); dout("d_lookup on parent=%p name=%.*s got %p\n", @@ -1172,8 +1188,8 @@ retry_lookup: } err = 0; } else if (d_really_is_positive(dn) && - (ceph_ino(d_inode(dn)) != vino.ino || - ceph_snap(d_inode(dn)) != vino.snap)) { + (ceph_ino(d_inode(dn)) != tvino.ino || + ceph_snap(d_inode(dn)) != tvino.snap)) { dout(" dn %p points to wrong inode %p\n", dn, d_inode(dn)); d_delete(dn); @@ -1187,10 +1203,10 @@ retry_lookup: } if (rinfo->head->is_target) { - vino.ino = le64_to_cpu(rinfo->targeti.in->ino); - vino.snap = le64_to_cpu(rinfo->targeti.in->snapid); + tvino.ino = le64_to_cpu(rinfo->targeti.in->ino); + tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid); - in = ceph_get_inode(sb, vino); + in = ceph_get_inode(sb, tvino); if (IS_ERR(in)) { err = PTR_ERR(in); goto done; @@ -1231,10 +1247,12 @@ retry_lookup: BUG_ON(!dn); BUG_ON(!dir); BUG_ON(d_inode(dn->d_parent) != dir); - BUG_ON(ceph_ino(dir) != - le64_to_cpu(rinfo->diri.in->ino)); - BUG_ON(ceph_snap(dir) != - le64_to_cpu(rinfo->diri.in->snapid)); + + dvino.ino = le64_to_cpu(rinfo->diri.in->ino); + dvino.snap = le64_to_cpu(rinfo->diri.in->snapid); + + BUG_ON(ceph_ino(dir) != dvino.ino); + BUG_ON(ceph_snap(dir) != dvino.snap); /* do we have a lease on the whole dir? */ have_dir_cap = @@ -1291,7 +1309,8 @@ retry_lookup: d_add(dn, NULL); update_dentry_lease(dn, rinfo->dlease, session, - req->r_request_started); + req->r_request_started, + NULL, &dvino); } goto done; } @@ -1314,9 +1333,13 @@ retry_lookup: have_lease = false; } - if (have_lease) + if (have_lease) { + tvino.ino = le64_to_cpu(rinfo->targeti.in->ino); + tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid); update_dentry_lease(dn, rinfo->dlease, session, - req->r_request_started); + req->r_request_started, + &tvino, &dvino); + } dout(" final dn %p\n", dn); } else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP || req->r_op == CEPH_MDS_OP_MKSNAP) && @@ -1490,14 +1513,14 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, /* FIXME: release caps/leases if error occurs */ for (i = 0; i < rinfo->dir_nr; i++) { struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i; - struct ceph_vino vino; + struct ceph_vino tvino, dvino; dname.name = rde->name; dname.len = rde->name_len; dname.hash = full_name_hash(parent, dname.name, dname.len); - vino.ino = le64_to_cpu(rde->inode.in->ino); - vino.snap = le64_to_cpu(rde->inode.in->snapid); + tvino.ino = le64_to_cpu(rde->inode.in->ino); + tvino.snap = le64_to_cpu(rde->inode.in->snapid); if (rinfo->hash_order) { u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, @@ -1526,8 +1549,8 @@ retry_lookup: goto out; } } else if (d_really_is_positive(dn) && - (ceph_ino(d_inode(dn)) != vino.ino || - ceph_snap(d_inode(dn)) != vino.snap)) { + (ceph_ino(d_inode(dn)) != tvino.ino || + ceph_snap(d_inode(dn)) != tvino.snap)) { dout(" dn %p points to wrong inode %p\n", dn, d_inode(dn)); d_delete(dn); @@ -1539,7 +1562,7 @@ retry_lookup: if (d_really_is_positive(dn)) { in = d_inode(dn); } else { - in = ceph_get_inode(parent->d_sb, vino); + in = ceph_get_inode(parent->d_sb, tvino); if (IS_ERR(in)) { dout("new_inode badness\n"); d_drop(dn); @@ -1584,8 +1607,9 @@ retry_lookup: ceph_dentry(dn)->offset = rde->offset; + dvino = ceph_vino(d_inode(parent)); update_dentry_lease(dn, rde->lease, req->r_session, - req->r_request_started); + req->r_request_started, &tvino, &dvino); if (err == 0 && skipped == 0 && cache_ctl.index >= 0) { ret = fill_readdir_cache(d_inode(parent), dn, -- cgit v1.2.3-70-g09d2 From cdde7c43513de7d8baec3ad7f6a01a177e00e968 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Fri, 27 Jan 2017 13:07:10 -0500 Subject: ceph: call update_dentry_lease even when r_locked dir is not set We don't really require that the parent be locked in order to update the lease on a dentry. Lease info is protected by the d_lock. In the event that the parent is not locked in ceph_fill_trace, and we have both parent and target info, go ahead and update the dentry lease. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/inode.c | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index f5111df561e4..68f46132b157 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1360,6 +1360,26 @@ retry_lookup: goto done; } req->r_dentry = dn; /* may have spliced */ + } else if (rinfo->head->is_dentry) { + struct ceph_vino *ptvino = NULL; + + if ((le32_to_cpu(rinfo->diri.in->cap.caps) & CEPH_CAP_FILE_SHARED) || + le32_to_cpu(rinfo->dlease->duration_ms)) { + dvino.ino = le64_to_cpu(rinfo->diri.in->ino); + dvino.snap = le64_to_cpu(rinfo->diri.in->snapid); + + if (rinfo->head->is_target) { + tvino.ino = le64_to_cpu(rinfo->targeti.in->ino); + tvino.snap = le64_to_cpu(rinfo->targeti.in->snapid); + ptvino = &tvino; + } + + update_dentry_lease(req->r_dentry, rinfo->dlease, + session, req->r_request_started, ptvino, + &dvino); + } else { + dout("%s: no dentry lease or dir cap\n", __func__); + } } done: dout("fill_trace done err=%d\n", err); -- cgit v1.2.3-70-g09d2 From 5eb9f6040f3854c5b92c60fd211dc0235ef25ca8 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Mon, 30 Jan 2017 09:47:25 -0500 Subject: ceph: do a LOOKUP in d_revalidate instead of GETATTR In commit c3f4688a08f (ceph: don't set req->r_locked_dir in ceph_d_revalidate), we changed the code to do a GETATTR instead of a LOOKUP as the parent info isn't strictly necessary to revalidate the dentry. What we missed there though is that in order to update the lease on the dentry after revalidating it, we _do_ need parent info. Change ceph_d_revalidate back to doing a LOOKUP instead of a GETATTR so that we can get the parent info in order to update the lease from ceph_fill_trace. Note that we set req->r_parent here, but we cannot set the CEPH_MDS_R_PARENT_LOCKED flag as we can't guarantee that it is. Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/dir.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index c8562b8a8c7e..3e9ad501addf 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -1244,11 +1244,12 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) return -ECHILD; op = ceph_snap(dir) == CEPH_SNAPDIR ? - CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_GETATTR; + CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP; req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS); if (!IS_ERR(req)) { req->r_dentry = dget(dentry); - req->r_num_caps = op == CEPH_MDS_OP_GETATTR ? 1 : 2; + req->r_num_caps = 2; + req->r_parent = dir; mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; if (ceph_security_xattr_wanted(dir)) -- cgit v1.2.3-70-g09d2 From 19def166f3781edb4c68c3c68fcb8199eed2d605 Mon Sep 17 00:00:00 2001 From: Stafford Horne Date: Sun, 5 Feb 2017 16:07:32 +0900 Subject: libceph: remove unneeded stddef.h include This was causing a build failure for openrisc when using musl and gcc 5.4.0 since the file is not available in the toolchain. It doesnt seem this is needed and removing it does not cause any build warnings for me. Signed-off-by: Stafford Horne Signed-off-by: Ilya Dryomov --- net/ceph/snapshot.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/net/ceph/snapshot.c b/net/ceph/snapshot.c index 154683f5f14c..705414e78ae0 100644 --- a/net/ceph/snapshot.c +++ b/net/ceph/snapshot.c @@ -18,8 +18,6 @@ * 02110-1301, USA. */ -#include - #include #include #include -- cgit v1.2.3-70-g09d2 From 1b6a78b5b91cdc07cc0b940b458e90c86835cf73 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 31 Jan 2017 15:55:06 +0100 Subject: libceph: add osdmap_set_crush() helper Simplify osdmap_decode() and osdmap_apply_incremental() a bit. Signed-off-by: Ilya Dryomov --- net/ceph/osdmap.c | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index d2436880b305..47df075b31e5 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -808,6 +808,17 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) return 0; } +static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush) +{ + if (IS_ERR(crush)) + return PTR_ERR(crush); + + if (map->crush) + crush_destroy(map->crush); + map->crush = crush; + return 0; +} + #define OSDMAP_WRAPPER_COMPAT_VER 7 #define OSDMAP_CLIENT_DATA_COMPAT_VER 1 @@ -1214,13 +1225,9 @@ static int osdmap_decode(void **p, void *end, struct ceph_osdmap *map) /* crush */ ceph_decode_32_safe(p, end, len, e_inval); - map->crush = crush_decode(*p, min(*p + len, end)); - if (IS_ERR(map->crush)) { - err = PTR_ERR(map->crush); - map->crush = NULL; + err = osdmap_set_crush(map, crush_decode(*p, min(*p + len, end))); + if (err) goto bad; - } - *p += len; /* ignore the rest */ *p = end; @@ -1375,7 +1382,6 @@ e_inval: struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, struct ceph_osdmap *map) { - struct crush_map *newcrush = NULL; struct ceph_fsid fsid; u32 epoch = 0; struct ceph_timespec modified; @@ -1414,12 +1420,10 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, /* new crush? */ ceph_decode_32_safe(p, end, len, e_inval); if (len > 0) { - newcrush = crush_decode(*p, min(*p+len, end)); - if (IS_ERR(newcrush)) { - err = PTR_ERR(newcrush); - newcrush = NULL; + err = osdmap_set_crush(map, + crush_decode(*p, min(*p + len, end))); + if (err) goto bad; - } *p += len; } @@ -1439,12 +1443,6 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end, map->epoch++; map->modified = modified; - if (newcrush) { - if (map->crush) - crush_destroy(map->crush); - map->crush = newcrush; - newcrush = NULL; - } /* new_pools */ err = decode_new_pools(p, end, map); @@ -1505,8 +1503,6 @@ bad: print_hex_dump(KERN_DEBUG, "osdmap: ", DUMP_PREFIX_OFFSET, 16, 1, start, end - start, true); - if (newcrush) - crush_destroy(newcrush); return ERR_PTR(err); } -- cgit v1.2.3-70-g09d2 From 66a0e2d579dbec5c676cfe446234ffebb267c564 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 31 Jan 2017 15:55:06 +0100 Subject: crush: remove mutable part of CRUSH map Then add it to the working state. It would be very nice if we didn't have to take a lock to calculate a crush placement. By moving the permutation array into the working data, we can treat the CRUSH map as immutable. Reflects ceph.git commit cbcd039651c0569551cb90d26ce27e1432671f2a. Signed-off-by: Ilya Dryomov --- include/linux/ceph/osdmap.h | 1 + include/linux/crush/crush.h | 41 +++++++-- include/linux/crush/mapper.h | 4 +- net/ceph/crush/crush.c | 5 -- net/ceph/crush/mapper.c | 206 +++++++++++++++++++++++++++++-------------- net/ceph/osdmap.c | 47 ++++++++-- 6 files changed, 218 insertions(+), 86 deletions(-) diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 412906609954..cef1cab789b9 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -175,6 +175,7 @@ struct ceph_osdmap { struct mutex crush_scratch_mutex; int crush_scratch_ary[CEPH_PG_MAX_SIZE * 3]; + void *crush_workspace; }; static inline bool ceph_osd_exists(struct ceph_osdmap *map, int osd) diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h index be8f12b8f195..fbecbd089d75 100644 --- a/include/linux/crush/crush.h +++ b/include/linux/crush/crush.h @@ -135,13 +135,6 @@ struct crush_bucket { __u32 size; /* num items */ __s32 *items; - /* - * cached random permutation: used for uniform bucket and for - * the linear search fallback for the other bucket types. - */ - __u32 perm_x; /* @x for which *perm is defined */ - __u32 perm_n; /* num elements of *perm that are permuted/defined */ - __u32 *perm; }; struct crush_bucket_uniform { @@ -211,6 +204,21 @@ struct crush_map { * device fails. */ __u8 chooseleaf_stable; + /* + * This value is calculated after decode or construction by + * the builder. It is exposed here (rather than having a + * 'build CRUSH working space' function) so that callers can + * reserve a static buffer, allocate space on the stack, or + * otherwise avoid calling into the heap allocator if they + * want to. The size of the working space depends on the map, + * while the size of the scratch vector passed to the mapper + * depends on the size of the desired result set. + * + * Nothing stops the caller from allocating both in one swell + * foop and passing in two points, though. + */ + size_t working_size; + #ifndef __KERNEL__ /* * version 0 (original) of straw_calc has various flaws. version 1 @@ -248,4 +256,23 @@ static inline int crush_calc_tree_node(int i) return ((i+1) << 1)-1; } +/* + * These data structures are private to the CRUSH implementation. They + * are exposed in this header file because builder needs their + * definitions to calculate the total working size. + * + * Moving this out of the crush map allow us to treat the CRUSH map as + * immutable within the mapper and removes the requirement for a CRUSH + * map lock. + */ +struct crush_work_bucket { + __u32 perm_x; /* @x for which *perm is defined */ + __u32 perm_n; /* num elements of *perm that are permuted/defined */ + __u32 *perm; /* Permutation of the bucket's items */ +}; + +struct crush_work { + struct crush_work_bucket **work; /* Per-bucket working store */ +}; + #endif diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h index 5dfd5b1125d2..3303c7fd8a31 100644 --- a/include/linux/crush/mapper.h +++ b/include/linux/crush/mapper.h @@ -15,6 +15,8 @@ extern int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, const __u32 *weights, int weight_max, - int *scratch); + void *cwin, int *scratch); + +void crush_init_workspace(const struct crush_map *map, void *v); #endif diff --git a/net/ceph/crush/crush.c b/net/ceph/crush/crush.c index 80d7c3a97cb8..5bf94c04f645 100644 --- a/net/ceph/crush/crush.c +++ b/net/ceph/crush/crush.c @@ -45,7 +45,6 @@ int crush_get_bucket_item_weight(const struct crush_bucket *b, int p) void crush_destroy_bucket_uniform(struct crush_bucket_uniform *b) { - kfree(b->h.perm); kfree(b->h.items); kfree(b); } @@ -54,14 +53,12 @@ void crush_destroy_bucket_list(struct crush_bucket_list *b) { kfree(b->item_weights); kfree(b->sum_weights); - kfree(b->h.perm); kfree(b->h.items); kfree(b); } void crush_destroy_bucket_tree(struct crush_bucket_tree *b) { - kfree(b->h.perm); kfree(b->h.items); kfree(b->node_weights); kfree(b); @@ -71,7 +68,6 @@ void crush_destroy_bucket_straw(struct crush_bucket_straw *b) { kfree(b->straws); kfree(b->item_weights); - kfree(b->h.perm); kfree(b->h.items); kfree(b); } @@ -79,7 +75,6 @@ void crush_destroy_bucket_straw(struct crush_bucket_straw *b) void crush_destroy_bucket_straw2(struct crush_bucket_straw2 *b) { kfree(b->item_weights); - kfree(b->h.perm); kfree(b->h.items); kfree(b); } diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 130ab407c5ec..9e75be5ec716 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -54,7 +54,6 @@ int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size return -1; } - /* * bucket choose methods * @@ -72,59 +71,60 @@ int crush_find_rule(const struct crush_map *map, int ruleset, int type, int size * Since this is expensive, we optimize for the r=0 case, which * captures the vast majority of calls. */ -static int bucket_perm_choose(struct crush_bucket *bucket, +static int bucket_perm_choose(const struct crush_bucket *bucket, + struct crush_work_bucket *work, int x, int r) { unsigned int pr = r % bucket->size; unsigned int i, s; /* start a new permutation if @x has changed */ - if (bucket->perm_x != (__u32)x || bucket->perm_n == 0) { + if (work->perm_x != (__u32)x || work->perm_n == 0) { dprintk("bucket %d new x=%d\n", bucket->id, x); - bucket->perm_x = x; + work->perm_x = x; /* optimize common r=0 case */ if (pr == 0) { s = crush_hash32_3(bucket->hash, x, bucket->id, 0) % bucket->size; - bucket->perm[0] = s; - bucket->perm_n = 0xffff; /* magic value, see below */ + work->perm[0] = s; + work->perm_n = 0xffff; /* magic value, see below */ goto out; } for (i = 0; i < bucket->size; i++) - bucket->perm[i] = i; - bucket->perm_n = 0; - } else if (bucket->perm_n == 0xffff) { + work->perm[i] = i; + work->perm_n = 0; + } else if (work->perm_n == 0xffff) { /* clean up after the r=0 case above */ for (i = 1; i < bucket->size; i++) - bucket->perm[i] = i; - bucket->perm[bucket->perm[0]] = 0; - bucket->perm_n = 1; + work->perm[i] = i; + work->perm[work->perm[0]] = 0; + work->perm_n = 1; } /* calculate permutation up to pr */ - for (i = 0; i < bucket->perm_n; i++) + for (i = 0; i < work->perm_n; i++) dprintk(" perm_choose have %d: %d\n", i, bucket->perm[i]); - while (bucket->perm_n <= pr) { - unsigned int p = bucket->perm_n; + while (work->perm_n <= pr) { + unsigned int p = work->perm_n; /* no point in swapping the final entry */ if (p < bucket->size - 1) { i = crush_hash32_3(bucket->hash, x, bucket->id, p) % (bucket->size - p); if (i) { - unsigned int t = bucket->perm[p + i]; - bucket->perm[p + i] = bucket->perm[p]; - bucket->perm[p] = t; + unsigned int t = work->perm[p + i]; + work->perm[p + i] = work->perm[p]; + work->perm[p] = t; } dprintk(" perm_choose swap %d with %d\n", p, p+i); } - bucket->perm_n++; + work->perm_n++; } for (i = 0; i < bucket->size; i++) dprintk(" perm_choose %d: %d\n", i, bucket->perm[i]); - s = bucket->perm[pr]; + s = work->perm[pr]; out: dprintk(" perm_choose %d sz=%d x=%d r=%d (%d) s=%d\n", bucket->id, bucket->size, x, r, pr, s); @@ -132,14 +132,14 @@ out: } /* uniform */ -static int bucket_uniform_choose(struct crush_bucket_uniform *bucket, - int x, int r) +static int bucket_uniform_choose(const struct crush_bucket_uniform *bucket, + struct crush_work_bucket *work, int x, int r) { - return bucket_perm_choose(&bucket->h, x, r); + return bucket_perm_choose(&bucket->h, work, x, r); } /* list */ -static int bucket_list_choose(struct crush_bucket_list *bucket, +static int bucket_list_choose(const struct crush_bucket_list *bucket, int x, int r) { int i; @@ -155,8 +155,9 @@ static int bucket_list_choose(struct crush_bucket_list *bucket, w *= bucket->sum_weights[i]; w = w >> 16; /*dprintk(" scaled %llx\n", w);*/ - if (w < bucket->item_weights[i]) + if (w < bucket->item_weights[i]) { return bucket->h.items[i]; + } } dprintk("bad list sums for bucket %d\n", bucket->h.id); @@ -192,7 +193,7 @@ static int terminal(int x) return x & 1; } -static int bucket_tree_choose(struct crush_bucket_tree *bucket, +static int bucket_tree_choose(const struct crush_bucket_tree *bucket, int x, int r) { int n; @@ -224,7 +225,7 @@ static int bucket_tree_choose(struct crush_bucket_tree *bucket, /* straw */ -static int bucket_straw_choose(struct crush_bucket_straw *bucket, +static int bucket_straw_choose(const struct crush_bucket_straw *bucket, int x, int r) { __u32 i; @@ -301,7 +302,7 @@ static __u64 crush_ln(unsigned int xin) * */ -static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket, +static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket, int x, int r) { unsigned int i, high = 0; @@ -344,37 +345,42 @@ static int bucket_straw2_choose(struct crush_bucket_straw2 *bucket, high_draw = draw; } } + return bucket->h.items[high]; } -static int crush_bucket_choose(struct crush_bucket *in, int x, int r) +static int crush_bucket_choose(const struct crush_bucket *in, + struct crush_work_bucket *work, + int x, int r) { dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r); BUG_ON(in->size == 0); switch (in->alg) { case CRUSH_BUCKET_UNIFORM: - return bucket_uniform_choose((struct crush_bucket_uniform *)in, - x, r); + return bucket_uniform_choose( + (const struct crush_bucket_uniform *)in, + work, x, r); case CRUSH_BUCKET_LIST: - return bucket_list_choose((struct crush_bucket_list *)in, + return bucket_list_choose((const struct crush_bucket_list *)in, x, r); case CRUSH_BUCKET_TREE: - return bucket_tree_choose((struct crush_bucket_tree *)in, + return bucket_tree_choose((const struct crush_bucket_tree *)in, x, r); case CRUSH_BUCKET_STRAW: - return bucket_straw_choose((struct crush_bucket_straw *)in, - x, r); + return bucket_straw_choose( + (const struct crush_bucket_straw *)in, + x, r); case CRUSH_BUCKET_STRAW2: - return bucket_straw2_choose((struct crush_bucket_straw2 *)in, - x, r); + return bucket_straw2_choose( + (const struct crush_bucket_straw2 *)in, + x, r); default: dprintk("unknown bucket %d alg %d\n", in->id, in->alg); return in->items[0]; } } - /* * true if device is marked "out" (failed, fully offloaded) * of the cluster @@ -416,7 +422,8 @@ static int is_out(const struct crush_map *map, * @parent_r: r value passed from the parent */ static int crush_choose_firstn(const struct crush_map *map, - struct crush_bucket *bucket, + struct crush_work *work, + const struct crush_bucket *bucket, const __u32 *weight, int weight_max, int x, int numrep, int type, int *out, int outpos, @@ -434,7 +441,7 @@ static int crush_choose_firstn(const struct crush_map *map, int rep; unsigned int ftotal, flocal; int retry_descent, retry_bucket, skip_rep; - struct crush_bucket *in = bucket; + const struct crush_bucket *in = bucket; int r; int i; int item = 0; @@ -473,9 +480,13 @@ static int crush_choose_firstn(const struct crush_map *map, if (local_fallback_retries > 0 && flocal >= (in->size>>1) && flocal > local_fallback_retries) - item = bucket_perm_choose(in, x, r); + item = bucket_perm_choose( + in, work->work[-1-in->id], + x, r); else - item = crush_bucket_choose(in, x, r); + item = crush_bucket_choose( + in, work->work[-1-in->id], + x, r); if (item >= map->max_devices) { dprintk(" bad item %d\n", item); skip_rep = 1; @@ -518,19 +529,21 @@ static int crush_choose_firstn(const struct crush_map *map, sub_r = r >> (vary_r-1); else sub_r = 0; - if (crush_choose_firstn(map, - map->buckets[-1-item], - weight, weight_max, - x, stable ? 1 : outpos+1, 0, - out2, outpos, count, - recurse_tries, 0, - local_retries, - local_fallback_retries, - 0, - vary_r, - stable, - NULL, - sub_r) <= outpos) + if (crush_choose_firstn( + map, + work, + map->buckets[-1-item], + weight, weight_max, + x, stable ? 1 : outpos+1, 0, + out2, outpos, count, + recurse_tries, 0, + local_retries, + local_fallback_retries, + 0, + vary_r, + stable, + NULL, + sub_r) <= outpos) /* didn't get leaf */ reject = 1; } else { @@ -600,7 +613,8 @@ reject: * */ static void crush_choose_indep(const struct crush_map *map, - struct crush_bucket *bucket, + struct crush_work *work, + const struct crush_bucket *bucket, const __u32 *weight, int weight_max, int x, int left, int numrep, int type, int *out, int outpos, @@ -610,7 +624,7 @@ static void crush_choose_indep(const struct crush_map *map, int *out2, int parent_r) { - struct crush_bucket *in = bucket; + const struct crush_bucket *in = bucket; int endpos = outpos + left; int rep; unsigned int ftotal; @@ -678,7 +692,9 @@ static void crush_choose_indep(const struct crush_map *map, break; } - item = crush_bucket_choose(in, x, r); + item = crush_bucket_choose( + in, work->work[-1-in->id], + x, r); if (item >= map->max_devices) { dprintk(" bad item %d\n", item); out[rep] = CRUSH_ITEM_NONE; @@ -724,13 +740,15 @@ static void crush_choose_indep(const struct crush_map *map, if (recurse_to_leaf) { if (item < 0) { - crush_choose_indep(map, - map->buckets[-1-item], - weight, weight_max, - x, 1, numrep, 0, - out2, rep, - recurse_tries, 0, - 0, NULL, r); + crush_choose_indep( + map, + work, + map->buckets[-1-item], + weight, weight_max, + x, 1, numrep, 0, + out2, rep, + recurse_tries, 0, + 0, NULL, r); if (out2[rep] == CRUSH_ITEM_NONE) { /* placed nothing; no leaf */ break; @@ -781,6 +799,53 @@ static void crush_choose_indep(const struct crush_map *map, #endif } + +/* + * This takes a chunk of memory and sets it up to be a shiny new + * working area for a CRUSH placement computation. It must be called + * on any newly allocated memory before passing it in to + * crush_do_rule. It may be used repeatedly after that, so long as the + * map has not changed. If the map /has/ changed, you must make sure + * the working size is no smaller than what was allocated and re-run + * crush_init_workspace. + * + * If you do retain the working space between calls to crush, make it + * thread-local. + */ +void crush_init_workspace(const struct crush_map *map, void *v) +{ + struct crush_work *w = v; + __s32 b; + + /* + * We work by moving through the available space and setting + * values and pointers as we go. + * + * It's a bit like Forth's use of the 'allot' word since we + * set the pointer first and then reserve the space for it to + * point to by incrementing the point. + */ + v += sizeof(struct crush_work *); + w->work = v; + v += map->max_buckets * sizeof(struct crush_work_bucket *); + for (b = 0; b < map->max_buckets; ++b) { + if (!map->buckets[b]) + continue; + + w->work[b] = v; + switch (map->buckets[b]->alg) { + default: + v += sizeof(struct crush_work_bucket); + break; + } + w->work[b]->perm_x = 0; + w->work[b]->perm_n = 0; + w->work[b]->perm = v; + v += map->buckets[b]->size * sizeof(__u32); + } + BUG_ON(v - (void *)w != map->working_size); +} + /** * crush_do_rule - calculate a mapping with the given input and rule * @map: the crush_map @@ -790,14 +855,16 @@ static void crush_choose_indep(const struct crush_map *map, * @result_max: maximum result size * @weight: weight vector (for map leaves) * @weight_max: size of weight vector + * @cwin: pointer to at least map->working_size bytes of memory * @scratch: scratch vector for private use; must be >= 3 * result_max */ int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, const __u32 *weight, int weight_max, - int *scratch) + void *cwin, int *scratch) { int result_len; + struct crush_work *cw = cwin; int *a = scratch; int *b = scratch + result_max; int *c = scratch + result_max*2; @@ -807,7 +874,7 @@ int crush_do_rule(const struct crush_map *map, int *o; int osize; int *tmp; - struct crush_rule *rule; + const struct crush_rule *rule; __u32 step; int i, j; int numrep; @@ -840,7 +907,7 @@ int crush_do_rule(const struct crush_map *map, for (step = 0; step < rule->len; step++) { int firstn = 0; - struct crush_rule_step *curstep = &rule->steps[step]; + const struct crush_rule_step *curstep = &rule->steps[step]; switch (curstep->op) { case CRUSH_RULE_TAKE: @@ -936,6 +1003,7 @@ int crush_do_rule(const struct crush_map *map, recurse_tries = choose_tries; osize += crush_choose_firstn( map, + cw, map->buckets[bno], weight, weight_max, x, numrep, @@ -956,6 +1024,7 @@ int crush_do_rule(const struct crush_map *map, numrep : (result_max-osize)); crush_choose_indep( map, + cw, map->buckets[bno], weight, weight_max, x, out_size, numrep, @@ -997,5 +1066,6 @@ int crush_do_rule(const struct crush_map *map, break; } } + return result_len; } diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 47df075b31e5..3892e7fa7747 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -153,6 +153,32 @@ bad: return -EINVAL; } +static void crush_finalize(struct crush_map *c) +{ + __s32 b; + + /* Space for the array of pointers to per-bucket workspace */ + c->working_size = sizeof(struct crush_work) + + c->max_buckets * sizeof(struct crush_work_bucket *); + + for (b = 0; b < c->max_buckets; b++) { + if (!c->buckets[b]) + continue; + + switch (c->buckets[b]->alg) { + default: + /* + * The base case, permutation variables and + * the pointer to the permutation array. + */ + c->working_size += sizeof(struct crush_work_bucket); + break; + } + /* Every bucket has a permutation array. */ + c->working_size += c->buckets[b]->size * sizeof(__u32); + } +} + static struct crush_map *crush_decode(void *pbyval, void *end) { struct crush_map *c; @@ -246,10 +272,6 @@ static struct crush_map *crush_decode(void *pbyval, void *end) b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS); if (b->items == NULL) goto badmem; - b->perm = kcalloc(b->size, sizeof(u32), GFP_NOFS); - if (b->perm == NULL) - goto badmem; - b->perm_n = 0; ceph_decode_need(p, end, b->size*sizeof(u32), bad); for (j = 0; j < b->size; j++) @@ -368,6 +390,8 @@ static struct crush_map *crush_decode(void *pbyval, void *end) dout("crush decode tunable chooseleaf_stable = %d\n", c->chooseleaf_stable); + crush_finalize(c); + done: dout("crush_decode success\n"); return c; @@ -753,6 +777,7 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map) kfree(map->osd_weight); kfree(map->osd_addr); kfree(map->osd_primary_affinity); + kfree(map->crush_workspace); kfree(map); } @@ -810,12 +835,23 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush) { + void *workspace; + if (IS_ERR(crush)) return PTR_ERR(crush); + workspace = kmalloc(crush->working_size, GFP_NOIO); + if (!workspace) { + crush_destroy(crush); + return -ENOMEM; + } + crush_init_workspace(crush, workspace); + if (map->crush) crush_destroy(map->crush); + kfree(map->crush_workspace); map->crush = crush; + map->crush_workspace = workspace; return 0; } @@ -1940,7 +1976,8 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x, mutex_lock(&map->crush_scratch_mutex); r = crush_do_rule(map->crush, ruleno, x, result, result_max, - weight, weight_max, map->crush_scratch_ary); + weight, weight_max, map->crush_workspace, + map->crush_scratch_ary); mutex_unlock(&map->crush_scratch_mutex); return r; -- cgit v1.2.3-70-g09d2 From 743efcffffc6620ab44ea9ec67c7e4e28dfa7742 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 31 Jan 2017 15:55:06 +0100 Subject: crush: merge working data and scratch Much like Arlo Guthrie, I decided that one big pile is better than two little piles. Reflects ceph.git commit 95c2df6c7e0b22d2ea9d91db500cf8b9441c73ba. Signed-off-by: Ilya Dryomov --- include/linux/ceph/osdmap.h | 3 +-- include/linux/crush/mapper.h | 14 +++++++++++++- net/ceph/crush/mapper.c | 17 +++++++---------- net/ceph/osdmap.c | 14 ++++++++------ 4 files changed, 29 insertions(+), 19 deletions(-) diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index cef1cab789b9..8cebdc4158c3 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -173,8 +173,7 @@ struct ceph_osdmap { * the list of osds that store+replicate them. */ struct crush_map *crush; - struct mutex crush_scratch_mutex; - int crush_scratch_ary[CEPH_PG_MAX_SIZE * 3]; + struct mutex crush_workspace_mutex; void *crush_workspace; }; diff --git a/include/linux/crush/mapper.h b/include/linux/crush/mapper.h index 3303c7fd8a31..c95e19e1ff11 100644 --- a/include/linux/crush/mapper.h +++ b/include/linux/crush/mapper.h @@ -15,7 +15,19 @@ extern int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, const __u32 *weights, int weight_max, - void *cwin, int *scratch); + void *cwin); + +/* + * Returns the exact amount of workspace that will need to be used + * for a given combination of crush_map and result_max. The caller can + * then allocate this much on its own, either on the stack, in a + * per-thread long-lived buffer, or however it likes. + */ +static inline size_t crush_work_size(const struct crush_map *map, + int result_max) +{ + return map->working_size + result_max * 3 * sizeof(__u32); +} void crush_init_workspace(const struct crush_map *map, void *v); diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 9e75be5ec716..2e31217ccae3 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -855,23 +855,22 @@ void crush_init_workspace(const struct crush_map *map, void *v) * @result_max: maximum result size * @weight: weight vector (for map leaves) * @weight_max: size of weight vector - * @cwin: pointer to at least map->working_size bytes of memory - * @scratch: scratch vector for private use; must be >= 3 * result_max + * @cwin: pointer to at least crush_work_size() bytes of memory */ int crush_do_rule(const struct crush_map *map, int ruleno, int x, int *result, int result_max, const __u32 *weight, int weight_max, - void *cwin, int *scratch) + void *cwin) { int result_len; struct crush_work *cw = cwin; - int *a = scratch; - int *b = scratch + result_max; - int *c = scratch + result_max*2; + int *a = cwin + map->working_size; + int *b = a + result_max; + int *c = b + result_max; + int *w = a; + int *o = b; int recurse_to_leaf; - int *w; int wsize = 0; - int *o; int osize; int *tmp; const struct crush_rule *rule; @@ -902,8 +901,6 @@ int crush_do_rule(const struct crush_map *map, rule = map->rules[ruleno]; result_len = 0; - w = a; - o = b; for (step = 0; step < rule->len; step++) { int firstn = 0; diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 3892e7fa7747..2374956c4d40 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -743,7 +743,7 @@ struct ceph_osdmap *ceph_osdmap_alloc(void) map->pool_max = -1; map->pg_temp = RB_ROOT; map->primary_temp = RB_ROOT; - mutex_init(&map->crush_scratch_mutex); + mutex_init(&map->crush_workspace_mutex); return map; } @@ -836,11 +836,14 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, int max) static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush) { void *workspace; + size_t work_size; if (IS_ERR(crush)) return PTR_ERR(crush); - workspace = kmalloc(crush->working_size, GFP_NOIO); + work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE); + dout("%s work_size %zu bytes\n", __func__, work_size); + workspace = kmalloc(work_size, GFP_NOIO); if (!workspace) { crush_destroy(crush); return -ENOMEM; @@ -1974,11 +1977,10 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x, BUG_ON(result_max > CEPH_PG_MAX_SIZE); - mutex_lock(&map->crush_scratch_mutex); + mutex_lock(&map->crush_workspace_mutex); r = crush_do_rule(map->crush, ruleno, x, result, result_max, - weight, weight_max, map->crush_workspace, - map->crush_scratch_ary); - mutex_unlock(&map->crush_scratch_mutex); + weight, weight_max, map->crush_workspace); + mutex_unlock(&map->crush_workspace_mutex); return r; } -- cgit v1.2.3-70-g09d2 From ef9324bb11357c02a4f0529b806341e5b768d872 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 8 Feb 2017 18:57:48 +0100 Subject: libceph: don't go through with the mapping if the PG is too wide With EC overwrites maturing, the kernel client will be getting exposed to potentially very wide EC pools. While "min(pi->size, X)" works fine when the cluster is stable and happy, truncating OSD sets interferes with resend logic (ceph_is_new_interval(), etc). Abort the mapping if the pool is too wide, assigning the request to the homeless session. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- net/ceph/osdmap.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 2374956c4d40..6824c0ec8373 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -2013,8 +2013,14 @@ static void pg_to_raw_osds(struct ceph_osdmap *osdmap, return; } - len = do_crush(osdmap, ruleno, pps, raw->osds, - min_t(int, pi->size, ARRAY_SIZE(raw->osds)), + if (pi->size > ARRAY_SIZE(raw->osds)) { + pr_err_ratelimited("pool %lld ruleset %d type %d too wide: size %d > %zu\n", + pi->id, pi->crush_ruleset, pi->type, pi->size, + ARRAY_SIZE(raw->osds)); + return; + } + + len = do_crush(osdmap, ruleno, pps, raw->osds, pi->size, osdmap->osd_weight, osdmap->max_osd); if (len < 0) { pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n", -- cgit v1.2.3-70-g09d2 From 083a51fbc57ca848ab087692f3cc97898fd88b54 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 9 Feb 2017 16:14:52 +0100 Subject: libceph: bump CEPH_PG_MAX_SIZE to 32 ... to accommodate potentially very wide EC pools. This increases the size of a typical rbd ceph_osd_request by ~12% (from 1040 to 1168 bytes), but I'd rather go future proof here. Signed-off-by: Ilya Dryomov Reviewed-by: Sage Weil --- include/linux/ceph/rados.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h index 5c0da61cb763..5d0018782d50 100644 --- a/include/linux/ceph/rados.h +++ b/include/linux/ceph/rados.h @@ -50,7 +50,7 @@ struct ceph_timespec { #define CEPH_PG_LAYOUT_LINEAR 2 #define CEPH_PG_LAYOUT_HYBRID 3 -#define CEPH_PG_MAX_SIZE 16 /* max # osds in a single pg */ +#define CEPH_PG_MAX_SIZE 32 /* max # osds in a single pg */ /* * placement group. -- cgit v1.2.3-70-g09d2 From 848d796c8c918e2d60060992786f82754f539cd4 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:21 +0100 Subject: rbd: use kstrndup() in rbd_header_from_disk() Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 36d2b9f4e836..c3251f9123c0 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -992,15 +992,11 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, /* Allocate this now to avoid having to handle failure below */ if (first_time) { - size_t len; - - len = strnlen(ondisk->object_prefix, - sizeof (ondisk->object_prefix)); - object_prefix = kmalloc(len + 1, GFP_KERNEL); + object_prefix = kstrndup(ondisk->object_prefix, + sizeof(ondisk->object_prefix), + GFP_KERNEL); if (!object_prefix) return -ENOMEM; - memcpy(object_prefix, ondisk->object_prefix, len); - object_prefix[len] = '\0'; } /* Allocate the snapshot context and fill it in */ -- cgit v1.2.3-70-g09d2 From 24dca799fdbeeb0c955ef2f56a37461e6566aa07 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:21 +0100 Subject: rbd: kill rbd_image_header::{crypt_type,comp_type} Image format 1 is deprecated and format 2 doesn't have these. Also, __rbd_dev_create() takes care of zeroing (or otherwise initializing) format 2 specific fields. Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 9 --------- drivers/block/rbd_types.h | 3 --- 2 files changed, 12 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index c3251f9123c0..fa47f53942b7 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -144,8 +144,6 @@ struct rbd_image_header { /* These six fields never change for a given rbd image */ char *object_prefix; __u8 obj_order; - __u8 crypt_type; - __u8 comp_type; u64 stripe_unit; u64 stripe_count; u64 features; /* Might be changeable someday? */ @@ -1047,12 +1045,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, if (first_time) { header->object_prefix = object_prefix; header->obj_order = ondisk->options.order; - header->crypt_type = ondisk->options.crypt_type; - header->comp_type = ondisk->options.comp_type; - /* The rest aren't used for format 1 images */ - header->stripe_unit = 0; - header->stripe_count = 0; - header->features = 0; } else { ceph_put_snap_context(header->snapc); kfree(header->snap_names); @@ -5938,7 +5930,6 @@ static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev) if (ret < 0) goto out_err; } - /* No support for crypto and compression type format 2 images */ return 0; out_err: diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h index 94f367db27b0..be9c76d292f5 100644 --- a/drivers/block/rbd_types.h +++ b/drivers/block/rbd_types.h @@ -57,9 +57,6 @@ enum rbd_notify_op { #define RBD_MIN_OBJ_ORDER 16 #define RBD_MAX_OBJ_ORDER 30 -#define RBD_COMP_NONE 0 -#define RBD_CRYPT_NONE 0 - #define RBD_HEADER_TEXT "<<< Rados Block Device Image >>>\n" #define RBD_HEADER_SIGNATURE "RBD" #define RBD_HEADER_VERSION "001.005" -- cgit v1.2.3-70-g09d2 From 431a02cd82dd028f650d588f7cedc78643c5c86a Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:21 +0100 Subject: rbd: initialize rbd_dev->header_oloc early No reason to delay it until image_id is known. This will be required by some rbd_obj_method_sync() callers, after rbd_obj_method_sync() is changed to take oloc. Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index fa47f53942b7..047e877cacb8 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4863,7 +4863,7 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, init_rwsem(&rbd_dev->header_rwsem); ceph_oid_init(&rbd_dev->header_oid); - ceph_oloc_init(&rbd_dev->header_oloc); + rbd_dev->header_oloc.pool = spec->pool_id; mutex_init(&rbd_dev->watch_mutex); rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; @@ -6062,8 +6062,6 @@ static int rbd_dev_header_name(struct rbd_device *rbd_dev) /* Record the header object name for this rbd image. */ rbd_assert(rbd_image_format_valid(rbd_dev->image_format)); - - rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id; if (rbd_dev->image_format == 1) ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s", spec->image_name, RBD_SUFFIX); -- cgit v1.2.3-70-g09d2 From fe5478e0f6694312ad17dea7083296c1aea0a049 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:21 +0100 Subject: rbd: do away with obj_request in rbd_obj_read_sync() rbd_obj_request machinery is completely unnecessary here; all that's being done is fetching a metadata object - no striping, cloning, etc. More importantly, rbd_osd_req_create() grabs pool id from layout and that is becoming a data pool id. Kill offset argument - all metadata objects are small and read in full. Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 81 +++++++++++++++++++++-------------------------------- 1 file changed, 32 insertions(+), 49 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 047e877cacb8..2c596f7498bd 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4242,63 +4242,46 @@ static void rbd_free_disk(struct rbd_device *rbd_dev) } static int rbd_obj_read_sync(struct rbd_device *rbd_dev, - const char *object_name, - u64 offset, u64 length, void *buf) + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, + void *buf, int buf_len) { - struct rbd_obj_request *obj_request; - struct page **pages = NULL; - u32 page_count; - size_t size; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_osd_request *req; + struct page **pages; + int num_pages = calc_pages_for(0, buf_len); int ret; - page_count = (u32) calc_pages_for(offset, length); - pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - - ret = -ENOMEM; - obj_request = rbd_obj_request_create(object_name, offset, length, - OBJ_REQUEST_PAGES); - if (!obj_request) - goto out; - - obj_request->pages = pages; - obj_request->page_count = page_count; - - obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - obj_request); - if (!obj_request->osd_req) - goto out; + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_KERNEL); + if (!req) + return -ENOMEM; - osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ, - offset, length, 0, 0); - osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0, - obj_request->pages, - obj_request->length, - obj_request->offset & ~PAGE_MASK, - false, false); + ceph_oid_copy(&req->r_base_oid, oid); + ceph_oloc_copy(&req->r_base_oloc, oloc); + req->r_flags = CEPH_OSD_FLAG_READ; - rbd_obj_request_submit(obj_request); - ret = rbd_obj_request_wait(obj_request); + ret = ceph_osdc_alloc_messages(req, GFP_KERNEL); if (ret) - goto out; + goto out_req; - ret = obj_request->result; - if (ret < 0) - goto out; + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) { + ret = PTR_ERR(pages); + goto out_req; + } - rbd_assert(obj_request->xferred <= (u64) SIZE_MAX); - size = (size_t) obj_request->xferred; - ceph_copy_from_page_vector(pages, buf, 0, size); - rbd_assert(size <= (size_t)INT_MAX); - ret = (int)size; -out: - if (obj_request) - rbd_obj_request_put(obj_request); - else - ceph_release_page_vector(pages, page_count); + osd_req_op_extent_init(req, 0, CEPH_OSD_OP_READ, 0, buf_len, 0, 0); + osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, + true); + + ceph_osdc_start_request(osdc, req, false); + ret = ceph_osdc_wait_request(osdc, req); + if (ret >= 0) + ceph_copy_from_page_vector(pages, buf, 0, ret); +out_req: + ceph_osdc_put_request(req); return ret; } @@ -4334,8 +4317,8 @@ static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev) if (!ondisk) return -ENOMEM; - ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_oid.name, - 0, size, ondisk); + ret = rbd_obj_read_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, ondisk, size); if (ret < 0) goto out; if ((size_t)ret < size) { -- cgit v1.2.3-70-g09d2 From 2544a02090aa76ffb8069e6bf23d886e34d9c8da Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:21 +0100 Subject: libceph: pass reply buffer length through ceph_osdc_call() To spare checking for "this reply fits into a page, but does it fit into my buffer?" in some callers, osd_req_op_cls_response_data_pages() needs to know how big it is. Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- net/ceph/cls_lock_client.c | 2 +- net/ceph/osd_client.c | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c index 50f040fdb2a9..f13a1ea87459 100644 --- a/net/ceph/cls_lock_client.c +++ b/net/ceph/cls_lock_client.c @@ -278,7 +278,7 @@ int ceph_cls_lock_info(struct ceph_osd_client *osdc, int get_info_op_buf_size; int name_len = strlen(lock_name); struct page *get_info_op_page, *reply_page; - size_t reply_len; + size_t reply_len = PAGE_SIZE; void *p, *end; int ret; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 3a2417bb6ff0..ac4753421d0c 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -4023,7 +4023,7 @@ EXPORT_SYMBOL(ceph_osdc_maybe_request_map); * Execute an OSD class method on an object. * * @flags: CEPH_OSD_FLAG_* - * @resp_len: out param for reply length + * @resp_len: in/out param for reply length */ int ceph_osdc_call(struct ceph_osd_client *osdc, struct ceph_object_id *oid, @@ -4036,6 +4036,9 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, struct ceph_osd_request *req; int ret; + if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE)) + return -E2BIG; + req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO); if (!req) return -ENOMEM; @@ -4054,7 +4057,7 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, 0, false, false); if (resp_page) osd_req_op_cls_response_data_pages(req, 0, &resp_page, - PAGE_SIZE, 0, false, false); + *resp_len, 0, false, false); ceph_osdc_start_request(osdc, req, false); ret = ceph_osdc_wait_request(osdc, req); -- cgit v1.2.3-70-g09d2 From ecd4a68a26a2159b9f7233c6e892c94c90074cb0 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:21 +0100 Subject: rbd: switch rbd_obj_method_sync() to ceph_osdc_call() As explained in the previous commit, rbd_obj_request machinery (and rbd_osd_req_create() in particular) shouldn't be used for working with metadata objects. Switch to the recently added ceph_osdc_call(). It assumes single pages for outbound and inbound buffers, but that's OK - none of the callers need more than that. These pages need to be allocated (messenger is in dire need of proper iterator interface!), but we are swapping for pages[] and pagelist allocations in the existing code. Kill class_name argument - all rbd methods are under "rbd". Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 162 ++++++++++++++++++++++------------------------------ 1 file changed, 67 insertions(+), 95 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 2c596f7498bd..644786e208ac 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -3971,17 +3971,17 @@ out: * returned in the outbound buffer, or a negative error code. */ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, - const char *object_name, - const char *class_name, + struct ceph_object_id *oid, + struct ceph_object_locator *oloc, const char *method_name, const void *outbound, size_t outbound_size, void *inbound, size_t inbound_size) { - struct rbd_obj_request *obj_request; - struct page **pages; - u32 page_count; + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct page *req_page = NULL; + struct page *reply_page; int ret; /* @@ -3991,61 +3991,35 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev, * method. Currently if this is present it will be a * snapshot id. */ - page_count = (u32)calc_pages_for(0, inbound_size); - pages = ceph_alloc_page_vector(page_count, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); + if (outbound) { + if (outbound_size > PAGE_SIZE) + return -E2BIG; - ret = -ENOMEM; - obj_request = rbd_obj_request_create(object_name, 0, inbound_size, - OBJ_REQUEST_PAGES); - if (!obj_request) - goto out; - - obj_request->pages = pages; - obj_request->page_count = page_count; - - obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, - obj_request); - if (!obj_request->osd_req) - goto out; - - osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL, - class_name, method_name); - if (outbound_size) { - struct ceph_pagelist *pagelist; - - pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); - if (!pagelist) - goto out; + req_page = alloc_page(GFP_KERNEL); + if (!req_page) + return -ENOMEM; - ceph_pagelist_init(pagelist); - ceph_pagelist_append(pagelist, outbound, outbound_size); - osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0, - pagelist); + memcpy(page_address(req_page), outbound, outbound_size); } - osd_req_op_cls_response_data_pages(obj_request->osd_req, 0, - obj_request->pages, inbound_size, - 0, false, false); - rbd_obj_request_submit(obj_request); - ret = rbd_obj_request_wait(obj_request); - if (ret) - goto out; - - ret = obj_request->result; - if (ret < 0) - goto out; + reply_page = alloc_page(GFP_KERNEL); + if (!reply_page) { + if (req_page) + __free_page(req_page); + return -ENOMEM; + } - rbd_assert(obj_request->xferred < (u64)INT_MAX); - ret = (int)obj_request->xferred; - ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred); -out: - if (obj_request) - rbd_obj_request_put(obj_request); - else - ceph_release_page_vector(pages, page_count); + ret = ceph_osdc_call(osdc, oid, oloc, RBD_DRV_NAME, method_name, + CEPH_OSD_FLAG_READ, req_page, outbound_size, + reply_page, &inbound_size); + if (!ret) { + memcpy(inbound, page_address(reply_page), inbound_size); + ret = inbound_size; + } + if (req_page) + __free_page(req_page); + __free_page(reply_page); return ret; } @@ -4939,10 +4913,10 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id, __le64 size; } __attribute__ ((packed)) size_buf = { 0 }; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_size", - &snapid, sizeof (snapid), - &size_buf, sizeof (size_buf)); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_size", + &snapid, sizeof(snapid), + &size_buf, sizeof(size_buf)); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; @@ -4979,9 +4953,9 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev) if (!reply_buf) return -ENOMEM; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_object_prefix", NULL, 0, - reply_buf, RBD_OBJ_PREFIX_LEN_MAX); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_object_prefix", + NULL, 0, reply_buf, RBD_OBJ_PREFIX_LEN_MAX); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; @@ -5014,10 +4988,10 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id, u64 unsup; int ret; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_features", - &snapid, sizeof (snapid), - &features_buf, sizeof (features_buf)); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_features", + &snapid, sizeof(snapid), + &features_buf, sizeof(features_buf)); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; @@ -5076,10 +5050,9 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev) } snapid = cpu_to_le64(rbd_dev->spec->snap_id); - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_parent", - &snapid, sizeof (snapid), - reply_buf, size); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_parent", + &snapid, sizeof(snapid), reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out_err; @@ -5179,9 +5152,9 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) u64 stripe_count; int ret; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_stripe_unit_count", NULL, 0, - (char *)&striping_info_buf, size); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_stripe_unit_count", + NULL, 0, &striping_info_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) return ret; @@ -5218,6 +5191,7 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) static char *rbd_dev_image_name(struct rbd_device *rbd_dev) { + CEPH_DEFINE_OID_ONSTACK(oid); size_t image_id_size; char *image_id; void *p; @@ -5245,10 +5219,10 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev) if (!reply_buf) goto out; - ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY, - "rbd", "dir_get_name", - image_id, image_id_size, - reply_buf, size); + ceph_oid_printf(&oid, "%s", RBD_DIRECTORY); + ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, + "dir_get_name", image_id, image_id_size, + reply_buf, size); if (ret < 0) goto out; p = reply_buf; @@ -5427,9 +5401,9 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev) if (!reply_buf) return -ENOMEM; - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_snapcontext", NULL, 0, - reply_buf, size); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_snapcontext", + NULL, 0, reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) goto out; @@ -5492,10 +5466,9 @@ static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, return ERR_PTR(-ENOMEM); snapid = cpu_to_le64(snap_id); - ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_oid.name, - "rbd", "get_snapshot_name", - &snapid, sizeof (snapid), - reply_buf, size); + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_snapshot_name", + &snapid, sizeof(snapid), reply_buf, size); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret < 0) { snap_name = ERR_PTR(ret); @@ -5802,7 +5775,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) { int ret; size_t size; - char *object_name; + CEPH_DEFINE_OID_ONSTACK(oid); void *response; char *image_id; @@ -5822,12 +5795,12 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) * First, see if the format 2 image id file exists, and if * so, get the image's persistent id from it. */ - size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name); - object_name = kmalloc(size, GFP_NOIO); - if (!object_name) - return -ENOMEM; - sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name); - dout("rbd id object name is %s\n", object_name); + ret = ceph_oid_aprintf(&oid, GFP_KERNEL, "%s%s", RBD_ID_PREFIX, + rbd_dev->spec->image_name); + if (ret) + return ret; + + dout("rbd id object name is %s\n", oid.name); /* Response will be an encoded string, which includes a length */ @@ -5840,9 +5813,9 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) /* If it doesn't exist we'll assume it's a format 1 image */ - ret = rbd_obj_method_sync(rbd_dev, object_name, - "rbd", "get_id", NULL, 0, - response, RBD_IMAGE_ID_LEN_MAX); + ret = rbd_obj_method_sync(rbd_dev, &oid, &rbd_dev->header_oloc, + "get_id", NULL, 0, + response, RBD_IMAGE_ID_LEN_MAX); dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret); if (ret == -ENOENT) { image_id = kstrdup("", GFP_KERNEL); @@ -5865,8 +5838,7 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev) } out: kfree(response); - kfree(object_name); - + ceph_oid_destroy(&oid); return ret; } -- cgit v1.2.3-70-g09d2 From 03acf083355ffeb89b9269d205004d6d888fff99 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:22 +0100 Subject: rbd: remove now unused rbd_obj_request_wait() and helpers Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 38 -------------------------------------- 1 file changed, 38 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 644786e208ac..e2ffd4fffd06 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1619,44 +1619,6 @@ static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) ceph_osdc_start_request(osd_req->r_osdc, osd_req, false); } -static void rbd_obj_request_end(struct rbd_obj_request *obj_request) -{ - dout("%s %p\n", __func__, obj_request); - ceph_osdc_cancel_request(obj_request->osd_req); -} - -/* - * Wait for an object request to complete. If interrupted, cancel the - * underlying osd request. - * - * @timeout: in jiffies, 0 means "wait forever" - */ -static int __rbd_obj_request_wait(struct rbd_obj_request *obj_request, - unsigned long timeout) -{ - long ret; - - dout("%s %p\n", __func__, obj_request); - ret = wait_for_completion_interruptible_timeout( - &obj_request->completion, - ceph_timeout_jiffies(timeout)); - if (ret <= 0) { - if (ret == 0) - ret = -ETIMEDOUT; - rbd_obj_request_end(obj_request); - } else { - ret = 0; - } - - dout("%s %p ret %d\n", __func__, obj_request, (int)ret); - return ret; -} - -static int rbd_obj_request_wait(struct rbd_obj_request *obj_request) -{ - return __rbd_obj_request_wait(obj_request, 0); -} - static void rbd_img_request_complete(struct rbd_img_request *img_request) { -- cgit v1.2.3-70-g09d2 From 5bc3fb177548503735bcc35fe98475d883740ecb Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:22 +0100 Subject: rbd: use rbd_obj_bytes() more Returning u64 doesn't make sense: max header->obj_order is 25 and ceph_file_layout::object_size is u32. Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index e2ffd4fffd06..55e30db0576d 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -970,6 +970,14 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk) return true; } +/* + * returns the size of an object in the image + */ +static u32 rbd_obj_bytes(struct rbd_image_header *header) +{ + return 1U << header->obj_order; +} + /* * Fill an rbd image header with information from the given format 1 * on-disk header. @@ -1255,7 +1263,7 @@ static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) { - u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; + u64 segment_size = rbd_obj_bytes(&rbd_dev->header); return offset & (segment_size - 1); } @@ -1263,7 +1271,7 @@ static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) static u64 rbd_segment_length(struct rbd_device *rbd_dev, u64 offset, u64 length) { - u64 segment_size = (u64) 1 << rbd_dev->header.obj_order; + u64 segment_size = rbd_obj_bytes(&rbd_dev->header); offset &= segment_size - 1; @@ -1274,14 +1282,6 @@ static u64 rbd_segment_length(struct rbd_device *rbd_dev, return length; } -/* - * returns the size of an object in the image - */ -static u64 rbd_obj_bytes(struct rbd_image_header *header) -{ - return 1 << header->obj_order; -} - /* * bio helpers */ @@ -2721,7 +2721,7 @@ static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request) * child image to which the original request was to be sent. */ img_offset = obj_request->img_offset - obj_request->offset; - length = (u64)1 << rbd_dev->header.obj_order; + length = rbd_obj_bytes(&rbd_dev->header); /* * There is no defined parent data beyond the parent @@ -5130,7 +5130,7 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) * out, and only fail if the image has non-default values. */ ret = -EINVAL; - obj_size = (u64)1 << rbd_dev->header.obj_order; + obj_size = rbd_obj_bytes(&rbd_dev->header); p = &striping_info_buf; stripe_unit = ceph_decode_64(&p); if (stripe_unit != obj_size) { -- cgit v1.2.3-70-g09d2 From 263423f8adf4acbdc4fd26ed5b35f4a6408bc0ab Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:22 +0100 Subject: rbd: introduce rbd_init_layout() Rather than initializing layout fields with some made up values in __rbd_dev_create(), move the initialization into rbd_init_layout() and call it after the header is actually populated. Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 55e30db0576d..e7131c758118 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -978,6 +978,21 @@ static u32 rbd_obj_bytes(struct rbd_image_header *header) return 1U << header->obj_order; } +static void rbd_init_layout(struct rbd_device *rbd_dev) +{ + if (rbd_dev->header.stripe_unit == 0 || + rbd_dev->header.stripe_count == 0) { + rbd_dev->header.stripe_unit = rbd_obj_bytes(&rbd_dev->header); + rbd_dev->header.stripe_count = 1; + } + + rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit; + rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count; + rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header); + rbd_dev->layout.pool_id = rbd_dev->spec->pool_id; + RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); +} + /* * Fill an rbd image header with information from the given format 1 * on-disk header. @@ -1053,6 +1068,7 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev, if (first_time) { header->object_prefix = object_prefix; header->obj_order = ondisk->options.order; + rbd_init_layout(rbd_dev); } else { ceph_put_snap_context(header->snapc); kfree(header->snap_names); @@ -4804,12 +4820,6 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, rbd_dev->rbd_client = rbdc; rbd_dev->spec = spec; - rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER; - rbd_dev->layout.stripe_count = 1; - rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER; - rbd_dev->layout.pool_id = spec->pool_id; - RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); - return rbd_dev; } @@ -5848,12 +5858,13 @@ static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev) goto out_err; } + rbd_init_layout(rbd_dev); return 0; + out_err: rbd_dev->header.features = 0; kfree(rbd_dev->header.object_prefix); rbd_dev->header.object_prefix = NULL; - return ret; } -- cgit v1.2.3-70-g09d2 From 7e97332ea9caad3b7c6d86bc3b982e17eda2f736 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:22 +0100 Subject: rbd: support for data-pool feature Add support for RBD_FEATURE_DATA_POOL feature. rbd_dev->layout.pool_id now stores the data pool id. Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index e7131c758118..ceac7877b5f4 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -123,9 +123,11 @@ static int atomic_dec_return_safe(atomic_t *v) #define RBD_FEATURE_LAYERING (1<<0) #define RBD_FEATURE_STRIPINGV2 (1<<1) #define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2) +#define RBD_FEATURE_DATA_POOL (1<<7) #define RBD_FEATURES_ALL (RBD_FEATURE_LAYERING | \ RBD_FEATURE_STRIPINGV2 | \ - RBD_FEATURE_EXCLUSIVE_LOCK) + RBD_FEATURE_EXCLUSIVE_LOCK | \ + RBD_FEATURE_DATA_POOL) /* Features supported by this (client software) implementation. */ @@ -146,6 +148,7 @@ struct rbd_image_header { __u8 obj_order; u64 stripe_unit; u64 stripe_count; + s64 data_pool_id; u64 features; /* Might be changeable someday? */ /* The remaining fields need to be updated occasionally */ @@ -989,7 +992,8 @@ static void rbd_init_layout(struct rbd_device *rbd_dev) rbd_dev->layout.stripe_unit = rbd_dev->header.stripe_unit; rbd_dev->layout.stripe_count = rbd_dev->header.stripe_count; rbd_dev->layout.object_size = rbd_obj_bytes(&rbd_dev->header); - rbd_dev->layout.pool_id = rbd_dev->spec->pool_id; + rbd_dev->layout.pool_id = rbd_dev->header.data_pool_id == CEPH_NOPOOL ? + rbd_dev->spec->pool_id : rbd_dev->header.data_pool_id; RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL); } @@ -4797,6 +4801,7 @@ static struct rbd_device *__rbd_dev_create(struct rbd_client *rbdc, INIT_LIST_HEAD(&rbd_dev->node); init_rwsem(&rbd_dev->header_rwsem); + rbd_dev->header.data_pool_id = CEPH_NOPOOL; ceph_oid_init(&rbd_dev->header_oid); rbd_dev->header_oloc.pool = spec->pool_id; @@ -5161,6 +5166,24 @@ static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev) return 0; } +static int rbd_dev_v2_data_pool(struct rbd_device *rbd_dev) +{ + __le64 data_pool_id; + int ret; + + ret = rbd_obj_method_sync(rbd_dev, &rbd_dev->header_oid, + &rbd_dev->header_oloc, "get_data_pool", + NULL, 0, &data_pool_id, sizeof(data_pool_id)); + if (ret < 0) + return ret; + if (ret < sizeof(data_pool_id)) + return -EBADMSG; + + rbd_dev->header.data_pool_id = le64_to_cpu(data_pool_id); + WARN_ON(rbd_dev->header.data_pool_id == CEPH_NOPOOL); + return 0; +} + static char *rbd_dev_image_name(struct rbd_device *rbd_dev) { CEPH_DEFINE_OID_ONSTACK(oid); @@ -5858,6 +5881,12 @@ static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev) goto out_err; } + if (rbd_dev->header.features & RBD_FEATURE_DATA_POOL) { + ret = rbd_dev_v2_data_pool(rbd_dev); + if (ret) + goto out_err; + } + rbd_init_layout(rbd_dev); return 0; -- cgit v1.2.3-70-g09d2 From 67e2b65274e3c5fc9100544c1f90adcc85dbe597 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:22 +0100 Subject: rbd: set offset and length outside of rbd_obj_request_create() The allocation doesn't depend on offset and length. Both offset and length can be changed after obj_request is allocated, too. Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index ceac7877b5f4..c975d49e612c 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1631,7 +1631,9 @@ static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) { struct ceph_osd_request *osd_req = obj_request->osd_req; - dout("%s %p osd_req %p\n", __func__, obj_request, osd_req); + dout("%s %p \"%s\" %llu~%llu osd_req %p\n", __func__, + obj_request, obj_request->object_name, obj_request->offset, + obj_request->length, osd_req); if (obj_request_img_data_test(obj_request)) { WARN_ON(obj_request->callback != rbd_img_obj_callback); rbd_img_request_get(obj_request->img_request); @@ -2073,7 +2075,6 @@ static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) /* object_name is assumed to be a non-null pointer and NUL-terminated */ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, - u64 offset, u64 length, enum obj_request_type type) { struct rbd_obj_request *obj_request; @@ -2094,18 +2095,13 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, } obj_request->object_name = memcpy(name, object_name, size); - obj_request->offset = offset; - obj_request->length = length; - obj_request->flags = 0; obj_request->which = BAD_WHICH; obj_request->type = type; INIT_LIST_HEAD(&obj_request->links); init_completion(&obj_request->completion); kref_init(&obj_request->kref); - dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name, - offset, length, (int)type, obj_request); - + dout("%s %p\n", __func__, obj_request); return obj_request; } @@ -2517,21 +2513,21 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, while (resid) { struct ceph_osd_request *osd_req; const char *object_name; - u64 offset; - u64 length; + u64 offset = rbd_segment_offset(rbd_dev, img_offset); + u64 length = rbd_segment_length(rbd_dev, img_offset, resid); object_name = rbd_segment_name(rbd_dev, img_offset); if (!object_name) goto out_unwind; - offset = rbd_segment_offset(rbd_dev, img_offset); - length = rbd_segment_length(rbd_dev, img_offset, resid); - obj_request = rbd_obj_request_create(object_name, - offset, length, type); + obj_request = rbd_obj_request_create(object_name, type); /* object request has its own copy of the object name */ rbd_segment_name_free(object_name); if (!obj_request) goto out_unwind; + obj_request->offset = offset; + obj_request->length = length; + /* * set obj_request->img_request before creating the * osd_request so that it gets the right snapc @@ -2870,7 +2866,7 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) size_t size; int ret; - stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0, + stat_request = rbd_obj_request_create(obj_request->object_name, OBJ_REQUEST_PAGES); if (!stat_request) return -ENOMEM; -- cgit v1.2.3-70-g09d2 From bc81207ea9fc01896cbc102f025f1ebb2c4bae1d Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:23 +0100 Subject: rbd: factor out __rbd_osd_req_create() Factor OSD request allocation and initialization code out into __rbd_osd_req_create(). Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 103 ++++++++++++++++++++-------------------------------- 1 file changed, 40 insertions(+), 63 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index c975d49e612c..b44b457b4dc7 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1952,6 +1952,38 @@ static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request) osd_req->r_data_offset = obj_request->offset; } +static struct ceph_osd_request * +__rbd_osd_req_create(struct rbd_device *rbd_dev, + struct ceph_snap_context *snapc, + int num_ops, unsigned int flags, + struct rbd_obj_request *obj_request) +{ + struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; + struct ceph_osd_request *req; + + req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); + if (!req) + return NULL; + + req->r_flags = flags; + req->r_callback = rbd_osd_req_callback; + req->r_priv = obj_request; + + req->r_base_oloc.pool = rbd_dev->layout.pool_id; + if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, "%s", + obj_request->object_name)) + goto err_req; + + if (ceph_osdc_alloc_messages(req, GFP_NOIO)) + goto err_req; + + return req; + +err_req: + ceph_osdc_put_request(req); + return NULL; +} + /* * Create an osd request. A read request has one osd op (read). * A write request has either one (watch) or two (hint+write) osd ops. @@ -1965,8 +1997,6 @@ static struct ceph_osd_request *rbd_osd_req_create( struct rbd_obj_request *obj_request) { struct ceph_snap_context *snapc = NULL; - struct ceph_osd_client *osdc; - struct ceph_osd_request *osd_req; if (obj_request_img_data_test(obj_request) && (op_type == OBJ_OP_DISCARD || op_type == OBJ_OP_WRITE)) { @@ -1981,35 +2011,10 @@ static struct ceph_osd_request *rbd_osd_req_create( rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2)); - /* Allocate and initialize the request, for the num_ops ops */ - - osdc = &rbd_dev->rbd_client->client->osdc; - osd_req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, - GFP_NOIO); - if (!osd_req) - goto fail; - - if (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) - osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; - else - osd_req->r_flags = CEPH_OSD_FLAG_READ; - - osd_req->r_callback = rbd_osd_req_callback; - osd_req->r_priv = obj_request; - - osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id; - if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s", - obj_request->object_name)) - goto fail; - - if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO)) - goto fail; - - return osd_req; - -fail: - ceph_osdc_put_request(osd_req); - return NULL; + return __rbd_osd_req_create(rbd_dev, snapc, num_ops, + (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ? + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK : CEPH_OSD_FLAG_READ, + obj_request); } /* @@ -2022,10 +2027,6 @@ static struct ceph_osd_request * rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) { struct rbd_img_request *img_request; - struct ceph_snap_context *snapc; - struct rbd_device *rbd_dev; - struct ceph_osd_client *osdc; - struct ceph_osd_request *osd_req; int num_osd_ops = 3; rbd_assert(obj_request_img_data_test(obj_request)); @@ -2037,36 +2038,12 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) if (img_request_discard_test(img_request)) num_osd_ops = 2; - /* Allocate and initialize the request, for all the ops */ - - snapc = img_request->snapc; - rbd_dev = img_request->rbd_dev; - osdc = &rbd_dev->rbd_client->client->osdc; - osd_req = ceph_osdc_alloc_request(osdc, snapc, num_osd_ops, - false, GFP_NOIO); - if (!osd_req) - goto fail; - - osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; - osd_req->r_callback = rbd_osd_req_callback; - osd_req->r_priv = obj_request; - - osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id; - if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s", - obj_request->object_name)) - goto fail; - - if (ceph_osdc_alloc_messages(osd_req, GFP_NOIO)) - goto fail; - - return osd_req; - -fail: - ceph_osdc_put_request(osd_req); - return NULL; + return __rbd_osd_req_create(img_request->rbd_dev, + img_request->snapc, num_osd_ops, + CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, + obj_request); } - static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) { ceph_osdc_put_request(osd_req); -- cgit v1.2.3-70-g09d2 From 223768d02e2ca5c1747099e03cf949c17aebffd2 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:23 +0100 Subject: rbd: RBD_V{1,2}_DATA_FORMAT macros ... and also fix up the comment -- format 1 data objects have always been 12 hex digits long. Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 6 ++---- drivers/block/rbd_types.h | 7 ++++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index b44b457b4dc7..e10d0708a419 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1257,18 +1257,16 @@ static void rbd_segment_name_free(const char *name) static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) { + const char *name_format = rbd_dev->image_format == 1 ? + RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; char *name; u64 segment; int ret; - char *name_format; name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO); if (!name) return NULL; segment = offset >> rbd_dev->header.obj_order; - name_format = "%s.%012llx"; - if (rbd_dev->image_format == 2) - name_format = "%s.%016llx"; ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format, rbd_dev->header.object_prefix, segment); if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) { diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h index be9c76d292f5..62ff50d3e7a6 100644 --- a/drivers/block/rbd_types.h +++ b/drivers/block/rbd_types.h @@ -25,8 +25,8 @@ */ #define RBD_HEADER_PREFIX "rbd_header." -#define RBD_DATA_PREFIX "rbd_data." #define RBD_ID_PREFIX "rbd_id." +#define RBD_V2_DATA_FORMAT "%s.%016llx" #define RBD_LOCK_NAME "rbd_lock" #define RBD_LOCK_TAG "internal" @@ -42,13 +42,14 @@ enum rbd_notify_op { /* * For format version 1, rbd image 'foo' consists of objects * foo.rbd - image metadata - * rb...00000000 - * rb...00000001 + * rb....000000000000 + * rb....000000000001 * ... - data * There is no notion of a persistent image id in rbd format 1. */ #define RBD_SUFFIX ".rbd" +#define RBD_V1_DATA_FORMAT "%s.%012llx" #define RBD_DIRECTORY "rbd_directory" #define RBD_INFO "rbd_info" -- cgit v1.2.3-70-g09d2 From a90bb0c1d47dc1195bc6ac753b34a52535089f80 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:23 +0100 Subject: rbd: store and use obj_request->object_no object_no can be trivially formatted into an object name. We already store object names in OSD requests with special care to avoid dynamic allocations for short names. Storing a name in obj_request, obtained as below (!), is a waste and will be removed in the next commit. name = kmem_cache_alloc(rbd_segment_name_cache, ...); snprintf(name, ...); obj_request->object_name = kstrdup(name); kmem_cache_free(rbd_segment_name_cache, name); ... ceph_oid_aprintf(..., "%s", obj_request->object_name); Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index e10d0708a419..6c094b580eae 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -232,6 +232,7 @@ enum obj_req_flags { struct rbd_obj_request { const char *object_name; + u64 object_no; u64 offset; /* object start byte */ u64 length; /* bytes from offset */ unsigned long flags; @@ -1629,8 +1630,8 @@ static void rbd_obj_request_submit(struct rbd_obj_request *obj_request) { struct ceph_osd_request *osd_req = obj_request->osd_req; - dout("%s %p \"%s\" %llu~%llu osd_req %p\n", __func__, - obj_request, obj_request->object_name, obj_request->offset, + dout("%s %p object_no %016llx %llu~%llu osd_req %p\n", __func__, + obj_request, obj_request->object_no, obj_request->offset, obj_request->length, osd_req); if (obj_request_img_data_test(obj_request)) { WARN_ON(obj_request->callback != rbd_img_obj_callback); @@ -1925,8 +1926,8 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req) rbd_osd_call_callback(obj_request); break; default: - rbd_warn(NULL, "%s: unsupported op %hu", - obj_request->object_name, (unsigned short) opcode); + rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d", + obj_request->object_no, opcode); break; } @@ -1958,6 +1959,8 @@ __rbd_osd_req_create(struct rbd_device *rbd_dev, { struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc; struct ceph_osd_request *req; + const char *name_format = rbd_dev->image_format == 1 ? + RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; req = ceph_osdc_alloc_request(osdc, snapc, num_ops, false, GFP_NOIO); if (!req) @@ -1968,8 +1971,8 @@ __rbd_osd_req_create(struct rbd_device *rbd_dev, req->r_priv = obj_request; req->r_base_oloc.pool = rbd_dev->layout.pool_id; - if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, "%s", - obj_request->object_name)) + if (ceph_oid_aprintf(&req->r_base_oid, GFP_NOIO, name_format, + rbd_dev->header.object_prefix, obj_request->object_no)) goto err_req; if (ceph_osdc_alloc_messages(req, GFP_NOIO)) @@ -2488,6 +2491,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, while (resid) { struct ceph_osd_request *osd_req; const char *object_name; + u64 object_no = img_offset >> rbd_dev->header.obj_order; u64 offset = rbd_segment_offset(rbd_dev, img_offset); u64 length = rbd_segment_length(rbd_dev, img_offset, resid); @@ -2500,6 +2504,7 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, if (!obj_request) goto out_unwind; + obj_request->object_no = object_no; obj_request->offset = offset; obj_request->length = length; @@ -2846,6 +2851,8 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) if (!stat_request) return -ENOMEM; + stat_request->object_no = obj_request->object_no; + stat_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, stat_request); if (!stat_request->osd_req) { -- cgit v1.2.3-70-g09d2 From 6c696d8560e74cd42458931375875d62ae88c6ae Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Jan 2017 18:16:23 +0100 Subject: rbd: kill obj_request->object_name and rbd_segment_name_cache Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- drivers/block/rbd.c | 79 ++++----------------------------------------- include/linux/ceph/osdmap.h | 7 ---- 2 files changed, 7 insertions(+), 79 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 6c094b580eae..b6e269b4d534 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -231,7 +231,6 @@ enum obj_req_flags { }; struct rbd_obj_request { - const char *object_name; u64 object_no; u64 offset; /* object start byte */ u64 length; /* bytes from offset */ @@ -440,7 +439,6 @@ static DEFINE_SPINLOCK(rbd_client_list_lock); static struct kmem_cache *rbd_img_request_cache; static struct kmem_cache *rbd_obj_request_cache; -static struct kmem_cache *rbd_segment_name_cache; static int rbd_major; static DEFINE_IDA(rbd_dev_id_ida); @@ -1249,37 +1247,6 @@ static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev) rbd_dev->mapping.features = 0; } -static void rbd_segment_name_free(const char *name) -{ - /* The explicit cast here is needed to drop the const qualifier */ - - kmem_cache_free(rbd_segment_name_cache, (void *)name); -} - -static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset) -{ - const char *name_format = rbd_dev->image_format == 1 ? - RBD_V1_DATA_FORMAT : RBD_V2_DATA_FORMAT; - char *name; - u64 segment; - int ret; - - name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO); - if (!name) - return NULL; - segment = offset >> rbd_dev->header.obj_order; - ret = snprintf(name, CEPH_MAX_OID_NAME_LEN + 1, name_format, - rbd_dev->header.object_prefix, segment); - if (ret < 0 || ret > CEPH_MAX_OID_NAME_LEN) { - pr_err("error formatting segment name for #%llu (%d)\n", - segment, ret); - rbd_segment_name_free(name); - name = NULL; - } - - return name; -} - static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset) { u64 segment_size = rbd_obj_bytes(&rbd_dev->header); @@ -2050,29 +2017,17 @@ static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) ceph_osdc_put_request(osd_req); } -/* object_name is assumed to be a non-null pointer and NUL-terminated */ - -static struct rbd_obj_request *rbd_obj_request_create(const char *object_name, - enum obj_request_type type) +static struct rbd_obj_request * +rbd_obj_request_create(enum obj_request_type type) { struct rbd_obj_request *obj_request; - size_t size; - char *name; rbd_assert(obj_request_type_valid(type)); - size = strlen(object_name) + 1; - name = kmalloc(size, GFP_NOIO); - if (!name) - return NULL; - obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_NOIO); - if (!obj_request) { - kfree(name); + if (!obj_request) return NULL; - } - obj_request->object_name = memcpy(name, object_name, size); obj_request->which = BAD_WHICH; obj_request->type = type; INIT_LIST_HEAD(&obj_request->links); @@ -2114,8 +2069,6 @@ static void rbd_obj_request_destroy(struct kref *kref) break; } - kfree(obj_request->object_name); - obj_request->object_name = NULL; kmem_cache_free(rbd_obj_request_cache, obj_request); } @@ -2490,17 +2443,11 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request, while (resid) { struct ceph_osd_request *osd_req; - const char *object_name; u64 object_no = img_offset >> rbd_dev->header.obj_order; u64 offset = rbd_segment_offset(rbd_dev, img_offset); u64 length = rbd_segment_length(rbd_dev, img_offset, resid); - object_name = rbd_segment_name(rbd_dev, img_offset); - if (!object_name) - goto out_unwind; - obj_request = rbd_obj_request_create(object_name, type); - /* object request has its own copy of the object name */ - rbd_segment_name_free(object_name); + obj_request = rbd_obj_request_create(type); if (!obj_request) goto out_unwind; @@ -2846,8 +2793,7 @@ static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request) size_t size; int ret; - stat_request = rbd_obj_request_create(obj_request->object_name, - OBJ_REQUEST_PAGES); + stat_request = rbd_obj_request_create(OBJ_REQUEST_PAGES); if (!stat_request) return -ENOMEM; @@ -6389,27 +6335,16 @@ static int rbd_slab_init(void) if (!rbd_obj_request_cache) goto out_err; - rbd_assert(!rbd_segment_name_cache); - rbd_segment_name_cache = kmem_cache_create("rbd_segment_name", - CEPH_MAX_OID_NAME_LEN + 1, 1, 0, NULL); - if (rbd_segment_name_cache) - return 0; -out_err: - kmem_cache_destroy(rbd_obj_request_cache); - rbd_obj_request_cache = NULL; + return 0; +out_err: kmem_cache_destroy(rbd_img_request_cache); rbd_img_request_cache = NULL; - return -ENOMEM; } static void rbd_slab_exit(void) { - rbd_assert(rbd_segment_name_cache); - kmem_cache_destroy(rbd_segment_name_cache); - rbd_segment_name_cache = NULL; - rbd_assert(rbd_obj_request_cache); kmem_cache_destroy(rbd_obj_request_cache); rbd_obj_request_cache = NULL; diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index 8cebdc4158c3..938656f70807 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -81,13 +81,6 @@ void ceph_oloc_copy(struct ceph_object_locator *dest, const struct ceph_object_locator *src); void ceph_oloc_destroy(struct ceph_object_locator *oloc); -/* - * Maximum supported by kernel client object name length - * - * (probably outdated: must be >= RBD_MAX_MD_NAME_LEN -- currently 100) - */ -#define CEPH_MAX_OID_NAME_LEN 100 - /* * 51-char inline_name is long enough for all cephfs and all but one * rbd requests: in ".rbd"/"rbd_id." can be -- cgit v1.2.3-70-g09d2 From b9942bc9d3c70d359496a1a0afc528d4ab89be46 Mon Sep 17 00:00:00 2001 From: Bhumika Goyal Date: Sat, 11 Feb 2017 12:14:38 +0530 Subject: rbd: constify device_type structure Declare device_type structure as const as it is only stored in the type field of a device structure. This field is of type const, so add const to the declaration of device_type structure. File size before: text data bss dec hex filename 61546 11610 208 73364 11e94 drivers/block/rbd.o File size after: text data bss dec hex filename 61610 11578 208 73396 11eb4 drivers/block/rbd.o Signed-off-by: Bhumika Goyal Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index b6e269b4d534..ffb8d3afc6b4 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4630,7 +4630,7 @@ static const struct attribute_group *rbd_attr_groups[] = { static void rbd_dev_release(struct device *dev); -static struct device_type rbd_device_type = { +static const struct device_type rbd_device_type = { .name = "rbd", .groups = rbd_attr_groups, .release = rbd_dev_release, -- cgit v1.2.3-70-g09d2 From df963ea8a082d31521a120e8e31a29ad8a1dc215 Mon Sep 17 00:00:00 2001 From: Jeff Layton Date: Tue, 14 Feb 2017 10:09:40 -0500 Subject: ceph: remove req from unsafe list when unregistering it There's no reason a request should ever be on a s_unsafe list but not in the request tree. Cc: stable@vger.kernel.org Link: http://tracker.ceph.com/issues/18474 Signed-off-by: Jeff Layton Reviewed-by: Yan, Zheng Signed-off-by: Ilya Dryomov --- fs/ceph/mds_client.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 52521f339745..fdbc3544e41c 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -628,6 +628,9 @@ static void __unregister_request(struct ceph_mds_client *mdsc, { dout("__unregister_request %p tid %lld\n", req, req->r_tid); + /* Never leave an unregistered request on an unsafe list! */ + list_del_init(&req->r_unsafe_item); + if (req->r_tid == mdsc->oldest_tid) { struct rb_node *p = rb_next(&req->r_node); mdsc->oldest_tid = 0; @@ -1058,7 +1061,6 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc, while (!list_empty(&session->s_unsafe)) { req = list_first_entry(&session->s_unsafe, struct ceph_mds_request, r_unsafe_item); - list_del_init(&req->r_unsafe_item); pr_warn_ratelimited(" dropping unsafe request %llu\n", req->r_tid); __unregister_request(mdsc, req); @@ -2469,7 +2471,6 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) * useful we could do with a revised return value. */ dout("got safe reply %llu, mds%d\n", tid, mds); - list_del_init(&req->r_unsafe_item); /* last unsafe request during umount? */ if (mdsc->stopping && !__get_oldest_req(mdsc)) -- cgit v1.2.3-70-g09d2 From 98ba6af728de99953e25e550dbeca588c258ef03 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 16 Feb 2017 15:21:15 +0100 Subject: crush: do is_out test only if we do not collide The is_out() test may require an additional hashing operation, so we should skip it whenever possible. Reflects ceph.git commit db107cc7f15cf2481894add325dc93e33479f529. Signed-off-by: Ilya Dryomov --- net/ceph/crush/mapper.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 2e31217ccae3..84d2de047865 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -552,14 +552,12 @@ static int crush_choose_firstn(const struct crush_map *map, } } - if (!reject) { + if (!reject && !collide) { /* out? */ if (itemtype == 0) reject = is_out(map, weight, weight_max, item, x); - else - reject = 0; } reject: -- cgit v1.2.3-70-g09d2 From 7ba0487cca6199cc19bffd34c00df4ea70ac032a Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 16 Feb 2017 15:38:05 +0100 Subject: crush: fix dprintk compilation The syntax error was not noticed because dprintk is a macro and the code is discarded by default. Reflects ceph.git commit f29b840c64a933b2cb13e3da6f3d785effd73a57. Signed-off-by: Ilya Dryomov --- net/ceph/crush/mapper.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c index 84d2de047865..b5cd8c21bfdf 100644 --- a/net/ceph/crush/mapper.c +++ b/net/ceph/crush/mapper.c @@ -105,7 +105,7 @@ static int bucket_perm_choose(const struct crush_bucket *bucket, /* calculate permutation up to pr */ for (i = 0; i < work->perm_n; i++) - dprintk(" perm_choose have %d: %d\n", i, bucket->perm[i]); + dprintk(" perm_choose have %d: %d\n", i, work->perm[i]); while (work->perm_n <= pr) { unsigned int p = work->perm_n; /* no point in swapping the final entry */ @@ -122,7 +122,7 @@ static int bucket_perm_choose(const struct crush_bucket *bucket, work->perm_n++; } for (i = 0; i < bucket->size; i++) - dprintk(" perm_choose %d: %d\n", i, bucket->perm[i]); + dprintk(" perm_choose %d: %d\n", i, work->perm[i]); s = work->perm[pr]; out: -- cgit v1.2.3-70-g09d2 From f107548039807eb890e65ce5cd29d6ac52562f09 Mon Sep 17 00:00:00 2001 From: Dan Carpenter Date: Thu, 23 Feb 2017 13:39:59 +0300 Subject: ceph: tidy some white space in get_nonsnap_parent() The white space here seems slightly messed up. Signed-off-by: Dan Carpenter Signed-off-by: Ilya Dryomov --- fs/ceph/mds_client.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index fdbc3544e41c..c681762d76e6 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -679,8 +679,9 @@ static void __unregister_request(struct ceph_mds_client *mdsc, * working with them. Once we hit a candidate dentry, we attempt to take a * reference to it, and return that as the result. */ -static struct inode *get_nonsnap_parent(struct dentry *dentry) { struct inode - *inode = NULL; +static struct inode *get_nonsnap_parent(struct dentry *dentry) +{ + struct inode *inode = NULL; while (dentry && !IS_ROOT(dentry)) { inode = d_inode_rcu(dentry); -- cgit v1.2.3-70-g09d2 From 55f2a04588c5881d90e22631b17a84cd25d17cc4 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 13 Feb 2017 14:44:19 +0100 Subject: ceph: remove special ack vs commit behavior - ask for a commit reply instead of an ack reply in __ceph_pool_perm_get() - don't ask for both ack and commit replies in ceph_sync_write() - since just only one reply is requested now, i_unsafe_writes list will always be empty -- kill ceph_sync_write_wait() and go back to a standard ->evict_inode() Signed-off-by: Ilya Dryomov Reviewed-by: Jeff Layton Reviewed-by: Sage Weil --- fs/ceph/addr.c | 2 +- fs/ceph/caps.c | 2 -- fs/ceph/file.c | 88 +-------------------------------------------------------- fs/ceph/inode.c | 9 ------ fs/ceph/super.c | 1 - fs/ceph/super.h | 4 +-- 6 files changed, 3 insertions(+), 103 deletions(-) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 4547bbf80e4f..3f0474c55f05 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1872,7 +1872,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, goto out_unlock; } - wr_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ACK; + wr_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL); ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc); ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid); diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 3c2dfd72e5b2..cd966f276a8d 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2091,8 +2091,6 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) dout("fsync %p%s\n", inode, datasync ? " datasync" : ""); - ceph_sync_write_wait(inode); - ret = filemap_write_and_wait_range(inode->i_mapping, start, end); if (ret < 0) goto out; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index baad3b688ada..023cb7fd9b5f 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -795,89 +795,6 @@ out: kfree(aio_work); } -/* - * Write commit request unsafe callback, called to tell us when a - * request is unsafe (that is, in flight--has been handed to the - * messenger to send to its target osd). It is called again when - * we've received a response message indicating the request is - * "safe" (its CEPH_OSD_FLAG_ONDISK flag is set), or when a request - * is completed early (and unsuccessfully) due to a timeout or - * interrupt. - * - * This is used if we requested both an ACK and ONDISK commit reply - * from the OSD. - */ -static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) -{ - struct ceph_inode_info *ci = ceph_inode(req->r_inode); - - dout("%s %p tid %llu %ssafe\n", __func__, req, req->r_tid, - unsafe ? "un" : ""); - if (unsafe) { - ceph_get_cap_refs(ci, CEPH_CAP_FILE_WR); - spin_lock(&ci->i_unsafe_lock); - list_add_tail(&req->r_unsafe_item, - &ci->i_unsafe_writes); - spin_unlock(&ci->i_unsafe_lock); - - complete_all(&req->r_completion); - } else { - spin_lock(&ci->i_unsafe_lock); - list_del_init(&req->r_unsafe_item); - spin_unlock(&ci->i_unsafe_lock); - ceph_put_cap_refs(ci, CEPH_CAP_FILE_WR); - } -} - -/* - * Wait on any unsafe replies for the given inode. First wait on the - * newest request, and make that the upper bound. Then, if there are - * more requests, keep waiting on the oldest as long as it is still older - * than the original request. - */ -void ceph_sync_write_wait(struct inode *inode) -{ - struct ceph_inode_info *ci = ceph_inode(inode); - struct list_head *head = &ci->i_unsafe_writes; - struct ceph_osd_request *req; - u64 last_tid; - - if (!S_ISREG(inode->i_mode)) - return; - - spin_lock(&ci->i_unsafe_lock); - if (list_empty(head)) - goto out; - - /* set upper bound as _last_ entry in chain */ - - req = list_last_entry(head, struct ceph_osd_request, - r_unsafe_item); - last_tid = req->r_tid; - - do { - ceph_osdc_get_request(req); - spin_unlock(&ci->i_unsafe_lock); - - dout("sync_write_wait on tid %llu (until %llu)\n", - req->r_tid, last_tid); - wait_for_completion(&req->r_done_completion); - ceph_osdc_put_request(req); - - spin_lock(&ci->i_unsafe_lock); - /* - * from here on look at first entry in chain, since we - * only want to wait for anything older than last_tid - */ - if (list_empty(head)) - break; - req = list_first_entry(head, struct ceph_osd_request, - r_unsafe_item); - } while (req->r_tid < last_tid); -out: - spin_unlock(&ci->i_unsafe_lock); -} - static ssize_t ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, struct ceph_snap_context *snapc, @@ -1119,8 +1036,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE | - CEPH_OSD_FLAG_ACK; + CEPH_OSD_FLAG_WRITE; while ((len = iov_iter_count(from)) > 0) { size_t left; @@ -1166,8 +1082,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, goto out; } - /* get a second commit callback */ - req->r_unsafe_callback = ceph_sync_write_unsafe; req->r_inode = inode; osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 68f46132b157..fd8f771f99b7 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -499,7 +499,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_rdcache_gen = 0; ci->i_rdcache_revoking = 0; - INIT_LIST_HEAD(&ci->i_unsafe_writes); INIT_LIST_HEAD(&ci->i_unsafe_dirops); INIT_LIST_HEAD(&ci->i_unsafe_iops); spin_lock_init(&ci->i_unsafe_lock); @@ -583,14 +582,6 @@ int ceph_drop_inode(struct inode *inode) return 1; } -void ceph_evict_inode(struct inode *inode) -{ - /* wait unsafe sync writes */ - ceph_sync_write_wait(inode); - truncate_inode_pages_final(&inode->i_data); - clear_inode(inode); -} - static inline blkcnt_t calc_inode_blocks(u64 size) { return (size + (1<<9) - 1) >> 9; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index a0a0b6d02f89..0ec8d0114e57 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -757,7 +757,6 @@ static const struct super_operations ceph_super_ops = { .destroy_inode = ceph_destroy_inode, .write_inode = ceph_write_inode, .drop_inode = ceph_drop_inode, - .evict_inode = ceph_evict_inode, .sync_fs = ceph_sync_fs, .put_super = ceph_put_super, .show_options = ceph_show_options, diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 950170136be9..e9410bcf4113 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -343,7 +343,6 @@ struct ceph_inode_info { u32 i_rdcache_gen; /* incremented each time we get FILE_CACHE. */ u32 i_rdcache_revoking; /* RDCACHE gen to async invalidate, if any */ - struct list_head i_unsafe_writes; /* uncommitted sync writes */ struct list_head i_unsafe_dirops; /* uncommitted mds dir ops */ struct list_head i_unsafe_iops; /* uncommitted mds inode ops */ spinlock_t i_unsafe_lock; @@ -753,7 +752,6 @@ extern const struct inode_operations ceph_file_iops; extern struct inode *ceph_alloc_inode(struct super_block *sb); extern void ceph_destroy_inode(struct inode *inode); extern int ceph_drop_inode(struct inode *inode); -extern void ceph_evict_inode(struct inode *inode); extern struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino); @@ -933,7 +931,7 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry, extern int ceph_release(struct inode *inode, struct file *filp); extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, char *data, size_t len); -extern void ceph_sync_write_wait(struct inode *inode); + /* dir.c */ extern const struct file_operations ceph_dir_fops; extern const struct file_operations ceph_snapdir_fops; -- cgit v1.2.3-70-g09d2 From b18b9550e4059ceea0393c518eb323b95243f92f Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Sat, 11 Feb 2017 18:46:08 +0100 Subject: libceph: get rid of ack vs commit - CEPH_OSD_FLAG_ACK shouldn't be set anymore, so assert on it - remove support for handling ack replies (OSDs will send ack replies only if clients request them) - drop the "do lingering callbacks under osd->lock" logic from handle_reply() -- lreq->lock is sufficient in all three cases Signed-off-by: Ilya Dryomov Reviewed-by: Jeff Layton Reviewed-by: Sage Weil --- include/linux/ceph/osd_client.h | 6 +-- net/ceph/osd_client.c | 113 +++++++++------------------------------- 2 files changed, 27 insertions(+), 92 deletions(-) diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 03a6653d329a..2ea0c282f3dc 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -22,7 +22,6 @@ struct ceph_osd_client; * completion callback for async writepages */ typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *); -typedef void (*ceph_osdc_unsafe_callback_t)(struct ceph_osd_request *, bool); #define CEPH_HOMELESS_OSD -1 @@ -170,15 +169,12 @@ struct ceph_osd_request { unsigned int r_num_ops; int r_result; - bool r_got_reply; struct ceph_osd_client *r_osdc; struct kref r_kref; bool r_mempool; - struct completion r_completion; - struct completion r_done_completion; /* fsync waiter */ + struct completion r_completion; /* private to osd_client.c */ ceph_osdc_callback_t r_callback; - ceph_osdc_unsafe_callback_t r_unsafe_callback; struct list_head r_unsafe_item; struct inode *r_inode; /* for use by callbacks */ diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index ac4753421d0c..e1c6c2b4a295 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -460,7 +460,6 @@ static void request_init(struct ceph_osd_request *req) kref_init(&req->r_kref); init_completion(&req->r_completion); - init_completion(&req->r_done_completion); RB_CLEAR_NODE(&req->r_node); RB_CLEAR_NODE(&req->r_mc_node); INIT_LIST_HEAD(&req->r_unsafe_item); @@ -1637,7 +1636,7 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked) bool need_send = false; bool promoted = false; - WARN_ON(req->r_tid || req->r_got_reply); + WARN_ON(req->r_tid); dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); again: @@ -1705,17 +1704,10 @@ promote: static void account_request(struct ceph_osd_request *req) { - unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; + WARN_ON(req->r_flags & CEPH_OSD_FLAG_ACK); + WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE))); - if (req->r_flags & CEPH_OSD_FLAG_READ) { - WARN_ON(req->r_flags & mask); - req->r_flags |= CEPH_OSD_FLAG_ACK; - } else if (req->r_flags & CEPH_OSD_FLAG_WRITE) - WARN_ON(!(req->r_flags & mask)); - else - WARN_ON(1); - - WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask); + req->r_flags |= CEPH_OSD_FLAG_ONDISK; atomic_inc(&req->r_osdc->num_requests); } @@ -1750,15 +1742,15 @@ static void finish_request(struct ceph_osd_request *req) static void __complete_request(struct ceph_osd_request *req) { - if (req->r_callback) + if (req->r_callback) { + dout("%s req %p tid %llu cb %pf result %d\n", __func__, req, + req->r_tid, req->r_callback, req->r_result); req->r_callback(req); - else - complete_all(&req->r_completion); + } } /* - * Note that this is open-coded in handle_reply(), which has to deal - * with ack vs commit, dup acks, etc. + * This is open-coded in handle_reply(). */ static void complete_request(struct ceph_osd_request *req, int err) { @@ -1767,7 +1759,7 @@ static void complete_request(struct ceph_osd_request *req, int err) req->r_result = err; finish_request(req); __complete_request(req); - complete_all(&req->r_done_completion); + complete_all(&req->r_completion); ceph_osdc_put_request(req); } @@ -1793,7 +1785,7 @@ static void cancel_request(struct ceph_osd_request *req) cancel_map_check(req); finish_request(req); - complete_all(&req->r_done_completion); + complete_all(&req->r_completion); ceph_osdc_put_request(req); } @@ -2170,7 +2162,6 @@ static void linger_commit_cb(struct ceph_osd_request *req) mutex_lock(&lreq->lock); dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq, lreq->linger_id, req->r_result); - WARN_ON(!__linger_registered(lreq)); linger_reg_commit_complete(lreq, req->r_result); lreq->committed = true; @@ -2786,31 +2777,8 @@ e_inval: } /* - * We are done with @req if - * - @m is a safe reply, or - * - @m is an unsafe reply and we didn't want a safe one - */ -static bool done_request(const struct ceph_osd_request *req, - const struct MOSDOpReply *m) -{ - return (m->result < 0 || - (m->flags & CEPH_OSD_FLAG_ONDISK) || - !(req->r_flags & CEPH_OSD_FLAG_ONDISK)); -} - -/* - * handle osd op reply. either call the callback if it is specified, - * or do the completion to wake up the waiting thread. - * - * ->r_unsafe_callback is set? yes no - * - * first reply is OK (needed r_cb/r_completion, r_cb/r_completion, - * any or needed/got safe) r_done_completion r_done_completion - * - * first reply is unsafe r_unsafe_cb(true) (nothing) - * - * when we get the safe reply r_unsafe_cb(false), r_cb/r_completion, - * r_done_completion r_done_completion + * Handle MOSDOpReply. Set ->r_result and call the callback if it is + * specified. */ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) { @@ -2819,7 +2787,6 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) struct MOSDOpReply m; u64 tid = le64_to_cpu(msg->hdr.tid); u32 data_len = 0; - bool already_acked; int ret; int i; @@ -2898,50 +2865,22 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) le32_to_cpu(msg->hdr.data_len), req->r_tid); goto fail_request; } - dout("%s req %p tid %llu acked %d result %d data_len %u\n", __func__, - req, req->r_tid, req->r_got_reply, m.result, data_len); - - already_acked = req->r_got_reply; - if (!already_acked) { - req->r_result = m.result ?: data_len; - req->r_replay_version = m.replay_version; /* struct */ - req->r_got_reply = true; - } else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) { - dout("req %p tid %llu dup ack\n", req, req->r_tid); - goto out_unlock_session; - } - - if (done_request(req, &m)) { - finish_request(req); - if (req->r_linger) { - WARN_ON(req->r_unsafe_callback); - dout("req %p tid %llu cb (locked)\n", req, req->r_tid); - __complete_request(req); - } - } + dout("%s req %p tid %llu result %d data_len %u\n", __func__, + req, req->r_tid, m.result, data_len); + /* + * Since we only ever request ONDISK, we should only ever get + * one (type of) reply back. + */ + WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK)); + req->r_result = m.result ?: data_len; + finish_request(req); mutex_unlock(&osd->lock); up_read(&osdc->lock); - if (done_request(req, &m)) { - if (already_acked && req->r_unsafe_callback) { - dout("req %p tid %llu safe-cb\n", req, req->r_tid); - req->r_unsafe_callback(req, false); - } else if (!req->r_linger) { - dout("req %p tid %llu cb\n", req, req->r_tid); - __complete_request(req); - } - complete_all(&req->r_done_completion); - ceph_osdc_put_request(req); - } else { - if (req->r_unsafe_callback) { - dout("req %p tid %llu unsafe-cb\n", req, req->r_tid); - req->r_unsafe_callback(req, true); - } else { - WARN_ON(1); - } - } - + __complete_request(req); + complete_all(&req->r_completion); + ceph_osdc_put_request(req); return; fail_request: @@ -3541,7 +3480,7 @@ again: up_read(&osdc->lock); dout("%s waiting on req %p tid %llu last_tid %llu\n", __func__, req, req->r_tid, last_tid); - wait_for_completion(&req->r_done_completion); + wait_for_completion(&req->r_completion); ceph_osdc_put_request(req); goto again; } -- cgit v1.2.3-70-g09d2 From 54ea0046b6fe36ec18e82d282a29a18da6cdea0f Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Sat, 11 Feb 2017 18:48:41 +0100 Subject: libceph, rbd, ceph: WRITE | ONDISK -> WRITE CEPH_OSD_FLAG_ONDISK is set in account_request(). Signed-off-by: Ilya Dryomov Reviewed-by: Jeff Layton Reviewed-by: Sage Weil --- drivers/block/rbd.c | 6 ++---- fs/ceph/addr.c | 14 +++++--------- fs/ceph/file.c | 15 ++++----------- net/ceph/cls_lock_client.c | 12 ++++++------ net/ceph/osd_client.c | 9 ++++----- 5 files changed, 21 insertions(+), 35 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index ffb8d3afc6b4..695ef552aaf0 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -1981,8 +1981,7 @@ static struct ceph_osd_request *rbd_osd_req_create( return __rbd_osd_req_create(rbd_dev, snapc, num_ops, (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ? - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK : CEPH_OSD_FLAG_READ, - obj_request); + CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request); } /* @@ -2008,8 +2007,7 @@ rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request) return __rbd_osd_req_create(img_request->rbd_dev, img_request->snapc, num_osd_ops, - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - obj_request); + CEPH_OSD_FLAG_WRITE, obj_request); } static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 3f0474c55f05..6ecb920602ed 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1018,8 +1018,7 @@ new_request: &ci->i_layout, vino, offset, &len, 0, num_ops, CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_WRITE | - CEPH_OSD_FLAG_ONDISK, + CEPH_OSD_FLAG_WRITE, snapc, truncate_seq, truncate_size, false); if (IS_ERR(req)) { @@ -1029,8 +1028,7 @@ new_request: min(num_ops, CEPH_OSD_SLAB_OPS), CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_WRITE | - CEPH_OSD_FLAG_ONDISK, + CEPH_OSD_FLAG_WRITE, snapc, truncate_seq, truncate_size, true); BUG_ON(IS_ERR(req)); @@ -1680,8 +1678,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, ceph_vino(inode), 0, &len, 0, 1, - CEPH_OSD_OP_CREATE, - CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, + CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE, NULL, 0, 0, false); if (IS_ERR(req)) { err = PTR_ERR(req); @@ -1698,8 +1695,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, ceph_vino(inode), 0, &len, 1, 3, - CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, + CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, NULL, ci->i_truncate_seq, ci->i_truncate_size, false); if (IS_ERR(req)) { @@ -1872,7 +1868,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, goto out_unlock; } - wr_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + wr_req->r_flags = CEPH_OSD_FLAG_WRITE; osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL); ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc); ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 023cb7fd9b5f..26cc95421cca 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -759,9 +759,7 @@ static void ceph_aio_retry_work(struct work_struct *work) goto out; } - req->r_flags = CEPH_OSD_FLAG_ORDERSNAP | - CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE; + req->r_flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE; ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc); ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid); @@ -833,9 +831,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, if (ret2 < 0) dout("invalidate_inode_pages2_range returned %d\n", ret2); - flags = CEPH_OSD_FLAG_ORDERSNAP | - CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE; + flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE; } else { flags = CEPH_OSD_FLAG_READ; } @@ -1034,9 +1030,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, if (ret < 0) dout("invalidate_inode_pages2_range returned %d\n", ret); - flags = CEPH_OSD_FLAG_ORDERSNAP | - CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE; + flags = CEPH_OSD_FLAG_ORDERSNAP | CEPH_OSD_FLAG_WRITE; while ((len = iov_iter_count(from)) > 0) { size_t left; @@ -1531,8 +1525,7 @@ static int ceph_zero_partial_object(struct inode *inode, ceph_vino(inode), offset, length, 0, 1, op, - CEPH_OSD_FLAG_WRITE | - CEPH_OSD_FLAG_ONDISK, + CEPH_OSD_FLAG_WRITE, NULL, 0, 0, false); if (IS_ERR(req)) { ret = PTR_ERR(req); diff --git a/net/ceph/cls_lock_client.c b/net/ceph/cls_lock_client.c index f13a1ea87459..b9233b990399 100644 --- a/net/ceph/cls_lock_client.c +++ b/net/ceph/cls_lock_client.c @@ -69,8 +69,8 @@ int ceph_cls_lock(struct ceph_osd_client *osdc, dout("%s lock_name %s type %d cookie %s tag %s desc %s flags 0x%x\n", __func__, lock_name, type, cookie, tag, desc, flags); ret = ceph_osdc_call(osdc, oid, oloc, "lock", "lock", - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - lock_op_page, lock_op_buf_size, NULL, NULL); + CEPH_OSD_FLAG_WRITE, lock_op_page, + lock_op_buf_size, NULL, NULL); dout("%s: status %d\n", __func__, ret); __free_page(lock_op_page); @@ -117,8 +117,8 @@ int ceph_cls_unlock(struct ceph_osd_client *osdc, dout("%s lock_name %s cookie %s\n", __func__, lock_name, cookie); ret = ceph_osdc_call(osdc, oid, oloc, "lock", "unlock", - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - unlock_op_page, unlock_op_buf_size, NULL, NULL); + CEPH_OSD_FLAG_WRITE, unlock_op_page, + unlock_op_buf_size, NULL, NULL); dout("%s: status %d\n", __func__, ret); __free_page(unlock_op_page); @@ -170,8 +170,8 @@ int ceph_cls_break_lock(struct ceph_osd_client *osdc, dout("%s lock_name %s cookie %s locker %s%llu\n", __func__, lock_name, cookie, ENTITY_NAME(*locker)); ret = ceph_osdc_call(osdc, oid, oloc, "lock", "break_lock", - CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK, - break_op_page, break_op_buf_size, NULL, NULL); + CEPH_OSD_FLAG_WRITE, break_op_page, + break_op_buf_size, NULL, NULL); dout("%s: status %d\n", __func__, ret); __free_page(break_op_page); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index e1c6c2b4a295..5c0938ddddf6 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1704,7 +1704,7 @@ promote: static void account_request(struct ceph_osd_request *req) { - WARN_ON(req->r_flags & CEPH_OSD_FLAG_ACK); + WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK)); WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE))); req->r_flags |= CEPH_OSD_FLAG_ONDISK; @@ -3539,7 +3539,7 @@ ceph_osdc_watch(struct ceph_osd_client *osdc, ceph_oid_copy(&lreq->t.base_oid, oid); ceph_oloc_copy(&lreq->t.base_oloc, oloc); - lreq->t.flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + lreq->t.flags = CEPH_OSD_FLAG_WRITE; lreq->mtime = CURRENT_TIME; lreq->reg_req = alloc_linger_request(lreq); @@ -3597,7 +3597,7 @@ int ceph_osdc_unwatch(struct ceph_osd_client *osdc, ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); - req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK; + req->r_flags = CEPH_OSD_FLAG_WRITE; req->r_mtime = CURRENT_TIME; osd_req_op_watch_init(req, 0, lreq->linger_id, CEPH_OSD_WATCH_OP_UNWATCH); @@ -4163,8 +4163,7 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino, int page_align = off & ~PAGE_MASK; req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1, - CEPH_OSD_OP_WRITE, - CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, + CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, snapc, truncate_seq, truncate_size, true); if (IS_ERR(req)) -- cgit v1.2.3-70-g09d2