XRootD
Loading...
Searching...
No Matches
XrdClOperations.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3// Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4// Michal Simon <michal.simon@cern.ch>
5//------------------------------------------------------------------------------
6// This file is part of the XRootD software suite.
7//
8// XRootD is free software: you can redistribute it and/or modify
9// it under the terms of the GNU Lesser General Public License as published by
10// the Free Software Foundation, either version 3 of the License, or
11// (at your option) any later version.
12//
13// XRootD is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16// GNU General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public License
19// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20//
21// In applying this licence, CERN does not waive the privileges and immunities
22// granted to it by virtue of its status as an Intergovernmental Organization
23// or submit itself to any jurisdiction.
24//------------------------------------------------------------------------------
25
26#include <stdexcept>
27#include <string>
29#include "XrdCl/XrdClLog.hh"
32
33namespace
34{
35 //----------------------------------------------------------------------------
37 //----------------------------------------------------------------------------
38 struct StopPipeline
39 {
40 StopPipeline( const XrdCl::XRootDStatus &status ) : status( status ) { }
42 };
43
44 //----------------------------------------------------------------------------
46 //----------------------------------------------------------------------------
47 struct RepeatOpeation { };
48
49 //----------------------------------------------------------------------------
51 //----------------------------------------------------------------------------
52 struct ReplaceOperation
53 {
54 ReplaceOperation( XrdCl::Operation<false> &&opr ) : opr( opr.ToHandled() )
55 {
56 }
57
58 std::unique_ptr<XrdCl::Operation<true>> opr;
59 };
60
61 //----------------------------------------------------------------------------
63 //----------------------------------------------------------------------------
64 struct ReplacePipeline
65 {
66 ReplacePipeline( XrdCl::Pipeline p ) : pipeline( std::move( p ) )
67 {
68 }
69
70 XrdCl::Pipeline pipeline;
71 };
72
73 //----------------------------------------------------------------------------
75 //----------------------------------------------------------------------------
76 struct IgnoreError { };
77}
78
79namespace XrdCl
80{
81
82 //----------------------------------------------------------------------------
83 // OperationHandler Constructor.
84 //----------------------------------------------------------------------------
86 responseHandler( handler )
87 {
88 }
89
90 //----------------------------------------------------------------------------
91 // OperationHandler::AddOperation
92 //----------------------------------------------------------------------------
94 {
95 if( nextOperation )
96 {
97 nextOperation->AddOperation( operation );
98 }
99 else
100 {
101 nextOperation.reset( operation );
102 }
103 }
104
105 //----------------------------------------------------------------------------
106 // OperationHandler::HandleResponseImpl
107 //----------------------------------------------------------------------------
108 void PipelineHandler::HandleResponseImpl( XRootDStatus *status,
109 AnyObject *response, HostList *hostList )
110 {
111 std::unique_ptr<PipelineHandler> myself( this );
112
113 // We need to copy status as original status object is destroyed in
114 // HandleResponse function
115 XRootDStatus st( *status );
116 if( responseHandler )
117 {
118 try
119 {
120 responseHandler->HandleResponseWithHosts( status, response, hostList );
121 }
122 catch( const StopPipeline &stop )
123 {
124 if( final ) final( stop.status );
125 prms.set_value( stop.status );
126 return;
127 }
128 catch( const RepeatOpeation &repeat )
129 {
130 Operation<true> *opr = currentOperation.release();
131 opr->handler.reset( myself.release() );
132 opr->Run( timeout, std::move( prms ), std::move( final ) );
133 return;
134 }
135 catch( ReplaceOperation &replace )
136 {
137 Operation<true> *opr = replace.opr.release();
138 opr->handler.reset( myself.release() );
139 opr->Run( timeout, std::move( prms ), std::move( final ) );
140 return;
141 }
142 catch( ReplacePipeline &replace )
143 {
144 Pipeline p = std::move( replace.pipeline );
145 Operation<true> *opr = p.operation.release();
146 opr->Run( timeout, std::move( prms ), std::move( final ) );
147 return;
148 }
149 catch( const IgnoreError &ignore )
150 {
151 st = XRootDStatus();
152 }
153 }
154 else
155 dealloc( status, response, hostList );
156
157 if( !st.IsOK() || !nextOperation )
158 {
159 if( final ) final( st );
160 prms.set_value( st );
161 return;
162 }
163
164 Operation<true> *opr = nextOperation.release();
165 opr->Run( timeout, std::move( prms ), std::move( final ) );
166 }
167
168 //----------------------------------------------------------------------------
169 // OperationHandler::HandleResponseWithHosts
170 //----------------------------------------------------------------------------
172 AnyObject *response, HostList *hostList )
173 {
174 HandleResponseImpl( status, response, hostList );
175 }
176
177 //----------------------------------------------------------------------------
178 // OperationHandler::HandleResponse
179 //----------------------------------------------------------------------------
181 AnyObject *response )
182 {
183 HandleResponseImpl( status, response );
184 }
185
186 //----------------------------------------------------------------------------
187 // OperationHandler::AssignToWorkflow
188 //----------------------------------------------------------------------------
190 std::promise<XRootDStatus> p,
191 std::function<void(const XRootDStatus&)> f,
192 Operation<true> *opr )
193 {
194 timeout = t;
195 prms = std::move( p );
196 if( !final ) final = std::move( f );
197 else if( f )
198 {
199 auto f1 = std::move( final );
200 final = [f1, f]( const XRootDStatus &st ){ f1( st ); f( st ); };
201 }
202 currentOperation.reset( opr );
203 }
204
205 //------------------------------------------------------------------------
206 // Assign the finalization routine
207 //------------------------------------------------------------------------
208 void PipelineHandler::Assign( std::function<void(const XRootDStatus&)> f )
209 {
210 final = std::move( f );
211 }
212
213 //------------------------------------------------------------------------
214 // Called by a pipeline on the handler of its first operation before Run
215 //------------------------------------------------------------------------
217 {
218 // Move any final-function from the handler of the last operaiton to the
219 // first. It will be moved along the pipeline of handlers while the
220 // pipeline is run.
221
222 if( final || !nextOperation ) return;
223 PipelineHandler *last = nextOperation->handler.get();
224 while( last )
225 {
226 Operation<true> *nextop = last->nextOperation.get();
227 if( !nextop ) break;
228 last = nextop->handler.get();
229 }
230 if( last )
231 {
232 // swap-then-move rather than only move as we need to guarantee that
233 // last->final is left without target.
234 std::function<void(const XRootDStatus&)> f;
235 f.swap( last->final );
236 Assign( std::move( f ) );
237 }
238 }
239
240 //------------------------------------------------------------------------
241 // Stop the current pipeline
242 //------------------------------------------------------------------------
243 void Pipeline::Stop( const XRootDStatus &status )
244 {
245 throw StopPipeline( status );
246 }
247
248 //------------------------------------------------------------------------
249 // Repeat current operation
250 //------------------------------------------------------------------------
252 {
253 throw RepeatOpeation();
254 }
255
256 //------------------------------------------------------------------------
257 // Replace current operation
258 //------------------------------------------------------------------------
260 {
261 throw ReplaceOperation( std::move( opr ) );
262 }
263
264 //------------------------------------------------------------------------
265 // Replace with pipeline
266 //------------------------------------------------------------------------
268 {
269 throw ReplacePipeline( std::move( p ) );
270 }
271
272 //------------------------------------------------------------------------
273 // Ignore error and proceed with the pipeline
274 //------------------------------------------------------------------------
276 {
277 throw IgnoreError();
278 }
279}
280
std::unique_ptr< PipelineHandler > handler
Operation handler.
void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Callback function.
PipelineHandler()
Default Constructor.
void PreparePipelineStart()
Called by a pipeline on the handler of its first operation before Run.
void Assign(const Timeout &timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final, Operation< true > *opr)
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
void AddOperation(Operation< true > *operation)
static void Repeat()
Repeat current operation.
static void Stop(const XRootDStatus &status=XrdCl::XRootDStatus())
static void Replace(Operation< false > &&opr)
Replace current operation.
static void Ignore()
Ignore error and proceed with the pipeline.
Handle an async response.
std::vector< HostInfo > HostList