xrootd
XrdClAsyncSocketHandler.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef __XRD_CL_ASYNC_SOCKET_HANDLER_HH__
20 #define __XRD_CL_ASYNC_SOCKET_HANDLER_HH__
21 
22 #include "XrdCl/XrdClSocket.hh"
23 #include "XrdCl/XrdClConstants.hh"
24 #include "XrdCl/XrdClDefaultEnv.hh"
25 #include "XrdCl/XrdClPoller.hh"
28 
29 #include <sys/types.h>
30 #include <sys/socket.h>
31 
32 namespace XrdCl
33 {
34  class Stream;
35 
36  //----------------------------------------------------------------------------
39  //----------------------------------------------------------------------------
41  {
42  //------------------------------------------------------------------------
43  // We need an extra task to reschedule a handshake (HS) request that
44  // received a wait response.
45  //------------------------------------------------------------------------
46  class WaitTask: public XrdCl::Task
47  {
48  public:
50  pHandler( handler ), pMsg( msg )
51  {
52  std::ostringstream o;
53  o << "WaitTask for: 0x" << msg;
54  SetName( o.str() );
55  }
56 
57  virtual time_t Run( time_t now )
58  {
60  return 0;
61  }
62 
63  private:
66  };
67 
68  public:
69  //------------------------------------------------------------------------
70  //! Constructor
71  //------------------------------------------------------------------------
72  AsyncSocketHandler( Poller *poller,
73  TransportHandler *transport,
74  AnyObject *channelData,
75  uint16_t subStreamNum );
76 
77  //------------------------------------------------------------------------
79  //------------------------------------------------------------------------
81 
82  //------------------------------------------------------------------------
83  //! Set address
84  //------------------------------------------------------------------------
85  void SetAddress( const XrdNetAddr &address )
86  {
87  pSockAddr = address;
88  }
89 
90  //------------------------------------------------------------------------
91  //! Get the address that the socket is connected to
92  //------------------------------------------------------------------------
93  const XrdNetAddr &GetAddress() const
94  {
95  return pSockAddr;
96  }
97 
98  //------------------------------------------------------------------------
99  //! Connect to the currently set address
100  //------------------------------------------------------------------------
101  Status Connect( time_t timeout );
102 
103  //------------------------------------------------------------------------
104  //! Close the connection
105  //------------------------------------------------------------------------
106  Status Close();
107 
108  //------------------------------------------------------------------------
109  //! Set a stream object to be notified about the status of the operations
110  //------------------------------------------------------------------------
111  void SetStream( Stream *stream );
112 
113  //------------------------------------------------------------------------
114  //! Handle a socket event
115  //------------------------------------------------------------------------
116  virtual void Event( uint8_t type, XrdCl::Socket */*socket*/ );
117 
118  //------------------------------------------------------------------------
119  //! Enable uplink
120  //------------------------------------------------------------------------
122  {
124  return Status( stFatal, errPollerError );
125  return Status();
126  }
127 
128  //------------------------------------------------------------------------
129  //! Disable uplink
130  //------------------------------------------------------------------------
132  {
133  if( !pPoller->EnableWriteNotification( pSocket, false ) )
134  return Status( stFatal, errPollerError );
135  return Status();
136  }
137 
138  //------------------------------------------------------------------------
139  //! Get stream name
140  //------------------------------------------------------------------------
141  const std::string &GetStreamName()
142  {
143  return pStreamName;
144  }
145 
146  //------------------------------------------------------------------------
147  //! Get timestamp of last registered socket activity
148  //------------------------------------------------------------------------
150  {
151  return pLastActivity;
152  }
153 
154  private:
155 
156  //------------------------------------------------------------------------
157  // Connect returned
158  //------------------------------------------------------------------------
159  void OnConnectionReturn();
160 
161  //------------------------------------------------------------------------
162  // Got a write readiness event
163  //------------------------------------------------------------------------
164  void OnWrite();
165 
166  //------------------------------------------------------------------------
167  // Got a write readiness event while handshaking
168  //------------------------------------------------------------------------
170 
171 
172  Status WriteMessageAndRaw( Message *toWrite, Message *&sign );
173 
174  Status WriteSeparately( Message *toWrite, Message *&sign );
175 
176  //------------------------------------------------------------------------
177  // Write the current message
178  //------------------------------------------------------------------------
179  Status WriteCurrentMessage( Message *toWrite );
180 
181  //------------------------------------------------------------------------
182  // Write the message, its signature and its body
183  //------------------------------------------------------------------------
184  Status WriteVMessage( Message *toWrite,
185  Message *&sign,
186  ChunkList *chunks,
187  uint32_t *asyncOffset );
188 
189  //------------------------------------------------------------------------
190  // Got a read readiness event
191  //------------------------------------------------------------------------
192  void OnRead();
193 
194  //------------------------------------------------------------------------
195  // Got a read readiness event while handshaking
196  //------------------------------------------------------------------------
197  void OnReadWhileHandshaking();
198 
199  //------------------------------------------------------------------------
200  // Read a message
201  //------------------------------------------------------------------------
202  Status ReadMessage( Message *&toRead );
203 
204  //------------------------------------------------------------------------
205  // Handle fault
206  //------------------------------------------------------------------------
207  void OnFault( Status st );
208 
209  //------------------------------------------------------------------------
210  // Handle fault while handshaking
211  //------------------------------------------------------------------------
212  void OnFaultWhileHandshaking( Status st );
213 
214  //------------------------------------------------------------------------
215  // Handle write timeout event
216  //------------------------------------------------------------------------
217  void OnWriteTimeout();
218 
219  //------------------------------------------------------------------------
220  // Handle read timeout event
221  //------------------------------------------------------------------------
222  void OnReadTimeout();
223 
224  //------------------------------------------------------------------------
225  // Handle timeout event while handshaking
226  //------------------------------------------------------------------------
228 
229  //------------------------------------------------------------------------
230  // Get signature for given message
231  //------------------------------------------------------------------------
232  Status GetSignature( Message *toSign, Message *&sign );
233 
234  //------------------------------------------------------------------------
235  // Initialize the iovec with given message
236  //------------------------------------------------------------------------
237  inline void ToIov( Message &msg, iovec &iov );
238 
239  //------------------------------------------------------------------------
240  // Update iovec after write
241  //------------------------------------------------------------------------
242  inline void UpdateAfterWrite( Message &msg, iovec &iov, int &bytesRead );
243 
244  //------------------------------------------------------------------------
245  // Add chunks to the given iovec
246  //------------------------------------------------------------------------
247  inline uint32_t ToIov( ChunkList *chunks,
248  const uint32_t *offset,
249  iovec *iov );
250 
251  //------------------------------------------------------------------------
252  // Update raw data after write
253  //------------------------------------------------------------------------
254  inline void UpdateAfterWrite( ChunkList *chunks,
255  uint32_t *offset,
256  iovec *iov,
257  int &bytesWritten );
258 
259  //------------------------------------------------------------------------
260  // Retry handshake message
261  //------------------------------------------------------------------------
262  void RetryHSMsg( Message *msg );
263 
264  //------------------------------------------------------------------------
265  // Extract the value of a wait response
266  //
267  // @param rsp : the server response
268  // @return : if rsp is a wait response then its value
269  // otherwise -1
270  //------------------------------------------------------------------------
271  inline kXR_int32 HandleWaitRsp( Message *rsp );
272 
273  //------------------------------------------------------------------------
278  //------------------------------------------------------------------------
279  Status ClassifyErrno( int error );
280 
281  //------------------------------------------------------------------------
282  // Data members
283  //------------------------------------------------------------------------
287  uint16_t pSubStreamNum;
289  std::string pStreamName;
303  // true means the handler owns the server response
304  std::pair<IncomingMsgHandler*, bool> pIncHandler;
307  uint32_t pIncMsgSize;
308  uint32_t pOutMsgSize;
310  };
311 }
312 
313 #endif // __XRD_CL_ASYNC_SOCKET_HANDLER_HH__
const XrdNetAddr & GetAddress() const
Get the address that the socket is connected to.
Definition: XrdClAsyncSocketHandler.hh:93
Status Connect(time_t timeout)
Connect to the currently set address.
Definition: XrdClAnyObject.hh:32
Interface.
Definition: XrdClPoller.hh:33
Interface for socket pollers.
Definition: XrdClPoller.hh:86
Definition: XrdClAsyncSocketHandler.hh:46
virtual void Event(uint8_t type, XrdCl::Socket *)
Handle a socket event.
std::vector< ChunkInfo > ChunkList
List of chunks.
Definition: XrdClXRootDResponses.hh:784
Message * pOutgoing
Definition: XrdClAsyncSocketHandler.hh:293
bool pHandShakeDone
Definition: XrdClAsyncSocketHandler.hh:298
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
Status ClassifyErrno(int error)
time_t GetLastActivity()
Get timestamp of last registered socket activity.
Definition: XrdClAsyncSocketHandler.hh:149
bool pHeaderDone
Definition: XrdClAsyncSocketHandler.hh:302
std::pair< IncomingMsgHandler *, bool > pIncHandler
Definition: XrdClAsyncSocketHandler.hh:304
void OnFaultWhileHandshaking(Status st)
AnyObject * pChannelData
Definition: XrdClAsyncSocketHandler.hh:286
uint16_t pSubStreamNum
Definition: XrdClAsyncSocketHandler.hh:287
time_t pConnectionStarted
Definition: XrdClAsyncSocketHandler.hh:300
Status ReadMessage(Message *&toRead)
Definition: XrdNetAddr.hh:41
Procedure execution status.
Definition: XrdClStatus.hh:109
Status WriteMessageAndRaw(Message *toWrite, Message *&sign)
void SetName(const std::string &name)
Set name of the task.
Definition: XrdClTaskManager.hh:60
AsyncSocketHandler(Poller *poller, TransportHandler *transport, AnyObject *channelData, uint16_t subStreamNum)
Constructor.
Status WriteVMessage(Message *toWrite, Message *&sign, ChunkList *chunks, uint32_t *asyncOffset)
bool pOutMsgDone
Definition: XrdClAsyncSocketHandler.hh:305
Status Close()
Close the connection.
WaitTask(XrdCl::AsyncSocketHandler *handler, XrdCl::Message *msg)
Definition: XrdClAsyncSocketHandler.hh:49
Data structure that carries the handshake information.
Definition: XrdClPostMasterInterfaces.hh:256
Interface for a task to be run by the TaskManager.
Definition: XrdClTaskManager.hh:35
uint32_t pOutMsgSize
Definition: XrdClAsyncSocketHandler.hh:308
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:302
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
Status EnableUplink()
Enable uplink.
Definition: XrdClAsyncSocketHandler.hh:121
Definition: XrdClAnyObject.hh:25
Status GetSignature(Message *toSign, Message *&sign)
Socket * pSocket
Definition: XrdClAsyncSocketHandler.hh:290
Poller * pPoller
Definition: XrdClAsyncSocketHandler.hh:284
const std::string & GetStreamName()
Get stream name.
Definition: XrdClAsyncSocketHandler.hh:141
TransportHandler * pTransport
Definition: XrdClAsyncSocketHandler.hh:285
XrdNetAddr pSockAddr
Definition: XrdClAsyncSocketHandler.hh:296
const uint16_t errPollerError
Definition: XrdClStatus.hh:74
void SetStream(Stream *stream)
Set a stream object to be notified about the status of the operations.
void RetryHSMsg(Message *msg)
Status DisableUplink()
Disable uplink.
Definition: XrdClAsyncSocketHandler.hh:131
void OnFault(Status st)
~AsyncSocketHandler()
Destructor.
time_t pLastActivity
Definition: XrdClAsyncSocketHandler.hh:309
time_t pConnectionTimeout
Definition: XrdClAsyncSocketHandler.hh:301
int kXR_int32
Definition: XPtypes.hh:89
Message * pHSOutgoing
Definition: XrdClAsyncSocketHandler.hh:295
Message status handler.
Definition: XrdClPostMasterInterfaces.hh:167
void ToIov(Message &msg, iovec &iov)
Stream.
Definition: XrdClStream.hh:47
Message * pIncoming
Definition: XrdClAsyncSocketHandler.hh:291
std::string pStreamName
Definition: XrdClAsyncSocketHandler.hh:289
Status WriteCurrentMessage(Message *toWrite)
void UpdateAfterWrite(Message &msg, iovec &iov, int &bytesRead)
XrdCl::Message * pMsg
Definition: XrdClAsyncSocketHandler.hh:65
XrdCl::AsyncSocketHandler * pHandler
Definition: XrdClAsyncSocketHandler.hh:64
kXR_int32 HandleWaitRsp(Message *rsp)
Message * pHSIncoming
Definition: XrdClAsyncSocketHandler.hh:292
uint32_t pIncMsgSize
Definition: XrdClAsyncSocketHandler.hh:307
Status WriteSeparately(Message *toWrite, Message *&sign)
virtual time_t Run(time_t now)
Definition: XrdClAsyncSocketHandler.hh:57
A network socket.
Definition: XrdClSocket.hh:36
OutgoingMsgHandler * pOutHandler
Definition: XrdClAsyncSocketHandler.hh:306
Definition: XrdClAsyncSocketHandler.hh:40
virtual bool EnableWriteNotification(Socket *socket, bool notify, uint16_t timeout=60)=0
uint16_t pTimeoutResolution
Definition: XrdClAsyncSocketHandler.hh:299
Message * pSignature
Definition: XrdClAsyncSocketHandler.hh:294
void SetAddress(const XrdNetAddr &address)
Set address.
Definition: XrdClAsyncSocketHandler.hh:85
Stream * pStream
Definition: XrdClAsyncSocketHandler.hh:288
HandShakeData * pHandShakeData
Definition: XrdClAsyncSocketHandler.hh:297