43const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
49const char *File::m_traceID =
"File";
53File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
57 m_cfi(
Cache::GetInstance().
GetTrace(),
Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
60 m_file_size(iFileSize),
61 m_current_io(m_io_set.end()),
65 m_detach_time_logged(false),
70 m_prefetch_state(kOff),
71 m_prefetch_read_cnt(0),
72 m_prefetch_hit_cnt(0),
94 TRACEF(
Debug,
"~File() ended, prefetch score = " << m_prefetch_score);
101 File *file =
new File(path, offset, fileSize);
129 m_in_shutdown =
true;
131 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
133 m_prefetch_state = kStopped;
134 cache()->DeRegisterPrefetchFile(
this);
146 Stats delta = m_last_stats;
148 m_last_stats = m_stats.
Clone();
159 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
167 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
171 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
183 insert_remote_location(loc);
199 IoSet_i mi = m_io_set.find(io);
201 if (mi != m_io_set.end())
206 ", active_reads " << n_active_reads <<
207 ", active_prefetches " << io->m_active_prefetches <<
208 ", allow_prefetching " << io->m_allow_prefetching <<
209 ", ios_in_detach " << m_ios_in_detach);
211 "\tio_map.size() " << m_io_set.size() <<
212 ", block_map.size() " << m_block_map.size() <<
", file");
214 insert_remote_location(loc);
216 io->m_allow_prefetching =
false;
217 io->m_in_detach =
true;
220 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
222 if ( ! select_current_io_or_disable_prefetching(
false) )
224 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
231 bool io_active_result;
233 if (n_active_reads > 0)
235 io_active_result =
true;
237 else if (m_io_set.size() - m_ios_in_detach == 1)
239 io_active_result = ! m_block_map.empty();
243 io_active_result = io->m_active_prefetches > 0;
246 if ( ! io_active_result)
251 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
253 return io_active_result;
257 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
268 m_detach_time_logged =
false;
277 if ( ! m_in_shutdown)
279 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
283 m_detach_time_logged =
true;
285 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
289 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
301 time_t now = time(0);
306 IoSet_i mi = m_io_set.find(io);
308 if (mi == m_io_set.end())
311 io->m_attach_time = now;
314 insert_remote_location(loc);
316 if (m_prefetch_state == kStopped)
318 m_prefetch_state = kOn;
319 cache()->RegisterPrefetchFile(
this);
324 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
338 time_t now = time(0);
342 IoSet_i mi = m_io_set.find(io);
344 if (mi != m_io_set.end())
346 if (mi == m_current_io)
351 m_stats.
IoDetach(now - io->m_attach_time);
355 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
357 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
358 m_prefetch_state = kStopped;
359 cache()->DeRegisterPrefetchFile(
this);
364 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
376 static const char *tpfx =
"Open() ";
378 TRACEF(Dump, tpfx <<
"open file for disk cache");
385 struct stat data_stat, info_stat;
389 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
390 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
393 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
394 myEnv.
Put(
"oss.asize", size_str);
406 m_data_file = myOss.
newFile(myUser);
407 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
411 delete m_data_file; m_data_file = 0;
415 myEnv.
Put(
"oss.asize",
"64k");
421 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
425 m_info_file = myOss.
newFile(myUser);
426 if ((res = m_info_file->
Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
430 delete m_info_file; m_info_file = 0;
431 m_data_file->
Close();
delete m_data_file; m_data_file = 0;
435 bool initialize_info_file =
true;
437 if (info_existed && m_cfi.
Read(m_info_file, ifn.c_str()))
439 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
440 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
446 initialize_info_file =
false;
448 TRACEF(Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
459 TRACEF(
Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
460 initialize_info_file =
true;
469 if (initialize_info_file)
474 m_cfi.
Write(m_info_file, ifn.c_str());
475 m_info_file->
Fsync();
476 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.
GetNBlocks());
483 m_prefetch_state = (m_cfi.
IsComplete()) ? kComplete : kStopped;
494bool File::overlap(
int blk,
503 const long long beg = blk * blk_size;
504 const long long end = beg + blk_size;
505 const long long req_end = req_off + req_size;
507 if (req_off < end && req_end > beg)
509 const long long ovlp_beg = std::max(beg, req_off);
510 const long long ovlp_end = std::min(end, req_end);
512 off = ovlp_beg - req_off;
513 blk_off = ovlp_beg - beg;
514 size = (int) (ovlp_end - ovlp_beg);
516 assert(size <= blk_size);
527Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
535 const long long off = i * m_block_size;
536 const int last_block = m_num_blocks - 1;
537 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
539 int blk_size, req_size;
540 if (i == last_block) {
541 blk_size = req_size = m_file_size - off;
542 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
544 blk_size = req_size = m_block_size;
548 char *buf = cache()->RequestRAM(req_size);
552 b =
new (std::nothrow)
Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
562 m_prefetch_state = kHold;
563 cache()->DeRegisterPrefetchFile(
this);
568 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
575void File::ProcessBlockRequest(
Block *b)
583 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
585 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
601 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
603 ProcessBlockRequest(*bi);
609void File::RequestBlocksDirect(
IO *io,
DirectResponseHandler *handler, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
611 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
613 io->
GetInput()->
ReadV( *handler, ioVec.data(), (
int) ioVec.size());
618int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
620 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
622 long long rs = m_data_file->
ReadV(ioVec.data(), (
int) ioVec.size());
626 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
630 if (rs != expected_size)
632 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
651 if (m_in_shutdown || io->m_in_detach)
654 return m_in_shutdown ? -ENOENT : -EBADF;
662 int ret = m_data_file->
Read(iUserBuff, iUserOff, iUserSize);
667 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
669 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
676 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
680 if (m_in_shutdown || io->m_in_detach)
683 return m_in_shutdown ? -ENOENT : -EBADF;
696 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
701int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
713 int prefetch_cnt = 0;
718 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
720 std::vector<XrdOucIOVec> iovec_disk;
721 std::vector<XrdOucIOVec> iovec_direct;
722 int iovec_disk_total = 0;
723 int iovec_direct_total = 0;
725 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
732 const int idx_first = iUserOff / m_block_size;
733 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
735 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
737 enum LastBlock_e { LB_other, LB_disk, LB_direct };
739 LastBlock_e lbe = LB_other;
741 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
744 BlockMap_i bi = m_block_map.find(block_idx);
751 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
754 if (bi != m_block_map.end())
756 inc_ref_count(bi->second);
757 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
759 if (bi->second->is_finished())
763 assert(bi->second->is_ok());
765 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
767 if (bi->second->m_prefetch)
778 bi->second->m_chunk_reqs.emplace_back(
ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
787 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
790 iovec_disk.back().size += size;
792 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
793 iovec_disk_total += size;
807 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
810 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
812 blks_to_request.push_back(b);
821 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
823 if (lbe == LB_direct)
824 iovec_direct.back().size += size;
826 iovec_direct.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
827 iovec_direct_total += size;
836 inc_prefetch_hit_cnt(prefetch_cnt);
841 if ( ! blks_to_request.empty())
843 ProcessBlockRequests(blks_to_request);
844 blks_to_request.clear();
848 if ( ! iovec_direct.empty())
851 RequestBlocksDirect(io, direct_handler, iovec_direct, iovec_direct_total);
853 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
858 long long bytes_read = 0;
862 if ( ! blks_ready.empty())
864 for (
auto &bvi : blks_ready)
866 for (
auto &cr : bvi.second)
868 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
869 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
870 bytes_read += cr.m_size;
876 if ( ! iovec_disk.empty())
878 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
879 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
896 for (
auto &bvi : blks_ready)
897 dec_ref_count(bvi.first, (
int) bvi.second.size());
930 return error_cond ? error_cond : bytes_read;
942 long long offset = b->
m_offset - m_offset;
962 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
972 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
975 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
977 bool schedule_sync =
false;
998 m_writes_during_sync.push_back(blk_idx);
1003 ++m_non_flushed_cnt;
1007 schedule_sync =
true;
1009 m_non_flushed_cnt = 0;
1016 cache()->ScheduleFileSync(
this);
1026 int ret = m_data_file->
Fsync();
1027 bool errorp =
false;
1032 m_cfi.
Write(m_info_file, m_filename.c_str());
1033 int cret = m_info_file->
Fsync();
1036 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1042 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1048 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1055 m_writes_during_sync.clear();
1061 int written_while_in_sync;
1062 bool resync =
false;
1065 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1069 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1070 m_writes_during_sync.clear();
1074 if (written_while_in_sync > 0 && m_cfi.
IsComplete() && ! m_in_shutdown)
1079 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1090void File::free_block(
Block* b)
1093 int i = b->
m_offset / m_block_size;
1094 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1095 size_t ret = m_block_map.erase(i);
1099 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1109 m_prefetch_state = kOn;
1110 cache()->RegisterPrefetchFile(
this);
1116bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1120 int io_size = (int) m_io_set.size();
1125 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1128 m_current_io = m_io_set.begin();
1131 else if (io_size > 1)
1133 IoSet_i mi = m_current_io;
1134 if (skip_current && mi != m_io_set.end()) ++mi;
1136 for (
int i = 0; i < io_size; ++i)
1138 if (mi == m_io_set.end()) mi = m_io_set.begin();
1140 if ((*mi)->m_allow_prefetching)
1152 m_current_io = m_io_set.end();
1153 m_prefetch_state = kStopped;
1154 cache()->DeRegisterPrefetchFile(
this);
1162void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1168 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1170 m_state_cond.
Lock();
1186 FinalizeReadRequest(rreq);
1213 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1216 m_state_cond.
Lock();
1221 rreq->m_stats.m_BytesMissed += creq.
m_size;
1223 rreq->m_stats.m_BytesHit += creq.
m_size;
1225 --rreq->m_n_chunk_reqs;
1228 inc_prefetch_hit_cnt(1);
1232 bool rreq_complete = rreq->is_complete();
1237 FinalizeReadRequest(rreq);
1251void File::ProcessBlockResponse(
Block *b,
int res)
1253 static const char* tpfx =
"ProcessBlockResponse ";
1255 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1257 if (res >= 0 && res != b->
get_size())
1261 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1265 m_state_cond.
Lock();
1271 IoSet_i mi = m_io_set.find(io);
1272 if (mi != m_io_set.end())
1274 --io->m_active_prefetches;
1277 if (res < 0 && io->m_allow_prefetching)
1279 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1280 io->m_allow_prefetching =
false;
1283 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1285 if ( ! select_current_io_or_disable_prefetching(
false) )
1287 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1293 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1309 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1310 if ( ! m_in_shutdown)
1315 cache()->AddWriteTask(b,
true);
1324 for (
auto &creq : creqs_to_notify)
1326 ProcessBlockSuccess(b, creq);
1335#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1346 std::list<ReadRequest*> rreqs_to_complete;
1355 ProcessBlockError(b, rreq);
1358 rreqs_to_complete.push_back(rreq);
1363 creqs_to_keep.push_back(creq);
1367 bool reissue =
false;
1368 if ( ! creqs_to_keep.empty())
1370 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1372 TRACEF(
Info,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1373 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1382 for (
auto rreq : rreqs_to_complete)
1383 FinalizeReadRequest(rreq);
1386 ProcessBlockRequest(b);
1394 return m_filename.c_str();
1399int File::offsetIdx(
int iIdx)
const
1401 return iIdx - m_offset/m_block_size;
1415 TRACEF(DumpXL,
"Prefetch() entering.");
1419 if (m_prefetch_state != kOn)
1424 if ( ! select_current_io_or_disable_prefetching(
true) )
1426 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1431 for (
int f = 0; f < m_num_blocks; ++f)
1435 int f_act = f + m_offset / m_block_size;
1437 BlockMap_i bi = m_block_map.find(f_act);
1438 if (bi == m_block_map.end())
1440 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1443 TRACEF(Dump,
"Prefetch take block " << f_act);
1447 inc_prefetch_read_cnt(1);
1452 TRACEF(Warning,
"Prefetch allocation failed for block " << f_act);
1461 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1462 m_prefetch_state = kComplete;
1463 cache()->DeRegisterPrefetchFile(
this);
1467 (*m_current_io)->m_active_prefetches += (int) blks.size();
1471 if ( ! blks.empty())
1473 ProcessBlockRequests(blks);
1482 return m_prefetch_score;
1495void File::insert_remote_location(
const std::string &loc)
1499 size_t p = loc.find_first_of(
'@');
1500 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1507 if ( ! m_remote_locations.empty())
1511 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1515 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1518 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1520 s +=
'"'; s += *i; s +=
'"';
1521 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
const char * XrdSysE2T(int errcode)
virtual int Ftruncate(unsigned long long flen)
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual ssize_t Read(off_t offset, size_t size)
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void * get_req_id() const
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static Cache & GetInstance()
Singleton access.
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void initiate_emergency_shutdown()
void Sync()
Sync file cache info and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Stats DeltaStatsFromLastCall()
bool ioActive(IO *io)
Initiate close. Return true if IO is still active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
XrdOucCacheIO * GetInput()
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
static const char * s_infoExtension
void SetBitSynced(int i)
Mark block as synced to disk.
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void DowngradeCkSumState(CkSumCheck_e css_ref)
void ResetAllAccessStats()
Reset IO Stats.
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
bool IsComplete() const
Get complete status.
bool IsCkSumCache() const
void SetBitWritten(int i)
Mark block as written to disk.
long long GetBufferSize() const
Get prefetch buffer size.
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
long long GetExpectedDataFileSize() const
Get expected data file size.
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetCkSumState(CkSumCheck_e css)
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Statistics of cache utilisation by a File object.
void AddReadStats(const Stats &s)
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddWriteStats(long long bytes_written, int n_cks_errs)
void AddBytesHit(long long bh)
long long m_BytesHit
number of bytes served from disk
void IoDetach(int duration)
void DeltaToReference(const Stats &ref)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
Contains parameters configurable from the xrootd config file.
long long m_flushCnt
number of unsynced blocks on disk before flush is called
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
prefetch buffer size, default 1MB
std::string m_meta_space
oss space for metadata files (cinfo)
std::string m_username
username passed to oss plugin
void update_error_cond(int ec)