diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h index 8cf644197b1a..85650b415e73 100644 --- a/include/linux/ceph/osd_client.h +++ b/include/linux/ceph/osd_client.h @@ -267,6 +267,7 @@ struct ceph_osd_client { struct rb_root osds; /* osds */ struct list_head osd_lru; /* idle osds */ spinlock_t osd_lru_lock; + u32 epoch_barrier; struct ceph_osd homeless_osd; atomic64_t last_tid; /* tid of last request */ u64 last_linger_id; @@ -305,6 +306,7 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg); extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg); +void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb); extern void osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which, u16 opcode, u32 flags); diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c index d7e63a4f5578..71ba13927b3d 100644 --- a/net/ceph/debugfs.c +++ b/net/ceph/debugfs.c @@ -62,7 +62,8 @@ static int osdmap_show(struct seq_file *s, void *p) return 0; down_read(&osdc->lock); - seq_printf(s, "epoch %d flags 0x%x\n", map->epoch, map->flags); + seq_printf(s, "epoch %u barrier %u flags 0x%x\n", map->epoch, + osdc->epoch_barrier, map->flags); for (n = rb_first(&map->pg_pools); n; n = rb_next(n)) { struct ceph_pg_pool_info *pi = diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 55b7585ccefd..9b8f57fd5ee1 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1298,8 +1298,9 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc, __pool_full(pi); WARN_ON(pi->id != t->base_oloc.pool); - return (t->flags & CEPH_OSD_FLAG_READ && pauserd) || - (t->flags & CEPH_OSD_FLAG_WRITE && pausewr); + return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) || + ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) || + (osdc->osdmap->epoch < osdc->epoch_barrier); } enum calc_target_result { @@ -1654,8 +1655,13 @@ again: goto promote; } - if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && - ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { + if (osdc->osdmap->epoch < osdc->epoch_barrier) { + dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch, + osdc->epoch_barrier); + req->r_t.paused = true; + maybe_request_map(osdc); + } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) && + ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) { dout("req %p pausewr\n", req); req->r_t.paused = true; maybe_request_map(osdc); @@ -1807,19 +1813,77 @@ static void abort_request(struct ceph_osd_request *req, int err) complete_request(req, err); } +static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) +{ + if (likely(eb > osdc->epoch_barrier)) { + dout("updating epoch_barrier from %u to %u\n", + osdc->epoch_barrier, eb); + osdc->epoch_barrier = eb; + /* Request map if we're not to the barrier yet */ + if (eb > osdc->osdmap->epoch) + maybe_request_map(osdc); + } +} + +void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb) +{ + down_read(&osdc->lock); + if (unlikely(eb > osdc->epoch_barrier)) { + up_read(&osdc->lock); + down_write(&osdc->lock); + update_epoch_barrier(osdc, eb); + up_write(&osdc->lock); + } else { + up_read(&osdc->lock); + } +} +EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier); + /* * Drop all pending requests that are stalled waiting on a full condition to - * clear, and complete them with ENOSPC as the return code. + * clear, and complete them with ENOSPC as the return code. Set the + * osdc->epoch_barrier to the latest map epoch that we've seen if any were + * cancelled. */ static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) { struct rb_node *n; + bool victims = false; dout("enter abort_on_full\n"); if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc)) goto out; + /* Scan list and see if there is anything to abort */ + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { + struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); + struct rb_node *m; + + m = rb_first(&osd->o_requests); + while (m) { + struct ceph_osd_request *req = rb_entry(m, + struct ceph_osd_request, r_node); + m = rb_next(m); + + if (req->r_abort_on_full) { + victims = true; + break; + } + } + if (victims) + break; + } + + if (!victims) + goto out; + + /* + * Update the barrier to current epoch if it's behind that point, + * since we know we have some calls to be aborted in the tree. + */ + update_epoch_barrier(osdc, osdc->osdmap->epoch); + for (n = rb_first(&osdc->osds); n; n = rb_next(n)) { struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node); struct rb_node *m; @@ -1837,7 +1901,7 @@ static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc) } } out: - dout("return abort_on_full\n"); + dout("return abort_on_full barrier=%u\n", osdc->epoch_barrier); } static void check_pool_dne(struct ceph_osd_request *req) @@ -3293,7 +3357,8 @@ done: pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) || ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) || have_pool_full(osdc); - if (was_pauserd || was_pausewr || pauserd || pausewr) + if (was_pauserd || was_pausewr || pauserd || pausewr || + osdc->osdmap->epoch < osdc->epoch_barrier) maybe_request_map(osdc); kick_requests(osdc, &need_resend, &need_resend_linger);