39#include <netinet/in.h>
44#include "XrdVersion.hh"
83 int XrdCmsProtocol::readWait = 1000;
152 static int thePort = -1;
153 char *cfn = pi->
ConfigFN, buff[128];
158 {
if (pi->
Port && pi->
Port != thePort)
159 {sprintf(buff,
"%d disallowed; only using port %d",pi->
Port,thePort);
160 Say.
Emsg(
"Config",
"Alternate port", buff);
173 {
while(*parms ==
' ') parms++;
176 while(*parms !=
' ' && *parms) parms++;
183 Say.
Say(
"Copr. 2003-2020 Stanford University/SLAC cmsd.");
187 if (cfn) cfn = strdup(cfn);
212 {Reply_Delay(Arg, theDelay);
return 0;}
218 Say.
Emsg(
"Protocol",
"invalid request code from", myNode->
Ident);
219 else if ((etxt = (myNode->*Method)(Arg)))
221 {
DEBUGR(etxt+1 <<
" delayed " <<Arg.waitVal <<
" seconds");
223 }
else if (*etxt ==
'.')
return -ECONNABORTED;
241 if ((dlen = lp->
Peek((
char *)&Hdr,
sizeof(Hdr),readWait)) !=
sizeof(Hdr))
242 {
if (dlen <= 0) lp->
setEtext(
"login not received");
249 {
if (!strncmp((
char *)&Hdr,
"login ", 6))
250 lp->
setEtext(
"protocol version 1 unsupported");
265void XrdCmsProtocol::Pander(
const char *manager,
int mport)
270 time_t ddmsg = time(0);
271 unsigned int Mode, Role = 0;
274 int Lvl=0, Netopts=0, waits=6, tries=6, fails=0, xport=mport;
275 int rc, fsUtil, KickedOut, blRedir, myNID = Manager->
ManTree->
Register();
278 const char *Reason = 0, *manp = manager;
279 const int manblen =
sizeof(manbuff);
284 DEBUG(myRole <<
" services to " <<manager <<
':' <<mport);
288 memset(&loginData, 0,
sizeof(loginData));
296 loginData.
HoldTime=
static_cast<int>(getpid());
330 {
Say.
Emsg(
"Pander",
"Suspend state still active."); waits=6;}
335 {
DEBUG(
"restarting at root node " <<manager <<
':' <<mport);
336 manp = manager; xport = mport; Lvl = 0;
337 }
else if (rc < 0)
break;
339 DEBUG(
"trying to connect to lvl " <<Lvl <<
' ' <<manp <<
':' <<xport);
343 {
Say.
Emsg(
"Pander",
"Is hostname", manp,
"spelled correctly "
344 "or just not running?");
348 else {tries = 6; Netopts = 0;}
349 if ((Lvl = Manager->
myMans->
Next(xport,manbuff,manblen)))
351 else {
if (manp != manager) fails++;
356 Netopts = 0; tries = waits = 6;
362 {
char *oldName = strdup(Link->
Host());
363 Say.
Emsg(
"Protocol", oldName,
"is missing an IPv6 ptr record; "
364 "attempting local registration as", manp);
367 "registration failed; address mismatch.");
370 "is now locally registered as", manp);
377 if (!(myNode = Manager->
Add(Link, Lvl+1, terminate)))
379 if (terminate)
break;
380 Say.
Emsg(
"Pander",
"Unable to obtain node object.");
389 if (fails >= 6 && manp == manager)
398 Data = loginData; Data.
Mode =
Mode | myShare | myTimeZ;
402 const char *sname = cgiEnv.Get(
"site");
403 Say.
Emsg(
"Protocol",
"Logged into", sname, Link->
Name());
405 Manager->
Verify(Link, (
const char *)Data.
SID, sname);
406 Reason = Dispatch(isUp, TimeOut, 2);
414 if (Data.
SID) {free(Data.SID); Data.SID = 0;}
415 if (Data.
envCGI) {free(Data.envCGI); Data.envCGI = 0;}
420 : (Reason ? Reason :
"lost connection")));
428 Sync(); Manager->
Delete(myNode); myNode = 0; Reason = 0;
443 if (!KickedOut && (Lvl = Manager->
myMans->
Next(xport,manbuff,manblen)))
444 {manp = manbuff; continue;}
446 if (manp != manager) fails++;
447 manp = manager; xport = mport;
477 if ((Routing=Admit()))
479 if (RSlot) {myWay = isLateral; tOut = -1;}
482 if ((Reason = Dispatch(myWay, tOut, 2))) lp->
setEtext(Reason);
493 if (!myNode)
return -1;
503 myNode->
UnLock();
delete myNode; myNode = 0;
529 bool isLoggedIn = loggedIn != 0;
532 ProtLink = ProtStack;
539 if (reason)
Say.
Emsg(
"Protocol", lp->
ID,
"logged out;", reason);
540 else Say.
Emsg(
"Protocol", lp->
ID,
"logged out.");
542 if (reason)
Say.
Emsg(
"Protocol", lp->
ID,
"login failed;", reason);
570char *getAltName(
char *sid,
char *buff,
int blen)
572 char *atsign, *spacec, *retval = 0;
575 if ((atsign = index(sid,
'@')))
577 if ((spacec = index(atsign,
' ')))
580 if (n > 3 && n < blen)
581 {strcpy(buff, atsign);
594 char *
envP = 0, envBuff[256], myBuff[4096];
600 int addedp = 0, Status = 0, isPeer = 0, isProxy = 0;
601 int isMan, isServ, isSubm, wasSuspended = 0, Share = 100, tZone = 0;
606 {snprintf(envBuff,
sizeof(envBuff),
"site=%s",
Config.
mySite);
621 if (!Source.Admit(Link, Data,
Config.
mySID, envP))
return 0;
633 {
const char *altName = cgiEnv.Get(
"myHN");
634 const char *altType =
"stated mapping";
635 char hBF[256], *oldName = strdup(Link->
Host());
636 if (!altName) {altName = getAltName((
char *)Data.
SID, hBF,
sizeof(hBF));
637 altType =
"inferred mapping";
639 Say.
Emsg(
"Protocol",
"DNS lookup for", oldName,
"failed; "
640 "IPv6 ptr record missing!");
642 {
Say.
Emsg(
"Protocol", oldName,
"did not supply a fallback "
643 "mapping; using IPv6 address.");
646 snprintf(buff,
sizeof(buff),
"%s -> %s", oldName, altName);
647 Say.
Emsg(
"Protocol",
"Attempting to use", altType, buff);
649 {
Say.
Emsg(
"Protocol", buff, altType,
"failed; address mismatch.");
652 "is now locally registered as", altName);
662 return Admit_Redirector(wasSuspended);
668 return Login_Failed(
"configuration disallows subscribers");
697 else return Login_Failed(
"invalid login role");
708 Reason =
"configuration only allows proxies";
710 else if (isProxy) Reason =
"configuration disallows proxies";
712 Reason =
"configuration disallows peers";
713 if (Reason)
return Login_Failed(Reason);
723 Say.
Emsg(
"Protocol",Link->
Name(),
"has not yet found a cluster slot!");
729 (
const char *)Data.
SID, (
const char *)Data.
ifList)))
731 myNode->
RoleID =
static_cast<char>(roleID);
739 if (Share > 0) myNode->
setShare(Share);
751 <<
" MB Util=" <<Data.
fsUtil <<
" Share=" <<Share
752 <<
" TZone=" <<tZone);
765 ConfigCheck(Data.
Paths);
766 while((tp = thePaths.GetLine()))
767 {
DEBUG(Link->
Name() <<
" adding path: " <<tp);
768 if (!(tp = thePaths.GetToken())
769 || !(pp = thePaths.GetToken()))
break;
770 if (!(newmask = AddPath(myNode, tp, pp)))
771 return Login_Failed(
"invalid exported path");
800 isNBSQ = Link->
setNB();
804 const char *sname = cgiEnv.Get(
"site");
805 const char *lfmt = (myNode->
isMan > 1 ?
"Standby%s%s" :
"Primary%s%s");
806 snprintf(envBuff,
sizeof(envBuff),lfmt,(sname ?
" ":
""),(sname ? sname :
""));
811 Say.
Emsg(
"Protocol", myNode->
Ident,
"system ID:", (
const char *)Data.
SID);
823XrdCmsRouting *XrdCmsProtocol::Admit_Redirector(
int wasSuspended)
825 EPNAME(
"Admit_Redirector");
831 myRole =
"redirector";
841 Say.
Emsg(
"Protocol",myNode->
Ident,
"login failed; too many redirectors.");
849 myNode->
Send((
char *)&newState,
sizeof(newState));
854 DEBUG(myNode->
Ident <<
" assigned slot " <<RSlot);
863 const char *pType,
const char *
Path)
870 {
if (
'r' == *pType) pinfo.
rovec = nP->
Mask();
900 if ((xp = ProtStack)) ProtStack = xp->ProtLink;
906 if (!xp)
Say.
Emsg(
"Protocol",
"No more protocol objects.");
907 else xp->Init(theRole, uMan, theMan, thePort);
918void XrdCmsProtocol::ConfigCheck(
unsigned char *theConfig)
920 unsigned int ConfigID;
925 if (!theConfig) ConfigID = 1;
948const char *XrdCmsProtocol::Dispatch(Bearing cDir,
int maxWait,
int maxTries)
951 static const int ReqSize =
sizeof(
CmsRRHdr);
954 const char *toRC = (cDir == isUp ?
"manager not active"
955 :
"server not responding");
956 const char *myArgs, *myArgt;
966do{
if ((rc = Link->
RecvAll((
char *)&Data->
Request, ReqSize, maxWait)) < 0)
968 "blacklisted" :
"request read failed");
969 if (!toLeft--)
return toRC;
972 return "server blacklisted w/ redirect";
973 if (!SendPing())
return "server unreachable";
983 return "server blacklisted w/ redirect";
984 if (!SendPing())
return "server unreachable";
995 <<
" dlen=" <<Data->
Dlen);
996 if (!(Data->
Dlen)) {myArgs = myArgt = 0;}
997 else {
if (Data->
Dlen > maxReqSize)
998 {
Say.
Emsg(
"Protocol",
"Request args too long from",Link->
Name());
999 return "protocol error";
1003 {
Say.
Emsg(
"Protocol",
"No buffers to serve", Link->
Name());
1004 return "insufficient buffers";
1007 return (rc == -ETIMEDOUT ?
"read timed out" :
"read failed");
1015 Say.
Emsg(
"Protocol",Link->
Name(),
"sent an invalid request -", buff);
1026 {Reply_Error(*Data,
kYR_EINVAL,
"badly formed request");
1039 {
if ((rc =
Execute(*Data)) && rc == -ECONNABORTED)
return "disconnected";}
1045 else Say.
Emsg(
"Protocol",
"No jobs to serve", Link->
Name());
1050 return "logic error";
1064 if (myRole) Pander(myMan, myManPort);
1071void XrdCmsProtocol::Init(
const char *iRole,
XrdCmsManager *uMan,
1072 const char *iMan,
int iPort)
1091XrdCmsRouting *XrdCmsProtocol::Login_Failed(
const char *reason)
1110 if (refWait && refCount <= 0) {refWait->
Post(); refWait = 0;}
1126 struct iovec ioB[2] = {{(
char *)&Data.
Request,
sizeof(Data.
Request)},
1134 "msg TTL exceeded for", Data.
Path);
1147 "aborted; no servers handling", Data.
Path);
1154 {
if (!(amask = pinfo.
rwvec))
1156 "aborted; no r/w servers handling", Data.
Path);
1188 Link->
Send((
char *)&Resp,
sizeof(Resp));
1189 }
else act =
" skip";
1191 DEBUG(myNode->
Ident <<act <<
" delay " <<ntohl(theDelay));
1198void XrdCmsProtocol::Reply_Error(
XrdCmsRRData &Data,
int ecode,
const char *etext)
1202 int n = strlen(etext)+1;
1206 htons((
unsigned short int)(
sizeof(
kXR_unt32)+n))},
1207 htonl(
static_cast<unsigned int>(ecode))};
1208 struct iovec ioV[2] = {{(
char *)&Resp,
sizeof(Resp)},
1209 {(
char *)etext, (
size_t)n}};
1212 }
else act =
" skip";
1214 DEBUG(myNode->
Ident <<act <<
" err " <<ecode <<
' ' <<etext);
1221bool XrdCmsProtocol::SendPing()
1233 if (Link->
Send((
char *)&Ping,
sizeof(Ping)) < 0)
return false;
1241void XrdCmsProtocol::Sync()
1249 if (refCount <= 0) refMutex.
UnLock();
1250 else {refWait = &mySem;
1251 DEBUG(
"Waiting for " <<refCount <<
' ' <<myNode->
Ident
1252 <<
" thread(s) to end.");
XrdProtocol * XrdgetProtocol(const char *pname, char *parms, XrdProtocol_Config *pi)
XrdVERSIONINFO(XrdgetProtocol, cmsd)
int XrdgetProtocolPort(const char *pname, char *parms, XrdProtocol_Config *pi)
unsigned long long SMask_t
void Bounce(SMask_t smask, int SNum)
void SLock(bool dolock, bool wrmode=true)
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
void ResetRef(SMask_t smask, bool isLocked=false)
SMask_t Broadcast(SMask_t, const struct iovec *, int, int tot=0)
XrdCmsNode * Add(XrdLink *lp, int dport, int Status, int sport, const char *theNID, const char *theIF)
void Remove(XrdCmsNode *theNode)
int Stats(char *bfr, int bln)
int Statt(char *bfr, int bln)
int Configure1(int argc, char **argv, char *cfn)
int Configure0(XrdProtocol_Config *pi)
static XrdCmsJob * Alloc(XrdCmsProtocol *, XrdCmsRRData *)
static int Login(XrdLink *Link, XrdCms::CmsLoginData &Data, int timeout=-1)
void Add(const XrdNetAddr *netAddr, char *redList, int manport, int lvl)
int Next(int &port, char *buff, int bsz)
int Connect(int nID, XrdCmsNode *nP)
int Trying(int nID, int Lvl)
XrdCmsNode * Add(XrdLink *lp, int Lvl, bool &xit)
void Rerun(char *newMans)
void Finished(const char *manP, int mPort)
void Delete(XrdCmsNode *nodeP)
void Remove(XrdCmsNode *nP, const char *reason=0)
bool Verify(XrdLink *lP, const char *sid, const char *sname)
unsigned int TotalSpace(unsigned int &minfree)
int FreeSpace(int &tutil)
static const char allowsRW
void setShare(int shrval)
int Send(const char *buff, int blen=0)
static const char allowsSS
void setVersion(unsigned short vnum)
static const char isSuspend
void setSlot(short rslot)
static const char isDoomed
static const char isBlisted
void Remove(SMask_t mask)
int Find(const char *pname, XrdCmsPInfo &masks)
SMask_t Insert(const char *pname, XrdCmsPInfo *pinfo)
int Parse(XrdCms::CmsLoginData *Data, const char *Aps, const char *Apt)
void Recycle(XrdLink *lp, int consec, const char *reason)
int Execute(XrdCmsRRData &Data)
int Stats(char *buff, int blen, int do_sync=0)
static XrdCmsProtocol * Alloc(const char *theRole="", XrdCmsManager *mP=0, const char *theMan=0, int thePort=0)
XrdProtocol * Match(XrdLink *lp)
static XrdCmsRRData * Objectify(XrdCmsRRData *op=0)
short Add(XrdCmsNode *nP)
static const char * Name(RoleID rid)
NodeMethod_t getMethod(int Code)
const char * getName(int Code)
const char *(XrdCmsNode::*) NodeMethod_t(XrdCmsRRData &)
int getRoute(int reqCode)
static const char FES_Suspend
static const char All_Suspend
XrdLink * Connect(const char *host, int port, int opts=0, int timeout=-1)
void Secure(XrdNetSecurity *secp)
void Serialize()
Wait for all outstanding requests to be completed on the link.
int setEtext(const char *text)
int Peek(char *buff, int blen, int timeout=-1)
int Close(bool defer=false)
void setID(const char *userid, int procid)
const XrdNetAddr * NetAddr() const
int RecvAll(char *buff, int blen, int timeout=-1)
XrdNetAddrInfo * AddrInfo()
char * ID
Pointer to the client's link identity.
int Send(const char *buff, int blen)
bool Register(const char *hName)
const char * Host() const
const char * Name() const
bool isIPType(IPType ipType) const
static uint32_t CRC32(const unsigned char *data, int count)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static void Snooze(int seconds)
static const unsigned char kYR_Version
static const int CMS_isSuper
static const int CMS_noStage
static const int CMS_isMan
static const int CMS_isPeer
static const int CMS_isProxy
static const int CMS_Suspend