Change have entry indexing method

Now use increasing sequence of integer rather than timer value.
This commit is contained in:
Tatsuhiro Tsujikawa 2016-07-10 22:42:49 +09:00
parent f2aa7564b0
commit babdcb2c7d
12 changed files with 156 additions and 88 deletions

View file

@ -277,7 +277,8 @@ void BtPieceMessage::onNewPiece(const std::shared_ptr<Piece>& piece)
A2_LOG_INFO(fmt(MSG_GOT_NEW_PIECE, getCuid(), A2_LOG_INFO(fmt(MSG_GOT_NEW_PIECE, getCuid(),
static_cast<unsigned long>(piece->getIndex()))); static_cast<unsigned long>(piece->getIndex())));
getPieceStorage()->completePiece(piece); getPieceStorage()->completePiece(piece);
getPieceStorage()->advertisePiece(getCuid(), piece->getIndex()); getPieceStorage()->advertisePiece(getCuid(), piece->getIndex(),
global::wallclock());
} }
void BtPieceMessage::onWrongPiece(const std::shared_ptr<Piece>& piece) void BtPieceMessage::onWrongPiece(const std::shared_ptr<Piece>& piece)

View file

@ -92,8 +92,8 @@ DefaultBtInteractive::DefaultBtInteractive(
peer_(peer), peer_(peer),
metadataGetMode_(false), metadataGetMode_(false),
localNode_(nullptr), localNode_(nullptr),
lastHaveIndex_(0),
allowedFastSetSize_(10), allowedFastSetSize_(10),
haveTimer_(global::wallclock()),
keepAliveTimer_(global::wallclock()), keepAliveTimer_(global::wallclock()),
floodingTimer_(global::wallclock()), floodingTimer_(global::wallclock()),
inactiveTimer_(global::wallclock()), inactiveTimer_(global::wallclock()),
@ -172,7 +172,6 @@ DefaultBtInteractive::receiveAndSendHandshake()
void DefaultBtInteractive::doPostHandshakeProcessing() void DefaultBtInteractive::doPostHandshakeProcessing()
{ {
// Set time 0 to haveTimer to cache http/ftp download piece completion // Set time 0 to haveTimer to cache http/ftp download piece completion
haveTimer_ = Timer::zero();
keepAliveTimer_ = global::wallclock(); keepAliveTimer_ = global::wallclock();
floodingTimer_ = global::wallclock(); floodingTimer_ = global::wallclock();
pexTimer_ = Timer::zero(); pexTimer_ = Timer::zero();
@ -267,8 +266,9 @@ void DefaultBtInteractive::checkHave()
{ {
std::vector<size_t> haveIndexes; std::vector<size_t> haveIndexes;
pieceStorage_->getAdvertisedPieceIndexes(haveIndexes, cuid_, haveTimer_); lastHaveIndex_ = pieceStorage_->getAdvertisedPieceIndexes(haveIndexes, cuid_,
haveTimer_ = global::wallclock(); lastHaveIndex_);
// Use bitfield message if it is equal to or less than the total // Use bitfield message if it is equal to or less than the total
// size of have messages. // size of have messages.
if (5 + pieceStorage_->getBitfieldLength() <= haveIndexes.size() * 9) { if (5 + pieceStorage_->getBitfieldLength() <= haveIndexes.size() * 9) {

View file

@ -124,8 +124,10 @@ private:
DHTNode* localNode_; DHTNode* localNode_;
// The last haveIndex we have advertised to the peer.
uint64_t lastHaveIndex_;
size_t allowedFastSetSize_; size_t allowedFastSetSize_;
Timer haveTimer_;
Timer keepAliveTimer_; Timer keepAliveTimer_;
Timer floodingTimer_; Timer floodingTimer_;
FloodingStat floodingStat_; FloodingStat floodingStat_;

View file

@ -85,6 +85,10 @@ DefaultPieceStorage::DefaultPieceStorage(
endGame_(false), endGame_(false),
endGamePieceNum_(END_GAME_PIECE_NUM), endGamePieceNum_(END_GAME_PIECE_NUM),
option_(option), option_(option),
// The DefaultBtInteractive has the default value of
// lastHaveIndex of 0, so we need to make nextHaveIndex_ more
// than that.
nextHaveIndex_(1),
pieceStatMan_(std::make_shared<PieceStatMan>( pieceStatMan_(std::make_shared<PieceStatMan>(
downloadContext->getNumPieces(), true)), downloadContext->getNumPieces(), true)),
pieceSelector_(make_unique<RarestPieceSelector>(pieceStatMan_)), pieceSelector_(make_unique<RarestPieceSelector>(pieceStatMan_)),
@ -698,40 +702,44 @@ int32_t DefaultPieceStorage::getPieceLength(size_t index)
return bitfieldMan_->getBlockLength(index); return bitfieldMan_->getBlockLength(index);
} }
void DefaultPieceStorage::advertisePiece(cuid_t cuid, size_t index) void DefaultPieceStorage::advertisePiece(cuid_t cuid, size_t index,
Timer registeredTime)
{ {
HaveEntry entry(cuid, index, global::wallclock()); haves_.emplace_back(nextHaveIndex_++, cuid, index, std::move(registeredTime));
haves_.push_front(entry);
} }
void DefaultPieceStorage::getAdvertisedPieceIndexes( uint64_t DefaultPieceStorage::getAdvertisedPieceIndexes(
std::vector<size_t>& indexes, cuid_t myCuid, const Timer& lastCheckTime) std::vector<size_t>& indexes, cuid_t myCuid, uint64_t lastHaveIndex)
{ {
for (std::deque<HaveEntry>::const_iterator itr = haves_.begin(), auto it =
eoi = haves_.end(); std::upper_bound(std::begin(haves_), std::end(haves_), lastHaveIndex,
itr != eoi; ++itr) { [](uint64_t lastHaveIndex, const HaveEntry& have) {
const HaveEntry& have = *itr; return lastHaveIndex < have.haveIndex;
if (lastCheckTime > have.getRegisteredTime()) { });
break;
} if (it == std::end(haves_)) {
indexes.push_back(have.getIndex()); return lastHaveIndex;
} }
for (; it != std::end(haves_); ++it) {
indexes.push_back((*it).index);
}
return (*(std::end(haves_) - 1)).haveIndex;
} }
void DefaultPieceStorage::removeAdvertisedPiece( void DefaultPieceStorage::removeAdvertisedPiece(const Timer& expiry)
const std::chrono::seconds& elapsed)
{ {
auto itr = std::find_if( auto it = std::upper_bound(std::begin(haves_), std::end(haves_), expiry,
std::begin(haves_), std::end(haves_), [&elapsed](const HaveEntry& have) { [](const Timer& expiry, const HaveEntry& have) {
return have.getRegisteredTime().difference(global::wallclock()) >= return expiry < have.registeredTime;
elapsed; });
});
if (itr != std::end(haves_)) { A2_LOG_DEBUG(
A2_LOG_DEBUG(fmt(MSG_REMOVED_HAVE_ENTRY, fmt(MSG_REMOVED_HAVE_ENTRY,
static_cast<unsigned long>(std::end(haves_) - itr))); static_cast<unsigned long>(std::distance(std::begin(haves_), it))));
haves_.erase(itr, std::end(haves_));
} haves_.erase(std::begin(haves_), it);
} }
void DefaultPieceStorage::markAllPiecesDone() { bitfieldMan_->setAllBit(); } void DefaultPieceStorage::markAllPiecesDone() { bitfieldMan_->setAllBit(); }

View file

@ -55,23 +55,19 @@ class StreamPieceSelector;
#define END_GAME_PIECE_NUM 20 #define END_GAME_PIECE_NUM 20
class HaveEntry { struct HaveEntry {
private: HaveEntry(uint64_t haveIndex, cuid_t cuid, size_t index, Timer registeredTime)
cuid_t cuid_; : haveIndex(haveIndex),
size_t index_; cuid(cuid),
Timer registeredTime_; index(index),
registeredTime(std::move(registeredTime))
public:
HaveEntry(cuid_t cuid, size_t index, const Timer& registeredTime)
: cuid_(cuid), index_(index), registeredTime_(registeredTime)
{ {
} }
cuid_t getCuid() const { return cuid_; } uint64_t haveIndex;
cuid_t cuid;
size_t getIndex() const { return index_; } size_t index;
Timer registeredTime;
const Timer& getRegisteredTime() const { return registeredTime_; }
}; };
class DefaultPieceStorage : public PieceStorage { class DefaultPieceStorage : public PieceStorage {
@ -87,6 +83,10 @@ private:
bool endGame_; bool endGame_;
size_t endGamePieceNum_; size_t endGamePieceNum_;
const Option* option_; const Option* option_;
// The next unique index on HaveEntry, which is ever strictly
// increasing sequence of integer.
uint64_t nextHaveIndex_;
std::deque<HaveEntry> haves_; std::deque<HaveEntry> haves_;
std::shared_ptr<PieceStatMan> pieceStatMan_; std::shared_ptr<PieceStatMan> pieceStatMan_;
@ -238,14 +238,14 @@ public:
virtual int32_t getPieceLength(size_t index) CXX11_OVERRIDE; virtual int32_t getPieceLength(size_t index) CXX11_OVERRIDE;
virtual void advertisePiece(cuid_t cuid, size_t index) CXX11_OVERRIDE; virtual void advertisePiece(cuid_t cuid, size_t index,
Timer registeredTime) CXX11_OVERRIDE;
virtual void virtual uint64_t
getAdvertisedPieceIndexes(std::vector<size_t>& indexes, cuid_t myCuid, getAdvertisedPieceIndexes(std::vector<size_t>& indexes, cuid_t myCuid,
const Timer& lastCheckTime) CXX11_OVERRIDE; uint64_t lastHaveIndex) CXX11_OVERRIDE;
virtual void virtual void removeAdvertisedPiece(const Timer& expiry) CXX11_OVERRIDE;
removeAdvertisedPiece(const std::chrono::seconds& elapsed) CXX11_OVERRIDE;
virtual void markAllPiecesDone() CXX11_OVERRIDE; virtual void markAllPiecesDone() CXX11_OVERRIDE;

View file

@ -37,6 +37,7 @@
#include "RequestGroupMan.h" #include "RequestGroupMan.h"
#include "PieceStorage.h" #include "PieceStorage.h"
#include "RequestGroup.h" #include "RequestGroup.h"
#include "wallclock.h"
namespace aria2 { namespace aria2 {
@ -58,13 +59,19 @@ void HaveEraseCommand::preProcess()
void HaveEraseCommand::process() void HaveEraseCommand::process()
{ {
const RequestGroupList& groups = // we are making a copy of current wallclock.
auto expiry = global::wallclock();
expiry.advance(5_s);
const auto& groups =
getDownloadEngine()->getRequestGroupMan()->getRequestGroups(); getDownloadEngine()->getRequestGroupMan()->getRequestGroups();
for (auto& group : groups) { for (auto& group : groups) {
const auto& ps = group->getPieceStorage(); const auto& ps = group->getPieceStorage();
if (ps) { if (!ps) {
ps->removeAdvertisedPiece(5_s); continue;
} }
ps->removeAdvertisedPiece(expiry);
} }
} }

View file

@ -235,21 +235,22 @@ public:
* Adds piece index to advertise to other commands. They send have message * Adds piece index to advertise to other commands. They send have message
* based on this information. * based on this information.
*/ */
virtual void advertisePiece(cuid_t cuid, size_t index) = 0; virtual void advertisePiece(cuid_t cuid, size_t index,
Timer registerdTime) = 0;
/** /**
* indexes is filled with piece index which is not advertised by the caller * indexes is filled with piece index which is not advertised by the
* command and newer than lastCheckTime. * caller command and newer than lastHaveIndex.
*/ */
virtual void getAdvertisedPieceIndexes(std::vector<size_t>& indexes, virtual uint64_t getAdvertisedPieceIndexes(std::vector<size_t>& indexes,
cuid_t myCuid, cuid_t myCuid,
const Timer& lastCheckTime) = 0; uint64_t lastHaveIndex) = 0;
/** /**
* Removes have entry if specified seconds have elapsed since its * Removes have entry if its registeredTime is at least as old as
* registration. * expiry.
*/ */
virtual void removeAdvertisedPiece(const std::chrono::seconds& elapsed) = 0; virtual void removeAdvertisedPiece(const Timer& expiry) = 0;
/** /**
* Sets all bits in bitfield to 1. * Sets all bits in bitfield to 1.

View file

@ -997,7 +997,7 @@ void RequestGroup::releaseRuntimeResource(DownloadEngine* e)
peerStorage_ = nullptr; peerStorage_ = nullptr;
#endif // ENABLE_BITTORRENT #endif // ENABLE_BITTORRENT
if (pieceStorage_) { if (pieceStorage_) {
pieceStorage_->removeAdvertisedPiece(0_s); pieceStorage_->removeAdvertisedPiece(Timer::zero());
} }
// Don't reset segmentMan_ and pieceStorage_ here to provide // Don't reset segmentMan_ and pieceStorage_ here to provide
// progress information via RPC // progress information via RPC

View file

@ -374,7 +374,8 @@ bool SegmentMan::completeSegment(cuid_t cuid,
const std::shared_ptr<Segment>& segment) const std::shared_ptr<Segment>& segment)
{ {
pieceStorage_->completePiece(segment->getPiece()); pieceStorage_->completePiece(segment->getPiece());
pieceStorage_->advertisePiece(cuid, segment->getPiece()->getIndex()); pieceStorage_->advertisePiece(cuid, segment->getPiece()->getIndex(),
global::wallclock());
auto itr = std::find_if(usedSegmentEntries_.begin(), auto itr = std::find_if(usedSegmentEntries_.begin(),
usedSegmentEntries_.end(), FindSegmentEntry(segment)); usedSegmentEntries_.end(), FindSegmentEntry(segment));
if (itr == usedSegmentEntries_.end()) { if (itr == usedSegmentEntries_.end()) {

View file

@ -223,30 +223,22 @@ public:
virtual int32_t getPieceLength(size_t index) CXX11_OVERRIDE; virtual int32_t getPieceLength(size_t index) CXX11_OVERRIDE;
/** virtual void advertisePiece(cuid_t cuid, size_t index,
* Adds piece index to advertise to other commands. They send have message Timer registeredTime) CXX11_OVERRIDE
* based on this information. {
*/ }
virtual void advertisePiece(cuid_t cuid, size_t index) CXX11_OVERRIDE {}
/** /**
* Returns piece index which is not advertised by the caller command and * indexes is filled with piece index which is not advertised by the
* newer than lastCheckTime. * caller command and newer than lastHaveIndex.
*/ */
virtual void virtual uint64_t
getAdvertisedPieceIndexes(std::vector<size_t>& indexes, cuid_t myCuid, getAdvertisedPieceIndexes(std::vector<size_t>& indexes, cuid_t myCuid,
const Timer& lastCheckTime) CXX11_OVERRIDE uint64_t lastHaveIndex) CXX11_OVERRIDE
{ {
} }
/** virtual void removeAdvertisedPiece(const Timer& expiry) CXX11_OVERRIDE {}
* Removes have entry if specified seconds have elapsed since its
* registration.
*/
virtual void
removeAdvertisedPiece(const std::chrono::seconds& elapsed) CXX11_OVERRIDE
{
}
/** /**
* Sets all bits in bitfield to 1. * Sets all bits in bitfield to 1.

View file

@ -39,6 +39,7 @@ class DefaultPieceStorageTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testGetCompletedLength); CPPUNIT_TEST(testGetCompletedLength);
CPPUNIT_TEST(testGetFilteredCompletedLength); CPPUNIT_TEST(testGetFilteredCompletedLength);
CPPUNIT_TEST(testGetNextUsedIndex); CPPUNIT_TEST(testGetNextUsedIndex);
CPPUNIT_TEST(testAdvertisePiece);
CPPUNIT_TEST_SUITE_END(); CPPUNIT_TEST_SUITE_END();
private: private:
@ -77,6 +78,7 @@ public:
void testGetCompletedLength(); void testGetCompletedLength();
void testGetFilteredCompletedLength(); void testGetFilteredCompletedLength();
void testGetNextUsedIndex(); void testGetNextUsedIndex();
void testAdvertisePiece();
}; };
CPPUNIT_TEST_SUITE_REGISTRATION(DefaultPieceStorageTest); CPPUNIT_TEST_SUITE_REGISTRATION(DefaultPieceStorageTest);
@ -401,4 +403,58 @@ void DefaultPieceStorageTest::testGetNextUsedIndex()
CPPUNIT_ASSERT_EQUAL((size_t)2, pss.getNextUsedIndex(0)); CPPUNIT_ASSERT_EQUAL((size_t)2, pss.getNextUsedIndex(0));
} }
void DefaultPieceStorageTest::testAdvertisePiece()
{
DefaultPieceStorage ps(dctx_, option_.get());
ps.advertisePiece(1, 100, Timer(10_s));
ps.advertisePiece(2, 101, Timer(11_s));
ps.advertisePiece(3, 102, Timer(11_s));
ps.advertisePiece(1, 103, Timer(12_s));
ps.advertisePiece(2, 104, Timer(100_s));
std::vector<size_t> res, ans;
uint64_t lastHaveIndex;
lastHaveIndex = ps.getAdvertisedPieceIndexes(res, 1, 0);
ans = std::vector<size_t>{100, 101, 102, 103, 104};
CPPUNIT_ASSERT_EQUAL((uint64_t)5, lastHaveIndex);
CPPUNIT_ASSERT(ans == res);
res.clear();
lastHaveIndex = ps.getAdvertisedPieceIndexes(res, 1, 3);
ans = std::vector<size_t>{103, 104};
CPPUNIT_ASSERT_EQUAL((uint64_t)5, lastHaveIndex);
CPPUNIT_ASSERT_EQUAL((size_t)2, res.size());
CPPUNIT_ASSERT(ans == res);
res.clear();
lastHaveIndex = ps.getAdvertisedPieceIndexes(res, 1, 5);
CPPUNIT_ASSERT_EQUAL((uint64_t)5, lastHaveIndex);
CPPUNIT_ASSERT_EQUAL((size_t)0, res.size());
// remove haves
ps.removeAdvertisedPiece(Timer(11_s));
res.clear();
lastHaveIndex = ps.getAdvertisedPieceIndexes(res, 1, 0);
ans = std::vector<size_t>{103, 104};
CPPUNIT_ASSERT_EQUAL((uint64_t)5, lastHaveIndex);
CPPUNIT_ASSERT_EQUAL((size_t)2, res.size());
CPPUNIT_ASSERT(ans == res);
ps.removeAdvertisedPiece(Timer(300_s));
res.clear();
lastHaveIndex = ps.getAdvertisedPieceIndexes(res, 1, 0);
CPPUNIT_ASSERT_EQUAL((uint64_t)0, lastHaveIndex);
CPPUNIT_ASSERT_EQUAL((size_t)0, res.size());
}
} // namespace aria2 } // namespace aria2

View file

@ -241,18 +241,18 @@ public:
void addPieceLengthList(int32_t length) { pieceLengthList.push_back(length); } void addPieceLengthList(int32_t length) { pieceLengthList.push_back(length); }
virtual void advertisePiece(cuid_t cuid, size_t index) CXX11_OVERRIDE {} virtual void advertisePiece(cuid_t cuid, size_t index,
Timer registeredTime) CXX11_OVERRIDE
{
}
virtual void virtual uint64_t
getAdvertisedPieceIndexes(std::vector<size_t>& indexes, cuid_t myCuid, getAdvertisedPieceIndexes(std::vector<size_t>& indexes, cuid_t myCuid,
const Timer& lastCheckTime) CXX11_OVERRIDE uint64_t lastHaveIndex) CXX11_OVERRIDE
{ {
} }
virtual void virtual void removeAdvertisedPiece(const Timer& expiry) CXX11_OVERRIDE {}
removeAdvertisedPiece(const std::chrono::seconds& elapsed) CXX11_OVERRIDE
{
}
virtual void markAllPiecesDone() CXX11_OVERRIDE {} virtual void markAllPiecesDone() CXX11_OVERRIDE {}