aria2/src/DownloadEngine.cc
Tatsuhiro Tsujikawa 2fb9b5be97 2006-09-19 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>
To rewrite segment download mechanism for HTTP/FTP download.
	Use BitfieldMan to manage segment download.
	* src/HttpResponseCommand.h
	(executeInternal): Pass the reference of segment.
	* src/AbstractCommand.cc
	(prepareForRetry): Call segmentMan->cancelSegment here.
	(onAbort): Call segmentMan->cancelSegment here.
	* src/HttpDownloadCommand.cc
	(prepareForNextSegment): New function.
	* src/DownloadEngineFactory.cc
	(newConsoleEngine): Removed splitter.
	(newTorrentConsoleEngine): Removed splitter.
	* src/Request.h
	(segment): Renamed from seg.
	* src/FtpInitiateConnectionCommand.h
	(executeInternal): Pass the reference of segment.
	* src/AbstractCommand.h
	(executeInternal): Pass the reference of segment.
	* src/pref.h
	(PREF_SEGMENT_SIZE): New definition.
	* src/HttpProxyRequestCommand.h
	(executeInternal): Pass the reference of segment.
	* src/HttpResponseCommand.cc
	(checkResponse): Allowed status 206 when a request range starts at 0.
	(handleDefaultEncoding): Rewritten the code related to Segment.
	(handleOtherEncoding): Rewritten the code related to Segment.
	* src/SegmentMan.h
	(SegmentEntry): New class.
	(SegmentEntries): New type definition.
	(bitfield): New variable.
	(usedSegmentEntries): New variable.
	(onNullBitfield): New function.
	(checkoutSegment): New function.
	(segments): Removed.
	(splitter): Removed.
	(unregisterId): Removed.
	(getSegment): New function(overload)
	(getDownloadedSize): Removed.
	(cancelSegment): New function.
	(completeSegment): New function.
	(initBitfield): New function.
	(hasSegment): New function.
	(getDownloadLength): New function.
	* src/BitfieldMan.h
	(getStartIndex): New function.
	(getEndIndex): New function.
	(getMissingUnusedIndex): New function(overload).
	(getSparseMissingUnusedIndex): New function.	
	* src/BitfieldMan.cc
	(getMissingIndexRandomly): Handle the last byte of bitfield properly.
	(getMissingUnusedIndex): New function(overload).
	(Range): New class.
	(getStartIndex): New function.
	(getEndIndex): New function.
	(getSparseMissingUnusedIndex): New function.
	(isBitSetInternal): Return false if the given index is less than 0.
	* src/HttpInitiateConnectionCommand.h
	(executeInternal): Pass the reference of segment.
	* src/FtpNegotiateCommand.h
	(executeInternal): Pass the reference of segment.
	* src/FtpNegotiateCommand.cc
	(recvSize): Initialize bitfield here.
	* src/FtpTunnelResponseCommand.h
	(executeInternal): Pass the reference of segment.
	* src/HttpConnection.cc
	(createRequest): Rewritten range header processing.
	* src/DownloadCommand.h
	(executeInternal): Pass the reference of segment.
	(prepareForRetry): Removed.
	(prepareForNextSegment): Added an argument segment. Made it a virtual
	function.
	* src/main.cc
	(main): Set the initial value of PREF_SEGMENT_SIZE to 1MB.
	* src/SegmentMan.cc
	(SegmentMan): Added bitfield. Removed splitter.
	(~SegmentMan): Added bitfield. Removed splitter.
	(unregisterId): Removed.
	(getSegment): Rewritten.
	(updateSegment): Rewritten.
	(save): Rewritten.
	(read): Rewritten.
	(finished): Rewritten.
	(getDownloadedSize): Removed.
	(initBitfield): New function.
	(FindSegmentEntryByIndex): New function object.
	(FindSegmentEntryByCuid): New function object.
	(checkoutSegment): New function.
	(onNullBitfield): New function.
	(getSegment): New function(overload).
	(CancelSegment): New function object.
	(cancelSegment): New function.
	(completeSegment): New function.
	(hasSegment): New function.
	(getDownloadLength): New function.
	* src/FtpInitiateConnectionCommand.cc
	(executeInternal): Load .aria2 file after hostname resolution
	finishes.
	* src/Segment.h: Rewritten.
	* src/Segment.cc (operator<<): New function.
	* src/HttpDownloadCommand.h
	(prepareForNextSegment): New function.
	* src/Request.cc
	(resetUrl): Made segment null.
	* src/DownloadEngine.cc
	(~DownloadEngine): Call cleanQueue before deleting segmentMan.
	* src/HttpProxyRequestCommand.h
	(executeInternal): Pass the reference of segment.
	* src/DownloadCommand.cc
	(executeInternal): Rewritten the code related to Segment.
	(prepareForRetry): Removed.
	(prepareForNextSegment): Rewritten.
	* src/FtpTunnelResponseCommand.h
	(executeInternal): Pass the reference of segment.
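
A minimal sketch of the bitfield-managed segment bookkeeping this entry describes: one bit per fixed-size segment records completion, and a second "use" bit records that a command has checked the segment out. `SegmentBook` and its method names are hypothetical, chosen for illustration; this is not aria2's SegmentMan or BitfieldMan API.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical bookkeeping class for illustration only.
class SegmentBook {
public:
  SegmentBook(long long totalLength, long long segmentLength)
    : totalLength_(totalLength),
      segmentLength_(segmentLength),
      done_(countSegments(totalLength, segmentLength), false),
      inUse_(countSegments(totalLength, segmentLength), false) {}

  std::size_t count() const { return done_.size(); }

  // Marks the first segment that is neither finished nor in use as in use
  // and returns its index, or -1 when no such segment exists.
  int checkout() {
    for (std::size_t i = 0; i < done_.size(); ++i) {
      if (!done_[i] && !inUse_[i]) {
        inUse_[i] = true;
        return static_cast<int>(i);
      }
    }
    return -1;
  }

  // Gives a checked-out segment back, for example before a retry.
  void cancel(int index) { inUse_[index] = false; }

  // Records that the segment was downloaded completely.
  void complete(int index) {
    done_[index] = true;
    inUse_[index] = false;
  }

  // Length of one segment; the last segment may be shorter than the rest.
  long long lengthOf(int index) const {
    long long start = segmentLength_ * index;
    long long rest = totalLength_ - start;
    return rest < segmentLength_ ? rest : segmentLength_;
  }

  bool finished() const {
    for (std::size_t i = 0; i < done_.size(); ++i) {
      if (!done_[i]) return false;
    }
    return true;
  }

private:
  static std::size_t countSegments(long long total, long long seg) {
    assert(total >= 0 && seg > 0);
    return static_cast<std::size_t>((total + seg - 1) / seg);
  }

  long long totalLength_;
  long long segmentLength_;
  std::vector<bool> done_;   // segment fully downloaded
  std::vector<bool> inUse_;  // segment currently assigned to a command
};
```

With a 1MB segment size, a 2,500,000-byte file splits into three segments; cancelling a checked-out segment makes it available to the next caller again.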
	
	To add HTTP 1.1 persistent connection support(experimental)
	* src/HttpRequestCommand.cc
	(executeInternal): Disable keep alive if it is disabled by
	configuration.
	* src/Request.h
	(keepAlive): New variable.
	(isKeepAlive): New function.
	(setKeepAlive): New function.
	* src/pref.h
	(PREF_HTTP_KEEP_ALIVE): New definition.
	* src/HttpResponseCommand.cc
	(executeInternal): Check the remote server supports keep alive.
	* src/HttpConnection.cc
	(createRequest): Send "Connection: close" only if keep alive is
	disabled.
	* src/main.cc
	(main): Set the initial value of PREF_HTTP_KEEP_ALIVE to false.

	To add max download speed limit:
	* src/pref.h
	(PREF_MAX_SPEED_LIMIT): New definition.
	* src/PeerInteractionCommand.cc
	(executeInternal): Added max download speed limit. Not tested yet.
	* src/SegmentMan.h
	(PeerStats): New type definition.
	(peerStats): New variable.
	(registerPeerStat): New function.
	(FindPeerStat): New function object.
	(getPeerStat): New function.
	(calculateDownloadSpeed): New function.
	* src/SpeedCalc.h: New class.
	* src/SpeedCalc.cc: New class.
	* src/main.cc
	(main):
	Set the initial value of PREF_MAX_SPEED_LIMIT to 0 (which means the
	download speed is not restricted).
	* src/PeerStat.h: New class.
	* src/SegmentMan.cc
	(registerPeerStat): New function.
	(calculateDownloadSpeed): New function.
	* src/DownloadCommand.cc
	(STARTUP_IDLE_TIME): New definition.
	(DownloadCommand): Register peerStat to segmentMan. Call
	peerStat->downloadStart.
	(~DownloadCommand): Call peerStat->downloadStop.
	(executeInternal): Added download speed limiter. Rewritten lowest
	speed limiter.
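
A minimal sketch of the limit check this entry describes, assuming the speed is measured in bytes per second and that a limit of 0 means unrestricted, as stated above for PREF_MAX_SPEED_LIMIT. `AverageSpeed` and `exceedsLimit` are hypothetical names for illustration; they are not the SpeedCalc or PeerStat interfaces added by this commit.

```cpp
#include <cassert>

// Hypothetical helper: accumulates received bytes and reports the
// average speed over a caller-supplied elapsed time.
class AverageSpeed {
public:
  AverageSpeed() : bytes_(0) {}

  // Adds the number of bytes received since the last call.
  void update(long long bytes) { bytes_ += bytes; }

  // Average speed in bytes per second; elapsedSec must be positive.
  long long bytesPerSec(long long elapsedSec) const {
    assert(elapsedSec > 0);
    return bytes_ / elapsedSec;
  }

private:
  long long bytes_;
};

// Returns true when the download should pause reading for now.
// A limit of 0 disables the check.
bool exceedsLimit(long long speed, long long limit) {
  return limit > 0 && speed > limit;
}
```

A command could call `exceedsLimit` before each read and skip the read for that iteration when it returns true.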

	* src/HttpConnection.cc
	(receiveResponse): Fixed: eohIndex[headerBuf] -> headerBuf[eohIndex].
	
	* src/AbstractCommand.cc
	(resolveHostname): Throw DlAbortEx if a name resolution fails.
	Added hostname to the error message.
	
	* src/ConsoleDownloadEngine.cc
	(calculateStatistics): Initialize psize with dlSize.

	* src/PieceMessage.cc
	(receivedAction): Do not call peerInteraction->abortPiece here.
	(onGotWrongPiece): Call peerInteraction->abortPiece here.

	* src/BitfieldMan.h
	(clearAllUseBit): New function.
	(setAllUseBit): New function.
	* src/BitfieldMan.cc
	(clearAllBit): Do not clear useBitfield here.
	(clearAllUseBit): New function.
	(setAllUseBit): New function.
	* src/Piece.cc
	(clearAllBlock): Call bitfield->clearAllUseBit().
2006-09-19 14:52:59 +00:00

297 lines
7.8 KiB
C++

/* <!-- copyright */
/*
* aria2 - a simple utility for downloading files faster
*
* Copyright (C) 2006 Tatsuhiro Tsujikawa
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/* copyright --> */
#include "DownloadEngine.h"
#include "Util.h"
#include "LogFactory.h"
#include "TimeA2.h"
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h> // memcpy
#include <assert.h> // assert
#include <algorithm>
using namespace std;
DownloadEngine::DownloadEngine():noWait(false), segmentMan(0) {
  logger = LogFactory::getInstance();
}
DownloadEngine::~DownloadEngine() {
  cleanQueue();
  delete segmentMan;
}
void DownloadEngine::cleanQueue() {
  for_each(commands.begin(), commands.end(), Deleter());
  commands.clear();
}
void DownloadEngine::run() {
  initStatistics();
  Time cp;
  cp.setTimeInSec(0);
  Commands activeCommands;
  while(!commands.empty()) {
    if(cp.elapsed(1)) {
      // At least one second has passed since the last full pass:
      // execute every command that is queued right now.
      cp.reset();
      int max = commands.size();
      for(int i = 0; i < max; i++) {
        Command* com = commands.front();
        commands.pop_front();
        // A command that returns true from execute() is deleted here.
        if(com->execute()) {
          delete com;
        }
      }
    } else {
      // Otherwise execute only the commands that waitData() collected
      // in the previous iteration.
      for(Commands::iterator itr = activeCommands.begin();
          itr != activeCommands.end(); itr++) {
        Commands::iterator comItr = find(commands.begin(), commands.end(),
                                         *itr);
        assert(comItr != commands.end());
        Command* command = *itr;
        commands.erase(comItr);
        if(command->execute()) {
          delete command;
        }
      }
    }
    afterEachIteration();
    activeCommands.clear();
    if(!noWait && !commands.empty()) {
      waitData(activeCommands);
    }
    noWait = false;
    calculateStatistics();
  }
  onEndOfRun();
}
void DownloadEngine::shortSleep() const {
  struct timeval tv;
  tv.tv_sec = 0;
  tv.tv_usec = 1000;
  fd_set rfds;
  FD_ZERO(&rfds);
  select(0, &rfds, NULL, NULL, &tv);
}
class SetDescriptor {
private:
  int* max_ptr;
  fd_set* rfds_ptr;
  fd_set* wfds_ptr;
public:
  SetDescriptor(int* max_ptr, fd_set* rfds_ptr, fd_set* wfds_ptr):
    max_ptr(max_ptr),
    rfds_ptr(rfds_ptr),
    wfds_ptr(wfds_ptr) {}
  void operator()(const SocketEntry& entry) {
    int fd = entry.socket->getSockfd();
    switch(entry.type) {
    case SocketEntry::TYPE_RD:
      FD_SET(fd, rfds_ptr);
      break;
    case SocketEntry::TYPE_WR:
      FD_SET(fd, wfds_ptr);
      break;
    }
    if(*max_ptr < fd) {
      *max_ptr = fd;
    }
  }
#ifdef ENABLE_ASYNC_DNS
  void operator()(const NameResolverEntry& entry) {
    int tempFd = entry.nameResolver->getFds(rfds_ptr, wfds_ptr);
    if(*max_ptr < tempFd) {
      *max_ptr = tempFd;
    }
  }
#endif // ENABLE_ASYNC_DNS
};
class AccumulateActiveCommand {
private:
  Commands* activeCommands_ptr;
  fd_set* rfds_ptr;
  fd_set* wfds_ptr;
public:
  AccumulateActiveCommand(Commands* activeCommands_ptr,
                          fd_set* rfds_ptr,
                          fd_set* wfds_ptr):
    activeCommands_ptr(activeCommands_ptr),
    rfds_ptr(rfds_ptr),
    wfds_ptr(wfds_ptr) {}
  void operator()(const SocketEntry& entry) {
    if(FD_ISSET(entry.socket->getSockfd(), rfds_ptr) ||
       FD_ISSET(entry.socket->getSockfd(), wfds_ptr)) {
      activeCommands_ptr->push_back(entry.command);
    }
    /*
      switch(entry.type) {
      case SocketEntry::TYPE_RD:
      if(FD_ISSET(entry.socket->getSockfd(), rfds_ptr)) {
      activeCommands_ptr->push_back(entry.command);
      }
      break;
      case SocketEntry::TYPE_WR:
      if(FD_ISSET(entry.socket->getSockfd(), wfds_ptr)) {
      activeCommands_ptr->push_back(entry.command);
      }
      break;
      }
    */
  }
#ifdef ENABLE_ASYNC_DNS
  void operator()(const NameResolverEntry& entry) {
    entry.nameResolver->process(rfds_ptr, wfds_ptr);
    switch(entry.nameResolver->getStatus()) {
    case NameResolver::STATUS_SUCCESS:
    case NameResolver::STATUS_ERROR:
      activeCommands_ptr->push_back(entry.command);
      break;
    default:
      break;
    }
  }
#endif // ENABLE_ASYNC_DNS
};
void DownloadEngine::waitData(Commands& activeCommands) {
  fd_set rfds;
  fd_set wfds;
  int retval = 0;
  struct timeval tv;
  // select() modifies the sets it is given, so work on copies of the
  // master sets maintained by updateFdSet().
  memcpy(&rfds, &rfdset, sizeof(fd_set));
  memcpy(&wfds, &wfdset, sizeof(fd_set));
  tv.tv_sec = 1;
  tv.tv_usec = 0;
  retval = select(fdmax+1, &rfds, &wfds, NULL, &tv);
  if(retval > 0) {
    for_each(socketEntries.begin(), socketEntries.end(),
             AccumulateActiveCommand(&activeCommands, &rfds, &wfds));
#ifdef ENABLE_ASYNC_DNS
    for_each(nameResolverEntries.begin(), nameResolverEntries.end(),
             AccumulateActiveCommand(&activeCommands, &rfds, &wfds));
#endif // ENABLE_ASYNC_DNS
    // The same command may have been collected more than once;
    // keep each command only once.
    sort(activeCommands.begin(), activeCommands.end());
    activeCommands.erase(unique(activeCommands.begin(),
                                activeCommands.end()),
                         activeCommands.end());
  }
}
void DownloadEngine::updateFdSet() {
  fdmax = 0;
  FD_ZERO(&rfdset);
  FD_ZERO(&wfdset);
#ifdef ENABLE_ASYNC_DNS
  for_each(nameResolverEntries.begin(), nameResolverEntries.end(),
           SetDescriptor(&fdmax, &rfdset, &wfdset));
#endif // ENABLE_ASYNC_DNS
  for_each(socketEntries.begin(), socketEntries.end(),
           SetDescriptor(&fdmax, &rfdset, &wfdset));
}
bool DownloadEngine::addSocket(const SocketEntry& entry) {
  SocketEntries::iterator itr =
    find(socketEntries.begin(), socketEntries.end(), entry);
  if(itr == socketEntries.end()) {
    socketEntries.push_back(entry);
    updateFdSet();
    return true;
  } else {
    return false;
  }
}
bool DownloadEngine::deleteSocket(const SocketEntry& entry) {
  SocketEntries::iterator itr =
    find(socketEntries.begin(), socketEntries.end(), entry);
  if(itr == socketEntries.end()) {
    return false;
  } else {
    socketEntries.erase(itr);
    updateFdSet();
    return true;
  }
}
bool DownloadEngine::addSocketForReadCheck(const SocketHandle& socket,
                                           Command* command) {
  SocketEntry entry(socket, command, SocketEntry::TYPE_RD);
  return addSocket(entry);
}
bool DownloadEngine::deleteSocketForReadCheck(const SocketHandle& socket,
                                              Command* command) {
  SocketEntry entry(socket, command, SocketEntry::TYPE_RD);
  return deleteSocket(entry);
}
bool DownloadEngine::addSocketForWriteCheck(const SocketHandle& socket,
                                            Command* command) {
  SocketEntry entry(socket, command, SocketEntry::TYPE_WR);
  return addSocket(entry);
}
bool DownloadEngine::deleteSocketForWriteCheck(const SocketHandle& socket,
                                               Command* command) {
  SocketEntry entry(socket, command, SocketEntry::TYPE_WR);
  return deleteSocket(entry);
}
#ifdef ENABLE_ASYNC_DNS
bool DownloadEngine::addNameResolverCheck(const NameResolverHandle& resolver,
                                          Command* command) {
  NameResolverEntry entry(resolver, command);
  NameResolverEntries::iterator itr = find(nameResolverEntries.begin(),
                                           nameResolverEntries.end(),
                                           entry);
  if(itr == nameResolverEntries.end()) {
    nameResolverEntries.push_back(entry);
    updateFdSet();
    return true;
  } else {
    return false;
  }
}
bool DownloadEngine::deleteNameResolverCheck(const NameResolverHandle& resolver,
                                             Command* command) {
  NameResolverEntry entry(resolver, command);
  NameResolverEntries::iterator itr = find(nameResolverEntries.begin(),
                                           nameResolverEntries.end(),
                                           entry);
  if(itr == nameResolverEntries.end()) {
    return false;
  } else {
    nameResolverEntries.erase(itr);
    updateFdSet();
    return true;
  }
}
#endif // ENABLE_ASYNC_DNS
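
The readiness check in waitData() can be tried in isolation. The following is a minimal sketch, assuming plain integer descriptors and integer command ids in place of SocketHandle and Command*; `WatchEntry` and `collectReadable` are hypothetical names for illustration and do not appear in aria2. Like waitData(), it calls select() with a timeout, collects the owner of every ready descriptor, and removes duplicates with sort and unique.

```cpp
#include <sys/select.h>
#include <unistd.h>
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for SocketEntry: a descriptor and the id of the
// command that waits on it.
struct WatchEntry {
  int fd;
  int commandId;
};

// Returns the ids of all commands that have at least one readable
// descriptor within timeoutSec seconds. Each id appears at most once.
std::vector<int> collectReadable(const std::vector<WatchEntry>& entries,
                                 int timeoutSec) {
  fd_set rfds;
  FD_ZERO(&rfds);
  int fdmax = 0;
  for (std::size_t i = 0; i < entries.size(); ++i) {
    // FD_SET is only defined for descriptors below FD_SETSIZE.
    assert(entries[i].fd >= 0 && entries[i].fd < FD_SETSIZE);
    FD_SET(entries[i].fd, &rfds);
    fdmax = std::max(fdmax, entries[i].fd);
  }
  struct timeval tv;
  tv.tv_sec = timeoutSec;
  tv.tv_usec = 0;
  std::vector<int> ready;
  if (select(fdmax + 1, &rfds, NULL, NULL, &tv) > 0) {
    for (std::size_t i = 0; i < entries.size(); ++i) {
      if (FD_ISSET(entries[i].fd, &rfds)) {
        ready.push_back(entries[i].commandId);
      }
    }
    // A command that owns several ready descriptors is reported once.
    std::sort(ready.begin(), ready.end());
    ready.erase(std::unique(ready.begin(), ready.end()), ready.end());
  }
  return ready;
}
```

Sorting before `unique` matters because `unique` only removes adjacent duplicates; the same reasoning applies to the sort and unique calls in waitData().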