XRootD
Loading...
Searching...
No Matches
XrdClEcHandler.cc
Go to the documentation of this file.
1/*
2 * XrdClEcHandler.cc
3 *
4 * Created on: Nov 2, 2021
5 * Author: simonm
6 */
7
8
9
11
12namespace XrdCl
13{
14
16 if (getenv("XrdCl_EC_X_RATIO"))
17 {
18 xRatio = atoi(getenv("XrdCl_EC_X_RATIO"));
19 }
20 else
21 {
22 xRatio = 1;
23 }
24 };
25
27 XrdCl::LocationInfo &newList,
28 uint32_t n)
29 {
30 TryInitExportPaths();
31 AddServers(oldList);
32 UpdateSpaceInfo();
33
34 lock.lock();
35 if (oldList.GetSize() > n && ! BlindSelect())
36 {
37 for (uint32_t j=0; j<ServerList.size(); j++)
38 {
39 for (uint32_t i=0; i<oldList.GetSize(); i++)
40 {
41 if (ServerList[j].address == oldList.At(i).GetAddress() &&
43 {
44 newList.Add(oldList.At(i));
45 if (newList.GetSize() == n)
46 {
47 lock.unlock();
48 return;
49 }
50 }
51 }
52 }
53 }
54 else
55 {
56 for (uint32_t i=0; i<oldList.GetSize(); i++)
57 {
58 if (Exists(oldList.At(i)) &&
60 {
61 newList.Add(oldList.At(i));
62 }
63 }
64 }
65 lock.unlock();
66 }
67
69 {
70 for (uint32_t j=0; j<ServerList.size(); j++)
71 {
72 ServerList[j].Dump();
73 }
74 };
75
76 void ServerSpaceInfo::TryInitExportPaths()
77 {
78 if (initExportPaths) return;
79 lock.lock();
80 if (! initExportPaths && getenv("XRDEXPORTS"))
81 {
82 std::istringstream p(getenv("XRDEXPORTS"));
83 std::string s;
84 while(std::getline(p, s, ' '))
85 {
86 ExportPaths.push_back(s);
87 }
88 initExportPaths = true;
89 }
90 lock.unlock();
91 };
92
93 uint64_t ServerSpaceInfo::GetFreeSpace(const std::string addr)
94 {
95 XrdCl::FileSystem fs(addr);
96 XrdCl::Buffer queryArgs(1024), *queryResp = nullptr;
97
98 for (uint32_t i=0; i<ExportPaths.size(); i++)
99 {
100 queryArgs.FromString(ExportPaths[i]);
101 XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Space, queryArgs, queryResp, 0);
102 if (st.IsOK())
103 {
104 std::string resp = queryResp->ToString();
105 int b = resp.find("oss.free=", 0);
106 int e = resp.find("&", b);
107 uint64_t s = 0;
108 std::stringstream sstream0( resp.substr(b+9, e-(b+9)) );
109 sstream0 >> s;
110 if (queryResp) delete queryResp;
111 return s;
112 }
113 if (queryResp) delete queryResp;
114 }
115 return 0;
116 };
117
118 bool ServerSpaceInfo::BlindSelect() {
119 auto ms_since_epoch = std::chrono::system_clock::now().time_since_epoch() /
120 std::chrono::nanoseconds(1);
121 return (ms_since_epoch % 10 > xRatio ? true : false);
122 };
123
124 void ServerSpaceInfo::UpdateSpaceInfo()
125 {
126 if (! initExportPaths) return;
127 time_t t = time(NULL);
128 if (t < lastUpdateT + 300) return;
129 lock.lock();
130 if (t > lastUpdateT + 300)
131 {
132 for (uint32_t j=0; j<ServerList.size(); j++)
133 ServerList[j].freeSpace = GetFreeSpace(ServerList[j].address);
134 std::sort(ServerList.begin(), ServerList.end());
135 lastUpdateT = t;
136 }
137 lock.unlock();
138 };
139
140 bool ServerSpaceInfo::Exists(XrdCl::LocationInfo::Location &loc)
141 {
142 for (uint32_t j=0; j<ServerList.size(); j++)
143 if (loc.GetAddress() == ServerList[j].address)
144 return true;
145 return false;
146 };
147
148 void ServerSpaceInfo::AddServers(XrdCl::LocationInfo &locInfo)
149 {
150 lock.lock();
151 for (uint32_t i=0; i<locInfo.GetSize(); i++)
152 {
153 if (Exists(locInfo.At(i))) continue;
154 if (locInfo.At(i).GetType() == XrdCl::LocationInfo::ServerOnline)
155 {
156 FreeSpace s;
157 s.address = locInfo.At(i).GetAddress();
158 s.freeSpace = GetFreeSpace( s.address );
159 ServerList.push_back(s);
160 std::sort(ServerList.begin(), ServerList.end());
161 }
162 }
163 lock.unlock();
164 };
165
166 EcHandler* GetEcHandler( const URL &headnode, const URL &redirurl )
167 {
168 const URL::ParamsMap &params = redirurl.GetParams();
169 // make sure all the xrdec. tokens are present and the values are sane
170 URL::ParamsMap::const_iterator itr = params.find( "xrdec.nbdta" );
171 if( itr == params.end() ) return nullptr;
172 uint8_t nbdta = std::stoul( itr->second );
173
174 itr = params.find( "xrdec.nbprt" );
175 if( itr == params.end() ) return nullptr;
176 uint8_t nbprt = std::stoul( itr->second );
177
178 itr = params.find( "xrdec.blksz" );
179 if( itr == params.end() ) return nullptr;
180 uint64_t blksz = std::stoul( itr->second );
181
182 itr = params.find( "xrdec.plgr" );
183 if( itr == params.end() ) return nullptr;
184 std::vector<std::string> plgr;
185 Utils::splitString( plgr, itr->second, "," );
186 if( plgr.size() < nbdta + nbprt ) return nullptr;
187
188 itr = params.find( "xrdec.objid" );
189 if( itr == params.end() ) return nullptr;
190 std::string objid = itr->second;
191
192 itr = params.find( "xrdec.format" );
193 if( itr == params.end() ) return nullptr;
194 size_t format = std::stoul( itr->second );
195 if( format != 1 ) return nullptr; // TODO use constant
196
197 std::vector<std::string> dtacgi;
198 itr = params.find( "xrdec.dtacgi" );
199 if( itr != params.end() )
200 {
201 Utils::splitString( dtacgi, itr->second, "," );
202 if( plgr.size() != dtacgi.size() ) return nullptr;
203 }
204
205 std::vector<std::string> mdtacgi;
206 itr = params.find( "xrdec.mdtacgi" );
207 if( itr != params.end() )
208 {
209 Utils::splitString( mdtacgi, itr->second, "," );
210 if( plgr.size() != mdtacgi.size() ) return nullptr;
211 }
212
213 itr = params.find( "xrdec.cosc" );
214 if( itr == params.end() ) return nullptr;
215 std::string cosc_str = itr->second;
216 if( cosc_str != "true" && cosc_str != "false" ) return nullptr;
217 bool cosc = cosc_str == "true";
218
219 std::string ckstype;
220 itr = params.find( "xrdec.cksum" );
221 if( cosc && itr == params.end() ) return nullptr;
222 if( cosc )
223 ckstype = itr->second;
224
225 std::string chdigest;
226 itr = params.find( "xrdec.chdigest" );
227 if( itr == params.end() )
228 chdigest = "crc32c";
229 else
230 chdigest = itr->second;
231 bool usecrc32c = ( chdigest == "crc32c" );
232
233 bool nomtfile = false;
234 itr = params.find( "xrdec.nomtfile" );
235 if( itr != params.end() )
236 nomtfile = ( itr->second == "true" );
237
238 XrdEc::ObjCfg *objcfg = new XrdEc::ObjCfg( objid, nbdta, nbprt, blksz / nbdta, usecrc32c );
239 objcfg->plgr = std::move( plgr );
240 objcfg->dtacgi = std::move( dtacgi );
241 objcfg->mdtacgi = std::move( mdtacgi );
242 objcfg->nomtfile = nomtfile;
243
244 std::unique_ptr<CheckSumHelper> cksHelper( cosc ? new CheckSumHelper( "", ckstype ) : nullptr );
245 if( cksHelper )
246 {
247 auto st = cksHelper->Initialize();
248 if( !st.IsOK() ) return nullptr;
249 }
250
251 return new EcHandler( headnode, objcfg, std::move( cksHelper ) );
252 }
253
254}
255
bool Exists
Binary blob representation.
std::string ToString() const
Convert the buffer to a string.
Check sum helper for stdio.
Send file/filesystem queries to an XRootD cluster.
LocationType GetType() const
Get location type.
const std::string & GetAddress() const
Get address.
Path location info.
uint32_t GetSize() const
Get number of locations.
Location & At(uint32_t index)
Get the location at index.
void Add(const Location &location)
Add a location.
@ ServerOnline
server node where the file is online
void SelectLocations(XrdCl::LocationInfo &oldList, XrdCl::LocationInfo &newList, uint32_t n)
URL representation.
Definition XrdClURL.hh:31
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:239
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
EcHandler * GetEcHandler(const URL &headnode, const URL &redirurl)
@ Space
Query logical space stats.
bool IsOK() const
We're fine.
std::vector< std::string > mdtacgi
std::vector< std::string > plgr
std::vector< std::string > dtacgi