From 483302af2622cb26983c847196b8bad0a80fbd2f Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Sat, 21 Nov 2020 17:04:12 -0500 Subject: [PATCH 01/26] cls/log: Take const references of things you won't modify Signed-off-by: Adam C. Emerson (cherry picked from commit 73ea8cec06addc6af2ba354321f1099f657f13c5) Signed-off-by: Adam C. Emerson --- src/cls/log/cls_log_client.cc | 4 ++-- src/cls/log/cls_log_client.h | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/cls/log/cls_log_client.cc b/src/cls/log/cls_log_client.cc index 418599c8066e4..182bb9fec47e9 100644 --- a/src/cls/log/cls_log_client.cc +++ b/src/cls/log/cls_log_client.cc @@ -113,8 +113,8 @@ class LogListCtx : public ObjectOperationCompletion { } }; -void cls_log_list(librados::ObjectReadOperation& op, utime_t& from, utime_t& to, - const string& in_marker, int max_entries, +void cls_log_list(librados::ObjectReadOperation& op, const utime_t& from, + const utime_t& to, const string& in_marker, int max_entries, list& entries, string *out_marker, bool *truncated) { diff --git a/src/cls/log/cls_log_client.h b/src/cls/log/cls_log_client.h index b049c2cc01bda..2afdabeb3e0a2 100644 --- a/src/cls/log/cls_log_client.h +++ b/src/cls/log/cls_log_client.h @@ -19,9 +19,9 @@ void cls_log_add(librados::ObjectWriteOperation& op, cls_log_entry& entry); void cls_log_add(librados::ObjectWriteOperation& op, const utime_t& timestamp, const std::string& section, const std::string& name, ceph::buffer::list& bl); -void cls_log_list(librados::ObjectReadOperation& op, utime_t& from, utime_t& to, - const std::string& in_marker, int max_entries, - std::list& entries, +void cls_log_list(librados::ObjectReadOperation& op, const utime_t& from, + const utime_t& to, const std::string& in_marker, + int max_entries, std::list& entries, std::string *out_marker, bool *truncated); void cls_log_trim(librados::ObjectWriteOperation& op, const utime_t& from_time, const utime_t& to_time, From 
35f044f39da713b3bf4c5002aade7b456727190e Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Tue, 3 Nov 2020 16:02:26 -0500 Subject: [PATCH 02/26] rgw: Add AioCompletion* versions for the rest of the FIFO methods Signed-off-by: Adam C. Emerson (cherry picked from commit 665573ab8905bfa2e1ede6fc3be9bc80a625cb49) Signed-off-by: Adam C. Emerson --- src/rgw/cls_fifo_legacy.cc | 1583 +++++++++++++++++++++----- src/rgw/cls_fifo_legacy.h | 91 +- src/rgw/rgw_datalog.cc | 7 +- src/test/rgw/test_cls_fifo_legacy.cc | 484 +++++++- 4 files changed, 1826 insertions(+), 339 deletions(-) diff --git a/src/rgw/cls_fifo_legacy.cc b/src/rgw/cls_fifo_legacy.cc index d835aeec76ab8..569a3e77c458f 100644 --- a/src/rgw/cls_fifo_legacy.cc +++ b/src/rgw/cls_fifo_legacy.cc @@ -109,6 +109,7 @@ int get_meta(lr::IoCtx& ioctx, const std::string& oid, return r; }; +namespace { void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv, const fifo::update& update) { @@ -175,6 +176,27 @@ int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, return retval; } +void push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, + std::deque data_bufs, std::uint64_t tid, + lr::AioCompletion* c) +{ + lr::ObjectWriteOperation op; + fifo::op::push_part pp; + + pp.tag = tag; + pp.data_bufs = data_bufs; + pp.total_len = 0; + + for (const auto& bl : data_bufs) + pp.total_len += bl.length(); + + cb::list in; + encode(pp, in); + op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in); + auto r = ioctx.aio_operate(oid, c, &op, lr::OPERATION_RETURNVEC); + ceph_assert(r >= 0); +} + void trim_part(lr::ObjectWriteOperation* op, std::optional tag, std::uint64_t ofs, bool exclusive) @@ -232,6 +254,70 @@ int list_part(lr::IoCtx& ioctx, const std::string& oid, return r; } +struct list_entry_completion : public lr::ObjectOperationCompletion { + CephContext* cct; + int* r_out; + std::vector* entries; + bool* more; + bool* full_part; + std::string* ptag; + std::uint64_t tid; + + 
list_entry_completion(CephContext* cct, int* r_out, std::vector* entries, + bool* more, bool* full_part, std::string* ptag, + std::uint64_t tid) + : cct(cct), r_out(r_out), entries(entries), more(more), + full_part(full_part), ptag(ptag), tid(tid) {} + virtual ~list_entry_completion() = default; + void handle_completion(int r, bufferlist& bl) override { + if (r >= 0) try { + fifo::op::list_part_reply reply; + auto iter = bl.cbegin(); + decode(reply, iter); + if (entries) *entries = std::move(reply.entries); + if (more) *more = reply.more; + if (full_part) *full_part = reply.full_part; + if (ptag) *ptag = reply.tag; + } catch (const cb::error& err) { + lderr(cct) + << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " decode failed: " << err.what() + << " tid=" << tid << dendl; + r = from_error_code(err.code()); + } else if (r < 0) { + lderr(cct) + << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid + << dendl; + } + if (r_out) *r_out = r; + } +}; + +lr::ObjectReadOperation list_part(CephContext* cct, + std::optional tag, + std::uint64_t ofs, + std::uint64_t max_entries, + int* r_out, + std::vector* entries, + bool* more, bool* full_part, + std::string* ptag, std::uint64_t tid) +{ + lr::ObjectReadOperation op; + fifo::op::list_part lp; + + lp.tag = tag; + lp.ofs = ofs; + lp.max_entries = max_entries; + + cb::list in; + encode(lp, in); + op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in, + new list_entry_completion(cct, r_out, entries, more, full_part, + ptag, tid)); + return op; +} + int get_part_info(lr::IoCtx& ioctx, const std::string& oid, fifo::part_header* header, std::uint64_t tid, optional_yield y) @@ -264,29 +350,131 @@ int get_part_info(lr::IoCtx& ioctx, const std::string& oid, return r; } -static void complete(lr::AioCompletion* c_, int r) +struct partinfo_completion : public lr::ObjectOperationCompletion { + CephContext* cct; + int* rp; + fifo::part_header* h; + std::uint64_t tid; + 
partinfo_completion(CephContext* cct, int* rp, fifo::part_header* h, + std::uint64_t tid) : + cct(cct), rp(rp), h(h), tid(tid) { + } + virtual ~partinfo_completion() = default; + void handle_completion(int r, bufferlist& bl) override { + if (r >= 0) try { + fifo::op::get_part_info_reply reply; + auto iter = bl.cbegin(); + decode(reply, iter); + if (h) *h = std::move(reply.header); + } catch (const cb::error& err) { + r = from_error_code(err.code()); + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " decode failed: " << err.what() + << " tid=" << tid << dendl; + } else { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid + << dendl; + } + if (rp) { + *rp = r; + } + } +}; + +template +struct Completion { +private: + lr::AioCompletion* _cur = nullptr; + lr::AioCompletion* _super; +public: + + using Ptr = std::unique_ptr; + + lr::AioCompletion* cur() const { + return _cur; + } + lr::AioCompletion* super() const { + return _super; + } + + Completion(lr::AioCompletion* super) : _super(super) { + super->pc->get(); + } + + ~Completion() { + if (_super) { + _super->pc->put(); + } + if (_cur) + _cur->release(); + _super = nullptr; + _cur = nullptr; + } + + // The only times that aio_operate can return an error are: + // 1. The completion contains a null pointer. This should just + // crash, and in our case it does. + // 2. An attempt is made to write to a snapshot. RGW doesn't use + // snapshots, so we don't care. + // + // So we will just assert that initiating an Aio operation succeeds + // and not worry about recovering. 
+ static lr::AioCompletion* call(Ptr&& p) { + p->_cur = lr::Rados::aio_create_completion(static_cast(p.get()), + &cb); + auto c = p->_cur; + p.release(); + return c; + } + static void complete(Ptr&& p, int r) { + auto c = p->_super->pc; + p->_super = nullptr; + c->lock.lock(); + c->rval = r; + c->complete = true; + c->lock.unlock(); + + auto cb_complete = c->callback_complete; + auto cb_complete_arg = c->callback_complete_arg; + if (cb_complete) + cb_complete(c, cb_complete_arg); + + auto cb_safe = c->callback_safe; + auto cb_safe_arg = c->callback_safe_arg; + if (cb_safe) + cb_safe(c, cb_safe_arg); + + c->lock.lock(); + c->callback_complete = nullptr; + c->callback_safe = nullptr; + c->cond.notify_all(); + c->put_unlock(); + } + + static void cb(lr::completion_t, void* arg) { + auto t = static_cast(arg); + auto r = t->_cur->get_return_value(); + t->_cur->release(); + t->_cur = nullptr; + t->handle(Ptr(t), r); + } +}; + +lr::ObjectReadOperation get_part_info(CephContext* cct, + fifo::part_header* header, + std::uint64_t tid, int* r = 0) { - auto c = c_->pc; - c->lock.lock(); - c->rval = r; - c->complete = true; - c->lock.unlock(); - - auto cb_complete = c->callback_complete; - auto cb_complete_arg = c->callback_complete_arg; - if (cb_complete) - cb_complete(c, cb_complete_arg); - - auto cb_safe = c->callback_safe; - auto cb_safe_arg = c->callback_safe_arg; - if (cb_safe) - cb_safe(c, cb_safe_arg); - - c->lock.lock(); - c->callback_complete = NULL; - c->callback_safe = NULL; - c->cond.notify_all(); - c->put_unlock(); + lr::ObjectReadOperation op; + fifo::op::get_part_info gpi; + + cb::list in; + cb::list bl; + encode(gpi, in); + op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in, + new partinfo_completion(cct, r, header, tid)); + return op; +} } std::optional FIFO::to_marker(std::string_view s) @@ -385,11 +573,8 @@ int FIFO::_update_meta(const fifo::update& update, return r; } -struct Updater { +struct Updater : public Completion { FIFO* fifo; - lr::AioCompletion* 
super; - lr::AioCompletion* cur = lr::Rados::aio_create_completion( - static_cast(this), &FIFO::update_callback); fifo::update update; fifo::objv version; bool reread = false; @@ -398,92 +583,74 @@ struct Updater { Updater(FIFO* fifo, lr::AioCompletion* super, const fifo::update& update, fifo::objv version, bool* pcanceled, std::uint64_t tid) - : fifo(fifo), super(super), update(update), version(version), - pcanceled(pcanceled), tid(tid) { - super->pc->get(); - } - ~Updater() { - cur->release(); - } -}; - -void FIFO::update_callback(lr::completion_t, void* arg) -{ - std::unique_ptr updater(static_cast(arg)); - auto cct = updater->fifo->cct; - auto tid = updater->tid; - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - if (!updater->reread) { - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " handling async update_meta: tid=" - << tid << dendl; - int r = updater->cur->get_return_value(); + : Completion(super), fifo(fifo), update(update), version(version), + pcanceled(pcanceled) {} + + void handle(Ptr&& p, int r) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + if (reread) + handle_reread(std::move(p), r); + else + handle_update(std::move(p), r); + } + + void handle_update(Ptr&& p, int r) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " handling async update_meta: tid=" + << tid << dendl; if (r < 0 && r != -ECANCELED) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " update failed: r=" << r << " tid=" << tid << dendl; - complete(updater->super, r); + complete(std::move(p), r); return; } bool canceled = (r == -ECANCELED); if (!canceled) { - int r = updater->fifo->apply_update(&updater->fifo->info, - updater->version, - updater->update, tid); + int r = fifo->apply_update(&fifo->info, version, update, tid); if (r < 0) { - ldout(cct, 20) << 
__PRETTY_FUNCTION__ << ":" << __LINE__ - << " update failed, marking canceled: r=" << r << " tid=" - << tid << dendl; + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " update failed, marking canceled: r=" << r + << " tid=" << tid << dendl; canceled = true; } } if (canceled) { - updater->cur->release(); - updater->cur = lr::Rados::aio_create_completion( - arg, &FIFO::update_callback); - updater->reread = true; - auto r = updater->fifo->read_meta(tid, updater->cur); - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed dispatching read_meta: r=" << r << " tid=" - << tid << dendl; - complete(updater->super, r); - } else { - updater.release(); - } + reread = true; + fifo->read_meta(tid, call(std::move(p))); return; } - if (updater->pcanceled) - *updater->pcanceled = false; - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " completing: tid=" << tid << dendl; - complete(updater->super, 0); - return; - } - - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " handling async read_meta: tid=" - << tid << dendl; - int r = updater->cur->get_return_value(); - if (r < 0 && updater->pcanceled) { - *updater->pcanceled = false; - } else if (r >= 0 && updater->pcanceled) { - *updater->pcanceled = true; - } - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed dispatching read_meta: r=" << r << " tid=" - << tid << dendl; - } else { - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " completing: tid=" << tid << dendl; + if (pcanceled) + *pcanceled = false; + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " completing: tid=" << tid << dendl; + complete(std::move(p), 0); + } + + void handle_reread(Ptr&& p, int r) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " handling async read_meta: tid=" + << tid << dendl; + if (r < 0 && pcanceled) { + *pcanceled = false; + } else if (r >= 0 && pcanceled) { + *pcanceled = true; + } 
+ if (r < 0) { + lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " failed dispatching read_meta: r=" << r << " tid=" + << tid << dendl; + } else { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " completing: tid=" << tid << dendl; + } + complete(std::move(p), r); } - complete(updater->super, r); -} +}; -int FIFO::_update_meta(const fifo::update& update, - fifo::objv version, bool* pcanceled, - std::uint64_t tid, lr::AioCompletion* c) +void FIFO::_update_meta(const fifo::update& update, + fifo::objv version, bool* pcanceled, + std::uint64_t tid, lr::AioCompletion* c) { ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " entering: tid=" << tid << dendl; @@ -491,15 +658,8 @@ int FIFO::_update_meta(const fifo::update& update, update_meta(&op, info.version, update); auto updater = std::make_unique(this, c, update, version, pcanceled, tid); - auto r = ioctx.aio_operate(oid, updater->cur, &op); - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed dispatching update_meta: r=" << r << " tid=" - << tid << dendl; - } else { - updater.release(); - } - return r; + auto r = ioctx.aio_operate(oid, Updater::call(std::move(updater)), &op); + assert(r >= 0); } int FIFO::create_part(int64_t part_num, std::string_view tag, std::uint64_t tid, @@ -509,7 +669,7 @@ int FIFO::create_part(int64_t part_num, std::string_view tag, std::uint64_t tid, << " entering: tid=" << tid << dendl; lr::ObjectWriteOperation op; op.create(false); /* We don't need exclusivity, part_init ensures - we're creating from the same journal entry. */ + we're creating from the same journal entry. 
*/ std::unique_lock l(m); part_init(&op, tag, info.params); auto oid = info.part_oid(part_num); @@ -806,6 +966,209 @@ int FIFO::_prepare_new_head(std::uint64_t tid, optional_yield y) return 0; } +struct NewPartPreparer : public Completion { + FIFO* f; + std::vector jentries; + int i = 0; + std::int64_t new_head_part_num; + bool canceled = false; + uint64_t tid; + + NewPartPreparer(FIFO* f, lr::AioCompletion* super, + std::vector jentries, + std::int64_t new_head_part_num, + std::uint64_t tid) + : Completion(super), f(f), jentries(std::move(jentries)), + new_head_part_num(new_head_part_num), tid(tid) {} + + void handle(Ptr&& p, int r) { + ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + if (r < 0) { + lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " _update_meta failed: r=" << r + << " tid=" << tid << dendl; + complete(std::move(p), r); + return; + } + + if (canceled) { + std::unique_lock l(f->m); + auto iter = f->info.journal.find(jentries.front().part_num); + auto max_push_part_num = f->info.max_push_part_num; + auto head_part_num = f->info.head_part_num; + auto version = f->info.version; + auto found = (iter != f->info.journal.end()); + l.unlock(); + if ((max_push_part_num >= jentries.front().part_num && + head_part_num >= new_head_part_num)) { + ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " raced, but journaled and processed: i=" << i + << " tid=" << tid << dendl; + complete(std::move(p), 0); + return; + } + if (i >= MAX_RACE_RETRIES) { + complete(std::move(p), -ECANCELED); + return; + } + if (!found) { + ++i; + f->_update_meta(fifo::update{} + .journal_entries_add(jentries), + version, &canceled, tid, call(std::move(p))); + return; + } else { + ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " raced, journaled but not processed: i=" << i + << " tid=" << tid << dendl; + canceled = false; + } + // Fall through. We still need to process the journal. 
+ } + f->process_journal(tid, super()); + return; + } +}; + +void FIFO::_prepare_new_part(bool is_head, std::uint64_t tid, + lr::AioCompletion* c) +{ + std::unique_lock l(m); + std::vector jentries = { info.next_journal_entry(generate_tag()) }; + if (info.journal.find(jentries.front().part_num) != info.journal.end()) { + l.unlock(); + ldout(cct, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " new part journaled, but not processed: tid=" + << tid << dendl; + process_journal(tid, c); + return; + } + std::int64_t new_head_part_num = info.head_part_num; + auto version = info.version; + + if (is_head) { + auto new_head_jentry = jentries.front(); + new_head_jentry.op = fifo::journal_entry::Op::set_head; + new_head_part_num = jentries.front().part_num; + jentries.push_back(std::move(new_head_jentry)); + } + l.unlock(); + + auto n = std::make_unique(this, c, jentries, + new_head_part_num, tid); + auto np = n.get(); + _update_meta(fifo::update{}.journal_entries_add(jentries), version, + &np->canceled, tid, NewPartPreparer::call(std::move(n))); +} + +struct NewHeadPreparer : public Completion { + FIFO* f; + int i = 0; + bool newpart; + std::int64_t new_head_num; + bool canceled = false; + std::uint64_t tid; + + NewHeadPreparer(FIFO* f, lr::AioCompletion* super, + bool newpart, std::int64_t new_head_num, std::uint64_t tid) + : Completion(super), f(f), newpart(newpart), new_head_num(new_head_num), + tid(tid) {} + + void handle(Ptr&& p, int r) { + if (newpart) + handle_newpart(std::move(p), r); + else + handle_update(std::move(p), r); + } + + void handle_newpart(Ptr&& p, int r) { + if (r < 0) { + lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " _prepare_new_part failed: r=" << r + << " tid=" << tid << dendl; + complete(std::move(p), r); + return; + } + std::unique_lock l(f->m); + if (f->info.max_push_part_num < new_head_num) { + l.unlock(); + lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " _prepare_new_part failed: r=" << r + << " tid=" << tid 
<< dendl; + complete(std::move(p), -EIO); + } else { + l.unlock(); + complete(std::move(p), 0); + } + } + + void handle_update(Ptr&& p, int r) { + std::unique_lock l(f->m); + auto head_part_num = f->info.head_part_num; + auto version = f->info.version; + l.unlock(); + + if (r < 0) { + lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " _update_meta failed: r=" << r + << " tid=" << tid << dendl; + complete(std::move(p), r); + return; + } + if (canceled) { + if (i >= MAX_RACE_RETRIES) { + lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " canceled too many times, giving up: tid=" << tid << dendl; + complete(std::move(p), -ECANCELED); + return; + } + + // Raced, but there's still work to do! + if (head_part_num < new_head_num) { + canceled = false; + ++i; + ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " updating head: i=" << i << " tid=" << tid << dendl; + f->_update_meta(fifo::update{}.head_part_num(new_head_num), + version, &this->canceled, tid, call(std::move(p))); + return; + } + } + ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " succeeded : i=" << i << " tid=" << tid << dendl; + complete(std::move(p), 0); + return; + } +}; + +void FIFO::_prepare_new_head(std::uint64_t tid, lr::AioCompletion* c) +{ + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + std::unique_lock l(m); + int64_t new_head_num = info.head_part_num + 1; + auto max_push_part_num = info.max_push_part_num; + auto version = info.version; + l.unlock(); + + if (max_push_part_num < new_head_num) { + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " need new part: tid=" << tid << dendl; + auto n = std::make_unique(this, c, true, new_head_num, + tid); + _prepare_new_part(true, tid, NewHeadPreparer::call(std::move(n))); + } else { + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " updating head: tid=" << tid << dendl; + auto n = std::make_unique(this, c, false, 
new_head_num, + tid); + auto np = n.get(); + _update_meta(fifo::update{}.head_part_num(new_head_num), version, + &np->canceled, tid, NewHeadPreparer::call(std::move(n))); + } +} + int FIFO::push_entries(const std::deque& data_bufs, std::uint64_t tid, optional_yield y) { @@ -825,6 +1188,18 @@ int FIFO::push_entries(const std::deque& data_bufs, return r; } +void FIFO::push_entries(const std::deque& data_bufs, + std::uint64_t tid, lr::AioCompletion* c) +{ + std::unique_lock l(m); + auto head_part_num = info.head_part_num; + auto tag = info.head_tag; + const auto part_oid = info.part_oid(head_part_num); + l.unlock(); + + push_part(ioctx, part_oid, tag, data_bufs, tid, c); +} + int FIFO::trim_part(int64_t part_num, uint64_t ofs, std::optional tag, bool exclusive, std::uint64_t tid, @@ -845,10 +1220,10 @@ int FIFO::trim_part(int64_t part_num, uint64_t ofs, return 0; } -int FIFO::trim_part(int64_t part_num, uint64_t ofs, - std::optional tag, - bool exclusive, std::uint64_t tid, - lr::AioCompletion* c) +void FIFO::trim_part(int64_t part_num, uint64_t ofs, + std::optional tag, + bool exclusive, std::uint64_t tid, + lr::AioCompletion* c) { ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " entering: tid=" << tid << dendl; @@ -858,12 +1233,7 @@ int FIFO::trim_part(int64_t part_num, uint64_t ofs, l.unlock(); rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive); auto r = ioctx.aio_operate(part_oid, c, &op); - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed scheduling trim_part: r=" << r - << " tid=" << tid << dendl; - } - return r; + ceph_assert(r >= 0); } int FIFO::open(lr::IoCtx ioctx, std::string oid, std::unique_ptr* fifo, @@ -960,54 +1330,42 @@ int FIFO::read_meta(optional_yield y) { return read_meta(tid, y); } -struct Reader { +struct Reader : public Completion { FIFO* fifo; cb::list bl; - lr::AioCompletion* super; std::uint64_t tid; - lr::AioCompletion* cur = lr::Rados::aio_create_completion( - static_cast(this), 
&FIFO::read_callback); Reader(FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid) - : fifo(fifo), super(super), tid(tid) { - super->pc->get(); - } - ~Reader() { - cur->release(); - } -}; + : Completion(super), fifo(fifo), tid(tid) {} -void FIFO::read_callback(lr::completion_t, void* arg) -{ - std::unique_ptr reader(static_cast(arg)); - auto cct = reader->fifo->cct; - auto tid = reader->tid; - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - auto r = reader->cur->get_return_value(); - if (r >= 0) try { - fifo::op::get_meta_reply reply; - auto iter = reader->bl.cbegin(); - decode(reply, iter); - std::unique_lock l(reader->fifo->m); - if (reply.info.version.same_or_later(reader->fifo->info.version)) { - reader->fifo->info = std::move(reply.info); - reader->fifo->part_header_size = reply.part_header_size; - reader->fifo->part_entry_overhead = reply.part_entry_overhead; - } - } catch (const cb::error& err) { + void handle(Ptr&& p, int r) { + auto cct = fifo->cct; + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + if (r >= 0) try { + fifo::op::get_meta_reply reply; + auto iter = bl.cbegin(); + decode(reply, iter); + std::unique_lock l(fifo->m); + if (reply.info.version.same_or_later(fifo->info.version)) { + fifo->info = std::move(reply.info); + fifo->part_header_size = reply.part_header_size; + fifo->part_entry_overhead = reply.part_entry_overhead; + } + } catch (const cb::error& err) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " failed to decode response err=" << err.what() + << " tid=" << tid << dendl; + r = from_error_code(err.code()); + } else { lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed to decode response err=" << err.what() + << " read_meta failed r=" << r << " tid=" << tid << dendl; - r = from_error_code(err.code()); - } else { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " read_meta failed r=" << r - << 
" tid=" << tid << dendl; + } + complete(std::move(p), r); } - complete(reader->super, r); -} +}; -int FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c) +void FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c) { ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " entering: tid=" << tid << dendl; @@ -1016,16 +1374,10 @@ int FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c) cb::list in; encode(gm, in); auto reader = std::make_unique(this, c, tid); - auto r = ioctx.aio_exec(oid, reader->cur, fifo::op::CLASS, - fifo::op::GET_META, in, &reader->bl); - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed scheduling read_meta r=" << r - << " tid=" << tid << dendl; - } else { - reader.release(); - } - return r; + auto rp = reader.get(); + auto r = ioctx.aio_exec(oid, Reader::call(std::move(reader)), fifo::op::CLASS, + fifo::op::GET_META, in, &rp->bl); + assert(r >= 0); } const fifo::info& FIFO::meta() const { @@ -1040,6 +1392,10 @@ int FIFO::push(const cb::list& bl, optional_yield y) { return push(std::vector{ bl }, y); } +void FIFO::push(const cb::list& bl, lr::AioCompletion* c) { + push(std::vector{ bl }, c); +} + int FIFO::push(const std::vector& data_bufs, optional_yield y) { std::unique_lock l(m); @@ -1153,24 +1509,185 @@ int FIFO::push(const std::vector& data_bufs, optional_yield y) return 0; } -int FIFO::list(int max_entries, - std::optional markstr, - std::vector* presult, bool* pmore, - optional_yield y) -{ - std::unique_lock l(m); - auto tid = ++next_tid; - std::int64_t part_num = info.tail_part_num; - l.unlock(); - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - std::uint64_t ofs = 0; - if (markstr) { - auto marker = to_marker(*markstr); - if (!marker) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " invalid marker string: " << markstr - << " tid= "<< tid << dendl; +struct Pusher : public Completion { + FIFO* f; + std::deque remaining; + 
std::deque batch; + int i = 0; + std::uint64_t tid; + bool new_heading = false; + + void prep_then_push(Ptr&& p, const unsigned successes) { + std::unique_lock l(f->m); + auto max_part_size = f->info.params.max_part_size; + auto part_entry_overhead = f->part_entry_overhead; + l.unlock(); + + ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " preparing push: remaining=" << remaining.size() + << " batch=" << batch.size() << " i=" << i + << " tid=" << tid << dendl; + + uint64_t batch_len = 0; + if (successes > 0) { + if (successes == batch.size()) { + batch.clear(); + } else { + batch.erase(batch.begin(), batch.begin() + successes); + for (const auto& b : batch) { + batch_len += b.length() + part_entry_overhead; + } + } + } + + if (batch.empty() && remaining.empty()) { + complete(std::move(p), 0); + return; + } + + while (!remaining.empty() && + (remaining.front().length() + batch_len <= max_part_size)) { + + /* We can send entries with data_len up to max_entry_size, + however, we want to also account the overhead when + dealing with multiple entries. Previous check doesn't + account for overhead on purpose. 
*/ + batch_len += remaining.front().length() + part_entry_overhead; + batch.push_back(std::move(remaining.front())); + remaining.pop_front(); + } + ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " prepared push: remaining=" << remaining.size() + << " batch=" << batch.size() << " i=" << i + << " batch_len=" << batch_len + << " tid=" << tid << dendl; + push(std::move(p)); + } + + void push(Ptr&& p) { + f->push_entries(batch, tid, call(std::move(p))); + } + + void new_head(Ptr&& p) { + new_heading = true; + f->_prepare_new_head(tid, call(std::move(p))); + } + + void handle(Ptr&& p, int r) { + if (!new_heading) { + if (r == -ERANGE) { + ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " need new head tid=" << tid << dendl; + new_head(std::move(p)); + return; + } + if (r < 0) { + lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " push_entries failed: r=" << r + << " tid=" << tid << dendl; + complete(std::move(p), r); + return; + } + i = 0; // We've made forward progress, so reset the race counter! 
+ prep_then_push(std::move(p), r); + } else { + if (r < 0) { + lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " prepare_new_head failed: r=" << r + << " tid=" << tid << dendl; + complete(std::move(p), r); + return; + } + new_heading = false; + handle_new_head(std::move(p), r); + } + } + + void handle_new_head(Ptr&& p, int r) { + if (r == -ECANCELED) { + if (p->i == MAX_RACE_RETRIES) { + lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " canceled too many times, giving up: tid=" << tid << dendl; + complete(std::move(p), -ECANCELED); + return; + } + ++p->i; + } else if (r) { + complete(std::move(p), r); + return; + } + + if (p->batch.empty()) { + prep_then_push(std::move(p), 0); + return; + } else { + push(std::move(p)); + return; + } + } + + Pusher(FIFO* f, std::deque&& remaining, + std::uint64_t tid, lr::AioCompletion* super) + : Completion(super), f(f), remaining(std::move(remaining)), + tid(tid) {} +}; + +void FIFO::push(const std::vector& data_bufs, + lr::AioCompletion* c) +{ + std::unique_lock l(m); + auto tid = ++next_tid; + auto max_entry_size = info.params.max_entry_size; + auto need_new_head = info.need_new_head(); + l.unlock(); + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + auto p = std::make_unique(this, std::deque(data_bufs.begin(), data_bufs.end()), + tid, c); + // Validate sizes + for (const auto& bl : data_bufs) { + if (bl.length() > max_entry_size) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entry bigger than max_entry_size tid=" << tid << dendl; + Pusher::complete(std::move(p), -E2BIG); + return; + } + } + + if (data_bufs.empty() ) { + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " empty push, returning success tid=" << tid << dendl; + Pusher::complete(std::move(p), 0); + return; + } + + if (need_new_head) { + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " need new head tid=" << tid << dendl; + 
p->new_head(std::move(p)); + } else { + p->prep_then_push(std::move(p), 0); + } +} + +int FIFO::list(int max_entries, + std::optional markstr, + std::vector* presult, bool* pmore, + optional_yield y) +{ + std::unique_lock l(m); + auto tid = ++next_tid; + std::int64_t part_num = info.tail_part_num; + l.unlock(); + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + std::uint64_t ofs = 0; + if (markstr) { + auto marker = to_marker(*markstr); + if (!marker) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " invalid marker string: " << markstr + << " tid= "<< tid << dendl; return -EINVAL; } part_num = marker->num; @@ -1340,157 +1857,116 @@ int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y) return 0; } -struct Trimmer { +struct Trimmer : public Completion { FIFO* fifo; std::int64_t part_num; std::uint64_t ofs; std::int64_t pn; bool exclusive; - lr::AioCompletion* super; std::uint64_t tid; - lr::AioCompletion* cur = lr::Rados::aio_create_completion( - static_cast(this), &FIFO::trim_callback); bool update = false; bool canceled = false; int retries = 0; Trimmer(FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn, bool exclusive, lr::AioCompletion* super, std::uint64_t tid) - : fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), exclusive(exclusive), - super(super), tid(tid) { - super->pc->get(); - } - ~Trimmer() { - cur->release(); - } -}; + : Completion(super), fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), + exclusive(exclusive), tid(tid) {} -void FIFO::trim_callback(lr::completion_t, void* arg) -{ - std::unique_ptr trimmer(static_cast(arg)); - auto cct = trimmer->fifo->cct; - auto tid = trimmer->tid; - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " entering: tid=" << tid << dendl; - int r = trimmer->cur->get_return_value(); - if (r == -ENOENT) { - r = 0; - } - - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " trim failed: 
r=" << r << " tid=" << tid << dendl; - complete(trimmer->super, r); - return; - } - - if (!trimmer->update) { + void handle(Ptr&& p, int r) { + auto cct = fifo->cct; ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " handling preceding trim callback: tid=" << tid << dendl; - trimmer->retries = 0; - if (trimmer->pn < trimmer->part_num) { - std::unique_lock l(trimmer->fifo->m); - const auto max_part_size = trimmer->fifo->info.params.max_part_size; - l.unlock(); - trimmer->cur->release(); - trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback); - r = trimmer->fifo->trim_part(trimmer->pn++, max_part_size, std::nullopt, - false, tid, trimmer->cur); - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " trim failed: r=" << r << " tid=" << tid << dendl; - complete(trimmer->super, r); - } else { - trimmer.release(); - } - return; + << " entering: tid=" << tid << dendl; + if (r == -ENOENT) { + r = 0; } - std::unique_lock l(trimmer->fifo->m); - const auto tail_part_num = trimmer->fifo->info.tail_part_num; - l.unlock(); - trimmer->cur->release(); - trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback); - trimmer->update = true; - trimmer->canceled = tail_part_num < trimmer->part_num; - r = trimmer->fifo->trim_part(trimmer->part_num, trimmer->ofs, - std::nullopt, trimmer->exclusive, tid, trimmer->cur); if (r < 0) { lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed scheduling trim: r=" << r << " tid=" << tid << dendl; - complete(trimmer->super, r); - } else { - trimmer.release(); + << (update ? 
" update_meta " : " trim ") << "failed: r=" + << r << " tid=" << tid << dendl; + complete(std::move(p), r); + return; } - return; - } - ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " handling update-needed callback: tid=" << tid << dendl; - std::unique_lock l(trimmer->fifo->m); - auto tail_part_num = trimmer->fifo->info.tail_part_num; - auto objv = trimmer->fifo->info.version; - l.unlock(); - if ((tail_part_num < trimmer->part_num) && - trimmer->canceled) { - if (trimmer->retries > MAX_RACE_RETRIES) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " canceled too many times, giving up: tid=" << tid << dendl; - complete(trimmer->super, -EIO); + if (!update) { + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " handling preceding trim callback: tid=" << tid << dendl; + retries = 0; + if (pn < part_num) { + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " pn=" << pn << " tid=" << tid << dendl; + std::unique_lock l(fifo->m); + const auto max_part_size = fifo->info.params.max_part_size; + l.unlock(); + fifo->trim_part(pn++, max_part_size, std::nullopt, + false, tid, call(std::move(p))); + return; + } + + std::unique_lock l(fifo->m); + const auto tail_part_num = fifo->info.tail_part_num; + l.unlock(); + update = true; + canceled = tail_part_num < part_num; + fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid, + call(std::move(p))); return; } - trimmer->cur->release(); - trimmer->cur = lr::Rados::aio_create_completion(arg, - &FIFO::trim_callback); - ++trimmer->retries; - r = trimmer->fifo->_update_meta(fifo::update{} - .tail_part_num(trimmer->part_num), - objv, &trimmer->canceled, - tid, trimmer->cur); - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed scheduling _update_meta: r=" - << r << " tid=" << tid << dendl; - complete(trimmer->super, r); + + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " handling update-needed callback: tid=" << tid << dendl; + 
std::unique_lock l(fifo->m); + auto tail_part_num = fifo->info.tail_part_num; + auto objv = fifo->info.version; + l.unlock(); + if ((tail_part_num < part_num) && + canceled) { + if (retries > MAX_RACE_RETRIES) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " canceled too many times, giving up: tid=" << tid << dendl; + complete(std::move(p), -EIO); + return; + } + ++retries; + fifo->_update_meta(fifo::update{} + .tail_part_num(part_num), objv, &canceled, + tid, call(std::move(p))); } else { - trimmer.release(); + complete(std::move(p), 0); } - } else { - complete(trimmer->super, 0); } -} +}; -int FIFO::trim(std::string_view markstr, bool exclusive, lr::AioCompletion* c) { +void FIFO::trim(std::string_view markstr, bool exclusive, + lr::AioCompletion* c) { auto marker = to_marker(markstr); - if (!marker) { - return -EINVAL; - } + auto realmark = marker.value_or(::rgw::cls::fifo::marker{}); std::unique_lock l(m); const auto max_part_size = info.params.max_part_size; const auto pn = info.tail_part_num; const auto part_oid = info.part_oid(pn); auto tid = ++next_tid; l.unlock(); - auto trimmer = std::make_unique(this, marker->num, marker->ofs, pn, exclusive, c, - tid); + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + auto trimmer = std::make_unique(this, realmark.num, realmark.ofs, + pn, exclusive, c, tid); + if (!marker) { + Trimmer::complete(std::move(trimmer), -EINVAL); + return; + } ++trimmer->pn; auto ofs = marker->ofs; if (pn < marker->num) { + ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " pn=" << pn << " tid=" << tid << dendl; ofs = max_part_size; } else { trimmer->update = true; } - auto r = trim_part(pn, ofs, std::nullopt, exclusive, - tid, trimmer->cur); - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ - << " failed scheduling trim_part: r=" - << r << " tid=" << tid << dendl; - complete(trimmer->super, r); - } else { - trimmer.release(); - } - return r; + 
trim_part(pn, ofs, std::nullopt, exclusive, + tid, Trimmer::call(std::move(trimmer))); } int FIFO::get_part_info(int64_t part_num, @@ -1509,4 +1985,521 @@ int FIFO::get_part_info(int64_t part_num, } return r; } + +void FIFO::get_part_info(int64_t part_num, + fifo::part_header* header, + lr::AioCompletion* c) +{ + std::unique_lock l(m); + const auto part_oid = info.part_oid(part_num); + auto tid = ++next_tid; + l.unlock(); + auto op = rgw::cls::fifo::get_part_info(cct, header, tid); + auto r = ioctx.aio_operate(part_oid, c, &op, nullptr); + ceph_assert(r >= 0); +} + +struct InfoGetter : Completion { + FIFO* fifo; + fifo::part_header header; + fu2::function f; + std::uint64_t tid; + bool headerread = false; + + InfoGetter(FIFO* fifo, fu2::function f, + std::uint64_t tid, lr::AioCompletion* super) + : Completion(super), fifo(fifo), f(std::move(f)), tid(tid) {} + void handle(Ptr&& p, int r) { + if (!headerread) { + if (r < 0) { + lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " read_meta failed: r=" + << r << " tid=" << tid << dendl; + if (f) + f(r, {}); + complete(std::move(p), r); + return; + } + + auto info = fifo->meta(); + auto hpn = info.head_part_num; + if (hpn < 0) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " no head, returning empty partinfo r=" + << r << " tid=" << tid << dendl; + if (f) + f(0, {}); + complete(std::move(p), r); + return; + } + headerread = true; + auto op = rgw::cls::fifo::get_part_info(fifo->cct, &header, tid); + std::unique_lock l(fifo->m); + auto oid = fifo->info.part_oid(hpn); + l.unlock(); + r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op, + nullptr); + ceph_assert(r >= 0); + return; + } + + if (r < 0) { + lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " get_part_info failed: r=" + << r << " tid=" << tid << dendl; + } + + if (f) + f(r, std::move(header)); + complete(std::move(p), r); + return; + } +}; + +void FIFO::get_head_info(fu2::unique_function f, + 
lr::AioCompletion* c) +{ + std::unique_lock l(m); + auto tid = ++next_tid; + l.unlock(); + auto ig = std::make_unique(this, std::move(f), tid, c); + read_meta(tid, InfoGetter::call(std::move(ig))); +} + +struct JournalProcessor : public Completion { +private: + FIFO* const fifo; + + std::vector processed; + std::multimap journal; + std::multimap::iterator iter; + std::int64_t new_tail; + std::int64_t new_head; + std::int64_t new_max; + int race_retries = 0; + bool first_pp = true; + bool canceled = false; + std::uint64_t tid; + + enum { + entry_callback, + pp_callback, + } state; + + void create_part(Ptr&& p, int64_t part_num, + std::string_view tag) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + state = entry_callback; + lr::ObjectWriteOperation op; + op.create(false); /* We don't need exclusivity, part_init ensures + we're creating from the same journal entry. */ + std::unique_lock l(fifo->m); + part_init(&op, tag, fifo->info.params); + auto oid = fifo->info.part_oid(part_num); + l.unlock(); + auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op); + ceph_assert(r >= 0); + return; + } + + void remove_part(Ptr&& p, int64_t part_num, + std::string_view tag) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + state = entry_callback; + lr::ObjectWriteOperation op; + op.remove(); + std::unique_lock l(fifo->m); + auto oid = fifo->info.part_oid(part_num); + l.unlock(); + auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op); + ceph_assert(r >= 0); + return; + } + + void finish_je(Ptr&& p, int r, + const fifo::journal_entry& entry) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " finishing entry: entry=" << entry + << " tid=" << tid << dendl; + + if (entry.op == fifo::journal_entry::Op::remove && r == 
-ENOENT) + r = 0; + + if (r < 0) { + lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " processing entry failed: entry=" << entry + << " r=" << r << " tid=" << tid << dendl; + complete(std::move(p), r); + return; + } else { + switch (entry.op) { + case fifo::journal_entry::Op::unknown: + case fifo::journal_entry::Op::set_head: + // Can't happen. Filtered out in process. + complete(std::move(p), -EIO); + return; + + case fifo::journal_entry::Op::create: + if (entry.part_num > new_max) { + new_max = entry.part_num; + } + break; + case fifo::journal_entry::Op::remove: + if (entry.part_num >= new_tail) { + new_tail = entry.part_num + 1; + } + break; + } + processed.push_back(entry); + } + ++iter; + process(std::move(p)); + } + + void postprocess(Ptr&& p) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + if (processed.empty()) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " nothing to update any more: race_retries=" + << race_retries << " tid=" << tid << dendl; + complete(std::move(p), 0); + return; + } + pp_run(std::move(p), 0, false); + } + +public: + + JournalProcessor(FIFO* fifo, std::uint64_t tid, lr::AioCompletion* super) + : Completion(super), fifo(fifo), tid(tid) { + std::unique_lock l(fifo->m); + journal = fifo->info.journal; + iter = journal.begin(); + new_tail = fifo->info.tail_part_num; + new_head = fifo->info.head_part_num; + new_max = fifo->info.max_push_part_num; + } + + void pp_run(Ptr&& p, int r, bool canceled) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + std::optional tail_part_num; + std::optional head_part_num; + std::optional max_part_num; + + if (r < 0) { + lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " failed, r=: " << r << " tid=" << tid << dendl; + complete(std::move(p), r); + } + + + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " 
postprocessing: race_retries=" + << race_retries << " tid=" << tid << dendl; + + if (!first_pp && r == 0 && !canceled) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " nothing to update any more: race_retries=" + << race_retries << " tid=" << tid << dendl; + complete(std::move(p), 0); + return; + } + + first_pp = false; + + if (canceled) { + if (race_retries >= MAX_RACE_RETRIES) { + lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " canceled too many times, giving up: tid=" + << tid << dendl; + complete(std::move(p), -ECANCELED); + return; + } + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " update canceled, retrying: race_retries=" + << race_retries << " tid=" << tid << dendl; + + ++race_retries; + + std::vector new_processed; + std::unique_lock l(fifo->m); + for (auto& e : processed) { + auto jiter = fifo->info.journal.find(e.part_num); + /* journal entry was already processed */ + if (jiter == fifo->info.journal.end() || + !(jiter->second == e)) { + continue; + } + new_processed.push_back(e); + } + processed = std::move(new_processed); + } + + std::unique_lock l(fifo->m); + auto objv = fifo->info.version; + if (new_tail > fifo->info.tail_part_num) { + tail_part_num = new_tail; + } + + if (new_head > fifo->info.head_part_num) { + head_part_num = new_head; + } + + if (new_max > fifo->info.max_push_part_num) { + max_part_num = new_max; + } + l.unlock(); + + if (processed.empty() && + !tail_part_num && + !max_part_num) { + /* nothing to update anymore */ + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " nothing to update any more: race_retries=" + << race_retries << " tid=" << tid << dendl; + complete(std::move(p), 0); + return; + } + state = pp_callback; + fifo->_update_meta(fifo::update{} + .tail_part_num(tail_part_num) + .head_part_num(head_part_num) + .max_push_part_num(max_part_num) + .journal_entries_rm(processed), + objv, &this->canceled, tid, call(std::move(p))); + return; + 
} + + JournalProcessor(const JournalProcessor&) = delete; + JournalProcessor& operator =(const JournalProcessor&) = delete; + JournalProcessor(JournalProcessor&&) = delete; + JournalProcessor& operator =(JournalProcessor&&) = delete; + + void process(Ptr&& p) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + while (iter != journal.end()) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " processing entry: entry=" << *iter + << " tid=" << tid << dendl; + const auto entry = iter->second; + switch (entry.op) { + case fifo::journal_entry::Op::create: + create_part(std::move(p), entry.part_num, entry.part_tag); + return; + case fifo::journal_entry::Op::set_head: + if (entry.part_num > new_head) { + new_head = entry.part_num; + } + processed.push_back(entry); + ++iter; + continue; + case fifo::journal_entry::Op::remove: + remove_part(std::move(p), entry.part_num, entry.part_tag); + return; + default: + lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " unknown journaled op: entry=" << entry << " tid=" + << tid << dendl; + complete(std::move(p), -EIO); + return; + } + } + postprocess(std::move(p)); + return; + } + + void handle(Ptr&& p, int r) { + ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " entering: tid=" << tid << dendl; + switch (state) { + case entry_callback: + finish_je(std::move(p), r, iter->second); + return; + case pp_callback: + auto c = canceled; + canceled = false; + pp_run(std::move(p), r, c); + return; + } + + abort(); + } + +}; + +void FIFO::process_journal(std::uint64_t tid, lr::AioCompletion* c) { + auto p = std::make_unique(this, tid, c); + p->process(std::move(p)); +} + +struct Lister : Completion { + FIFO* f; + std::vector result; + bool more = false; + std::int64_t part_num; + std::uint64_t ofs; + int max_entries; + int r_out = 0; + std::vector entries; + bool part_more = false; + bool part_full = false; + std::vector* 
entries_out; + bool* more_out; + std::uint64_t tid; + + bool read = false; + + void complete(Ptr&& p, int r) { + if (r >= 0) { + if (more_out) *more_out = more; + if (entries_out) *entries_out = std::move(result); + } + Completion::complete(std::move(p), r); + } + +public: + Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries, + std::vector* entries_out, bool* more_out, + std::uint64_t tid, lr::AioCompletion* super) + : Completion(super), f(f), part_num(part_num), ofs(ofs), max_entries(max_entries), + entries_out(entries_out), more_out(more_out), tid(tid) { + result.reserve(max_entries); + } + + Lister(const Lister&) = delete; + Lister& operator =(const Lister&) = delete; + Lister(Lister&&) = delete; + Lister& operator =(Lister&&) = delete; + + void handle(Ptr&& p, int r) { + if (read) + handle_read(std::move(p), r); + else + handle_list(std::move(p), r); + } + + void list(Ptr&& p) { + if (max_entries > 0) { + part_more = false; + part_full = false; + entries.clear(); + + std::unique_lock l(f->m); + auto part_oid = f->info.part_oid(part_num); + l.unlock(); + + read = false; + auto op = list_part(f->cct, {}, ofs, max_entries, &r_out, + &entries, &part_more, &part_full, + nullptr, tid); + f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr); + } else { + complete(std::move(p), 0); + } + } + + void handle_read(Ptr&& p, int r) { + read = false; + if (r >= 0) r = r_out; + r_out = 0; + + if (r < 0) { + complete(std::move(p), r); + return; + } + + if (part_num < f->info.tail_part_num) { + /* raced with trim? 
restart */ + max_entries += result.size(); + result.clear(); + part_num = f->info.tail_part_num; + ofs = 0; + list(std::move(p)); + return; + } + /* assuming part was not written yet, so end of data */ + more = false; + complete(std::move(p), 0); + return; + } + + void handle_list(Ptr&& p, int r) { + if (r >= 0) r = r_out; + r_out = 0; + std::unique_lock l(f->m); + auto part_oid = f->info.part_oid(part_num); + l.unlock(); + if (r == -ENOENT) { + read = true; + f->read_meta(tid, call(std::move(p))); + return; + } + if (r < 0) { + complete(std::move(p), r); + return; + } + + more = part_full || part_more; + for (auto& entry : entries) { + list_entry e; + e.data = std::move(entry.data); + e.marker = marker{part_num, entry.ofs}.to_string(); + e.mtime = entry.mtime; + result.push_back(std::move(e)); + } + max_entries -= entries.size(); + entries.clear(); + if (max_entries > 0 && part_more) { + list(std::move(p)); + return; + } + + if (!part_full) { /* head part is not full */ + complete(std::move(p), 0); + return; + } + ++part_num; + ofs = 0; + list(std::move(p)); + } +}; + +void FIFO::list(int max_entries, + std::optional markstr, + std::vector* out, + bool* more, + lr::AioCompletion* c) { + std::unique_lock l(m); + auto tid = ++next_tid; + std::int64_t part_num = info.tail_part_num; + l.unlock(); + std::uint64_t ofs = 0; + std::optional<::rgw::cls::fifo::marker> marker; + + if (markstr) { + marker = to_marker(*markstr); + if (marker) { + part_num = marker->num; + ofs = marker->ofs; + } + } + + auto ls = std::make_unique(this, part_num, ofs, max_entries, out, + more, tid, c); + if (markstr && !marker) { + auto l = ls.get(); + l->complete(std::move(ls), -EINVAL); + } else { + ls->list(std::move(ls)); + } +} } diff --git a/src/rgw/cls_fifo_legacy.h b/src/rgw/cls_fifo_legacy.h index 1f8d3f3fc95d8..b6b5f04bb30ad 100644 --- a/src/rgw/cls_fifo_legacy.h +++ b/src/rgw/cls_fifo_legacy.h @@ -31,6 +31,7 @@ #include "include/rados/librados.hpp" #include "include/buffer.h" 
+#include "include/function2.hpp" #include "common/async/yield_context.h" @@ -57,24 +58,6 @@ int get_meta(lr::IoCtx& ioctx, const std::string& oid, std::uint32_t* part_entry_overhead, std::uint64_t tid, optional_yield y, bool probe = false); -void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv, - const fifo::update& update); -void part_init(lr::ObjectWriteOperation* op, std::string_view tag, - fifo::data_params params); -int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag, - std::deque data_bufs, std::uint64_t tid, optional_yield y); -void trim_part(lr::ObjectWriteOperation* op, - std::optional tag, std::uint64_t ofs, - bool exclusive); -int list_part(lr::IoCtx& ioctx, const std::string& oid, - std::optional tag, std::uint64_t ofs, - std::uint64_t max_entries, - std::vector* entries, - bool* more, bool* full_part, std::string* ptag, - std::uint64_t tid, optional_yield y); -int get_part_info(lr::IoCtx& ioctx, const std::string& oid, - fifo::part_header* header, std::uint64_t, - optional_yield y); struct marker { std::int64_t num = 0; @@ -117,6 +100,12 @@ class FIFO { friend struct Reader; friend struct Updater; friend struct Trimmer; + friend struct InfoGetter; + friend struct Pusher; + friend struct NewPartPreparer; + friend struct NewHeadPreparer; + friend struct JournalProcessor; + friend struct Lister; mutable lr::IoCtx ioctx; CephContext* cct = static_cast(ioctx.cct()); @@ -144,32 +133,34 @@ class FIFO { int _update_meta(const fifo::update& update, fifo::objv version, bool* pcanceled, std::uint64_t tid, optional_yield y); - int _update_meta(const fifo::update& update, - fifo::objv version, bool* pcanceled, - std::uint64_t tid, lr::AioCompletion* c); + void _update_meta(const fifo::update& update, + fifo::objv version, bool* pcanceled, + std::uint64_t tid, lr::AioCompletion* c); int create_part(int64_t part_num, std::string_view tag, std::uint64_t tid, optional_yield y); int remove_part(int64_t part_num, 
std::string_view tag, std::uint64_t tid, optional_yield y); int process_journal(std::uint64_t tid, optional_yield y); + void process_journal(std::uint64_t tid, lr::AioCompletion* c); int _prepare_new_part(bool is_head, std::uint64_t tid, optional_yield y); + void _prepare_new_part(bool is_head, std::uint64_t tid, lr::AioCompletion* c); int _prepare_new_head(std::uint64_t tid, optional_yield y); + void _prepare_new_head(std::uint64_t tid, lr::AioCompletion* c); int push_entries(const std::deque& data_bufs, std::uint64_t tid, optional_yield y); + void push_entries(const std::deque& data_bufs, + std::uint64_t tid, lr::AioCompletion* c); int trim_part(int64_t part_num, uint64_t ofs, std::optional tag, bool exclusive, std::uint64_t tid, optional_yield y); - int trim_part(int64_t part_num, uint64_t ofs, - std::optional tag, bool exclusive, - std::uint64_t tid, lr::AioCompletion* c); + void trim_part(int64_t part_num, uint64_t ofs, + std::optional tag, bool exclusive, + std::uint64_t tid, lr::AioCompletion* c); - static void trim_callback(lr::completion_t, void* arg); - static void update_callback(lr::completion_t, void* arg); - static void read_callback(lr::completion_t, void* arg); /// Force refresh of metadata, yielding/blocking style int read_meta(std::uint64_t tid, optional_yield y); /// Force refresh of metadata, with a librados Completion - int read_meta(std::uint64_t tid, lr::AioCompletion* c); + void read_meta(std::uint64_t tid, lr::AioCompletion* c); public: @@ -215,12 +206,20 @@ class FIFO { int push(const cb::list& bl, //< Entry to push optional_yield y //< Optional yield ); - /// Push entres to the FIFO + /// Push an entry to the FIFO + void push(const cb::list& bl, //< Entry to push + lr::AioCompletion* c //< Async Completion + ); + /// Push entries to the FIFO int push(const std::vector& data_bufs, //< Entries to push - /// Optional yield - optional_yield y); + optional_yield y //< Optional yield + ); + /// Push entries to the FIFO + void push(const 
std::vector& data_bufs, //< Entries to push + lr::AioCompletion* c //< Async Completion + ); /// List entries - int list(int max_entries, /// Maximum entries to list + int list(int max_entries, //< Maximum entries to list /// Point after which to begin listing. Start at tail if null std::optional markstr, std::vector* out, //< OUT: entries @@ -228,6 +227,14 @@ class FIFO { bool* more, optional_yield y //< Optional yield ); + void list(int max_entries, //< Maximum entries to list + /// Point after which to begin listing. Start at tail if null + std::optional markstr, + std::vector* out, //< OUT: entries + /// OUT: True if more entries in FIFO beyond the last returned + bool* more, + lr::AioCompletion* c //< Async Completion + ); /// Trim entries, coroutine/block style int trim(std::string_view markstr, //< Position to which to trim, inclusive bool exclusive, //< If true, do not trim the target entry @@ -235,16 +242,28 @@ class FIFO { optional_yield y //< Optional yield ); /// Trim entries, librados AioCompletion style - int trim(std::string_view markstr, //< Position to which to trim, inclusive - bool exclusive, //< If true, do not trim the target entry - //< itself, just all those before it. - lr::AioCompletion* c //< librados AIO Completion + void trim(std::string_view markstr, //< Position to which to trim, inclusive + bool exclusive, //< If true, do not trim the target entry + //< itself, just all those before it. + lr::AioCompletion* c //< librados AIO Completion ); /// Get part info int get_part_info(int64_t part_num, /// Part number fifo::part_header* header, //< OUT: Information optional_yield y //< Optional yield ); + /// Get part info + void get_part_info(int64_t part_num, //< Part number + fifo::part_header* header, //< OUT: Information + lr::AioCompletion* c //< AIO Completion + ); + /// A convenience method to fetch the part information for the FIFO + /// head, using librados::AioCompletion, since + /// libradio::AioCompletions compose lousily. 
+ void get_head_info(fu2::unique_function< //< Function to receive info + void(int r, fifo::part_header&&)>, + lr::AioCompletion* c //< AIO Completion + ); }; } diff --git a/src/rgw/rgw_datalog.cc b/src/rgw/rgw_datalog.cc index a875d075ecade..8142b26e01a8b 100644 --- a/src/rgw/rgw_datalog.cc +++ b/src/rgw/rgw_datalog.cc @@ -469,12 +469,7 @@ class RGWDataChangesFIFO final : public RGWDataChangesBE { pc->cond.notify_all(); pc->put_unlock(); } else { - r = fifos[index]->trim(marker, false, c); - if (r < 0) { - lderr(cct) << __PRETTY_FUNCTION__ - << ": unable to trim FIFO: " << get_oid(index) - << ": " << cpp_strerror(-r) << dendl; - } + fifos[index]->trim(marker, false, c); } return r; } diff --git a/src/test/rgw/test_cls_fifo_legacy.cc b/src/test/rgw/test_cls_fifo_legacy.cc index dae4980f8dca4..69cee5a887405 100644 --- a/src/test/rgw/test_cls_fifo_legacy.cc +++ b/src/test/rgw/test_cls_fifo_legacy.cc @@ -69,6 +69,8 @@ class LegacyFIFO : public testing::Test { }; using LegacyClsFIFO = LegacyFIFO; +using AioLegacyFIFO = LegacyFIFO; + TEST_F(LegacyClsFIFO, TestCreate) { @@ -577,8 +579,7 @@ TEST_F(LegacyFIFO, TestAioTrim) marker = result.front().marker; std::unique_ptr c(rados.aio_create_completion(nullptr, nullptr)); - r = f->trim(*marker, false, c.get()); - ASSERT_EQ(0, r); + f->trim(*marker, false, c.get()); c->wait_for_complete(); r = c->get_return_value(); ASSERT_EQ(0, r); @@ -645,3 +646,482 @@ TEST_F(LegacyFIFO, TestTrimExclusive) { ASSERT_EQ(result.size(), 1); ASSERT_EQ(max_entries - 1, val); } + +TEST_F(AioLegacyFIFO, TestPushListTrim) +{ + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield); + ASSERT_EQ(0, r); + static constexpr auto max_entries = 10u; + for (uint32_t i = 0; i < max_entries; ++i) { + cb::list bl; + encode(i, bl); + auto c = R::Rados::aio_create_completion(); + f->push(bl, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + } + + std::optional marker; + /* get entries one by 
one */ + std::vector result; + bool more = false; + for (auto i = 0u; i < max_entries; ++i) { + auto c = R::Rados::aio_create_completion(); + f->list(1, marker, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + + bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + ASSERT_EQ(1, result.size()); + + std::uint32_t val; + std::tie(val, marker) = decode_entry(result.front()); + + ASSERT_EQ(i, val); + result.clear(); + } + + /* get all entries at once */ + std::string markers[max_entries]; + std::uint32_t min_entry = 0; + auto c = R::Rados::aio_create_completion(); + f->list(max_entries * 10, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + + ASSERT_FALSE(more); + ASSERT_EQ(max_entries, result.size()); + for (auto i = 0u; i < max_entries; ++i) { + std::uint32_t val; + std::tie(val, markers[i]) = decode_entry(result[i]); + ASSERT_EQ(i, val); + } + + /* trim one entry */ + c = R::Rados::aio_create_completion(); + f->trim(markers[min_entry], false, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ++min_entry; + + c = R::Rados::aio_create_completion(); + f->list(max_entries * 10, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_FALSE(more); + ASSERT_EQ(max_entries - min_entry, result.size()); + + for (auto i = min_entry; i < max_entries; ++i) { + std::uint32_t val; + std::tie(val, markers[i - min_entry]) = + decode_entry(result[i - min_entry]); + EXPECT_EQ(i, val); + } +} + + +TEST_F(AioLegacyFIFO, TestPushTooBig) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, max_entry_size); + ASSERT_EQ(0, 
r); + + char buf[max_entry_size + 1]; + memset(buf, 0, sizeof(buf)); + + cb::list bl; + bl.append(buf, sizeof(buf)); + + auto c = R::Rados::aio_create_completion(); + f->push(bl, c); + c->wait_for_complete(); + r = c->get_return_value(); + ASSERT_EQ(-E2BIG, r); + c->release(); + + c = R::Rados::aio_create_completion(); + f->push(std::vector{}, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); +} + + +TEST_F(AioLegacyFIFO, TestMultipleParts) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + { + auto c = R::Rados::aio_create_completion(); + f->get_head_info([&](int r, RCf::part_info&& p) { + ASSERT_TRUE(p.tag.empty()); + ASSERT_EQ(0, p.magic); + ASSERT_EQ(0, p.min_ofs); + ASSERT_EQ(0, p.last_ofs); + ASSERT_EQ(0, p.next_ofs); + ASSERT_EQ(0, p.min_index); + ASSERT_EQ(0, p.max_index); + ASSERT_EQ(ceph::real_time{}, p.max_time); + }, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + } + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + const auto [part_header_size, part_entry_overhead] = + f->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + /* push enough entries */ + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + auto c = R::Rados::aio_create_completion(); + f->push(bl, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + } + + auto info = f->meta(); + ASSERT_EQ(info.id, fifo_id); + /* head should have advanced */ + ASSERT_GT(info.head_part_num, 0); + + /* list all at once */ + std::vector result; + bool more = false; + 
auto c = R::Rados::aio_create_completion(); + f->list(max_entries, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + EXPECT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + std::optional marker; + /* get entries one by one */ + + for (auto i = 0u; i < max_entries; ++i) { + c = R::Rados::aio_create_completion(); + f->list(1, marker, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + std::uint32_t val; + std::tie(val, marker) = decode_entry(result.front()); + + auto& entry = result.front(); + auto& bl = entry.data; + ASSERT_EQ(i, *(int *)bl.c_str()); + marker = entry.marker; + } + + /* trim one at a time */ + marker.reset(); + for (auto i = 0u; i < max_entries; ++i) { + /* read single entry */ + c = R::Rados::aio_create_completion(); + f->list(1, marker, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + const bool expected_more = (i != (max_entries - 1)); + ASSERT_EQ(expected_more, more); + + marker = result.front().marker; + c = R::Rados::aio_create_completion(); + f->trim(*marker, false, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + ASSERT_EQ(result.size(), 1); + + /* check tail */ + info = f->meta(); + ASSERT_EQ(info.tail_part_num, i / entries_per_part); + + /* try to read all again, see how many entries left */ + c = R::Rados::aio_create_completion(); + f->list(max_entries, marker, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + EXPECT_EQ(0, r); + ASSERT_EQ(max_entries - i - 1, result.size()); + 
ASSERT_EQ(false, more); + } + + /* tail now should point at head */ + info = f->meta(); + ASSERT_EQ(info.head_part_num, info.tail_part_num); + + /* check old tails are removed */ + for (auto i = 0; i < info.tail_part_num; ++i) { + c = R::Rados::aio_create_completion(); + RCf::part_info partinfo; + f->get_part_info(i, &partinfo, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(-ENOENT, r); + } + /* check current tail exists */ + std::uint64_t next_ofs; + { + c = R::Rados::aio_create_completion(); + RCf::part_info partinfo; + f->get_part_info(info.tail_part_num, &partinfo, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + next_ofs = partinfo.next_ofs; + } + ASSERT_EQ(0, r); + + c = R::Rados::aio_create_completion(); + f->get_head_info([&](int r, RCf::part_info&& p) { + ASSERT_EQ(next_ofs, p.next_ofs); + }, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); +} + +TEST_F(AioLegacyFIFO, TestTwoPushers) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + std::unique_ptr f2; + r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield); + ASSERT_EQ(0, r); /* f2 is dereferenced in the push loop below */ + std::vector fifos{&f, &f2}; + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + auto& f = *fifos[i % fifos.size()]; + auto c = R::Rados::aio_create_completion(); + f->push(bl, c); + c->wait_for_complete(); + r = c->get_return_value(); 
+ c->release(); +
ASSERT_EQ(0, r); + } + + /* list all by both */ + std::vector result; + bool more = false; + auto c = R::Rados::aio_create_completion(); + f->list(max_entries, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + c = R::Rados::aio_create_completion(); + f2->list(max_entries, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } +} + +TEST_F(AioLegacyFIFO, TestTwoPushersTrim) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + std::unique_ptr f1; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f1, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + + auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info(); + const auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + const auto max_entries = entries_per_part * 4 + 1; + + std::unique_ptr f2; + r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield); + ASSERT_EQ(0, r); + + /* push one entry to f2 and the rest to f1 */ + for (auto i = 0u; i < max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + auto& f = (i < 1 ?
f2 : f1); + auto c = R::Rados::aio_create_completion(); + f->push(bl, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + } + + /* trim half by fifo1 */ + auto num = max_entries / 2; + std::string marker; + std::vector result; + bool more = false; + auto c = R::Rados::aio_create_completion(); + f1->list(num, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_EQ(true, more); + ASSERT_EQ(num, result.size()); + + for (auto i = 0u; i < num; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + + auto& entry = result[num - 1]; + marker = entry.marker; + c = R::Rados::aio_create_completion(); + f1->trim(marker, false, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + /* list what's left by fifo2 */ + + const auto left = max_entries - num; + c = R::Rados::aio_create_completion(); + f2->list(left, marker, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_EQ(left, result.size()); + ASSERT_EQ(false, more); + + for (auto i = num; i < max_entries; ++i) { + auto& bl = result[i - num].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } +} + +TEST_F(AioLegacyFIFO, TestPushBatch) +{ + static constexpr auto max_part_size = 2048ull; + static constexpr auto max_entry_size = 128ull; + + std::unique_ptr f; + auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt, + std::nullopt, false, max_part_size, + max_entry_size); + ASSERT_EQ(0, r); + + char buf[max_entry_size]; + memset(buf, 0, sizeof(buf)); + auto [part_header_size, part_entry_overhead] = f->get_part_layout_info(); + auto entries_per_part = ((max_part_size - part_header_size) / + (max_entry_size + part_entry_overhead)); + auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */ + std::vector bufs; + for (auto i = 0u; i < 
max_entries; ++i) { + cb::list bl; + *(int *)buf = i; + bl.append(buf, sizeof(buf)); + bufs.push_back(bl); + } + ASSERT_EQ(max_entries, bufs.size()); + + auto c = R::Rados::aio_create_completion(); + f->push(bufs, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + + /* list all */ + + std::vector result; + bool more = false; + c = R::Rados::aio_create_completion(); + f->list(max_entries, std::nullopt, &result, &more, c); + c->wait_for_complete(); + r = c->get_return_value(); + c->release(); + ASSERT_EQ(0, r); + ASSERT_EQ(false, more); + ASSERT_EQ(max_entries, result.size()); + for (auto i = 0u; i < max_entries; ++i) { + auto& bl = result[i].data; + ASSERT_EQ(i, *(int *)bl.c_str()); + } + auto& info = f->meta(); + ASSERT_EQ(info.head_part_num, 4); +} From aede44ac6667c9a1ec7e813b547f8765754d896f Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Sat, 21 Nov 2020 01:44:36 -0500 Subject: [PATCH 03/26] rgw: Factor out tool to deal with different log backing Read through the shards of a log and find out what kind it is. Also remove a log. Signed-off-by: Adam C. Emerson (cherry picked from commit ed15d03f068c6f6e959f04d9d8f99eac82ebbd29) Signed-off-by: Adam C. 
Emerson --- src/cls/log/cls_log_types.h | 3 + src/rgw/CMakeLists.txt | 1 + src/rgw/rgw_log_backing.cc | 215 +++++++++++++++++++++++++++++++ src/rgw/rgw_log_backing.h | 70 ++++++++++ src/test/rgw/CMakeLists.txt | 5 + src/test/rgw/test_log_backing.cc | 176 +++++++++++++++++++++++++ 6 files changed, 470 insertions(+) create mode 100644 src/rgw/rgw_log_backing.cc create mode 100644 src/rgw/rgw_log_backing.h create mode 100644 src/test/rgw/test_log_backing.cc diff --git a/src/cls/log/cls_log_types.h b/src/cls/log/cls_log_types.h index c5c00766d8156..1746d243e5a14 100644 --- a/src/cls/log/cls_log_types.h +++ b/src/cls/log/cls_log_types.h @@ -65,6 +65,9 @@ inline bool operator ==(const cls_log_header& lhs, const cls_log_header& rhs) { return (lhs.max_marker == rhs.max_marker && lhs.max_time == rhs.max_time); } +inline bool operator !=(const cls_log_header& lhs, const cls_log_header& rhs) { + return !(lhs == rhs); +} WRITE_CLASS_ENCODER(cls_log_header) diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 44de25895ea2d..d3d91d4957947 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -141,6 +141,7 @@ set(librgw_common_srcs rgw_tag.cc rgw_tag_s3.cc rgw_tools.cc + rgw_log_backing.cc rgw_user.cc rgw_website.cc rgw_xml.cc diff --git a/src/rgw/rgw_log_backing.cc b/src/rgw/rgw_log_backing.cc new file mode 100644 index 0000000000000..63edf972a0307 --- /dev/null +++ b/src/rgw/rgw_log_backing.cc @@ -0,0 +1,215 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "cls/log/cls_log_client.h" + +#include "rgw_log_backing.h" +#include "rgw_tools.h" +#include "cls_fifo_legacy.h" + +static constexpr auto dout_subsys = ceph_subsys_rgw; + +enum class shard_check { dne, omap, fifo, corrupt }; +inline std::ostream& operator <<(std::ostream& m, const shard_check& t) { + switch (t) { + case shard_check::dne: + return m << "shard_check::dne"; + case shard_check::omap: + return m << 
"shard_check::omap"; + case shard_check::fifo: + return m << "shard_check::fifo"; + case shard_check::corrupt: + return m << "shard_check::corrupt"; + } + + return m << "shard_check::UNKNOWN=" << static_cast(t); +} + +namespace { +/// Return the shard type, and a bool to see whether it has entries. +std::pair +probe_shard(librados::IoCtx& ioctx, const std::string& oid, optional_yield y) +{ + auto cct = static_cast(ioctx.cct()); + bool omap = false; + { + librados::ObjectReadOperation op; + cls_log_header header; + cls_log_info(op, &header); + auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, y); + if (r == -ENOENT) { + return { shard_check::dne, {} }; + } + + if (r < 0) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " error probing for omap: r=" << r + << ", oid=" << oid << dendl; + return { shard_check::corrupt, {} }; + } + if (header != cls_log_header{}) + omap = true; + } + std::unique_ptr fifo; + auto r = rgw::cls::fifo::FIFO::open(ioctx, oid, + &fifo, y, + std::nullopt, true); + if (r < 0 && !(r == -ENOENT || r == -ENODATA)) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " error probing for fifo: r=" << r + << ", oid=" << oid << dendl; + return { shard_check::corrupt, {} }; + } + if (fifo && omap) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " fifo and omap found: oid=" << oid << dendl; + return { shard_check::corrupt, {} }; + } + if (fifo) { + bool more = false; + std::vector entries; + r = fifo->list(1, nullopt, &entries, &more, y); + if (r < 0) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": unable to list entries: r=" << r + << ", oid=" << oid << dendl; + return { shard_check::corrupt, {} }; + } + return { shard_check::fifo, !entries.empty() }; + } + if (omap) { + std::list entries; + std::string out_marker; + bool truncated = false; + librados::ObjectReadOperation op; + cls_log_list(op, {}, {}, {}, 1, entries, + &out_marker, &truncated); + auto r = rgw_rados_operate(ioctx, oid, &op, 
nullptr, y); + if (r < 0) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": failed to list: r=" << r << ", oid=" << oid << dendl; + return { shard_check::corrupt, {} }; + } + return { shard_check::omap, !entries.empty() }; + } + + // An object exists, but has never had FIFO or cls_log entries written + // to it. Likely just the marker Omap. + return { shard_check::dne, {} }; +} + +tl::expected +handle_dne(librados::IoCtx& ioctx, + log_type def, + std::string oid, + optional_yield y) +{ + auto cct = static_cast(ioctx.cct()); + if (def == log_type::fifo) { + std::unique_ptr fifo; + auto r = rgw::cls::fifo::FIFO::create(ioctx, oid, + &fifo, y, + std::nullopt); + if (r < 0) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " error creating FIFO: r=" << r + << ", oid=" << oid << dendl; + return tl::unexpected(bs::error_code(-r, bs::system_category())); + } + } + return def; +} +} + +tl::expected +log_backing_type(librados::IoCtx& ioctx, + log_type def, + int shards, + const fu2::unique_function& get_oid, + optional_yield y) +{ + auto cct = static_cast(ioctx.cct()); + auto check = shard_check::dne; + for (int i = 0; i < shards; ++i) { + auto [c, e] = probe_shard(ioctx, get_oid(i), y); + if (c == shard_check::corrupt) + return tl::unexpected(bs::error_code(EIO, bs::system_category())); + if (c == shard_check::dne) continue; + if (check == shard_check::dne) { + check = c; + continue; + } + + if (check != c) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " clashing types: check=" << check + << ", c=" << c << dendl; + return tl::unexpected(bs::error_code(EIO, bs::system_category())); + } + } + if (check == shard_check::corrupt) { + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << " should be unreachable!" << dendl; + return tl::unexpected(bs::error_code(EIO, bs::system_category())); + } + + if (check == shard_check::dne) + return handle_dne(ioctx, + def, + get_oid(0), + y); + + return (check == shard_check::fifo ? 
log_type::fifo : log_type::omap); +} + +bs::error_code log_remove(librados::IoCtx& ioctx, + int shards, + const fu2::unique_function& get_oid, + optional_yield y) +{ + bs::error_code ec; + auto cct = static_cast(ioctx.cct()); + for (int i = 0; i < shards; ++i) { + auto oid = get_oid(i); + rados::cls::fifo::info info; + uint32_t part_header_size = 0, part_entry_overhead = 0; + + auto r = rgw::cls::fifo::get_meta(ioctx, oid, nullopt, &info, + &part_header_size, &part_entry_overhead, + 0, y, true); + if (r == -ENOENT) continue; + if (r == 0 && info.head_part_num > -1) { + for (auto j = info.tail_part_num; j <= info.head_part_num; ++j) { + librados::ObjectWriteOperation op; + op.remove(); + auto part_oid = info.part_oid(j); + auto subr = rgw_rados_operate(ioctx, part_oid, &op, y); + if (subr < 0 && subr != -ENOENT) { + if (!ec) + ec = bs::error_code(-subr, bs::system_category()); + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": failed removing FIFO part: part_oid=" << part_oid + << ", subr=" << subr << dendl; + } + } + } + if (r < 0 && r != -ENODATA) { + if (!ec) + ec = bs::error_code(-r, bs::system_category()); + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": failed checking FIFO part: oid=" << oid + << ", r=" << r << dendl; + } + librados::ObjectWriteOperation op; + op.remove(); + r = rgw_rados_operate(ioctx, oid, &op, y); + if (r < 0 && r != -ENOENT) { + if (!ec) + ec = bs::error_code(-r, bs::system_category()); + lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__ + << ": failed removing shard: oid=" << oid + << ", r=" << r << dendl; + } + } + return ec; +} diff --git a/src/rgw/rgw_log_backing.h b/src/rgw/rgw_log_backing.h new file mode 100644 index 0000000000000..d769af48b01fe --- /dev/null +++ b/src/rgw/rgw_log_backing.h @@ -0,0 +1,70 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#ifndef CEPH_RGW_LOGBACKING_H +#define CEPH_RGW_LOGBACKING_H + +#include
+#include +#include +#include + +#include + +#include + +#include "include/rados/librados.hpp" +#include "include/expected.hpp" +#include "include/function2.hpp" + +#include "common/async/yield_context.h" + +namespace bs = boost::system; + +/// Type of log backing, stored in the mark used in the quick check, +/// and passed to checking functions. +enum class log_type { + omap = 0, + fifo = 1 +}; +/// Parse a backing name: exact, case-insensitive match on "omap" or "fifo". +inline std::optional to_log_type(std::string_view s) { + if (s.length() == 4 && strncasecmp(s.data(), "omap", 4) == 0) { + return log_type::omap; + } else if (s.length() == 4 && strncasecmp(s.data(), "fifo", 4) == 0) { + return log_type::fifo; + } else { + return std::nullopt; + } +} +inline std::ostream& operator <<(std::ostream& m, const log_type& t) { + switch (t) { + case log_type::omap: + return m << "log_type::omap"; + case log_type::fifo: + return m << "log_type::fifo"; + } + + return m << "log_type::UNKNOWN=" << static_cast(t); +} + +/// Look over the shards in a log and determine the type. +tl::expected +log_backing_type(librados::IoCtx& ioctx, + log_type def, + int shards, //< Total number of shards + /// A function taking a shard number and + /// returning an oid. + const fu2::unique_function& get_oid, + optional_yield y); + +/// Remove all log shards and associated parts of fifos. +bs::error_code log_remove(librados::IoCtx& ioctx, + int shards, //< Total number of shards + /// A function taking a shard number and + /// returning an oid.
+ const fu2::unique_function& get_oid, + optional_yield y); + + +#endif diff --git a/src/test/rgw/CMakeLists.txt b/src/test/rgw/CMakeLists.txt index 7817a42ef9ab8..c4aa22db81749 100644 --- a/src/test/rgw/CMakeLists.txt +++ b/src/test/rgw/CMakeLists.txt @@ -213,6 +213,11 @@ add_executable(unittest_cls_fifo_legacy test_cls_fifo_legacy.cc) target_link_libraries(unittest_cls_fifo_legacy radostest-cxx ${UNITTEST_LIBS} ${rgw_libs}) +# unittest_log_backing +add_executable(unittest_log_backing test_log_backing.cc) +target_link_libraries(unittest_log_backing radostest-cxx ${UNITTEST_LIBS} + ${rgw_libs}) + add_executable(unittest_rgw_lua test_rgw_lua.cc) add_ceph_unittest(unittest_rgw_lua) target_link_libraries(unittest_rgw_lua ${rgw_libs} ${LUA_LIBRARIES}) diff --git a/src/test/rgw/test_log_backing.cc b/src/test/rgw/test_log_backing.cc new file mode 100644 index 0000000000000..5180d5fc74fe8 --- /dev/null +++ b/src/test/rgw/test_log_backing.cc @@ -0,0 +1,176 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2019 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#include "rgw_log_backing.h" + +#include +#include +#include + +#undef FMT_HEADER_ONLY +#define FMT_HEADER_ONLY 1 +#include + +#include "include/types.h" +#include "include/rados/librados.hpp" + +#include "test/librados/test_cxx.h" +#include "global/global_context.h" + +#include "cls/log/cls_log_client.h" + +#include "rgw/rgw_tools.h" +#include "rgw/cls_fifo_legacy.h" + +#include "gtest/gtest.h" + +namespace lr = librados; +namespace cb = ceph::buffer; +namespace fifo = rados::cls::fifo; +namespace RCf = rgw::cls::fifo; + +class LogBacking : public testing::Test { +protected: + static constexpr int SHARDS = 3; + const std::string pool_name = get_temp_pool_name(); + lr::Rados rados; + lr::IoCtx ioctx; + + void SetUp() override { + ASSERT_EQ("", create_one_pool_pp(pool_name, rados)); + ASSERT_EQ(0, rados.ioctx_create(pool_name.c_str(), ioctx)); + } + void TearDown() override { + destroy_one_pool_pp(pool_name, rados); + } + + static std::string get_oid(int i) { + return fmt::format("shard.{}", i); + } + + void make_omap() { + for (int i = 0; i < SHARDS; ++i) { + using ceph::encode; + lr::ObjectWriteOperation op; + cb::list bl; + encode(i, bl); + cls_log_add(op, ceph_clock_now(), {}, "meow", bl); + auto r = rgw_rados_operate(ioctx, get_oid(i), &op, null_yield); + ASSERT_GE(r, 0); + } + } + + void add_omap(int i) { + using ceph::encode; + lr::ObjectWriteOperation op; + cb::list bl; + encode(i, bl); + cls_log_add(op, ceph_clock_now(), {}, "meow", bl); + auto r = rgw_rados_operate(ioctx, get_oid(i), &op, null_yield); + ASSERT_GE(r, 0); + } + + void empty_omap() { + for (int i = 0; i < SHARDS; ++i) { + auto oid = get_oid(i); + std::string to_marker; + { + lr::ObjectReadOperation op; + std::list entries; + bool truncated = false; + cls_log_list(op, {}, {}, {}, 1, entries, &to_marker, &truncated); + auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, null_yield); + ASSERT_GE(r, 0); + ASSERT_FALSE(entries.empty()); + } + { + lr::ObjectWriteOperation op; + 
cls_log_trim(op, {}, {}, {}, to_marker); + auto r = rgw_rados_operate(ioctx, oid, &op, null_yield); + ASSERT_GE(r, 0); + } + { + lr::ObjectReadOperation op; + std::list entries; + bool truncated = false; + cls_log_list(op, {}, {}, {}, 1, entries, &to_marker, &truncated); + auto r = rgw_rados_operate(ioctx, oid, &op, nullptr, null_yield); + ASSERT_GE(r, 0); + ASSERT_TRUE(entries.empty()); + } + } + } + + void make_fifo() + { + for (int i = 0; i < SHARDS; ++i) { + std::unique_ptr fifo; + auto r = RCf::FIFO::create(ioctx, get_oid(i), &fifo, null_yield); + ASSERT_EQ(0, r); + ASSERT_TRUE(fifo); + } + } + + void add_fifo(int i) + { + using ceph::encode; + std::unique_ptr fifo; + auto r = RCf::FIFO::open(ioctx, get_oid(i), &fifo, null_yield); + ASSERT_EQ(0, r); + ASSERT_TRUE(fifo); + cb::list bl; + encode(i, bl); + r = fifo->push(bl, null_yield); + ASSERT_EQ(0, r); + } + + void assert_empty() { + std::vector result; + lr::ObjectCursor next; + auto r = ioctx.object_list(ioctx.object_list_begin(), ioctx.object_list_end(), + 100, {}, &result, &next); + ASSERT_GE(r, 0); + ASSERT_TRUE(result.empty()); + } +}; + +TEST_F(LogBacking, TestOmap) +{ + make_omap(); + auto stat = log_backing_type(ioctx, log_type::fifo, SHARDS, + get_oid, null_yield); + ASSERT_EQ(log_type::omap, stat.value()); +} + +TEST_F(LogBacking, TestOmapEmpty) +{ + auto stat = log_backing_type(ioctx, log_type::omap, SHARDS, + get_oid, null_yield); + ASSERT_EQ(log_type::omap, stat.value()); +} + +TEST_F(LogBacking, TestFIFO) +{ + make_fifo(); + auto stat = log_backing_type(ioctx, log_type::fifo, SHARDS, + get_oid, null_yield); + ASSERT_EQ(log_type::fifo, stat.value()); +} + +TEST_F(LogBacking, TestFIFOEmpty) +{ + auto stat = log_backing_type(ioctx, log_type::fifo, SHARDS, + get_oid, null_yield); + ASSERT_EQ(log_type::fifo, stat.value()); +} From 8c81b6fa1b2a0f1d409afbd0126d18cfc97315c4 Mon Sep 17 00:00:00 2001 From: "Adam C.
Emerson" Date: Sat, 21 Nov 2020 15:45:12 -0500 Subject: [PATCH 04/26] rgw: Use refactored log backing tools Signed-off-by: Adam C. Emerson (cherry picked from commit da6223d281e33e43fa74c50f4d0eedb5ac25ace4) Signed-off-by: Adam C. Emerson --- src/common/options.cc | 16 ++-- src/rgw/rgw_datalog.cc | 208 +++++------------------------------------ src/rgw/rgw_datalog.h | 5 +- 3 files changed, 31 insertions(+), 198 deletions(-) diff --git a/src/common/options.cc b/src/common/options.cc index 75d6589c08296..8fdd62fb14ccb 100644 --- a/src/common/options.cc +++ b/src/common/options.cc @@ -7407,17 +7407,15 @@ std::vector