XRootD
Loading...
Searching...
No Matches
XrdClXRootDMsgHandler.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
26#define __XRD_CL_XROOTD_MSG_HANDLER_HH__
27
31#include "XrdCl/XrdClMessage.hh"
33#include "XrdCl/XrdClLog.hh"
35
41
46
48
49#include <sys/uio.h>
50#include <arpa/inet.h> // for network unmarshaling stuff
51
52#include <array>
53#include <list>
54#include <memory>
55#include <atomic>
56#include <memory>
57
58namespace XrdCl
59{
60 class PostMaster;
61 class SIDManager;
62 class URL;
63 class LocalFileHandler;
64 class Socket;
65
66 //----------------------------------------------------------------------------
67 // Single entry in the redirect-trace-back
68 //----------------------------------------------------------------------------
70 {
78
79 RedirectEntry( const URL &from, const URL &to, Type type ) :
80 from( from ), to( to ), type( type )
81 {
82
83 }
84
89
90 std::string ToString( bool prevok = true )
91 {
92 const std::string tostr = to.GetLocation();
93 const std::string fromstr = from.GetLocation();
94
95 if( prevok )
96 {
97 switch( type )
98 {
99 case EntryRedirect: return "Redirected from: " + fromstr + " to: "
100 + tostr;
101
102 case EntryRedirectOnWait: return "Server responded with wait. "
103 "Falling back to virtual redirector: " + tostr;
104
105 case EntryRetry: return "Retrying: " + tostr;
106
107 case EntryWait: return "Waited at server request. Resending: "
108 + tostr;
109 }
110 }
111 return "Failed at: " + fromstr + ", retrying at: " + tostr;
112 }
113 };
114
115 //----------------------------------------------------------------------------
117 //----------------------------------------------------------------------------
119 {
120 friend class HandleRspJob;
121
122 public:
123 //------------------------------------------------------------------------
132 //------------------------------------------------------------------------
// Remaining constructor parameters; the line carrying the constructor name
// and its first parameter (msg, used below) precedes this span.
134 ResponseHandler *respHandler,
135 const URL *url,
136 std::shared_ptr<SIDManager> sidMgr,
137 LocalFileHandler *lFileHandler):
// NOTE(review): url is dereferenced below without a null check - callers
// presumably always pass a valid pointer; confirm.
138 pRequest( msg ),
139 pResponseHandler( respHandler ),
140 pUrl( *url ),
141 pEffectiveDataServerUrl( 0 ),
142 pSidMgr( sidMgr ),
143 pLFileHandler( lFileHandler ),
144 pExpiration( 0 ),
145 pRedirectAsAnswer( false ),
146 pOksofarAsAnswer( false ),
147 pHasLoadBalancer( false ),
148 pHasSessionId( false ),
149 pChunkList( 0 ),
150 pKBuff( 0 ),
151 pRedirectCounter( 0 ),
152 pNotAuthorizedCounter( 0 ),
153
154 pAsyncOffset( 0 ),
155 pAsyncChunkIndex( 0 ),
156
157 pPgWrtCksumBuff( 4 ),
158 pPgWrtCurrentPageOffset( 0 ),
159 pPgWrtCurrentPageNb( 0 ),
160
161 pOtherRawStarted( false ),
162
163 pFollowMetalink( false ),
164
165 pStateful( false ),
166
167 pAggregatedWaitTime( 0 ),
168
169 pMsgInFly( false ),
170
171 pTimeoutFence( false ),
172
173 pDirListStarted( false ),
174 pDirListWithStat( false ),
175
176 pCV( 0 ),
177
178 pSslErrCnt( 0 )
179 {
// The post master is taken from the default environment rather than passed in
180 pPostMaster = DefaultEnv::GetPostMaster();
// A non-zero session id is remembered in pHasSessionId; the destructor
// deletes pRequest only when this flag is false
181 if( msg->GetSessionId() )
182 pHasSessionId = true;
183
// NOTE(review): `this` is passed for the "0x%x" conversion, which expects an
// unsigned int - presumably the address gets truncated where pointers are
// wider than int; "%p" looks like the intended conversion - confirm.
184 Log *log = DefaultEnv::GetLog();
185 log->Debug( ExDbgMsg, "[%s] MsgHandler created: 0x%x (message: %s ).",
186 pUrl.GetHostId().c_str(), this,
187 pRequest->GetDescription().c_str() );
188
// The request buffer is viewed as a request header; requestid is converted
// with ntohs before every comparison
189 ClientRequestHdr *hdr = (ClientRequestHdr*)pRequest->GetBuffer();
190 if( ntohs( hdr->requestid ) == kXR_pgread )
191 {
// For kXR_pgread: reserve digest storage up front, sized by csNum from the
// requested offset and length
192 ClientPgReadRequest *pgrdreq = (ClientPgReadRequest*)pRequest->GetBuffer();
193 pCrc32cDigests.reserve( XrdOucPgrwUtils::csNum( ntohll( pgrdreq->offset ),
194 ntohl( pgrdreq->rlen ) ) );
195 }
196
// Choose the body reader by request type: a vector reader for kXR_readv,
// a raw reader for kXR_read, and a discarding reader for everything else
197 if( ntohs( hdr->requestid ) == kXR_readv )
198 pBodyReader.reset( new AsyncVectorReader( *url, *pRequest ) );
199 else if( ntohs( hdr->requestid ) == kXR_read )
200 pBodyReader.reset( new AsyncRawReader( *url, *pRequest ) );
201 else
202 pBodyReader.reset( new AsyncDiscardReader( *url, *pRequest ) );
203 }
204
205 //------------------------------------------------------------------------
207 //------------------------------------------------------------------------
// Destructor body (the line carrying the destructor name precedes this span).
209 {
// Emit the redirect trace-back before any member is released
210 DumpRedirectTraceBack();
211
// The request is deleted here only when it carries no session id
212 if( !pHasSessionId )
213 delete pRequest;
214 delete pEffectiveDataServerUrl;
215
// Overwrite the raw pointers with a recognisable bogus address -
// presumably so that a use after destruction is easy to spot; confirm.
216 pRequest = reinterpret_cast<Message*>( 0xDEADBEEF );
217 pResponseHandler = reinterpret_cast<ResponseHandler*>( 0xDEADBEEF );
218 pPostMaster = reinterpret_cast<PostMaster*>( 0xDEADBEEF );
219 pLFileHandler = reinterpret_cast<LocalFileHandler*>( 0xDEADBEEF );
220 pChunkList = reinterpret_cast<ChunkList*>( 0xDEADBEEF );
221 pEffectiveDataServerUrl = reinterpret_cast<URL*>( 0xDEADBEEF );
222
// NOTE(review): `this` is passed for the "0x%x" conversion, which expects an
// unsigned int; "%p" looks like the intended conversion - confirm.
223 Log *log = DefaultEnv::GetLog();
224 log->Debug( ExDbgMsg, "[%s] Destroying MsgHandler: 0x%x.",
225 pUrl.GetHostId().c_str(), this );
226 }
227
228 //------------------------------------------------------------------------
234 //------------------------------------------------------------------------
235 virtual uint16_t Examine( std::shared_ptr<Message> &msg );
236
237 //------------------------------------------------------------------------
246 //------------------------------------------------------------------------
247 virtual uint16_t InspectStatusRsp();
248
249 //------------------------------------------------------------------------
253 //------------------------------------------------------------------------
254 virtual uint16_t GetSid() const;
255
256 //------------------------------------------------------------------------
//! Process the message if it was "taken" by the examine action
260 //------------------------------------------------------------------------
261 virtual void Process();
262
263 //------------------------------------------------------------------------
273 //------------------------------------------------------------------------
275 Socket *socket,
276 uint32_t &bytesRead );
277
278 //------------------------------------------------------------------------
283 //------------------------------------------------------------------------
284 virtual uint8_t OnStreamEvent( StreamEvent event,
285 XRootDStatus status );
286
287 //------------------------------------------------------------------------
//! The requested action has been performed and the status is available
289 //------------------------------------------------------------------------
290 virtual void OnStatusReady( const Message *message,
291 XRootDStatus status );
292
293 //------------------------------------------------------------------------
//! Are we a raw writer or not?
295 //------------------------------------------------------------------------
296 virtual bool IsRaw() const;
297
298 //------------------------------------------------------------------------
307 //------------------------------------------------------------------------
309 uint32_t &bytesWritten );
310
311 //------------------------------------------------------------------------
315 //------------------------------------------------------------------------
316 void WaitDone( time_t now );
317
318 //------------------------------------------------------------------------
320 //------------------------------------------------------------------------
321 void SetExpiration( time_t expiration )
322 {
323 pExpiration = expiration;
324 }
325
326 //------------------------------------------------------------------------
328 //------------------------------------------------------------------------
330 {
331 return pExpiration;
332 }
333
334 //------------------------------------------------------------------------
337 //------------------------------------------------------------------------
338 void SetRedirectAsAnswer( bool redirectAsAnswer )
339 {
340 pRedirectAsAnswer = redirectAsAnswer;
341 }
342
343 //------------------------------------------------------------------------
346 //------------------------------------------------------------------------
347 void SetOksofarAsAnswer( bool oksofarAsAnswer )
348 {
349 pOksofarAsAnswer = oksofarAsAnswer;
350 }
351
352 //------------------------------------------------------------------------
354 //------------------------------------------------------------------------
355 const Message *GetRequest() const
356 {
357 return pRequest;
358 }
359
360 //------------------------------------------------------------------------
362 //------------------------------------------------------------------------
363 void SetLoadBalancer( const HostInfo &loadBalancer )
364 {
365 if( !loadBalancer.url.IsValid() )
366 return;
367 pLoadBalancer = loadBalancer;
368 pHasLoadBalancer = true;
369 }
370
371 //------------------------------------------------------------------------
373 //------------------------------------------------------------------------
374 void SetHostList( HostList *hostList )
375 {
376 pHosts.reset( hostList );
377 }
378
379 //------------------------------------------------------------------------
381 //------------------------------------------------------------------------
382 void SetChunkList( ChunkList *chunkList )
383 {
384 pChunkList = chunkList;
385 if( pBodyReader )
386 pBodyReader->SetChunkList( chunkList );
387 if( chunkList )
388 pChunkStatus.resize( chunkList->size() );
389 else
390 pChunkStatus.clear();
391 }
392
393 void SetCrc32cDigests( std::vector<uint32_t> && crc32cDigests )
394 {
395 pCrc32cDigests = std::move( crc32cDigests );
396 }
397
398 //------------------------------------------------------------------------
400 //------------------------------------------------------------------------
402 {
403 pKBuff = kbuff;
404 }
405
406 //------------------------------------------------------------------------
408 //------------------------------------------------------------------------
409 void SetRedirectCounter( uint16_t redirectCounter )
410 {
411 pRedirectCounter = redirectCounter;
412 }
413
414 void SetFollowMetalink( bool followMetalink )
415 {
416 pFollowMetalink = followMetalink;
417 }
418
419 void SetStateful( bool stateful )
420 {
421 pStateful = stateful;
422 }
423
424 //------------------------------------------------------------------------
428 //------------------------------------------------------------------------
429 void PartialReceived();
430
431 private:
432
433 //------------------------------------------------------------------------
435 //------------------------------------------------------------------------
436 void HandleError( XRootDStatus status );
437
438 //------------------------------------------------------------------------
440 //------------------------------------------------------------------------
441 Status RetryAtServer( const URL &url, RedirectEntry::Type entryType );
442
443 //------------------------------------------------------------------------
445 //------------------------------------------------------------------------
446 void HandleResponse();
447
448 //------------------------------------------------------------------------
450 //------------------------------------------------------------------------
451 XRootDStatus *ProcessStatus();
452
453 //------------------------------------------------------------------------
456 //------------------------------------------------------------------------
457 Status ParseResponse( AnyObject *&response );
458
459 //------------------------------------------------------------------------
462 //------------------------------------------------------------------------
463 Status ParseXAttrResponse( char *data, size_t len, AnyObject *&response );
464
465 //------------------------------------------------------------------------
468 //------------------------------------------------------------------------
469 Status RewriteRequestRedirect( const URL &newUrl );
470
471 //------------------------------------------------------------------------
473 //------------------------------------------------------------------------
474 Status RewriteRequestWait();
475
476 //------------------------------------------------------------------------
478 //------------------------------------------------------------------------
479 void UpdateTriedCGI(uint32_t errNo=0);
480
481 //------------------------------------------------------------------------
483 //------------------------------------------------------------------------
484 void SwitchOnRefreshFlag();
485
486 //------------------------------------------------------------------------
489 //------------------------------------------------------------------------
490 void HandleRspOrQueue();
491
492 //------------------------------------------------------------------------
494 //------------------------------------------------------------------------
495 void HandleLocalRedirect( URL *url );
496
497 //------------------------------------------------------------------------
502 //------------------------------------------------------------------------
503 bool IsRetriable();
504
505 //------------------------------------------------------------------------
512 //------------------------------------------------------------------------
513 bool OmitWait( Message &request, const URL &url );
514
515 //------------------------------------------------------------------------
521 //------------------------------------------------------------------------
522 bool RetriableErrorResponse( const Status &status );
523
524 //------------------------------------------------------------------------
526 //------------------------------------------------------------------------
527 void DumpRedirectTraceBack();
528
535 //------------------------------------------------------------------------
536 template<typename T>
537 Status ReadFromBuffer( char *&buffer, size_t &buflen, T& result );
538
539 //------------------------------------------------------------------------
546 //------------------------------------------------------------------------
547 Status ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result );
548
549 //------------------------------------------------------------------------
557 //------------------------------------------------------------------------
558 Status ReadFromBuffer( char *&buffer, size_t &buflen, size_t size,
559 std::string &result );
560
561 //------------------------------------------------------------------------
562 // Helper struct for async reading of chunks
563 //------------------------------------------------------------------------
struct ChunkStatus
{
  // both flags start out cleared
  bool sizeError = false;
  bool done      = false;

