xrootd
XrdClZipCache.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef SRC_XRDZIP_XRDZIPINFLCACHE_HH_
26 #define SRC_XRDZIP_XRDZIPINFLCACHE_HH_
27 
29 #include <zlib.h>
30 #include <exception>
31 #include <string>
32 #include <vector>
33 #include <mutex>
34 #include <queue>
35 #include <tuple>
36 
37 namespace XrdCl
38 {
39  //---------------------------------------------------------------------------
41  //---------------------------------------------------------------------------
42  struct ZipError : public std::exception
43  {
45  {
46  }
47 
49  };
50 
51  //---------------------------------------------------------------------------
53  //---------------------------------------------------------------------------
54  class ZipCache
55  {
56  public:
57 
58  typedef std::vector<char> buffer_t;
59 
60  private:
61 
62  typedef std::tuple<uint64_t, uint32_t, void*, ResponseHandler*> read_args_t;
63  typedef std::tuple<XRootDStatus, uint64_t, buffer_t> read_resp_t;
64 
66  {
67  inline bool operator() ( const read_resp_t &lhs, const read_resp_t &rhs ) const
68  {
69  return std::get<1>( lhs ) > std::get<1>( rhs );
70  }
71  };
72 
73  typedef std::priority_queue<read_resp_t, std::vector<read_resp_t>, greater_read_resp_t> resp_queue_t;
74 
75  public:
76 
77  struct ReadHandler : public ResponseHandler // TODO once we drop ZipArchiveReader this class can be removed
78  {
79  ReadHandler( uint64_t offset, uint32_t length, ZipCache &self ) : offset( offset ), buffer( length ), self( self )
80  {
81  }
82 
83  void HandleResponse( XRootDStatus *status, AnyObject *response )
84  {
85  self.QueueRsp( *status, offset, std::move( buffer ) );
86  delete status;
87  delete response;
88  delete this;
89  }
90 
91  uint64_t offset;
93  ZipCache &self;
94  };
95 
96  ZipCache() : inabsoff( 0 )
97  {
98  strm.zalloc = Z_NULL;
99  strm.zfree = Z_NULL;
100  strm.opaque = Z_NULL;
101  strm.avail_in = 0;
102  strm.next_in = Z_NULL;
103  strm.avail_out = 0;
104  strm.next_out = Z_NULL;
105 
106  // make sure zlib doesn't look for gzip headers, in order to do so
107  // pass negative window bits !!!
108  int rc = inflateInit2( &strm, -MAX_WBITS );
109  XrdCl::XRootDStatus st = ToXRootDStatus( rc, "inflateInit2" );
110  if( !st.IsOK() ) throw ZipError( st );
111  }
112 
114  {
115  inflateEnd( &strm );
116  }
117 
118  inline void QueueReq( uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler )
119  {
120  std::unique_lock<std::mutex> lck( mtx );
121  rdreqs.emplace( offset, length, buffer, handler );
122  Decompress();
123  }
124 
125  inline void QueueRsp( const XRootDStatus &st, uint64_t offset, buffer_t &&buffer )
126  {
127  std::unique_lock<std::mutex> lck( mtx );
128  rdrsps.emplace( st, offset, std::move( buffer ) );
129  Decompress();
130  }
131 
132  private:
133 
134  inline bool HasInput() const
135  {
136  return strm.avail_in != 0;
137  }
138 
139  inline bool HasOutput() const
140  {
141  return strm.avail_out != 0;
142  }
143 
144  inline void Input( const read_resp_t &rdrsp )
145  {
146  const buffer_t &buffer = std::get<2>( rdrsp );
147  strm.avail_in = buffer.size();
148  strm.next_in = (Bytef*)buffer.data();
149  }
150 
151  inline void Output( const read_args_t &rdreq )
152  {
153  strm.avail_out = std::get<1>( rdreq );
154  strm.next_out = (Bytef*)std::get<2>( rdreq );
155  }
156 
157  inline bool Consecutive( const read_resp_t &resp ) const
158  {
159  return ( std::get<1>( resp ) == inabsoff );
160  }
161 
162  void Decompress()
163  {
164  while( HasInput() || HasOutput() || !rdreqs.empty() || !rdrsps.empty() )
165  {
166  if( !HasOutput() && !rdreqs.empty() )
167  Output( rdreqs.front() );
168 
169  if( !HasInput() && !rdrsps.empty() && Consecutive( rdrsps.top() ) ) // the response might come out of order so we need to check the offset
170  Input( rdrsps.top() );
171 
172  if( !HasInput() || !HasOutput() ) return;
173 
174  // check the response status
175  XRootDStatus st = std::get<0>( rdrsps.top() );
176  if( !st.IsOK() ) return CallHandler( st );
177 
178  // the available space in output buffer before inflating
179  uInt avail_before = strm.avail_in;
180  // decompress the data
181  int rc = inflate( &strm, Z_SYNC_FLUSH );
182  st = ToXRootDStatus( rc, "inflate" );
183  if( !st.IsOK() ) return CallHandler( st ); // report error to user handler
184  // update the absolute input offset by the number of bytes we consumed
185  inabsoff += avail_before - strm.avail_in;
186 
187  if( !strm.avail_out ) // the output buffer is empty meaning a request has been fulfilled
189 
190  // the input buffer is empty meaning a response has been consumed
191  // (we need to check if there are any elements in the responses
192  // queue as the input buffer might have been set directly by the user)
193  if( !strm.avail_in && !rdrsps.empty() )
194  rdrsps.pop();
195  }
196  }
197 
198  static inline AnyObject* PkgRsp( ChunkInfo *chunk )
199  {
200  if( !chunk ) return nullptr;
201  AnyObject *rsp = new AnyObject();
202  rsp->Set( chunk );
203  return rsp;
204  }
205 
206  inline void CallHandler( const XRootDStatus &st )
207  {
208  read_args_t args = std::move( rdreqs.front() );
209  rdreqs.pop();
210 
211  ChunkInfo *chunk = nullptr;
212  if( st.IsOK() ) chunk = new ChunkInfo( std::get<0>( args ),
213  std::get<1>( args ),
214  std::get<2>( args ) );
215 
216  ResponseHandler *handler = std::get<3>( args );
217  handler->HandleResponse( new XRootDStatus( st ), PkgRsp( chunk ) );
218  }
219 
220  XrdCl::XRootDStatus ToXRootDStatus( int rc, const std::string &func )
221  {
222  std::string msg = "[zlib] " + func + " : ";
223 
224  switch( rc )
225  {
226  case Z_STREAM_END :
227  case Z_OK : return XrdCl::XRootDStatus();
228  case Z_BUF_ERROR : return XrdCl::XRootDStatus( XrdCl::stOK, XrdCl::suContinue );
229  case Z_MEM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_MEM_ERROR, msg + "not enough memory." );
230  case Z_VERSION_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInternal, Z_VERSION_ERROR, msg + "version mismatch." );
231  case Z_STREAM_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errInvalidArgs, Z_STREAM_ERROR, msg + "invalid argument." );
232  case Z_NEED_DICT : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_NEED_DICT, msg + "need dict.");
233  case Z_DATA_ERROR : return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errDataError, Z_DATA_ERROR, msg + "corrupted data." );
235  }
236  }
237 
238  z_stream strm; // the zlib stream we will use for reading
239 
240  std::mutex mtx;
241  uint64_t inabsoff; //< the absolute offset in the input file (compressed), ensures the user is actually streaming the data
242  std::queue<read_args_t> rdreqs; //< pending read requests (we only allow read requests to be submitted in order)
243  resp_queue_t rdrsps; //< pending read responses (due to multiple-streams the read response may come out of order)
244  };
245 
246 }
247 
248 #endif /* SRC_XRDZIP_XRDZIPINFLCACHE_HH_ */
XrdCl::ChunkInfo
Describe a data chunk for vector read.
Definition: XrdClXRootDResponses.hh:904
XrdCl::ZipCache::ReadHandler::ReadHandler
ReadHandler(uint64_t offset, uint32_t length, ZipCache &self)
Definition: XrdClZipCache.hh:79
XrdCl::ZipCache::resp_queue_t
std::priority_queue< read_resp_t, std::vector< read_resp_t >, greater_read_resp_t > resp_queue_t
Definition: XrdClZipCache.hh:73
XrdClXRootDResponses.hh
XrdCl::ResponseHandler
Handle an async response.
Definition: XrdClXRootDResponses.hh:1037
XrdCl::ZipCache::rdrsps
resp_queue_t rdrsps
Definition: XrdClZipCache.hh:243
XrdCl::stOK
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
XrdCl::ZipError::status
XrdCl::XRootDStatus status
Definition: XrdClZipCache.hh:48
XrdCl::ZipCache::buffer_t
std::vector< char > buffer_t
Definition: XrdClZipCache.hh:58
XrdCl::errDataError
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:62
XrdCl::suContinue
const uint16_t suContinue
Definition: XrdClStatus.hh:39
XrdCl::ZipCache::ToXRootDStatus
XrdCl::XRootDStatus ToXRootDStatus(int rc, const std::string &func)
Definition: XrdClZipCache.hh:220
XrdCl::ZipCache::Decompress
void Decompress()
Definition: XrdClZipCache.hh:162
XrdCl::ZipCache::HasInput
bool HasInput() const
Definition: XrdClZipCache.hh:134
XrdCl::AnyObject::Set
void Set(Type object, bool own=true)
Definition: XrdClAnyObject.hh:59
XrdCl::ZipCache::rdreqs
std::queue< read_args_t > rdreqs
Definition: XrdClZipCache.hh:242
XrdCl::ResponseHandler::HandleResponse
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition: XrdClXRootDResponses.hh:1066
XrdCl::errInternal
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:55
XrdCl::ZipCache::read_resp_t
std::tuple< XRootDStatus, uint64_t, buffer_t > read_resp_t
Definition: XrdClZipCache.hh:63
XrdCl::ZipCache::strm
z_stream strm
Definition: XrdClZipCache.hh:238
XrdCl::XRootDStatus
Request status.
Definition: XrdClXRootDResponses.hh:215
XrdCl::ZipCache::greater_read_resp_t
Definition: XrdClZipCache.hh:66
XrdCl::ZipCache::CallHandler
void CallHandler(const XRootDStatus &st)
Definition: XrdClZipCache.hh:206
XrdCl::ZipCache::ReadHandler::self
ZipCache & self
Definition: XrdClZipCache.hh:93
XrdCl::ZipCache::greater_read_resp_t::operator()
bool operator()(const read_resp_t &lhs, const read_resp_t &rhs) const
Definition: XrdClZipCache.hh:67
XrdCl::ZipCache::QueueReq
void QueueReq(uint64_t offset, uint32_t length, void *buffer, ResponseHandler *handler)
Definition: XrdClZipCache.hh:118
XrdCl::ZipCache::inabsoff
uint64_t inabsoff
Definition: XrdClZipCache.hh:241
XrdCl::Status::IsOK
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:120
XrdCl::errUnknown
const uint16_t errUnknown
Unknown error.
Definition: XrdClStatus.hh:49
XrdCl::ZipCache::ReadHandler
Definition: XrdClZipCache.hh:78
XrdCl::ZipCache::Input
void Input(const read_resp_t &rdrsp)
Definition: XrdClZipCache.hh:144
XrdCl
Definition: XrdClAnyObject.hh:26
XrdCl::ZipCache::QueueRsp
void QueueRsp(const XRootDStatus &st, uint64_t offset, buffer_t &&buffer)
Definition: XrdClZipCache.hh:125
XrdCl::ZipCache::HasOutput
bool HasOutput() const
Definition: XrdClZipCache.hh:139
XrdCl::ZipCache::mtx
std::mutex mtx
Definition: XrdClZipCache.hh:240
XrdCl::ZipCache::Consecutive
bool Consecutive(const read_resp_t &resp) const
Definition: XrdClZipCache.hh:157
XrdCl::ZipError::ZipError
ZipError(const XrdCl::XRootDStatus &status)
Definition: XrdClZipCache.hh:44
XrdCl::ZipCache
Utility class for inflating a compressed buffer.
Definition: XrdClZipCache.hh:55
XrdCl::stError
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
XrdCl::ZipCache::ReadHandler::offset
uint64_t offset
Definition: XrdClZipCache.hh:91
XrdCl::ZipCache::ZipCache
ZipCache()
Definition: XrdClZipCache.hh:96
XrdCl::ZipCache::read_args_t
std::tuple< uint64_t, uint32_t, void *, ResponseHandler * > read_args_t
Definition: XrdClZipCache.hh:62
XrdCl::ZipCache::~ZipCache
~ZipCache()
Definition: XrdClZipCache.hh:113
XrdCl::ZipCache::PkgRsp
static AnyObject * PkgRsp(ChunkInfo *chunk)
Definition: XrdClZipCache.hh:198
XrdCl::ZipCache::ReadHandler::HandleResponse
void HandleResponse(XRootDStatus *status, AnyObject *response)
Definition: XrdClZipCache.hh:83
XrdCl::AnyObject
Definition: XrdClAnyObject.hh:33
XrdCl::ZipCache::ReadHandler::buffer
buffer_t buffer
Definition: XrdClZipCache.hh:92
XrdCl::ZipCache::Output
void Output(const read_args_t &rdreq)
Definition: XrdClZipCache.hh:151
XrdCl::ZipError
An exception for carrying the XRootDStatus of InflCache.
Definition: XrdClZipCache.hh:43
XrdCl::errInvalidArgs
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:57