113 for (i = 0; i < 4; i++)
130 return "request previously failed";
146const char *XrdFrmTransfer::Fetch()
149 static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
150 static const int crOpts = (O_CREAT|O_TRUNC)<<8|
XRDOSS_mkpath;
156 const char *eTxt, *retMsg = 0;
157 char lfnpath[MAXPATHLEN+1024+512+8], *Lfn, Rfn[MAXPATHLEN+256], *theSrc;
159 int iXfr, pdSZ, lfnEnd, rc, isURL = 0, doRM = 0;
166 return "lfn2rfn failed";
168 isURL = (*Rfn !=
'/');
174 {
if (xfrCmd[2]) iXfr = 2;
175 else return "url copies not configured";
177 if (xfrCmd[0]) iXfr = 0;
178 else return "non-url copies not configured";
183 if ((eTxt = ffCheck()))
return eTxt;
189 {
DEBUG(xfrP->
PFN <<
" exists; not fetched.");
196 lfnEnd = strlen(Lfn);
197 strlcpy(lfnpath, Lfn,
sizeof(lfnpath)-8);
199 {strcpy(&lfnpath[lfnEnd],
".anew");
200 strcpy(&xfrP->
PFN[xfrP->
pfnEnd],
".anew");
205 cmdArg.theCmd = xfrCmd[iXfr];
207 cmdArg.theSrc = theSrc;
208 cmdArg.theDst = xfrP->
PFN;
210 if (!SetupCmd(&cmdArg))
return "incoming transfer setup failed";
221 {
Say.Emsg(
"Fetch", rc,
"create placeholder for", lfnpath);
222 return "create failed";
236 if (!(rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
238 {
Say.Emsg(
"Fetch", lfnpath,
"fetched but not resident!"); fSize = 0;}
239 else {fSize = pfnStat.st_size;
241 FetchDone(lfnpath, pfnStat, rc);
252 if (rc == -2) {xfrP->
RetCode = 2; retMsg =
"file not found";}
253 else retMsg =
"fetch failed";
260 {time_t eNow = time(0);
262 inqT =
static_cast<int>(xfrET - time_t(xfrP->
reqData.
addTOD));
263 if ((xfrT =
static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
267 sprintf(sbuff,
"Got: %lld qt: %d xt: %d up: ", fSize, inqT, xfrT);
268 lfnpath[lfnEnd] =
'\0';
272 {
if (rc < 0) rc = -rc;
273 snprintf(lfnpath+lfnEnd,
sizeof(lfnpath)-lfnEnd-1,
274 "\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
275 static_cast<long long>(eNow), fSize, inqT, xfrT,
276 xfrP->
Act, rc, (pdSZ ?
"&pd=" :
""), (pdSZ ? pdBuff :
""));
290const char *XrdFrmTransfer::FetchDone(
char *lfnpath,
struct stat &
Stat,
int &rc)
298 cpyInfo.
Attr.cpyTime =
static_cast<long long>(
Stat.st_mtime);
299 if ((rc = cpyInfo.
Set(xfrP->
PFN)))
300 Say.Emsg(
"Fetch", rc,
"set copy time xattr on", xfrP->
PFN);
306 {
struct stat lkfStat;
307 strcpy(&xfrP->
PFN[xfrP->
pfnEnd+5],
".lock");
308 if (!
stat(xfrP->
PFN, &lkfStat))
310 else {
struct utimbuf tbuff;
311 tbuff.actime = tbuff.modtime =
Stat.st_mtime;
312 if ((rc = utime(xfrP->
PFN, &tbuff)))
313 Say.Emsg(
"Fetch", rc,
"set utime on", xfrP->
PFN);
321 Say.Emsg(
"Fetch", rc,
"rename", lfnpath);
327 return (rc ?
"Failed" : 0);
334const char *XrdFrmTransfer::ffCheck()
341 {
char ffPath[MAXPATHLEN+8];
348 strcpy(&xfrP->
PFN[xfrP->
pfnEnd],
".fail");
363void XrdFrmTransfer::ffMake(
int nofile){
364 static const mode_t fMode = S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH;
365 static const mode_t dMode = S_IXUSR|S_IWGRP|S_IXGRP|S_IXOTH | fMode;
366 char ffPath[MAXPATHLEN+8], *ffP;
379 strcpy(&xfrP->
PFN[xfrP->
pfnEnd],
".fail");
386 myFD =
open(ffP, O_CREAT, fMode);
390 {
struct utimbuf tbuff;
391 tbuff.actime = time(0); tbuff.modtime = 2;
404 if (parg) xP->
Start(*(
int *)parg);
434 else qWant = (
void *)&anyQ;
438 {
Say.Emsg(
"main", retc,
"create xfr thread");
return 0;}
478 *cmdBuff =
'\0'; n =
sizeof(cmdBuff) - 4; cP = cmdBuff;
479 for (i = 0; i < k; i++)
482 {
Say.Emsg(
"Setup",E2BIG,
"build command line for", xfrP->
reqData.
LFN);
485 strcpy(cP, pdata[i]); cP += pdlen[i];
517 {
char buff1[280], buff2[80];
518 sprintf(buff1,
"%s for %s", xfrP->
RetCode ?
"failed" :
"complete",
520 if (xfrP->
RetCode == 0) *buff2 = 0;
521 else sprintf(buff2,
"; %s", (Msg ? Msg :
"reason unknown"));
525 <<(xfrP->
RetCode ?
" failed " :
" complete ")
527 <<
' ' <<(Msg ? Msg :
""));
538int XrdFrmTransfer::TrackDC(
char *Lfn,
char *Mdp,
char *Rfn)
541 char *FName, *Slash, *Slush = 0, *begRfn = Rfn;
547 && (Slash = index(Rfn,
'/')) && *(Slash+1) ==
'/'
548 && (Slash = index(Slash+2,
'/')) && *(Slash+1) ==
'/') begRfn = Slash+1;
552 if (!(FName = rindex(begRfn,
'/')) || FName == begRfn)
return 0;
553 *FName = 0; Slash = Slush = FName;
558 while(Slash != begRfn && !pTab.
Find(Rfn))
559 {
do {Slash--;}
while(Slash != begRfn && *Slash !=
'/');
560 if (Slush) *Slush =
'/';
561 *Slash = 0; Slush = Slash;
569 if (Slash == begRfn) n = 0;
570 else n = (n >= 0 ? Slash - begRfn : FName - begRfn);
571 sprintf(Mdp,
"%d", n);
580int XrdFrmTransfer::TrackDC(
char *Rfn)
586 if (!(Slash = rindex(Rfn,
'/')) || Slash == Rfn)
return 0;
602const char *XrdFrmTransfer::Throw()
606 struct stat begStat, endStat;
609 const char *eTxt, *retMsg = 0;
610 char Rfn[MAXPATHLEN+256], *lfnpath = xfrP->
reqData.
LFN, *theDest;
613 int iXfr, isURL, pdSZ, rc, mDP = -1;
619 return "lfn2rfn failed";
621 isURL = (*Rfn !=
'/');
627 {
if (xfrCmd[3]) iXfr = 3;
628 else return "url copies not configured";
630 if (xfrCmd[1]) iXfr = 1;
631 else return "non-url copies not configured";
637 return (xfrP->
reqFQ ?
"file not found" : 0);
641 if ((eTxt = ffCheck()))
return eTxt;
648 if (isMigr && (eTxt = ThrowOK(&Chk)))
649 {
if (*eTxt)
return eTxt;
657 cmdArg.theCmd = xfrCmd[iXfr];
659 cmdArg.theDst = theDest;
660 cmdArg.theSrc = xfrP->
PFN;
663 mDP = TrackDC(lfnpath+xfrP->
reqData.
LFO, cmdArg.theMDP, Rfn);
664 if (!SetupCmd(&cmdArg))
return "outgoing transfer setup failed";
674 if ((rc = cmdArg.theCmd->Run(pdBuff, pdSZ)))
675 {
if (isMigr) ffMake(rc == -2);
676 retMsg =
"copy failed";
681 if (!rc && mDP >= 0) TrackDC(Rfn);
689 {
Say.Emsg(
"Throw", lfnpath,
"transferred but not found!");
690 retMsg =
"unable to verify copy";
692 if (begStat.st_mtime != endStat.st_mtime
693 || begStat.st_size != endStat.st_size)
694 {
Say.Emsg(
"Throw", lfnpath,
"modified during transfer!");
695 retMsg =
"file modified during copy"; rc = 1;
706 else if (isMigr) ThrowDone(&Chk, endStat.st_mtime);
713 {time_t eNow = time(0);
715 long long Fsize = begStat.st_size;
716 inqT =
static_cast<int>(xfrET - time_t(xfrP->
reqData.
addTOD));
717 if ((xfrT =
static_cast<int>(eNow - xfrET)) <= 0) xfrT = 1;
721 sprintf(sbuff,
"Put: %lld qt: %d xt: %d up: ",Fsize,inqT,xfrT);
725 {
char monBuff[MAXPATHLEN+1024+512+8];
726 if (rc < 0) rc = -rc;
727 snprintf(monBuff,
sizeof(monBuff),
728 "%s\n&tod=%lld&sz=%lld&qt=%d&tm=%d&op=%c&rc=%d%s%s",
729 xfrP->
reqData.
LFN,
static_cast<long long>(eNow), Fsize,
730 inqT, xfrT, xfrP->
Act, rc,
731 (pdSZ ?
"&pd=" :
""), (pdSZ ? pdBuff :
""));
745void XrdFrmTransfer::Throwaway()
754 DEBUG(
"removed '" <<xfrP->
PFN <<
"'");
764void XrdFrmTransfer::ThrowDone(
XrdFrmTranChk *cP, time_t endTime)
771 cpyInfo.
Attr.cpyTime =
static_cast<long long>(endTime);
773 Say.Emsg(
"Throw",
"Unable to set copy time xattr for", xfrP->
PFN);
775 {strcpy(&xfrP->
PFN[xfrP->
pfnEnd],
".lock");
781 strcpy(&xfrP->
PFN[xfrP->
pfnEnd],
".lock");
783 {
struct utimbuf tbuff;
784 tbuff.actime = tbuff.modtime = endTime;
785 if (utime(xfrP->
PFN, &tbuff))
786 Say.Emsg(
"Throw", errno,
"set utime for", xfrP->
PFN);
801 fdClose() : Num(-1) {}
802 ~fdClose() {
if (Num >= 0)
close(Num);}
811 if ((fnFD.Num = XrdSysFD_Open(xfrP->
PFN, O_RDWR)) < 0)
812 return "unable to open file";
818 {strcpy(&xfrP->
PFN[xfrP->
pfnEnd],
".lock");
819 statRC =
stat(xfrP->
PFN, &lokStat);
822 if (statRC && !
Config.
runNew)
return "missing lock file";
829 cpyInfo.
Attr.cpyTime =
static_cast<long long>(lokStat.st_mtime);
830 else if (cpyInfo.
Get(xfrP->
PFN, fnFD.Num) <= 0)
831 return "unable to get copy time xattr";
836 if (cpyInfo.
Attr.cpyTime >=
static_cast<long long>(cP->
Stat->st_mtime))
838 return "already migrated";
845 cP->
lkfx = statRC == 0;
XrdOucPup XrdCmsParser::Pup & Say
void * InitXfer(void *parg)
#define XRDSYSTHREAD_BIND
const kXR_char XROOTD_MON_MAPMIGR
const kXR_char XROOTD_MON_MAPSTAG
int Get(const char *iName, char *buff, int blen)
int Init(const char *qPath)
static void Rm(const char *Path, int islfn=0)
static void Add(const char *tID, const char *Path, long long Size, mode_t Mode)
struct XrdFrmConfig::Cmd xfrCmd[4]
int NeedsCTA(const char *Lfn)
XrdNetCmsNotify * cmsPath
static const int cmdStats
static const int cmdAlloc
int RemotePath(const char *oldp, char *newp, int newpsz)
int Stat(const char *xLfn, const char *xPfn, struct stat *buff)
XrdOucName2Name * the_N2N
static kXR_unt32 Map(char code, const char *uname, const char *path)
static const char * checkFF(const char *Path)
static void Done(XrdFrmXfrJob *xP, const char *Msg)
static XrdFrmXfrJob * Get(int ioQType)
int Have(const char *Path, int isPfn=1)
int Gone(const char *Path, int isPfn=1)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void Put(const char *varname, const char *value)
T * Add(const char *KeyVal, T *KeyData, const int LifeTime=0, XrdOucHash_Options opt=Hash_default)
T * Find(const char *KeyVal, time_t *KeyTime=0)
int Subs(XrdOucMsubsInfo &Info, char **Data, int *Dlen)
int Setup(const char *prog, XrdSysError *errP=0, int(*Proc)(XrdOucStream *, char **, int)=0)
int Serialize(int Opts=0)
static int makePath(char *path, mode_t mode, bool reset=false)
int Get(const char *Path, int fd=-1)
int Set(const char *Path, int fd=-1)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
XrdFrmTranArg(XrdOucEnv *Env)
XrdFrmTranChk(struct stat *sP)