  ChunkStatus() {}
};
570
571 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
572
573 static const size_t CksumSize = sizeof( uint32_t );
574 static const size_t PageWithCksum = XrdSys::PageSize + CksumSize;
575 static const size_t MaxSslErrRetry = 3;
576
577 inline static size_t NbPgPerRsp( uint64_t offset, uint32_t dlen )
578 {
579 uint32_t pgcnt = 0;
580 uint32_t remainder = offset % XrdSys::PageSize;
581 if( remainder > 0 )
582 {
583 // account for the first unaligned page
584 ++pgcnt;
585 // the size of the 1st unaligned page
586 uint32_t _1stpg = XrdSys::PageSize - remainder;
587 if( _1stpg + CksumSize > dlen )
588 _1stpg = dlen - CksumSize;
589 dlen -= _1stpg + CksumSize;
590 }
591 pgcnt += dlen / PageWithCksum;
592 if( dlen % PageWithCksum )
593 ++ pgcnt;
594 return pgcnt;
595 }
596
597 Message *pRequest;
598 std::shared_ptr<Message> pResponse; //< the ownership is shared with MsgReader
599 std::vector<std::shared_ptr<Message>> pPartialResps; //< the ownership is shared with MsgReader
600 ResponseHandler *pResponseHandler;
601 URL pUrl;
602 URL *pEffectiveDataServerUrl;
603 PostMaster *pPostMaster;
604 std::shared_ptr<SIDManager> pSidMgr;
605 LocalFileHandler *pLFileHandler;
606 XRootDStatus pStatus;
607 Status pLastError;
608 time_t pExpiration;
609 bool pRedirectAsAnswer;
610 bool pOksofarAsAnswer;
611 std::unique_ptr<HostList> pHosts;
612 bool pHasLoadBalancer;
613 HostInfo pLoadBalancer;
614 bool pHasSessionId;
615 std::string pRedirectUrl;
616 ChunkList *pChunkList;
617 std::vector<uint32_t> pCrc32cDigests;
618 XrdSys::KernelBuffer *pKBuff;
619 std::vector<ChunkStatus> pChunkStatus;
620 uint16_t pRedirectCounter;
621 uint16_t pNotAuthorizedCounter;
622
623 uint32_t pAsyncOffset;
624 uint32_t pAsyncChunkIndex;
625
626 std::unique_ptr<AsyncPageReader> pPageReader;
627 std::unique_ptr<AsyncRawReaderIntfc> pBodyReader;
628
629 Buffer pPgWrtCksumBuff;
630 uint32_t pPgWrtCurrentPageOffset;
631 uint32_t pPgWrtCurrentPageNb;
632
633 bool pOtherRawStarted;
634
635 bool pFollowMetalink;
636
637 bool pStateful;
638 int pAggregatedWaitTime;
639
640 std::unique_ptr<RedirectEntry> pRdirEntry;
641 RedirectTraceBack pRedirectTraceBack;
642
643 bool pMsgInFly;
644
645 //------------------------------------------------------------------------
646 // true if MsgHandler is both in inQueue and installed in respective
647 // Stream (this could happen if server gave oksofar response), otherwise
648 // false
649 //------------------------------------------------------------------------
650 std::atomic<bool> pTimeoutFence;
651
652 //------------------------------------------------------------------------
653 // if we are serving chunked data to the user's handler in case of
654 // kXR_dirlist we need to memorize if the response contains stat info or
655 // not (the information is only encoded in the first chunk)
656 //------------------------------------------------------------------------
657 bool pDirListStarted;
658 bool pDirListWithStat;
659
660 //------------------------------------------------------------------------
661 // synchronization is needed in case the MsgHandler has been configured
662 // to serve kXR_oksofar as a response to the user's handler
663 //------------------------------------------------------------------------
664 XrdSysCondVar pCV;
665
666 //------------------------------------------------------------------------
667 // Count of consecutive `errTlsSslError` errors
668 //------------------------------------------------------------------------
669 size_t pSslErrCnt;
670 };
671}
672
673#endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_pgread
Definition XProtocol.hh:142
Object for discarding data.
Object for reading out data from the kXR_read response.
Object for reading out data from the VectorRead response.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
Handle diagnostics.
Definition XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
The message representation used throughout the system.
const std::string & GetDescription() const
Get the description of the message.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
StreamEvent
Events that may have occurred to the stream.
A hub for dispatching and receiving messages.
Handle an async response.
A network socket.
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:94
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition XrdClURL.cc:330
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:438
Handle/Process/Forward XRootD messages.
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
virtual uint16_t Examine(std::shared_ptr< Message > &msg)
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
void SetFollowMetalink(bool followMetalink)
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
void SetHostList(HostList *hostList)
Set host list.
void SetCrc32cDigests(std::vector< uint32_t > &&crc32cDigests)
const Message * GetRequest() const
Get the request pointer.
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status)
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
XRootDMsgHandler(Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
virtual void OnStatusReady(const Message *message, XRootDStatus status)
The requested action has been performed and the status is available.
virtual bool IsRaw() const
Are we a raw writer or not?
time_t GetExpiration()
Get a timestamp after which we give up.
void SetOksofarAsAnswer(bool oksofarAsAnswer)
void SetKernelBuffer(XrdSys::KernelBuffer *kbuff)
Set the kernel buffer.
virtual void Process()
Process the message if it was "taken" by the examine action.
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
void SetRedirectAsAnswer(bool redirectAsAnswer)
virtual uint16_t InspectStatusRsp()
virtual uint16_t GetSid() const
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
std::vector< HostInfo > HostList
const uint64_t ExDbgMsg
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
URL url
URL of the host.
RedirectEntry(const URL &from, const URL &to, Type type)
std::string ToString(bool prevok=true)
Procedure execution status.