XRootD
Loading...
Searching...
No Matches
XrdPfc::File Class Reference

#include <XrdPfcFile.hh>

Collaboration diagram for XrdPfc::File:

Public Member Functions

void AddIO (IO *io)
void BlockRemovedFromWriteQ (Block *)
 Handle removal of a block from Cache's write queue.
void BlocksRemovedFromWriteQ (std::list< Block * > &)
 Handle removal of a set of blocks from Cache's write queue.
int dec_ref_cnt ()
bool FinalizeSyncBeforeExit ()
 Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
int Fstat (struct stat &sbuff)
int get_ref_cnt ()
size_t GetAccessCnt () const
int GetBlockSize () const
long long GetFileSize () const
const Info::AStat * GetLastAccessStats () const
const std::string & GetLocalPath () const
XrdSysError * GetLog () const
int GetNBlocks () const
int GetNDownloadedBlocks () const
int GetPrefetchCountOnIO (IO *io)
long long GetPrefetchedBytes () const
float GetPrefetchScore () const
std::string GetRemoteLocations () const
XrdSysTrace * GetTrace () const
int inc_ref_cnt ()
long long initiate_emergency_shutdown ()
bool ioActive (IO *io)
 Initiate close. Return true if IO is still active. Used in XrdPosixXrootd::Close().
void ioUpdated (IO *io)
 Notification from IO that it has been updated (remote open).
bool is_in_emergency_shutdown ()
const char * lPath () const
 Log path.
