diff --git a/ChangeLog b/ChangeLog index 17bb3473..61d533d5 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,301 @@ +2006-07-19 Tatsuhiro Tsujikawa + + * src/SharedHandle.h: New class. + + To wrap Socket, Command, PeerMessage and Peer with SharedHandle: + + * src/HttpResponseCommand.h + (HttpResponseCommand): Wrapped Socket. + * src/SocketCore.h + (operator==): New function. + (operator!=): New function. + (operator<): New function. + (getSockfd): New function. + (isOpen): New function. + (writeData): New function. + * src/SocketCore.cc + (operator==): New function. + (operator!=): New function. + (operator<): New function. + * src/AbstractCommand.h + (socket): Changed its type to SocketHandle. + (setReadCheckSocket): Replaced Socket with SocketHandle. + (setWriteCheckSocket): Replaced Socket with SocketHandle. + (disableReadCheckSocket): New function. + (disableWriteCheckSocket): New function. + (readCheckTarget): Changed its type to SocketHandle. + (writeCheckTarget): Changed its type to SocketHandle. + (AbstractCommand): Replaced Socket with SocketHandle. + * src/AbstractCommand.cc + (AbstractCommand): Replaced Socket with SocketHandle. + (~AbstractCommand): Removed the deallocation for Socket object. + (disableReadCheckSocket): New function. + (setReadCheckSocket): Replaced Socket with SocketHandle. + (disableWriteCheckSocket): New function. + (setWriteCheckSocket): Replaced Socket with SocketHandle. + * src/HttpDownloadCommand.cc + (DownloadCommand): Replaced Socket with SocketHandle. + * src/PeerAbstractCommand.h + (socket): Changed its type to SocketHandle. + (peer): Changed its type to PeerHandle. + (setReadCheckSocket): Replaced Socket with SocketHandle. + (setWriteCheckSocket): Replaced Socket with SocketHandle. + (disableReadCheckSocket): New function. + (disableWriteCheckSocket): New function. + (readCheckTarget): Changed its type to SocketHandle. + (writeCheckTarget): Changed its type to SocketHandle. + (PeerAbstractCommand): Replaced Socket with SocketHandle. + Replaced Peer with PeerHandle. + * src/HttpRequestCommand.cc + (HttpRequestCommand): Replaced Socket with SocketHandle. + Use disableReadCheckSocket. + * src/PeerInitiateConnectionCommand.h + (PeerInitiateConnectionCommand): Replaced Peer with PeerHandle. + * src/PeerChokeCommand.cc + (UploadFaster::operator()): Replaced Peer with PeerHandle. + (DownloadFaster::operator()): Replaced Peer with PeerHandle. + (execute): Use PeerHandle. + * src/PeerConnection.h + (HandshakeMessage.h): Removed include of HandshakeMessage.h. + (socket): Changed its type to SocketHandle. + (PeerConnection): Replaced Socket with SocketHandle. + * src/PeerConnection.cc + (PeerConnection): Replaced Socket with SocketHandle. + * src/PeerInteractionCommand.h + (PeerInteractionCommand): Replaced socket with SocketHandle. + Replaced Peer with PeerHandle. + * src/PeerInteractionCommand.cc + (PeerInteractionCommand): Replaced Socket with SocketHandle. + Replaced Peer with PeerHandle. + (executeInternal): Use disableWriteCheckSocket. + Use HandshakeMessageHandle. + (receiveMessages): Use PeerMessageHandle. + (prepareForNextPeer): Use PeerHandle. + * src/HttpProxyRequestCommand.h + (HttpProxyRequestCommand): Replaced Socket with SocketHandle. + * src/HttpResponseCommand.cc + (HttpResponseCommand): Replaced Socket with SocketHandle. + * src/TorrentMan.cc + (nullPeer): Added external reference. + (~TorrentMan): Removed the deallocation of the elements of peers. + (addPeer): Rewritten. + (isPeerAvailable): Use nullPeer. + (deleteOldpeers): Replaced with deleteErrorPeer. + (deleteErrorPeer): New function. + (getPeer): Use PeerHandle and nullPeer. + (hasMissingPiece): Replaced Peer with PeerHandle. + (getMissingPieceIndex): Replaced Peer with PeerHandle. + (getMissingFastPieceIndex): Replaced Peer with PeerHandle. + (getMissingFastPiece): Replaced Peer with PeerHandle. + (getMissingPiece): Replaced Peer with PeerHandle. + * src/FtpNegotiateCommand.cc + (FtpNegotiationCommand): Replaced Peer with PeerHandle. + (~FtpNegotiationCommand): Removed the deallocation of Sockets. + (recvGreeting): Use disableWriteCheckSocket. + (recvPasv): Removed the allocation of Socket. + Use disableReadCheckSocket. + (sendRestPasv): Use disableWriteCheckSocket. + (recvRetr): Changed assertion. + * src/PeerInteraction.h + (SharedHandle.h): Included SharedHandle.h. + (PeerMessageHandle): New type definition. + (HandshakeMessageHandle): New type definition. + (MessageQueue): Changed. Now its element is of type PeerMessageHandle. + (peer): Changed its type to PeerHandle. + (createHandshakeMessage): Replaced HandshakeMessage with + HandshakeMessageHandle. + (createPeerMessage): Replaced PeerMessageHandle with PeerMessage. + (PeerInteraction): Replaced Peer with PeerHandle. + Replaced Socket with SocketHandle. + (addMessage): Replaced PeerMessage with PeerMessageHandle. + (receiveMessage): Replaced PeerMessage with PeerMessageHandle. + (receiveHandshake): Replaced HandshakeMessage with + HandshakeMessageHandle. + * src/PeerInteraction.cc + (PeerInteraction): Replaced Peer with PeerHandle. + Replaced Socket with SocketHandle. + (~PeerInteraction): Removed the deallocation of the elements of + messageQueue. + (MsgPushBack::operator()): Replaced PeerMessage with PeerMessageHandle. + (isSendingMessageInProgress): Replaced PeerMessage with + PeerMessageHandle. + (sendMessages): Use PeerMessageHandle. Removed try-catch block. + (addMessage): Replaced PeerMessage with PeerMessageHandle. + (rejectAllPieceMessageInQueue): Use PeerMessageHandle. + (rejectPieceMessageInQueue): Use PeerMessageHandle. + (abortPiece): Use PeerMessageHandle. + (receiveHandshake): Replaced HandshakeMessage with + HandshakeMessageHandle. Removed try-catch block. + (createHandshakeMessage): Replaced HandshakeMessage with + HandshakeMessageHandle. + (receiveMessage): Replaced PeerMessage with PeerMessageHandle. + Removed try-catch block. + (createPeerMessage): Replaced PeerMessage with PeerMessageHandle. + * src/HttpProxyResponseCommand.cc + (HttpProxyRequestCommand): Replaced Socket with SocketHandle. + * src/FtpTunnelResponseCommand.h + (FtpTunnelResponseCommand): Replaced Socket with SocketHandle. + * src/HttpConnection.cc + (HttpConnection): Replaced Socket with SocketHandle. + * src/PeerAbstractCommand.cc + (PeerAbstractCommand): Replaced Socket with SocketHandle. + (~PeerAbstractCommand): Removed the deallocation of socket. + Use disableReadCheckSocket, disableWriteCheckSocket. + (disableReadCheckSocket): New function. + (setReadCheckSocket): Replaced Socket with SocketHandle. + (disableWriteCheckSocket): New function. + (setWriteCheckSocket): Replaced Socket with SocketHandle. + * src/InitiateConnectionCommandFactory.h: Corrected indentation. + * src/FtpTunnelRequestCommand.cc + (FtpTunnelRequestCommand): Replaced Socket with SocketHandle. + (~FtpTunnelRequestCommand): Corrected indentation. + * src/DownloadCommand.h + (DownloadCommand): Replaced Socket with SocketHandle. + * src/PeerListenCommand.cc + (PeerListenCommand): Removed the initialization of socket. + (~PeerListenCommand): Removed the deallocation of socket. + (bindPort): Use SocketHandle. + (execute): Use SocketHandle and PeerHandle. + * src/FtpDowndloadCommand.cc + (FtpDownloadCommand): Replaced Socket with SocketHandle. + (~FtpDownloadCommand): Removed the deallocation of ctrlSocket. + * src/main.cc + (main): Corrected indentation. + * src/HttpInitiateConnectionCommand.cc + (HttpInitiateConnectionCommand): Replaced Socket with SocketHandle. + (executeInternal): Removed the allocation of socket. + * src/HttpRequestCommand.h + (HttpRequestCommand): Replaced Socket with SocketHandle. + * src/FtpNegotiationCommand.h + (dataSocket): Changed its type to SocketHandle. + (serverSocket): Changed its type to SocketHandle. + (FtpNegotiationCommand): Replaced Socket with SocketHandle. + * src/TorrentMan.h + (MAX_PEER_UPDATE): Removed. + (MAX_PEERS): New definition. + (Peers): The element is now of type PeerHandle. + (addPeer): Replaced Peer with PeerHandle. Removed 'duplicate' argument. + (getPeer): Replaced Peer with PeerHandle. + (deleteOldErrorPeers): Removed. + (deleteErrorPeer): New function. + (hasMissingPiece): Replaced Peer with PeerHandle. + (getMissingPieceIndex): Replaced Peer with PeerHandle. + (getMissingPiece): Replaced Peer with PeerHandle. + (getMissingFastPieceIndex): Replaced Peer with PeerHandle. + (getMissingFastPiece): Replaced Peer with PeerHandle. + (addActivePeer): Replaced Peer with PeerHandle. + (deleteActivePeer): Replaced Peer with PeerHandle. + Added a check for the return value of find. + * src/FtpTunnelResponseCommand.cc + (FtpTunnelResponseCommand): Replaced Socket with SocketHandle. + * src/FtpInitiateConnectionCommand.cc + (executeInternal): Removed the allocation of socket. + * src/DownloadEngine.h + (Sockets): An element is now of type SocketHandle. + (SockCmdMap): A key is of type SocketHandle, a value is of type int. + (CommandUuids): New type definition. + (rsockets): Changed its type to SockCmdMap. + (wsockets): Changed its type to SockCmdMap. + (addSocket): Rewritten. + (deleteSocket): Rewritten. + (addSocketForReadCheck): Rewritten. + (deleteSocketForReadCheck): Rewritten. + (addSocketForWriteCheck): Rewritten. + (deleteSocketForWriteCheck): Rewritten. + (PairFind): New template class. + * src/HttpDownloadCommand.h + (HttpDownloadCommand): Replaced Socket with SocketHandle. + * src/FtpConnection.cc + (FtpConnection): Replaced Socket with SocketHandle. + (sendPort): Removed the allocation of serverSocket. Removed try-catch + block. + * src/InitiateConnectionCommandFactory.cc + (DlAbortEx.h): Included DlAbortEx.h. + (createInitiateConnectionCommand): Throw exception if the protocol of + requested URI is not supported. + * src/Peer.cc + (nullPeer): Changed its type to PeerHandle. + (operator==): New function. + (operator!=): New function. + * src/Peer.h + (SharedHandle.h): Included SharedHandle.h. + (operator==): New function. + (operator!=): New function. + (Peer): Added the default constructor. + Use resetStatus() to initialize member variables. + (nullPeer): Removed. + * src/TrackerUpdateCommand.cc + (execute): Brushed up using SharedHandle. Replaced MAX_PEER_UPDATE + with MIN_PEERS. + * src/PeerListenCommand.h + (socket): Changed its type to SocketHandle. + * src/Command.h + (CommandUuid): New type definition. + (uuid): New variable. + (uuidGen): New variable. + (Command): Added the initialization of uuid. + (getUuid): New function. + * src/Socket.h + (Socket): Removed. + (SocketHandle): New type definition. + * src/DownloadEngine.h + (FindCommand): New function object. + (run): The portion of socket check was rewritten. + (SetDescriptor): New function object. + (AccumulateActiveCommandUuid): New function object. + (waitData): Rewritten. + (addSocket): Rewritten. + (deleteSocket): Rewritten. + (addSocketForReadCheck): Rewritten. + (addSocketForWriteCheck): Rewritten. + (deleteSocketForReadCheck): Rewritten. + (deleteSocketForWriteCheck): Rewritten. + * src/HttpProxyResponseCommand.h + (HttpProxyResponseCommand): Replaced Socket with SocketHandle. + * src/HttpConnection.h + (socket): Changed its type to SocketHandle. + (HttpConnection): Replaced Socket with SocketHandle. + * src/PeerInitiateConnectionCommand.cc + (PeerInitiateConnectionCommand): Replaced Peer with PeerHandle. + (executeInternal): Removed the allocation of socket. + (prepareForNextPeer): Use PeerHandle. + * src/PeerMessage.h + (peer): Changed its type to PeerHandle. + (getPeer): Replaced Peer with PeerHandle. + (setPeer): Replaced Peer with PeerHandle. + * src/DownloadCommand.cc + (DownloadCommand): Replaced Socket with SocketHandle. + * src/FtpConnection.h + (socket): Changed its type to SocketHandle. + (FtpConnection): Replaced Socket with SocketHandle. + (sendPort); Replaced Socket with SocketHandle. + * src/FtpDowndloadCommand.h + (ctrlSocket): Changed its type to SocketHandle. + (FtpDownloadCommand): Replaced Socket with SocketHandle. + * src/HttpProxyRequestCommand.cc + (HttpProxyRequestCommand): Replaced Socket with SocketHandle. + * src/FtpTunnelRequestCommand.h + (FtpTunnelRequestCommand): Replaced Socket with SocketHandle. + + etc + + * src/PeerChokeCommand.h + (setAllPeerChoked): Removed. + (setAllPeerResetDelta): Removed. + * src/PeerChokeCommand.cc + (setAllPeerChoked): Removed. + (ChokePeer): New function object. + (setAllPeerResetDelta): Removed. + (ResetDelta): New function object. + (orderByDownloadRate): Fixed a bug: use DowloadFaster, not UploadFaster + (execute): Show download speed when the local node is a seeder. + setAllPeerChoked and setAllPeerResetDelta were rewritten + using STL. + * src/TrackerWatcherCommand.h + (MIN_PEERS): Removed. + * src/TorrentMan.cc + (getPeer): Replaced MAX_PEER_UPDATE with MIN_PEERS. + 2006-07-07 Tatsuhiro Tsujikawa To fix the bug that .aria2 file is not saved if downloading is stopped diff --git a/TODO b/TODO index b590fc88..881056d8 100644 --- a/TODO +++ b/TODO @@ -11,3 +11,4 @@ * Refacturing HttpConnection and FtpConnection * Query resource by location * Log version +* List available os, version, etc for metalink \ No newline at end of file diff --git a/src/AbstractCommand.cc b/src/AbstractCommand.cc index a622959a..ffd72393 100644 --- a/src/AbstractCommand.cc +++ b/src/AbstractCommand.cc @@ -28,24 +28,18 @@ #include "SleepCommand.h" #include "prefs.h" -AbstractCommand::AbstractCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s): - Command(cuid), req(req), e(e), checkSocketIsReadable(false), checkSocketIsWritable(false) { - - if(s != NULL) { - socket = new Socket(*s); - setReadCheckSocket(socket); - } else { - socket = NULL; - } +AbstractCommand::AbstractCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& s): + Command(cuid), req(req), e(e), socket(s), + checkSocketIsReadable(false), checkSocketIsWritable(false) { + + setReadCheckSocket(socket); timeout = this->e->option->getAsInt(PREF_TIMEOUT); } AbstractCommand::~AbstractCommand() { - setReadCheckSocket(NULL); - setWriteCheckSocket(NULL); - if(socket != NULL) { - delete(socket); - } + disableReadCheckSocket(); + disableWriteCheckSocket(); } bool AbstractCommand::execute() { @@ -124,44 +118,52 @@ void AbstractCommand::onAbort(Exception* ex) { e->segmentMan->unregisterId(cuid); } -void AbstractCommand::setReadCheckSocket(Socket* socket) { - if(socket == NULL) { - if(checkSocketIsReadable) { - e->deleteSocketForReadCheck(readCheckTarget); - checkSocketIsReadable = false; - readCheckTarget = NULL; - } +void AbstractCommand::disableReadCheckSocket() { + if(checkSocketIsReadable) { + e->deleteSocketForReadCheck(readCheckTarget, getUuid()); + checkSocketIsReadable = false; + readCheckTarget = SocketHandle(); + } +} + +void AbstractCommand::setReadCheckSocket(const SocketHandle& socket) { + if(!socket->isOpen()) { + disableReadCheckSocket(); } else { if(checkSocketIsReadable) { if(readCheckTarget != socket) { - e->deleteSocketForReadCheck(readCheckTarget); - e->addSocketForReadCheck(socket, this); + e->deleteSocketForReadCheck(readCheckTarget, getUuid()); + e->addSocketForReadCheck(socket, getUuid()); readCheckTarget = socket; } } else { - e->addSocketForReadCheck(socket, this); + e->addSocketForReadCheck(socket, getUuid()); checkSocketIsReadable = true; readCheckTarget = socket; } } } -void AbstractCommand::setWriteCheckSocket(Socket* socket) { - if(socket == NULL) { - if(checkSocketIsWritable) { - e->deleteSocketForWriteCheck(writeCheckTarget); - checkSocketIsWritable = false; - writeCheckTarget = NULL; - } +void AbstractCommand::disableWriteCheckSocket() { + if(checkSocketIsWritable) { + e->deleteSocketForWriteCheck(writeCheckTarget, getUuid()); + checkSocketIsWritable = false; + writeCheckTarget = SocketHandle(); + } +} + +void AbstractCommand::setWriteCheckSocket(const SocketHandle& socket) { + if(!socket->isOpen()) { + disableWriteCheckSocket(); } else { if(checkSocketIsWritable) { if(writeCheckTarget != socket) { - e->deleteSocketForWriteCheck(writeCheckTarget); - e->addSocketForWriteCheck(socket, this); + e->deleteSocketForWriteCheck(writeCheckTarget, getUuid()); + e->addSocketForWriteCheck(socket, getUuid()); writeCheckTarget = socket; } } else { - e->addSocketForWriteCheck(socket, this); + e->addSocketForWriteCheck(socket, getUuid()); checkSocketIsWritable = true; writeCheckTarget = socket; } diff --git a/src/AbstractCommand.h b/src/AbstractCommand.h index 6eb6e9f4..6d84013d 100644 --- a/src/AbstractCommand.h +++ b/src/AbstractCommand.h @@ -35,23 +35,25 @@ private: protected: Request* req; DownloadEngine* e; - Socket* socket; + SocketHandle socket; void tryReserved(); virtual bool prepareForRetry(int wait); virtual void onAbort(Exception* ex); virtual bool executeInternal(Segment segment) = 0; - void setReadCheckSocket(Socket* socket); - void setWriteCheckSocket(Socket* socket); + void setReadCheckSocket(const SocketHandle& socket); + void setWriteCheckSocket(const SocketHandle& socket); + void disableReadCheckSocket(); + void disableWriteCheckSocket(); void setTimeout(int timeout) { this->timeout = timeout; } private: bool checkSocketIsReadable; bool checkSocketIsWritable; - Socket* readCheckTarget; - Socket* writeCheckTarget; + SocketHandle readCheckTarget; + SocketHandle writeCheckTarget; public: - AbstractCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s= NULL); + AbstractCommand(int cuid, Request* req, DownloadEngine* e, const SocketHandle& s = SocketHandle()); virtual ~AbstractCommand(); bool execute(); }; diff --git a/src/Command.h b/src/Command.h index 4b0639cd..dd06a420 100644 --- a/src/Command.h +++ b/src/Command.h @@ -25,18 +25,24 @@ #include "common.h" #include "LogFactory.h" +typedef int CommandUuid; + class Command { +private: + CommandUuid uuid; + static int uuidGen; protected: int cuid; const Logger* logger; public: - Command(int cuid):cuid(cuid) { + Command(int cuid):uuid(uuidGen++), cuid(cuid) { logger = LogFactory::getInstance(); } virtual ~Command() {} virtual bool execute() = 0; int getCuid() const { return cuid; } + const CommandUuid& getUuid() const { return uuid; } }; #endif // _D_COMMAND_H_ diff --git a/src/DownloadCommand.cc b/src/DownloadCommand.cc index 98b117c6..441b00de 100644 --- a/src/DownloadCommand.cc +++ b/src/DownloadCommand.cc @@ -27,7 +27,10 @@ #include "InitiateConnectionCommandFactory.h" #include "message.h" -DownloadCommand::DownloadCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s), lastSize(0) {} +DownloadCommand::DownloadCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& s): + AbstractCommand(cuid, req, e, s), lastSize(0) { +} DownloadCommand::~DownloadCommand() {} diff --git a/src/DownloadCommand.h b/src/DownloadCommand.h index b1f4d2a0..9f10de02 100644 --- a/src/DownloadCommand.h +++ b/src/DownloadCommand.h @@ -38,7 +38,8 @@ protected: bool prepareForRetry(int wait); bool prepareForNextSegment(); public: - DownloadCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s); + DownloadCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& s); virtual ~DownloadCommand(); virtual TransferEncoding* getTransferEncoding(const string& transferEncoding) = 0; diff --git a/src/DownloadEngine.cc b/src/DownloadEngine.cc index e2bef95f..5b0469a7 100644 --- a/src/DownloadEngine.cc +++ b/src/DownloadEngine.cc @@ -47,10 +47,26 @@ void DownloadEngine::cleanQueue() { commands.clear(); } + +class FindCommand { +private: + CommandUuid uuid; +public: + FindCommand(const CommandUuid& uuid):uuid(uuid) {} + + bool operator()(const Command* command) { + if(command->getUuid() == uuid) { + return true; + } else { + return false; + } + } +}; + void DownloadEngine::run() { initStatistics(); Time cp; - Sockets activeSockets; + CommandUuids activeCommandUuids; while(!commands.empty()) { if(cp.elapsed(1)) { cp.reset(); @@ -59,27 +75,26 @@ void DownloadEngine::run() { Command* com = commands.front(); commands.pop_front(); if(com->execute()) { - delete(com); + delete com; } } } else { - for(Sockets::iterator itr = activeSockets.begin(); - itr != activeSockets.end(); itr++) { - Socket* socket = *itr; - SockCmdMap::iterator mapItr = sockCmdMap.find(socket); - if(mapItr != sockCmdMap.end()) { - Command* com = (*mapItr).second; - commands.erase(remove(commands.begin(), commands.end(), com)); - if(com->execute()) { - delete(com); - } + for(CommandUuids::iterator itr = activeCommandUuids.begin(); + itr != activeCommandUuids.end(); itr++) { + Commands::iterator comItr = find_if(commands.begin(), commands.end(), + FindCommand(*itr)); + assert(comItr != commands.end()); + Command* com = *comItr; + commands.erase(comItr); + if(com->execute()) { + delete com; } } } afterEachIteration(); - activeSockets.clear(); + activeCommandUuids.clear(); if(!noWait && !commands.empty()) { - waitData(activeSockets); + waitData(activeCommandUuids); } noWait = false; calculateStatistics(); @@ -96,7 +111,40 @@ void DownloadEngine::shortSleep() const { select(0, &rfds, NULL, NULL, &tv); } -void DownloadEngine::waitData(Sockets& activeSockets) { +class SetDescriptor { +private: + fd_set* fds_ptr; + int* max_ptr; +public: + SetDescriptor(int* max_ptr, fd_set* fds_ptr) + :fds_ptr(fds_ptr), max_ptr(max_ptr) {} + + void operator()(const pair& pa) { + int fd = pa.first->getSockfd(); + FD_SET(fd, fds_ptr); + if(*max_ptr < fd) { + *max_ptr = fd; + } + } +}; + +class AccumulateActiveCommandUuid { +private: + CommandUuids* activeCommandUuids_ptr; + fd_set* fds_ptr; +public: + AccumulateActiveCommandUuid(CommandUuids* activeCommandUuids_ptr, + fd_set* fds_ptr) + :activeCommandUuids_ptr(activeCommandUuids_ptr), fds_ptr(fds_ptr) {} + + void operator()(const pair& pa) { + if(FD_ISSET(pa.first->getSockfd(), fds_ptr)) { + activeCommandUuids_ptr->push_back(pa.second); + } + } +}; + +void DownloadEngine::waitData(CommandUuids& activeCommandUuids) { fd_set rfds; fd_set wfds; int retval = 0; @@ -106,18 +154,9 @@ void DownloadEngine::waitData(Sockets& activeSockets) { FD_ZERO(&rfds); FD_ZERO(&wfds); int max = 0; - for(Sockets::iterator itr = rsockets.begin(); itr != rsockets.end(); itr++) { - FD_SET((*itr)->getSockfd(), &rfds); - if(max < (*itr)->getSockfd()) { - max = (*itr)->getSockfd(); - } - } - for(Sockets::iterator itr = wsockets.begin(); itr != wsockets.end(); itr++) { - FD_SET((*itr)->getSockfd(), &wfds); - if(max < (*itr)->getSockfd()) { - max = (*itr)->getSockfd(); - } - } + for_each(rsockmap.begin(), rsockmap.end(), SetDescriptor(&max, &rfds)); + for_each(wsockmap.begin(), wsockmap.end(), SetDescriptor(&max, &wfds)); + tv.tv_sec = 1; tv.tv_usec = 0; retval = select(max+1, &rfds, &wfds, NULL, &tv); @@ -126,64 +165,60 @@ void DownloadEngine::waitData(Sockets& activeSockets) { } } if(retval > 0) { - for(Sockets::iterator itr = rsockets.begin(); itr != rsockets.end(); itr++) { - if(FD_ISSET((*itr)->getSockfd(), &rfds)) { - activeSockets.push_back(*itr); - } - } - for(Sockets::iterator itr = wsockets.begin(); itr != wsockets.end(); itr++) { - if(FD_ISSET((*itr)->getSockfd(), &wfds)) { - activeSockets.push_back(*itr); - } - } - sort(activeSockets.begin(), activeSockets.end()); - activeSockets.erase(unique(activeSockets.begin(), activeSockets.end()), activeSockets.end()); + for_each(rsockmap.begin(), rsockmap.end(), + AccumulateActiveCommandUuid(&activeCommandUuids, &rfds)); + for_each(wsockmap.begin(), wsockmap.end(), + AccumulateActiveCommandUuid(&activeCommandUuids, &wfds)); + sort(activeCommandUuids.begin(), activeCommandUuids.end()); + activeCommandUuids.erase(unique(activeCommandUuids.begin(), + activeCommandUuids.end()), + activeCommandUuids.end()); } } -bool DownloadEngine::addSocket(Sockets& sockets, Socket* socket, Command* command) { - Sockets::iterator itr = find(sockets.begin(), - sockets.end(), - socket); - if(itr == sockets.end()) { - sockets.push_back(socket); - SockCmdMap::value_type vt(socket, command); - sockCmdMap.insert(vt); +bool DownloadEngine::addSocket(SockCmdMap& sockmap, + const SocketHandle& socket, + const CommandUuid& commandUuid) { + SockCmdMap::iterator itr = find_if(sockmap.begin(), sockmap.end(), + PairFind(socket, commandUuid)); + if(itr == sockmap.end()) { + SockCmdMap::value_type vt(socket, commandUuid); + sockmap.insert(vt); return true; } else { return false; } } -bool DownloadEngine::deleteSocket(Sockets& sockets, Socket* socket) { - Sockets::iterator itr = find(sockets.begin(), - sockets.end(), - socket); - if(itr != sockets.end()) { - sockets.erase(itr); - SockCmdMap::iterator mapItr = sockCmdMap.find(socket); - if(mapItr != sockCmdMap.end()) { - sockCmdMap.erase(mapItr); - } - return true; - } else { +bool DownloadEngine::deleteSocket(SockCmdMap& sockmap, + const SocketHandle& socket, + const CommandUuid& commandUuid) { + SockCmdMap::iterator itr = find_if(sockmap.begin(), sockmap.end(), + PairFind(socket, commandUuid)); + if(itr == sockmap.end()) { return false; + } else { + sockmap.erase(itr); + return true; } } -bool DownloadEngine::addSocketForReadCheck(Socket* socket, Command* command) { - return addSocket(rsockets, socket, command); +bool DownloadEngine::addSocketForReadCheck(const SocketHandle& socket, + const CommandUuid& commandUuid) { + return addSocket(rsockmap, socket, commandUuid); } -bool DownloadEngine::deleteSocketForReadCheck(Socket* socket) { - return deleteSocket(rsockets , socket); +bool DownloadEngine::deleteSocketForReadCheck(const SocketHandle& socket, + const CommandUuid& commandUuid) { + return deleteSocket(rsockmap, socket, commandUuid); } -bool DownloadEngine::addSocketForWriteCheck(Socket* socket, Command* command) { - return addSocket(wsockets, socket, command); +bool DownloadEngine::addSocketForWriteCheck(const SocketHandle& socket, + const CommandUuid& commandUuid) { + return addSocket(wsockmap, socket, commandUuid); } -bool DownloadEngine::deleteSocketForWriteCheck(Socket* socket) { - return deleteSocket(wsockets, socket); +bool DownloadEngine::deleteSocketForWriteCheck(const SocketHandle& socket, + const CommandUuid& commandUuid) { + return deleteSocket(wsockmap, socket, commandUuid); } - diff --git a/src/DownloadEngine.h b/src/DownloadEngine.h index dd9d10fc..9acbeda4 100644 --- a/src/DownloadEngine.h +++ b/src/DownloadEngine.h @@ -29,20 +29,22 @@ #include "Logger.h" #include "Option.h" -typedef deque Sockets; +typedef deque Sockets; typedef deque Commands; -typedef multimap SockCmdMap; +typedef deque CommandUuids; +typedef multimap SockCmdMap; class DownloadEngine { private: - void waitData(Sockets& activeSockets); - Sockets rsockets; - Sockets wsockets; - SockCmdMap sockCmdMap; - + void waitData(CommandUuids& activeCommandUuids); + SockCmdMap rsockmap; + SockCmdMap wsockmap; + void shortSleep() const; - bool addSocket(Sockets& sockets, Socket* socket, Command* command); - bool deleteSocket(Sockets& sockets, Socket* socket); + bool addSocket(SockCmdMap& sockmap, const SocketHandle& socket, + const CommandUuid& commandUuid); + bool deleteSocket(SockCmdMap& sockmap, const SocketHandle& socket, + const CommandUuid& commandUuid); protected: const Logger* logger; virtual void initStatistics() = 0; @@ -62,12 +64,33 @@ public: void cleanQueue(); - bool addSocketForReadCheck(Socket* socket, Command* command); - bool deleteSocketForReadCheck(Socket* socket); - bool addSocketForWriteCheck(Socket* socket, Command* command); - bool deleteSocketForWriteCheck(Socket* socket); + bool addSocketForReadCheck(const SocketHandle& socket, + const CommandUuid& commandUuid); + bool deleteSocketForReadCheck(const SocketHandle& socket, + const CommandUuid& commandUuid); + bool addSocketForWriteCheck(const SocketHandle& socket, + const CommandUuid& commandUuid); + bool deleteSocketForWriteCheck(const SocketHandle& socket, + const CommandUuid& command); }; +template +class PairFind { +private: + T1 first; + T2 second; +public: + PairFind(T1 t1, T2 t2):first(t1), second(t2) {} + + bool operator()(const pair& pa) { + if(pa.first == first && pa.second == second) { + return true; + } else { + return false; + } + } +}; + #endif // _D_DOWNLOAD_ENGINE_H_ diff --git a/src/FtpConnection.cc b/src/FtpConnection.cc index 0f947636..ba8e2000 100644 --- a/src/FtpConnection.cc +++ b/src/FtpConnection.cc @@ -27,7 +27,9 @@ #include "prefs.h" #include "LogFactory.h" -FtpConnection::FtpConnection(int cuid, const Socket* socket, const Request* req, const Option* op):cuid(cuid), socket(socket), req(req), option(op) { +FtpConnection::FtpConnection(int cuid, const SocketHandle& socket, + const Request* req, const Option* op) + :cuid(cuid), socket(socket), req(req), option(op) { logger = LogFactory::getInstance(); } @@ -75,27 +77,22 @@ void FtpConnection::sendPasv() const { socket->writeData(request); } -Socket* FtpConnection::sendPort() const { - Socket* serverSocket = new Socket(); - try { - serverSocket->beginListen(); - - pair addrinfo; - socket->getAddrInfo(addrinfo); - int ipaddr[4]; - sscanf(addrinfo.first.c_str(), "%d.%d.%d.%d", - &ipaddr[0], &ipaddr[1], &ipaddr[2], &ipaddr[3]); - serverSocket->getAddrInfo(addrinfo); - string request = "PORT "+ - Util::itos(ipaddr[0])+","+Util::itos(ipaddr[1])+","+ - Util::itos(ipaddr[2])+","+Util::itos(ipaddr[3])+","+ - Util::itos(addrinfo.second/256)+","+Util::itos(addrinfo.second%256)+"\r\n"; - logger->info(MSG_SENDING_REQUEST, cuid, request.c_str()); - socket->writeData(request); - } catch (Exception* ex) { - delete serverSocket; - throw; - } +SocketHandle FtpConnection::sendPort() const { + SocketHandle serverSocket; + serverSocket->beginListen(); + + pair addrinfo; + socket->getAddrInfo(addrinfo); + int ipaddr[4]; + sscanf(addrinfo.first.c_str(), "%d.%d.%d.%d", + &ipaddr[0], &ipaddr[1], &ipaddr[2], &ipaddr[3]); + serverSocket->getAddrInfo(addrinfo); + string request = "PORT "+ + Util::itos(ipaddr[0])+","+Util::itos(ipaddr[1])+","+ + Util::itos(ipaddr[2])+","+Util::itos(ipaddr[3])+","+ + Util::itos(addrinfo.second/256)+","+Util::itos(addrinfo.second%256)+"\r\n"; + logger->info(MSG_SENDING_REQUEST, cuid, request.c_str()); + socket->writeData(request); return serverSocket; } diff --git a/src/FtpConnection.h b/src/FtpConnection.h index 8d91b932..1dd36a8e 100644 --- a/src/FtpConnection.h +++ b/src/FtpConnection.h @@ -35,7 +35,7 @@ using namespace std; class FtpConnection { private: int cuid; - const Socket* socket; + SocketHandle socket; const Request* req; const Option* option; const Logger* logger; @@ -46,7 +46,8 @@ private: bool isEndOfResponse(int status, const string& response) const; bool bulkReceiveResponse(pair& response); public: - FtpConnection(int cuid, const Socket* socket, const Request* req, const Option* op); + FtpConnection(int cuid, const SocketHandle& socket, + const Request* req, const Option* op); ~FtpConnection(); void sendUser() const; void sendPass() const; @@ -54,7 +55,7 @@ public: void sendCwd() const; void sendSize() const; void sendPasv() const; - Socket* sendPort() const; + SocketHandle sendPort() const; void sendRest(const Segment& segment) const; void sendRetr() const; diff --git a/src/FtpDownloadCommand.cc b/src/FtpDownloadCommand.cc index db9a492a..25368973 100644 --- a/src/FtpDownloadCommand.cc +++ b/src/FtpDownloadCommand.cc @@ -21,17 +21,13 @@ /* copyright --> */ #include "FtpDownloadCommand.h" -FtpDownloadCommand::FtpDownloadCommand(int cuid, Request* req, DownloadEngine* e, const Socket* dataSocket, const Socket* ctrlSocket): - DownloadCommand(cuid, req, e, dataSocket) -{ - this->ctrlSocket = new Socket(*ctrlSocket); -} +FtpDownloadCommand::FtpDownloadCommand(int cuid, Request* req, + DownloadEngine* e, + const SocketHandle& dataSocket, + const SocketHandle& ctrlSocket) + :DownloadCommand(cuid, req, e, dataSocket), ctrlSocket(ctrlSocket) {} -FtpDownloadCommand::~FtpDownloadCommand() { - if(ctrlSocket != NULL) { - delete ctrlSocket; - } -} +FtpDownloadCommand::~FtpDownloadCommand() {} TransferEncoding* FtpDownloadCommand::getTransferEncoding(const string& name) { return NULL; diff --git a/src/FtpDownloadCommand.h b/src/FtpDownloadCommand.h index 4e41782b..74ca7382 100644 --- a/src/FtpDownloadCommand.h +++ b/src/FtpDownloadCommand.h @@ -26,9 +26,11 @@ class FtpDownloadCommand : public DownloadCommand { private: - Socket* ctrlSocket; + SocketHandle ctrlSocket; public: - FtpDownloadCommand(int cuid, Request* req, DownloadEngine* e, const Socket* dataSocket, const Socket* ctrlSocket); + FtpDownloadCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& dataSocket, + const SocketHandle& ctrlSocket); ~FtpDownloadCommand(); TransferEncoding* getTransferEncoding(const string& name); diff --git a/src/FtpInitiateConnectionCommand.cc b/src/FtpInitiateConnectionCommand.cc index 7f12e598..2a67222e 100644 --- a/src/FtpInitiateConnectionCommand.cc +++ b/src/FtpInitiateConnectionCommand.cc @@ -44,7 +44,6 @@ bool FtpInitiateConnectionCommand::executeInternal(Segment segment) { } } - socket = new Socket(); Command* command; if(useHttpProxy()) { logger->info(MSG_CONNECTING_TO_SERVER, cuid, diff --git a/src/FtpNegotiationCommand.cc b/src/FtpNegotiationCommand.cc index af0565b9..67b08f6b 100644 --- a/src/FtpNegotiationCommand.cc +++ b/src/FtpNegotiationCommand.cc @@ -26,22 +26,17 @@ #include "message.h" #include "prefs.h" -FtpNegotiationCommand::FtpNegotiationCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s): - AbstractCommand(cuid, req, e, s), - dataSocket(NULL), serverSocket(NULL), sequence(SEQ_RECV_GREETING) +FtpNegotiationCommand::FtpNegotiationCommand(int cuid, Request* req, + DownloadEngine* e, + const SocketHandle& s): + AbstractCommand(cuid, req, e, s), sequence(SEQ_RECV_GREETING) { ftp = new FtpConnection(cuid, socket, req, e->option); - setReadCheckSocket(NULL); + disableReadCheckSocket(); setWriteCheckSocket(socket); } FtpNegotiationCommand::~FtpNegotiationCommand() { - if(dataSocket != NULL) { - delete dataSocket; - } - if(serverSocket != NULL) { - delete serverSocket; - } delete ftp; } @@ -50,7 +45,8 @@ bool FtpNegotiationCommand::executeInternal(Segment segment) { if(sequence == SEQ_RETRY) { return prepareForRetry(0); } else if(sequence == SEQ_NEGOTIATION_COMPLETED) { - FtpDownloadCommand* command = new FtpDownloadCommand(cuid, req, e, dataSocket, socket); + FtpDownloadCommand* command = + new FtpDownloadCommand(cuid, req, e, dataSocket, socket); e->commands.push_back(command); return true; } else { @@ -71,7 +67,7 @@ bool FtpNegotiationCommand::recvGreeting() { sequence = SEQ_SEND_USER; setReadCheckSocket(socket); - setWriteCheckSocket(NULL); + disableWriteCheckSocket(); return true; } @@ -219,14 +215,12 @@ bool FtpNegotiationCommand::recvPasv() { throw new DlRetryEx(EX_BAD_STATUS, status); } // make a data connection to the server. - dataSocket = new Socket(); - logger->info(MSG_CONNECTING_TO_SERVER, cuid, dest.first.c_str(), dest.second); dataSocket->establishConnection(dest.first, dest.second); - setReadCheckSocket(NULL); + disableReadCheckSocket(); setWriteCheckSocket(dataSocket); sequence = SEQ_SEND_REST_PASV; @@ -236,7 +230,7 @@ bool FtpNegotiationCommand::recvPasv() { bool FtpNegotiationCommand::sendRestPasv(const Segment& segment) { dataSocket->setBlockingMode(); setReadCheckSocket(socket); - setWriteCheckSocket(NULL); + disableWriteCheckSocket(); return sendRest(segment); } @@ -274,7 +268,7 @@ bool FtpNegotiationCommand::recvRetr() { throw new DlRetryEx(EX_BAD_STATUS, status); } if(e->option->get(PREF_FTP_PASV_ENABLED) != V_TRUE) { - assert(serverSocket); + assert(serverSocket->getSockfd() != -1); dataSocket = serverSocket->acceptConnection(); } sequence = SEQ_NEGOTIATION_COMPLETED; diff --git a/src/FtpNegotiationCommand.h b/src/FtpNegotiationCommand.h index 930fab0e..e89317d6 100644 --- a/src/FtpNegotiationCommand.h +++ b/src/FtpNegotiationCommand.h @@ -73,14 +73,15 @@ private: bool recvRetr(); bool processSequence(const Segment& segment); - Socket* dataSocket; - Socket* serverSocket; + SocketHandle dataSocket; + SocketHandle serverSocket; int sequence; FtpConnection* ftp; protected: bool executeInternal(Segment segment); public: - FtpNegotiationCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s); + FtpNegotiationCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& s); ~FtpNegotiationCommand(); }; diff --git a/src/FtpTunnelRequestCommand.cc b/src/FtpTunnelRequestCommand.cc index 86018070..24a8049b 100644 --- a/src/FtpTunnelRequestCommand.cc +++ b/src/FtpTunnelRequestCommand.cc @@ -23,9 +23,12 @@ #include "FtpTunnelResponseCommand.h" #include "HttpConnection.h" -FtpTunnelRequestCommand::FtpTunnelRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s) { - setReadCheckSocket(NULL); - setWriteCheckSocket(NULL); +FtpTunnelRequestCommand::FtpTunnelRequestCommand(int cuid, Request* req, + DownloadEngine* e, + const SocketHandle& s) + :AbstractCommand(cuid, req, e, s) { + disableReadCheckSocket(); + disableWriteCheckSocket(); } FtpTunnelRequestCommand::~FtpTunnelRequestCommand() {} @@ -35,7 +38,8 @@ bool FtpTunnelRequestCommand::executeInternal(Segment segment) { HttpConnection httpConnection(cuid, socket, req, e->option); httpConnection.sendProxyRequest(); - FtpTunnelResponseCommand* command = new FtpTunnelResponseCommand(cuid, req, e, socket); + FtpTunnelResponseCommand* command + = new FtpTunnelResponseCommand(cuid, req, e, socket); e->commands.push_back(command); return true; } diff --git a/src/FtpTunnelRequestCommand.h b/src/FtpTunnelRequestCommand.h index 35df06e1..b02d1806 100644 --- a/src/FtpTunnelRequestCommand.h +++ b/src/FtpTunnelRequestCommand.h @@ -28,7 +28,8 @@ class FtpTunnelRequestCommand : public AbstractCommand { protected: bool executeInternal(Segment segment); public: - FtpTunnelRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s); + FtpTunnelRequestCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& s); ~FtpTunnelRequestCommand(); }; diff --git a/src/FtpTunnelResponseCommand.cc b/src/FtpTunnelResponseCommand.cc index 0efe840f..d0e63888 100644 --- a/src/FtpTunnelResponseCommand.cc +++ b/src/FtpTunnelResponseCommand.cc @@ -24,7 +24,10 @@ #include "DlRetryEx.h" #include "message.h" -FtpTunnelResponseCommand::FtpTunnelResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s) { +FtpTunnelResponseCommand::FtpTunnelResponseCommand(int cuid, Request* req, + DownloadEngine* e, + const SocketHandle& s) + :AbstractCommand(cuid, req, e, s) { http = new HttpConnection(cuid, socket, req, e->option); } @@ -43,7 +46,8 @@ bool FtpTunnelResponseCommand::executeInternal(Segment segment) { if(status != 200) { throw new DlRetryEx(EX_PROXY_CONNECTION_FAILED); } - FtpNegotiationCommand* command = new FtpNegotiationCommand(cuid, req, e, socket); + FtpNegotiationCommand* command + = new FtpNegotiationCommand(cuid, req, e, socket); e->commands.push_back(command); return true; } diff --git a/src/FtpTunnelResponseCommand.h b/src/FtpTunnelResponseCommand.h index bf304f29..7d0e19ae 100644 --- a/src/FtpTunnelResponseCommand.h +++ b/src/FtpTunnelResponseCommand.h @@ -31,7 +31,8 @@ private: protected: bool executeInternal(Segment segment); public: - FtpTunnelResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s); + FtpTunnelResponseCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& s); ~FtpTunnelResponseCommand(); }; diff --git a/src/HttpConnection.cc b/src/HttpConnection.cc index d48b53f5..e63000a2 100644 --- a/src/HttpConnection.cc +++ b/src/HttpConnection.cc @@ -27,7 +27,8 @@ #include "prefs.h" #include "LogFactory.h" -HttpConnection::HttpConnection(int cuid, const Socket* socket, const Request* req, const Option* op): +HttpConnection::HttpConnection(int cuid, const SocketHandle& socket, + const Request* req, const Option* op): cuid(cuid), socket(socket), req(req), option(op), headerBufLength(0) { logger = LogFactory::getInstance(); } diff --git a/src/HttpConnection.h b/src/HttpConnection.h index d340bbcc..7207aa1f 100644 --- a/src/HttpConnection.h +++ b/src/HttpConnection.h @@ -46,14 +46,15 @@ private: bool useProxyGet() const; string getProxyAuthString() const; int cuid; - const Socket* socket; + SocketHandle socket; const Request* req; const Option* option; const Logger* logger; char headerBuf[HEADERBUF_SIZE+1]; int headerBufLength; public: - HttpConnection(int cuid, const Socket* socket, const Request* req, const Option* op); + HttpConnection(int cuid, const SocketHandle& socket, const Request* req, + const Option* op); /** * Sends Http request. diff --git a/src/HttpDownloadCommand.cc b/src/HttpDownloadCommand.cc index 551f1a12..3077ab16 100644 --- a/src/HttpDownloadCommand.cc +++ b/src/HttpDownloadCommand.cc @@ -33,8 +33,8 @@ using namespace std; HttpDownloadCommand::HttpDownloadCommand(int cuid, Request* req, DownloadEngine* e, - const Socket* socket): - DownloadCommand(cuid, req, e, socket) + const SocketHandle& socket) + :DownloadCommand(cuid, req, e, socket) { ChunkedEncoding* ce = new ChunkedEncoding(); transferEncodings["chunked"] = ce; diff --git a/src/HttpDownloadCommand.h b/src/HttpDownloadCommand.h index d81da732..440fa9ec 100644 --- a/src/HttpDownloadCommand.h +++ b/src/HttpDownloadCommand.h @@ -37,7 +37,8 @@ class HttpDownloadCommand:public DownloadCommand { private: map transferEncodings; public: - HttpDownloadCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s); + HttpDownloadCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& s); ~HttpDownloadCommand(); TransferEncoding* getTransferEncoding(const string& transferEncoding); diff --git a/src/HttpInitiateConnectionCommand.cc b/src/HttpInitiateConnectionCommand.cc index 7bc29f96..73657192 100644 --- a/src/HttpInitiateConnectionCommand.cc +++ b/src/HttpInitiateConnectionCommand.cc @@ -27,12 +27,13 @@ #include "message.h" #include "prefs.h" -HttpInitiateConnectionCommand::HttpInitiateConnectionCommand(int cuid, Request* req, DownloadEngine* e):AbstractCommand(cuid, req, e) {} +HttpInitiateConnectionCommand::HttpInitiateConnectionCommand(int cuid, + Request* req, + DownloadEngine* e):AbstractCommand(cuid, req, e) {} HttpInitiateConnectionCommand::~HttpInitiateConnectionCommand() {} bool HttpInitiateConnectionCommand::executeInternal(Segment segment) { - socket = new Socket(); // socket->establishConnection(...); Command* command; if(useProxy()) { diff --git a/src/HttpProxyRequestCommand.cc b/src/HttpProxyRequestCommand.cc index 60b7fa89..0bf66558 100644 --- a/src/HttpProxyRequestCommand.cc +++ b/src/HttpProxyRequestCommand.cc @@ -23,8 +23,11 @@ #include "HttpConnection.h" #include "HttpProxyResponseCommand.h" -HttpProxyRequestCommand::HttpProxyRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s) { - setReadCheckSocket(NULL); +HttpProxyRequestCommand::HttpProxyRequestCommand(int cuid, Request* req, + DownloadEngine* e, + const SocketHandle& s) + :AbstractCommand(cuid, req, e, s) { + disableReadCheckSocket(); setWriteCheckSocket(socket); } diff --git a/src/HttpProxyRequestCommand.h b/src/HttpProxyRequestCommand.h index 6317faa0..2b5a646a 100644 --- a/src/HttpProxyRequestCommand.h +++ b/src/HttpProxyRequestCommand.h @@ -28,7 +28,8 @@ class HttpProxyRequestCommand : public AbstractCommand { protected: bool executeInternal(Segment segment); public: - HttpProxyRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s); + HttpProxyRequestCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& s); ~HttpProxyRequestCommand(); }; diff --git a/src/HttpProxyResponseCommand.cc b/src/HttpProxyResponseCommand.cc index bfcb4496..51cb6580 100644 --- a/src/HttpProxyResponseCommand.cc +++ b/src/HttpProxyResponseCommand.cc @@ -24,7 +24,10 @@ #include "DlRetryEx.h" #include "message.h" -HttpProxyResponseCommand::HttpProxyResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s) { +HttpProxyResponseCommand::HttpProxyResponseCommand(int cuid, Request* req, + DownloadEngine* e, + const SocketHandle& s) + :AbstractCommand(cuid, req, e, s) { http = new HttpConnection(cuid, socket, req, e->option); } diff --git a/src/HttpProxyResponseCommand.h b/src/HttpProxyResponseCommand.h index f9f7c4bf..ad938cf0 100644 --- a/src/HttpProxyResponseCommand.h +++ b/src/HttpProxyResponseCommand.h @@ -31,7 +31,8 @@ private: protected: bool executeInternal(Segment segment); public: - HttpProxyResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s); + HttpProxyResponseCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& s); ~HttpProxyResponseCommand(); }; diff --git a/src/HttpRequestCommand.cc b/src/HttpRequestCommand.cc index 2a5e75a5..9f5ba45a 100644 --- a/src/HttpRequestCommand.cc +++ b/src/HttpRequestCommand.cc @@ -23,8 +23,11 @@ #include "HttpResponseCommand.h" #include "HttpConnection.h" -HttpRequestCommand::HttpRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s):AbstractCommand(cuid, req, e, s) { - setReadCheckSocket(NULL); +HttpRequestCommand::HttpRequestCommand(int cuid, Request* req, + DownloadEngine* e, + const SocketHandle& s) + :AbstractCommand(cuid, req, e, s) { + disableReadCheckSocket(); setWriteCheckSocket(socket); } diff --git a/src/HttpRequestCommand.h b/src/HttpRequestCommand.h index d224c5c3..d3fa923b 100644 --- a/src/HttpRequestCommand.h +++ b/src/HttpRequestCommand.h @@ -29,7 +29,8 @@ protected: bool executeInternal(Segment segment); Command* getNextCommand() const; public: - HttpRequestCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s); + HttpRequestCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& s); ~HttpRequestCommand(); }; diff --git a/src/HttpResponseCommand.cc b/src/HttpResponseCommand.cc index 97670683..d9f16977 100644 --- a/src/HttpResponseCommand.cc +++ b/src/HttpResponseCommand.cc @@ -29,8 +29,10 @@ #include #include -HttpResponseCommand::HttpResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s): - AbstractCommand(cuid, req, e, s) { +HttpResponseCommand::HttpResponseCommand(int cuid, Request* req, + DownloadEngine* e, + const SocketHandle& s) + :AbstractCommand(cuid, req, e, s) { http = new HttpConnection(cuid, socket, req, e->option); } @@ -154,7 +156,6 @@ bool HttpResponseCommand::handleOtherEncoding(const string& transferEncoding, co } void HttpResponseCommand::createHttpDownloadCommand(const string& transferEncoding) { - HttpDownloadCommand* command = new HttpDownloadCommand(cuid, req, e, socket); TransferEncoding* enc = NULL; if(transferEncoding.size() && (enc = command->getTransferEncoding(transferEncoding)) == NULL) { diff --git a/src/HttpResponseCommand.h b/src/HttpResponseCommand.h index bdf192ee..19a092c4 100644 --- a/src/HttpResponseCommand.h +++ b/src/HttpResponseCommand.h @@ -38,7 +38,8 @@ private: protected: bool executeInternal(Segment segment); public: - HttpResponseCommand(int cuid, Request* req, DownloadEngine* e, const Socket* s); + HttpResponseCommand(int cuid, Request* req, DownloadEngine* e, + const SocketHandle& s); ~HttpResponseCommand(); }; diff --git a/src/InitiateConnectionCommandFactory.cc b/src/InitiateConnectionCommandFactory.cc index 8af1532d..7f6e75f7 100644 --- a/src/InitiateConnectionCommandFactory.cc +++ b/src/InitiateConnectionCommandFactory.cc @@ -22,6 +22,7 @@ #include "InitiateConnectionCommandFactory.h" #include "HttpInitiateConnectionCommand.h" #include "FtpInitiateConnectionCommand.h" +#include "DlAbortEx.h" Command* InitiateConnectionCommandFactory::createInitiateConnectionCommand(int cuid, Request* req, DownloadEngine* e) { if(req->getProtocol() == "http" @@ -35,6 +36,6 @@ Command* InitiateConnectionCommandFactory::createInitiateConnectionCommand(int c return new FtpInitiateConnectionCommand(cuid, req, e); } else { // these protocols are not supported yet - return NULL; + throw new DlAbortEx("%s is not supported yet.", req->getProtocol().c_str()); } } diff --git a/src/InitiateConnectionCommandFactory.h b/src/InitiateConnectionCommandFactory.h index dea10ed4..57cfe1be 100644 --- a/src/InitiateConnectionCommandFactory.h +++ b/src/InitiateConnectionCommandFactory.h @@ -28,7 +28,8 @@ class InitiateConnectionCommandFactory { public: - static Command* createInitiateConnectionCommand(int cuid, Request* req, DownloadEngine* e); + static Command* createInitiateConnectionCommand(int cuid, Request* req, + DownloadEngine* e); }; #endif // _D_INITIATE_CONNECTION_COMMAND_FACTORY_H_ diff --git a/src/Makefile.am b/src/Makefile.am index 3b16130b..f80591b9 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,8 +1,8 @@ bin_PROGRAMS = aria2c aria2c_SOURCES = main.cc -SRCS = Socket.cc Socket.h\ +SRCS = Socket.h\ SocketCore.cc SocketCore.h\ - Command.h\ + Command.cc Command.h\ AbstractCommand.cc AbstractCommand.h\ InitiateConnectionCommandFactory.cc InitiateConnectionCommandFactory.h\ DownloadCommand.cc DownloadCommand.h\ @@ -104,7 +104,8 @@ SRCS += MetaEntry.h\ RejectMessage.cc RejectMessage.h\ AllowedFastMessage.cc AllowedFastMessage.h\ SuggestPieceMessage.cc SuggestPieceMessage.h\ - SimplePeerMessage.cc SimplePeerMessage.h + SimplePeerMessage.cc SimplePeerMessage.h\ + SharedHandle.h endif # ENABLE_BITTORRENT if ENABLE_METALINK diff --git a/src/Makefile.in b/src/Makefile.in index 8cffb7f3..678a8f85 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -89,7 +89,8 @@ bin_PROGRAMS = aria2c$(EXEEXT) @ENABLE_BITTORRENT_TRUE@ RejectMessage.cc RejectMessage.h\ @ENABLE_BITTORRENT_TRUE@ AllowedFastMessage.cc AllowedFastMessage.h\ @ENABLE_BITTORRENT_TRUE@ SuggestPieceMessage.cc SuggestPieceMessage.h\ -@ENABLE_BITTORRENT_TRUE@ SimplePeerMessage.cc SimplePeerMessage.h +@ENABLE_BITTORRENT_TRUE@ SimplePeerMessage.cc SimplePeerMessage.h\ +@ENABLE_BITTORRENT_TRUE@ SharedHandle.h @ENABLE_METALINK_TRUE@am__append_2 = Metalinker.cc Metalinker.h\ @ENABLE_METALINK_TRUE@ MetalinkEntry.cc MetalinkEntry.h\ @@ -121,8 +122,8 @@ AR = ar ARFLAGS = cru libaria2c_a_AR = $(AR) $(ARFLAGS) libaria2c_a_LIBADD = -am__libaria2c_a_SOURCES_DIST = Socket.cc Socket.h SocketCore.cc \ - SocketCore.h Command.h AbstractCommand.cc AbstractCommand.h \ +am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \ + Command.cc Command.h AbstractCommand.cc AbstractCommand.h \ InitiateConnectionCommandFactory.cc \ InitiateConnectionCommandFactory.h DownloadCommand.cc \ DownloadCommand.h HttpInitiateConnectionCommand.cc \ @@ -186,8 +187,8 @@ am__libaria2c_a_SOURCES_DIST = Socket.cc Socket.h SocketCore.cc \ HaveNoneMessage.cc HaveNoneMessage.h RejectMessage.cc \ RejectMessage.h AllowedFastMessage.cc AllowedFastMessage.h \ SuggestPieceMessage.cc SuggestPieceMessage.h \ - SimplePeerMessage.cc SimplePeerMessage.h Metalinker.cc \ - Metalinker.h MetalinkEntry.cc MetalinkEntry.h \ + SimplePeerMessage.cc SimplePeerMessage.h SharedHandle.h \ + Metalinker.cc Metalinker.h MetalinkEntry.cc MetalinkEntry.h \ MetalinkResource.cc MetalinkResource.h MetalinkProcessor.h \ Xml2MetalinkProcessor.cc Xml2MetalinkProcessor.h @ENABLE_BITTORRENT_TRUE@am__objects_1 = Data.$(OBJEXT) \ @@ -240,7 +241,7 @@ am__libaria2c_a_SOURCES_DIST = Socket.cc Socket.h SocketCore.cc \ @ENABLE_METALINK_TRUE@ MetalinkEntry.$(OBJEXT) \ @ENABLE_METALINK_TRUE@ MetalinkResource.$(OBJEXT) \ @ENABLE_METALINK_TRUE@ Xml2MetalinkProcessor.$(OBJEXT) -am__objects_3 = Socket.$(OBJEXT) SocketCore.$(OBJEXT) \ +am__objects_3 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \ AbstractCommand.$(OBJEXT) \ InitiateConnectionCommandFactory.$(OBJEXT) \ DownloadCommand.$(OBJEXT) \ @@ -420,7 +421,7 @@ sharedstatedir = @sharedstatedir@ sysconfdir = @sysconfdir@ target_alias = @target_alias@ aria2c_SOURCES = main.cc -SRCS = Socket.cc Socket.h SocketCore.cc SocketCore.h Command.h \ +SRCS = Socket.h SocketCore.cc SocketCore.h Command.cc Command.h \ AbstractCommand.cc AbstractCommand.h \ InitiateConnectionCommandFactory.cc \ InitiateConnectionCommandFactory.h DownloadCommand.cc \ @@ -547,6 +548,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CancelMessage.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ChokeMessage.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ChunkedEncoding.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Command.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/ConsoleDownloadEngine.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CookieBox.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/CopyDiskAdaptor.Po@am__quote@ @@ -614,7 +616,6 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SimpleLogger.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SimplePeerMessage.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SleepCommand.Po@am__quote@ -@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Socket.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SocketCore.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SplitFirstSegmentSplitter.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SplitSlowestSegmentSplitter.Po@am__quote@ diff --git a/src/Peer.cc b/src/Peer.cc index 7df0d081..019f9a06 100644 --- a/src/Peer.cc +++ b/src/Peer.cc @@ -21,7 +21,7 @@ /* copyright --> */ #include "Peer.h" -Peer* Peer::nullPeer = new Peer("", 0, 0, 0); +PeerHandle nullPeer = PeerHandle(new Peer("", 0, 0, 0)); void Peer::updateBitfield(int index, int operation) { if(operation == 1) { @@ -82,3 +82,11 @@ void Peer::setAllBitfield() { void Peer::updateLatency(int latency) { this->latency = (this->latency*20+latency*80)/200; } + +bool operator==(const Peer& p1, const Peer& p2) { + return p1.ipaddr == p2.ipaddr && p1.port == p2.port; +} + +bool operator!=(const Peer& p1, const Peer& p2) { + return !(p1 == p2); +} diff --git a/src/Peer.h b/src/Peer.h index 5bc8a27a..d37e6aad 100644 --- a/src/Peer.h +++ b/src/Peer.h @@ -24,6 +24,7 @@ #include "common.h" #include "BitfieldMan.h" +#include "SharedHandle.h" #include #include @@ -33,6 +34,8 @@ using namespace std; #define DEFAULT_LATENCY 1500 class Peer { + friend bool operator==(const Peer& p1, const Peer& p2); + friend bool operator!=(const Peer& p1, const Peer& p2); public: int entryId; string ipaddr; @@ -60,22 +63,20 @@ private: int deltaDownload; int latency; public: - Peer(string ipaddr, int port, int pieceLength, long long int totalLength): - entryId(0), ipaddr(ipaddr), port(port), - amChoking(true), amInterested(false), - peerChoking(true), peerInterested(false), - tryCount(0), error(0), cuid(0), - chokingRequired(true), optUnchoking(false), - snubbing(false), - bitfield(NULL), - fastExtensionEnabled(false), - peerUpload(0), peerDownload(0), - pieceLength(pieceLength), - deltaUpload(0), deltaDownload(0), - latency(DEFAULT_LATENCY) { + Peer(string ipaddr, int port, int pieceLength, long long int totalLength) + :entryId(0), ipaddr(ipaddr), port(port), error(0), + peerUpload(0), peerDownload(0), pieceLength(pieceLength) + { + resetStatus(); this->bitfield = new BitfieldMan(pieceLength, totalLength); } + Peer():entryId(0), ipaddr(""), port(0), bitfield(0), + peerUpload(0), peerDownload(0), pieceLength(0) + { + resetStatus(); + } + ~Peer() { if(bitfield != NULL) { delete bitfield; @@ -146,8 +147,11 @@ public: void updateLatency(int latency); int getLatency() const { return latency; } - - static Peer* nullPeer; }; +bool operator==(const Peer& p1, const Peer& p2); +bool operator!=(const Peer& p1, const Peer& p2); + +typedef SharedHandle PeerHandle; + #endif // _D_PEER_H_ diff --git a/src/PeerAbstractCommand.cc b/src/PeerAbstractCommand.cc index 651c865f..b53e8ca0 100644 --- a/src/PeerAbstractCommand.cc +++ b/src/PeerAbstractCommand.cc @@ -26,27 +26,20 @@ #include "message.h" #include "prefs.h" -PeerAbstractCommand::PeerAbstractCommand(int cuid, Peer* peer, TorrentDownloadEngine* e, const Socket* s): - Command(cuid), e(e), peer(peer), +PeerAbstractCommand::PeerAbstractCommand(int cuid, const PeerHandle& peer, + TorrentDownloadEngine* e, + const SocketHandle& s) + :Command(cuid), e(e), socket(s), peer(peer), checkSocketIsReadable(false), checkSocketIsWritable(false), uploadLimitCheck(false), uploadLimit(0) { - - if(s != NULL) { - socket = new Socket(*s); - setReadCheckSocket(socket); - } else { - socket = NULL; - } + setReadCheckSocket(socket); timeout = e->option->getAsInt(PREF_TIMEOUT); e->torrentMan->connections++; } PeerAbstractCommand::~PeerAbstractCommand() { - setReadCheckSocket(NULL); - setWriteCheckSocket(NULL); - if(socket != NULL) { - delete(socket); - } + disableReadCheckSocket(); + disableWriteCheckSocket(); e->torrentMan->connections--; } @@ -92,44 +85,52 @@ void PeerAbstractCommand::onAbort(Exception* ex) { logger->debug("CUID#%d - Peer %s:%d banned.", cuid, peer->ipaddr.c_str(), peer->port); } -void PeerAbstractCommand::setReadCheckSocket(Socket* socket) { - if(socket == NULL) { - if(checkSocketIsReadable) { - e->deleteSocketForReadCheck(readCheckTarget); - checkSocketIsReadable = false; - readCheckTarget = NULL; - } +void PeerAbstractCommand::disableReadCheckSocket() { + if(checkSocketIsReadable) { + e->deleteSocketForReadCheck(readCheckTarget, getUuid()); + checkSocketIsReadable = false; + readCheckTarget = SocketHandle(); + } +} + +void PeerAbstractCommand::setReadCheckSocket(const SocketHandle& socket) { + if(!socket->isOpen()) { + disableReadCheckSocket(); } else { if(checkSocketIsReadable) { if(readCheckTarget != socket) { - e->deleteSocketForReadCheck(readCheckTarget); - e->addSocketForReadCheck(socket, this); + e->deleteSocketForReadCheck(readCheckTarget, getUuid()); + e->addSocketForReadCheck(socket, getUuid()); readCheckTarget = socket; } } else { - e->addSocketForReadCheck(socket, this); + e->addSocketForReadCheck(socket, getUuid()); checkSocketIsReadable = true; readCheckTarget = socket; } } } -void PeerAbstractCommand::setWriteCheckSocket(Socket* socket) { - if(socket == NULL) { - if(checkSocketIsWritable) { - e->deleteSocketForWriteCheck(writeCheckTarget); - checkSocketIsWritable = false; - writeCheckTarget = NULL; - } +void PeerAbstractCommand::disableWriteCheckSocket() { + if(checkSocketIsWritable) { + e->deleteSocketForWriteCheck(writeCheckTarget, getUuid()); + checkSocketIsWritable = false; + writeCheckTarget = SocketHandle(); + } +} + +void PeerAbstractCommand::setWriteCheckSocket(const SocketHandle& socket) { + if(!socket->isOpen()) { + disableWriteCheckSocket(); } else { if(checkSocketIsWritable) { if(writeCheckTarget != socket) { - e->deleteSocketForWriteCheck(writeCheckTarget); - e->addSocketForWriteCheck(socket, this); + e->deleteSocketForWriteCheck(writeCheckTarget, getUuid()); + e->addSocketForWriteCheck(socket, getUuid()); writeCheckTarget = socket; } } else { - e->addSocketForWriteCheck(socket, this); + e->addSocketForWriteCheck(socket, getUuid()); checkSocketIsWritable = true; writeCheckTarget = socket; } diff --git a/src/PeerAbstractCommand.h b/src/PeerAbstractCommand.h index 2458d31e..4f240492 100644 --- a/src/PeerAbstractCommand.h +++ b/src/PeerAbstractCommand.h @@ -33,26 +33,30 @@ private: int timeout; protected: TorrentDownloadEngine* e; - Socket* socket; - Peer* peer; + SocketHandle socket; + PeerHandle peer; void setTimeout(int timeout) { this->timeout = timeout; } virtual bool prepareForNextPeer(int wait); virtual bool prepareForRetry(int wait); virtual void onAbort(Exception* ex); virtual bool executeInternal() = 0; - void setReadCheckSocket(Socket* socket); - void setWriteCheckSocket(Socket* socket); + void setReadCheckSocket(const SocketHandle& socket); + void setWriteCheckSocket(const SocketHandle& socket); + void disableReadCheckSocket(); + void disableWriteCheckSocket(); void setUploadLimit(int uploadLimit); void setUploadLimitCheck(bool check); private: bool checkSocketIsReadable; bool checkSocketIsWritable; - Socket* readCheckTarget; - Socket* writeCheckTarget; + SocketHandle readCheckTarget; + SocketHandle writeCheckTarget; bool uploadLimitCheck; int uploadLimit; public: - PeerAbstractCommand(int cuid, Peer* peer, TorrentDownloadEngine* e, const Socket* s = NULL); + PeerAbstractCommand(int cuid, const PeerHandle& peer, + TorrentDownloadEngine* e, + const SocketHandle& s = SocketHandle()); virtual ~PeerAbstractCommand(); bool execute(); }; diff --git a/src/PeerChokeCommand.cc b/src/PeerChokeCommand.cc index 04fb481c..7a32e6da 100644 --- a/src/PeerChokeCommand.cc +++ b/src/PeerChokeCommand.cc @@ -26,12 +26,13 @@ PeerChokeCommand::PeerChokeCommand(int cuid, int interval, TorrentDownloadEngine PeerChokeCommand::~PeerChokeCommand() {} -void PeerChokeCommand::setAllPeerChoked(Peers& peers) const { - for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) { - Peer* peer = *itr; +class ChokePeer { +public: + ChokePeer() {} + void operator()(PeerHandle& peer) { peer->chokingRequired = true; } -} +}; void PeerChokeCommand::optUnchokingPeer(Peers& peers) const { if(peers.empty()) { @@ -52,17 +53,18 @@ void PeerChokeCommand::optUnchokingPeer(Peers& peers) const { } } -void PeerChokeCommand::setAllPeerResetDelta(Peers& peers) const { - for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) { - Peer* peer = *itr; +class ResetDelta { +public: + ResetDelta() {} + void operator()(PeerHandle& peer) { peer->resetDeltaUpload(); peer->resetDeltaDownload(); } -} +}; class UploadFaster { public: - bool operator() (const Peer* left, const Peer* right) const { + bool operator() (const PeerHandle& left, const PeerHandle& right) const { return left->getDeltaUpload() > right->getDeltaUpload(); } }; @@ -73,13 +75,13 @@ void PeerChokeCommand::orderByUploadRate(Peers& peers) const { class DownloadFaster { public: - bool operator() (const Peer* left, const Peer* right) const { + bool operator() (const PeerHandle& left, const PeerHandle& right) const { return left->getDeltaDownload() > right->getDeltaDownload(); } }; void PeerChokeCommand::orderByDownloadRate(Peers& peers) const { - sort(peers.begin(), peers.end(), UploadFaster()); + sort(peers.begin(), peers.end(), DownloadFaster()); } bool PeerChokeCommand::execute() { @@ -89,7 +91,7 @@ bool PeerChokeCommand::execute() { if(checkPoint.elapsed(interval)) { checkPoint.reset(); Peers peers = e->torrentMan->getActivePeers(); - setAllPeerChoked(peers); + for_each(peers.begin(), peers.end(), ChokePeer()); if(e->torrentMan->downloadComplete()) { orderByDownloadRate(peers); } else { @@ -97,24 +99,30 @@ bool PeerChokeCommand::execute() { } int unchokingCount = 4;//peers.size() >= 4 ? 4 : peers.size(); for(Peers::iterator itr = peers.begin(); itr != peers.end() && unchokingCount > 0; ) { - Peer* peer = *itr; + PeerHandle peer = *itr; if(peer->peerInterested && !peer->snubbing) { unchokingCount--; peer->chokingRequired = false; peer->optUnchoking = false; itr = peers.erase(itr); - logger->debug("cat01, unchoking %s, delta=%d", peer->ipaddr.c_str(), peer->getDeltaUpload()); + logger->debug("cat01, unchoking %s, delta=%d", + peer->ipaddr.c_str(), + e->torrentMan->downloadComplete() ? + peer->getDeltaDownload() : peer->getDeltaUpload()); } else { itr++; } } for(Peers::iterator itr = peers.begin(); itr != peers.end(); ) { - Peer* peer = *itr; + PeerHandle peer = *itr; if(!peer->peerInterested && !peer->snubbing) { peer->chokingRequired = false; peer->optUnchoking = false; itr = peers.erase(itr); - logger->debug("cat02, unchoking %s, delta=%d", peer->ipaddr.c_str(), peer->getDeltaUpload()); + logger->debug("cat02, unchoking %s, delta=%d", + peer->ipaddr.c_str(), + e->torrentMan->downloadComplete() ? + peer->getDeltaDownload() : peer->getDeltaUpload()); break; } else { itr++; @@ -125,7 +133,9 @@ bool PeerChokeCommand::execute() { rotate = 0; } rotate++; - setAllPeerResetDelta(e->torrentMan->getActivePeers()); + for_each(e->torrentMan->getActivePeers().begin(), + e->torrentMan->getActivePeers().end(), + ResetDelta()); } e->commands.push_back(this); return false; diff --git a/src/PeerChokeCommand.h b/src/PeerChokeCommand.h index 16fdd701..c0019bf6 100644 --- a/src/PeerChokeCommand.h +++ b/src/PeerChokeCommand.h @@ -35,8 +35,6 @@ private: void orderByUploadRate(Peers& peers) const; void orderByDownloadRate(Peers& peers) const; - void setAllPeerChoked(Peers& peers) const; - void setAllPeerResetDelta(Peers& peers) const; void optUnchokingPeer(Peers& peers) const; public: diff --git a/src/PeerConnection.cc b/src/PeerConnection.cc index aa3149e9..104e65d6 100644 --- a/src/PeerConnection.cc +++ b/src/PeerConnection.cc @@ -28,7 +28,7 @@ #include PeerConnection::PeerConnection(int cuid, - const Socket* socket, + const SocketHandle& socket, const Option* op) :cuid(cuid), socket(socket), diff --git a/src/PeerConnection.h b/src/PeerConnection.h index e96f37e9..f7c238a4 100644 --- a/src/PeerConnection.h +++ b/src/PeerConnection.h @@ -27,7 +27,6 @@ #include "Logger.h" #include "TorrentMan.h" #include "PeerMessage.h" -#include "HandshakeMessage.h" #include "common.h" // we assume maximum length of incoming message is "piece" message with 16KB @@ -37,7 +36,7 @@ class PeerConnection { private: int cuid; - const Socket* socket; + SocketHandle socket; const Option* option; const Logger* logger; @@ -48,7 +47,7 @@ private: int lenbufLength; public: - PeerConnection(int cuid, const Socket* socket, const Option* op); + PeerConnection(int cuid, const SocketHandle& socket, const Option* op); ~PeerConnection(); // Returns the number of bytes written diff --git a/src/PeerInitiateConnectionCommand.cc b/src/PeerInitiateConnectionCommand.cc index cea9a12b..59056227 100644 --- a/src/PeerInitiateConnectionCommand.cc +++ b/src/PeerInitiateConnectionCommand.cc @@ -27,16 +27,13 @@ #include "prefs.h" PeerInitiateConnectionCommand::PeerInitiateConnectionCommand(int cuid, - Peer* peer, + const PeerHandle& peer, TorrentDownloadEngine* e) :PeerAbstractCommand(cuid, peer, e) {} PeerInitiateConnectionCommand::~PeerInitiateConnectionCommand() {} bool PeerInitiateConnectionCommand::executeInternal() { - socket = new Socket(); - // socket->establishConnection(...); - Command* command; logger->info(MSG_CONNECTING_TO_SERVER, cuid, peer->ipaddr.c_str(), peer->port); @@ -50,10 +47,11 @@ bool PeerInitiateConnectionCommand::executeInternal() { // TODO this method removed when PeerBalancerCommand is implemented bool PeerInitiateConnectionCommand::prepareForNextPeer(int wait) { if(e->torrentMan->isPeerAvailable()) { - Peer* peer = e->torrentMan->getPeer(); + PeerHandle peer = e->torrentMan->getPeer(); int newCuid = e->torrentMan->getNewCuid(); peer->cuid = newCuid; - PeerInitiateConnectionCommand* command = new PeerInitiateConnectionCommand(newCuid, peer, e); + PeerInitiateConnectionCommand* command = + new PeerInitiateConnectionCommand(newCuid, peer, e); e->commands.push_back(command); } return true; diff --git a/src/PeerInitiateConnectionCommand.h b/src/PeerInitiateConnectionCommand.h index b6599544..66313b95 100644 --- a/src/PeerInitiateConnectionCommand.h +++ b/src/PeerInitiateConnectionCommand.h @@ -30,7 +30,8 @@ protected: bool prepareForRetry(int wait); bool prepareForNextPeer(int wait); public: - PeerInitiateConnectionCommand(int cuid, Peer* peer, TorrentDownloadEngine* e); + PeerInitiateConnectionCommand(int cuid, const PeerHandle& peer, + TorrentDownloadEngine* e); ~PeerInitiateConnectionCommand(); }; diff --git a/src/PeerInteraction.cc b/src/PeerInteraction.cc index d3c4ebde..8ef87146 100644 --- a/src/PeerInteraction.cc +++ b/src/PeerInteraction.cc @@ -28,10 +28,10 @@ #include PeerInteraction::PeerInteraction(int cuid, - const Socket* socket, + const PeerHandle& peer, + const SocketHandle& socket, const Option* op, - TorrentMan* torrentMan, - Peer* peer) + TorrentMan* torrentMan) :cuid(cuid), uploadLimit(0), torrentMan(torrentMan), @@ -43,7 +43,6 @@ PeerInteraction::PeerInteraction(int cuid, PeerInteraction::~PeerInteraction() { delete peerConnection; - for_each(messageQueue.begin(), messageQueue.end(), Deleter()); } class MsgPushBack { @@ -52,14 +51,14 @@ private: public: MsgPushBack(MessageQueue* messageQueue):messageQueue(messageQueue) {} - void operator()(PeerMessage* msg) { + void operator()(const PeerMessageHandle& msg) { messageQueue->push_back(msg); } }; bool PeerInteraction::isSendingMessageInProgress() const { if(messageQueue.size() > 0) { - PeerMessage* peerMessage = messageQueue.front(); + const PeerMessageHandle& peerMessage = messageQueue.front(); if(peerMessage->isInProgress()) { return true; } @@ -70,7 +69,7 @@ bool PeerInteraction::isSendingMessageInProgress() const { void PeerInteraction::sendMessages(int uploadSpeed) { MessageQueue tempQueue; while(messageQueue.size() > 0) { - PeerMessage* msg = messageQueue.front(); + PeerMessageHandle msg = messageQueue.front(); messageQueue.pop_front(); if(uploadLimit != 0 && uploadLimit*1024 <= uploadSpeed && msg->getId() == PieceMessage::ID && !msg->isInProgress()) { @@ -78,27 +77,20 @@ void PeerInteraction::sendMessages(int uploadSpeed) { //((PieceMessage*)msg)->incrementPendingCount(); tempQueue.push_back(msg); } else { - try { - msg->send(); - } catch(Exception* ex) { - delete msg; - throw; - } + msg->send(); if(msg->isInProgress()) { messageQueue.push_front(msg); break; - } else { - delete msg; } } } for_each(tempQueue.begin(), tempQueue.end(), MsgPushBack(&messageQueue)); } -void PeerInteraction::addMessage(PeerMessage* peerMessage) { +void PeerInteraction::addMessage(const PeerMessageHandle& peerMessage) { messageQueue.push_back(peerMessage); if(peerMessage->getId() == RequestMessage::ID) { - RequestMessage* requestMessage = (RequestMessage*)peerMessage; + RequestMessage* requestMessage = (RequestMessage*)peerMessage.get(); RequestSlot requestSlot(requestMessage->getIndex(), requestMessage->getBegin(), requestMessage->getLength(), @@ -113,8 +105,8 @@ void PeerInteraction::rejectAllPieceMessageInQueue() { itr != messageQueue.end();) { // Don't delete piece message which is in the allowed fast set. if((*itr)->getId() == PieceMessage::ID && !(*itr)->isInProgress() - && !isInFastSet(((PieceMessage*)*itr)->getIndex())) { - PieceMessage* pieceMessage = (PieceMessage*)*itr; + && !isInFastSet(((PieceMessage*)(*itr).get())->getIndex())) { + PieceMessage* pieceMessage = (PieceMessage*)(*itr).get(); logger->debug("CUID#%d - Reject piece message in queue because" " peer has been choked. index=%d, begin=%d, length=%d", cuid, @@ -126,7 +118,6 @@ void PeerInteraction::rejectAllPieceMessageInQueue() { pieceMessage->getBegin(), pieceMessage->getBlockLength())); } - delete pieceMessage; itr = messageQueue.erase(itr); } else { itr++; @@ -140,14 +131,13 @@ void PeerInteraction::rejectPieceMessageInQueue(int index, int begin, int length for(MessageQueue::iterator itr = messageQueue.begin(); itr != messageQueue.end();) { if((*itr)->getId() == PieceMessage::ID && !(*itr)->isInProgress()) { - PieceMessage* pieceMessage = (PieceMessage*)*itr; + PieceMessage* pieceMessage = (PieceMessage*)(*itr).get(); if(pieceMessage->getIndex() == index && pieceMessage->getBegin() == begin && pieceMessage->getBlockLength() == length) { logger->debug("CUID#%d - Reject piece message in queue because cancel" " message received. index=%d, begin=%d, length=%d", cuid, index, begin, length); - delete pieceMessage; itr = messageQueue.erase(itr); if(peer->isFastExtensionEnabled()) { tempQueue.push_back(createRejectMessage(index, begin, length)); @@ -187,8 +177,7 @@ void PeerInteraction::abortPiece(Piece& piece) { itr != messageQueue.end();) { if((*itr)->getId() == RequestMessage::ID && !(*itr)->isInProgress() && - ((RequestMessage*)*itr)->getIndex() == piece.getIndex()) { - delete *itr; + ((RequestMessage*)(*itr).get())->getIndex() == piece.getIndex()) { itr = messageQueue.erase(itr); } else { itr++; @@ -286,7 +275,7 @@ int PeerInteraction::countRequestSlot() const { return requestSlots.size(); } -HandshakeMessage* PeerInteraction::receiveHandshake(bool quickReply) { +HandshakeMessageHandle PeerInteraction::receiveHandshake(bool quickReply) { char msg[HANDSHAKE_MESSAGE_LENGTH]; int msgLength = HANDSHAKE_MESSAGE_LENGTH; bool retval = peerConnection->receiveHandshake(msg, msgLength); @@ -301,13 +290,8 @@ HandshakeMessage* PeerInteraction::receiveHandshake(bool quickReply) { if(!retval) { return NULL; } - HandshakeMessage* handshakeMessage = createHandshakeMessage(msg, msgLength); - try { - handshakeMessage->check(); - } catch(Exception* e) { - delete handshakeMessage; - throw; - } + HandshakeMessageHandle handshakeMessage(createHandshakeMessage(msg, msgLength)); + handshakeMessage->check(); if(handshakeMessage->isFastExtensionSupported()) { peer->setFastExtensionEnabled(true); logger->info("CUID#%d - Fast extension enabled.", cuid); @@ -315,30 +299,25 @@ HandshakeMessage* PeerInteraction::receiveHandshake(bool quickReply) { return handshakeMessage; } -HandshakeMessage* PeerInteraction::createHandshakeMessage(const char* msg, int msgLength) { +HandshakeMessageHandle PeerInteraction::createHandshakeMessage(const char* msg, int msgLength) { HandshakeMessage* message = HandshakeMessage::create(msg, msgLength); setPeerMessageCommonProperty(message); return message; } -PeerMessage* PeerInteraction::receiveMessage() { +PeerMessageHandle PeerInteraction::receiveMessage() { char msg[MAX_PAYLOAD_LEN]; int msgLength = 0; if(!peerConnection->receiveMessage(msg, msgLength)) { return NULL; } - PeerMessage* peerMessage = createPeerMessage(msg, msgLength); - try { - peerMessage->check(); - } catch(Exception* e) { - delete peerMessage; - throw; - } + PeerMessageHandle peerMessage(createPeerMessage(msg, msgLength)); + peerMessage->check(); return peerMessage; } -PeerMessage* PeerInteraction::createPeerMessage(const char* msg, int msgLength) { +PeerMessageHandle PeerInteraction::createPeerMessage(const char* msg, int msgLength) { PeerMessage* peerMessage; if(msgLength == 0) { // keep-alive diff --git a/src/PeerInteraction.h b/src/PeerInteraction.h index f3e9ff7d..e0c0f969 100644 --- a/src/PeerInteraction.h +++ b/src/PeerInteraction.h @@ -42,12 +42,15 @@ #include "AllowedFastMessage.h" #include "SuggestPieceMessage.h" #include "RequestSlot.h" +#include "SharedHandle.h" #define REQUEST_TIME_OUT 60 #define ALLOWED_FAST_SET_SIZE 10 +typedef SharedHandle PeerMessageHandle; +typedef SharedHandle HandshakeMessageHandle; typedef deque RequestSlots; -typedef deque MessageQueue; +typedef deque MessageQueue; class PeerInteraction { private: @@ -58,7 +61,7 @@ private: int uploadLimit; TorrentMan* torrentMan; PeerConnection* peerConnection; - Peer* peer; + PeerHandle peer; Pieces pieces; // allowed fast piece indexes that local client has sent Integers fastSet; @@ -66,19 +69,19 @@ private: const Logger* logger; void getNewPieceAndSendInterest(int pieceNum); - PeerMessage* createPeerMessage(const char* msg, int msgLength); - HandshakeMessage* createHandshakeMessage(const char* msg, int msgLength); + PeerMessageHandle createPeerMessage(const char* msg, int msgLength); + HandshakeMessageHandle createHandshakeMessage(const char* msg, int msgLength); void setPeerMessageCommonProperty(PeerMessage* peerMessage); int countRequestSlot() const; public: PeerInteraction(int cuid, - const Socket* socket, + const PeerHandle& peer, + const SocketHandle& socket, const Option* op, - TorrentMan* torrentMan, - Peer* peer); + TorrentMan* torrentMan); ~PeerInteraction(); - void addMessage(PeerMessage* peerMessage); + void addMessage(const PeerMessageHandle& peerMessage); void rejectPieceMessageInQueue(int index, int begin, int length); void rejectAllPieceMessageInQueue(); void onChoked(); @@ -116,8 +119,8 @@ public: void sendBitfield(); void sendAllowedFast(); - PeerMessage* receiveMessage(); - HandshakeMessage* receiveHandshake(bool quickReply = false); + PeerMessageHandle receiveMessage(); + HandshakeMessageHandle receiveHandshake(bool quickReply = false); RequestMessage* createRequestMessage(int index, int blockIndex); CancelMessage* createCancelMessage(int index, int begin, int length); diff --git a/src/PeerInteractionCommand.cc b/src/PeerInteractionCommand.cc index 89d536fd..555cc7db 100644 --- a/src/PeerInteractionCommand.cc +++ b/src/PeerInteractionCommand.cc @@ -28,28 +28,30 @@ #include "prefs.h" #include -PeerInteractionCommand::PeerInteractionCommand(int cuid, Peer* peer, +PeerInteractionCommand::PeerInteractionCommand(int cuid, + const PeerHandle& p, TorrentDownloadEngine* e, - const Socket* s, int sequence) - :PeerAbstractCommand(cuid, peer, e, s), sequence(sequence) { + const SocketHandle& s, + int sequence) + :PeerAbstractCommand(cuid, p, e, s), sequence(sequence) { if(sequence == INITIATOR_SEND_HANDSHAKE) { - setReadCheckSocket(NULL); + disableReadCheckSocket(); setWriteCheckSocket(socket); setTimeout(e->option->getAsInt(PREF_PEER_CONNECTION_TIMEOUT)); } - peerInteraction = new PeerInteraction(cuid, socket, e->option, - e->torrentMan, this->peer); + peerInteraction = new PeerInteraction(cuid, peer, socket, e->option, + e->torrentMan); peerInteraction->setUploadLimit(e->option->getAsInt(PREF_UPLOAD_LIMIT)); setUploadLimit(e->option->getAsInt(PREF_UPLOAD_LIMIT)); chokeUnchokeCount = 0; haveCount = 0; keepAliveCount = 0; - e->torrentMan->addActivePeer(this->peer); + e->torrentMan->addActivePeer(peer); } PeerInteractionCommand::~PeerInteractionCommand() { delete peerInteraction; - e->torrentMan->deleteActivePeer(this->peer); + e->torrentMan->deleteActivePeer(peer); } bool PeerInteractionCommand::executeInternal() { @@ -58,7 +60,7 @@ bool PeerInteractionCommand::executeInternal() { setReadCheckSocket(socket); setTimeout(e->option->getAsInt(PREF_TIMEOUT)); } - setWriteCheckSocket(NULL); + disableWriteCheckSocket(); setUploadLimitCheck(false); switch(sequence) { @@ -73,15 +75,15 @@ bool PeerInteractionCommand::executeInternal() { break; } } - HandshakeMessage* handshakeMessage = peerInteraction->receiveHandshake(); - if(handshakeMessage == NULL) { + HandshakeMessageHandle handshakeMessage = + peerInteraction->receiveHandshake(); + if(handshakeMessage.get() == NULL) { break; } peer->setPeerId(handshakeMessage->peerId); logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid, peer->ipaddr.c_str(), peer->port, handshakeMessage->toString().c_str()); - delete handshakeMessage; haveCheckTime.reset(); peerInteraction->sendBitfield(); peerInteraction->sendAllowedFast(); @@ -89,16 +91,15 @@ bool PeerInteractionCommand::executeInternal() { break; } case RECEIVER_WAIT_HANDSHAKE: { - HandshakeMessage* handshakeMessage = + HandshakeMessageHandle handshakeMessage = peerInteraction->receiveHandshake(true); - if(handshakeMessage == NULL) { + if(handshakeMessage.get() == NULL) { break; } peer->setPeerId(handshakeMessage->peerId); logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid, peer->ipaddr.c_str(), peer->port, handshakeMessage->toString().c_str()); - delete handshakeMessage; haveCheckTime.reset(); peerInteraction->sendBitfield(); peerInteraction->sendAllowedFast(); @@ -174,8 +175,8 @@ void PeerInteractionCommand::decideChoking() { void PeerInteractionCommand::receiveMessages() { for(int i = 0; i < 50; i++) { - PeerMessage* message = peerInteraction->receiveMessage(); - if(message == NULL) { + PeerMessageHandle message = peerInteraction->receiveMessage(); + if(message.get() == NULL) { return; } logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid, @@ -200,23 +201,18 @@ void PeerInteractionCommand::receiveMessages() { haveCount++; break; } - try { - message->receivedAction(); - delete message; - } catch(Exception* ex) { - delete message; - throw; - } + message->receivedAction(); } } // TODO this method removed when PeerBalancerCommand is implemented bool PeerInteractionCommand::prepareForNextPeer(int wait) { if(e->torrentMan->isPeerAvailable()) { - Peer* peer = e->torrentMan->getPeer(); + PeerHandle peer = e->torrentMan->getPeer(); int newCuid = e->torrentMan->getNewCuid(); peer->cuid = newCuid; - PeerInitiateConnectionCommand* command = new PeerInitiateConnectionCommand(newCuid, peer, e); + PeerInitiateConnectionCommand* command = + new PeerInitiateConnectionCommand(newCuid, peer, e); e->commands.push_back(command); } return true; diff --git a/src/PeerInteractionCommand.h b/src/PeerInteractionCommand.h index 9de8c9b7..037f9029 100644 --- a/src/PeerInteractionCommand.h +++ b/src/PeerInteractionCommand.h @@ -57,7 +57,9 @@ protected: bool prepareForNextPeer(int wait); void onAbort(Exception* ex); public: - PeerInteractionCommand(int cuid, Peer* peer, TorrentDownloadEngine* e, const Socket* s, int sequence); + PeerInteractionCommand(int cuid, const PeerHandle& peer, + TorrentDownloadEngine* e, + const SocketHandle& s, int sequence); ~PeerInteractionCommand(); enum Seq { diff --git a/src/PeerListenCommand.cc b/src/PeerListenCommand.cc index 43e00d3a..d81e16a0 100644 --- a/src/PeerListenCommand.cc +++ b/src/PeerListenCommand.cc @@ -23,13 +23,9 @@ #include "PeerInteractionCommand.h" PeerListenCommand::PeerListenCommand(int cuid, TorrentDownloadEngine* e) - :Command(cuid), e(e), socket(NULL) {} + :Command(cuid), e(e) {} -PeerListenCommand::~PeerListenCommand() { - if(socket != NULL) { - delete socket; - } -} +PeerListenCommand::~PeerListenCommand() {} int PeerListenCommand::bindPort(int portRangeStart, int portRangeEnd) { if(portRangeStart > portRangeEnd) { @@ -37,7 +33,6 @@ int PeerListenCommand::bindPort(int portRangeStart, int portRangeEnd) { } for(int port = portRangeStart; port <= portRangeEnd; port++) { try { - socket = new Socket(); socket->beginListen(port); logger->info("CUID#%d - using port %d for accepting new connections", cuid, port); @@ -45,9 +40,8 @@ int PeerListenCommand::bindPort(int portRangeStart, int portRangeEnd) { } catch(Exception* ex) { logger->error("CUID#%d - an error occurred while binding port=%d", ex, cuid, port); + socket->closeConnection(); delete ex; - delete socket; - socket = NULL; } } return -1; @@ -57,44 +51,33 @@ bool PeerListenCommand::execute() { if(e->torrentMan->isHalt()) { return true; } - try { - for(int i = 0; i < 3 && socket->isReadable(0); i++) { - Socket* peerSocket = NULL; - try { - peerSocket = socket->acceptConnection(); - pair peerInfo; - peerSocket->getPeerInfo(peerInfo); - pair localInfo; - peerSocket->getAddrInfo(localInfo); - if(peerInfo.first != localInfo.first && - e->torrentMan->connections < MAX_PEERS) { - Peer* peer = new Peer(peerInfo.first, peerInfo.second, - e->torrentMan->pieceLength, - e->torrentMan->getTotalLength()); - if(e->torrentMan->addPeer(peer, true)) { - int newCuid = e->torrentMan->getNewCuid(); - peer->cuid = newCuid; - PeerInteractionCommand* command = - new PeerInteractionCommand(newCuid, peer, e, peerSocket, - PeerInteractionCommand::RECEIVER_WAIT_HANDSHAKE); - e->commands.push_back(command); - logger->debug("CUID#%d - incoming connection, adding new command CUID#%d", cuid, newCuid); - } else { - delete peer; - } + for(int i = 0; i < 3 && socket->isReadable(0); i++) { + SocketHandle peerSocket; + try { + peerSocket = socket->acceptConnection(); + pair peerInfo; + peerSocket->getPeerInfo(peerInfo); + pair localInfo; + peerSocket->getAddrInfo(localInfo); + if(peerInfo.first != localInfo.first && + e->torrentMan->connections < MAX_PEERS) { + PeerHandle peer = PeerHandle(new Peer(peerInfo.first, peerInfo.second, + e->torrentMan->pieceLength, + e->torrentMan->getTotalLength())); + if(e->torrentMan->addPeer(peer)) { + int newCuid = e->torrentMan->getNewCuid(); + peer->cuid = newCuid; + PeerInteractionCommand* command = + new PeerInteractionCommand(newCuid, peer, e, peerSocket, + PeerInteractionCommand::RECEIVER_WAIT_HANDSHAKE); + e->commands.push_back(command); + logger->debug("CUID#%d - incoming connection, adding new command CUID#%d", cuid, newCuid); } - delete peerSocket; - } catch(Exception* ex) { - logger->error("CUID#%d - error in accepting connection", ex, cuid); - delete ex; - if(peerSocket != NULL) { - delete peerSocket; - } - } - } - } catch(Exception* e) { - logger->error("CUID#%d - Exception occurred.", e, cuid); - delete e; + } + } catch(Exception* ex) { + logger->error("CUID#%d - error in accepting connection", ex, cuid); + delete ex; + } } e->commands.push_back(this); return false; diff --git a/src/PeerListenCommand.h b/src/PeerListenCommand.h index c4a33e68..03ddc84a 100644 --- a/src/PeerListenCommand.h +++ b/src/PeerListenCommand.h @@ -28,7 +28,7 @@ class PeerListenCommand : public Command { private: TorrentDownloadEngine* e; - Socket* socket; + SocketHandle socket; public: PeerListenCommand(int cuid, TorrentDownloadEngine* e); ~PeerListenCommand(); diff --git a/src/PeerMessage.h b/src/PeerMessage.h index 4170aa8b..467f3c6d 100644 --- a/src/PeerMessage.h +++ b/src/PeerMessage.h @@ -33,7 +33,7 @@ class PeerMessage { protected: bool inProgress; int cuid; - Peer* peer; + PeerHandle peer; PeerInteraction* peerInteraction; const Logger* logger; public: @@ -47,8 +47,8 @@ public: void setCuid(int cuid) { this->cuid = cuid; } - Peer* getPeer() const { return this->peer; } - void setPeer(Peer* peer) { + PeerHandle getPeer() const { return this->peer; } + void setPeer(const PeerHandle& peer) { this->peer = peer; } PeerInteraction* getPeerInteraction() const { return peerInteraction; } diff --git a/src/Socket.h b/src/Socket.h index 5667f207..f909dcea 100644 --- a/src/Socket.h +++ b/src/Socket.h @@ -22,101 +22,9 @@ #ifndef _D_SOCKET_H_ #define _D_SOCKET_H_ -#include #include "SocketCore.h" -#include "common.h" +#include "SharedHandle.h" -using namespace std; - -class Socket { -private: - SocketCore* core; - /** - * This method doesn't increment the use count of core. - */ - Socket(SocketCore* core); -public: - Socket(); - Socket(const Socket& s); - ~Socket(); - - Socket& operator=(const Socket& s); - - /** - * Returns socket descriptor of this socket. - * @returns socket descriptor of this socket. - */ - int getSockfd() const { return core->sockfd; } - - /** - * @see SocketCore::beginListen() - */ - void beginListen(int port = 0) const; - - /** - * @see SocketCore::getAddrInfo() - */ - void getAddrInfo(pair& addrinfo) const; - - /** - * @see SocketCore::getPeerInfo(); - */ - void getPeerInfo(pair& peerinfo) const; - - /** - * @see SocketCore::acceptConnection() - */ - Socket* acceptConnection() const; - - /** - * @see SocketCore::establishConnection() - */ - void establishConnection(const string& host, int port) const; - - /** - * @see SocketCore::setBlockingMode() - */ - void setBlockingMode() const; - - /** - * @see SocketCore::closeConnection() - */ - void closeConnection() const; - - /** - * @see SocketCore::isWritable() - */ - bool isWritable(int timeout) const; - - /** - * @see SocketCore::isReadable() - */ - bool isReadable(int timeout) const; - - /** - * @see SocketCore::writeData() - */ - void writeData(const char* data, int len) const; - /** - * A covenient function that can take string class parameter and - * internally calls SocketCore::writeData(). - */ - void writeData(const string& str) const; - - /** - * @see SocketCore::readData() - */ - void readData(char* data, int& len) const; - - /** - * @see SocketCore::peekData() - */ - void peekData(char* data, int& len) const; - - /** - * @see SocketCore::initiateSecureConnection() - */ - void initiateSecureConnection() const; -}; +typedef SharedHandle SocketHandle; #endif // _D_SOCKET_H_ diff --git a/src/SocketCore.cc b/src/SocketCore.cc index acac581b..8936f0c3 100644 --- a/src/SocketCore.cc +++ b/src/SocketCore.cc @@ -456,3 +456,14 @@ void SocketCore::initiateSecureConnection() { #endif // HAVE_LIBGNUTLS } +bool operator==(const SocketCore& s1, const SocketCore& s2) { + return s1.sockfd == s2.sockfd; +} + +bool operator!=(const SocketCore& s1, const SocketCore& s2) { + return s1.sockfd != s2.sockfd; +} + +bool operator<(const SocketCore& s1, const SocketCore& s2) { + return s1.sockfd < s2.sockfd; +} diff --git a/src/SocketCore.h b/src/SocketCore.h index 06cdf0dd..f49f8d41 100644 --- a/src/SocketCore.h +++ b/src/SocketCore.h @@ -22,9 +22,9 @@ #ifndef _D_SOCKET_CORE_H_ #define _D_SOCKET_CORE_H_ +#include "common.h" #include #include -#include "common.h" #ifdef HAVE_LIBSSL // for SSL @@ -37,7 +37,9 @@ using namespace std; class SocketCore { - friend class Socket; + friend bool operator==(const SocketCore& s1, const SocketCore& s2); + friend bool operator!=(const SocketCore& s1, const SocketCore& s2); + friend bool operator<(const SocketCore& s1, const SocketCore& s2); private: // socket endpoint descriptor int sockfd; @@ -68,6 +70,10 @@ public: SocketCore(); ~SocketCore(); + int getSockfd() const { return sockfd; } + + bool isOpen() const { return sockfd != -1; } + /** * Creates a socket and listens form connection on it. * @param port port to listen. If 0 is specified, os automaticaly @@ -143,6 +149,7 @@ public: * @param len length of data */ void writeData(const char* data, int len); + void writeData(const string& msg) { writeData(msg.c_str(), msg.size()); } /** * Reads up to len bytes from this socket. @@ -176,5 +183,4 @@ public: */ void initiateSecureConnection() ; }; - #endif // _D_SOCKET_CORE_H_ diff --git a/src/TorrentMan.cc b/src/TorrentMan.cc index 97f8dcea..7cd26417 100644 --- a/src/TorrentMan.cc +++ b/src/TorrentMan.cc @@ -41,18 +41,28 @@ #include #include +extern PeerHandle nullPeer; + TorrentMan::TorrentMan():bitfield(NULL), - peerEntryIdCounter(0), cuidCounter(0), - downloadLength(0), uploadLength(0), - preDownloadLength(0), preUploadLength(0), - deltaDownloadLength(0), deltaUploadLength(0), + peerEntryIdCounter(0), + cuidCounter(0), + downloadLength(0), + uploadLength(0), + preDownloadLength(0), + preUploadLength(0), + deltaDownloadLength(0), + deltaUploadLength(0), storeDir("."), setupComplete(false), halt(false), interval(DEFAULT_ANNOUNCE_INTERVAL), minInterval(DEFAULT_ANNOUNCE_MIN_INTERVAL), - complete(0), incomplete(0), - connections(0), trackers(0), diskAdaptor(NULL) { + complete(0), + incomplete(0), + connections(0), + trackers(0), + diskAdaptor(NULL) +{ logger = LogFactory::getInstance(); } @@ -60,9 +70,6 @@ TorrentMan::~TorrentMan() { if(bitfield != NULL) { delete bitfield; } - for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) { - delete *itr; - } if(diskAdaptor != NULL) { delete diskAdaptor; } @@ -73,32 +80,25 @@ void TorrentMan::updatePeers(const Peers& peers) { this->peers = peers; } -bool TorrentMan::addPeer(Peer* peer, bool duplicate) { +bool TorrentMan::addPeer(const PeerHandle& peer) { if(peers.size() >= MAX_PEER_LIST_SIZE) { - deleteOldErrorPeers(); + deleteErrorPeer(); } - if(duplicate) { - for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) { - Peer* p = *itr; - if(p->ipaddr == peer->ipaddr && p->port == peer->port && p->error > 0) { - return false; - } - } + Peers::iterator itr = find(peers.begin(), peers.end(), peer); + if(itr == peers.end()) { + ++peerEntryIdCounter; + peer->entryId = peerEntryIdCounter; + peers.push_back(peer); + return true; } else { - if(peers.size() >= MAX_PEER_LIST_SIZE) { + const PeerHandle& peer = *itr; + if(peer->error >= MAX_PEER_ERROR || peer->cuid != 0) { return false; - } - for(Peers::iterator itr = peers.begin(); itr != peers.end(); itr++) { - Peer* p = *itr; - if(p->ipaddr == peer->ipaddr && p->port == peer->port) { - return false; - } - } + } else { + *itr = peer; + return true; + } } - ++peerEntryIdCounter; - peer->entryId = peerEntryIdCounter; - peers.push_back(peer); - return true; } /* @@ -114,14 +114,13 @@ void TorrentMan::updatePeer(const Peer& peer) { */ bool TorrentMan::isPeerAvailable() const { - return getPeer() != Peer::nullPeer; + return getPeer() != nullPeer; } -void TorrentMan::deleteOldErrorPeers() { +void TorrentMan::deleteErrorPeer() { for(Peers::iterator itr = peers.begin(); itr != peers.end();) { - Peer* p = *itr; - if(p->error >= MAX_PEER_ERROR && p->cuid == 0) { - delete p; + const PeerHandle& p = *itr; + if(p->error > 0 && p->cuid == 0) { itr = peers.erase(itr); } else { itr++; @@ -129,29 +128,29 @@ void TorrentMan::deleteOldErrorPeers() { } } -Peer* TorrentMan::getPeer() const { - if(connections > MAX_PEER_UPDATE) { - return Peer::nullPeer; +PeerHandle TorrentMan::getPeer() const { + if(connections > MIN_PEERS) { + return nullPeer; } for(Peers::const_iterator itr = peers.begin(); itr != peers.end(); itr++) { - Peer* p = *itr; + const PeerHandle& p = *itr; if(p->cuid == 0 && p->error < MAX_PEER_ERROR) { return p; } } - return Peer::nullPeer; + return nullPeer; } bool TorrentMan::isEndGame() const { return bitfield->countMissingBlock() <= END_GAME_PIECE_NUM; } -bool TorrentMan::hasMissingPiece(const Peer* peer) const { +bool TorrentMan::hasMissingPiece(const PeerHandle& peer) const { return bitfield->hasMissingPiece(peer->getBitfield(), peer->getBitfieldLength()); } -int TorrentMan::getMissingPieceIndex(const Peer* peer) const { +int TorrentMan::getMissingPieceIndex(const PeerHandle& peer) const { int index = -1; if(isEndGame()) { index = bitfield->getMissingIndex(peer->getBitfield(), @@ -163,7 +162,7 @@ int TorrentMan::getMissingPieceIndex(const Peer* peer) const { return index; } -int TorrentMan::getMissingFastPieceIndex(const Peer* peer) const { +int TorrentMan::getMissingFastPieceIndex(const PeerHandle& peer) const { int index = -1; if(peer->isFastExtensionEnabled() && peer->countFastSet() > 0) { BitfieldMan tempBitfield(pieceLength, totalLength); @@ -184,12 +183,12 @@ int TorrentMan::getMissingFastPieceIndex(const Peer* peer) const { return index; } -Piece TorrentMan::getMissingFastPiece(const Peer* peer) { +Piece TorrentMan::getMissingFastPiece(const PeerHandle& peer) { int index = getMissingFastPieceIndex(peer); return checkOutPiece(index); } -Piece TorrentMan::getMissingPiece(const Peer* peer) { +Piece TorrentMan::getMissingPiece(const PeerHandle& peer) { int index = getMissingPieceIndex(peer); return checkOutPiece(index); } diff --git a/src/TorrentMan.h b/src/TorrentMan.h index 972d5059..6155efca 100644 --- a/src/TorrentMan.h +++ b/src/TorrentMan.h @@ -46,7 +46,7 @@ using namespace std; #define DEFAULT_ANNOUNCE_INTERVAL 1800 #define DEFAULT_ANNOUNCE_MIN_INTERVAL 1800 #define MAX_PEERS 55 -#define MAX_PEER_UPDATE 15 +#define MIN_PEERS 15 #define MAX_PEER_LIST_SIZE 250 #define END_GAME_PIECE_NUM 20 #define MAX_PEER_ERROR 5 @@ -61,7 +61,7 @@ public: index(index) {} }; -typedef deque Peers; +typedef deque Peers; typedef deque Haves; typedef deque PieceIndexes; typedef deque Pieces; @@ -133,18 +133,18 @@ public: // TODO do not use this method void updatePeers(const Peers& peers); - bool addPeer(Peer* peer, bool duplicate = false); + bool addPeer(const PeerHandle& peer); //void updatePeer(const Peer* peer); const Peers& getPeers() const { return peers; } - Peer* getPeer() const; + PeerHandle getPeer() const; bool isPeerAvailable() const; - void deleteOldErrorPeers(); + void deleteErrorPeer(); - bool hasMissingPiece(const Peer* peer) const; - int getMissingPieceIndex(const Peer* peer) const; - int getMissingFastPieceIndex(const Peer* peer) const; - Piece getMissingPiece(const Peer* peer); - Piece getMissingFastPiece(const Peer* peer); + bool hasMissingPiece(const PeerHandle& peer) const; + int getMissingPieceIndex(const PeerHandle& peer) const; + int getMissingFastPieceIndex(const PeerHandle& peer) const; + Piece getMissingPiece(const PeerHandle& peer); + Piece getMissingFastPiece(const PeerHandle& peer); void completePiece(const Piece& piece); void cancelPiece(const Piece& piece); void updatePiece(const Piece& piece); @@ -246,15 +246,17 @@ public: void onDownloadComplete(); - void addActivePeer(Peer* peer) { + void addActivePeer(const PeerHandle& peer) { activePeers.push_back(peer); } Peers& getActivePeers() { return this->activePeers; } - void deleteActivePeer(Peer* peer) { + void deleteActivePeer(const PeerHandle& peer) { Peers::iterator itr = find(activePeers.begin(), activePeers.end(), peer); - activePeers.erase(itr); + if(itr != activePeers.end()) { + activePeers.erase(itr); + } } bool isHalt() const { return halt; } diff --git a/src/TrackerUpdateCommand.cc b/src/TrackerUpdateCommand.cc index 0998ccd1..49fe6f3a 100644 --- a/src/TrackerUpdateCommand.cc +++ b/src/TrackerUpdateCommand.cc @@ -69,24 +69,14 @@ bool TrackerUpdateCommand::execute() { if(!e->segmentMan->finished()) { return prepareForRetry(); } - MetaEntry* entry = NULL; char* trackerResponse = NULL; int trackerResponseLength = 0; + try { - try { - trackerResponse = getTrackerResponse(trackerResponseLength); - entry = MetaFileUtil::bdecoding(trackerResponse, - trackerResponseLength); - if(trackerResponse != NULL) { - delete [] trackerResponse; - } - } catch(Exception* e) { - if(trackerResponse != NULL) { - delete [] trackerResponse; - } - throw; - } - Dictionary* response = (Dictionary*)entry; + trackerResponse = getTrackerResponse(trackerResponseLength); + SharedHandle entry(MetaFileUtil::bdecoding(trackerResponse, + trackerResponseLength)); + Dictionary* response = (Dictionary*)entry.get(); Data* failureReason = (Data*)response->get("failure reason"); if(failureReason != NULL) { throw new DlAbortEx("Tracker returned failure reason: %s", failureReason->toString().c_str()); @@ -126,7 +116,9 @@ bool TrackerUpdateCommand::execute() { logger->debug("CUID#%d - Incomplete:%d", cuid, e->torrentMan->incomplete); } - if(dynamic_cast(response->get("peers"))) { + if(!e->torrentMan->isHalt() && + e->torrentMan->connections < MIN_PEERS && + dynamic_cast(response->get("peers"))) { Data* peers = (Data*)response->get("peers"); if(peers != NULL && peers->getLen() > 0) { for(int i = 0; i < peers->getLen(); i += 6) { @@ -139,24 +131,24 @@ bool TrackerUpdateCommand::execute() { snprintf(ipaddr, sizeof(ipaddr), "%d.%d.%d.%d", ipaddr1, ipaddr2, ipaddr3, ipaddr4); - Peer* peer = new Peer(ipaddr, port, e->torrentMan->pieceLength, - e->torrentMan->getTotalLength()); + PeerHandle peer = + PeerHandle(new Peer(ipaddr, port, e->torrentMan->pieceLength, + e->torrentMan->getTotalLength())); if(e->torrentMan->addPeer(peer)) { logger->debug("CUID#%d - Adding peer %s:%d", cuid, peer->ipaddr.c_str(), peer->port); - } else { - delete peer; } } } else { logger->info("CUID#%d - No peer list received.", cuid); } while(e->torrentMan->isPeerAvailable() && - e->torrentMan->connections < MAX_PEER_UPDATE) { - Peer* peer = e->torrentMan->getPeer(); + e->torrentMan->connections < MIN_PEERS) { + PeerHandle peer = e->torrentMan->getPeer(); int newCuid = e->torrentMan->getNewCuid(); peer->cuid = newCuid; - PeerInitiateConnectionCommand* command = new PeerInitiateConnectionCommand(newCuid, peer, e); + PeerInitiateConnectionCommand* command = + new PeerInitiateConnectionCommand(newCuid, peer, e); e->commands.push_back(command); logger->debug("CUID#%d - Adding new command CUID#%d", cuid, newCuid); } @@ -166,10 +158,10 @@ bool TrackerUpdateCommand::execute() { } } catch(Exception* err) { logger->error("CUID#%d - Error occurred while processing tracker response.", cuid, err); - delete(err); + delete err; } - if(entry != NULL) { - delete entry; + if(trackerResponse != NULL) { + delete [] trackerResponse; } e->torrentMan->trackers = 0; diff --git a/src/TrackerWatcherCommand.h b/src/TrackerWatcherCommand.h index 37708456..fd0cccf7 100644 --- a/src/TrackerWatcherCommand.h +++ b/src/TrackerWatcherCommand.h @@ -26,8 +26,6 @@ #include "TorrentDownloadEngine.h" #include "Time.h" -#define MIN_PEERS 15 - class TrackerWatcherCommand : public Command { private: TorrentDownloadEngine* e; diff --git a/src/main.cc b/src/main.cc index c46f360f..856c5090 100644 --- a/src/main.cc +++ b/src/main.cc @@ -870,16 +870,17 @@ int main(int argc, char* argv[]) { } te->torrentMan->setPort(port); te->commands.push_back(listenCommand); + te->commands.push_back(new TrackerWatcherCommand(te->torrentMan->getNewCuid(), te, te->torrentMan->minInterval)); te->commands.push_back(new TrackerUpdateCommand(te->torrentMan->getNewCuid(), - te)); + te)); te->commands.push_back(new TorrentAutoSaveCommand(te->torrentMan->getNewCuid(), - te, - op->getAsInt(PREF_AUTO_SAVE_INTERVAL))); + te, + op->getAsInt(PREF_AUTO_SAVE_INTERVAL))); te->commands.push_back(new PeerChokeCommand(te->torrentMan->getNewCuid(), - 10, te)); + 10, te)); te->run(); if(te->torrentMan->downloadComplete()) {