From f314719618d181036ceebcf45338ab76559daf55 Mon Sep 17 00:00:00 2001 From: Tatsuhiro Tsujikawa Date: Tue, 27 Nov 2012 22:04:59 +0900 Subject: [PATCH] Added --disk-cache option This option enables disk cache. If SIZE is 0, the disk cache is disabled. This feature caches the downloaded data in memory, which grows to at most SIZE bytes. The cache storage is created for aria2 instance and shared by all downloads. The one advantage of the disk cache is reduce the disk seek time because the data is written in larger unit and it is reordered by the offset of the file. If the underlying file is heavily fragmented it is not the case. --- src/BtPieceMessage.cc | 68 +++++++++------ src/BtPieceMessage.h | 2 - src/DefaultPieceStorage.cc | 36 +++++++- src/DefaultPieceStorage.h | 11 +++ src/DownloadCommand.cc | 74 ++++++++++++++--- src/DownloadCommand.h | 2 + src/DownloadEngineFactory.cc | 1 + src/Makefile.am | 4 +- src/OptionHandlerFactory.cc | 9 ++ src/Piece.cc | 111 ++++++++++++++++++++++++- src/Piece.h | 20 +++++ src/PieceStorage.h | 6 ++ src/RequestGroup.cc | 4 + src/RequestGroupMan.cc | 20 ++++- src/RequestGroupMan.h | 12 +++ src/SinkStreamFilter.cc | 19 ++++- src/SinkStreamFilter.h | 6 +- src/UnknownLengthPieceStorage.h | 4 + src/WrDiskCache.cc | 129 +++++++++++++++++++++++++++++ src/WrDiskCache.h | 83 +++++++++++++++++++ src/WrDiskCacheEntry.cc | 110 +++++++++++++++++++++++++ src/WrDiskCacheEntry.h | 142 ++++++++++++++++++++++++++++++++ src/prefs.cc | 2 + src/prefs.h | 2 + src/usage_text.h | 12 +++ test/Makefile.am | 4 +- test/MockPieceStorage.h | 6 ++ test/PieceTest.cc | 68 ++++++++++++++- test/SinkStreamFilterTest.cc | 4 +- test/TestUtil.cc | 14 ++++ test/TestUtil.h | 5 ++ test/WrDiskCacheEntryTest.cc | 55 +++++++++++++ test/WrDiskCacheTest.cc | 74 +++++++++++++++++ 33 files changed, 1062 insertions(+), 57 deletions(-) create mode 100644 src/WrDiskCache.cc create mode 100644 src/WrDiskCache.h create mode 100644 src/WrDiskCacheEntry.cc create mode 100644 src/WrDiskCacheEntry.h create mode 100644 test/WrDiskCacheEntryTest.cc create mode 100644 test/WrDiskCacheTest.cc diff --git a/src/BtPieceMessage.cc b/src/BtPieceMessage.cc index 3ddc140a..f2e7dccd 100644 --- a/src/BtPieceMessage.cc +++ b/src/BtPieceMessage.cc @@ -57,6 +57,9 @@ #include "DownloadContext.h" #include "PeerStorage.h" #include "array_fun.h" +#include "WrDiskCache.h" +#include "WrDiskCacheEntry.h" +#include "DownloadFailureException.h" namespace aria2 { @@ -119,8 +122,17 @@ void BtPieceMessage::doReceivedAction() A2_LOG_DEBUG("Already have this block."); return; } - getPieceStorage()->getDiskAdaptor()->writeData - (data_+9, blockLength_, offset); + if(piece->getWrDiskCacheEntry()) { + // Write Disk Cache enabled. Unfortunately, it incurs extra data + // copy. + unsigned char* dataCopy = new unsigned char[blockLength_]; + memcpy(dataCopy, data_+9, blockLength_); + piece->updateWrCache(getPieceStorage()->getWrDiskCache(), + dataCopy, 0, blockLength_, offset); + } else { + getPieceStorage()->getDiskAdaptor()->writeData(data_+9, blockLength_, + offset); + } piece->completeBlock(slot.getBlockIndex()); A2_LOG_DEBUG(fmt(MSG_PIECE_BITFIELD, getCuid(), util::toHex(piece->getBitfield(), @@ -234,16 +246,37 @@ bool BtPieceMessage::checkPieceHash(const SharedHandle& piece) return piece->getDigest() == downloadContext_->getPieceHash(piece->getIndex()); } else { - int64_t offset = static_cast(piece->getIndex()) - *downloadContext_->getPieceLength(); - return message_digest::staticSHA1Digest - (getPieceStorage()->getDiskAdaptor(), offset, piece->getLength()) - == downloadContext_->getPieceHash(piece->getIndex()); + A2_LOG_DEBUG(fmt("Calculating hash index=%lu", + static_cast(piece->getIndex()))); + try { + return piece->getDigestWithWrCache(downloadContext_->getPieceLength(), + getPieceStorage()->getDiskAdaptor()) + == downloadContext_->getPieceHash(piece->getIndex()); + } catch(RecoverableException& e) { + piece->clearAllBlock(); + if(piece->getWrDiskCacheEntry()) { + piece->clearWrCache(getPieceStorage()->getWrDiskCache()); + } + throw; + } } } void BtPieceMessage::onNewPiece(const SharedHandle& piece) { + if(piece->getWrDiskCacheEntry()) { + // We flush cached data whenever an whole piece is retrieved. + piece->flushWrCache(getPieceStorage()->getWrDiskCache()); + if(piece->getWrDiskCacheEntry()->getError() != + WrDiskCacheEntry::CACHE_ERR_SUCCESS) { + piece->clearAllBlock(); + piece->clearWrCache(getPieceStorage()->getWrDiskCache()); + throw DOWNLOAD_FAILURE_EXCEPTION2 + (fmt("Write disk cache flush failure index=%lu", + static_cast(piece->getIndex())), + piece->getWrDiskCacheEntry()->getErrorCode()); + } + } A2_LOG_INFO(fmt(MSG_GOT_NEW_PIECE, getCuid(), static_cast(piece->getIndex()))); @@ -256,29 +289,14 @@ void BtPieceMessage::onWrongPiece(const SharedHandle& piece) A2_LOG_INFO(fmt(MSG_GOT_WRONG_PIECE, getCuid(), static_cast(piece->getIndex()))); - erasePieceOnDisk(piece); + if(piece->getWrDiskCacheEntry()) { + piece->clearWrCache(getPieceStorage()->getWrDiskCache()); + } piece->clearAllBlock(); piece->destroyHashContext(); getBtRequestFactory()->removeTargetPiece(piece); } -void BtPieceMessage::erasePieceOnDisk(const SharedHandle& piece) -{ - size_t BUFSIZE = 4096; - unsigned char buf[BUFSIZE]; - memset(buf, 0, BUFSIZE); - int64_t offset = - static_cast(piece->getIndex())*downloadContext_->getPieceLength(); - div_t res = div(piece->getLength(), BUFSIZE); - for(int i = 0; i < res.quot; ++i) { - getPieceStorage()->getDiskAdaptor()->writeData(buf, BUFSIZE, offset); - offset += BUFSIZE; - } - if(res.rem > 0) { - getPieceStorage()->getDiskAdaptor()->writeData(buf, res.rem, offset); - } -} - void BtPieceMessage::onChokingEvent(const BtChokingEvent& event) { if(!isInvalidate() && diff --git a/src/BtPieceMessage.h b/src/BtPieceMessage.h index 6dde2db4..adcb20ec 100644 --- a/src/BtPieceMessage.h +++ b/src/BtPieceMessage.h @@ -61,8 +61,6 @@ private: void onWrongPiece(const SharedHandle& piece); - void erasePieceOnDisk(const SharedHandle& piece); - void pushPieceData(int64_t offset, int32_t length) const; public: BtPieceMessage(size_t index = 0, int32_t begin = 0, int32_t blockLength = 0); diff --git a/src/DefaultPieceStorage.cc b/src/DefaultPieceStorage.cc index de179bad..63fba262 100644 --- a/src/DefaultPieceStorage.cc +++ b/src/DefaultPieceStorage.cc @@ -65,6 +65,8 @@ #include "bitfield.h" #include "SingletonHolder.h" #include "Notifier.h" +#include "WrDiskCache.h" +#include "RequestGroup.h" #ifdef ENABLE_BITTORRENT # include "bittorrent_helper.h" #endif // ENABLE_BITTORRENT @@ -81,7 +83,8 @@ DefaultPieceStorage::DefaultPieceStorage endGamePieceNum_(END_GAME_PIECE_NUM), option_(option), pieceStatMan_(new PieceStatMan(downloadContext->getNumPieces(), true)), - pieceSelector_(new RarestPieceSelector(pieceStatMan_)) + pieceSelector_(new RarestPieceSelector(pieceStatMan_)), + wrDiskCache_(0) { const std::string& pieceSelectorOpt = option_->get(PREF_STREAM_PIECE_SELECTOR); @@ -116,6 +119,13 @@ SharedHandle DefaultPieceStorage::checkOutPiece addUsedPiece(piece); } piece->addUser(cuid); + RequestGroup* group = downloadContext_->getOwnerRequestGroup(); + if((!group || !group->inMemoryDownload()) && + wrDiskCache_ && !piece->getWrDiskCacheEntry()) { + // So, we rely on the fact that diskAdaptor_ is not reinitialized + // in the session. + piece->initWrCache(wrDiskCache_, diskAdaptor_); + } return piece; } @@ -401,6 +411,7 @@ void DefaultPieceStorage::deleteUsedPiece(const SharedHandle& piece) return; } usedPieces_.erase(piece); + piece->releaseWrCache(wrDiskCache_); } // void DefaultPieceStorage::reduceUsedPieces(size_t upperBound) @@ -661,6 +672,29 @@ SharedHandle DefaultPieceStorage::getDiskAdaptor() { return diskAdaptor_; } +WrDiskCache* DefaultPieceStorage::getWrDiskCache() +{ + return wrDiskCache_; +} + +void DefaultPieceStorage::flushWrDiskCacheEntry() +{ + if(!wrDiskCache_) { + return; + } + // UsedPieceSet is sorted by piece index. It means we can flush + // cache by non-decreasing offset, which is good to reduce disk seek + // unless the file is heavily fragmented. + for(UsedPieceSet::const_iterator i = usedPieces_.begin(), + eoi = usedPieces_.end(); i != eoi; ++i) { + WrDiskCacheEntry* ce = (*i)->getWrDiskCacheEntry(); + if(ce) { + (*i)->flushWrCache(wrDiskCache_); + (*i)->releaseWrCache(wrDiskCache_); + } + } +} + int32_t DefaultPieceStorage::getPieceLength(size_t index) { return bitfieldMan_->getBlockLength(index); diff --git a/src/DefaultPieceStorage.h b/src/DefaultPieceStorage.h index 1dd35c29..328d0876 100644 --- a/src/DefaultPieceStorage.h +++ b/src/DefaultPieceStorage.h @@ -92,6 +92,8 @@ private: SharedHandle pieceSelector_; SharedHandle streamPieceSelector_; + + WrDiskCache* wrDiskCache_; #ifdef ENABLE_BITTORRENT void getMissingPiece (std::vector >& pieces, @@ -242,6 +244,10 @@ public: virtual SharedHandle getDiskAdaptor(); + virtual WrDiskCache* getWrDiskCache(); + + virtual void flushWrDiskCacheEntry(); + virtual int32_t getPieceLength(size_t index); virtual void advertisePiece(cuid_t cuid, size_t index); @@ -304,6 +310,11 @@ public: { return pieceSelector_; } + + void setWrDiskCache(WrDiskCache* wrDiskCache) + { + wrDiskCache_ = wrDiskCache; + } }; } // namespace aria2 diff --git a/src/DownloadCommand.cc b/src/DownloadCommand.cc index 574ed663..09a62fd3 100644 --- a/src/DownloadCommand.cc +++ b/src/DownloadCommand.cc @@ -62,6 +62,9 @@ #include "SinkStreamFilter.h" #include "FileEntry.h" #include "SocketRecvBuffer.h" +#include "Piece.h" +#include "WrDiskCacheEntry.h" +#include "DownloadFailureException.h" #ifdef ENABLE_MESSAGE_DIGEST # include "MessageDigest.h" # include "message_digest_helper.h" @@ -105,7 +108,9 @@ DownloadCommand::DownloadCommand peerStat_->downloadStart(); getSegmentMan()->registerPeerStat(peerStat_); - streamFilter_.reset(new SinkStreamFilter(pieceHashValidationEnabled_)); + WrDiskCache* wrDiskCache = getPieceStorage()->getWrDiskCache(); + streamFilter_.reset(new SinkStreamFilter(wrDiskCache, + pieceHashValidationEnabled_)); streamFilter_->init(); sinkFilterOnly_ = true; checkSocketRecvBuffer(); @@ -116,6 +121,37 @@ DownloadCommand::~DownloadCommand() { getSegmentMan()->updateFastestPeerStat(peerStat_); } +namespace { +void flushWrDiskCacheEntry(WrDiskCache* wrDiskCache, + const SharedHandle& segment) +{ + const SharedHandle& piece = segment->getPiece(); + if(piece && piece->getWrDiskCacheEntry()) { + piece->flushWrCache(wrDiskCache); + if(piece->getWrDiskCacheEntry()->getError() != + WrDiskCacheEntry::CACHE_ERR_SUCCESS) { + segment->clear(); + piece->clearWrCache(wrDiskCache); + throw DOWNLOAD_FAILURE_EXCEPTION2 + (fmt("Write disk cache flush failure index=%lu", + static_cast(piece->getIndex())), + piece->getWrDiskCacheEntry()->getErrorCode()); + } + } +} +} // namespace + +namespace { +void clearWrDiskCacheEntry(WrDiskCache* wrDiskCache, + const SharedHandle& segment) +{ + const SharedHandle& piece = segment->getPiece(); + if(piece && piece->getWrDiskCacheEntry()) { + piece->clearWrCache(wrDiskCache); + } +} +} // namespace + bool DownloadCommand::executeInternal() { if(getDownloadEngine()->getRequestGroupMan()->doesOverallDownloadSpeedExceed() || getRequestGroup()->doesDownloadSpeedExceed()) { @@ -218,6 +254,7 @@ bool DownloadCommand::executeInternal() { // completed. A2_LOG_INFO(fmt(MSG_SEGMENT_DOWNLOAD_COMPLETED, getCuid())); + #ifdef ENABLE_MESSAGE_DIGEST { @@ -226,6 +263,7 @@ bool DownloadCommand::executeInternal() { if(pieceHashValidationEnabled_ && !expectedPieceHash.empty()) { if( #ifdef ENABLE_BITTORRENT + // TODO Is this necessary? (!getPieceStorage()->isEndGame() || !getDownloadContext()->hasAttribute(CTX_ATTR_BT)) && #endif // ENABLE_BITTORRENT @@ -235,22 +273,26 @@ bool DownloadCommand::executeInternal() { validatePieceHash (segment, expectedPieceHash, segment->getDigest()); } else { - messageDigest_->reset(); - validatePieceHash - (segment, expectedPieceHash, - message_digest::digest - (messageDigest_, - getPieceStorage()->getDiskAdaptor(), - segment->getPosition(), - segment->getLength())); + try { + std::string actualHash = + segment->getPiece()->getDigestWithWrCache + (segment->getSegmentLength(), diskAdaptor); + validatePieceHash(segment, expectedPieceHash, actualHash); + } catch(RecoverableException& e) { + segment->clear(); + clearWrDiskCacheEntry(getPieceStorage()->getWrDiskCache(), + segment); + getSegmentMan()->cancelSegment(getCuid()); + throw; + } } } else { - getSegmentMan()->completeSegment(getCuid(), segment); + completeSegment(getCuid(), segment); } } #else // !ENABLE_MESSAGE_DIGEST - getSegmentMan()->completeSegment(getCuid(), segment); + completeSegment(getCuid(), segment); #endif // !ENABLE_MESSAGE_DIGEST } else { // If segment is not canceled here, in the next pipelining @@ -357,7 +399,7 @@ void DownloadCommand::validatePieceHash(const SharedHandle& segment, { if(actualHash == expectedHash) { A2_LOG_INFO(fmt(MSG_GOOD_CHUNK_CHECKSUM, util::toHex(actualHash).c_str())); - getSegmentMan()->completeSegment(getCuid(), segment); + completeSegment(getCuid(), segment); } else { A2_LOG_INFO(fmt(EX_INVALID_CHUNK_CHECKSUM, static_cast(segment->getIndex()), @@ -365,6 +407,7 @@ void DownloadCommand::validatePieceHash(const SharedHandle& segment, util::toHex(expectedHash).c_str(), util::toHex(actualHash).c_str())); segment->clear(); + clearWrDiskCacheEntry(getPieceStorage()->getWrDiskCache(), segment); getSegmentMan()->cancelSegment(getCuid()); throw DL_RETRY_EX (fmt("Invalid checksum index=%lu", @@ -374,6 +417,13 @@ void DownloadCommand::validatePieceHash(const SharedHandle& segment, #endif // ENABLE_MESSAGE_DIGEST +void DownloadCommand::completeSegment(cuid_t cuid, + const SharedHandle& segment) +{ + flushWrDiskCacheEntry(getPieceStorage()->getWrDiskCache(), segment); + getSegmentMan()->completeSegment(cuid, segment); +} + void DownloadCommand::installStreamFilter (const SharedHandle& streamFilter) { diff --git a/src/DownloadCommand.h b/src/DownloadCommand.h index e2bc374f..544da2a6 100644 --- a/src/DownloadCommand.h +++ b/src/DownloadCommand.h @@ -67,6 +67,8 @@ private: void checkLowestDownloadSpeed() const; + void completeSegment(cuid_t cuid, const SharedHandle& segment); + SharedHandle streamFilter_; bool sinkFilterOnly_; diff --git a/src/DownloadEngineFactory.cc b/src/DownloadEngineFactory.cc index 0aba1494..ee328c5e 100644 --- a/src/DownloadEngineFactory.cc +++ b/src/DownloadEngineFactory.cc @@ -137,6 +137,7 @@ DownloadEngineFactory::newDownloadEngine SharedHandle requestGroupMan(new RequestGroupMan(requestGroups, MAX_CONCURRENT_DOWNLOADS, op)); + requestGroupMan->initWrDiskCache(); e->setRequestGroupMan(requestGroupMan); e->setFileAllocationMan (SharedHandle(new FileAllocationMan())); diff --git a/src/Makefile.am b/src/Makefile.am index 8408ad8a..2e9217ef 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -240,7 +240,9 @@ SRCS = SocketCore.cc SocketCore.h\ Notifier.cc Notifier.h\ ValueBaseDiskWriter.h\ AnonDiskWriterFactory.h\ - XmlRpcRequestParserController.cc XmlRpcRequestParserController.h + XmlRpcRequestParserController.cc XmlRpcRequestParserController.h\ + WrDiskCache.cc WrDiskCache.h\ + WrDiskCacheEntry.cc WrDiskCacheEntry.h if MINGW_BUILD SRCS += WinConsoleFile.cc WinConsoleFile.h diff --git a/src/OptionHandlerFactory.cc b/src/OptionHandlerFactory.cc index d599bc0a..79e1952e 100644 --- a/src/OptionHandlerFactory.cc +++ b/src/OptionHandlerFactory.cc @@ -204,6 +204,15 @@ std::vector OptionHandlerFactory::createOptionHandlers() op->addTag(TAG_ADVANCED); handlers.push_back(op); } + { + OptionHandler* op(new UnitNumberOptionHandler + (PREF_DISK_CACHE, + TEXT_DISK_CACHE, + "0", + 0)); + op->addTag(TAG_ADVANCED); + handlers.push_back(op); + } { OptionHandler* op(new BooleanOptionHandler (PREF_DEFERRED_INPUT, diff --git a/src/Piece.cc b/src/Piece.cc index 283a34ee..1f7e4596 100644 --- a/src/Piece.cc +++ b/src/Piece.cc @@ -38,6 +38,11 @@ #include "A2STR.h" #include "util.h" #include "a2functional.h" +#include "WrDiskCache.h" +#include "WrDiskCacheEntry.h" +#include "LogFactory.h" +#include "fmt.h" +#include "DiskAdaptor.h" #ifdef ENABLE_MESSAGE_DIGEST # include "MessageDigest.h" #endif // ENABLE_MESSAGE_DIGEST @@ -45,7 +50,7 @@ namespace aria2 { Piece::Piece():index_(0), length_(0), blockLength_(BLOCK_LENGTH), bitfield_(0), - usedBySegment_(false) + usedBySegment_(false), wrCache_(0) #ifdef ENABLE_MESSAGE_DIGEST , nextBegin_(0) #endif // ENABLE_MESSAGE_DIGEST @@ -56,7 +61,7 @@ Piece::Piece(size_t index, int32_t length, int32_t blockLength) length_(length), blockLength_(blockLength), bitfield_(new BitfieldMan(blockLength_, length)), - usedBySegment_(false) + usedBySegment_(false), wrCache_(0) #ifdef ENABLE_MESSAGE_DIGEST ,nextBegin_(0) #endif // ENABLE_MESSAGE_DIGEST @@ -64,6 +69,7 @@ Piece::Piece(size_t index, int32_t length, int32_t blockLength) Piece::~Piece() { + delete wrCache_; delete bitfield_; } @@ -232,6 +238,56 @@ std::string Piece::getDigest() } } +namespace { +void updateHashWithRead(const SharedHandle& mdctx, + const SharedHandle& adaptor, + int64_t offset, size_t len) +{ + const size_t BUFSIZE = 4096; + unsigned char buf[BUFSIZE]; + ldiv_t res = ldiv(len, BUFSIZE); + for(int j = 0; j < res.quot; ++j) { + ssize_t nread = adaptor->readData(buf, BUFSIZE, offset); + if((size_t)nread != BUFSIZE) { + throw DL_ABORT_EX(fmt(EX_FILE_READ, "n/a", "data is too short")); + } + mdctx->update(buf, nread); + offset += nread; + } + if(res.rem) { + ssize_t nread = adaptor->readData(buf, res.rem, offset); + if(nread != res.rem) { + throw DL_ABORT_EX(fmt(EX_FILE_READ, "n/a", "data is too short")); + } + mdctx->update(buf, nread); + offset += nread; + } +} +} // namespace + +std::string Piece::getDigestWithWrCache +(size_t pieceLength, const SharedHandle& adaptor) +{ + SharedHandle mdctx(MessageDigest::create(hashType_)); + int64_t start = static_cast(index_)*pieceLength; + int64_t goff = start; + if(wrCache_) { + const WrDiskCacheEntry::DataCellSet& dataSet = wrCache_->getDataSet(); + for(WrDiskCacheEntry::DataCellSet::iterator i = dataSet.begin(), + eoi = dataSet.end(); i != eoi; ++i) { + if(goff < (*i)->goff) { + updateHashWithRead(mdctx, adaptor, goff, (*i)->goff - goff); + } + mdctx->update((*i)->data+(*i)->offset, (*i)->len); + goff = (*i)->goff + (*i)->len; + } + updateHashWithRead(mdctx, adaptor, goff, start+length_-goff); + } else { + updateHashWithRead(mdctx, adaptor, goff, length_); + } + return mdctx->digest(); +} + void Piece::destroyHashContext() { mdctx_.reset(); @@ -257,4 +313,55 @@ void Piece::removeUser(cuid_t cuid) users_.erase(std::remove(users_.begin(), users_.end(), cuid), users_.end()); } +void Piece::initWrCache(WrDiskCache* diskCache, + const SharedHandle& diskAdaptor) +{ + assert(wrCache_ == 0); + wrCache_ = new WrDiskCacheEntry(diskAdaptor); + bool rv = diskCache->add(wrCache_); + assert(rv); +} + +void Piece::flushWrCache(WrDiskCache* diskCache) +{ + assert(wrCache_); + ssize_t size = static_cast(wrCache_->getSize()); + diskCache->update(wrCache_, -size); + wrCache_->writeToDisk(); +} + +void Piece::clearWrCache(WrDiskCache* diskCache) +{ + assert(wrCache_); + ssize_t size = static_cast(wrCache_->getSize()); + diskCache->update(wrCache_, -size); + wrCache_->clear(); +} + +void Piece::updateWrCache(WrDiskCache* diskCache, unsigned char* data, + size_t offset, size_t len, int64_t goff) +{ + A2_LOG_DEBUG(fmt("updateWrCache entry=%p", wrCache_)); + assert(wrCache_); + WrDiskCacheEntry::DataCell* cell = new WrDiskCacheEntry::DataCell(); + cell->goff = goff; + cell->data = data; + cell->offset = offset; + cell->len = len; + bool rv; + rv = wrCache_->cacheData(cell); + assert(rv); + rv = diskCache->update(wrCache_, len); + assert(rv); +} + +void Piece::releaseWrCache(WrDiskCache* diskCache) +{ + if(wrCache_) { + diskCache->remove(wrCache_); + delete wrCache_; + wrCache_ = 0; + } +} + } // namespace aria2 diff --git a/src/Piece.h b/src/Piece.h index 65fa4fca..9905d7d7 100644 --- a/src/Piece.h +++ b/src/Piece.h @@ -47,6 +47,9 @@ namespace aria2 { class BitfieldMan; +class WrDiskCache; +class WrDiskCacheEntry; +class DiskAdaptor; #ifdef ENABLE_MESSAGE_DIGEST @@ -62,6 +65,7 @@ private: BitfieldMan* bitfield_; std::vector users_; bool usedBySegment_; + WrDiskCacheEntry* wrCache_; #ifdef ENABLE_MESSAGE_DIGEST int32_t nextBegin_; @@ -172,6 +176,10 @@ public: void destroyHashContext(); + // Returns raw hash value, not hex digest, which is calculated using + // cached data and data on disk. + std::string getDigestWithWrCache(size_t pieceLength, + const SharedHandle& adaptor); #endif // ENABLE_MESSAGE_DIGEST /** @@ -194,6 +202,18 @@ public: { usedBySegment_ = f; } + + void initWrCache(WrDiskCache* diskCache, + const SharedHandle& diskAdaptor); + void flushWrCache(WrDiskCache* diskCache); + void clearWrCache(WrDiskCache* diskCache); + void updateWrCache(WrDiskCache* diskCache, unsigned char* data, + size_t offset, size_t len, int64_t goff); + void releaseWrCache(WrDiskCache* diskCache); + WrDiskCacheEntry* getWrDiskCacheEntry() const + { + return wrCache_; + } }; } // namespace aria2 diff --git a/src/PieceStorage.h b/src/PieceStorage.h index d6d99978..14b7160a 100644 --- a/src/PieceStorage.h +++ b/src/PieceStorage.h @@ -51,6 +51,7 @@ class Piece; class Peer; #endif // ENABLE_BITTORRENT class DiskAdaptor; +class WrDiskCache; class PieceStorage { public: @@ -228,6 +229,11 @@ public: virtual SharedHandle getDiskAdaptor() = 0; + virtual WrDiskCache* getWrDiskCache() = 0; + + // Flushes write disk cache for in-flight piece and evicts them. + virtual void flushWrDiskCacheEntry() = 0; + virtual int32_t getPieceLength(size_t index) = 0; /** diff --git a/src/RequestGroup.cc b/src/RequestGroup.cc index 02154f52..c296c321 100644 --- a/src/RequestGroup.cc +++ b/src/RequestGroup.cc @@ -210,6 +210,7 @@ error_code::Value RequestGroup::downloadResult() const void RequestGroup::closeFile() { if(pieceStorage_) { + pieceStorage_->flushWrDiskCacheEntry(); pieceStorage_->getDiskAdaptor()->closeFile(); } } @@ -621,6 +622,9 @@ void RequestGroup::initPieceStorage() new DefaultPieceStorage(downloadContext_, option_.get()); SharedHandle psHolder(ps); #endif // !ENABLE_BITTORRENT + if(requestGroupMan_) { + ps->setWrDiskCache(requestGroupMan_->getWrDiskCache()); + } if(diskWriterFactory_) { ps->setDiskWriterFactory(diskWriterFactory_); } diff --git a/src/RequestGroupMan.cc b/src/RequestGroupMan.cc index ab54dff5..83914ea9 100644 --- a/src/RequestGroupMan.cc +++ b/src/RequestGroupMan.cc @@ -78,6 +78,7 @@ #include "SingletonHolder.h" #include "Notifier.h" #include "PeerStat.h" +#include "WrDiskCache.h" #ifdef ENABLE_BITTORRENT # include "bittorrent_helper.h" #endif // ENABLE_BITTORRENT @@ -100,12 +101,16 @@ RequestGroupMan::RequestGroupMan queueCheck_(true), removedErrorResult_(0), removedLastErrorResult_(error_code::FINISHED), - maxDownloadResult_(option->getAsInt(PREF_MAX_DOWNLOAD_RESULT)) + maxDownloadResult_(option->getAsInt(PREF_MAX_DOWNLOAD_RESULT)), + wrDiskCache_(0) { addRequestGroupIndex(requestGroups); } -RequestGroupMan::~RequestGroupMan() {} +RequestGroupMan::~RequestGroupMan() +{ + delete wrDiskCache_; +} bool RequestGroupMan::downloadFinished() { @@ -620,8 +625,8 @@ void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e) // reference. groupToAdd->dropPieceStorage(); configureRequestGroup(groupToAdd); - createInitialCommand(groupToAdd, commands, e); groupToAdd->setRequestGroupMan(this); + createInitialCommand(groupToAdd, commands, e); if(commands.empty()) { requestQueueCheck(); } @@ -1087,4 +1092,13 @@ void RequestGroupMan::setUriListParser uriListParser_ = uriListParser; } +void RequestGroupMan::initWrDiskCache() +{ + assert(wrDiskCache_ == 0); + size_t limit = option_->getAsInt(PREF_DISK_CACHE); + if(limit > 0) { + wrDiskCache_ = new WrDiskCache(limit); + } +} + } // namespace aria2 diff --git a/src/RequestGroupMan.h b/src/RequestGroupMan.h index 9eee73fe..91fa8463 100644 --- a/src/RequestGroupMan.h +++ b/src/RequestGroupMan.h @@ -58,6 +58,7 @@ class ServerStat; class Option; class OutputFile; class UriListParser; +class WrDiskCache; class RequestGroupMan { private: @@ -95,6 +96,8 @@ private: // UriListParser for deferred input. SharedHandle uriListParser_; + WrDiskCache* wrDiskCache_; + void formatDownloadResultFull (OutputFile& out, const char* status, @@ -341,6 +344,15 @@ public: { return netStat_; } + + WrDiskCache* getWrDiskCache() const + { + return wrDiskCache_; + } + + // Initializes WrDiskCache according to PREF_DISK_CACHE option. If + // its value is 0, cache storage will not be initialized. + void initWrDiskCache(); }; } // namespace aria2 diff --git a/src/SinkStreamFilter.cc b/src/SinkStreamFilter.cc index 089cf37e..a5e4566b 100644 --- a/src/SinkStreamFilter.cc +++ b/src/SinkStreamFilter.cc @@ -33,14 +33,20 @@ */ /* copyright --> */ #include "SinkStreamFilter.h" + +#include + #include "BinaryStream.h" #include "Segment.h" +#include "WrDiskCache.h" +#include "Piece.h" namespace aria2 { const std::string SinkStreamFilter::NAME("SinkStreamFilter"); -SinkStreamFilter::SinkStreamFilter(bool hashUpdate): +SinkStreamFilter::SinkStreamFilter(WrDiskCache* wrDiskCache, bool hashUpdate): + wrDiskCache_(wrDiskCache), hashUpdate_(hashUpdate), bytesProcessed_(0) {} @@ -60,7 +66,16 @@ ssize_t SinkStreamFilter::transform } else { wlen = inlen; } - out->writeData(inbuf, wlen, segment->getPositionToWrite()); + const SharedHandle& piece = segment->getPiece(); + if(piece && piece->getWrDiskCacheEntry()) { + assert(wrDiskCache_); + unsigned char* dataCopy = new unsigned char[wlen]; + memcpy(dataCopy, inbuf, wlen); + piece->updateWrCache(wrDiskCache_, dataCopy, 0, wlen, + segment->getPositionToWrite()); + } else { + out->writeData(inbuf, wlen, segment->getPositionToWrite()); + } #ifdef ENABLE_MESSAGE_DIGEST if(hashUpdate_) { segment->updateHash(segment->getWrittenLength(), inbuf, wlen); diff --git a/src/SinkStreamFilter.h b/src/SinkStreamFilter.h index c8dcc76f..908489a9 100644 --- a/src/SinkStreamFilter.h +++ b/src/SinkStreamFilter.h @@ -39,13 +39,15 @@ namespace aria2 { +class WrDiskCache; + class SinkStreamFilter:public StreamFilter { private: + WrDiskCache* wrDiskCache_; bool hashUpdate_; - size_t bytesProcessed_; public: - SinkStreamFilter(bool hashUpdate = false); + SinkStreamFilter(WrDiskCache* wrDiskCache = 0, bool hashUpdate = false); virtual void init() {} diff --git a/src/UnknownLengthPieceStorage.h b/src/UnknownLengthPieceStorage.h index f7a64e54..643395a7 100644 --- a/src/UnknownLengthPieceStorage.h +++ b/src/UnknownLengthPieceStorage.h @@ -231,6 +231,10 @@ public: virtual SharedHandle getDiskAdaptor(); + virtual WrDiskCache* getWrDiskCache() { return 0; } + + virtual void flushWrDiskCacheEntry() {} + virtual int32_t getPieceLength(size_t index); /** diff --git a/src/WrDiskCache.cc b/src/WrDiskCache.cc new file mode 100644 index 00000000..79af80e9 --- /dev/null +++ b/src/WrDiskCache.cc @@ -0,0 +1,129 @@ +/* */ +#include "WrDiskCache.h" +#include "WrDiskCacheEntry.h" +#include "LogFactory.h" +#include "fmt.h" + +namespace aria2 { + +WrDiskCache::WrDiskCache(size_t limit) + : limit_(limit), + total_(0), + clock_(0) +{} + +WrDiskCache::~WrDiskCache() +{ + if(total_) { + A2_LOG_WARN(fmt("Write disk cache is not empty size=%lu", + static_cast(total_))); + } +} + +bool WrDiskCache::add(WrDiskCacheEntry* ent) +{ + ent->setSizeKey(ent->getSize()); + ent->setLastUpdate(++clock_); + std::pair rv = set_.insert(ent); + if(rv.second) { + total_ += ent->getSize(); + ensureLimit(); + return true; + } else { + A2_LOG_WARN(fmt("Found duplicate cache entry a.{size=%lu,clock=%"PRId64 + "} b{size=%lu,clock=%"PRId64"}", + static_cast((*rv.first)->getSize()), + (*rv.first)->getLastUpdate(), + static_cast(ent->getSize()), + ent->getLastUpdate())); + return false; + } +} + +bool WrDiskCache::remove(WrDiskCacheEntry* ent) +{ + if(set_.erase(ent)) { + A2_LOG_DEBUG(fmt("Removed cache entry size=%lu, clock=%"PRId64, + static_cast(ent->getSize()), + ent->getLastUpdate())); + total_ -= ent->getSize(); + return true; + } else { + return false; + } +} + +bool WrDiskCache::update(WrDiskCacheEntry* ent, ssize_t delta) +{ + if(!set_.erase(ent)) { + return false; + } + A2_LOG_DEBUG(fmt("Update cache entry size=%lu, delta=%ld, clock=%"PRId64, + static_cast(ent->getSize()), + static_cast(delta), + ent->getLastUpdate())); + + ent->setSizeKey(ent->getSize()); + ent->setLastUpdate(++clock_); + set_.insert(ent); + + if(delta < 0) { + assert(total_ >= static_cast(-delta)); + } + total_ += delta; + ensureLimit(); + return true; +} + +void WrDiskCache::ensureLimit() +{ + while(total_ > limit_) { + EntrySet::iterator i = set_.begin(); + total_ -= (*i)->getSize(); + (*i)->writeToDisk(); + WrDiskCacheEntry* ent = *i; + A2_LOG_DEBUG(fmt("Force flush cache entry size=%lu, clock=%"PRId64, + static_cast(ent->getSizeKey()), + ent->getLastUpdate())); + set_.erase(i); + + ent->setSizeKey(ent->getSize()); + ent->setLastUpdate(++clock_); + set_.insert(ent); + } +} + +} // namespace aria2 diff --git a/src/WrDiskCache.h b/src/WrDiskCache.h new file mode 100644 index 00000000..08fee829 --- /dev/null +++ b/src/WrDiskCache.h @@ -0,0 +1,83 @@ +/* */ +#ifndef D_WR_DISK_CACHE_H +#define D_WR_DISK_CACHE_H + +#include "common.h" + +#include + +#include "a2functional.h" + +namespace aria2 { + +class WrDiskCacheEntry; + +class WrDiskCache { +public: + WrDiskCache(size_t limit); + ~WrDiskCache(); + // Adds the cache entry |ent| to the storage. The size of cached + // data of ent is added to total_. + bool add(WrDiskCacheEntry* ent); + // Removes the cache entry |ent| from the stroage. The size of + // cached data of ent is subtracted from total_. + bool remove(WrDiskCacheEntry* ent); + // Updates the already added entry |ent|. The |delta| means how many + // bytes is increased in this update. If the size is reduced, use + // negative value. + bool update(WrDiskCacheEntry* ent, ssize_t delta); + // Evicts entries from storage so that total size of cache is kept + // under the limit. + void ensureLimit(); + size_t getSize() const + { + return total_; + } +private: + typedef std::set > EntrySet; + // Maximum number of bytes the storage can cache. + size_t limit_; + // Current number of bytes cached. + size_t total_; + EntrySet set_; + int64_t clock_; +}; + +} // namespace aria2 + +#endif // D_WR_DISK_CACHE_H + diff --git a/src/WrDiskCacheEntry.cc b/src/WrDiskCacheEntry.cc new file mode 100644 index 00000000..06713669 --- /dev/null +++ b/src/WrDiskCacheEntry.cc @@ -0,0 +1,110 @@ +/* */ +#include "WrDiskCacheEntry.h" +#include "DiskAdaptor.h" +#include "RecoverableException.h" +#include "DownloadFailureException.h" +#include "LogFactory.h" +#include "fmt.h" + +namespace aria2 { + +WrDiskCacheEntry::WrDiskCacheEntry +(const SharedHandle& diskAdaptor) + : sizeKey_(0), + lastUpdate_(0), + size_(0), + error_(CACHE_ERR_SUCCESS), + errorCode_(error_code::UNDEFINED), + diskAdaptor_(diskAdaptor) +{} + +WrDiskCacheEntry::~WrDiskCacheEntry() +{ + if(!set_.empty()) { + A2_LOG_WARN(fmt("WrDiskCacheEntry is not empty size=%lu", + static_cast(size_))); + } + deleteDataCells(); +} + +void WrDiskCacheEntry::deleteDataCells() +{ + for(DataCellSet::iterator i = set_.begin(), eoi = set_.end(); i != eoi; + ++i) { + delete [] (*i)->data; + delete *i; + } + set_.clear(); + size_ = 0; +} + +void WrDiskCacheEntry::writeToDisk() +{ + DataCellSet::iterator i = set_.begin(), eoi = set_.end(); + try { + for(; i != eoi; ++i) { + A2_LOG_DEBUG(fmt("WrDiskCacheEntry flush goff=%"PRId64", len=%lu", + (*i)->goff, static_cast((*i)->len))); + diskAdaptor_->writeData((*i)->data+(*i)->offset, (*i)->len, + (*i)->goff); + } + } catch(RecoverableException& e) { + A2_LOG_ERROR(fmt("WrDiskCacheEntry flush error goff=%"PRId64", len=%lu", + (*i)->goff, static_cast((*i)->len))); + error_ = CACHE_ERR_ERROR; + errorCode_ = e.getErrorCode(); + } + deleteDataCells(); +} + +void WrDiskCacheEntry::clear() +{ + deleteDataCells(); +} + +bool WrDiskCacheEntry::cacheData(DataCell* dataCell) +{ + A2_LOG_DEBUG(fmt("WrDiskCacheEntry cache goff=%"PRId64", len=%lu", + dataCell->goff, static_cast(dataCell->len))); + if(set_.insert(dataCell).second) { + size_ += dataCell->len; + return true; + } else { + return false; + } +} + +} // namespace aria2 diff --git a/src/WrDiskCacheEntry.h b/src/WrDiskCacheEntry.h new file mode 100644 index 00000000..b3b31f6b --- /dev/null +++ b/src/WrDiskCacheEntry.h @@ -0,0 +1,142 @@ +/* */ +#ifndef D_WR_DISK_CACHE_ENTRY_H +#define D_WR_DISK_CACHE_ENTRY_H + +#include "common.h" + +#include + +#include "SharedHandle.h" +#include "a2functional.h" +#include "error_code.h" + +namespace aria2 { + +class DiskAdaptor; +class WrDiskCache; + +class WrDiskCacheEntry { +public: + struct DataCell { + // Where the data is going to be put in DiskAdaptor + int64_t goff; + // data must be len+offset bytes. Thus, the cached data is + // [data+offset, data+offset+len). + unsigned char *data; + size_t offset; + size_t len; + bool operator<(const DataCell& rhs) const + { + return goff < rhs.goff; + } + }; + + typedef std::set > DataCellSet; + + WrDiskCacheEntry(const SharedHandle& diskAdaptor); + ~WrDiskCacheEntry(); + + // Flushes the cached data to the disk and deletes them. + void writeToDisk(); + // Deletes cached data without flushing to the disk. + void clear(); + + // Caches |dataCell| + bool cacheData(DataCell* dataCell); + size_t getSize() const + { + return size_; + } + void setSizeKey(size_t sizeKey) + { + sizeKey_ = sizeKey; + } + size_t getSizeKey() const + { + return sizeKey_; + } + void setLastUpdate(int64_t clock) + { + lastUpdate_ = clock; + } + int64_t getLastUpdate() const + { + return lastUpdate_; + } + bool operator<(const WrDiskCacheEntry& rhs) const + { + return sizeKey_ > rhs.sizeKey_ || + (sizeKey_ == rhs.sizeKey_ && lastUpdate_ < rhs.lastUpdate_); + } + + enum { + CACHE_ERR_SUCCESS, + CACHE_ERR_ERROR + }; + + int getError() const + { + return error_; + } + error_code::Value getErrorCode() const + { + return errorCode_; + } + + const DataCellSet& getDataSet() const + { + return set_; + } +private: + void deleteDataCells(); + + size_t sizeKey_; + int64_t lastUpdate_; + + size_t size_; + + DataCellSet set_; + + int error_; + error_code::Value errorCode_; + + SharedHandle diskAdaptor_; +}; + +} // namespace aria2 + +#endif // D_WR_DISK_CACHE_ENTRY_H + diff --git a/src/prefs.cc b/src/prefs.cc index ad369015..6675de2e 100644 --- a/src/prefs.cc +++ b/src/prefs.cc @@ -338,6 +338,8 @@ const Pref* PREF_STOP_WITH_PROCESS = makePref("stop-with-process"); const Pref* PREF_ENABLE_MMAP = makePref("enable-mmap"); // value: true | false const Pref* PREF_FORCE_SAVE = makePref("force-save"); +// value: 1*digit +const Pref* PREF_DISK_CACHE = makePref("disk-cache"); /** * FTP related preferences diff --git a/src/prefs.h b/src/prefs.h index f9928c70..01ed3397 100644 --- a/src/prefs.h +++ b/src/prefs.h @@ -281,6 +281,8 @@ extern const Pref* PREF_STOP_WITH_PROCESS; extern const Pref* PREF_ENABLE_MMAP; // value: true | false extern const Pref* PREF_FORCE_SAVE; +// value: 1*digit +extern const Pref* PREF_DISK_CACHE; /** * FTP related preferences diff --git a/src/usage_text.h b/src/usage_text.h index 0d966fd5..7d0d5bcf 100644 --- a/src/usage_text.h +++ b/src/usage_text.h @@ -914,3 +914,15 @@ " if the download is completed or removed. This\n" \ " may be useful to save BitTorrent seeding which\n" \ " is recognized as completed state.") +#define TEXT_DISK_CACHE \ + _(" --disk-cache=SIZE Enable disk cache. If SIZE is 0, the disk cache\n" \ + " is disabled. This feature caches the downloaded\n" \ + " data in memory, which grows to at most SIZE\n" \ + " bytes. The cache storage is created for aria2\n" \ + " instance and shared by all downloads. The one\n" \ + " advantage of the disk cache is reduce the disk\n" \ + " seek time because the data is written in larger\n" \ + " unit and it is reordered by the offset of the\n" \ + " file. If the underlying file is heavily\n" \ + " fragmented it is not the case.\n" \ + " SIZE can include K or M(1K = 1024, 1M = 1024K).") diff --git a/test/Makefile.am b/test/Makefile.am index 711a186d..0a69c791 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -85,7 +85,9 @@ aria2c_SOURCES = AllTest.cc\ ParamedStringTest.cc\ RpcHelperTest.cc\ AbstractCommandTest.cc\ - SinkStreamFilterTest.cc + SinkStreamFilterTest.cc\ + WrDiskCacheTest.cc\ + WrDiskCacheEntryTest.cc if ENABLE_XML_RPC aria2c_SOURCES += XmlRpcRequestParserControllerTest.cc diff --git a/test/MockPieceStorage.h b/test/MockPieceStorage.h index 31e51449..16f480ad 100644 --- a/test/MockPieceStorage.h +++ b/test/MockPieceStorage.h @@ -226,6 +226,12 @@ public: return diskAdaptor; } + virtual WrDiskCache* getWrDiskCache() { + return 0; + } + + virtual void flushWrDiskCacheEntry() {} + void setDiskAdaptor(const SharedHandle& adaptor) { this->diskAdaptor = adaptor; } diff --git a/test/PieceTest.cc b/test/PieceTest.cc index 7a07fa1d..bc594b17 100644 --- a/test/PieceTest.cc +++ b/test/PieceTest.cc @@ -5,6 +5,9 @@ #include #include "util.h" +#include "DirectDiskAdaptor.h" +#include "ByteArrayDiskWriter.h" +#include "WrDiskCache.h" namespace aria2 { @@ -13,24 +16,33 @@ class PieceTest:public CppUnit::TestFixture { CPPUNIT_TEST_SUITE(PieceTest); CPPUNIT_TEST(testCompleteBlock); CPPUNIT_TEST(testGetCompletedLength); - + CPPUNIT_TEST(testFlushWrCache); #ifdef ENABLE_MESSAGE_DIGEST + CPPUNIT_TEST(testGetDigestWithWrCache); CPPUNIT_TEST(testUpdateHash); #endif // ENABLE_MESSAGE_DIGEST CPPUNIT_TEST_SUITE_END(); private: - + SharedHandle adaptor_; + SharedHandle writer_; public: - void setUp() {} + void setUp() + { + adaptor_.reset(new DirectDiskAdaptor()); + writer_.reset(new ByteArrayDiskWriter()); + adaptor_->setDiskWriter(writer_); + } void testCompleteBlock(); void testGetCompletedLength(); + void testFlushWrCache(); #ifdef ENABLE_MESSAGE_DIGEST + void testGetDigestWithWrCache(); void testUpdateHash(); #endif // ENABLE_MESSAGE_DIGEST @@ -62,8 +74,58 @@ void PieceTest::testGetCompletedLength() CPPUNIT_ASSERT_EQUAL(blockLength*3+100, p.getCompletedLength()); } +void PieceTest::testFlushWrCache() +{ + unsigned char* data; + Piece p(0, 1024); + WrDiskCache dc(64); + p.initWrCache(&dc, adaptor_); + data = new unsigned char[3]; + memcpy(data, "foo", 3); + p.updateWrCache(&dc, data, 0, 3, 0); + data = new unsigned char[4]; + memcpy(data, " bar", 4); + p.updateWrCache(&dc, data, 0, 4, 3); + p.flushWrCache(&dc); + + CPPUNIT_ASSERT_EQUAL(std::string("foo bar"), writer_->getString()); + + data = new unsigned char[3]; + memcpy(data, "foo", 3); + p.updateWrCache(&dc, data, 0, 3, 0); + CPPUNIT_ASSERT_EQUAL((size_t)3, dc.getSize()); + p.clearWrCache(&dc); + CPPUNIT_ASSERT_EQUAL((size_t)0, dc.getSize()); + p.releaseWrCache(&dc); + CPPUNIT_ASSERT(!p.getWrDiskCacheEntry()); +} + #ifdef ENABLE_MESSAGE_DIGEST +void PieceTest::testGetDigestWithWrCache() +{ + unsigned char* data; + Piece p(0, 26); + p.setHashType("sha-1"); + WrDiskCache dc(64); + // 012345678901234567890123456 + writer_->setString("abcde...ijklmnopq...uvwx.z"); + p.initWrCache(&dc, adaptor_); + data = new unsigned char[3]; + memcpy(data, "fgh", 3); + p.updateWrCache(&dc, data, 0, 3, 5); + data = new unsigned char[3]; + memcpy(data, "rst", 3); + p.updateWrCache(&dc, data, 0, 3, 17); + data = new unsigned char[1]; + memcpy(data, "y", 1); + p.updateWrCache(&dc, data, 0, 1, 24); + + CPPUNIT_ASSERT_EQUAL + (std::string("32d10c7b8cf96570ca04ce37f2a19d84240d3a89"), + util::toHex(p.getDigestWithWrCache(p.getLength(), adaptor_))); +} + void PieceTest::testUpdateHash() { Piece p(0, 16, 2*1024*1024); diff --git a/test/SinkStreamFilterTest.cc b/test/SinkStreamFilterTest.cc index 91bc6bfc..3b8bf83b 100644 --- a/test/SinkStreamFilterTest.cc +++ b/test/SinkStreamFilterTest.cc @@ -54,9 +54,7 @@ public: void setUp() { writer_.reset(new ByteArrayDiskWriter()); - sinkFilter_.reset(new SinkStreamFilter()); - filter_.reset(new SinkStreamFilter(sinkFilter_)); - sinkFilter_->init(); + filter_.reset(new SinkStreamFilter()); filter_->init(); segment_.reset(new MockSegment2(16)); } diff --git a/test/TestUtil.cc b/test/TestUtil.cc index 6dd5d811..62424c6f 100644 --- a/test/TestUtil.cc +++ b/test/TestUtil.cc @@ -91,4 +91,18 @@ std::string fileHexDigest } #endif // ENABLE_MESSAGE_DIGEST +WrDiskCacheEntry::DataCell* createDataCell(int64_t goff, + const char* data, + size_t offset) +{ + WrDiskCacheEntry::DataCell* cell = new WrDiskCacheEntry::DataCell(); + cell->goff = goff; + size_t len = strlen(data); + cell->data = new unsigned char[len]; + memcpy(cell->data, data, len); + cell->offset = offset; + cell->len = len; + return cell; +} + } // namespace aria2 diff --git a/test/TestUtil.h b/test/TestUtil.h index 9dc66b87..0eaf330c 100644 --- a/test/TestUtil.h +++ b/test/TestUtil.h @@ -4,6 +4,7 @@ #include "SharedHandle.h" #include "Cookie.h" +#include "WrDiskCacheEntry.h" namespace aria2 { @@ -50,4 +51,8 @@ std::string fileHexDigest (const SharedHandle& ctx, const std::string& filename); #endif // ENABLE_MESSAGE_DIGEST +WrDiskCacheEntry::DataCell* createDataCell(int64_t goff, + const char* data, + size_t offset = 0); + } // namespace aria2 diff --git a/test/WrDiskCacheEntryTest.cc b/test/WrDiskCacheEntryTest.cc new file mode 100644 index 00000000..ad1c894b --- /dev/null +++ b/test/WrDiskCacheEntryTest.cc @@ -0,0 +1,55 @@ +#include "WrDiskCacheEntry.h" + +#include + +#include + +#include "TestUtil.h" +#include "DirectDiskAdaptor.h" +#include "ByteArrayDiskWriter.h" + +namespace aria2 { + +class WrDiskCacheEntryTest:public CppUnit::TestFixture { + + CPPUNIT_TEST_SUITE(WrDiskCacheEntryTest); + CPPUNIT_TEST(testWriteToDisk); + CPPUNIT_TEST(testClear); + CPPUNIT_TEST_SUITE_END(); + + SharedHandle adaptor_; + SharedHandle writer_; +public: + void setUp() + { + adaptor_.reset(new DirectDiskAdaptor()); + writer_.reset(new ByteArrayDiskWriter()); + adaptor_->setDiskWriter(writer_); + } + + void testWriteToDisk(); + void testClear(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION( WrDiskCacheEntryTest ); + +void WrDiskCacheEntryTest::testWriteToDisk() +{ + WrDiskCacheEntry e(adaptor_); + e.cacheData(createDataCell(0, "??01234567", 2)); + e.cacheData(createDataCell(8, "890")); + e.writeToDisk(); + CPPUNIT_ASSERT_EQUAL((size_t)0, e.getSize()); + CPPUNIT_ASSERT_EQUAL(std::string("01234567890"), writer_->getString()); +} + +void WrDiskCacheEntryTest::testClear() +{ + WrDiskCacheEntry e(adaptor_); + e.cacheData(createDataCell(0, "foo")); + e.clear(); + CPPUNIT_ASSERT_EQUAL((size_t)0, e.getSize()); + CPPUNIT_ASSERT_EQUAL(std::string(), writer_->getString()); +} + +} // namespace aria2 diff --git a/test/WrDiskCacheTest.cc b/test/WrDiskCacheTest.cc new file mode 100644 index 00000000..6e0663a7 --- /dev/null +++ b/test/WrDiskCacheTest.cc @@ -0,0 +1,74 @@ +#include "WrDiskCache.h" + +#include + +#include + +#include "TestUtil.h" +#include "DirectDiskAdaptor.h" +#include "ByteArrayDiskWriter.h" + +namespace aria2 { + +class WrDiskCacheTest:public CppUnit::TestFixture { + + CPPUNIT_TEST_SUITE(WrDiskCacheTest); + CPPUNIT_TEST(testAdd); + CPPUNIT_TEST_SUITE_END(); + + SharedHandle adaptor_; + SharedHandle writer_; +public: + void setUp() + { + adaptor_.reset(new DirectDiskAdaptor()); + writer_.reset(new ByteArrayDiskWriter()); + adaptor_->setDiskWriter(writer_); + } + + void testAdd(); +}; + +CPPUNIT_TEST_SUITE_REGISTRATION( WrDiskCacheTest ); + +void WrDiskCacheTest::testAdd() +{ + WrDiskCache dc(20); + CPPUNIT_ASSERT_EQUAL((size_t)0, dc.getSize()); + WrDiskCacheEntry e1(adaptor_); + e1.cacheData(createDataCell(0, "who knows?")); + CPPUNIT_ASSERT(dc.add(&e1)); + CPPUNIT_ASSERT_EQUAL((size_t)10, dc.getSize()); + + WrDiskCacheEntry e2(adaptor_); + e2.cacheData(createDataCell(21, "seconddata")); + CPPUNIT_ASSERT(dc.add(&e2)); + CPPUNIT_ASSERT_EQUAL((size_t)20, dc.getSize()); + + WrDiskCacheEntry e3(adaptor_); + e3.cacheData(createDataCell(10, "hello")); + CPPUNIT_ASSERT(dc.add(&e3)); + CPPUNIT_ASSERT_EQUAL((size_t)15, dc.getSize()); + // e1 is flushed to the disk + CPPUNIT_ASSERT_EQUAL(std::string("who knows?"), writer_->getString()); + CPPUNIT_ASSERT_EQUAL((size_t)0, e1.getSize()); + + e3.cacheData(createDataCell(15, " world")); + CPPUNIT_ASSERT(dc.update(&e3, 6)); + + // e3 is flushed to the disk + CPPUNIT_ASSERT_EQUAL(std::string("who knows?hello world"), + writer_->getString()); + CPPUNIT_ASSERT_EQUAL((size_t)0, e3.getSize()); + CPPUNIT_ASSERT_EQUAL((size_t)10, dc.getSize()); + + e2.cacheData(createDataCell(31, "01234567890")); + CPPUNIT_ASSERT(dc.update(&e2, 11)); + // e2 is flushed to the disk + CPPUNIT_ASSERT_EQUAL(std::string("who knows?hello worldseconddata01234567890"), + writer_->getString()); + CPPUNIT_ASSERT_EQUAL((size_t)0, e2.getSize()); + CPPUNIT_ASSERT_EQUAL((size_t)0, dc.getSize()); +} + +} // namespace aria2