From 6ed45a1c1e7a76a18dea5d00791c9abbdd84fb1e Mon Sep 17 00:00:00 2001 From: Josiah VanderZee Date: Thu, 2 Nov 2023 07:43:54 -0500 Subject: [PATCH 1/2] Extract class AggregateWriteBuffer from Stripe This encapsulates the `agg_` fields of `Stripe` in a new class. This is in preparation to move all the aggregation behavior into this new class, including the behavior to enqueue virtual connections and update the number of bytes waiting to be written to the buffer, which is currently being done by the virtual connections themselves. Note: `Stripe` does not use its own getters to access aggregate buffer, but calls the getters on the `AggregateWriteBuffer` itself. This is because I want to get rid of as many of the getters on `Stripe` as possible. --- include/iocore/cache/AggregateWriteBuffer.h | 118 ++++++++++++++++++++ include/iocore/cache/CacheVC.h | 2 +- src/iocore/cache/CacheDir.cc | 14 +-- src/iocore/cache/CacheVC.cc | 4 +- src/iocore/cache/CacheWrite.cc | 90 +++++++-------- src/iocore/cache/P_CacheDir.h | 2 +- src/iocore/cache/P_CacheHosting.h | 2 +- src/iocore/cache/P_CacheInternal.h | 4 +- src/iocore/cache/P_CacheVol.h | 63 +++++++++-- 9 files changed, 230 insertions(+), 69 deletions(-) create mode 100644 include/iocore/cache/AggregateWriteBuffer.h diff --git a/include/iocore/cache/AggregateWriteBuffer.h b/include/iocore/cache/AggregateWriteBuffer.h new file mode 100644 index 00000000000..4d838cf2eb7 --- /dev/null +++ b/include/iocore/cache/AggregateWriteBuffer.h @@ -0,0 +1,118 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once + +#include "iocore/eventsystem/Continuation.h" + +#include "tscore/ink_memory.h" +#include "tscore/List.h" + +#include + +#define AGG_SIZE (4 * 1024 * 1024) // 4MB +#define AGG_HIGH_WATER (AGG_SIZE / 2) // 2MB + +struct CacheVC; + +class AggregateWriteBuffer +{ +public: + AggregateWriteBuffer() + { + this->buffer = static_cast(ats_memalign(ats_pagesize(), AGG_SIZE)); + memset(this->buffer, 0, AGG_SIZE); + } + + ~AggregateWriteBuffer() { ats_free(this->buffer); } + + AggregateWriteBuffer(AggregateWriteBuffer const &) = delete; + AggregateWriteBuffer &operator=(AggregateWriteBuffer const &) = delete; + + // move semantics not supported yet to keep things simple + AggregateWriteBuffer(AggregateWriteBuffer &&other) = delete; + AggregateWriteBuffer &operator=(AggregateWriteBuffer &&other) = delete; + + Queue &get_pending_writers(); + char *get_buffer(); + int get_buffer_pos() const; + void add_buffer_pos(int size); + void seek(int offset); + void reset_buffer_pos(); + int get_bytes_pending_aggregation() const; + void add_bytes_pending_aggregation(int size); + +private: + Queue pending_writers; + char *buffer = nullptr; + int bytes_pending_aggregation = 0; + int buffer_pos = 0; +}; + +inline Queue & +AggregateWriteBuffer::get_pending_writers() +{ + return this->pending_writers; +} + +inline char * +AggregateWriteBuffer::get_buffer() +{ + return this->buffer; +} + +inline int +AggregateWriteBuffer::get_buffer_pos() const +{ + return this->buffer_pos; +} + +inline void +AggregateWriteBuffer::add_buffer_pos(int size) +{ + this->buffer_pos += size; +} + +inline void +AggregateWriteBuffer::seek(int offset) +{ + this->buffer_pos = offset; +} + +inline void +AggregateWriteBuffer::reset_buffer_pos() +{ + this->seek(0); +} + +inline int +AggregateWriteBuffer::get_bytes_pending_aggregation() const +{ + return this->bytes_pending_aggregation; +} + +inline void +AggregateWriteBuffer::add_bytes_pending_aggregation(int size) +{ + this->bytes_pending_aggregation += size; +} diff --git a/include/iocore/cache/CacheVC.h b/include/iocore/cache/CacheVC.h index 92881e30ea7..ef3be6a9960 100644 --- a/include/iocore/cache/CacheVC.h +++ b/include/iocore/cache/CacheVC.h @@ -48,7 +48,7 @@ #include -struct Stripe; +class Stripe; class HttpConfigAccessor; struct CacheVC : public CacheVConnection { diff --git a/src/iocore/cache/CacheDir.cc b/src/iocore/cache/CacheDir.cc index c989aa680d4..43fe7520d3a 100644 --- a/src/iocore/cache/CacheDir.cc +++ b/src/iocore/cache/CacheDir.cc @@ -996,21 +996,21 @@ sync_cache_dir_on_shutdown() // check if we have data in the agg buffer // dont worry about the cachevc s in the agg queue // directories have not been inserted for these writes - if (vol->agg_buf_pos) { + if (vol->get_agg_buf_pos()) { Dbg(dbg_ctl_cache_dir_sync, "Dir %s: flushing agg buffer first", vol->hash_text.get()); // set write limit - vol->header->agg_pos = vol->header->write_pos + vol->agg_buf_pos; + vol->header->agg_pos = vol->header->write_pos + vol->get_agg_buf_pos(); - int r = pwrite(vol->fd, vol->agg_buffer, vol->agg_buf_pos, vol->header->write_pos); - if (r != vol->agg_buf_pos) { + int r = pwrite(vol->fd, vol->get_agg_buffer(), vol->get_agg_buf_pos(), vol->header->write_pos); + if (r != vol->get_agg_buf_pos()) { ink_assert(!"flushing agg buffer failed"); continue; } vol->header->last_write_pos = vol->header->write_pos; - vol->header->write_pos += vol->agg_buf_pos; + vol->header->write_pos += vol->get_agg_buf_pos(); ink_assert(vol->header->write_pos == vol->header->agg_pos); - vol->agg_buf_pos = 0; + vol->reset_agg_buf_pos(); vol->header->write_serial++; } @@ -1136,7 +1136,7 @@ CacheSync::mainEvent(int event, Event *e) Dbg(dbg_ctl_cache_dir_sync, "Dir %s not dirty", vol->hash_text.get()); goto Ldone; } - if (vol->is_io_in_progress() || vol->agg_buf_pos) { + if (vol->is_io_in_progress() || vol->get_agg_buf_pos()) { Dbg(dbg_ctl_cache_dir_sync, "Dir %s: waiting for agg buffer", vol->hash_text.get()); vol->dir_sync_waiting = true; if (!vol->is_io_in_progress()) { diff --git a/src/iocore/cache/CacheVC.cc b/src/iocore/cache/CacheVC.cc index 4c24f4bd124..96b2d297c61 100644 --- a/src/iocore/cache/CacheVC.cc +++ b/src/iocore/cache/CacheVC.cc @@ -490,9 +490,9 @@ CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) if (dir_agg_buf_valid(vol, &dir)) { int agg_offset = vol->vol_offset(&dir) - vol->header->write_pos; buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED); - ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned)vol->agg_buf_pos); + ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned)vol->get_agg_buf_pos()); char *doc = buf->data(); - char *agg = vol->agg_buffer + agg_offset; + char *agg = vol->get_agg_buffer() + agg_offset; memcpy(doc, agg, io.aiocb.aio_nbytes); io.aio_result = io.aiocb.aio_nbytes; SET_HANDLER(&CacheVC::handleReadDone); diff --git a/src/iocore/cache/CacheWrite.cc b/src/iocore/cache/CacheWrite.cc index 7e5878d930e..8fa1be83b3a 100644 --- a/src/iocore/cache/CacheWrite.cc +++ b/src/iocore/cache/CacheWrite.cc @@ -22,6 +22,7 @@ */ #include "P_Cache.h" +#include "iocore/cache/AggregateWriteBuffer.h" #include "iocore/cache/CacheEvacuateDocVC.h" #define UINT_WRAP_LTE(_x, _y) (((_y) - (_x)) < INT_MAX) // exploit overflow @@ -213,7 +214,7 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) agg queue. And if !f.evac_vector && !f.update the alternate->object_size is set to vc->total_len - f.readers. If set, assumes that this is an evacuation, so the write - is not aborted even if vol->agg_todo_size > agg_write_backlog + is not aborted even if vol->get_agg_todo_size() > agg_write_backlog - f.evacuator. If this is an evacuation. - f.rewrite_resident_alt. The resident alternate is rewritten. - f.update. Used only if the write_vector needs to be written to disk. @@ -246,10 +247,10 @@ CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */) set_agg_write_in_progress(); POP_HANDLER; - agg_len = vol->round_to_approx_size(write_len + header_len + frag_len + sizeof(Doc)); - vol->agg_todo_size += agg_len; - bool agg_error = (agg_len > AGG_SIZE || header_len + sizeof(Doc) > MAX_FRAG_SIZE || - (!f.readers && (vol->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len)); + agg_len = vol->round_to_approx_size(write_len + header_len + frag_len + sizeof(Doc)); + vol->add_agg_todo(agg_len); + bool agg_error = (agg_len > AGG_SIZE || header_len + sizeof(Doc) > MAX_FRAG_SIZE || + (!f.readers && (vol->get_agg_todo_size() > cache_config_agg_write_backlog + AGG_SIZE) && write_len)); #ifdef CACHE_AGG_FAIL_RATE agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE)); #endif @@ -261,8 +262,8 @@ CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */) Metrics::Counter::increment(vol->cache_vol->vol_rsb.write_backlog_failure); Metrics::Counter::increment(cache_rsb.status[op_type].failure); Metrics::Counter::increment(vol->cache_vol->vol_rsb.status[op_type].failure); - vol->agg_todo_size -= agg_len; - io.aio_result = AIO_SOFT_FAILURE; + vol->add_agg_todo(-agg_len); + io.aio_result = AIO_SOFT_FAILURE; if (event == EVENT_CALL) { return EVENT_RETURN; } @@ -270,9 +271,9 @@ CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */) } ink_assert(agg_len <= AGG_SIZE); if (f.evac_vector) { - vol->agg.push(this); + vol->get_pending_writers().push(this); } else { - vol->agg.enqueue(this); + vol->get_pending_writers().enqueue(this); } if (!vol->is_io_in_progress()) { return vol->aggWrite(event, this); @@ -396,7 +397,7 @@ Stripe::aggWriteDone(int event, Event *e) if (header->write_pos + EVACUATION_SIZE > scan_pos) { periodic_scan(); } - agg_buf_pos = 0; + this->write_buffer.reset_buffer_pos(); header->write_serial++; } else { // delete all the directory entries that we inserted @@ -407,13 +408,13 @@ Stripe::aggWriteDone(int event, Event *e) (uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE); Dir del_dir; dir_clear(&del_dir); - for (int done = 0; done < agg_buf_pos;) { - Doc *doc = reinterpret_cast(agg_buffer + done); + for (int done = 0; done < this->write_buffer.get_buffer_pos();) { + Doc *doc = reinterpret_cast(this->write_buffer.get_buffer() + done); dir_set_offset(&del_dir, header->write_pos + done); dir_delete(&doc->key, this, &del_dir); done += round_to_approx_size(doc->len); } - agg_buf_pos = 0; + this->write_buffer.reset_buffer_pos(); } set_io_not_in_progress(); // callback ready sync CacheVCs @@ -430,7 +431,7 @@ Stripe::aggWriteDone(int event, Event *e) dir_sync_waiting = false; cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr); } - if (agg.head || sync.head) { + if (this->write_buffer.get_pending_writers().head || sync.head) { return aggWrite(event, e); } return EVENT_CONT; @@ -493,16 +494,16 @@ Stripe::evacuateWrite(CacheEvacuateDocVC *evacuator, int event, Event *e) { // push to front of aggregation write list, so it is written first - evacuator->agg_len = round_to_approx_size((reinterpret_cast(evacuator->buf->data()))->len); - agg_todo_size += evacuator->agg_len; + evacuator->agg_len = round_to_approx_size((reinterpret_cast(evacuator->buf->data()))->len); + this->write_buffer.add_bytes_pending_aggregation(evacuator->agg_len); /* insert the evacuator after all the other evacuators */ - CacheVC *cur = static_cast(agg.head); + CacheVC *cur = static_cast(this->write_buffer.get_pending_writers().head); CacheVC *after = nullptr; for (; cur && cur->f.evacuator; cur = (CacheVC *)cur->link.next) { after = cur; } ink_assert(evacuator->agg_len <= AGG_SIZE); - agg.insert(evacuator, after); + this->write_buffer.get_pending_writers().insert(evacuator, after); return aggWrite(event, e); } @@ -649,7 +650,7 @@ static int agg_copy(char *p, CacheVC *vc) { Stripe *vol = vc->vol; - off_t o = vol->header->write_pos + vol->agg_buf_pos; + off_t o = vol->header->write_pos + vol->get_agg_buf_pos(); if (!vc->f.evacuator) { Doc *doc = reinterpret_cast(p); @@ -902,8 +903,8 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) this->aggregate_pending_writes(tocall); // if we got nothing... - if (!agg_buf_pos) { - if (!agg.head && !sync.head) { // nothing to get + if (!this->write_buffer.get_buffer_pos()) { + if (!this->write_buffer.get_pending_writers().head && !sync.head) { // nothing to get return EVENT_CONT; } if (header->write_pos == start) { @@ -911,21 +912,21 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) Note("write aggregation exceeds vol size"); ink_assert(!tocall.head); ink_assert(false); - while ((c = agg.dequeue())) { - agg_todo_size -= c->agg_len; + while ((c = this->get_pending_writers().dequeue())) { + this->write_buffer.add_bytes_pending_aggregation(-c->agg_len); eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE); } return EVENT_CONT; } // start back - if (agg.head) { + if (this->get_pending_writers().head) { agg_wrap(); goto Lagain; } } // evacuate space - off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE; + off_t end = header->write_pos + this->write_buffer.get_buffer_pos() + EVACUATION_SIZE; if (evac_range(header->write_pos, end, !header->phase) < 0) { goto Lwait; } @@ -935,18 +936,19 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) } } - // if agg.head, then we are near the end of the disk, so + // if write_buffer.get_pending_writers.head, then we are near the end of the disk, so // write down the aggregation in whatever size it is. - if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting) { + if (this->write_buffer.get_buffer_pos() < AGG_HIGH_WATER && !this->write_buffer.get_pending_writers().head && !sync.head && + !dir_sync_waiting) { goto Lwait; } // write sync marker - if (!agg_buf_pos) { + if (!this->write_buffer.get_buffer_pos()) { ink_assert(sync.head); - int l = round_to_approx_size(sizeof(Doc)); - agg_buf_pos = l; - Doc *d = reinterpret_cast(agg_buffer); + int l = round_to_approx_size(sizeof(Doc)); + this->write_buffer.seek(l); + Doc *d = reinterpret_cast(this->write_buffer.get_buffer()); memset(static_cast(d), 0, sizeof(Doc)); d->magic = DOC_MAGIC; d->len = l; @@ -955,12 +957,12 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) } // set write limit - header->agg_pos = header->write_pos + agg_buf_pos; + header->agg_pos = header->write_pos + this->write_buffer.get_buffer_pos(); io.aiocb.aio_fildes = fd; io.aiocb.aio_offset = header->write_pos; - io.aiocb.aio_buf = agg_buffer; - io.aiocb.aio_nbytes = agg_buf_pos; + io.aiocb.aio_buf = this->write_buffer.get_buffer(); + io.aiocb.aio_nbytes = this->write_buffer.get_buffer_pos(); io.action = this; /* Callback on AIO thread so that we can issue a new write ASAP @@ -986,22 +988,22 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) void Stripe::aggregate_pending_writes(Queue &tocall) { - for (auto *c = static_cast(this->agg.head); c;) { + for (auto *c = static_cast(this->write_buffer.get_pending_writers().head); c;) { int writelen = c->agg_len; // [amc] this is checked multiple places, on here was it strictly less. ink_assert(writelen <= AGG_SIZE); - if (this->agg_buf_pos + writelen > AGG_SIZE || - this->header->write_pos + this->agg_buf_pos + writelen > (this->skip + this->len)) { + if (this->write_buffer.get_buffer_pos() + writelen > AGG_SIZE || + this->header->write_pos + this->write_buffer.get_buffer_pos() + writelen > (this->skip + this->len)) { break; } - DDbg(dbg_ctl_agg_read, "copying: %d, %" PRIu64 ", key: %d", this->agg_buf_pos, this->header->write_pos + this->agg_buf_pos, - c->first_key.slice32(0)); - int wrotelen = agg_copy(this->agg_buffer + this->agg_buf_pos, c); + DDbg(dbg_ctl_agg_read, "copying: %d, %" PRIu64 ", key: %d", this->write_buffer.get_buffer_pos(), + this->header->write_pos + this->write_buffer.get_buffer_pos(), c->first_key.slice32(0)); + int wrotelen = agg_copy(this->write_buffer.get_buffer() + this->write_buffer.get_buffer_pos(), c); ink_assert(writelen == wrotelen); - this->agg_todo_size -= writelen; - this->agg_buf_pos += writelen; - CacheVC *n = (CacheVC *)c->link.next; - this->agg.dequeue(); + this->write_buffer.add_bytes_pending_aggregation(-writelen); + this->write_buffer.add_buffer_pos(writelen); + CacheVC *n = (CacheVC *)c->link.next; + this->write_buffer.get_pending_writers().dequeue(); if (c->f.sync && c->f.use_first_key) { CacheVC *last = this->sync.tail; while (last && UINT_WRAP_LT(c->write_serial, last->write_serial)) { diff --git a/src/iocore/cache/P_CacheDir.h b/src/iocore/cache/P_CacheDir.h index fc575d45eac..b811b14c770 100644 --- a/src/iocore/cache/P_CacheDir.h +++ b/src/iocore/cache/P_CacheDir.h @@ -31,7 +31,7 @@ // aio #include "iocore/aio/AIO.h" -struct Stripe; +class Stripe; struct InterimCacheVol; struct CacheVC; class CacheEvacuateDocVC; diff --git a/src/iocore/cache/P_CacheHosting.h b/src/iocore/cache/P_CacheHosting.h index 100014c3273..dce1518287e 100644 --- a/src/iocore/cache/P_CacheHosting.h +++ b/src/iocore/cache/P_CacheHosting.h @@ -30,7 +30,7 @@ #define CACHE_MEM_FREE_TIMEOUT HRTIME_SECONDS(1) -struct Stripe; +class Stripe; struct CacheVol; struct CacheHostResult; diff --git a/src/iocore/cache/P_CacheInternal.h b/src/iocore/cache/P_CacheInternal.h index 0b919daf187..7487c6e5ef3 100644 --- a/src/iocore/cache/P_CacheInternal.h +++ b/src/iocore/cache/P_CacheInternal.h @@ -372,7 +372,7 @@ Stripe::open_write(CacheVC *cont, int allow_if_writers, int max_writers) Stripe *vol = this; bool agg_error = false; if (!cont->f.remove) { - agg_error = (!cont->f.update && agg_todo_size > cache_config_agg_write_backlog); + agg_error = (!cont->f.update && this->get_agg_todo_size() > cache_config_agg_write_backlog); #ifdef CACHE_AGG_FAIL_RATE agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE)); #endif @@ -546,7 +546,7 @@ CacheRemoveCont::event_handler(int event, void *data) } struct CacheHostRecord; -struct Stripe; +class Stripe; class CacheHostTable; struct Cache { diff --git a/src/iocore/cache/P_CacheVol.h b/src/iocore/cache/P_CacheVol.h index 938f7e99b32..1e362f9a941 100644 --- a/src/iocore/cache/P_CacheVol.h +++ b/src/iocore/cache/P_CacheVol.h @@ -26,6 +26,7 @@ #include "P_CacheDir.h" #include "P_CacheStats.h" #include "P_RamCache.h" +#include "iocore/cache/AggregateWriteBuffer.h" #include "tscore/CryptoHash.h" @@ -75,7 +76,7 @@ #define DOC_NO_CHECKSUM ((uint32_t)0xA0B0C0D0) struct Cache; -struct Stripe; +class Stripe; struct CacheDisk; struct StripeInitInfo; struct DiskStripe; @@ -127,7 +128,9 @@ struct EvacuationBlock { LINK(EvacuationBlock, link); }; -struct Stripe : public Continuation { +class Stripe : public Continuation +{ +public: char *path = nullptr; ats_scoped_str hash_text; CryptoHash hash_id; @@ -149,11 +152,7 @@ struct Stripe : public Continuation { int hit_evacuate_window = 0; AIOCallbackInternal io; - Queue agg; Queue sync; - char *agg_buffer = nullptr; - int agg_todo_size = 0; - int agg_buf_pos = 0; Event *trigger = nullptr; @@ -276,18 +275,23 @@ struct Stripe : public Continuation { Stripe() : Continuation(new_ProxyMutex()) { open_dir.mutex = mutex; - agg_buffer = (char *)ats_memalign(ats_pagesize(), AGG_SIZE); - memset(agg_buffer, 0, AGG_SIZE); SET_HANDLER(&Stripe::aggWrite); } - ~Stripe() override { ats_free(agg_buffer); } + Queue &get_pending_writers(); + char *get_agg_buffer(); + int get_agg_buf_pos() const; + void reset_agg_buf_pos(); + int get_agg_todo_size() const; + void add_agg_todo(int size); private: void _clear_init(); void _init_dir(); void _init_data_internal(); void _init_data(); + + AggregateWriteBuffer write_buffer; }; struct AIO_failure_handler : public Continuation { @@ -402,7 +406,7 @@ Stripe::vol_out_of_phase_write_valid(Dir *e) const inline int Stripe::vol_in_phase_valid(Dir *e) const { - return (dir_offset(e) - 1 < ((this->header->write_pos + this->agg_buf_pos - this->start) / CACHE_BLOCK_SIZE)); + return (dir_offset(e) - 1 < ((this->header->write_pos + this->write_buffer.get_buffer_pos() - this->start) / CACHE_BLOCK_SIZE)); } inline off_t @@ -426,7 +430,8 @@ Stripe::vol_offset_to_offset(off_t pos) const inline int Stripe::vol_in_phase_agg_buf_valid(Dir *e) const { - return (this->vol_offset(e) >= this->header->write_pos && this->vol_offset(e) < (this->header->write_pos + this->agg_buf_pos)); + return (this->vol_offset(e) >= this->header->write_pos && + this->vol_offset(e) < (this->header->write_pos + this->write_buffer.get_buffer_pos())); } // length of the partition not including the offset of location 0. @@ -555,3 +560,39 @@ Stripe::set_io_not_in_progress() { io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS; } + +inline Queue & +Stripe::get_pending_writers() +{ + return this->write_buffer.get_pending_writers(); +} + +inline char * +Stripe::get_agg_buffer() +{ + return this->write_buffer.get_buffer(); +} + +inline int +Stripe::get_agg_buf_pos() const +{ + return this->write_buffer.get_buffer_pos(); +} + +inline void +Stripe::reset_agg_buf_pos() +{ + this->write_buffer.reset_buffer_pos(); +} + +inline int +Stripe::get_agg_todo_size() const +{ + return this->write_buffer.get_bytes_pending_aggregation(); +} + +inline void +Stripe::add_agg_todo(int size) +{ + this->write_buffer.add_bytes_pending_aggregation(size); +} From 81e02152494ae713ab9c86fbbe33b550cc6d6f0d Mon Sep 17 00:00:00 2001 From: Josiah VanderZee Date: Tue, 7 Nov 2023 06:47:49 -0600 Subject: [PATCH 2/2] Prefix private members with _ --- include/iocore/cache/AggregateWriteBuffer.h | 28 +++++------ src/iocore/cache/CacheWrite.cc | 56 ++++++++++----------- src/iocore/cache/P_CacheVol.h | 18 +++---- 3 files changed, 51 insertions(+), 51 deletions(-) diff --git a/include/iocore/cache/AggregateWriteBuffer.h b/include/iocore/cache/AggregateWriteBuffer.h index 4d838cf2eb7..1ead5ba51ea 100644 --- a/include/iocore/cache/AggregateWriteBuffer.h +++ b/include/iocore/cache/AggregateWriteBuffer.h @@ -40,11 +40,11 @@ class AggregateWriteBuffer public: AggregateWriteBuffer() { - this->buffer = static_cast(ats_memalign(ats_pagesize(), AGG_SIZE)); - memset(this->buffer, 0, AGG_SIZE); + this->_buffer = static_cast(ats_memalign(ats_pagesize(), AGG_SIZE)); + memset(this->_buffer, 0, AGG_SIZE); } - ~AggregateWriteBuffer() { ats_free(this->buffer); } + ~AggregateWriteBuffer() { ats_free(this->_buffer); } AggregateWriteBuffer(AggregateWriteBuffer const &) = delete; AggregateWriteBuffer &operator=(AggregateWriteBuffer const &) = delete; @@ -63,40 +63,40 @@ class AggregateWriteBuffer void add_bytes_pending_aggregation(int size); private: - Queue pending_writers; - char *buffer = nullptr; - int bytes_pending_aggregation = 0; - int buffer_pos = 0; + Queue _pending_writers; + char *_buffer = nullptr; + int _bytes_pending_aggregation = 0; + int _buffer_pos = 0; }; inline Queue & AggregateWriteBuffer::get_pending_writers() { - return this->pending_writers; + return this->_pending_writers; } inline char * AggregateWriteBuffer::get_buffer() { - return this->buffer; + return this->_buffer; } inline int AggregateWriteBuffer::get_buffer_pos() const { - return this->buffer_pos; + return this->_buffer_pos; } inline void AggregateWriteBuffer::add_buffer_pos(int size) { - this->buffer_pos += size; + this->_buffer_pos += size; } inline void AggregateWriteBuffer::seek(int offset) { - this->buffer_pos = offset; + this->_buffer_pos = offset; } inline void @@ -108,11 +108,11 @@ AggregateWriteBuffer::reset_buffer_pos() inline int AggregateWriteBuffer::get_bytes_pending_aggregation() const { - return this->bytes_pending_aggregation; + return this->_bytes_pending_aggregation; } inline void AggregateWriteBuffer::add_bytes_pending_aggregation(int size) { - this->bytes_pending_aggregation += size; + this->_bytes_pending_aggregation += size; } diff --git a/src/iocore/cache/CacheWrite.cc b/src/iocore/cache/CacheWrite.cc index 8fa1be83b3a..b0494fcef77 100644 --- a/src/iocore/cache/CacheWrite.cc +++ b/src/iocore/cache/CacheWrite.cc @@ -397,7 +397,7 @@ Stripe::aggWriteDone(int event, Event *e) if (header->write_pos + EVACUATION_SIZE > scan_pos) { periodic_scan(); } - this->write_buffer.reset_buffer_pos(); + this->_write_buffer.reset_buffer_pos(); header->write_serial++; } else { // delete all the directory entries that we inserted @@ -408,13 +408,13 @@ Stripe::aggWriteDone(int event, Event *e) (uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE); Dir del_dir; dir_clear(&del_dir); - for (int done = 0; done < this->write_buffer.get_buffer_pos();) { - Doc *doc = reinterpret_cast(this->write_buffer.get_buffer() + done); + for (int done = 0; done < this->_write_buffer.get_buffer_pos();) { + Doc *doc = reinterpret_cast(this->_write_buffer.get_buffer() + done); dir_set_offset(&del_dir, header->write_pos + done); dir_delete(&doc->key, this, &del_dir); done += round_to_approx_size(doc->len); } - this->write_buffer.reset_buffer_pos(); + this->_write_buffer.reset_buffer_pos(); } set_io_not_in_progress(); // callback ready sync CacheVCs @@ -431,7 +431,7 @@ Stripe::aggWriteDone(int event, Event *e) dir_sync_waiting = false; cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr); } - if (this->write_buffer.get_pending_writers().head || sync.head) { + if (this->_write_buffer.get_pending_writers().head || sync.head) { return aggWrite(event, e); } return EVENT_CONT; @@ -495,15 +495,15 @@ Stripe::evacuateWrite(CacheEvacuateDocVC *evacuator, int event, Event *e) // push to front of aggregation write list, so it is written first evacuator->agg_len = round_to_approx_size((reinterpret_cast(evacuator->buf->data()))->len); - this->write_buffer.add_bytes_pending_aggregation(evacuator->agg_len); + this->_write_buffer.add_bytes_pending_aggregation(evacuator->agg_len); /* insert the evacuator after all the other evacuators */ - CacheVC *cur = static_cast(this->write_buffer.get_pending_writers().head); + CacheVC *cur = static_cast(this->_write_buffer.get_pending_writers().head); CacheVC *after = nullptr; for (; cur && cur->f.evacuator; cur = (CacheVC *)cur->link.next) { after = cur; } ink_assert(evacuator->agg_len <= AGG_SIZE); - this->write_buffer.get_pending_writers().insert(evacuator, after); + this->_write_buffer.get_pending_writers().insert(evacuator, after); return aggWrite(event, e); } @@ -903,8 +903,8 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) this->aggregate_pending_writes(tocall); // if we got nothing... - if (!this->write_buffer.get_buffer_pos()) { - if (!this->write_buffer.get_pending_writers().head && !sync.head) { // nothing to get + if (!this->_write_buffer.get_buffer_pos()) { + if (!this->_write_buffer.get_pending_writers().head && !sync.head) { // nothing to get return EVENT_CONT; } if (header->write_pos == start) { @@ -913,7 +913,7 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) ink_assert(!tocall.head); ink_assert(false); while ((c = this->get_pending_writers().dequeue())) { - this->write_buffer.add_bytes_pending_aggregation(-c->agg_len); + this->_write_buffer.add_bytes_pending_aggregation(-c->agg_len); eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE); } return EVENT_CONT; @@ -926,7 +926,7 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) } // evacuate space - off_t end = header->write_pos + this->write_buffer.get_buffer_pos() + EVACUATION_SIZE; + off_t end = header->write_pos + this->_write_buffer.get_buffer_pos() + EVACUATION_SIZE; if (evac_range(header->write_pos, end, !header->phase) < 0) { goto Lwait; } @@ -938,17 +938,17 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) // if write_buffer.get_pending_writers.head, then we are near the end of the disk, so // write down the aggregation in whatever size it is. - if (this->write_buffer.get_buffer_pos() < AGG_HIGH_WATER && !this->write_buffer.get_pending_writers().head && !sync.head && + if (this->_write_buffer.get_buffer_pos() < AGG_HIGH_WATER && !this->_write_buffer.get_pending_writers().head && !sync.head && !dir_sync_waiting) { goto Lwait; } // write sync marker - if (!this->write_buffer.get_buffer_pos()) { + if (!this->_write_buffer.get_buffer_pos()) { ink_assert(sync.head); int l = round_to_approx_size(sizeof(Doc)); - this->write_buffer.seek(l); - Doc *d = reinterpret_cast(this->write_buffer.get_buffer()); + this->_write_buffer.seek(l); + Doc *d = reinterpret_cast(this->_write_buffer.get_buffer()); memset(static_cast(d), 0, sizeof(Doc)); d->magic = DOC_MAGIC; d->len = l; @@ -957,12 +957,12 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) } // set write limit - header->agg_pos = header->write_pos + this->write_buffer.get_buffer_pos(); + header->agg_pos = header->write_pos + this->_write_buffer.get_buffer_pos(); io.aiocb.aio_fildes = fd; io.aiocb.aio_offset = header->write_pos; - io.aiocb.aio_buf = this->write_buffer.get_buffer(); - io.aiocb.aio_nbytes = this->write_buffer.get_buffer_pos(); + io.aiocb.aio_buf = this->_write_buffer.get_buffer(); + io.aiocb.aio_nbytes = this->_write_buffer.get_buffer_pos(); io.action = this; /* Callback on AIO thread so that we can issue a new write ASAP @@ -988,22 +988,22 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) void Stripe::aggregate_pending_writes(Queue &tocall) { - for (auto *c = static_cast(this->write_buffer.get_pending_writers().head); c;) { + for (auto *c = static_cast(this->_write_buffer.get_pending_writers().head); c;) { int writelen = c->agg_len; // [amc] this is checked multiple places, on here was it strictly less. ink_assert(writelen <= AGG_SIZE); - if (this->write_buffer.get_buffer_pos() + writelen > AGG_SIZE || - this->header->write_pos + this->write_buffer.get_buffer_pos() + writelen > (this->skip + this->len)) { + if (this->_write_buffer.get_buffer_pos() + writelen > AGG_SIZE || + this->header->write_pos + this->_write_buffer.get_buffer_pos() + writelen > (this->skip + this->len)) { break; } - DDbg(dbg_ctl_agg_read, "copying: %d, %" PRIu64 ", key: %d", this->write_buffer.get_buffer_pos(), - this->header->write_pos + this->write_buffer.get_buffer_pos(), c->first_key.slice32(0)); - int wrotelen = agg_copy(this->write_buffer.get_buffer() + this->write_buffer.get_buffer_pos(), c); + DDbg(dbg_ctl_agg_read, "copying: %d, %" PRIu64 ", key: %d", this->_write_buffer.get_buffer_pos(), + this->header->write_pos + this->_write_buffer.get_buffer_pos(), c->first_key.slice32(0)); + int wrotelen = agg_copy(this->_write_buffer.get_buffer() + this->_write_buffer.get_buffer_pos(), c); ink_assert(writelen == wrotelen); - this->write_buffer.add_bytes_pending_aggregation(-writelen); - this->write_buffer.add_buffer_pos(writelen); + this->_write_buffer.add_bytes_pending_aggregation(-writelen); + this->_write_buffer.add_buffer_pos(writelen); CacheVC *n = (CacheVC *)c->link.next; - this->write_buffer.get_pending_writers().dequeue(); + this->_write_buffer.get_pending_writers().dequeue(); if (c->f.sync && c->f.use_first_key) { CacheVC *last = this->sync.tail; while (last && UINT_WRAP_LT(c->write_serial, last->write_serial)) { diff --git a/src/iocore/cache/P_CacheVol.h b/src/iocore/cache/P_CacheVol.h index 1e362f9a941..5e2902d1a52 100644 --- a/src/iocore/cache/P_CacheVol.h +++ b/src/iocore/cache/P_CacheVol.h @@ -291,7 +291,7 @@ class Stripe : public Continuation void _init_data_internal(); void _init_data(); - AggregateWriteBuffer write_buffer; + AggregateWriteBuffer _write_buffer; }; struct AIO_failure_handler : public Continuation { @@ -406,7 +406,7 @@ Stripe::vol_out_of_phase_write_valid(Dir *e) const inline int Stripe::vol_in_phase_valid(Dir *e) const { - return (dir_offset(e) - 1 < ((this->header->write_pos + this->write_buffer.get_buffer_pos() - this->start) / CACHE_BLOCK_SIZE)); + return (dir_offset(e) - 1 < ((this->header->write_pos + this->_write_buffer.get_buffer_pos() - this->start) / CACHE_BLOCK_SIZE)); } inline off_t @@ -431,7 +431,7 @@ inline int Stripe::vol_in_phase_agg_buf_valid(Dir *e) const { return (this->vol_offset(e) >= this->header->write_pos && - this->vol_offset(e) < (this->header->write_pos + this->write_buffer.get_buffer_pos())); + this->vol_offset(e) < (this->header->write_pos + this->_write_buffer.get_buffer_pos())); } // length of the partition not including the offset of location 0. @@ -564,35 +564,35 @@ Stripe::set_io_not_in_progress() inline Queue & Stripe::get_pending_writers() { - return this->write_buffer.get_pending_writers(); + return this->_write_buffer.get_pending_writers(); } inline char * Stripe::get_agg_buffer() { - return this->write_buffer.get_buffer(); + return this->_write_buffer.get_buffer(); } inline int Stripe::get_agg_buf_pos() const { - return this->write_buffer.get_buffer_pos(); + return this->_write_buffer.get_buffer_pos(); } inline void Stripe::reset_agg_buf_pos() { - this->write_buffer.reset_buffer_pos(); + this->_write_buffer.reset_buffer_pos(); } inline int Stripe::get_agg_todo_size() const { - return this->write_buffer.get_bytes_pending_aggregation(); + return this->_write_buffer.get_bytes_pending_aggregation(); } inline void Stripe::add_agg_todo(int size) { - this->write_buffer.add_bytes_pending_aggregation(size); + this->_write_buffer.add_bytes_pending_aggregation(size); }