diff --git a/ChangeLog b/ChangeLog index 0d459bcc..708dd7e2 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,67 @@ +2009-06-24 Tatsuhiro Tsujikawa + + Added experimental support of WEB-Seeding for multi-file torrent. + Due to fundamental changes in file handling in HTTP/FTP code, many + functions are not working: PeerStat, ServerHost, proxy..etc + * src/AbstractCommand.cc + * src/AbstractCommand.h + * src/BitfieldMan.cc + * src/BitfieldMan.h + * src/CreateRequestCommand.cc + * src/CreateRequestCommand.h + * src/DefaultPieceStorage.cc + * src/DefaultPieceStorage.h + * src/DownloadCommand.cc + * src/DownloadCommand.h + * src/DownloadContext.cc + * src/DownloadContext.h + * src/FileEntry.cc + * src/FileEntry.h + * src/FtpDownloadCommand.cc + * src/FtpDownloadCommand.h + * src/FtpFinishDownloadCommand.cc + * src/FtpFinishDownloadCommand.h + * src/FtpInitiateConnectionCommand.cc + * src/FtpInitiateConnectionCommand.h + * src/FtpNegotiationCommand.cc + * src/FtpNegotiationCommand.h + * src/FtpTunnelResponseCommand.cc + * src/HttpDownloadCommand.cc + * src/HttpDownloadCommand.h + * src/HttpInitiateConnectionCommand.cc + * src/HttpInitiateConnectionCommand.h + * src/HttpProxyResponseCommand.cc + * src/HttpRequest.cc + * src/HttpRequest.h + * src/HttpRequestCommand.cc + * src/HttpRequestCommand.h + * src/HttpResponseCommand.cc + * src/HttpResponseCommand.h + * src/HttpSkipResponseCommand.cc + * src/HttpSkipResponseCommand.h + * src/InitiateConnectionCommand.cc + * src/InitiateConnectionCommand.h + * src/InitiateConnectionCommandFactory.cc + * src/InitiateConnectionCommandFactory.h + * src/Makefile.am + * src/PieceStorage.h + * src/RequestGroup.cc + * src/RequestGroup.h + * src/RequestGroupMan.cc + * src/SegmentMan.cc + * src/SegmentMan.h + * src/SingleFileDownloadContext.h + * src/StreamFileAllocationEntry.cc + * src/TrackerWatcherCommand.cc + * src/UnknownLengthPieceStorage.cc + * src/UnknownLengthPieceStorage.h + * src/array_fun.h + * src/bitfield.h + * src/download_helper.cc + * test/DownloadContextTest.cc + * test/Makefile.am + * test/MockDownloadContext.h + 2009-06-24 Tatsuhiro Tsujikawa Added tellWaiting XML-RPC method. diff --git a/src/AbstractCommand.cc b/src/AbstractCommand.cc index 39aa7a9e..b1004a48 100644 --- a/src/AbstractCommand.cc +++ b/src/AbstractCommand.cc @@ -47,7 +47,7 @@ #include "DlAbortEx.h" #include "DlRetryEx.h" #include "DownloadFailureException.h" -#include "InitiateConnectionCommandFactory.h" +#include "CreateRequestCommand.h" #include "SleepCommand.h" #ifdef ENABLE_ASYNC_DNS #include "AsyncNameResolver.h" @@ -62,9 +62,12 @@ #include "RequestGroupMan.h" #include "A2STR.h" #include "Util.h" +#include "LogFactory.h" +#include "DownloadContext.h" namespace aria2 { +// TODO1.5 Remove this AbstractCommand::AbstractCommand(int32_t cuid, const SharedHandle& req, RequestGroup* requestGroup, @@ -83,6 +86,25 @@ AbstractCommand::AbstractCommand(int32_t cuid, _requestGroup->increaseNumCommand(); } +AbstractCommand::AbstractCommand(int32_t cuid, + const SharedHandle& req, + const SharedHandle& fileEntry, + RequestGroup* requestGroup, + DownloadEngine* e, + const SocketHandle& s): + Command(cuid), _requestGroup(requestGroup), + req(req), _fileEntry(fileEntry), e(e), socket(s), + checkSocketIsReadable(false), checkSocketIsWritable(false), + nameResolverCheck(false) +{ + if(!socket.isNull() && socket->isOpen()) { + setReadCheckSocket(socket); + } + timeout = _requestGroup->getTimeout(); + _requestGroup->increaseStreamConnection(); + _requestGroup->increaseNumCommand(); +} + AbstractCommand::~AbstractCommand() { disableReadCheckSocket(); disableWriteCheckSocket(); @@ -125,7 +147,8 @@ bool AbstractCommand::execute() { if(!_requestGroup->getPieceStorage().isNull()) { _segments.clear(); _requestGroup->getSegmentMan()->getInFlightSegment(_segments, cuid); - while(_segments.size() < req->getMaxPipelinedRequest()) { + size_t maxSegments = req.isNull()?1:req->getMaxPipelinedRequest(); + while(_segments.size() < maxSegments) { SegmentHandle segment = _requestGroup->getSegmentMan()->getSegment(cuid); if(segment.isNull()) { break; @@ -135,7 +158,7 @@ bool AbstractCommand::execute() { if(_segments.empty()) { // TODO socket could be pooled here if pipelining is enabled... logger->info(MSG_NO_SEGMENT_AVAILABLE, cuid); - return prepareForRetry(1); + return true; } } return executeInternal(); @@ -146,6 +169,7 @@ bool AbstractCommand::execute() { } else { if(checkPoint.elapsed(timeout)) { // timeout triggers ServerStat error state. + SharedHandle ss = e->_requestGroupMan->getOrCreateServerStat(req->getHost(), req->getProtocol()); @@ -157,13 +181,18 @@ bool AbstractCommand::execute() { return false; } } catch(DlAbortEx& err) { - logger->error(MSG_DOWNLOAD_ABORTED, - DL_ABORT_EX2(StringFormat - ("URI=%s", req->getCurrentUrl().c_str()).str(),err), - cuid, req->getUrl().c_str()); - _requestGroup->addURIResult(req->getUrl(), err.getCode()); + if(req.isNull()) { + logger->debug(EX_EXCEPTION_CAUGHT, err); + } else { + logger->error(MSG_DOWNLOAD_ABORTED, + DL_ABORT_EX2(StringFormat + ("URI=%s", req->getCurrentUrl().c_str()).str(),err), + cuid, req->getUrl().c_str()); + _requestGroup->addURIResult(req->getUrl(), err.getCode()); + } onAbort(); - req->resetUrl(); + // TODO Do we need this? + //req->resetUrl(); tryReserved(); return true; } catch(DlRetryEx& err) { @@ -202,6 +231,16 @@ bool AbstractCommand::execute() { void AbstractCommand::tryReserved() { _requestGroup->removeServerHost(cuid); + if(_requestGroup->getDownloadContext()->getFileMode() == DownloadContext::SINGLE) { + const SharedHandle& entry = + _requestGroup->getDownloadContext()->getFileEntries().front(); + // Don't create new command if currently file length is unknown + // and there are no URI left. Because file length is unknown, we + // can assume that there are no in-flight request object. + if(entry->getLength() == 0 && entry->getRemainingUris().size() == 0) { + return; + } + } Commands commands; _requestGroup->createNextCommand(commands, e, 1); e->setNoWait(true); @@ -212,7 +251,18 @@ bool AbstractCommand::prepareForRetry(time_t wait) { if(!_requestGroup->getPieceStorage().isNull()) { _requestGroup->getSegmentMan()->cancelSegment(cuid); } - Command* command = InitiateConnectionCommandFactory::createInitiateConnectionCommand(cuid, req, _requestGroup, e); + if(!req.isNull()) { + _fileEntry->poolRequest(req); + } + if(!_segments.empty()) { + // TODO1.5 subtract 1 from getPositionToWrite() + SharedHandle fileEntry = _requestGroup->getDownloadContext()->findFileEntryByOffset(_segments.front()->getPositionToWrite()-1); + logger->debug("CUID#%d - Pooling request URI=%s", + cuid, req->getUrl().c_str()); + _requestGroup->getSegmentMan()->recognizeSegmentFor(_fileEntry); + } + + Command* command = new CreateRequestCommand(cuid, _requestGroup, e); if(wait == 0) { e->setNoWait(true); e->commands.push_back(command); @@ -225,16 +275,22 @@ bool AbstractCommand::prepareForRetry(time_t wait) { } void AbstractCommand::onAbort() { - // TODO This might be a problem if the failure is caused by proxy. - e->_requestGroupMan->getOrCreateServerStat(req->getHost(), - req->getProtocol())->setError(); + if(!req.isNull()) { + logger->debug(req->getCurrentUrl().c_str()); + // TODO This might be a problem if the failure is caused by proxy. + e->_requestGroupMan->getOrCreateServerStat(req->getHost(), + req->getProtocol())->setError(); + _requestGroup->removeIdenticalURI(req->getUrl()); + _fileEntry->removeRequest(req); + } logger->debug(MSG_UNREGISTER_CUID, cuid); //_segmentMan->unregisterId(cuid); if(!_requestGroup->getPieceStorage().isNull()) { _requestGroup->getSegmentMan()->cancelSegment(cuid); } - _requestGroup->removeIdenticalURI(req->getUrl()); + // TODO1.5 Should be moved to FileEntry + // _requestGroup->removeIdenticalURI(req->getUrl()); } void AbstractCommand::disableReadCheckSocket() { diff --git a/src/AbstractCommand.h b/src/AbstractCommand.h index fa3781f0..ba7a7aa4 100644 --- a/src/AbstractCommand.h +++ b/src/AbstractCommand.h @@ -38,6 +38,7 @@ #include "Command.h" #include "SharedHandle.h" #include "TimeA2.h" +#include "FileEntry.h" namespace aria2 { @@ -59,6 +60,7 @@ private: protected: RequestGroup* _requestGroup; SharedHandle req; + SharedHandle _fileEntry; DownloadEngine* e; SharedHandle socket; std::deque > _segments; @@ -140,6 +142,11 @@ public: RequestGroup* requestGroup, DownloadEngine* e, const SharedHandle& s = SharedHandle()); + AbstractCommand(int32_t cuid, const SharedHandle& req, + const SharedHandle& fileEntry, + RequestGroup* requestGroup, DownloadEngine* e, + const SharedHandle& s = SharedHandle()); + virtual ~AbstractCommand(); bool execute(); }; diff --git a/src/BitfieldMan.cc b/src/BitfieldMan.cc index 1fbfcb8b..2569c691 100644 --- a/src/BitfieldMan.cc +++ b/src/BitfieldMan.cc @@ -308,9 +308,10 @@ bool BitfieldMan::getMissingUnusedIndex(size_t& index) const } } -size_t BitfieldMan::getStartIndex(size_t index) const { - while(index < blocks && (isUseBitSet(index) || isBitSet(index))) { - index++; +template +static size_t getStartIndex(size_t index, const Array& bitfield, const unsigned char* useBitfield, size_t blocks) { + while(index < blocks && (bitfield::test(bitfield, blocks, index) || bitfield::test(useBitfield, blocks, index))) { + ++index; } if(blocks <= index) { return blocks; @@ -319,24 +320,33 @@ size_t BitfieldMan::getStartIndex(size_t index) const { } } -size_t BitfieldMan::getEndIndex(size_t index) const { - while(index < blocks && (!isUseBitSet(index) && !isBitSet(index))) { - index++; +template +static size_t getEndIndex(size_t index, const Array& bitfield, const unsigned char* useBitfield, size_t blocks) { + while(index < blocks && (!bitfield::test(bitfield, blocks, index) && !bitfield::test(useBitfield, blocks, index))) { + ++index; } return index; } -bool BitfieldMan::getSparseMissingUnusedIndex(size_t& index) const { - Range maxRange; - Range currentRange; +template +static bool getSparseMissingUnusedIndex +(size_t& index, + const Array& bitfield, + const unsigned char* useBitfield, + size_t blocks) +{ + BitfieldMan::Range maxRange; + BitfieldMan::Range currentRange; { size_t nextIndex = 0; while(nextIndex < blocks) { - currentRange.startIndex = getStartIndex(nextIndex); + currentRange.startIndex = + getStartIndex(nextIndex, bitfield, useBitfield, blocks); if(currentRange.startIndex == blocks) { break; } - currentRange.endIndex = getEndIndex(currentRange.startIndex); + currentRange.endIndex = + getEndIndex(currentRange.startIndex, bitfield, useBitfield, blocks); if(maxRange < currentRange) { maxRange = currentRange; } @@ -346,7 +356,7 @@ bool BitfieldMan::getSparseMissingUnusedIndex(size_t& index) const { if(maxRange.getSize()) { if(maxRange.startIndex == 0) { index = 0; - } else if(isUseBitSet(maxRange.startIndex-1)) { + } else if(bitfield::test(useBitfield, blocks, maxRange.startIndex-1)) { index = maxRange.getMidIndex(); } else { index = maxRange.startIndex; @@ -357,6 +367,22 @@ bool BitfieldMan::getSparseMissingUnusedIndex(size_t& index) const { } } +bool BitfieldMan::getSparseMissingUnusedIndex +(size_t& index, + const unsigned char* ignoreBitfield, + size_t ignoreBitfieldLength) const +{ + if(filterEnabled) { + return aria2::getSparseMissingUnusedIndex + (index, array(ignoreBitfield)|~array(filterBitfield)|array(bitfield), + useBitfield, blocks); + } else { + return aria2::getSparseMissingUnusedIndex + (index, array(ignoreBitfield)|array(bitfield), + useBitfield, blocks); + } +} + template static bool copyBitfield(unsigned char* dst, const Array& src, size_t blocks) { @@ -571,6 +597,21 @@ void BitfieldMan::addFilter(uint64_t offset, uint64_t length) { updateCache(); } +void BitfieldMan::removeFilter(uint64_t offset, uint64_t length) { + if(!filterBitfield) { + filterBitfield = new unsigned char[bitfieldLength]; + memset(filterBitfield, 0, bitfieldLength); + } + if(length > 0) { + size_t startBlock = offset/blockLength; + size_t endBlock = (offset+length-1)/blockLength; + for(size_t i = startBlock; i <= endBlock && i < blocks; i++) { + setBitInternal(filterBitfield, i, false); + } + } + updateCache(); +} + void BitfieldMan::enableFilter() { if(!filterBitfield) { filterBitfield = new unsigned char[bitfieldLength]; diff --git a/src/BitfieldMan.h b/src/BitfieldMan.h index 3287e2c0..909d82c7 100644 --- a/src/BitfieldMan.h +++ b/src/BitfieldMan.h @@ -83,7 +83,7 @@ private: size_t getEndIndex(size_t index) const; uint64_t getCompletedLength(bool useFilter) const; - +public: // [startIndex, endIndex) class Range { public: @@ -160,7 +160,11 @@ public: /** * affected by filter */ - bool getSparseMissingUnusedIndex(size_t& index) const; + bool getSparseMissingUnusedIndex + (size_t& index, + const unsigned char* ignoreBitfield, + size_t ignoreBitfieldLength) const; + /** * affected by filter */ @@ -243,6 +247,7 @@ public: void setAllUseBit(); void addFilter(uint64_t offset, uint64_t length); + void removeFilter(uint64_t offset, uint64_t length); /** * Clears filter and disables filter */ @@ -306,6 +311,10 @@ public: uint64_t getMissingUnusedLength(size_t startingIndex) const; + const unsigned char* getFilterBitfield() const + { + return filterBitfield; + } }; } // namespace aria2 diff --git a/src/CreateRequestCommand.cc b/src/CreateRequestCommand.cc new file mode 100644 index 00000000..824d26c4 --- /dev/null +++ b/src/CreateRequestCommand.cc @@ -0,0 +1,85 @@ +/* */ +#include "CreateRequestCommand.h" + +#include "InitiateConnectionCommandFactory.h" +#include "RequestGroup.h" +#include "Segment.h" +#include "DownloadContext.h" +#include "DlAbortEx.h" +#include "DownloadEngine.h" +#include "SocketCore.h" +#include "SegmentMan.h" + +namespace aria2 { + +CreateRequestCommand::CreateRequestCommand(int32_t cuid, + RequestGroup* requestGroup, + DownloadEngine* e): + AbstractCommand(cuid, SharedHandle(), requestGroup, e) +{ + setStatus(Command::STATUS_ONESHOT_REALTIME); + disableReadCheckSocket(); + disableWriteCheckSocket(); +} + +bool CreateRequestCommand::executeInternal() +{ + if(_segments.empty()) { + _fileEntry = _requestGroup->getDownloadContext()->findFileEntryByOffset(0); + } else { + // We assume all segments belongs to same file. + _fileEntry = _requestGroup->getDownloadContext()->findFileEntryByOffset + (_segments.front()->getPositionToWrite()); + } + req = _fileEntry->getRequest(_requestGroup->getURISelector()); + if(req.isNull()) { + if(!_requestGroup->getSegmentMan().isNull()) { + _requestGroup->getSegmentMan()->ignoreSegmentFor(_fileEntry); + } + throw DL_ABORT_EX("No URI available."); + } + + Command* command = + InitiateConnectionCommandFactory::createInitiateConnectionCommand + (cuid, req, _fileEntry, _requestGroup, e); + //ServerHostHandle sv(new ServerHost(command->getCuid(), req->getHost())); + //registerServerHost(sv); + e->setNoWait(true); + e->commands.push_back(command); + return true; +} + +} // namespace aria2 diff --git a/src/CreateRequestCommand.h b/src/CreateRequestCommand.h new file mode 100644 index 00000000..d4f5fae3 --- /dev/null +++ b/src/CreateRequestCommand.h @@ -0,0 +1,53 @@ +/* */ +#ifndef _D_CREATE_REQUEST_COMMAND_H_ +#define _D_CREATE_REQUEST_COMMAND_H_ + +#include "AbstractCommand.h" + +namespace aria2 { + +class CreateRequestCommand:public AbstractCommand { +public: + CreateRequestCommand(int32_t cuid, + RequestGroup* requestGroup, + DownloadEngine* e); +protected: + virtual bool executeInternal(); +}; + +} // namespace aria2 + +#endif // _D_CREATE_REQUEST_COMMAND_H_ diff --git a/src/DefaultPieceStorage.cc b/src/DefaultPieceStorage.cc index a47fd80e..3a707558 100644 --- a/src/DefaultPieceStorage.cc +++ b/src/DefaultPieceStorage.cc @@ -261,10 +261,11 @@ SharedHandle DefaultPieceStorage::getMissingFastPiece #endif // ENABLE_BITTORRENT -PieceHandle DefaultPieceStorage::getMissingPiece() +PieceHandle DefaultPieceStorage::getSparseMissingUnusedPiece +(const unsigned char* ignoreBitfield, size_t length) { size_t index; - if(bitfieldMan->getSparseMissingUnusedIndex(index)) { + if(bitfieldMan->getSparseMissingUnusedIndex(index, ignoreBitfield, length)) { return checkOutPiece(index); } else { return SharedHandle(); diff --git a/src/DefaultPieceStorage.h b/src/DefaultPieceStorage.h index 1bf01f62..36e42728 100644 --- a/src/DefaultPieceStorage.h +++ b/src/DefaultPieceStorage.h @@ -133,7 +133,8 @@ public: #endif // ENABLE_BITTORRENT - virtual SharedHandle getMissingPiece(); + virtual SharedHandle getSparseMissingUnusedPiece + (const unsigned char* ignoreBitfield, size_t length); virtual SharedHandle getMissingPiece(size_t index); diff --git a/src/DownloadCommand.cc b/src/DownloadCommand.cc index 7812d375..3c563984 100644 --- a/src/DownloadCommand.cc +++ b/src/DownloadCommand.cc @@ -66,10 +66,11 @@ namespace aria2 { DownloadCommand::DownloadCommand(int cuid, const RequestHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, DownloadEngine* e, const SocketHandle& s): - AbstractCommand(cuid, req, requestGroup, e, s) + AbstractCommand(cuid, req, fileEntry, requestGroup, e, s) #ifdef ENABLE_MESSAGE_DIGEST , _pieceHashValidationEnabled(false) #endif // ENABLE_MESSAGE_DIGEST @@ -114,8 +115,13 @@ bool DownloadCommand::executeInternal() { size_t BUFSIZE = 16*1024; unsigned char buf[BUFSIZE]; size_t bufSize; - if(segment->getLength() > 0 && segment->getLength()-segment->getWrittenLength() < BUFSIZE) { - bufSize = segment->getLength()-segment->getWrittenLength(); + if(segment->getLength() > 0) { + if(segment->getPosition()+segment->getLength() <= static_cast(_fileEntry->getLastOffset())) { + bufSize = std::min(segment->getLength()-segment->getWrittenLength(), + BUFSIZE); + } else { + bufSize = std::min(static_cast(_fileEntry->getLastOffset()-_fileEntry->gtoloff(segment->getPositionToWrite())), BUFSIZE); + } } else { bufSize = BUFSIZE; } @@ -167,13 +173,15 @@ bool DownloadCommand::executeInternal() { bool segmentComplete = false; // Note that GrowSegment::complete() always returns false. if(_transferEncodingDecoder.isNull() && _contentEncodingDecoder.isNull()) { - if(segment->complete()) { + if(segment->complete() || + segment->getPositionToWrite() == _fileEntry->getLastOffset()) { segmentComplete = true; } else if(segment->getLength() == 0 && bufSize == 0 && !socket->wantRead() && !socket->wantWrite()) { segmentComplete = true; } - } else if(!_transferEncodingDecoder.isNull() && segment->complete()) { + } else if(!_transferEncodingDecoder.isNull() && + (segment->complete() || segment->getPositionToWrite() == _fileEntry->getLastOffset())) { segmentComplete = true; } else if((_transferEncodingDecoder.isNull() || _transferEncodingDecoder->finished()) && @@ -188,36 +196,40 @@ bool DownloadCommand::executeInternal() { } if(segmentComplete) { - logger->info(MSG_SEGMENT_DOWNLOAD_COMPLETED, cuid); + if(segment->complete() || segment->getLength() == 0) { + // If segment->getLength() == 0, the server doesn't provide + // content length, but the client detected that download + // completed. + logger->info(MSG_SEGMENT_DOWNLOAD_COMPLETED, cuid); #ifdef ENABLE_MESSAGE_DIGEST - { - std::string expectedPieceHash = - _requestGroup->getDownloadContext()->getPieceHash(segment->getIndex()); - if(_pieceHashValidationEnabled && !expectedPieceHash.empty()) { - if(segment->isHashCalculated()) { - logger->debug("Hash is available! index=%lu", - static_cast(segment->getIndex())); - validatePieceHash(segment, expectedPieceHash, segment->getHashString()); + { + std::string expectedPieceHash = + _requestGroup->getDownloadContext()->getPieceHash(segment->getIndex()); + if(_pieceHashValidationEnabled && !expectedPieceHash.empty()) { + if(segment->isHashCalculated()) { + logger->debug("Hash is available! index=%lu", + static_cast(segment->getIndex())); + validatePieceHash(segment, expectedPieceHash, segment->getHashString()); + } else { + _messageDigestContext->digestReset(); + validatePieceHash(segment, expectedPieceHash, + MessageDigestHelper::digest + (_messageDigestContext.get(), + _requestGroup->getPieceStorage()->getDiskAdaptor(), + segment->getPosition(), + segment->getLength())); + } } else { - _messageDigestContext->digestReset(); - validatePieceHash(segment, expectedPieceHash, - MessageDigestHelper::digest - (_messageDigestContext.get(), - _requestGroup->getPieceStorage()->getDiskAdaptor(), - segment->getPosition(), - segment->getLength())); + _requestGroup->getSegmentMan()->completeSegment(cuid, segment); } - } else { - _requestGroup->getSegmentMan()->completeSegment(cuid, segment); } - } #else // !ENABLE_MESSAGE_DIGEST - - _requestGroup->getSegmentMan()->completeSegment(cuid, segment); - + _requestGroup->getSegmentMan()->completeSegment(cuid, segment); #endif // !ENABLE_MESSAGE_DIGEST + } + checkLowestDownloadSpeed(); // this unit is going to download another segment. @@ -263,6 +275,10 @@ bool DownloadCommand::prepareForNextSegment() { // segment. if(_segments.size() == 1) { SegmentHandle tempSegment = _segments.front(); + if(!tempSegment->complete()) { + return prepareForRetry(0); + } + // TODO1.5 get segment for the same file only SegmentHandle nextSegment = _requestGroup->getSegmentMan()->getSegment(cuid, tempSegment->getIndex()+1); diff --git a/src/DownloadCommand.h b/src/DownloadCommand.h index 05d3b670..ad078746 100644 --- a/src/DownloadCommand.h +++ b/src/DownloadCommand.h @@ -76,6 +76,7 @@ protected: public: DownloadCommand(int cuid, const SharedHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, DownloadEngine* e, const SharedHandle& s); diff --git a/src/DownloadContext.cc b/src/DownloadContext.cc index 2d3b888a..a5dda4d2 100644 --- a/src/DownloadContext.cc +++ b/src/DownloadContext.cc @@ -33,6 +33,9 @@ */ /* copyright --> */ #include "DownloadContext.h" + +#include + #include "FileEntry.h" namespace aria2 { @@ -79,4 +82,25 @@ int64_t DownloadContext::calculateSessionTime() const } } +SharedHandle +DownloadContext::findFileEntryByOffset(off_t offset) const +{ + if(_fileEntries.empty() || + (offset > 0 && + _fileEntries.back()->getOffset()+_fileEntries.back()->getLength() <= + static_cast(offset))){ + return SharedHandle(); + } + + SharedHandle obj(new FileEntry()); + obj->setOffset(offset); + std::deque >::const_iterator i = + std::upper_bound(_fileEntries.begin(), _fileEntries.end(), obj); + if(i != _fileEntries.end() && (*i)->getOffset() == offset) { + return *i; + } else { + return *(--i); + } +} + } // namespace aria2 diff --git a/src/DownloadContext.h b/src/DownloadContext.h index e40c9a18..f1aed08e 100644 --- a/src/DownloadContext.h +++ b/src/DownloadContext.h @@ -117,6 +117,10 @@ public: void resetDownloadStopTime(); int64_t calculateSessionTime() const; + + // Returns FileEntry at given offset. SharedHandle() is + // returned if no such FileEntry is found. + SharedHandle findFileEntryByOffset(off_t offset) const; }; typedef SharedHandle DownloadContextHandle; diff --git a/src/FileEntry.cc b/src/FileEntry.cc index 8abc1a5d..4e23f5d4 100644 --- a/src/FileEntry.cc +++ b/src/FileEntry.cc @@ -33,8 +33,11 @@ */ /* copyright --> */ #include "FileEntry.h" -#include "File.h" + +#include + #include "Util.h" +#include "URISelector.h" namespace aria2 { @@ -74,4 +77,66 @@ bool FileEntry::exists() const return File(getPath()).exists(); } +off_t FileEntry::gtoloff(off_t goff) const +{ + assert(offset <= goff); + return goff-offset; +} + +void FileEntry::getUris(std::deque& uris) const +{ + uris.insert(uris.end(), _spentUris.begin(), _spentUris.end()); + uris.insert(uris.end(), _uris.begin(), _uris.end()); +} + +std::string FileEntry::selectUri(const SharedHandle& uriSelector) +{ + return uriSelector->select(_uris); +} + +SharedHandle +FileEntry::getRequest(const SharedHandle& selector) +{ + SharedHandle req; + if(_requestPool.empty()) { + while(1) { + std::string uri = selector->select(_uris); + if(uri.empty()) { + return req; + } + req.reset(new Request()); + if(req->setUrl(uri)) { + _spentUris.push_back(uri); + _inFlightRequests.push_back(req); + return req; + } else { + req.reset(); + } + } + } else { + req = _requestPool.back(); + _requestPool.pop_back(); + _inFlightRequests.push_back(req); + return req; + } +} + +void FileEntry::poolRequest(const SharedHandle& request) +{ + removeRequest(request); + _requestPool.push_back(request); +} + +bool FileEntry::removeRequest(const SharedHandle& request) +{ + for(std::deque >::iterator i = + _inFlightRequests.begin(); i != _inFlightRequests.end(); ++i) { + if((*i).get() == request.get()) { + _inFlightRequests.erase(i); + return true; + } + } + return false; +} + } // namespace aria2 diff --git a/src/FileEntry.h b/src/FileEntry.h index 7cc27619..05ba6067 100644 --- a/src/FileEntry.h +++ b/src/FileEntry.h @@ -43,17 +43,23 @@ #include "SharedHandle.h" #include "File.h" +#include "Request.h" namespace aria2 { +class URISelector; + class FileEntry { private: std::string path; std::deque _uris; + std::deque _spentUris; uint64_t length; off_t offset; bool extracted; bool requested; + std::deque > _requestPool; + std::deque > _inFlightRequests; public: FileEntry():length(0), offset(0), extracted(false), requested(false) {} @@ -86,6 +92,8 @@ public: void setOffset(off_t offset) { this->offset = offset; } + off_t getLastOffset() { return offset+length; } + bool isExtracted() const { return extracted; } void setExtracted(bool flag) { this->extracted = flag; } @@ -96,14 +104,53 @@ public: void setupDir(); + // TODO1.5 remove this in favor of getRemainingUris() const std::deque& getAssociatedUris() const { return _uris; } + const std::deque& getRemainingUris() const + { + return _uris; + } + + const std::deque& getSpentUris() const + { + return _spentUris; + } + + void setUris(const std::deque& uris) + { + _uris = uris; + } + + // Inserts _uris and _spentUris into uris. + void getUris(std::deque& uris) const; + + std::string selectUri(const SharedHandle& uriSelector); + + // If pooled Request object is available, one of them is removed + // from the pool and returned. If pool is empty, then select URI + // using selectUri(selector) and construct Request object using it + // and return the Request object. + SharedHandle getRequest(const SharedHandle& selector); + + void poolRequest(const SharedHandle& request); + + bool removeRequest(const SharedHandle& request); + + size_t countInFlightRequest() const + { + return _inFlightRequests.size(); + } + bool operator<(const FileEntry& fileEntry) const; bool exists() const; + + // Translate global offset goff to file local offset. + off_t gtoloff(off_t goff) const; }; typedef SharedHandle FileEntryHandle; diff --git a/src/FtpDownloadCommand.cc b/src/FtpDownloadCommand.cc index 64929662..49a62b5c 100644 --- a/src/FtpDownloadCommand.cc +++ b/src/FtpDownloadCommand.cc @@ -49,12 +49,13 @@ namespace aria2 { FtpDownloadCommand::FtpDownloadCommand (int cuid, const RequestHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, const SharedHandle& ftpConnection, DownloadEngine* e, const SocketHandle& dataSocket, const SocketHandle& ctrlSocket) - :DownloadCommand(cuid, req, requestGroup, e, dataSocket), + :DownloadCommand(cuid, req, fileEntry, requestGroup, e, dataSocket), _ftpConnection(ftpConnection), ctrlSocket(ctrlSocket) {} @@ -64,8 +65,9 @@ FtpDownloadCommand::~FtpDownloadCommand() {} bool FtpDownloadCommand::prepareForNextSegment() { if(getOption()->getAsBool(PREF_FTP_REUSE_CONNECTION) && - (uint64_t)_segments.front()->getPositionToWrite() == _requestGroup->getTotalLength()) { - Command* command = new FtpFinishDownloadCommand(cuid, req, _requestGroup, _ftpConnection, e, ctrlSocket); + static_cast(_fileEntry->gtoloff(_segments.front()->getPositionToWrite())) == _fileEntry->getLength()) { + Command* command = new FtpFinishDownloadCommand + (cuid, req, _fileEntry, _requestGroup, _ftpConnection, e, ctrlSocket); e->commands.push_back(command); if(_requestGroup->downloadFinished()) { diff --git a/src/FtpDownloadCommand.h b/src/FtpDownloadCommand.h index c4b2829e..1a2b3139 100644 --- a/src/FtpDownloadCommand.h +++ b/src/FtpDownloadCommand.h @@ -51,6 +51,7 @@ protected: public: FtpDownloadCommand(int cuid, const SharedHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, const SharedHandle& ftpConnection, DownloadEngine* e, diff --git a/src/FtpFinishDownloadCommand.cc b/src/FtpFinishDownloadCommand.cc index ae2009d1..bf8af2a2 100644 --- a/src/FtpFinishDownloadCommand.cc +++ b/src/FtpFinishDownloadCommand.cc @@ -53,11 +53,12 @@ namespace aria2 { FtpFinishDownloadCommand::FtpFinishDownloadCommand (int cuid, const RequestHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, const SharedHandle& ftpConnection, DownloadEngine* e, const SharedHandle& socket) - :AbstractCommand(cuid, req, requestGroup, e, socket), + :AbstractCommand(cuid, req, fileEntry, requestGroup, e, socket), _ftpConnection(ftpConnection) { e->addSocketForReadCheck(socket, this); diff --git a/src/FtpFinishDownloadCommand.h b/src/FtpFinishDownloadCommand.h index 2edc6d5b..81adf934 100644 --- a/src/FtpFinishDownloadCommand.h +++ b/src/FtpFinishDownloadCommand.h @@ -51,6 +51,7 @@ protected: public: FtpFinishDownloadCommand(int cuid, const SharedHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, const SharedHandle& ftpConnection, DownloadEngine* e, diff --git a/src/FtpInitiateConnectionCommand.cc b/src/FtpInitiateConnectionCommand.cc index b6c15423..7453337a 100644 --- a/src/FtpInitiateConnectionCommand.cc +++ b/src/FtpInitiateConnectionCommand.cc @@ -56,9 +56,10 @@ namespace aria2 { FtpInitiateConnectionCommand::FtpInitiateConnectionCommand (int cuid, const RequestHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, DownloadEngine* e) - :InitiateConnectionCommand(cuid, req, requestGroup, e) {} + :InitiateConnectionCommand(cuid, req, fileEntry, requestGroup, e) {} FtpInitiateConnectionCommand::~FtpInitiateConnectionCommand() {} @@ -86,7 +87,8 @@ Command* FtpInitiateConnectionCommand::createNextCommand (new HttpConnection(cuid, socket, getOption().get())); HttpRequestCommand* c = - new HttpRequestCommand(cuid, req, _requestGroup, hc, e, socket); + new HttpRequestCommand(cuid, req, _fileEntry, + _requestGroup, hc, e, socket); c->setProxyRequest(proxyRequest); command = c; } else if(proxyMethod == V_TUNNEL) { @@ -99,7 +101,8 @@ Command* FtpInitiateConnectionCommand::createNextCommand } else { if(proxyMethod == V_TUNNEL) { command = - new FtpNegotiationCommand(cuid, req, _requestGroup, e, pooledSocket, + new FtpNegotiationCommand(cuid, req, _fileEntry, + _requestGroup, e, pooledSocket, FtpNegotiationCommand::SEQ_SEND_CWD, options["baseWorkingDir"]); } else if(proxyMethod == V_GET) { @@ -109,7 +112,8 @@ Command* FtpInitiateConnectionCommand::createNextCommand (new HttpConnection(cuid, pooledSocket, getOption().get())); HttpRequestCommand* c = - new HttpRequestCommand(cuid, req, _requestGroup, hc, e, pooledSocket); + new HttpRequestCommand(cuid, req, _fileEntry, + _requestGroup, hc, e, pooledSocket); c->setProxyRequest(proxyRequest); command = c; } else { @@ -126,10 +130,12 @@ Command* FtpInitiateConnectionCommand::createNextCommand req->getPort()); socket.reset(new SocketCore()); socket->establishConnection(resolvedAddresses.front(), req->getPort()); - command = new FtpNegotiationCommand(cuid, req, _requestGroup, e, socket); + command = new FtpNegotiationCommand(cuid, req, _fileEntry, + _requestGroup, e, socket); } else { command = - new FtpNegotiationCommand(cuid, req, _requestGroup, e, pooledSocket, + new FtpNegotiationCommand(cuid, req, _fileEntry, + _requestGroup, e, pooledSocket, FtpNegotiationCommand::SEQ_SEND_CWD, options["baseWorkingDir"]); } diff --git a/src/FtpInitiateConnectionCommand.h b/src/FtpInitiateConnectionCommand.h index e915753d..6a6cded7 100644 --- a/src/FtpInitiateConnectionCommand.h +++ b/src/FtpInitiateConnectionCommand.h @@ -46,6 +46,7 @@ protected: const SharedHandle& proxyRequest); public: FtpInitiateConnectionCommand(int cuid, const SharedHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, DownloadEngine* e); virtual ~FtpInitiateConnectionCommand(); diff --git a/src/FtpNegotiationCommand.cc b/src/FtpNegotiationCommand.cc index 1a857c03..cbdea101 100644 --- a/src/FtpNegotiationCommand.cc +++ b/src/FtpNegotiationCommand.cc @@ -68,14 +68,16 @@ namespace aria2 { -FtpNegotiationCommand::FtpNegotiationCommand(int32_t cuid, - const RequestHandle& req, - RequestGroup* requestGroup, - DownloadEngine* e, - const SocketHandle& s, - Seq seq, - const std::string& baseWorkingDir): - AbstractCommand(cuid, req, requestGroup, e, s), sequence(seq), +FtpNegotiationCommand::FtpNegotiationCommand +(int32_t cuid, + const RequestHandle& req, + const SharedHandle& fileEntry, + RequestGroup* requestGroup, + DownloadEngine* e, + const SocketHandle& s, + Seq seq, + const std::string& baseWorkingDir): + AbstractCommand(cuid, req, fileEntry, requestGroup, e, s), sequence(seq), ftp(new FtpConnection(cuid, socket, req, e->getAuthConfigFactory()->createAuthConfig(req), getOption().get())) @@ -96,7 +98,8 @@ bool FtpNegotiationCommand::executeInternal() { return prepareForRetry(0); } else if(sequence == SEQ_NEGOTIATION_COMPLETED) { FtpDownloadCommand* command = - new FtpDownloadCommand(cuid, req, _requestGroup, ftp, e, dataSocket, socket); + new FtpDownloadCommand + (cuid, req, _fileEntry, _requestGroup, ftp, e, dataSocket, socket); command->setStartupIdleTime(getOption()->getAsInt(PREF_STARTUP_IDLE_TIME)); command->setLowestDownloadSpeedLimit(getOption()->getAsInt(PREF_LOWEST_SPEED_LIMIT)); if(!_requestGroup->isSingleHostMultiConnectionEnabled()) { @@ -328,6 +331,7 @@ bool FtpNegotiationCommand::onFileSizeDetermined(uint64_t totalLength) SingleFileDownloadContextHandle dctx = dynamic_pointer_cast(_requestGroup->getDownloadContext()); dctx->setTotalLength(totalLength); + _fileEntry->setLength(totalLength); dctx->setFilename (strconcat(dctx->getDir(), "/", Util::urldecode(req->getFile()))); _requestGroup->preDownloadProcessing(); @@ -429,7 +433,7 @@ bool FtpNegotiationCommand::recvSize() { return onFileSizeDetermined(size); } else { - _requestGroup->validateTotalLength(size); + _requestGroup->validateTotalLength(_fileEntry->getLength(), size); } } else { diff --git a/src/FtpNegotiationCommand.h b/src/FtpNegotiationCommand.h index 18487033..bdbc2c15 100644 --- a/src/FtpNegotiationCommand.h +++ b/src/FtpNegotiationCommand.h @@ -123,6 +123,7 @@ protected: public: FtpNegotiationCommand(int32_t cuid, const SharedHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, DownloadEngine* e, const SharedHandle& s, diff --git a/src/FtpTunnelResponseCommand.cc b/src/FtpTunnelResponseCommand.cc index 121bd24a..293b88c8 100644 --- a/src/FtpTunnelResponseCommand.cc +++ b/src/FtpTunnelResponseCommand.cc @@ -54,7 +54,8 @@ FtpTunnelResponseCommand::~FtpTunnelResponseCommand() {} Command* FtpTunnelResponseCommand::getNextCommand() { - return new FtpNegotiationCommand(cuid, req, _requestGroup, e, socket); + return new FtpNegotiationCommand(cuid, req, _fileEntry, + _requestGroup, e, socket); } } // namespace aria2 diff --git a/src/HttpDownloadCommand.cc b/src/HttpDownloadCommand.cc index 69b09be1..9819efea 100644 --- a/src/HttpDownloadCommand.cc +++ b/src/HttpDownloadCommand.cc @@ -52,12 +52,13 @@ namespace aria2 { HttpDownloadCommand::HttpDownloadCommand (int cuid, const RequestHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, const SharedHandle& httpResponse, const HttpConnectionHandle& httpConnection, DownloadEngine* e, const SocketHandle& socket) - :DownloadCommand(cuid, req, requestGroup, e, socket), + :DownloadCommand(cuid, req, fileEntry, requestGroup, e, socket), _httpResponse(httpResponse), _httpConnection(httpConnection) {} @@ -67,7 +68,8 @@ bool HttpDownloadCommand::prepareForNextSegment() { bool downloadFinished = _requestGroup->downloadFinished(); if(req->isPipeliningEnabled() && !downloadFinished) { HttpRequestCommand* command = - new HttpRequestCommand(cuid, req, _requestGroup, _httpConnection, e, + new HttpRequestCommand(cuid, req, _fileEntry, + _requestGroup, _httpConnection, e, socket); // Set proxy request here. aria2 sends the HTTP request specialized for // proxy. @@ -81,8 +83,7 @@ bool HttpDownloadCommand::prepareForNextSegment() { (req->isKeepAliveEnabled() && ((!_transferEncodingDecoder.isNull() && _requestGroup->downloadFinished()) || - (uint64_t)_segments.front()->getPositionToWrite() == - _requestGroup->getTotalLength()))) { + static_cast(_fileEntry->gtoloff(_segments.front()->getPositionToWrite())) == _fileEntry->getLength()))) { e->poolSocket(req, isProxyDefined(), socket); } // The request was sent assuming that server supported pipelining, but diff --git a/src/HttpDownloadCommand.h b/src/HttpDownloadCommand.h index c4343e31..e1e995da 100644 --- a/src/HttpDownloadCommand.h +++ b/src/HttpDownloadCommand.h @@ -51,6 +51,7 @@ protected: public: HttpDownloadCommand(int cuid, const SharedHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, const SharedHandle& httpResponse, const SharedHandle& httpConnection, diff --git a/src/HttpInitiateConnectionCommand.cc b/src/HttpInitiateConnectionCommand.cc index 797b2632..b9552db5 100644 --- a/src/HttpInitiateConnectionCommand.cc +++ b/src/HttpInitiateConnectionCommand.cc @@ -53,9 +53,10 @@ namespace aria2 { HttpInitiateConnectionCommand::HttpInitiateConnectionCommand (int cuid, const RequestHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, DownloadEngine* e): - InitiateConnectionCommand(cuid, req, requestGroup, e) {} + InitiateConnectionCommand(cuid, req, fileEntry, requestGroup, e) {} HttpInitiateConnectionCommand::~HttpInitiateConnectionCommand() {} @@ -81,7 +82,9 @@ Command* HttpInitiateConnectionCommand::createNextCommand } else if(proxyMethod == V_GET) { SharedHandle httpConnection (new HttpConnection(cuid, socket, getOption().get())); - HttpRequestCommand* c = new HttpRequestCommand(cuid, req, _requestGroup, + HttpRequestCommand* c = new HttpRequestCommand(cuid, req, + _fileEntry, + _requestGroup, httpConnection, e, socket); c->setProxyRequest(proxyRequest); @@ -93,7 +96,9 @@ Command* HttpInitiateConnectionCommand::createNextCommand } else { SharedHandle httpConnection (new HttpConnection(cuid, pooledSocket, getOption().get())); - HttpRequestCommand* c = new HttpRequestCommand(cuid, req, _requestGroup, + HttpRequestCommand* c = new HttpRequestCommand(cuid, req, + _fileEntry, + _requestGroup, httpConnection, e, pooledSocket); if(proxyMethod == V_GET) { @@ -113,8 +118,8 @@ Command* HttpInitiateConnectionCommand::createNextCommand socket = pooledSocket; } SharedHandle httpConnection(new HttpConnection(cuid, socket, getOption().get())); - command = new HttpRequestCommand(cuid, req, _requestGroup, httpConnection, - e, socket); + command = new HttpRequestCommand(cuid, req, _fileEntry, _requestGroup, + httpConnection, e, socket); } return command; } diff --git a/src/HttpInitiateConnectionCommand.h b/src/HttpInitiateConnectionCommand.h index cd7ec83c..54f76523 100644 --- a/src/HttpInitiateConnectionCommand.h +++ b/src/HttpInitiateConnectionCommand.h @@ -46,6 +46,7 @@ protected: const SharedHandle& proxyRequest); public: HttpInitiateConnectionCommand(int cuid, const SharedHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, DownloadEngine* e); diff --git a/src/HttpProxyResponseCommand.cc b/src/HttpProxyResponseCommand.cc index 65bdff2d..0054d6d9 100644 --- a/src/HttpProxyResponseCommand.cc +++ b/src/HttpProxyResponseCommand.cc @@ -54,7 +54,8 @@ HttpProxyResponseCommand::~HttpProxyResponseCommand() {} Command* HttpProxyResponseCommand::getNextCommand() { - return new HttpRequestCommand(cuid, req, _requestGroup, httpConnection, e, socket); + return new HttpRequestCommand(cuid, req, _fileEntry, + _requestGroup, httpConnection, e, socket); } } // namespace aria2 diff --git a/src/HttpRequest.cc b/src/HttpRequest.cc index 10f243e7..d46f68ac 100644 --- a/src/HttpRequest.cc +++ b/src/HttpRequest.cc @@ -73,7 +73,7 @@ off_t HttpRequest::getStartByte() const if(segment.isNull()) { return 0; } else { - return segment->getPositionToWrite(); + return _fileEntry->gtoloff(segment->getPositionToWrite()); } } @@ -83,7 +83,7 @@ off_t HttpRequest::getEndByte() const return 0; } else { if(request->isPipeliningEnabled()) { - return segment->getPosition()+segment->getLength()-1; + return _fileEntry->gtoloff(segment->getPosition()+segment->getLength()-1); } else { return 0; } diff --git a/src/HttpRequest.h b/src/HttpRequest.h index f84596a5..9ec32171 100644 --- a/src/HttpRequest.h +++ b/src/HttpRequest.h @@ -42,6 +42,7 @@ #include "SharedHandle.h" #include "Request.h" +#include "FileEntry.h" namespace aria2 { @@ -59,6 +60,8 @@ private: SharedHandle request; + SharedHandle _fileEntry; + SharedHandle segment; uint64_t entityLength; @@ -232,6 +235,16 @@ public: // Returns AuthConfig used in the last invocation of // createRequest(). const SharedHandle& getAuthConfig() const; + + void setFileEntry(const SharedHandle& fileEntry) + { + _fileEntry = fileEntry; + } + + const SharedHandle& getFileEntry() const + { + return _fileEntry; + } }; typedef SharedHandle HttpRequestHandle; diff --git a/src/HttpRequestCommand.cc b/src/HttpRequestCommand.cc index 7acd3fdf..d95e616b 100644 --- a/src/HttpRequestCommand.cc +++ b/src/HttpRequestCommand.cc @@ -58,11 +58,12 @@ namespace aria2 { HttpRequestCommand::HttpRequestCommand (int cuid, const RequestHandle& req, + const SharedHandle& fileEntry, RequestGroup* requestGroup, const HttpConnectionHandle& httpConnection, DownloadEngine* e, const SocketHandle& s) - :AbstractCommand(cuid, req, requestGroup, e, s), + :AbstractCommand(cuid, req, fileEntry, requestGroup, e, s), _httpConnection(httpConnection) { setTimeout(getOption()->getAsInt(PREF_CONNECT_TIMEOUT)); @@ -74,6 +75,7 @@ HttpRequestCommand::~HttpRequestCommand() {} static SharedHandle createHttpRequest(const SharedHandle& req, + const SharedHandle& fileEntry, const SharedHandle& segment, uint64_t totalLength, const SharedHandle