From fe943d50425b6646606f8ef1ef8b8d4975fdbee2 Mon Sep 17 00:00:00 2001 From: Chengguang Xu Date: Thu, 12 Apr 2018 12:04:55 +0800 Subject: [PATCH 01/31] libceph, rbd: add error handling for osd_req_op_cls_init() Add proper error handling for osd_req_op_cls_init() to replace BUG_ON statement when failing from memory allocation. Signed-off-by: Chengguang Xu Reviewed-by: Ilya Dryomov Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 9 ++++++--- include/linux/ceph/osd_client.h | 2 +- net/ceph/osd_client.c | 12 +++++++++--- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 33b36fea1d73..ac57bc0f6fca 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -2339,6 +2339,7 @@ static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes) static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) { unsigned int num_osd_ops = obj_req->osd_req->r_num_ops; + int ret; dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes); rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT); @@ -2353,6 +2354,11 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) if (!obj_req->osd_req) return -ENOMEM; + ret = osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd", + "copyup"); + if (ret) + return ret; + /* * Only send non-zero copyup data to save some I/O and network * bandwidth -- zero copyup data is equivalent to the object not @@ -2362,9 +2368,6 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) dout("%s obj_req %p detected zeroes\n", __func__, obj_req); bytes = 0; } - - osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd", - "copyup"); osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0, obj_req->copyup_bvecs, obj_req->copyup_bvec_count, diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 96bb32285989..b73dd7ebe585 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -440,7 +440,7 @@ extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *, struct page **pages, u64 length, u32 alignment, bool pages_from_pool, bool own_pages); -extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req, +extern int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *class, const char *method); extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which, diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index d2667e5dddc3..08b5fc1f90cc 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -767,7 +767,7 @@ void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req, } EXPORT_SYMBOL(osd_req_op_extent_dup_last); -void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, +int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, const char *class, const char *method) { struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, @@ -779,7 +779,9 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, BUG_ON(opcode != CEPH_OSD_OP_CALL); pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS); - BUG_ON(!pagelist); + if (!pagelist) + return -ENOMEM; + ceph_pagelist_init(pagelist); op->cls.class_name = class; @@ -799,6 +801,7 @@ void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which, osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist); op->indata_len = payload_len; + return 0; } 
EXPORT_SYMBOL(osd_req_op_cls_init); @@ -4928,7 +4931,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, if (ret) goto out_put_req; - osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); + ret = osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method); + if (ret) + goto out_put_req; + if (req_page) osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len, 0, false, false); From d2935d6f758fa72290ed30bd7482675e04715b6f Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 25 Apr 2018 12:17:13 +0200 Subject: [PATCH 02/31] libceph: get rid of more_kvec in try_write() All gotos to "more" are conditioned on con->state == OPEN, but the only thing "more" does is opening the socket if con->state == PREOPEN. Kill that label and rename "more_kvec" to "more". Signed-off-by: Ilya Dryomov Reviewed-by: Jason Dillaman --- net/ceph/messenger.c | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 3b3d33ea9ed8..53d145637ed5 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -2575,9 +2575,6 @@ static int try_write(struct ceph_connection *con) con->state != CON_STATE_OPEN) return 0; -more: - dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); - /* open the socket first? */ if (con->state == CON_STATE_PREOPEN) { BUG_ON(con->sock); @@ -2598,7 +2595,8 @@ more: } } -more_kvec: +more: + dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); BUG_ON(!con->sock); /* kvec data queued? */ @@ -2623,7 +2621,7 @@ more_kvec: ret = write_partial_message_data(con); if (ret == 1) - goto more_kvec; /* we need to send the footer, too! */ + goto more; /* we need to send the footer, too! */ if (ret == 0) goto out; if (ret < 0) { @@ -2659,8 +2657,6 @@ out: return ret; } - - /* * Read what we can from the socket. */ From e5c9388399e012ed919ad5dac818a11ed1391b18 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 27 Apr 2018 18:58:47 +0200 Subject: [PATCH 03/31] libceph: use MSG_TRUNC for discarding received bytes Avoid a copy into the "skip buffer". Signed-off-by: Ilya Dryomov --- net/ceph/messenger.c | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 53d145637ed5..c6413c360771 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -168,12 +168,6 @@ static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; static struct lock_class_key socket_class; #endif -/* - * When skipping (ignoring) a block of input we read it into a "skip - * buffer," which is this many bytes in size. - */ -#define SKIP_BUF_SIZE 1024 - static void queue_con(struct ceph_connection *con); static void cancel_con(struct ceph_connection *con); static void ceph_con_workfn(struct work_struct *); @@ -520,12 +514,18 @@ static int ceph_tcp_connect(struct ceph_connection *con) return 0; } +/* + * If @buf is NULL, discard up to @len bytes. + */ static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len) { struct kvec iov = {buf, len}; struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; int r; + if (!buf) + msg.msg_flags |= MSG_TRUNC; + iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, len); r = sock_recvmsg(sock, &msg, msg.msg_flags); if (r == -EAGAIN) @@ -2717,16 +2717,11 @@ more: if (con->in_base_pos < 0) { /* * skipping + discarding content. - * - * FIXME: there must be a better way to do this! 
*/ - static char buf[SKIP_BUF_SIZE]; - int skip = min((int) sizeof (buf), -con->in_base_pos); - - dout("skipping %d / %d bytes\n", skip, -con->in_base_pos); - ret = ceph_tcp_recvmsg(con->sock, buf, skip); + ret = ceph_tcp_recvmsg(con->sock, NULL, -con->in_base_pos); if (ret <= 0) goto out; + dout("skipped %d / %d bytes\n", ret, -con->in_base_pos); con->in_base_pos += ret; if (con->in_base_pos) goto more; From 4e9906e7985b962ca3b9f8ab66c0353e6e3ab45e Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 25 Apr 2018 17:14:05 +0800 Subject: [PATCH 04/31] ceph: use bit flags to define vxattr attributes Signed-off-by: "Yan, Zheng" Acked-by: Jeff Layton Signed-off-by: Ilya Dryomov --- fs/ceph/xattr.c | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 315f7e63e7cc..f7dcafb7c5d4 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -50,10 +50,13 @@ struct ceph_vxattr { size_t name_size; /* strlen(name) + 1 (for '\0') */ size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val, size_t size); - bool readonly, hidden; bool (*exists_cb)(struct ceph_inode_info *ci); + unsigned int flags; }; +#define VXATTR_FLAG_READONLY (1<<0) +#define VXATTR_FLAG_HIDDEN (1<<1) + /* layouts */ static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci) @@ -267,27 +270,24 @@ static size_t ceph_vxattrcb_quota_max_files(struct ceph_inode_info *ci, .name = CEPH_XATTR_NAME(_type, _name), \ .name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \ .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \ - .readonly = true, \ - .hidden = false, \ - .exists_cb = NULL, \ + .exists_cb = NULL, \ + .flags = VXATTR_FLAG_READONLY, \ } #define XATTR_LAYOUT_FIELD(_type, _name, _field) \ { \ .name = CEPH_XATTR_NAME2(_type, _name, _field), \ .name_size = sizeof (CEPH_XATTR_NAME2(_type, _name, _field)), \ .getxattr_cb = ceph_vxattrcb_ ## _name ## _ ## _field, \ - .readonly = false, \ - .hidden = true, \ .exists_cb = ceph_vxattrcb_layout_exists, \ + .flags = VXATTR_FLAG_HIDDEN, \ } #define XATTR_QUOTA_FIELD(_type, _name) \ { \ .name = CEPH_XATTR_NAME(_type, _name), \ .name_size = sizeof(CEPH_XATTR_NAME(_type, _name)), \ .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \ - .readonly = false, \ - .hidden = true, \ .exists_cb = ceph_vxattrcb_quota_exists, \ + .flags = VXATTR_FLAG_HIDDEN, \ } static struct ceph_vxattr ceph_dir_vxattrs[] = { @@ -295,9 +295,8 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = { .name = "ceph.dir.layout", .name_size = sizeof("ceph.dir.layout"), .getxattr_cb = ceph_vxattrcb_layout, - .readonly = false, - .hidden = true, .exists_cb = ceph_vxattrcb_layout_exists, + .flags = VXATTR_FLAG_HIDDEN, }, XATTR_LAYOUT_FIELD(dir, layout, stripe_unit), XATTR_LAYOUT_FIELD(dir, layout, stripe_count), @@ -316,9 +315,8 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = { .name = "ceph.quota", .name_size = sizeof("ceph.quota"), .getxattr_cb = ceph_vxattrcb_quota, - .readonly = false, - .hidden = true, .exists_cb = ceph_vxattrcb_quota_exists, + .flags = VXATTR_FLAG_HIDDEN, }, XATTR_QUOTA_FIELD(quota, max_bytes), XATTR_QUOTA_FIELD(quota, max_files), @@ -333,9 +331,8 @@ static struct ceph_vxattr ceph_file_vxattrs[] = { .name = "ceph.file.layout", .name_size = sizeof("ceph.file.layout"), .getxattr_cb = ceph_vxattrcb_layout, - .readonly = false, - .hidden = true, .exists_cb = ceph_vxattrcb_layout_exists, + .flags = VXATTR_FLAG_HIDDEN, }, XATTR_LAYOUT_FIELD(file, layout, stripe_unit), XATTR_LAYOUT_FIELD(file, layout, stripe_count), @@ 
-374,9 +371,10 @@ static size_t __init vxattrs_name_size(struct ceph_vxattr *vxattrs) struct ceph_vxattr *vxattr; size_t size = 0; - for (vxattr = vxattrs; vxattr->name; vxattr++) - if (!vxattr->hidden) + for (vxattr = vxattrs; vxattr->name; vxattr++) { + if (!(vxattr->flags & VXATTR_FLAG_HIDDEN)) size += vxattr->name_size; + } return size; } @@ -919,7 +917,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size) err = namelen; if (vxattrs) { for (i = 0; vxattrs[i].name; i++) { - if (!vxattrs[i].hidden && + if (!(vxattrs[i].flags & VXATTR_FLAG_HIDDEN) && !(vxattrs[i].exists_cb && !vxattrs[i].exists_cb(ci))) { len = sprintf(names, "%s", vxattrs[i].name); @@ -1024,7 +1022,7 @@ int __ceph_setxattr(struct inode *inode, const char *name, vxattr = ceph_match_vxattr(inode, name); if (vxattr) { - if (vxattr->readonly) + if (vxattr->flags & VXATTR_FLAG_READONLY) return -EOPNOTSUPP; if (value && !strncmp(vxattr->name, "ceph.quota", 10)) check_realm = true; From 49a9f4f6714ec0ca2c6ada2ce764fbdd694962ee Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 25 Apr 2018 17:30:23 +0800 Subject: [PATCH 05/31] ceph: always get rstat from auth mds rstat is not tracked by capability. client can't know if rstat from non-auth mds is uptodate or not. Link: http://tracker.ceph.com/issues/23538 Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/caps.c | 2 ++ fs/ceph/inode.c | 21 +++++++++++++++------ fs/ceph/xattr.c | 30 ++++++++++++++++++------------ include/linux/ceph/ceph_fs.h | 1 + 4 files changed, 36 insertions(+), 18 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 23dbfae16156..1b9f611c9dfe 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -69,6 +69,8 @@ static char *gcap_string(char *s, int c) *s++ = 'w'; if (c & CEPH_CAP_GBUFFER) *s++ = 'b'; + if (c & CEPH_CAP_GWREXTEND) + *s++ = 'a'; if (c & CEPH_CAP_GLAZYIO) *s++ = 'l'; return s; diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index ae056927080d..ec9441c2403b 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -854,6 +854,18 @@ static int fill_inode(struct inode *inode, struct page *locked_page, } } + /* layout and rstat are not tracked by capability, update them if + * the inode info is from auth mds */ + if (new_version || (info->cap.flags & CEPH_CAP_FLAG_AUTH)) { + if (S_ISDIR(inode->i_mode)) { + ci->i_dir_layout = iinfo->dir_layout; + ci->i_rbytes = le64_to_cpu(info->rbytes); + ci->i_rfiles = le64_to_cpu(info->rfiles); + ci->i_rsubdirs = le64_to_cpu(info->rsubdirs); + ceph_decode_timespec(&ci->i_rctime, &info->rctime); + } + } + /* xattrs */ /* note that if i_xattrs.len <= 4, i_xattrs.data will still be NULL. 
*/ if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) && @@ -919,14 +931,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page, inode->i_op = &ceph_dir_iops; inode->i_fop = &ceph_dir_fops; - ci->i_dir_layout = iinfo->dir_layout; ci->i_files = le64_to_cpu(info->files); ci->i_subdirs = le64_to_cpu(info->subdirs); - ci->i_rbytes = le64_to_cpu(info->rbytes); - ci->i_rfiles = le64_to_cpu(info->rfiles); - ci->i_rsubdirs = le64_to_cpu(info->rsubdirs); - ceph_decode_timespec(&ci->i_rctime, &info->rctime); break; default: pr_err("fill_inode %llx.%llx BAD mode 0%o\n", @@ -2178,6 +2185,7 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page, struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb); struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_request *req; + int mode; int err; if (ceph_snap(inode) == CEPH_SNAPDIR) { @@ -2190,7 +2198,8 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page, if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1)) return 0; - req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS); + mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS; + req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode); if (IS_ERR(req)) return PTR_ERR(req); req->r_inode = inode; diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index f7dcafb7c5d4..5bc8edb4c2a6 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -56,6 +56,7 @@ struct ceph_vxattr { #define VXATTR_FLAG_READONLY (1<<0) #define VXATTR_FLAG_HIDDEN (1<<1) +#define VXATTR_FLAG_RSTAT (1<<2) /* layouts */ @@ -265,14 +266,16 @@ static size_t ceph_vxattrcb_quota_max_files(struct ceph_inode_info *ci, #define CEPH_XATTR_NAME2(_type, _name, _name2) \ XATTR_CEPH_PREFIX #_type "." #_name "." 
#_name2 -#define XATTR_NAME_CEPH(_type, _name) \ +#define XATTR_NAME_CEPH(_type, _name, _flags) \ { \ .name = CEPH_XATTR_NAME(_type, _name), \ .name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \ .getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \ - .exists_cb = NULL, \ - .flags = VXATTR_FLAG_READONLY, \ + .exists_cb = NULL, \ + .flags = (VXATTR_FLAG_READONLY | _flags), \ } +#define XATTR_RSTAT_FIELD(_type, _name) \ + XATTR_NAME_CEPH(_type, _name, VXATTR_FLAG_RSTAT) #define XATTR_LAYOUT_FIELD(_type, _name, _field) \ { \ .name = CEPH_XATTR_NAME2(_type, _name, _field), \ @@ -303,14 +306,14 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = { XATTR_LAYOUT_FIELD(dir, layout, object_size), XATTR_LAYOUT_FIELD(dir, layout, pool), XATTR_LAYOUT_FIELD(dir, layout, pool_namespace), - XATTR_NAME_CEPH(dir, entries), - XATTR_NAME_CEPH(dir, files), - XATTR_NAME_CEPH(dir, subdirs), - XATTR_NAME_CEPH(dir, rentries), - XATTR_NAME_CEPH(dir, rfiles), - XATTR_NAME_CEPH(dir, rsubdirs), - XATTR_NAME_CEPH(dir, rbytes), - XATTR_NAME_CEPH(dir, rctime), + XATTR_NAME_CEPH(dir, entries, 0), + XATTR_NAME_CEPH(dir, files, 0), + XATTR_NAME_CEPH(dir, subdirs, 0), + XATTR_RSTAT_FIELD(dir, rentries), + XATTR_RSTAT_FIELD(dir, rfiles), + XATTR_RSTAT_FIELD(dir, rsubdirs), + XATTR_RSTAT_FIELD(dir, rbytes), + XATTR_RSTAT_FIELD(dir, rctime), { .name = "ceph.quota", .name_size = sizeof("ceph.quota"), @@ -807,7 +810,10 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value, /* let's see if a virtual xattr was requested */ vxattr = ceph_match_vxattr(inode, name); if (vxattr) { - err = ceph_do_getattr(inode, 0, true); + int mask = 0; + if (vxattr->flags & VXATTR_FLAG_RSTAT) + mask |= CEPH_STAT_RSTAT; + err = ceph_do_getattr(inode, mask, true); if (err) return err; err = -ENODATA; diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h index 7ecfc88314d8..4903deb0777a 100644 --- a/include/linux/ceph/ceph_fs.h +++ b/include/linux/ceph/ceph_fs.h @@ -628,6 +628,7 @@ int ceph_flags_to_mode(int flags); CEPH_CAP_XATTR_SHARED) #define CEPH_STAT_CAP_INLINE_DATA (CEPH_CAP_FILE_SHARED | \ CEPH_CAP_FILE_RD) +#define CEPH_STAT_RSTAT CEPH_CAP_FILE_WREXTEND #define CEPH_CAP_ANY_SHARED (CEPH_CAP_AUTH_SHARED | \ CEPH_CAP_LINK_SHARED | \ From 2af54a72b585a7cc46fb28845a121635c2540563 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 27 Apr 2018 11:14:39 +0800 Subject: [PATCH 06/31] ceph: update i_files/i_subdirs only when Fs cap is issued In MDS, file/subdir counts of a directory inode are protected by filelock. In request reply without Fs cap, nfiles/nsubdirs can be stale. 
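To illustrate the gating this patch introduces, here is a minimal userspace sketch (not the kernel code): cached nfiles/nsubdirs are refreshed only when the reply carries the "file shared" capability bit or a newer inode version. CAP_FILE_SHARED, struct reply_info and struct cached_inode are made-up stand-ins for the real ceph structures.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define CAP_FILE_SHARED (1u << 4)   /* illustrative bit, not the real CEPH_CAP_FILE_SHARED */

struct reply_info {
    uint32_t caps;       /* capability bits granted with this reply */
    uint64_t files;
    uint64_t subdirs;
};

struct cached_inode {
    uint64_t i_files;
    uint64_t i_subdirs;
};

static void fill_dirstat(struct cached_inode *ci, const struct reply_info *info,
                         bool new_version)
{
    /* Mirror the patch: update only on a new version or when Fs is issued. */
    if (new_version || (info->caps & CAP_FILE_SHARED)) {
        ci->i_files = info->files;
        ci->i_subdirs = info->subdirs;
    }
}

int main(void)
{
    struct cached_inode ci = { .i_files = 3, .i_subdirs = 1 };
    struct reply_info stale = { .caps = 0, .files = 0, .subdirs = 0 };
    struct reply_info fresh = { .caps = CAP_FILE_SHARED, .files = 5, .subdirs = 2 };

    fill_dirstat(&ci, &stale, false);   /* ignored: no Fs cap and not a new version */
    fill_dirstat(&ci, &fresh, false);   /* applied: Fs cap present */
    printf("files=%llu subdirs=%llu\n",
           (unsigned long long)ci.i_files, (unsigned long long)ci.i_subdirs);
    return 0;
}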
Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/inode.c | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index ec9441c2403b..4712c943cdf7 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -739,7 +739,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; struct ceph_mds_reply_inode *info = iinfo->in; struct ceph_inode_info *ci = ceph_inode(inode); - int issued = 0, implemented, new_issued; + int issued, new_issued, info_caps; struct timespec mtime, atime, ctime; struct ceph_buffer *xattr_blob = NULL; struct ceph_string *pool_ns = NULL; @@ -754,8 +754,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, inode, ceph_vinop(inode), le64_to_cpu(info->version), ci->i_version); + info_caps = le32_to_cpu(info->cap.caps); + /* prealloc new cap struct */ - if (info->cap.caps && ceph_snap(inode) == CEPH_NOSNAP) + if (info_caps && ceph_snap(inode) == CEPH_NOSNAP) new_cap = ceph_get_cap(mdsc, caps_reservation); /* @@ -792,9 +794,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page, le64_to_cpu(info->version) > (ci->i_version & ~1))) new_version = true; - issued = __ceph_caps_issued(ci, &implemented); - issued |= implemented | __ceph_caps_dirty(ci); - new_issued = ~issued & le32_to_cpu(info->cap.caps); + __ceph_caps_issued(ci, &issued); + issued |= __ceph_caps_dirty(ci); + new_issued = ~issued & info_caps; /* update inode */ inode->i_rdev = le32_to_cpu(info->rdev); @@ -826,6 +828,11 @@ static int fill_inode(struct inode *inode, struct page *locked_page, &ctime, &mtime, &atime); } + if (new_version || (info_caps & CEPH_CAP_FILE_SHARED)) { + ci->i_files = le64_to_cpu(info->files); + ci->i_subdirs = le64_to_cpu(info->subdirs); + } + if (new_version || (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { s64 old_pool = ci->i_layout.pool_id; @@ -930,10 +937,6 @@ static int fill_inode(struct inode *inode, struct page *locked_page, case S_IFDIR: inode->i_op = &ceph_dir_iops; inode->i_fop = &ceph_dir_fops; - - - ci->i_files = le64_to_cpu(info->files); - ci->i_subdirs = le64_to_cpu(info->subdirs); break; default: pr_err("fill_inode %llx.%llx BAD mode 0%o\n", @@ -941,12 +944,11 @@ static int fill_inode(struct inode *inode, struct page *locked_page, } /* were we issued a capability? */ - if (info->cap.caps) { + if (info_caps) { if (ceph_snap(inode) == CEPH_NOSNAP) { - unsigned caps = le32_to_cpu(info->cap.caps); ceph_add_cap(inode, session, le64_to_cpu(info->cap.cap_id), - cap_fmode, caps, + cap_fmode, info_caps, le32_to_cpu(info->cap.wanted), le32_to_cpu(info->cap.seq), le32_to_cpu(info->cap.mseq), @@ -956,7 +958,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, /* set dir completion flag? 
*/ if (S_ISDIR(inode->i_mode) && ci->i_files == 0 && ci->i_subdirs == 0 && - (caps & CEPH_CAP_FILE_SHARED) && + (info_caps & CEPH_CAP_FILE_SHARED) && (issued & CEPH_CAP_FILE_EXCL) == 0 && !__ceph_dir_is_complete(ci)) { dout(" marking %p complete (empty)\n", inode); @@ -969,8 +971,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page, wake = true; } else { dout(" %p got snap_caps %s\n", inode, - ceph_cap_string(le32_to_cpu(info->cap.caps))); - ci->i_snap_caps |= le32_to_cpu(info->cap.caps); + ceph_cap_string(info_caps)); + ci->i_snap_caps |= info_caps; if (cap_fmode >= 0) __ceph_get_fmode(ci, cap_fmode); } @@ -985,8 +987,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; ci->i_inline_version = iinfo->inline_version; if (ci->i_inline_version != CEPH_INLINE_NONE && - (locked_page || - (le32_to_cpu(info->cap.caps) & cache_caps))) + (locked_page || (info_caps & cache_caps))) fill_inline = true; } From a1c6b8358171c16db0f858a7fbb28aa574b07c09 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 27 Apr 2018 10:29:44 +0800 Subject: [PATCH 07/31] ceph: define argument structure for handle_cap_grant The data structure includes the versioned feilds of cap message. Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/caps.c | 115 ++++++++++++++++++++++++++----------------------- 1 file changed, 61 insertions(+), 54 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 1b9f611c9dfe..de7b7a34195e 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -3024,24 +3024,32 @@ static void invalidate_aliases(struct inode *inode) dput(prev); } +struct cap_extra_info { + struct ceph_string *pool_ns; + /* inline data */ + u64 inline_version; + void *inline_data; + u32 inline_len; + /* currently issued */ + int issued; +}; + /* * Handle a cap GRANT message from the MDS. (Note that a GRANT may * actually be a revocation if it specifies a smaller cap set.) * * caller holds s_mutex and i_ceph_lock, we drop both. 
*/ -static void handle_cap_grant(struct ceph_mds_client *mdsc, - struct inode *inode, struct ceph_mds_caps *grant, - struct ceph_string **pns, u64 inline_version, - void *inline_data, u32 inline_len, - struct ceph_buffer *xattr_buf, +static void handle_cap_grant(struct inode *inode, struct ceph_mds_session *session, - struct ceph_cap *cap, int issued) + struct ceph_cap *cap, + struct ceph_mds_caps *grant, + struct ceph_buffer *xattr_buf, + struct cap_extra_info *extra_info) __releases(ci->i_ceph_lock) - __releases(mdsc->snap_rwsem) + __releases(session->s_mdsc->snap_rwsem) { struct ceph_inode_info *ci = ceph_inode(inode); - int mds = session->s_mds; int seq = le32_to_cpu(grant->seq); int newcaps = le32_to_cpu(grant->caps); int used, wanted, dirty; @@ -3057,7 +3065,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, bool fill_inline = false; dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n", - inode, cap, mds, seq, ceph_cap_string(newcaps)); + inode, cap, session->s_mds, seq, ceph_cap_string(newcaps)); dout(" size %llu max_size %llu, i_size %llu\n", size, max_size, inode->i_size); @@ -3103,7 +3111,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, __check_cap_issue(ci, cap, newcaps); if ((newcaps & CEPH_CAP_AUTH_SHARED) && - (issued & CEPH_CAP_AUTH_EXCL) == 0) { + (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) { inode->i_mode = le32_to_cpu(grant->mode); inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid)); inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid)); @@ -3113,14 +3121,15 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, } if ((newcaps & CEPH_CAP_AUTH_SHARED) && - (issued & CEPH_CAP_LINK_EXCL) == 0) { + (extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) { set_nlink(inode, le32_to_cpu(grant->nlink)); if (inode->i_nlink == 0 && (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL))) deleted_inode = true; } - if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) { + if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 && + grant->xattr_len) { int len = le32_to_cpu(grant->xattr_len); u64 version = le64_to_cpu(grant->xattr_version); @@ -3140,7 +3149,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, ceph_decode_timespec(&mtime, &grant->mtime); ceph_decode_timespec(&atime, &grant->atime); ceph_decode_timespec(&ctime, &grant->ctime); - ceph_fill_file_time(inode, issued, + ceph_fill_file_time(inode, extra_info->issued, le32_to_cpu(grant->time_warp_seq), &ctime, &mtime, &atime); } @@ -3153,15 +3162,16 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout); old_ns = rcu_dereference_protected(ci->i_layout.pool_ns, lockdep_is_held(&ci->i_ceph_lock)); - rcu_assign_pointer(ci->i_layout.pool_ns, *pns); + rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns); - if (ci->i_layout.pool_id != old_pool || *pns != old_ns) + if (ci->i_layout.pool_id != old_pool || + extra_info->pool_ns != old_ns) ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; - *pns = old_ns; + extra_info->pool_ns = old_ns; /* size/truncate_seq? 
*/ - queue_trunc = ceph_fill_file_size(inode, issued, + queue_trunc = ceph_fill_file_size(inode, extra_info->issued, le32_to_cpu(grant->truncate_seq), le64_to_cpu(grant->truncate_size), size); @@ -3240,24 +3250,26 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, } BUG_ON(cap->issued & ~cap->implemented); - if (inline_version > 0 && inline_version >= ci->i_inline_version) { - ci->i_inline_version = inline_version; + if (extra_info->inline_version > 0 && + extra_info->inline_version >= ci->i_inline_version) { + ci->i_inline_version = extra_info->inline_version; if (ci->i_inline_version != CEPH_INLINE_NONE && (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO))) fill_inline = true; } if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) { - if (newcaps & ~issued) + if (newcaps & ~extra_info->issued) wake = true; - kick_flushing_inode_caps(mdsc, session, inode); - up_read(&mdsc->snap_rwsem); + kick_flushing_inode_caps(session->s_mdsc, session, inode); + up_read(&session->s_mdsc->snap_rwsem); } else { spin_unlock(&ci->i_ceph_lock); } if (fill_inline) - ceph_fill_inline_data(inode, NULL, inline_data, inline_len); + ceph_fill_inline_data(inode, NULL, extra_info->inline_data, + extra_info->inline_len); if (queue_trunc) ceph_queue_vmtruncate(inode); @@ -3722,31 +3734,24 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_msg *msg) { struct ceph_mds_client *mdsc = session->s_mdsc; - struct super_block *sb = mdsc->fsc->sb; struct inode *inode; struct ceph_inode_info *ci; struct ceph_cap *cap; struct ceph_mds_caps *h; struct ceph_mds_cap_peer *peer = NULL; struct ceph_snap_realm *realm = NULL; - struct ceph_string *pool_ns = NULL; - int mds = session->s_mds; - int op, issued; + int op; u32 seq, mseq; struct ceph_vino vino; - u64 tid; - u64 inline_version = 0; - void *inline_data = NULL; - u32 inline_len = 0; void *snaptrace; size_t snaptrace_len; void *p, *end; + struct cap_extra_info extra_info = {}; - dout("handle_caps from mds%d\n", mds); + dout("handle_caps from mds%d\n", session->s_mds); /* decode */ end = msg->front.iov_base + msg->front.iov_len; - tid = le64_to_cpu(msg->hdr.tid); if (msg->front.iov_len < sizeof(*h)) goto bad; h = msg->front.iov_base; @@ -3781,12 +3786,12 @@ void ceph_handle_caps(struct ceph_mds_session *session, } if (le16_to_cpu(msg->hdr.version) >= 4) { - ceph_decode_64_safe(&p, end, inline_version, bad); - ceph_decode_32_safe(&p, end, inline_len, bad); - if (p + inline_len > end) + ceph_decode_64_safe(&p, end, extra_info.inline_version, bad); + ceph_decode_32_safe(&p, end, extra_info.inline_len, bad); + if (p + extra_info.inline_len > end) goto bad; - inline_data = p; - p += inline_len; + extra_info.inline_data = p; + p += extra_info.inline_len; } if (le16_to_cpu(msg->hdr.version) >= 5) { @@ -3811,13 +3816,14 @@ void ceph_handle_caps(struct ceph_mds_session *session, ceph_decode_32_safe(&p, end, pool_ns_len, bad); if (pool_ns_len > 0) { ceph_decode_need(&p, end, pool_ns_len, bad); - pool_ns = ceph_find_or_create_string(p, pool_ns_len); + extra_info.pool_ns = + ceph_find_or_create_string(p, pool_ns_len); p += pool_ns_len; } } /* lookup ino */ - inode = ceph_find_inode(sb, vino); + inode = ceph_find_inode(mdsc->fsc->sb, vino); ci = ceph_inode(inode); dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino, vino.snap, inode); @@ -3850,7 +3856,8 @@ void ceph_handle_caps(struct ceph_mds_session *session, /* these will work even if we don't have a cap yet */ switch (op) { case CEPH_CAP_OP_FLUSHSNAP_ACK: - handle_cap_flushsnap_ack(inode, 
tid, h, session); + handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid), + h, session); goto done; case CEPH_CAP_OP_EXPORT: @@ -3869,10 +3876,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, down_read(&mdsc->snap_rwsem); } handle_cap_import(mdsc, inode, h, peer, session, - &cap, &issued); - handle_cap_grant(mdsc, inode, h, &pool_ns, - inline_version, inline_data, inline_len, - msg->middle, session, cap, issued); + &cap, &extra_info.issued); + handle_cap_grant(inode, session, cap, + h, msg->middle, &extra_info); if (realm) ceph_put_snap_realm(mdsc, realm); goto done_unlocked; @@ -3880,10 +3886,11 @@ void ceph_handle_caps(struct ceph_mds_session *session, /* the rest require a cap */ spin_lock(&ci->i_ceph_lock); - cap = __get_cap_for_mds(ceph_inode(inode), mds); + cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds); if (!cap) { dout(" no cap on %p ino %llx.%llx from mds%d\n", - inode, ceph_ino(inode), ceph_snap(inode), mds); + inode, ceph_ino(inode), ceph_snap(inode), + session->s_mds); spin_unlock(&ci->i_ceph_lock); goto flush_cap_releases; } @@ -3892,15 +3899,15 @@ void ceph_handle_caps(struct ceph_mds_session *session, switch (op) { case CEPH_CAP_OP_REVOKE: case CEPH_CAP_OP_GRANT: - __ceph_caps_issued(ci, &issued); - issued |= __ceph_caps_dirty(ci); - handle_cap_grant(mdsc, inode, h, &pool_ns, - inline_version, inline_data, inline_len, - msg->middle, session, cap, issued); + __ceph_caps_issued(ci, &extra_info.issued); + extra_info.issued |= __ceph_caps_dirty(ci); + handle_cap_grant(inode, session, cap, + h, msg->middle, &extra_info); goto done_unlocked; case CEPH_CAP_OP_FLUSH_ACK: - handle_cap_flush_ack(inode, tid, h, session, cap); + handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid), + h, session, cap); break; case CEPH_CAP_OP_TRUNC: @@ -3927,7 +3934,7 @@ done: mutex_unlock(&session->s_mutex); done_unlocked: iput(inode); - ceph_put_string(pool_ns); + ceph_put_string(extra_info.pool_ns); return; bad: From 4985d6f9e50fa48e35a9dbe1726434f987305cae Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 27 Apr 2018 11:11:31 +0800 Subject: [PATCH 08/31] ceph: handle the new nfiles/nsubdirs fields in cap message Without these new fields, stale st_size is returned in following case. 1. MDS modifies a directory 2. MDS issues CEPH_CAP_ANY_SHARED to client 3. The client satifies stat(2) by its cached metadata. set st_size to "i_files + i_subdirs". 
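The patch below decodes the new fields only when the cap message version is high enough. As a rough, self-contained sketch of that version-gated parsing (the real code uses the ceph_decode_*_safe helpers and a different field layout; the offsets, version number, and "change_attr" placeholder here are invented for illustration):

#include <stdint.h>
#include <stdio.h>
#include <string.h>

struct dirstat {
    int      valid;
    uint64_t nfiles;
    uint64_t nsubdirs;
};

/* Decode a little-endian u64 with a bounds check; returns -1 on truncation. */
static int decode_u64(const uint8_t **p, const uint8_t *end, uint64_t *v)
{
    if (end - *p < 8)
        return -1;
    memcpy(v, *p, 8);   /* assumes a little-endian host, fine for the sketch */
    *p += 8;
    return 0;
}

static int decode_caps_msg(const uint8_t *p, const uint8_t *end,
                           unsigned int msg_version, struct dirstat *ds)
{
    uint64_t ignored;

    /* ... fixed header and older fields would be decoded here ... */

    if (msg_version >= 11) {
        /* skip a field introduced by an earlier version first */
        if (decode_u64(&p, end, &ignored))          /* e.g. change_attr */
            return -1;
        ds->valid = 1;
        if (decode_u64(&p, end, &ds->nfiles) ||
            decode_u64(&p, end, &ds->nsubdirs))
            return -1;
    }
    return 0;
}

int main(void)
{
    uint8_t buf[24] = {0};
    struct dirstat ds = {0};

    buf[8] = 5;     /* nfiles = 5 */
    buf[16] = 2;    /* nsubdirs = 2 */
    if (decode_caps_msg(buf, buf + sizeof(buf), 11, &ds) == 0 && ds.valid)
        printf("nfiles=%llu nsubdirs=%llu\n",
               (unsigned long long)ds.nfiles,
               (unsigned long long)ds.nsubdirs);
    return 0;
}

Older senders simply never reach the version >= 11 branch, so the receiver keeps working against both old and new peers.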
Link: http://tracker.ceph.com/issues/23855 Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/caps.c | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index de7b7a34195e..477b822e6333 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -3030,6 +3030,10 @@ struct cap_extra_info { u64 inline_version; void *inline_data; u32 inline_len; + /* dirstat */ + bool dirstat_valid; + u64 nfiles; + u64 nsubdirs; /* currently issued */ int issued; }; @@ -3154,6 +3158,11 @@ static void handle_cap_grant(struct inode *inode, &ctime, &mtime, &atime); } + if ((newcaps & CEPH_CAP_FILE_SHARED) && extra_info->dirstat_valid) { + ci->i_files = extra_info->nfiles; + ci->i_subdirs = extra_info->nsubdirs; + } + if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) { /* file layout may have changed */ s64 old_pool = ci->i_layout.pool_id; @@ -3741,6 +3750,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, struct ceph_mds_cap_peer *peer = NULL; struct ceph_snap_realm *realm = NULL; int op; + int msg_version = le16_to_cpu(msg->hdr.version); u32 seq, mseq; struct ceph_vino vino; void *snaptrace; @@ -3765,7 +3775,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, snaptrace_len = le32_to_cpu(h->snap_trace_len); p = snaptrace + snaptrace_len; - if (le16_to_cpu(msg->hdr.version) >= 2) { + if (msg_version >= 2) { u32 flock_len; ceph_decode_32_safe(&p, end, flock_len, bad); if (p + flock_len > end) @@ -3773,7 +3783,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, p += flock_len; } - if (le16_to_cpu(msg->hdr.version) >= 3) { + if (msg_version >= 3) { if (op == CEPH_CAP_OP_IMPORT) { if (p + sizeof(*peer) > end) goto bad; @@ -3785,7 +3795,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, } } - if (le16_to_cpu(msg->hdr.version) >= 4) { + if (msg_version >= 4) { ceph_decode_64_safe(&p, end, extra_info.inline_version, bad); ceph_decode_32_safe(&p, end, extra_info.inline_len, bad); if (p + extra_info.inline_len > end) @@ -3794,7 +3804,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, p += extra_info.inline_len; } - if (le16_to_cpu(msg->hdr.version) >= 5) { + if (msg_version >= 5) { struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc; u32 epoch_barrier; @@ -3802,7 +3812,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ceph_osdc_update_epoch_barrier(osdc, epoch_barrier); } - if (le16_to_cpu(msg->hdr.version) >= 8) { + if (msg_version >= 8) { u64 flush_tid; u32 caller_uid, caller_gid; u32 pool_ns_len; @@ -3822,6 +3832,25 @@ void ceph_handle_caps(struct ceph_mds_session *session, } } + if (msg_version >= 11) { + struct ceph_timespec *btime; + u64 change_attr; + u32 flags; + + /* version >= 9 */ + if (p + sizeof(*btime) > end) + goto bad; + btime = p; + p += sizeof(*btime); + ceph_decode_64_safe(&p, end, change_attr, bad); + /* version >= 10 */ + ceph_decode_32_safe(&p, end, flags, bad); + /* version >= 11 */ + extra_info.dirstat_valid = true; + ceph_decode_64_safe(&p, end, extra_info.nfiles, bad); + ceph_decode_64_safe(&p, end, extra_info.nsubdirs, bad); + } + /* lookup ino */ inode = ceph_find_inode(mdsc->fsc->sb, vino); ci = ceph_inode(inode); From 6dd4940ba5f96270ad428351cd88daf9ab871a97 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 3 May 2018 16:26:55 +0200 Subject: [PATCH 09/31] ceph: show wsize only if non-default This is how it was before commit 95cca2b44e54 ("ceph: limit osd write size") went in. 
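The change below is a one-line "show the option only if it is non-default" fix. A tiny sketch of the pattern, with placeholder defaults rather than the real CEPH_MAX_WRITE_SIZE/CEPH_MAX_READ_SIZE values:

#include <stdio.h>

#define DEFAULT_WSIZE (64u << 20)   /* placeholder default */
#define DEFAULT_RSIZE (64u << 20)

struct mount_opts {
    unsigned int wsize;
    unsigned int rsize;
};

/* Emit an option only when it differs from the built-in default. */
static void show_options(const struct mount_opts *opt)
{
    if (opt->wsize != DEFAULT_WSIZE)
        printf(",wsize=%u", opt->wsize);
    if (opt->rsize != DEFAULT_RSIZE)
        printf(",rsize=%u", opt->rsize);
    printf("\n");
}

int main(void)
{
    struct mount_opts opt = { .wsize = DEFAULT_WSIZE, .rsize = 1u << 20 };

    show_options(&opt);     /* prints only ",rsize=1048576" */
    return 0;
}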
Signed-off-by: Ilya Dryomov --- fs/ceph/super.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/ceph/super.c b/fs/ceph/super.c index b33082e6878f..3c1155803444 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -551,7 +551,7 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) if (fsopt->mds_namespace) seq_show_option(m, "mds_namespace", fsopt->mds_namespace); - if (fsopt->wsize) + if (fsopt->wsize != CEPH_MAX_WRITE_SIZE) seq_printf(m, ",wsize=%d", fsopt->wsize); if (fsopt->rsize != CEPH_MAX_READ_SIZE) seq_printf(m, ",rsize=%d", fsopt->rsize); From 597817ddbbf27af5986d1f3df20390b2738411c6 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 15 May 2018 11:30:43 +0800 Subject: [PATCH 10/31] ceph: support file lock on directory Link: http://tracker.ceph.com/issues/24028 Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/dir.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 1a78dd6f8bf2..036ac0f3a393 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -1486,6 +1486,8 @@ const struct file_operations ceph_dir_fops = { .release = ceph_release, .unlocked_ioctl = ceph_ioctl, .fsync = ceph_fsync, + .lock = ceph_lock, + .flock = ceph_flock, }; const struct file_operations ceph_snapdir_fops = { From 8c6286f1c69743ebdb2ee15f9165f9c4d44eef49 Mon Sep 17 00:00:00 2001 From: Luis Henriques Date: Mon, 21 May 2018 10:27:29 +0100 Subject: [PATCH 11/31] ceph: fix st_nlink stat for directories Currently, calling stat on a cephfs directory returns 1 for st_nlink. This behaviour has recently changed in the fuse client, as some applications seem to expect this value to be either 0 (if it's unlinked) or 2 + number of subdirectories. This behaviour was changed in the fuse client with commit 67c7e4619188 ("client: use common interp of st_nlink for dirs"). This patch modifies the kernel client to have a similar behaviour. Link: https://tracker.ceph.com/issues/23873 Signed-off-by: Luis Henriques Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/inode.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 4712c943cdf7..4aeccb13437b 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -2271,6 +2271,14 @@ int ceph_getattr(const struct path *path, struct kstat *stat, stat->size = ci->i_files + ci->i_subdirs; stat->blocks = 0; stat->blksize = 65536; + /* + * Some applications rely on the number of st_nlink + * value on directories to be either 0 (if unlinked) + * or 2 + number of subdirectories. + */ + if (stat->nlink == 1) + /* '.' + '..' + subdirs */ + stat->nlink = 1 + 1 + ci->i_subdirs; } } return err; From 66850df58529eefc61cb96b895991508547503bf Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 15 May 2018 15:47:58 +0200 Subject: [PATCH 12/31] libceph: introduce ceph_osdc_abort_requests() This will be used by the filesystem for "umount -f". 
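The following patch adds an "abort everything in flight and fail everything that comes later" primitive. Here is a userspace sketch of that pattern, with a plain linked list standing in for the kernel's per-OSD red-black trees and the locking omitted:

#include <errno.h>
#include <stdio.h>

struct request {
    struct request *next;
    int result;
    int done;
};

struct client {
    struct request *inflight;
    int abort_err;              /* 0 = normal operation */
};

static void complete_request(struct request *req, int err)
{
    req->result = err;
    req->done = 1;
}

static void abort_requests(struct client *c, int err)
{
    struct request *req;

    for (req = c->inflight; req; req = req->next)
        complete_request(req, err);
    c->abort_err = err;         /* future submissions fail with this error */
}

static int submit_request(struct client *c, struct request *req)
{
    if (c->abort_err) {
        complete_request(req, c->abort_err);
        return c->abort_err;
    }
    req->next = c->inflight;
    c->inflight = req;
    return 0;
}

int main(void)
{
    struct client c = { 0 };
    struct request r1 = { 0 }, r2 = { 0 };

    submit_request(&c, &r1);
    abort_requests(&c, -EIO);   /* e.g. on "umount -f" */
    submit_request(&c, &r2);    /* fails immediately with -EIO */
    printf("r1=%d r2=%d\n", r1.result, r2.result);
    return 0;
}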
Signed-off-by: Ilya Dryomov --- include/linux/ceph/osd_client.h | 2 + net/ceph/osd_client.c | 67 ++++++++++++++++++++++++++++++--- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index b73dd7ebe585..874c31c01f80 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -347,6 +347,7 @@ struct ceph_osd_client { struct rb_root linger_map_checks; atomic_t num_requests; atomic_t num_homeless; + int abort_err; struct delayed_work timeout_work; struct delayed_work osds_timeout_work; #ifdef CONFIG_DEBUG_FS @@ -378,6 +379,7 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg); void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb); +void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err); extern void osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, u32 flags); diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 08b5fc1f90cc..a7e090d2c957 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1058,6 +1058,38 @@ EXPORT_SYMBOL(ceph_osdc_new_request); DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node) DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node) +/* + * Call @fn on each OSD request as long as @fn returns 0. + */ +static void for_each_request(struct ceph_osd_client *osdc, + int (*fn)(struct ceph_osd_request *req, void *arg), + void *arg) +{ + struct rb_node *n, *p; + + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); + + for (p = rb_first(&osd->o_requests); p; ) { + struct ceph_osd_request *req = + rb_entry(p, struct ceph_osd_request, r_node); + + p = rb_next(p); + if (fn(req, arg)) + return; + } + } + + for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) { + struct ceph_osd_request *req = + rb_entry(p, struct ceph_osd_request, r_node); + + p = rb_next(p); + if (fn(req, arg)) + return; + } +} + static bool osd_homeless(struct ceph_osd *osd) { return osd->o_osd == CEPH_HOMELESS_OSD; @@ -2165,9 +2197,9 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked) struct ceph_osd_client *osdc = req->r_osdc; struct ceph_osd *osd; enum calc_target_result ct_res; + int err = 0; bool need_send = false; bool promoted = false; - bool need_abort = false; WARN_ON(req->r_tid); dout("%s req %p wrlocked %d\n", __func__, req, wrlocked); @@ -2183,7 +2215,10 @@ again: goto promote; } - if (osdc->osdmap->epoch < osdc->epoch_barrier) { + if (osdc->abort_err) { + dout("req %p abort_err %d\n", req, osdc->abort_err); + err = osdc->abort_err; + } else if (osdc->osdmap->epoch < osdc->epoch_barrier) { dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch, osdc->epoch_barrier); req->r_t.paused = true; @@ -2208,7 +2243,7 @@ again: req->r_t.paused = true; maybe_request_map(osdc); if (req->r_abort_on_full) - need_abort = true; + err = -ENOSPC; } else if (!osd_homeless(osd)) { need_send = true; } else { @@ -2225,8 +2260,8 @@ again: link_request(osd, req); if (need_send) send_request(req); - else if (need_abort) - complete_request(req, -ENOSPC); + else if (err) + complete_request(req, err); mutex_unlock(&osd->lock); if (ct_res == CALC_TARGET_POOL_DNE) @@ -2340,6 +2375,28 @@ static void abort_request(struct ceph_osd_request *req, int err) complete_request(req, err); } +static int abort_fn(struct 
ceph_osd_request *req, void *arg) +{ + int err = *(int *)arg; + + abort_request(req, err); + return 0; /* continue iteration */ +} + +/* + * Abort all in-flight requests with @err and arrange for all future + * requests to be failed immediately. + */ +void ceph_osdc_abort_requests(struct ceph_osd_client *osdc, int err) +{ + dout("%s osdc %p err %d\n", __func__, osdc, err); + down_write(&osdc->lock); + for_each_request(osdc, abort_fn, &err); + osdc->abort_err = err; + up_write(&osdc->lock); +} +EXPORT_SYMBOL(ceph_osdc_abort_requests); + static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) { if (likely(eb > osdc->epoch_barrier)) { From 12b69d5f6fe4064147ddb7e7ea2d4fa4aea3eab5 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 11 May 2018 17:12:02 +0800 Subject: [PATCH 13/31] ceph: abort osd requests on force umount This avoid force umount waiting on page writeback: io_schedule+0xd/0x30 wait_on_page_bit_common+0xc6/0x130 __filemap_fdatawait_range+0xbd/0x100 filemap_fdatawait_keep_errors+0x15/0x40 sync_inodes_sb+0x1cf/0x240 sync_filesystem+0x52/0x90 generic_shutdown_super+0x1d/0x110 ceph_kill_sb+0x28/0x80 [ceph] deactivate_locked_super+0x35/0x60 cleanup_mnt+0x36/0x70 task_work_run+0x79/0xa0 exit_to_usermode_loop+0x62/0x70 do_syscall_64+0xdb/0xf0 entry_SYSCALL_64_after_hwframe+0x44/0xa9 0xffffffffffffffff Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/super.c | 1 + 1 file changed, 1 insertion(+) diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 3c1155803444..40664e13cc0f 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -793,6 +793,7 @@ static void ceph_umount_begin(struct super_block *sb) if (!fsc) return; fsc->mount_state = CEPH_MOUNT_SHUTDOWN; + ceph_osdc_abort_requests(&fsc->client->osdc, -EIO); ceph_mdsc_force_umount(fsc->mdsc); return; } From a57d9064e4ee4e9882b922d0627be3d426004c69 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 18 May 2018 16:05:51 +0800 Subject: [PATCH 14/31] ceph: flush pending works before shutdown super Pending works hold inode references, which cause "Busy inodes after unmount" warning. Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/super.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 40664e13cc0f..a092cdb69288 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -674,6 +674,13 @@ fail: return ERR_PTR(err); } +static void flush_fs_workqueues(struct ceph_fs_client *fsc) +{ + flush_workqueue(fsc->wb_wq); + flush_workqueue(fsc->pg_inv_wq); + flush_workqueue(fsc->trunc_wq); +} + static void destroy_fs_client(struct ceph_fs_client *fsc) { dout("destroy_fs_client %p\n", fsc); @@ -1089,6 +1096,8 @@ static void ceph_kill_sb(struct super_block *s) dout("kill_sb %p\n", s); ceph_mdsc_pre_umount(fsc->mdsc); + flush_fs_workqueues(fsc); + generic_shutdown_super(s); fsc->client->extra_mon_dispatch = NULL; From 0d09c57d0846537332d3649eef7e01960ffdc574 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 18 May 2018 19:34:45 +0200 Subject: [PATCH 15/31] libceph: no need to call flush_workqueue() before destruction destroy_workqueue() drains the workqueue before proceeding with destruction. 
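To make the reasoning concrete, here is a small userspace analogue (not kernel code, and much simpler than the real workqueue API): when the destroy routine itself drains all queued work, an explicit flush right before it adds nothing.

#include <stdio.h>
#include <stdlib.h>

struct job {
    struct job *next;
    void (*fn)(void);
};

struct workqueue {
    struct job *head;
};

static void wq_queue(struct workqueue *wq, void (*fn)(void))
{
    struct job *j = malloc(sizeof(*j));

    j->fn = fn;
    j->next = wq->head;
    wq->head = j;
}

static void wq_drain(struct workqueue *wq)
{
    while (wq->head) {
        struct job *j = wq->head;

        wq->head = j->next;
        j->fn();
        free(j);
    }
}

/* Like destroy_workqueue(): drain, then free -- no separate flush needed. */
static void wq_destroy(struct workqueue *wq)
{
    wq_drain(wq);
    free(wq);
}

static void say_hi(void) { printf("job ran\n"); }

int main(void)
{
    struct workqueue *wq = calloc(1, sizeof(*wq));

    wq_queue(wq, say_hi);
    wq_destroy(wq);     /* runs the pending job, then frees the queue */
    return 0;
}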
Signed-off-by: Ilya Dryomov --- net/ceph/osd_client.c | 1 - 1 file changed, 1 deletion(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a7e090d2c957..bcedeea80cd5 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -5081,7 +5081,6 @@ out: void ceph_osdc_stop(struct ceph_osd_client *osdc) { - flush_workqueue(osdc->notify_wq); destroy_workqueue(osdc->notify_wq); cancel_delayed_work_sync(&osdc->timeout_work); cancel_delayed_work_sync(&osdc->osds_timeout_work); From 26df726bcdfacf69335de716e7b78c517bc1df65 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 21 May 2018 15:33:48 +0200 Subject: [PATCH 16/31] libceph: move more code into __complete_request() Move req->r_completion wake up and req->r_kref decrement into __complete_request(). Signed-off-by: Ilya Dryomov Acked-by: Jeff Layton Reviewed-by: "Yan, Zheng" --- net/ceph/osd_client.c | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index bcedeea80cd5..a78f578a2da7 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2320,11 +2320,13 @@ static void finish_request(struct ceph_osd_request *req) static void __complete_request(struct ceph_osd_request *req) { - if (req->r_callback) { - dout("%s req %p tid %llu cb %pf result %d\n", __func__, req, - req->r_tid, req->r_callback, req->r_result); + dout("%s req %p tid %llu cb %pf result %d\n", __func__, req, + req->r_tid, req->r_callback, req->r_result); + + if (req->r_callback) req->r_callback(req); - } + complete_all(&req->r_completion); + ceph_osdc_put_request(req); } /* @@ -2337,8 +2339,6 @@ static void complete_request(struct ceph_osd_request *req, int err) req->r_result = err; finish_request(req); __complete_request(req); - complete_all(&req->r_completion); - ceph_osdc_put_request(req); } static void cancel_map_check(struct ceph_osd_request *req) @@ -3602,8 +3602,6 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg) up_read(&osdc->lock); __complete_request(req); - complete_all(&req->r_completion); - ceph_osdc_put_request(req); return; fail_request: From 88bc1922c273c95e84a8955e657401f9bc63a80b Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Mon, 21 May 2018 16:00:29 +0200 Subject: [PATCH 17/31] libceph: defer __complete_request() to a workqueue In the common case, req->r_callback is called by handle_reply() on the ceph-msgr worker thread without any locks. If handle_reply() fails, it is called with both osd->lock and osdc->lock. In the map check case, it is called with just osdc->lock but held for write. Finally, if the request is aborted because of -ENOSPC or by ceph_osdc_abort_requests(), it is called directly on the submitter's thread, again with both locks. req->r_callback on the submitter's thread is relatively new (introduced in 4.12) and ripe for deadlocks -- e.g. 
writeback worker thread waiting on itself: inode_wait_for_writeback+0x26/0x40 evict+0xb5/0x1a0 iput+0x1d2/0x220 ceph_put_wrbuffer_cap_refs+0xe0/0x2c0 [ceph] writepages_finish+0x2d3/0x410 [ceph] __complete_request+0x26/0x60 [libceph] complete_request+0x2e/0x70 [libceph] __submit_request+0x256/0x330 [libceph] submit_request+0x2b/0x30 [libceph] ceph_osdc_start_request+0x25/0x40 [libceph] ceph_writepages_start+0xdfe/0x1320 [ceph] do_writepages+0x1f/0x70 __writeback_single_inode+0x45/0x330 writeback_sb_inodes+0x26a/0x600 __writeback_inodes_wb+0x92/0xc0 wb_writeback+0x274/0x330 wb_workfn+0x2d5/0x3b0 Defer __complete_request() to a workqueue in all failure cases so it's never on the same thread as ceph_osdc_start_request() and always called with no locks held. Link: http://tracker.ceph.com/issues/23978 Signed-off-by: Ilya Dryomov Acked-by: Jeff Layton Reviewed-by: "Yan, Zheng" --- include/linux/ceph/osd_client.h | 2 ++ net/ceph/osd_client.c | 19 ++++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 874c31c01f80..d4191bde95a4 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -170,6 +170,7 @@ struct ceph_osd_request { u64 r_tid; /* unique for this client */ struct rb_node r_node; struct rb_node r_mc_node; /* map check */ + struct work_struct r_complete_work; struct ceph_osd *r_osd; struct ceph_osd_request_target r_t; @@ -360,6 +361,7 @@ struct ceph_osd_client { struct ceph_msgpool msgpool_op_reply; struct workqueue_struct *notify_wq; + struct workqueue_struct *completion_wq; }; static inline bool ceph_osdmap_flag(struct ceph_osd_client *osdc, int flag) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a78f578a2da7..a4c12c37aa90 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2329,6 +2329,14 @@ static void __complete_request(struct ceph_osd_request *req) ceph_osdc_put_request(req); } +static void complete_request_workfn(struct work_struct *work) +{ + struct ceph_osd_request *req = + container_of(work, struct ceph_osd_request, r_complete_work); + + __complete_request(req); +} + /* * This is open-coded in handle_reply(). 
*/ @@ -2338,7 +2346,9 @@ static void complete_request(struct ceph_osd_request *req, int err) req->r_result = err; finish_request(req); - __complete_request(req); + + INIT_WORK(&req->r_complete_work, complete_request_workfn); + queue_work(req->r_osdc->completion_wq, &req->r_complete_work); } static void cancel_map_check(struct ceph_osd_request *req) @@ -5058,6 +5068,10 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) if (!osdc->notify_wq) goto out_msgpool_reply; + osdc->completion_wq = create_singlethread_workqueue("ceph-completion"); + if (!osdc->completion_wq) + goto out_notify_wq; + schedule_delayed_work(&osdc->timeout_work, osdc->client->options->osd_keepalive_timeout); schedule_delayed_work(&osdc->osds_timeout_work, @@ -5065,6 +5079,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) return 0; +out_notify_wq: + destroy_workqueue(osdc->notify_wq); out_msgpool_reply: ceph_msgpool_destroy(&osdc->msgpool_op_reply); out_msgpool: @@ -5079,6 +5095,7 @@ out: void ceph_osdc_stop(struct ceph_osd_client *osdc) { + destroy_workqueue(osdc->completion_wq); destroy_workqueue(osdc->notify_wq); cancel_delayed_work_sync(&osdc->timeout_work); cancel_delayed_work_sync(&osdc->osds_timeout_work); From 4eea0fefd7e60552b36a74f49bd7064d1a5aef2d Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 16 May 2018 18:21:34 +0200 Subject: [PATCH 18/31] libceph: use for_each_request() in ceph_osdc_abort_on_full() Scanning the trees just to see if there is anything to abort is unnecessary -- all that is needed here is to update the epoch barrier first, before we start aborting. Simplify and do the update inside the loop before calling abort_request() for the first time. The switch to for_each_request() also fixes a bug: homeless requests weren't even considered for aborting. Signed-off-by: Ilya Dryomov Acked-by: Jeff Layton Reviewed-by: "Yan, Zheng" --- net/ceph/osd_client.c | 79 ++++++++++++++----------------------------- 1 file changed, 26 insertions(+), 53 deletions(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index a4c12c37aa90..be274ab43d01 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2433,6 +2433,30 @@ void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) } EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); +/* + * We can end up releasing caps as a result of abort_request(). + * In that case, we probably want to ensure that the cap release message + * has an updated epoch barrier in it, so set the epoch barrier prior to + * aborting the first request. + */ +static int abort_on_full_fn(struct ceph_osd_request *req, void *arg) +{ + struct ceph_osd_client *osdc = req->r_osdc; + bool *victims = arg; + + if (req->r_abort_on_full && + (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || + pool_full(osdc, req->r_t.target_oloc.pool))) { + if (!*victims) { + update_epoch_barrier(osdc, osdc->osdmap->epoch); + *victims = true; + } + abort_request(req, -ENOSPC); + } + + return 0; /* continue iteration */ +} + /* * Drop all pending requests that are stalled waiting on a full condition to * clear, and complete them with ENOSPC as the return code. 
Set the @@ -2441,61 +2465,10 @@ EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); */ static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) { - struct rb_node *n; bool victims = false; - dout("enter abort_on_full\n"); - - if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc)) - goto out; - - /* Scan list and see if there is anything to abort */ - for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { - struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); - struct rb_node *m; - - m = rb_first(&osd->o_requests); - while (m) { - struct ceph_osd_request *req = rb_entry(m, - struct ceph_osd_request, r_node); - m = rb_next(m); - - if (req->r_abort_on_full) { - victims = true; - break; - } - } - if (victims) - break; - } - - if (!victims) - goto out; - - /* - * Update the barrier to current epoch if it's behind that point, - * since we know we have some calls to be aborted in the tree. - */ - update_epoch_barrier(osdc, osdc->osdmap->epoch); - - for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { - struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); - struct rb_node *m; - - m = rb_first(&osd->o_requests); - while (m) { - struct ceph_osd_request *req = rb_entry(m, - struct ceph_osd_request, r_node); - m = rb_next(m); - - if (req->r_abort_on_full && - (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || - pool_full(osdc, req->r_t.target_oloc.pool))) - abort_request(req, -ENOSPC); - } - } -out: - dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier); + if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc)) + for_each_request(osdc, abort_on_full_fn, &victims); } static void check_pool_dne(struct ceph_osd_request *req) From 29e878201ee635940ba018bce51f4ee0f0e47a5b Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Thu, 17 May 2018 16:13:07 +0200 Subject: [PATCH 19/31] libceph: don't warn if req->r_abort_on_full is set The "FULL or reached pool quota" warning is there to explain paused requests. No need to emit it if pausing isn't going to occur. Signed-off-by: Ilya Dryomov Acked-by: Jeff Layton Reviewed-by: "Yan, Zheng" --- net/ceph/osd_client.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index be274ab43d01..34b5334548c3 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2239,11 +2239,13 @@ again: (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || pool_full(osdc, req->r_t.base_oloc.pool))) { dout("req %p full/pool_full\n", req); - pr_warn_ratelimited("FULL or reached pool quota\n"); - req->r_t.paused = true; - maybe_request_map(osdc); - if (req->r_abort_on_full) + if (req->r_abort_on_full) { err = -ENOSPC; + } else { + pr_warn_ratelimited("FULL or reached pool quota\n"); + req->r_t.paused = true; + maybe_request_map(osdc); + } } else if (!osd_homeless(osd)) { need_send = true; } else { From 6001567c14eb8e93f8bceb35fc02158a3e1f20f8 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 22 May 2018 16:26:51 +0200 Subject: [PATCH 20/31] libceph: avoid a use-after-free during map check Sending map check after complete_request() was called is not only useless, but can lead to a use-after-free as req->r_kref decrement in __complete_request() races with map check code. 
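The fix below is a classic "don't touch the object after completion may have dropped the last reference" guard. A minimal userspace sketch of the idea (the refcounting is a simplified stand-in for req->r_kref, and the names are illustrative):

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

struct request {
    int refs;
    int result;
};

static struct request *req_get(struct request *req)
{
    req->refs++;
    return req;
}

static void req_put(struct request *req)
{
    if (--req->refs == 0) {
        printf("freeing request\n");
        free(req);
    }
}

static void complete_request(struct request *req, int err)
{
    req->result = err;
    req_put(req);               /* may drop the last reference */
}

static void send_map_check(struct request *req)
{
    /* would take its own reference and queue follow-up work */
    req_get(req);
    /* ... */
    req_put(req);
}

static void submit(struct request *req, int err, int pool_dne)
{
    if (err)
        complete_request(req, err);

    /*
     * Mirror the fix: start the map check only if the request was NOT
     * completed above, otherwise req may already have been freed.
     */
    if (!err && pool_dne)
        send_map_check(req);
}

int main(void)
{
    struct request *req = calloc(1, sizeof(*req));

    req->refs = 1;
    submit(req, -ENOSPC, 1);    /* completed; map check correctly skipped */
    return 0;
}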
Signed-off-by: Ilya Dryomov Acked-by: Jeff Layton Reviewed-by: "Yan, Zheng" --- net/ceph/osd_client.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 34b5334548c3..294320400c72 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2266,7 +2266,7 @@ again: complete_request(req, err); mutex_unlock(&osd->lock); - if (ct_res == CALC_TARGET_POOL_DNE) + if (!err && ct_res == CALC_TARGET_POOL_DNE) send_map_check(req); if (promoted) From 690f951d7eb8c0529f8a367a3db9cfbfde624db4 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 30 May 2018 14:58:25 +0200 Subject: [PATCH 21/31] libceph: don't abort reads in ceph_osdc_abort_on_full() Don't consider reads for aborting and use ->base_oloc instead of ->target_oloc, as done in __submit_request(). Strictly speaking, we shouldn't be aborting FULL_TRY/FULL_FORCE writes either. But, there is an inconsistency in FULL_TRY/FULL_FORCE handling on the OSD side [1], so given that neither of these is used in the kernel client, leave it for when the OSD behaviour is sorted out. [1] http://tracker.ceph.com/issues/24339 Signed-off-by: Ilya Dryomov Acked-by: Jeff Layton Reviewed-by: "Yan, Zheng" --- net/ceph/osd_client.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 294320400c72..3d055529189c 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -2447,8 +2447,9 @@ static int abort_on_full_fn(struct ceph_osd_request *req, void *arg) bool *victims = arg; if (req->r_abort_on_full && + (req->r_flags & CEPH_OSD_FLAG_WRITE) && (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || - pool_full(osdc, req->r_t.target_oloc.pool))) { + pool_full(osdc, req->r_t.base_oloc.pool))) { if (!*victims) { update_epoch_barrier(osdc, osdc->osdmap->epoch); *victims = true; From c843d13caefad9f2f182f38d6bfe492c9f00e086 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 30 May 2018 16:29:14 +0200 Subject: [PATCH 22/31] libceph: make abort_on_full a per-osdc setting The intent behind making it a per-request setting was that it would be set for writes, but not for reads. As it is, the flag is set for all fs/ceph requests except for pool perm check stat request (technically a read). ceph_osdc_abort_on_full() skips reads since the previous commit and I don't see a use case for marking individual requests. 
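The patch below moves the decision from a per-request flag to a per-client setting. A compact sketch of the resulting submission check, with the FULL/pool-quota test reduced to a single boolean for illustration:

#include <errno.h>
#include <stdbool.h>
#include <stdio.h>

struct client {
    bool abort_on_full;     /* set once by the filesystem at mount time */
    bool map_full;          /* stand-in for the osdmap FULL / pool-full checks */
};

struct request {
    bool is_write;
    int result;
};

static int submit(struct client *c, struct request *req)
{
    if (req->is_write && c->map_full) {
        if (c->abort_on_full) {
            req->result = -ENOSPC;      /* fail instead of pausing */
            return req->result;
        }
        /* otherwise the request would be paused until FULL clears */
    }
    req->result = 0;
    return 0;
}

int main(void)
{
    struct client c = { .abort_on_full = true, .map_full = true };
    struct request w = { .is_write = true };

    printf("write result: %d\n", submit(&c, &w));   /* -ENOSPC */
    return 0;
}

Because the setting now lives on the client, the filesystem enables it once at mount time instead of tagging every request it creates.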
Signed-off-by: Ilya Dryomov Acked-by: Jeff Layton Reviewed-by: "Yan, Zheng" --- fs/ceph/addr.c | 1 - fs/ceph/file.c | 1 - fs/ceph/super.c | 2 ++ include/linux/ceph/osd_client.h | 2 +- net/ceph/osd_client.c | 9 ++++----- 5 files changed, 7 insertions(+), 8 deletions(-) diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 5f7ad3d0df2e..ca0d5510ed50 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1935,7 +1935,6 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false); wr_req->r_mtime = ci->vfs_inode.i_mtime; - wr_req->r_abort_on_full = true; err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false); if (!err) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index cf0e45b10121..6b9f7f3cd237 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -895,7 +895,6 @@ static void ceph_aio_retry_work(struct work_struct *work) req->r_callback = ceph_aio_complete_req; req->r_inode = inode; req->r_priv = aio_req; - req->r_abort_on_full = true; ret = ceph_osdc_start_request(req->r_osdc, req, false); out: diff --git a/fs/ceph/super.c b/fs/ceph/super.c index a092cdb69288..cad046aa4fd0 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -616,7 +616,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, err = PTR_ERR(fsc->client); goto fail; } + fsc->client->extra_mon_dispatch = extra_mon_dispatch; + fsc->client->osdc.abort_on_full = true; if (!fsopt->mds_namespace) { ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index d4191bde95a4..0d6ee04b4c41 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -202,7 +202,6 @@ struct ceph_osd_request { struct timespec r_mtime; /* ditto */ u64 r_data_offset; /* ditto */ bool r_linger; /* don't resend on failure */ - bool r_abort_on_full; /* return ENOSPC when full */ /* internal */ unsigned long r_stamp; /* jiffies, send or check time */ @@ -348,6 +347,7 @@ struct ceph_osd_client { struct rb_root linger_map_checks; atomic_t num_requests; atomic_t num_homeless; + bool abort_on_full; /* abort w/ ENOSPC when full */ int abort_err; struct delayed_work timeout_work; struct delayed_work osds_timeout_work; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 3d055529189c..05c4d27d25fe 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1030,7 +1030,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc, truncate_size, truncate_seq); } - req->r_abort_on_full = true; req->r_flags = flags; req->r_base_oloc.pool = layout->pool_id; req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns); @@ -2239,7 +2238,7 @@ again: (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || pool_full(osdc, req->r_t.base_oloc.pool))) { dout("req %p full/pool_full\n", req); - if (req->r_abort_on_full) { + if (osdc->abort_on_full) { err = -ENOSPC; } else { pr_warn_ratelimited("FULL or reached pool quota\n"); @@ -2446,8 +2445,7 @@ static int abort_on_full_fn(struct ceph_osd_request *req, void *arg) struct ceph_osd_client *osdc = req->r_osdc; bool *victims = arg; - if (req->r_abort_on_full && - (req->r_flags & CEPH_OSD_FLAG_WRITE) && + if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || pool_full(osdc, req->r_t.base_oloc.pool))) { if (!*victims) { @@ -2470,7 +2468,8 @@ static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) { bool victims = false; - if (ceph_osdmap_flag(osdc, 
CEPH_OSDMAP_FULL) || have_pool_full(osdc)) + if (osdc->abort_on_full && + (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc))) for_each_request(osdc, abort_on_full_fn, &victims); } From a86f009f106cba322c608785e09c8b5be8ffe8bb Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Wed, 23 May 2018 14:46:53 +0200 Subject: [PATCH 23/31] libceph: allocate the locator string with GFP_NOFAIL calc_target() isn't supposed to fail with anything but POOL_DNE, in which case we report that the pool doesn't exist and fail the request with -ENOENT. Doing this for -ENOMEM is at the very least confusing and also harmful -- as the preceding requests complete, a short-lived locator string allocation is likely to succeed after a wait. (We used to call ceph_object_locator_to_pg() for a pi lookup. In theory that could fail with -ENOENT, hence the "ret != -ENOENT" warning being removed.) Signed-off-by: Ilya Dryomov --- include/linux/ceph/osdmap.h | 8 ++++---- net/ceph/osd_client.c | 10 +--------- net/ceph/osdmap.c | 19 ++++++++----------- 3 files changed, 13 insertions(+), 24 deletions(-) diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h index e71fb222c7c3..5675b1f09bc5 100644 --- a/include/linux/ceph/osdmap.h +++ b/include/linux/ceph/osdmap.h @@ -279,10 +279,10 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting, const struct ceph_osds *new_acting, bool any_change); -int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, - const struct ceph_object_id *oid, - const struct ceph_object_locator *oloc, - struct ceph_pg *raw_pgid); +void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, + const struct ceph_object_id *oid, + const struct ceph_object_locator *oloc, + struct ceph_pg *raw_pgid); int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, const struct ceph_object_id *oid, const struct ceph_object_locator *oloc, diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 05c4d27d25fe..f2584fe1246f 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1430,7 +1430,6 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc, bool recovery_deletes = ceph_osdmap_flag(osdc, CEPH_OSDMAP_RECOVERY_DELETES); enum calc_target_result ct_res; - int ret; t->epoch = osdc->osdmap->epoch; pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool); @@ -1466,14 +1465,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc, } } - ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, - &pgid); - if (ret) { - WARN_ON(ret != -ENOENT); - t->osd = CEPH_HOMELESS_OSD; - ct_res = CALC_TARGET_POOL_DNE; - goto out; - } + __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid); last_pgid.pool = pgid.pool; last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask); diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c index 9645ffd6acfb..a7494f623451 100644 --- a/net/ceph/osdmap.c +++ b/net/ceph/osdmap.c @@ -2145,10 +2145,10 @@ bool ceph_osds_changed(const struct ceph_osds *old_acting, * Should only be called with target_oid and target_oloc (as opposed to * base_oid and base_oloc), since tiering isn't taken into account. 
*/ -int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, - const struct ceph_object_id *oid, - const struct ceph_object_locator *oloc, - struct ceph_pg *raw_pgid) +void __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, + const struct ceph_object_id *oid, + const struct ceph_object_locator *oloc, + struct ceph_pg *raw_pgid) { WARN_ON(pi->id != oloc->pool); @@ -2164,11 +2164,8 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, int nsl = oloc->pool_ns->len; size_t total = nsl + 1 + oid->name_len; - if (total > sizeof(stack_buf)) { - buf = kmalloc(total, GFP_NOIO); - if (!buf) - return -ENOMEM; - } + if (total > sizeof(stack_buf)) + buf = kmalloc(total, GFP_NOIO | __GFP_NOFAIL); memcpy(buf, oloc->pool_ns->str, nsl); buf[nsl] = '\037'; memcpy(buf + nsl + 1, oid->name, oid->name_len); @@ -2180,7 +2177,6 @@ int __ceph_object_locator_to_pg(struct ceph_pg_pool_info *pi, oid->name, nsl, oloc->pool_ns->str, raw_pgid->pool, raw_pgid->seed); } - return 0; } int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, @@ -2194,7 +2190,8 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap, if (!pi) return -ENOENT; - return __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid); + __ceph_object_locator_to_pg(pi, oid, oloc, raw_pgid); + return 0; } EXPORT_SYMBOL(ceph_object_locator_to_pg); From fa466743a9fc6e4a24ef22285fb384f9ef4a2edb Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 25 May 2018 11:22:56 +0800 Subject: [PATCH 24/31] ceph: fix wrong check for the case of updating link count Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/caps.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 477b822e6333..0ae41854d676 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -3059,7 +3059,6 @@ static void handle_cap_grant(struct inode *inode, int used, wanted, dirty; u64 size = le64_to_cpu(grant->size); u64 max_size = le64_to_cpu(grant->max_size); - struct timespec mtime, atime, ctime; int check_caps = 0; bool wake = false; bool writeback = false; @@ -3124,7 +3123,7 @@ static void handle_cap_grant(struct inode *inode, from_kgid(&init_user_ns, inode->i_gid)); } - if ((newcaps & CEPH_CAP_AUTH_SHARED) && + if ((newcaps & CEPH_CAP_LINK_SHARED) && (extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) { set_nlink(inode, le32_to_cpu(grant->nlink)); if (inode->i_nlink == 0 && @@ -3149,6 +3148,7 @@ static void handle_cap_grant(struct inode *inode, } if (newcaps & CEPH_CAP_ANY_RD) { + struct timespec mtime, atime, ctime; /* ctime/mtime/atime? */ ceph_decode_timespec(&mtime, &grant->mtime); ceph_decode_timespec(&atime, &grant->atime); From aae1a442f8eac6d5442ee479df66d278c73a6ecc Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 26 May 2018 16:54:39 +0800 Subject: [PATCH 25/31] ceph: prevent i_version from going back inode info from non-auth can be stale. 
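The fix amounts to a monotonic update of the cached version. A minimal userspace stand-in (hypothetical helper, not the fs/ceph code itself):

#include <stdint.h>

/* Accept a reported inode version only if it moves forward, so a stale
 * reply from a non-auth MDS cannot roll the cached value back. */
static void update_i_version(uint64_t *cached, uint64_t reported)
{
	if (reported > *cached)
		*cached = reported;
}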
Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/inode.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 4aeccb13437b..4fda7a9d4c9d 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -889,7 +889,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page, } /* finally update i_version */ - ci->i_version = le64_to_cpu(info->version); + if (le64_to_cpu(info->version) > ci->i_version) + ci->i_version = le64_to_cpu(info->version); inode->i_mapping->a_ops = &ceph_aops; From 73fb0949cf246b212ff63d692a0ec88db954bb35 Mon Sep 17 00:00:00 2001 From: Luis Henriques Date: Mon, 28 May 2018 18:37:40 +0100 Subject: [PATCH 26/31] ceph: fix use-after-free in ceph_statfs() KASAN found an UAF in ceph_statfs. This was a one-off bug but looking at the code it looks like the monmap access needs to be protected as it can be modified while we're accessing it. Fix this by protecting the access with the monc->mutex. BUG: KASAN: use-after-free in ceph_statfs+0x21d/0x2c0 Read of size 8 at addr ffff88006844f2e0 by task trinity-c5/304 CPU: 0 PID: 304 Comm: trinity-c5 Not tainted 4.17.0-rc6+ #172 Hardware name: QEMU Standard PC (i440FX + PIIX, 1996), BIOS 1.0.0-prebuilt.qemu-project.org 04/01/2014 Call Trace: dump_stack+0xa5/0x11b ? show_regs_print_info+0x5/0x5 ? kmsg_dump_rewind+0x118/0x118 ? ceph_statfs+0x21d/0x2c0 print_address_description+0x73/0x2b0 ? ceph_statfs+0x21d/0x2c0 kasan_report+0x243/0x360 ceph_statfs+0x21d/0x2c0 ? ceph_umount_begin+0x80/0x80 ? kmem_cache_alloc+0xdf/0x1a0 statfs_by_dentry+0x79/0xb0 vfs_statfs+0x28/0x110 user_statfs+0x8c/0xe0 ? vfs_statfs+0x110/0x110 ? __fdget_raw+0x10/0x10 __se_sys_statfs+0x5d/0xa0 ? user_statfs+0xe0/0xe0 ? mutex_unlock+0x1d/0x40 ? __x64_sys_statfs+0x20/0x30 do_syscall_64+0xee/0x290 ? syscall_return_slowpath+0x1c0/0x1c0 ? page_fault+0x1e/0x30 ? syscall_return_slowpath+0x13c/0x1c0 ? prepare_exit_to_usermode+0xdb/0x140 ? syscall_trace_enter+0x330/0x330 ? 
__put_user_4+0x1c/0x30 entry_SYSCALL_64_after_hwframe+0x44/0xa9 Allocated by task 130: __kmalloc+0x124/0x210 ceph_monmap_decode+0x1c1/0x400 dispatch+0x113/0xd20 ceph_con_workfn+0xa7e/0x44e0 process_one_work+0x5f0/0xa30 worker_thread+0x184/0xa70 kthread+0x1a0/0x1c0 ret_from_fork+0x35/0x40 Freed by task 130: kfree+0xb8/0x210 dispatch+0x15a/0xd20 ceph_con_workfn+0xa7e/0x44e0 process_one_work+0x5f0/0xa30 worker_thread+0x184/0xa70 kthread+0x1a0/0x1c0 ret_from_fork+0x35/0x40 Signed-off-by: Luis Henriques Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/super.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/fs/ceph/super.c b/fs/ceph/super.c index cad046aa4fd0..a8e8e2629fb4 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -45,7 +45,7 @@ static void ceph_put_super(struct super_block *s) static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf) { struct ceph_fs_client *fsc = ceph_inode_to_client(d_inode(dentry)); - struct ceph_monmap *monmap = fsc->client->monc.monmap; + struct ceph_mon_client *monc = &fsc->client->monc; struct ceph_statfs st; u64 fsid; int err; @@ -58,7 +58,7 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf) } dout("statfs\n"); - err = ceph_monc_do_statfs(&fsc->client->monc, data_pool, &st); + err = ceph_monc_do_statfs(monc, data_pool, &st); if (err < 0) return err; @@ -94,8 +94,11 @@ static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf) buf->f_namelen = NAME_MAX; /* Must convert the fsid, for consistent values across arches */ - fsid = le64_to_cpu(*(__le64 *)(&monmap->fsid)) ^ - le64_to_cpu(*((__le64 *)&monmap->fsid + 1)); + mutex_lock(&monc->mutex); + fsid = le64_to_cpu(*(__le64 *)(&monc->monmap->fsid)) ^ + le64_to_cpu(*((__le64 *)&monc->monmap->fsid + 1)); + mutex_unlock(&monc->mutex); + buf->f_fsid.val[0] = fsid & 0xffffffff; buf->f_fsid.val[1] = fsid >> 32; From c36ed50de2ad1649ce0369a4a6fc2cc11b20dfb7 Mon Sep 17 00:00:00 2001 From: Chengguang Xu Date: Wed, 30 May 2018 10:13:11 +0800 Subject: [PATCH 27/31] ceph: fix alignment of rasize With the current logic, specifying rasize=0~1 results in 4096, while specifying rasize=2~4097 results in 8192. Make the alignment the same as for rsize & wsize. Signed-off-by: Chengguang Xu Reviewed-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/super.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fs/ceph/super.c b/fs/ceph/super.c index a8e8e2629fb4..b4ff1392e333 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -271,7 +271,7 @@ static int parse_fsopt_token(char *c, void *private) case Opt_rasize: if (intval < 0) return -EINVAL; - fsopt->rasize = ALIGN(intval + PAGE_SIZE - 1, PAGE_SIZE); + fsopt->rasize = ALIGN(intval, PAGE_SIZE); break; case Opt_caps_wanted_delay_min: if (intval < 1) From 8db0c7596f1258b28f32a38f2d5bbc0d63c104c9 Mon Sep 17 00:00:00 2001 From: Chengguang Xu Date: Wed, 30 May 2018 16:47:06 +0800 Subject: [PATCH 28/31] ceph: strengthen rsize/wsize/readdir_max_bytes validation The check (intval < PAGE_SIZE) involves an implicit cast to unsigned, so even when a negative value is specified for rsize/wsize/readdir_max_bytes, it passes the validation check.
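A small userspace demonstration of the pitfall described above (4096 is a stand-in for PAGE_SIZE; this is not the kernel option parser):

#include <stdio.h>

int main(void)
{
	const unsigned long page_size = 4096;    /* stand-in for PAGE_SIZE */
	int intval = -1;                         /* a negative mount option value */

	/* intval is converted to unsigned long, so -1 becomes ULONG_MAX and
	 * the "too small" check never triggers. */
	if (intval < page_size)
		printf("rejected\n");
	else
		printf("accepted -- the bug\n");

	/* Casting PAGE_SIZE to int keeps the comparison signed. */
	if (intval < (int)page_size)
		printf("rejected -- the fix\n");

	return 0;
}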
Signed-off-by: Chengguang Xu Reviewed-by: Ilya Dryomov Signed-off-by: Ilya Dryomov --- fs/ceph/super.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fs/ceph/super.c b/fs/ceph/super.c index b4ff1392e333..cec1d3343742 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -259,12 +259,12 @@ static int parse_fsopt_token(char *c, void *private) break; /* misc */ case Opt_wsize: - if (intval < PAGE_SIZE || intval > CEPH_MAX_WRITE_SIZE) + if (intval < (int)PAGE_SIZE || intval > CEPH_MAX_WRITE_SIZE) return -EINVAL; fsopt->wsize = ALIGN(intval, PAGE_SIZE); break; case Opt_rsize: - if (intval < PAGE_SIZE || intval > CEPH_MAX_READ_SIZE) + if (intval < (int)PAGE_SIZE || intval > CEPH_MAX_READ_SIZE) return -EINVAL; fsopt->rsize = ALIGN(intval, PAGE_SIZE); break; @@ -289,7 +289,7 @@ static int parse_fsopt_token(char *c, void *private) fsopt->max_readdir = intval; break; case Opt_readdir_max_bytes: - if (intval < PAGE_SIZE && intval != 0) + if (intval < (int)PAGE_SIZE && intval != 0) return -EINVAL; fsopt->max_readdir_bytes = intval; break; From 3619aa8b74490fe5f803f7e71af02845aede6b5c Mon Sep 17 00:00:00 2001 From: Chengguang Xu Date: Mon, 4 Jun 2018 16:03:51 +0800 Subject: [PATCH 29/31] ceph: show ino32 if the value is different with default Currently ceph_show_options() has no item for 'ino32', so show the 'ino32' mount option when its value differs from the default. Signed-off-by: Chengguang Xu Reviewed-by: Ilya Dryomov Signed-off-by: Ilya Dryomov --- fs/ceph/super.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fs/ceph/super.c b/fs/ceph/super.c index cec1d3343742..95a3b3ac9b6e 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -537,6 +537,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) seq_puts(m, ",noasyncreaddir"); if ((fsopt->flags & CEPH_MOUNT_OPT_DCACHE) == 0) seq_puts(m, ",nodcache"); + if (fsopt->flags & CEPH_MOUNT_OPT_INO32) + seq_puts(m, ",ino32"); if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) { seq_show_option(m, "fsc", fsopt->fscache_uniq); } From c7f04944bca9f6186829722831e51858951ad57c Mon Sep 17 00:00:00 2001 From: Chengguang Xu Date: Mon, 4 Jun 2018 20:10:05 +0800 Subject: [PATCH 30/31] ceph: update description of some mount options Based on the code, the default value of rsize/wsize is 16 MB. Signed-off-by: Chengguang Xu Reviewed-by: Ilya Dryomov Signed-off-by: Ilya Dryomov --- Documentation/filesystems/ceph.txt | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt index d7f011ddc150..8bf62240e10d 100644 --- a/Documentation/filesystems/ceph.txt +++ b/Documentation/filesystems/ceph.txt @@ -105,15 +105,13 @@ Mount Options address its connection to the monitor originates from. wsize=X - Specify the maximum write size in bytes. By default there is no - maximum. Ceph will normally size writes based on the file stripe - size. + Specify the maximum write size in bytes. Default: 16 MB. rsize=X - Specify the maximum read size in bytes. Default: 64 MB. + Specify the maximum read size in bytes. Default: 16 MB. rasize=X - Specify the maximum readahead. Default: 8 MB. + Specify the maximum readahead size in bytes. Default: 8 MB.
mount_timeout=X Specify the timeout value for mount (in seconds), in the case From 23edca864951250af845a11da86bb3ea63522ed2 Mon Sep 17 00:00:00 2001 From: Dongsheng Yang Date: Mon, 4 Jun 2018 06:24:37 -0400 Subject: [PATCH 31/31] rbd: flush rbd_dev->watch_dwork after watch is unregistered There is a problem if we are unmapping an rbd device while watch_dwork is about to queue delayed work for the watch: unmap Thread watch Thread timer do_rbd_remove cancel_tasks_sync(rbd_dev) queue_delayed_work for watch destroy_workqueue(rbd_dev->task_wq) drain_workqueue(wq) destroy other resources in wq call_timer_fn __queue_work() Then the delayed work escapes both cancel_tasks_sync() and destroy_workqueue() and we get a use-after-free call trace: BUG: unable to handle kernel NULL pointer dereference at 0000000000000000 PGD 0 P4D 0 Oops: 0000 [#1] SMP PTI Modules linked in: CPU: 7 PID: 0 Comm: swapper/7 Tainted: G OE 4.17.0-rc6+ #13 Hardware name: Red Hat KVM, BIOS 0.5.1 01/01/2011 RIP: 0010:__queue_work+0x6a/0x3b0 RSP: 0018:ffff9427df1c3e90 EFLAGS: 00010086 RAX: ffff9427deca8400 RBX: 0000000000000000 RCX: 0000000000000000 RDX: ffff9427deca8400 RSI: ffff9427df1c3e50 RDI: 0000000000000000 RBP: ffff942783e39e00 R08: ffff9427deca8400 R09: ffff9427df1c3f00 R10: 0000000000000004 R11: 0000000000000005 R12: ffff9427cfb85970 R13: 0000000000002000 R14: 000000000001eca0 R15: 0000000000000007 FS: 0000000000000000(0000) GS:ffff9427df1c0000(0000) knlGS:0000000000000000 CS: 0010 DS: 0000 ES: 0000 CR0: 0000000080050033 CR2: 0000000000000000 CR3: 00000004c900a005 CR4: 00000000000206e0 Call Trace: ? __queue_work+0x3b0/0x3b0 call_timer_fn+0x2d/0x130 run_timer_softirq+0x16e/0x430 ? tick_sched_timer+0x37/0x70 __do_softirq+0xd2/0x280 irq_exit+0xd5/0xe0 smp_apic_timer_interrupt+0x6c/0x130 apic_timer_interrupt+0xf/0x20 [ Move rbd_dev->watch_dwork cancellation so that rbd_reregister_watch() either bails out early because the watch is UNREGISTERED at that point or just gets cancelled. ] Cc: stable@vger.kernel.org Fixes: 99d1694310df ("rbd: retry watch re-registration periodically") Signed-off-by: Dongsheng Yang Reviewed-by: Ilya Dryomov Signed-off-by: Ilya Dryomov --- drivers/block/rbd.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index ac57bc0f6fca..5faafdafabd9 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -3400,7 +3400,6 @@ static void cancel_tasks_sync(struct rbd_device *rbd_dev) { dout("%s rbd_dev %p\n", __func__, rbd_dev); - cancel_delayed_work_sync(&rbd_dev->watch_dwork); cancel_work_sync(&rbd_dev->acquired_lock_work); cancel_work_sync(&rbd_dev->released_lock_work); cancel_delayed_work_sync(&rbd_dev->lock_dwork); @@ -3418,6 +3417,7 @@ static void rbd_unregister_watch(struct rbd_device *rbd_dev) rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED; mutex_unlock(&rbd_dev->watch_mutex); + cancel_delayed_work_sync(&rbd_dev->watch_dwork); ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
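The resulting teardown order in rbd_unregister_watch(), restated as a sketch with the identifiers from the diff above (an excerpt-style illustration, not a standalone unit): flip the watch state first, then flush the delayed work, so rbd_reregister_watch() either observes RBD_WATCH_STATE_UNREGISTERED and bails out early or is cancelled before it can re-queue itself.

	mutex_lock(&rbd_dev->watch_mutex);
	/* ... unregister the watch ... */
	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
	mutex_unlock(&rbd_dev->watch_mutex);

	/* Safe only after the state change: any re-register work that is
	 * already running will now bail out instead of re-arming itself. */
	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
	ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);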