diff --git a/include/iocore/cache/AggregateWriteBuffer.h b/include/iocore/cache/AggregateWriteBuffer.h new file mode 100644 index 00000000000..1ead5ba51ea --- /dev/null +++ b/include/iocore/cache/AggregateWriteBuffer.h @@ -0,0 +1,118 @@ +/** @file + + Declares AggregateWriteBuffer: the pending-writer queue, page-aligned AGG_SIZE buffer, and position / pending-byte counters held by a cache Stripe. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +#pragma once + +#include "iocore/eventsystem/Continuation.h" + +#include "tscore/ink_memory.h" +#include "tscore/List.h" + +#include + +#define AGG_SIZE (4 * 1024 * 1024) // 4MB +#define AGG_HIGH_WATER (AGG_SIZE / 2) // 2MB + +struct CacheVC; + +class AggregateWriteBuffer +{ +public: + AggregateWriteBuffer() + { + this->_buffer = static_cast(ats_memalign(ats_pagesize(), AGG_SIZE)); + memset(this->_buffer, 0, AGG_SIZE); + } + + ~AggregateWriteBuffer() { ats_free(this->_buffer); } + + AggregateWriteBuffer(AggregateWriteBuffer const &) = delete; + AggregateWriteBuffer &operator=(AggregateWriteBuffer const &) = delete; + + // move semantics not supported yet to keep things simple + AggregateWriteBuffer(AggregateWriteBuffer &&other) = delete; + AggregateWriteBuffer &operator=(AggregateWriteBuffer &&other) = delete; + + Queue &get_pending_writers(); + char *get_buffer(); + int get_buffer_pos() const; + void add_buffer_pos(int size); + void seek(int offset); + void reset_buffer_pos(); + int get_bytes_pending_aggregation() const; + void add_bytes_pending_aggregation(int size); + +private: + Queue _pending_writers; + char *_buffer = nullptr; + int _bytes_pending_aggregation = 0; + int _buffer_pos = 0; +}; + +inline Queue & +AggregateWriteBuffer::get_pending_writers() +{ + return this->_pending_writers; +} + +inline char * +AggregateWriteBuffer::get_buffer() +{ + return this->_buffer; +} + +inline int +AggregateWriteBuffer::get_buffer_pos() const +{ + return this->_buffer_pos; +} + +inline void +AggregateWriteBuffer::add_buffer_pos(int size) +{ + this->_buffer_pos += size; +} + +inline void +AggregateWriteBuffer::seek(int offset) +{ + this->_buffer_pos = offset; +} + +inline void +AggregateWriteBuffer::reset_buffer_pos() +{ + this->seek(0); +} + +inline int +AggregateWriteBuffer::get_bytes_pending_aggregation() const +{ + return this->_bytes_pending_aggregation; +} + +inline void +AggregateWriteBuffer::add_bytes_pending_aggregation(int size) +{ + 
this->_bytes_pending_aggregation += size; +} diff --git a/include/iocore/cache/CacheVC.h b/include/iocore/cache/CacheVC.h index 92881e30ea7..ef3be6a9960 100644 --- a/include/iocore/cache/CacheVC.h +++ b/include/iocore/cache/CacheVC.h @@ -48,7 +48,7 @@ #include -struct Stripe; +class Stripe; class HttpConfigAccessor; struct CacheVC : public CacheVConnection { diff --git a/src/iocore/cache/CacheDir.cc b/src/iocore/cache/CacheDir.cc index c989aa680d4..43fe7520d3a 100644 --- a/src/iocore/cache/CacheDir.cc +++ b/src/iocore/cache/CacheDir.cc @@ -996,21 +996,21 @@ sync_cache_dir_on_shutdown() // check if we have data in the agg buffer // dont worry about the cachevc s in the agg queue // directories have not been inserted for these writes - if (vol->agg_buf_pos) { + if (vol->get_agg_buf_pos()) { Dbg(dbg_ctl_cache_dir_sync, "Dir %s: flushing agg buffer first", vol->hash_text.get()); // set write limit - vol->header->agg_pos = vol->header->write_pos + vol->agg_buf_pos; + vol->header->agg_pos = vol->header->write_pos + vol->get_agg_buf_pos(); - int r = pwrite(vol->fd, vol->agg_buffer, vol->agg_buf_pos, vol->header->write_pos); - if (r != vol->agg_buf_pos) { + int r = pwrite(vol->fd, vol->get_agg_buffer(), vol->get_agg_buf_pos(), vol->header->write_pos); + if (r != vol->get_agg_buf_pos()) { ink_assert(!"flushing agg buffer failed"); continue; } vol->header->last_write_pos = vol->header->write_pos; - vol->header->write_pos += vol->agg_buf_pos; + vol->header->write_pos += vol->get_agg_buf_pos(); ink_assert(vol->header->write_pos == vol->header->agg_pos); - vol->agg_buf_pos = 0; + vol->reset_agg_buf_pos(); vol->header->write_serial++; } @@ -1136,7 +1136,7 @@ CacheSync::mainEvent(int event, Event *e) Dbg(dbg_ctl_cache_dir_sync, "Dir %s not dirty", vol->hash_text.get()); goto Ldone; } - if (vol->is_io_in_progress() || vol->agg_buf_pos) { + if (vol->is_io_in_progress() || vol->get_agg_buf_pos()) { Dbg(dbg_ctl_cache_dir_sync, "Dir %s: waiting for agg buffer", vol->hash_text.get()); 
vol->dir_sync_waiting = true; if (!vol->is_io_in_progress()) { diff --git a/src/iocore/cache/CacheVC.cc b/src/iocore/cache/CacheVC.cc index 4c24f4bd124..96b2d297c61 100644 --- a/src/iocore/cache/CacheVC.cc +++ b/src/iocore/cache/CacheVC.cc @@ -490,9 +490,9 @@ CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) if (dir_agg_buf_valid(vol, &dir)) { int agg_offset = vol->vol_offset(&dir) - vol->header->write_pos; buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED); - ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned)vol->agg_buf_pos); + ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned)vol->get_agg_buf_pos()); char *doc = buf->data(); - char *agg = vol->agg_buffer + agg_offset; + char *agg = vol->get_agg_buffer() + agg_offset; memcpy(doc, agg, io.aiocb.aio_nbytes); io.aio_result = io.aiocb.aio_nbytes; SET_HANDLER(&CacheVC::handleReadDone); diff --git a/src/iocore/cache/CacheWrite.cc b/src/iocore/cache/CacheWrite.cc index 7e5878d930e..b0494fcef77 100644 --- a/src/iocore/cache/CacheWrite.cc +++ b/src/iocore/cache/CacheWrite.cc @@ -22,6 +22,7 @@ */ #include "P_Cache.h" +#include "iocore/cache/AggregateWriteBuffer.h" #include "iocore/cache/CacheEvacuateDocVC.h" #define UINT_WRAP_LTE(_x, _y) (((_y) - (_x)) < INT_MAX) // exploit overflow @@ -213,7 +214,7 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) agg queue. And if !f.evac_vector && !f.update the alternate->object_size is set to vc->total_len - f.readers. If set, assumes that this is an evacuation, so the write - is not aborted even if vol->agg_todo_size > agg_write_backlog + is not aborted even if vol->get_agg_todo_size() > agg_write_backlog - f.evacuator. If this is an evacuation. - f.rewrite_resident_alt. The resident alternate is rewritten. - f.update. Used only if the write_vector needs to be written to disk. 
@@ -246,10 +247,10 @@ CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */) set_agg_write_in_progress(); POP_HANDLER; - agg_len = vol->round_to_approx_size(write_len + header_len + frag_len + sizeof(Doc)); - vol->agg_todo_size += agg_len; - bool agg_error = (agg_len > AGG_SIZE || header_len + sizeof(Doc) > MAX_FRAG_SIZE || - (!f.readers && (vol->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len)); + agg_len = vol->round_to_approx_size(write_len + header_len + frag_len + sizeof(Doc)); + vol->add_agg_todo(agg_len); + bool agg_error = (agg_len > AGG_SIZE || header_len + sizeof(Doc) > MAX_FRAG_SIZE || + (!f.readers && (vol->get_agg_todo_size() > cache_config_agg_write_backlog + AGG_SIZE) && write_len)); #ifdef CACHE_AGG_FAIL_RATE agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE)); #endif @@ -261,8 +262,8 @@ CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */) Metrics::Counter::increment(vol->cache_vol->vol_rsb.write_backlog_failure); Metrics::Counter::increment(cache_rsb.status[op_type].failure); Metrics::Counter::increment(vol->cache_vol->vol_rsb.status[op_type].failure); - vol->agg_todo_size -= agg_len; - io.aio_result = AIO_SOFT_FAILURE; + vol->add_agg_todo(-agg_len); + io.aio_result = AIO_SOFT_FAILURE; if (event == EVENT_CALL) { return EVENT_RETURN; } @@ -270,9 +271,9 @@ CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */) } ink_assert(agg_len <= AGG_SIZE); if (f.evac_vector) { - vol->agg.push(this); + vol->get_pending_writers().push(this); } else { - vol->agg.enqueue(this); + vol->get_pending_writers().enqueue(this); } if (!vol->is_io_in_progress()) { return vol->aggWrite(event, this); @@ -396,7 +397,7 @@ Stripe::aggWriteDone(int event, Event *e) if (header->write_pos + EVACUATION_SIZE > scan_pos) { periodic_scan(); } - agg_buf_pos = 0; + this->_write_buffer.reset_buffer_pos(); header->write_serial++; } else { // delete all the directory entries that 
we inserted @@ -407,13 +408,13 @@ Stripe::aggWriteDone(int event, Event *e) (uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE); Dir del_dir; dir_clear(&del_dir); - for (int done = 0; done < agg_buf_pos;) { - Doc *doc = reinterpret_cast(agg_buffer + done); + for (int done = 0; done < this->_write_buffer.get_buffer_pos();) { + Doc *doc = reinterpret_cast(this->_write_buffer.get_buffer() + done); dir_set_offset(&del_dir, header->write_pos + done); dir_delete(&doc->key, this, &del_dir); done += round_to_approx_size(doc->len); } - agg_buf_pos = 0; + this->_write_buffer.reset_buffer_pos(); } set_io_not_in_progress(); // callback ready sync CacheVCs @@ -430,7 +431,7 @@ Stripe::aggWriteDone(int event, Event *e) dir_sync_waiting = false; cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr); } - if (agg.head || sync.head) { + if (this->_write_buffer.get_pending_writers().head || sync.head) { return aggWrite(event, e); } return EVENT_CONT; @@ -493,16 +494,16 @@ Stripe::evacuateWrite(CacheEvacuateDocVC *evacuator, int event, Event *e) { // push to front of aggregation write list, so it is written first - evacuator->agg_len = round_to_approx_size((reinterpret_cast(evacuator->buf->data()))->len); - agg_todo_size += evacuator->agg_len; + evacuator->agg_len = round_to_approx_size((reinterpret_cast(evacuator->buf->data()))->len); + this->_write_buffer.add_bytes_pending_aggregation(evacuator->agg_len); /* insert the evacuator after all the other evacuators */ - CacheVC *cur = static_cast(agg.head); + CacheVC *cur = static_cast(this->_write_buffer.get_pending_writers().head); CacheVC *after = nullptr; for (; cur && cur->f.evacuator; cur = (CacheVC *)cur->link.next) { after = cur; } ink_assert(evacuator->agg_len <= AGG_SIZE); - agg.insert(evacuator, after); + this->_write_buffer.get_pending_writers().insert(evacuator, after); return aggWrite(event, e); } @@ -649,7 +650,7 @@ static int agg_copy(char *p, CacheVC 
*vc) { Stripe *vol = vc->vol; - off_t o = vol->header->write_pos + vol->agg_buf_pos; + off_t o = vol->header->write_pos + vol->get_agg_buf_pos(); if (!vc->f.evacuator) { Doc *doc = reinterpret_cast(p); @@ -902,8 +903,8 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) this->aggregate_pending_writes(tocall); // if we got nothing... - if (!agg_buf_pos) { - if (!agg.head && !sync.head) { // nothing to get + if (!this->_write_buffer.get_buffer_pos()) { + if (!this->_write_buffer.get_pending_writers().head && !sync.head) { // nothing to get return EVENT_CONT; } if (header->write_pos == start) { @@ -911,21 +912,21 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) Note("write aggregation exceeds vol size"); ink_assert(!tocall.head); ink_assert(false); - while ((c = agg.dequeue())) { - agg_todo_size -= c->agg_len; + while ((c = this->get_pending_writers().dequeue())) { + this->_write_buffer.add_bytes_pending_aggregation(-c->agg_len); eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE); } return EVENT_CONT; } // start back - if (agg.head) { + if (this->get_pending_writers().head) { agg_wrap(); goto Lagain; } } // evacuate space - off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE; + off_t end = header->write_pos + this->_write_buffer.get_buffer_pos() + EVACUATION_SIZE; if (evac_range(header->write_pos, end, !header->phase) < 0) { goto Lwait; } @@ -935,18 +936,19 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) } } - // if agg.head, then we are near the end of the disk, so + // if write_buffer.get_pending_writers.head, then we are near the end of the disk, so // write down the aggregation in whatever size it is. 
- if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting) { + if (this->_write_buffer.get_buffer_pos() < AGG_HIGH_WATER && !this->_write_buffer.get_pending_writers().head && !sync.head && + !dir_sync_waiting) { goto Lwait; } // write sync marker - if (!agg_buf_pos) { + if (!this->_write_buffer.get_buffer_pos()) { ink_assert(sync.head); - int l = round_to_approx_size(sizeof(Doc)); - agg_buf_pos = l; - Doc *d = reinterpret_cast(agg_buffer); + int l = round_to_approx_size(sizeof(Doc)); + this->_write_buffer.seek(l); + Doc *d = reinterpret_cast(this->_write_buffer.get_buffer()); memset(static_cast(d), 0, sizeof(Doc)); d->magic = DOC_MAGIC; d->len = l; @@ -955,12 +957,12 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) } // set write limit - header->agg_pos = header->write_pos + agg_buf_pos; + header->agg_pos = header->write_pos + this->_write_buffer.get_buffer_pos(); io.aiocb.aio_fildes = fd; io.aiocb.aio_offset = header->write_pos; - io.aiocb.aio_buf = agg_buffer; - io.aiocb.aio_nbytes = agg_buf_pos; + io.aiocb.aio_buf = this->_write_buffer.get_buffer(); + io.aiocb.aio_nbytes = this->_write_buffer.get_buffer_pos(); io.action = this; /* Callback on AIO thread so that we can issue a new write ASAP @@ -986,22 +988,22 @@ Stripe::aggWrite(int event, void * /* e ATS_UNUSED */) void Stripe::aggregate_pending_writes(Queue &tocall) { - for (auto *c = static_cast(this->agg.head); c;) { + for (auto *c = static_cast(this->_write_buffer.get_pending_writers().head); c;) { int writelen = c->agg_len; // [amc] this is checked multiple places, on here was it strictly less. 
ink_assert(writelen <= AGG_SIZE); - if (this->agg_buf_pos + writelen > AGG_SIZE || - this->header->write_pos + this->agg_buf_pos + writelen > (this->skip + this->len)) { + if (this->_write_buffer.get_buffer_pos() + writelen > AGG_SIZE || + this->header->write_pos + this->_write_buffer.get_buffer_pos() + writelen > (this->skip + this->len)) { break; } - DDbg(dbg_ctl_agg_read, "copying: %d, %" PRIu64 ", key: %d", this->agg_buf_pos, this->header->write_pos + this->agg_buf_pos, - c->first_key.slice32(0)); - int wrotelen = agg_copy(this->agg_buffer + this->agg_buf_pos, c); + DDbg(dbg_ctl_agg_read, "copying: %d, %" PRIu64 ", key: %d", this->_write_buffer.get_buffer_pos(), + this->header->write_pos + this->_write_buffer.get_buffer_pos(), c->first_key.slice32(0)); + int wrotelen = agg_copy(this->_write_buffer.get_buffer() + this->_write_buffer.get_buffer_pos(), c); ink_assert(writelen == wrotelen); - this->agg_todo_size -= writelen; - this->agg_buf_pos += writelen; - CacheVC *n = (CacheVC *)c->link.next; - this->agg.dequeue(); + this->_write_buffer.add_bytes_pending_aggregation(-writelen); + this->_write_buffer.add_buffer_pos(writelen); + CacheVC *n = (CacheVC *)c->link.next; + this->_write_buffer.get_pending_writers().dequeue(); if (c->f.sync && c->f.use_first_key) { CacheVC *last = this->sync.tail; while (last && UINT_WRAP_LT(c->write_serial, last->write_serial)) { diff --git a/src/iocore/cache/P_CacheDir.h b/src/iocore/cache/P_CacheDir.h index fc575d45eac..b811b14c770 100644 --- a/src/iocore/cache/P_CacheDir.h +++ b/src/iocore/cache/P_CacheDir.h @@ -31,7 +31,7 @@ // aio #include "iocore/aio/AIO.h" -struct Stripe; +class Stripe; struct InterimCacheVol; struct CacheVC; class CacheEvacuateDocVC; diff --git a/src/iocore/cache/P_CacheHosting.h b/src/iocore/cache/P_CacheHosting.h index 100014c3273..dce1518287e 100644 --- a/src/iocore/cache/P_CacheHosting.h +++ b/src/iocore/cache/P_CacheHosting.h @@ -30,7 +30,7 @@ #define CACHE_MEM_FREE_TIMEOUT HRTIME_SECONDS(1) -struct 
Stripe; +class Stripe; struct CacheVol; struct CacheHostResult; diff --git a/src/iocore/cache/P_CacheInternal.h b/src/iocore/cache/P_CacheInternal.h index 0b919daf187..7487c6e5ef3 100644 --- a/src/iocore/cache/P_CacheInternal.h +++ b/src/iocore/cache/P_CacheInternal.h @@ -372,7 +372,7 @@ Stripe::open_write(CacheVC *cont, int allow_if_writers, int max_writers) Stripe *vol = this; bool agg_error = false; if (!cont->f.remove) { - agg_error = (!cont->f.update && agg_todo_size > cache_config_agg_write_backlog); + agg_error = (!cont->f.update && this->get_agg_todo_size() > cache_config_agg_write_backlog); #ifdef CACHE_AGG_FAIL_RATE agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE)); #endif @@ -546,7 +546,7 @@ CacheRemoveCont::event_handler(int event, void *data) } struct CacheHostRecord; -struct Stripe; +class Stripe; class CacheHostTable; struct Cache { diff --git a/src/iocore/cache/P_CacheVol.h b/src/iocore/cache/P_CacheVol.h index 938f7e99b32..5e2902d1a52 100644 --- a/src/iocore/cache/P_CacheVol.h +++ b/src/iocore/cache/P_CacheVol.h @@ -26,6 +26,7 @@ #include "P_CacheDir.h" #include "P_CacheStats.h" #include "P_RamCache.h" +#include "iocore/cache/AggregateWriteBuffer.h" #include "tscore/CryptoHash.h" @@ -75,7 +76,7 @@ #define DOC_NO_CHECKSUM ((uint32_t)0xA0B0C0D0) struct Cache; -struct Stripe; +class Stripe; struct CacheDisk; struct StripeInitInfo; struct DiskStripe; @@ -127,7 +128,9 @@ struct EvacuationBlock { LINK(EvacuationBlock, link); }; -struct Stripe : public Continuation { +class Stripe : public Continuation +{ +public: char *path = nullptr; ats_scoped_str hash_text; CryptoHash hash_id; @@ -149,11 +152,7 @@ struct Stripe : public Continuation { int hit_evacuate_window = 0; AIOCallbackInternal io; - Queue agg; Queue sync; - char *agg_buffer = nullptr; - int agg_todo_size = 0; - int agg_buf_pos = 0; Event *trigger = nullptr; @@ -276,18 +275,23 @@ struct Stripe : public Continuation { 
Stripe() : Continuation(new_ProxyMutex()) { open_dir.mutex = mutex; - agg_buffer = (char *)ats_memalign(ats_pagesize(), AGG_SIZE); - memset(agg_buffer, 0, AGG_SIZE); SET_HANDLER(&Stripe::aggWrite); } - ~Stripe() override { ats_free(agg_buffer); } + Queue &get_pending_writers(); + char *get_agg_buffer(); + int get_agg_buf_pos() const; + void reset_agg_buf_pos(); + int get_agg_todo_size() const; + void add_agg_todo(int size); private: void _clear_init(); void _init_dir(); void _init_data_internal(); void _init_data(); + + AggregateWriteBuffer _write_buffer; }; struct AIO_failure_handler : public Continuation { @@ -402,7 +406,7 @@ Stripe::vol_out_of_phase_write_valid(Dir *e) const inline int Stripe::vol_in_phase_valid(Dir *e) const { - return (dir_offset(e) - 1 < ((this->header->write_pos + this->agg_buf_pos - this->start) / CACHE_BLOCK_SIZE)); + return (dir_offset(e) - 1 < ((this->header->write_pos + this->_write_buffer.get_buffer_pos() - this->start) / CACHE_BLOCK_SIZE)); } inline off_t @@ -426,7 +430,8 @@ Stripe::vol_offset_to_offset(off_t pos) const inline int Stripe::vol_in_phase_agg_buf_valid(Dir *e) const { - return (this->vol_offset(e) >= this->header->write_pos && this->vol_offset(e) < (this->header->write_pos + this->agg_buf_pos)); + return (this->vol_offset(e) >= this->header->write_pos && + this->vol_offset(e) < (this->header->write_pos + this->_write_buffer.get_buffer_pos())); } // length of the partition not including the offset of location 0. 
@@ -555,3 +560,39 @@ Stripe::set_io_not_in_progress() { io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS; } + +inline Queue & +Stripe::get_pending_writers() +{ + return this->_write_buffer.get_pending_writers(); +} + +inline char * +Stripe::get_agg_buffer() +{ + return this->_write_buffer.get_buffer(); +} + +inline int +Stripe::get_agg_buf_pos() const +{ + return this->_write_buffer.get_buffer_pos(); +} + +inline void +Stripe::reset_agg_buf_pos() +{ + this->_write_buffer.reset_buffer_pos(); +} + +inline int +Stripe::get_agg_todo_size() const +{ + return this->_write_buffer.get_bytes_pending_aggregation(); +} + +inline void +Stripe::add_agg_todo(int size) +{ + this->_write_buffer.add_bytes_pending_aggregation(size); +}