xrootd
|
00001 00002 // // 00003 // XrdClientConn // 00004 // // 00005 // Author: Fabrizio Furano (INFN Padova, 2004) // 00006 // Adapted from TXNetFile (root.cern.ch) originally done by // 00007 // Alvise Dorigo, Fabrizio Furano // 00008 // INFN Padova, 2003 // 00009 // // 00010 // High level handler of connections to xrootd. // 00011 // // 00013 00014 // $Id$ 00015 00016 #ifndef XRD_CONN_H 00017 #define XRD_CONN_H 00018 00019 00020 #include "XrdClient/XrdClientConst.hh" 00021 00022 #include "time.h" 00023 #include "XrdClient/XrdClientConnMgr.hh" 00024 #include "XrdClient/XrdClientMessage.hh" 00025 #include "XrdClient/XrdClientUrlInfo.hh" 00026 #include "XrdClient/XrdClientReadCache.hh" 00027 #include "XrdOuc/XrdOucHash.hh" 00028 00029 #define ConnectionManager XrdClientConn::GetConnectionMgr() 00030 #define SessionIDRepo XrdClientConn::GetSessionIDRepo() 00031 00032 class XrdClientAbs; 00033 class XrdSecProtocol; 00034 00035 class XrdClientConn { 00036 00037 public: 00038 00039 enum ESrvErrorHandlerRetval { 00040 kSEHRReturnMsgToCaller = 0, 00041 kSEHRBreakLoop = 1, 00042 kSEHRContinue = 2, 00043 kSEHRReturnNoMsgToCaller = 3, 00044 kSEHRRedirLimitReached = 4 00045 }; 00046 enum EThreeStateReadHandler { 00047 kTSRHReturnMex = 0, 00048 kTSRHReturnNullMex = 1, 00049 kTSRHContinue = 2 00050 }; 00051 00052 // To keep info about an open session 00053 struct SessionIDInfo { 00054 char id[16]; 00055 }; 00056 00057 int fLastDataBytesRecv; 00058 int fLastDataBytesSent; 00059 XErrorCode fOpenError; 00060 00061 XrdOucString fRedirOpaque; // Opaque info returned by the server when 00062 00063 // redirecting. To be used in the next opens 00064 XrdClientConn(); 00065 virtual ~XrdClientConn(); 00066 00067 inline bool CacheWillFit(long long bytes) { 00068 if (!fMainReadCache) 00069 return FALSE; 00070 return fMainReadCache->WillFit(bytes); 00071 } 00072 00073 bool CheckHostDomain(XrdOucString hostToCheck); 00074 short Connect(XrdClientUrlInfo Host2Conn, 00075 XrdClientAbsUnsolMsgHandler *unsolhandler); 00076 void Disconnect(bool ForcePhysicalDisc); 00077 virtual bool GetAccessToSrv(); 00078 XReqErrorType GoBackToRedirector(); 00079 00080 XrdOucString GetClientHostDomain() { return fgClientHostDomain; } 00081 00082 00083 static XrdClientPhyConnection *GetPhyConn(int LogConnID); 00084 00085 00086 // --------- Cache related stuff 00087 00088 long GetDataFromCache(const void *buffer, 00089 long long begin_offs, 00090 long long end_offs, 00091 bool PerfCalc, 00092 XrdClientIntvList &missingblks, 00093 long &outstandingblks ); 00094 00095 bool SubmitDataToCache(XrdClientMessage *xmsg, 00096 long long begin_offs, 00097 long long end_offs); 00098 00099 bool SubmitRawDataToCache(const void *buffer, 00100 long long begin_offs, 00101 long long end_offs); 00102 00103 void SubmitPlaceholderToCache(long long begin_offs, 00104 long long end_offs) { 00105 if (fMainReadCache) 00106 fMainReadCache->PutPlaceholder(begin_offs, end_offs); 00107 } 00108 00109 00110 void RemoveAllDataFromCache(bool keepwriteblocks=true) { 00111 if (fMainReadCache) 00112 fMainReadCache->RemoveItems(keepwriteblocks); 00113 } 00114 00115 void RemoveDataFromCache(long long begin_offs, 00116 long long end_offs, bool remove_overlapped = false) { 00117 if (fMainReadCache) 00118 fMainReadCache->RemoveItems(begin_offs, end_offs, remove_overlapped); 00119 } 00120 00121 void RemovePlaceholdersFromCache() { 00122 if (fMainReadCache) 00123 fMainReadCache->RemovePlaceholders(); 00124 } 00125 00126 void PrintCache() { 00127 if (fMainReadCache) 00128 fMainReadCache->PrintCache(); 00129 } 00130 00131 00132 bool GetCacheInfo( 00133 // The actual cache size 00134 int &size, 00135 00136 // The number of bytes submitted since the beginning 00137 long long &bytessubmitted, 00138 00139 // The number of bytes found in the cache (estimate) 00140 long long &byteshit, 00141 00142 // The number of reads which did not find their data 00143 // (estimate) 00144 long long &misscount, 00145 00146 // miss/totalreads ratio (estimate) 00147 float &missrate, 00148 00149 // number of read requests towards the cache 00150 long long &readreqcnt, 00151 00152 // ratio between bytes found / bytes submitted 00153 float &bytesusefulness 00154 ) { 00155 if (!fMainReadCache) return false; 00156 00157 fMainReadCache->GetInfo(size, 00158 bytessubmitted, 00159 byteshit, 00160 misscount, 00161 missrate, 00162 readreqcnt, 00163 bytesusefulness); 00164 return true; 00165 } 00166 00167 00168 void SetCacheSize(int CacheSize) { 00169 if (!fMainReadCache && CacheSize) 00170 fMainReadCache = new XrdClientReadCache(); 00171 00172 if (fMainReadCache) 00173 fMainReadCache->SetSize(CacheSize); 00174 } 00175 00176 void SetCacheRmPolicy(int RmPolicy) { 00177 if (fMainReadCache) 00178 fMainReadCache->SetBlkRemovalPolicy(RmPolicy); 00179 } 00180 00181 void UnPinCacheBlk(long long begin_offs, long long end_offs) { 00182 fMainReadCache->UnPinCacheBlk(begin_offs, end_offs); 00183 // Also use this to signal the possibility to proceed for a hard checkpoint 00184 fWriteWaitAck->Broadcast(); 00185 } 00186 00187 00188 // ------------------- 00189 00190 00191 int GetLogConnID() const { return fLogConnID; } 00192 00193 ERemoteServerType GetServerType() const { return fServerType; } 00194 00195 kXR_unt16 GetStreamID() const { return fPrimaryStreamid; } 00196 00197 inline XrdClientUrlInfo *GetLBSUrl() { return fLBSUrl; } 00198 inline XrdClientUrlInfo GetCurrentUrl() { return fUrl; } 00199 inline XrdClientUrlInfo GetRedirUrl() { return fREQUrl; } 00200 00201 XErrorCode GetOpenError() const { return fOpenError; } 00202 virtual XReqErrorType GoToAnotherServer(XrdClientUrlInfo &newdest); 00203 bool IsConnected() const { return fConnected; } 00204 bool IsPhyConnConnected(); 00205 00206 struct ServerResponseHeader 00207 LastServerResp; 00208 00209 struct ServerResponseBody_Error 00210 LastServerError; 00211 00212 void ClearLastServerError() { 00213 memset(&LastServerError, 0, sizeof(LastServerError)); 00214 LastServerError.errnum = kXR_noErrorYet; 00215 } 00216 00217 UnsolRespProcResult ProcessAsynResp(XrdClientMessage *unsolmsg); 00218 00219 virtual bool SendGenCommand(ClientRequest *req, 00220 const void *reqMoreData, 00221 void **answMoreDataAllocated, 00222 void *answMoreData, bool HasToAlloc, 00223 char *CmdName, int substreamid = 0); 00224 00225 int GetOpenSockFD() const { return fOpenSockFD; } 00226 00227 void SetClientHostDomain(const char *src) { fgClientHostDomain = src; } 00228 void SetConnected(bool conn) { fConnected = conn; } 00229 00230 void SetOpenError(XErrorCode err) { fOpenError = err; } 00231 00232 // Gets a parallel stream id to use to set the return path for a re 00233 int GetParallelStreamToUse(int reqsperstream); 00234 int GetParallelStreamCount(); // Returns the total number of connected streams 00235 00236 void SetRedirHandler(XrdClientAbs *rh) { fRedirHandler = rh; } 00237 00238 void SetRequestedDestHost(char *newh, kXR_int32 port) { 00239 fREQUrl = fUrl; 00240 fREQUrl.Host = newh; 00241 fREQUrl.Port = port; 00242 fREQUrl.SetAddrFromHost(); 00243 } 00244 00245 // Puts this instance in pause state for wsec seconds. 00246 // A value <= 0 revokes immediately the pause state 00247 void SetREQPauseState(kXR_int32 wsec) { 00248 // Lock mutex 00249 fREQWait->Lock(); 00250 00251 if (wsec > 0) 00252 fREQWaitTimeLimit = time(0) + wsec; 00253 else { 00254 fREQWaitTimeLimit = 0; 00255 fREQWait->Broadcast(); 00256 } 00257 00258 // UnLock mutex 00259 fREQWait->UnLock(); 00260 } 00261 00262 // Puts this instance in connect-pause state for wsec seconds. 00263 // Any future connection attempt will not happen before wsec 00264 // and the first one will be towards the given host 00265 void SetREQDelayedConnectState(kXR_int32 wsec) { 00266 // Lock mutex 00267 fREQConnectWait->Lock(); 00268 00269 if (wsec > 0) 00270 fREQConnectWaitTimeLimit = time(0) + wsec; 00271 else { 00272 fREQConnectWaitTimeLimit = 0; 00273 fREQConnectWait->Broadcast(); 00274 } 00275 00276 // UnLock mutex 00277 fREQConnectWait->UnLock(); 00278 } 00279 00280 void SetSID(kXR_char *sid); 00281 inline void SetUrl(XrdClientUrlInfo thisUrl) { fUrl = thisUrl; } 00282 00283 // Sends the request to the server, through logconn with ID LogConnID 00284 // The request is sent with a streamid 'child' of the current one, then marked as pending 00285 // Its answer will be caught asynchronously 00286 XReqErrorType WriteToServer_Async(ClientRequest *req, 00287 const void* reqMoreData, 00288 int substreamid = 0); 00289 00290 static XrdClientConnectionMgr *GetConnectionMgr() 00291 { return fgConnectionMgr;} //Instance of the conn manager 00292 00293 static XrdOucHash<SessionIDInfo> &GetSessionIDRepo() 00294 { return fSessionIDRepo; } 00295 00296 void GetSessionID(SessionIDInfo &sess) { 00297 XrdOucString sessname; 00298 char buf[20]; 00299 00300 snprintf(buf, 20, "%d", fUrl.Port); 00301 00302 sessname = fUrl.HostAddr; 00303 if (sessname.length() <= 0) 00304 sessname = fUrl.Host; 00305 00306 sessname += ":"; 00307 sessname += buf; 00308 00309 sess = *( fSessionIDRepo.Find(sessname.c_str()) ); 00310 } 00311 00312 long GetServerProtocol() { return fServerProto; } 00313 00314 short GetMaxRedirCnt() const { return fMaxGlobalRedirCnt; } 00315 void SetMaxRedirCnt(short mx) {fMaxGlobalRedirCnt = mx; } 00316 short GetRedirCnt() const { return fGlobalRedirCnt; } 00317 00318 bool DoWriteSoftCheckPoint(); 00319 bool DoWriteHardCheckPoint(); 00320 void UnPinCacheBlk(); 00321 00322 00323 // To give a max number of seconds for an operation to complete, no matter what happens inside 00324 // e.g. redirections, sleeps, failed connection attempts etc. 00325 void SetOpTimeLimit(int delta_secs); 00326 bool IsOpTimeLimitElapsed(time_t timenow); 00327 00328 00329 protected: 00330 void SetLogConnID(int cid) { fLogConnID = cid; } 00331 void SetStreamID(kXR_unt16 sid) { fPrimaryStreamid = sid; } 00332 00333 00334 00335 // The handler which first tried to connect somewhere 00336 XrdClientAbsUnsolMsgHandler *fUnsolMsgHandler; 00337 00338 XrdClientUrlInfo fUrl; // The current URL 00339 XrdClientUrlInfo *fLBSUrl; // Needed to save the load balancer url 00340 XrdClientUrlInfo fREQUrl; // For explicitly requested redirs 00341 00342 short fGlobalRedirCnt; // Number of redirections 00343 00344 private: 00345 00346 static XrdOucString fgClientHostDomain; // Save the client's domain name 00347 bool fConnected; 00348 bool fGettingAccessToSrv; // To avoid recursion in desperate situations 00349 time_t fGlobalRedirLastUpdateTimestamp; // Timestamp of last redirection 00350 00351 int fLogConnID; // Logical connection ID used 00352 kXR_unt16 fPrimaryStreamid; // Streamid used for normal communication 00353 // NB it's a copy of the one contained in 00354 // the logconn 00355 00356 short fMaxGlobalRedirCnt; 00357 XrdClientReadCache *fMainReadCache; 00358 00359 // The time limit for a transaction 00360 time_t fOpTimeLimit; 00361 00362 XrdClientAbs *fRedirHandler; // Pointer to a class inheriting from 00363 // XrdClientAbs providing methods 00364 // to handle a redir at higher level 00365 00366 XrdOucString fRedirInternalToken; // Token returned by the server when 00367 // redirecting. To be used in the next logins 00368 00369 XrdSysCondVar *fREQWaitResp; // For explicitly requested delayed async responses 00370 ServerResponseBody_Attn_asynresp * 00371 fREQWaitRespData; // For explicitly requested delayed async responses 00372 00373 time_t fREQWaitTimeLimit; // For explicitly requested pause state 00374 XrdSysCondVar *fREQWait; // For explicitly requested pause state 00375 time_t fREQConnectWaitTimeLimit; // For explicitly requested delayed reconnect 00376 XrdSysCondVar *fREQConnectWait; // For explicitly requested delayed reconnect 00377 00378 long fServerProto; // The server protocol 00379 ERemoteServerType fServerType; // Server type as returned by doHandShake() 00380 00381 static XrdOucHash<SessionIDInfo> 00382 fSessionIDRepo; // The repository of session IDs, shared. 00383 // Association between 00384 // <hostname>:<port> and a SessionIDInfo struct 00385 00386 int fOpenSockFD; // Descriptor of the underlying socket 00387 static XrdClientConnectionMgr *fgConnectionMgr; //Instance of the Connection Manager 00388 00389 XrdSysCondVar *fWriteWaitAck; 00390 XrdClientVector<ClientRequest> fWriteReqsToRetry; // To store the write reqs to retry in case of a disconnection 00391 00392 bool CheckErrorStatus(XrdClientMessage *, short &, char *); 00393 void CheckPort(int &port); 00394 void CheckREQPauseState(); 00395 void CheckREQConnectWaitState(); 00396 bool CheckResp(struct ServerResponseHeader *resp, const char *method); 00397 XrdClientMessage *ClientServerCmd(ClientRequest *req, 00398 const void *reqMoreData, 00399 void **answMoreDataAllocated, 00400 void *answMoreData, 00401 bool HasToAlloc, 00402 int substreamid = 0); 00403 XrdSecProtocol *DoAuthentication(char *plist, int plsiz); 00404 00405 ERemoteServerType DoHandShake(short log); 00406 00407 bool DoLogin(); 00408 bool DomainMatcher(XrdOucString dom, XrdOucString domlist); 00409 00410 XrdOucString GetDomainToMatch(XrdOucString hostname); 00411 00412 ESrvErrorHandlerRetval HandleServerError(XReqErrorType &, XrdClientMessage *, 00413 ClientRequest *); 00414 bool MatchStreamid(struct ServerResponseHeader *ServerResponse); 00415 00416 // Sends a close request, without waiting for an answer 00417 // useful (?) to be sent just before closing a badly working stream 00418 bool PanicClose(); 00419 00420 XrdOucString ParseDomainFromHostname(XrdOucString hostname); 00421 00422 XrdClientMessage *ReadPartialAnswer(XReqErrorType &, size_t &, 00423 ClientRequest *, bool, void**, 00424 EThreeStateReadHandler &); 00425 00426 void ClearSessionID(); 00427 00428 XReqErrorType WriteToServer(ClientRequest *req, 00429 const void* reqMoreData, 00430 short LogConnID, 00431 int substreamid = 0); 00432 00433 bool WaitResp(int secsmax); 00434 }; 00435 00436 00437 00438 #endif