void Prefetch ()
int Read (IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
 Normal read.
int ReadV (IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
 Vector read.
const Stats & RefStats () const
void RemoveIO (IO *io)
void RequestSyncOfDetachStats ()
 Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void StopPrefetchingOnIO (IO *io)
void Sync ()
 Sync file cache info and output data with disk.
void WriteBlockToDisk (Block *b)

Static Public Member Functions

static File * FileOpen (const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
 Static constructor that also does Open. Returns null ptr if Open fails.

Friends

class BlockResponseHandler
class Cache
class DirectResponseHandler

Detailed Description

Definition at line 202 of file XrdPfcFile.hh.

Member Function Documentation

◆ AddIO()

void File::AddIO ( IO * io)

Definition at line 351 of file XrdPfcFile.cc.

352{
353 // Called from Cache::GetFile() when a new IO asks for the file.
354
355 TRACEF(Debug, "AddIO() io = " << (void*)io);
356
357 time_t now = time(0);
358 std::string loc(io->GetLocation());
359
360 m_state_cond.Lock();
361
362 IoSet_i mi = m_io_set.find(io);
363
364 if (mi == m_io_set.end())
365 {
366 m_io_set.insert(io);
367 io->m_attach_time = now;
368 m_delta_stats.IoAttach();
369
370 insert_remote_location(loc);
371
372 if (m_prefetch_state == kStopped)
373 {
374 m_prefetch_state = kOn;
375 cache()->RegisterPrefetchFile(this);
376 }
377 }
378 else
379 {
380 TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
381 }
382
383 m_state_cond.UnLock();
384}
#define TRACEF(act, x)
bool Debug
const char * GetLocation()
Definition XrdPfcIO.hh:41

References Debug, Error, XrdPfc::IO::GetLocation(), and TRACEF.

Referenced by XrdPfc::Cache::GetFile().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ BlockRemovedFromWriteQ()

void File::BlockRemovedFromWriteQ ( Block * b)

Handle removal of a block from Cache's write queue.

Definition at line 213 of file XrdPfcFile.cc.

214{
215 TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
216
217 XrdSysCondVarHelper _lck(m_state_cond);
218 dec_ref_count(b);
219}
long long m_offset

References XrdPfc::Block::m_offset, and TRACEF.

◆ BlocksRemovedFromWriteQ()

void File::BlocksRemovedFromWriteQ ( std::list< Block * > & blocks)

Handle removal of a set of blocks from Cache's write queue.

Definition at line 221 of file XrdPfcFile.cc.

222{
223 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
224
225 XrdSysCondVarHelper _lck(m_state_cond);
226
227 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
228 {
229 dec_ref_count(*i);
230 }
231}

References TRACEF.

Referenced by XrdPfc::Cache::RemoveWriteQEntriesFor().

Here is the caller graph for this function:

◆ dec_ref_cnt()

int XrdPfc::File::dec_ref_cnt ( )
inline

Definition at line 288 of file XrdPfcFile.hh.

288{ return --m_ref_cnt; }

◆ FileOpen()

File * File::FileOpen ( const std::string & path,
long long offset,
long long fileSize,
XrdOucCacheIO * inputIO )
static

Static constructor that also does Open. Returns null ptr if Open fails.

Definition at line 143 of file XrdPfcFile.cc.

144{
145 File *file = new File(path, offset, fileSize);
146 if ( ! file->Open(inputIO))
147 {
148 delete file;
149 file = 0;
150 }
151 return file;
152}
XrdOucString File

References File.

Referenced by XrdPfc::Cache::GetFile().

Here is the caller graph for this function:

◆ FinalizeSyncBeforeExit()

bool File::FinalizeSyncBeforeExit ( )

Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.

Definition at line 327 of file XrdPfcFile.cc.

328{
329 // Returns true if sync is required.
330 // This method is called after corresponding IO is detached from PosixCache.
331
332 XrdSysCondVarHelper _lck(m_state_cond);
333 if ( ! m_in_shutdown)
334 {
335 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
336 {
337 report_and_merge_delta_stats();
338 m_cfi.WriteIOStatDetach(m_stats);
339 m_detach_time_logged = true;
340 m_in_sync = true;
341 TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
342 return true;
343 }
344 }
345 TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
346 return false;
347}

References Debug, and TRACEF.

◆ Fstat()

int File::Fstat ( struct stat & sbuff)

Definition at line 660 of file XrdPfcFile.cc.

661{
662 // Stat on an open file.
663 // Corrects size to actual full size of the file.
664 // Sets atime to 0 if the file is only partially downloaded, in accordance
665 // with pfc.onlyifcached settings.
666 // Called from IO::Fstat() and Cache::Stat() when the file is active.
667 // Returns 0 on success, -errno on error.
668
669 int res;
670
671 if ((res = m_data_file->Fstat(&sbuff))) return res;
672
673 sbuff.st_size = m_file_size;
674
675 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
676 if ( ! is_cached)
677 sbuff.st_atime = 0;
678
679 return 0;
680}

References stat.

Referenced by XrdPfc::Cache::ConsiderCached(), and XrdPfc::Cache::Stat().

Here is the caller graph for this function:

◆ get_ref_cnt()

int XrdPfc::File::get_ref_cnt ( )
inline

Definition at line 286 of file XrdPfcFile.hh.

286{ return m_ref_cnt; }

◆ GetAccessCnt()

size_t XrdPfc::File::GetAccessCnt ( ) const
inline

Definition at line 276 of file XrdPfcFile.hh.

276{ return m_cfi.GetAccessCnt(); }

◆ GetBlockSize()

int XrdPfc::File::GetBlockSize ( ) const
inline

Definition at line 277 of file XrdPfcFile.hh.

277{ return m_cfi.GetBufferSize(); }

◆ GetFileSize()

long long XrdPfc::File::GetFileSize ( ) const
inline

Definition at line 267 of file XrdPfcFile.hh.

267{ return m_file_size; }

◆ GetLastAccessStats()

const Info::AStat * XrdPfc::File::GetLastAccessStats ( ) const
inline

Definition at line 275 of file XrdPfcFile.hh.

275{ return m_cfi.GetLastAccessStats(); }

◆ GetLocalPath()

const std::string & XrdPfc::File::GetLocalPath ( ) const
inline

Definition at line 262 of file XrdPfcFile.hh.

262{ return m_filename; }

Referenced by XrdPfc::Cache::AddWriteTask(), and XrdPfc::Cache::ReleaseFile().

Here is the caller graph for this function:

◆ GetLog()

XrdSysError * File::GetLog ( ) const

Definition at line 1730 of file XrdPfcFile.cc.

1731{
1732 return Cache::TheOne().GetLog();
1733}
XrdSysError * GetLog() const
Definition XrdPfc.hh:304
static const Cache & TheOne()
Definition XrdPfc.cc:137

References XrdPfc::Cache::GetLog(), and XrdPfc::Cache::TheOne().

Here is the call graph for this function:

◆ GetNBlocks()

int XrdPfc::File::GetNBlocks ( ) const
inline

Definition at line 278 of file XrdPfcFile.hh.

278{ return m_cfi.GetNBlocks(); }

◆ GetNDownloadedBlocks()

int XrdPfc::File::GetNDownloadedBlocks ( ) const
inline

Definition at line 279 of file XrdPfcFile.hh.

279{ return m_cfi.GetNDownloadedBlocks(); }

◆ GetPrefetchCountOnIO()

int XrdPfc::File::GetPrefetchCountOnIO ( IO * io)

◆ GetPrefetchedBytes()

long long XrdPfc::File::GetPrefetchedBytes ( ) const
inline

Definition at line 280 of file XrdPfcFile.hh.

280{ return m_prefetch_bytes; }

◆ GetPrefetchScore()

float File::GetPrefetchScore ( ) const

Definition at line 1725 of file XrdPfcFile.cc.

1726{
1727 return m_prefetch_score;
1728}

◆ GetRemoteLocations()

std::string File::GetRemoteLocations ( ) const

Definition at line 1749 of file XrdPfcFile.cc.

1750{
1751 std::string s;
1752 if ( ! m_remote_locations.empty())
1753 {
1754 size_t sl = 0;
1755 int nl = 0;
1756 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1757 {
1758 sl += i->size();
1759 }
1760 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1761 s = '[';
1762 int j = 1;
1763 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1764 {
1765 s += '"'; s += *i; s += '"';
1766 if (j < nl) s += ',';
1767 }
1768 s += ']';
1769 }
1770 else
1771 {
1772 s = "[]";
1773 }
1774 return s;
1775}

◆ GetTrace()

XrdSysTrace * File::GetTrace ( ) const

Definition at line 1735 of file XrdPfcFile.cc.

1736{
1737 return Cache::TheOne().GetTrace();
1738}
XrdSysTrace * GetTrace() const
Definition XrdPfc.hh:305

References XrdPfc::Cache::GetTrace(), and XrdPfc::Cache::TheOne().

Here is the call graph for this function:

◆ inc_ref_cnt()

int XrdPfc::File::inc_ref_cnt ( )
inline

Definition at line 287 of file XrdPfcFile.hh.

287{ return ++m_ref_cnt; }

◆ initiate_emergency_shutdown()

long long File::initiate_emergency_shutdown ( )

Definition at line 156 of file XrdPfcFile.cc.

157{
158 // Called from Cache::Unlink() when the file is currently open.
159 // Cache::Unlink is also called on FSync error and when wrong number of bytes
160 // is received from a remote read.
161 //
162 // From this point onward the file will not be written to, cinfo file will
163 // not be updated, and all new read requests will return -ENOENT.
164 //
165 // File's entry in the Cache's active map is set to nullptr and will be
166 // removed from there shortly, in any case, well before this File object
167 // shuts down. Cache::Unlink() also reports the appropriate purge event.
168
169 XrdSysCondVarHelper _lck(m_state_cond);
170
171 m_in_shutdown = true;
172
173 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
174 {
175 m_prefetch_state = kStopped;
176 cache()->DeRegisterPrefetchFile(this);
177 }
178
179 report_and_merge_delta_stats();
180
181 return m_st_blocks;
182}

Referenced by XrdPfc::Cache::UnlinkFile().

Here is the caller graph for this function:

◆ ioActive()

bool File::ioActive ( IO * io)

Initiate close. Return true if IO is still active. Used in XrdPosixXrootd::Close().

Definition at line 244 of file XrdPfcFile.cc.

245{
246 // Returns true if delay is needed.
247
248 TRACEF(Debug, "ioActive start for io " << io);
249
250 std::string loc(io->GetLocation());
251
252 {
253 XrdSysCondVarHelper _lck(m_state_cond);
254
255 IoSet_i mi = m_io_set.find(io);
256
257 if (mi != m_io_set.end())
258 {
259 unsigned int n_active_reads = io->m_active_read_reqs;
260
261 TRACE(Info, "ioActive for io " << io <<
262 ", active_reads " << n_active_reads <<
263 ", active_prefetches " << io->m_active_prefetches <<
264 ", allow_prefetching " << io->m_allow_prefetching <<
265 ", ios_in_detach " << m_ios_in_detach);
266 TRACEF(Info,
267 "\tio_map.size() " << m_io_set.size() <<
268 ", block_map.size() " << m_block_map.size() << ", file");
269
270 insert_remote_location(loc);
271
272 io->m_allow_prefetching = false;
273 io->m_in_detach = true;
274
275 // Check if any IO is still available for prfetching. If not, stop it.
276 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
277 {
278 if ( ! select_current_io_or_disable_prefetching(false) )
279 {
280 TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
281 }
282 }
283
284 // On last IO, consider write queue blocks. Note, this also contains
285 // blocks being prefetched.
286
287 bool io_active_result;
288
289 if (n_active_reads > 0)
290 {
291 io_active_result = true;
292 }
293 else if (m_io_set.size() - m_ios_in_detach == 1)
294 {
295 io_active_result = ! m_block_map.empty();
296 }
297 else
298 {
299 io_active_result = io->m_active_prefetches > 0;
300 }
301
302 if ( ! io_active_result)
303 {
304 ++m_ios_in_detach;
305 }
306
307 TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
308
309 return io_active_result;
310 }
311 else
312 {
313 TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
314 return false;
315 }
316 }
317}
#define TRACE(act, x)
Definition XrdTrace.hh:63
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:67

References Debug, Error, XrdPfc::IO::GetLocation(), XrdPfc::IO::m_active_read_reqs, TRACE, and TRACEF.

Here is the call graph for this function:

◆ ioUpdated()

void File::ioUpdated ( IO * io)

Notification from IO that it has been updated (remote open).

Definition at line 235 of file XrdPfcFile.cc.

236{
237 std::string loc(io->GetLocation());
238 XrdSysCondVarHelper _lck(m_state_cond);
239 insert_remote_location(loc);
240}

References XrdPfc::IO::GetLocation().

Here is the call graph for this function:

◆ is_in_emergency_shutdown()

bool XrdPfc::File::is_in_emergency_shutdown ( )
inline

Definition at line 291 of file XrdPfcFile.hh.

291{ return m_in_shutdown; }

◆ lPath()

const char * File::lPath ( ) const

Log path.

Definition at line 1637 of file XrdPfcFile.cc.

1638{
1639 return m_filename.c_str();
1640}

Referenced by XrdPfc::Cache::ProcessWriteTasks(), and XrdPfc::Cache::RemoveWriteQEntriesFor().

Here is the caller graph for this function:

◆ Prefetch()

void File::Prefetch ( )

Definition at line 1652 of file XrdPfcFile.cc.

1653{
1654 // Check that block is not on disk and not in RAM.
1655 // TODO: Could prefetch several blocks at once!
1656 // blks_max could be an argument
1657
1658 BlockList_t blks;
1659
1660 TRACEF(DumpXL, "Prefetch() entering.");
1661 {
1662 XrdSysCondVarHelper _lck(m_state_cond);
1663
1664 if (m_prefetch_state != kOn)
1665 {
1666 return;
1667 }
1668
1669 if ( ! select_current_io_or_disable_prefetching(true) )
1670 {
1671 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1672 return;
1673 }
1674
1675 // Select block(s) to fetch.
1676 for (int f = 0; f < m_num_blocks; ++f)
1677 {
1678 if ( ! m_cfi.TestBitWritten(f))
1679 {
1680 int f_act = f + m_offset / m_block_size;
1681
1682 BlockMap_i bi = m_block_map.find(f_act);
1683 if (bi == m_block_map.end())
1684 {
1685 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1686 if (b)
1687 {
1688 TRACEF(Dump, "Prefetch take block " << f_act);
1689 blks.push_back(b);
1690 // Note: block ref_cnt not increased, it will be when placed into write queue.
1691
1692 inc_prefetch_read_cnt(1);
1693 }
1694 else
1695 {
1696 // This shouldn't happen as prefetching stops when RAM is 70% full.
1697 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1698 }
1699 break;
1700 }
1701 }
1702 }
1703
1704 if (blks.empty())
1705 {
1706 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1707 m_prefetch_state = kComplete;
1708 cache()->DeRegisterPrefetchFile(this);
1709 }
1710 else
1711 {
1712 (*m_current_io)->m_active_prefetches += (int) blks.size();
1713 }
1714 }
1715
1716 if ( ! blks.empty())
1717 {
1718 ProcessBlockRequests(blks);
1719 }
1720}
std::list< Block * > BlockList_t

References Debug, Error, and TRACEF.

Referenced by XrdPfc::Cache::Prefetch().

Here is the caller graph for this function:

◆ Read()

int File::Read ( IO * io,
char * buff,
long long offset,
int size,
ReadReqRH * rh )

Normal read.

Definition at line 845 of file XrdPfcFile.cc.

846{
847 // rrc_func is ONLY called from async processing.
848 // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
849 // This streamlines implementation of synchronous IO::Read().
850
851 TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
852
853 m_state_cond.Lock();
854
855 if (m_in_shutdown || io->m_in_detach)
856 {
857 m_state_cond.UnLock();
858 return m_in_shutdown ? -ENOENT : -EBADF;
859 }
860
861 // Shortcut -- file is fully downloaded.
862
863 if (m_cfi.IsComplete())
864 {
865 m_state_cond.UnLock();
866 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
867 if (ret > 0) {
868 XrdSysCondVarHelper _lck(m_state_cond);
869 m_delta_stats.AddBytesHit(ret);
870 check_delta_stats();
871 }
872 return ret;
873 }
874
875 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
876
877 return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
878}
unsigned short m_seq_id
Definition XrdPfcFile.hh:53

References Xrd::hex1, XrdPfc::ReadReqRH::m_seq_id, and TRACEF.

Referenced by XrdPfc::IOFileBlock::Read().

Here is the caller graph for this function:

◆ ReadV()

int File::ReadV ( IO * io,
const XrdOucIOVec * readV,
int readVnum,
ReadReqRH * rh )

Vector read.

Definition at line 882 of file XrdPfcFile.cc.

883{
884 TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
885
886 m_state_cond.Lock();
887
888 if (m_in_shutdown || io->m_in_detach)
889 {
890 m_state_cond.UnLock();
891 return m_in_shutdown ? -ENOENT : -EBADF;
892 }
893
894 // Shortcut -- file is fully downloaded.
895
896 if (m_cfi.IsComplete())
897 {
898 m_state_cond.UnLock();
899 int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
900 if (ret > 0) {
901 XrdSysCondVarHelper _lck(m_state_cond);
902 m_delta_stats.AddBytesHit(ret);
903 check_delta_stats();
904 }
905 return ret;
906 }
907
908 return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
909}

References TRACEF.

◆ RefStats()

const Stats & XrdPfc::File::RefStats ( ) const
inline

Definition at line 281 of file XrdPfcFile.hh.

281{ return m_stats; }

◆ RemoveIO()

void File::RemoveIO ( IO * io)

Definition at line 388 of file XrdPfcFile.cc.

389{
390 // Called from Cache::ReleaseFile.
391
392 TRACEF(Debug, "RemoveIO() io = " << (void*)io);
393
394 time_t now = time(0);
395
396 m_state_cond.Lock();
397
398 IoSet_i mi = m_io_set.find(io);
399
400 if (mi != m_io_set.end())
401 {
402 if (mi == m_current_io)
403 {
404 ++m_current_io;
405 }
406
407 m_delta_stats.IoDetach(now - io->m_attach_time);
408 m_io_set.erase(mi);
409 --m_ios_in_detach;
410
411 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
412 {
413 TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
414 m_prefetch_state = kStopped;
415 cache()->DeRegisterPrefetchFile(this);
416 }
417 }
418 else
419 {
420 TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
421 }
422
423 m_state_cond.UnLock();
424}

References Debug, Error, and TRACEF.

Referenced by XrdPfc::Cache::ReleaseFile().

Here is the caller graph for this function:

◆ RequestSyncOfDetachStats()

void File::RequestSyncOfDetachStats ( )

Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.

Definition at line 321 of file XrdPfcFile.cc.

322{
323 XrdSysCondVarHelper _lck(m_state_cond);
324 m_detach_time_logged = false;
325}

◆ StopPrefetchingOnIO()

void XrdPfc::File::StopPrefetchingOnIO ( IO * io)

◆ Sync()

void File::Sync ( )

Sync file cache info and output data with disk.

Definition at line 1251 of file XrdPfcFile.cc.

1252{
1253 TRACEF(Dump, "Sync()");
1254
1255 int ret = m_data_file->Fsync();
1256 bool errorp = false;
1257 if (ret == XrdOssOK)
1258 {
1259 Stats loc_stats;
1260 {
1261 XrdSysCondVarHelper _lck(&m_state_cond);
1262 report_and_merge_delta_stats();
1263 loc_stats = m_stats;
1264 }
1265 m_cfi.WriteIOStat(loc_stats);
1266 m_cfi.Write(m_info_file, m_filename.c_str());
1267 int cret = m_info_file->Fsync();
1268 if (cret != XrdOssOK)
1269 {
1270 TRACEF(Error, "Sync cinfo file sync error " << cret);
1271 errorp = true;
1272 }
1273 }
1274 else
1275 {
1276 TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1277 errorp = true;
1278 }
1279
1280 if (errorp)
1281 {
1282 TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1283
1284 // Unlink will also call this->initiate_emergency_shutdown()
1285 Cache::GetInstance().UnlinkFile(m_filename, false);
1286
1287 XrdSysCondVarHelper _lck(&m_state_cond);
1288
1289 m_writes_during_sync.clear();
1290 m_in_sync = false;
1291
1292 return;
1293 }
1294
1295 int written_while_in_sync;
1296 bool resync = false;
1297 {
1298 XrdSysCondVarHelper _lck(&m_state_cond);
1299 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1300 {
1301 m_cfi.SetBitSynced(*i);
1302 }
1303 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1304 m_writes_during_sync.clear();
1305
1306 // If there were writes during sync and the file is now complete,
1307 // let us call Sync again without resetting the m_in_sync flag.
1308 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1309 resync = true;
1310 else
1311 m_in_sync = false;
1312 }
1313 TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1314
1315 if (resync)
1316 Sync();
1317}
#define XrdOssOK
Definition XrdOss.hh:54
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:136
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1264
void Sync()
Sync file cache info and output data with disk.
XrdPosixStats Stats

References Error, XrdPfc::Cache::GetInstance(), Sync(), TRACEF, XrdPfc::Cache::UnlinkFile(), and XrdOssOK.

Referenced by Sync().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ WriteBlockToDisk()

void File::WriteBlockToDisk ( Block * b)

Definition at line 1165 of file XrdPfcFile.cc.

1166{
1167 // write block buffer into disk file
1168 long long offset = b->m_offset - m_offset;
1169 long long size = b->get_size();
1170 ssize_t retval;
1171
1172 if (m_cfi.IsCkSumCache())
1173 if (b->has_cksums())
1174 retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1175 else
1176 retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1177 else
1178 retval = m_data_file->Write(b->get_buff(), offset, size);
1179
1180 if (retval < size)
1181 {
1182 if (retval < 0) {
1183 TRACEF(Error, "WriteToDisk() write error " << retval);
1184 } else {
1185 TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1186 }
1187
1188 XrdSysCondVarHelper _lck(m_state_cond);
1189
1190 dec_ref_count(b);
1191
1192 return;
1193 }
1194
1195 const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1196
1197 // Set written bit.
1198 TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1199
1200 bool schedule_sync = false;
1201 {
1202 XrdSysCondVarHelper _lck(m_state_cond);
1203
1204 m_cfi.SetBitWritten(blk_idx);
1205
1206 if (b->m_prefetch)
1207 {
1208 m_cfi.SetBitPrefetch(blk_idx);
1209 }
1210 if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1211 {
1212 m_cfi.ResetCkSumNet();
1213 }
1214
1215 // Set synced bit or stash block index if in actual sync.
1216 // Synced state is only written out to cinfo file when data file is synced.
1217 if (m_in_sync)
1218 {
1219 m_writes_during_sync.push_back(blk_idx);
1220 }
1221 else
1222 {
1223 m_cfi.SetBitSynced(blk_idx);
1224 ++m_non_flushed_cnt;
1225 if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1226 ! m_in_shutdown)
1227 {
1228 schedule_sync = true;
1229 m_in_sync = true;
1230 m_non_flushed_cnt = 0;
1231 }
1232 }
1233 // As soon as the reference count is decreased on the block, the
1234 // file object may be deleted. Thus, to avoid holding both locks at a time,
1235 // we defer the ref count decrease until later if a sync is needed
1236 if (!schedule_sync) {
1237 dec_ref_count(b);
1238 }
1239 }
1240
1241 if (schedule_sync)
1242 {
1243 cache()->ScheduleFileSync(this);
1244 XrdSysCondVarHelper _lck(m_state_cond);
1245 dec_ref_count(b);
1246 }
1247}
int get_size() const
vCkSum_t & ref_cksum_vec()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:225
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition XrdPfc.hh:125

References Error, XrdPfc::Block::get_buff(), XrdPfc::Block::get_size(), XrdPfc::Cache::GetInstance(), XrdPfc::Block::has_cksums(), XrdPfc::Block::m_offset, XrdPfc::Block::m_prefetch, XrdPfc::Block::ref_cksum_vec(), XrdPfc::Block::req_cksum_net(), and TRACEF.

Referenced by XrdPfc::Cache::ProcessWriteTasks().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ BlockResponseHandler

friend class BlockResponseHandler
friend

Definition at line 205 of file XrdPfcFile.hh.

References BlockResponseHandler.

Referenced by BlockResponseHandler.

◆ Cache

friend class Cache
friend

Definition at line 204 of file XrdPfcFile.hh.

References Cache.

Referenced by Cache.

◆ DirectResponseHandler

friend class DirectResponseHandler
friend

Definition at line 206 of file XrdPfcFile.hh.

References DirectResponseHandler.

Referenced by DirectResponseHandler.


The documentation for this class was generated from the following files: