XRootD
Loading...
Searching...
No Matches
XrdClXCpSrc.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3// Author: Michal Simon <michal.simon@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#include "XrdCl/XrdClXCpSrc.hh"
26#include "XrdCl/XrdClXCpCtx.hh"
27#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClUtils.hh"
31
32#include <cmath>
33#include <cstdlib>
34
35namespace XrdCl
36{
37
39{
40 public:
41
42 ChunkHandler( XCpSrc *src, uint64_t offset, uint64_t size, char *buffer, File *handle, bool usepgrd ) :
43 pSrc( src->Self() ), pOffset( offset ), pSize( size ), pBuffer( buffer ), pHandle( handle ), pUsePgRead( usepgrd )
44 {
45
46 }
47
48 virtual ~ChunkHandler()
49 {
50 pSrc->Delete();
51 }
52
53 virtual void HandleResponse( XRootDStatus *status, AnyObject *response )
54 {
55 PageInfo *chunk = 0;
56 if( response ) // get the response
57 {
58 ToPgInfo( response, chunk );
59 delete response;
60 }
61
62 if( !chunk && status->IsOK() ) // if the response is not there make sure the status is error
63 {
64 *status = XRootDStatus( stError, errInternal );
65 }
66
67 if( status->IsOK() && chunk->GetLength() != pSize ) // the file size on the server is different
68 { // than the one specified in metalink file
69 *status = XRootDStatus( stError, errDataError );
70 }
71
72 if( !status->IsOK() )
73 {
74 delete[] pBuffer;
75 delete chunk;
76 chunk = 0;
77 }
78
79 pSrc->ReportResponse( status, chunk, pHandle );
80
81 delete this;
82 }
83
84 private:
85
86 void ToPgInfo( AnyObject *response, PageInfo *&chunk )
87 {
88 if( pUsePgRead )
89 {
90 response->Get( chunk );
91 response->Set( ( int* )0 );
92 }
93 else
94 {
95 ChunkInfo *rsp = nullptr;
96 response->Get( rsp );
97 chunk = new PageInfo( rsp->offset, rsp->length, rsp->buffer );
98 }
99 }
100
101 XCpSrc *pSrc;
102 uint64_t pOffset;
103 uint64_t pSize;
104 char *pBuffer;
105 File *pHandle;
106 bool pUsePgRead;
107};
108
109
110XCpSrc::XCpSrc( uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx ) :
111 pChunkSize( chunkSize ), pParallel( parallel ), pFileSize( fileSize ), pThread(),
112 pCtx( ctx->Self() ), pFile( 0 ), pCurrentOffset( 0 ), pBlkEnd( 0 ), pDataTransfered( 0 ), pRefCount( 1 ),
113 pRunning( false ), pStartTime( 0 ), pTransferTime( 0 ), pUsePgRead( false )
114{
115}
116
117XCpSrc::~XCpSrc()
118{
119 pCtx->RemoveSrc( this );
120 pCtx->Delete();
121}
122
//------------------------------------------------------------------------------
// NOTE(review): the signature line of this definition (listing line 123) is
// not present in this extract, only the body is.
//
// Sets pRunning and spawns a thread that executes Run( this ). When
// pthread_create returns a non-zero code the flag is cleared again and the
// source is removed from the context before pCtx->Delete() is called.
//------------------------------------------------------------------------------
124{
125 pRunning = true;
126 int rc = pthread_create( &pThread, 0, Run, this );
127 if( rc )
128 {
    // the thread could not be created, so this source will never download
129 pRunning = false;
130 pCtx->RemoveSrc( this );
131 pCtx->Delete();
132 }
133}
134
135void* XCpSrc::Run( void* arg )
136{
137 XCpSrc *me = static_cast<XCpSrc*>( arg );
138 me->StartDownloading();
139 me->Delete();
140 return 0;
141}
142
143void XCpSrc::StartDownloading()
144{
145 XRootDStatus st = Initialize();
146 if( !st.IsOK() )
147 {
148 pRunning = false;
149 // notify those who wait for the file
150 // size, they won't get it from this
151 // source
152 pCtx->NotifyInitExpectant();
153 // put a null chunk so we are sure
154 // the main thread doesn't get stuck
155 // at the sync queue
156 pCtx->PutChunk( 0 );
157 return;
158 }
159
160 // start counting transfer time
161 pStartTime = time( 0 );
162
163 while( pRunning )
164 {
165 st = ReadChunks();
166 if( st.IsOK() && st.code == suPartial )
167 {
168 // we have only ongoing transfers
169 // so we can already ask for new block
170 if( GetWork().IsOK() ) continue;
171 }
172 else if( st.IsOK() && st.code == suDone )
173 {
174 // if we are done, try to get more work,
175 // if successful continue
176 if( GetWork().IsOK() ) continue;
177 // keep track of the time before we go idle
178 pTransferTime += time( 0 ) - pStartTime;
179 // check if the overall download process is
180 // done, this makes the thread wait until
181 // either the download is done, or a source
182 // went to error, or a 60s timeout has been
183 // reached (the timeout is there so we can
184 // check if a source degraded in the meanwhile
185 // and now we can steal from it)
186 if( !pCtx->AllDone() )
187 {
188 // reset start time after pause
189 pStartTime = time( 0 );
190 continue;
191 }
192 // stop counting
193 // otherwise we are done here
194 pRunning = false;
195 return;
196 }
197
198 XRootDStatus *status = pReports.Get();
199 if( !status->IsOK() )
200 {
201 Log *log = DefaultEnv::GetLog();
202 std::string myHost = URL( pUrl ).GetHostName();
203 log->Error( UtilityMsg, "Failed to read chunk from %s: %s", myHost.c_str(), status->GetErrorMessage().c_str() );
204
205 if( !Recover().IsOK() )
206 {
207 delete status;
208 pRunning = false;
209 // notify idle sources, they might be
210 // interested in taking over my workload
211 pCtx->NotifyIdleSrc();
212 // put a null chunk so we are sure
213 // the main thread doesn't get stuck
214 // at the sync queue
215 pCtx->PutChunk( 0 );
216 // if we have data we need to wait for someone to take over
217 // unless the extreme copy is over, in this case we don't care
218 while( HasData() && !pCtx->AllDone() );
219
220 return;
221 }
222 }
223 delete status;
224 }
225}
226
227XRootDStatus XCpSrc::Initialize()
228{
229 Log *log = DefaultEnv::GetLog();
230 XRootDStatus st;
231
232 do
233 {
234 if( !pCtx->GetNextUrl( pUrl ) )
235 {
236 log->Error( UtilityMsg, "Failed to initialize XCp source, no more replicas to try" );
237 return XRootDStatus( stError );
238 }
239
240 log->Debug( UtilityMsg, "Opening %s for reading", pUrl.c_str() );
241
242 std::string value;
243 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
244
245 pFile = new File();
246 pFile->SetProperty( "ReadRecovery", value );
247
248 st = pFile->Open( pUrl, OpenFlags::Read );
249 if( !st.IsOK() )
250 {
251 log->Warning( UtilityMsg, "Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
252 DeletePtr( pFile );
253 continue;
254 }
255
256 URL url( pUrl );
257 if( ( !url.IsLocalFile() && !pFile->IsSecure() ) ||
258 ( url.IsLocalFile() && url.IsMetalink() ) )
259 {
260 std::string datasrv;
261 pFile->GetProperty( "DataServer", datasrv );
262 //--------------------------------------------------------------------
263 // Decide whether we can use PgRead
264 //--------------------------------------------------------------------
266 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
267 pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
268 }
269
270 if( pFileSize < 0 )
271 {
272 StatInfo *statInfo = 0;
273 st = pFile->Stat( false, statInfo );
274 if( !st.IsOK() )
275 {
276 log->Warning( UtilityMsg, "Failed to stat %s: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
277 DeletePtr( pFile );
278 continue;
279 }
280 pFileSize = statInfo->GetSize();
281 pCtx->SetFileSize( pFileSize );
282 delete statInfo;
283 }
284 }
285 while( !st.IsOK() );
286
287 std::pair<uint64_t, uint64_t> p = pCtx->GetBlock();
288 pCurrentOffset = p.first;
289 pBlkEnd = p.second + p.first;
290
291 return st;
292}
293
294XRootDStatus XCpSrc::Recover()
295{
296 Log *log = DefaultEnv::GetLog();
297 XRootDStatus st;
298
299 do
300 {
301 if( !pCtx->GetNextUrl( pUrl ) )
302 {
303 log->Error( UtilityMsg, "Failed to initialize XCp source, no more replicas to try" );
304 return XRootDStatus( stError );
305 }
306
307 log->Debug( UtilityMsg, "Opening %s for reading", pUrl.c_str() );
308
309 std::string value;
310 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
311
312 pFile = new File();
313 pFile->SetProperty( "ReadRecovery", value );
314
315 st = pFile->Open( pUrl, OpenFlags::Read );
316 if( !st.IsOK() )
317 {
318 DeletePtr( pFile );
319 log->Warning( UtilityMsg, "Failed to open %s for reading: %s", pUrl.c_str(), st.GetErrorMessage().c_str() );
320 }
321
322 URL url( pUrl );
323 if( ( !url.IsLocalFile() && pFile->IsSecure() ) ||
324 ( url.IsLocalFile() && url.IsMetalink() ) )
325 {
326 std::string datasrv;
327 pFile->GetProperty( "DataServer", datasrv );
328 //--------------------------------------------------------------------
329 // Decide whether we can use PgRead
330 //--------------------------------------------------------------------
332 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
333 pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
334 }
335 }
336 while( !st.IsOK() );
337
338 pRecovered.insert( pOngoing.begin(), pOngoing.end() );
339 pOngoing.clear();
340
341 // since we have a brand new source, we need
342 // to restart transfer rate statistics
343 pTransferTime = 0;
344 pStartTime = time( 0 );
345 pDataTransfered = 0;
346
347 return st;
348}
349
350XRootDStatus XCpSrc::ReadChunks()
351{
352 XrdSysMutexHelper lck( pMtx );
353
354 while( pOngoing.size() < pParallel && !pRecovered.empty() )
355 {
356 std::pair<uint64_t, uint64_t> p;
357 std::map<uint64_t, uint64_t>::iterator itr = pRecovered.begin();
358 p = *itr;
359 pOngoing.insert( p );
360 pRecovered.erase( itr );
361
362 char *buffer = new char[p.second];
363 ChunkHandler *handler = new ChunkHandler( this, p.first, p.second, buffer, pFile, pUsePgRead );
364 XRootDStatus st = pUsePgRead
365 ? pFile->PgRead( p.first, p.second, buffer, handler )
366 : pFile->Read( p.first, p.second, buffer, handler );
367 if( !st.IsOK() )
368 {
369 delete[] buffer;
370 delete handler;
371 ReportResponse( new XRootDStatus( st ), 0, pFile );
372 return st;
373 }
374 }
375
376 while( pOngoing.size() < pParallel && pCurrentOffset < pBlkEnd )
377 {
378 uint64_t chunkSize = pChunkSize;
379 if( pCurrentOffset + chunkSize > pBlkEnd )
380 chunkSize = pBlkEnd - pCurrentOffset;
381 pOngoing[pCurrentOffset] = chunkSize;
382 char *buffer = new char[chunkSize];
383 ChunkHandler *handler = new ChunkHandler( this, pCurrentOffset, chunkSize, buffer, pFile, pUsePgRead );
384 XRootDStatus st = pUsePgRead
385 ? pFile->PgRead( pCurrentOffset, chunkSize, buffer, handler )
386 : pFile->Read( pCurrentOffset, chunkSize, buffer, handler );
387 pCurrentOffset += chunkSize;
388 if( !st.IsOK() )
389 {
390 delete[] buffer;
391 delete handler;
392 ReportResponse( new XRootDStatus( st ), 0, pFile );
393 return st;
394 }
395 }
396
397 if( pOngoing.empty() ) return XRootDStatus( stOK, suDone );
398
399 if( pRecovered.empty() && pCurrentOffset >= pBlkEnd ) return XRootDStatus( stOK, suPartial );
400
401 return XRootDStatus( stOK, suContinue );
402}
403
404void XCpSrc::ReportResponse( XRootDStatus *status, PageInfo *chunk, File *handle )
405{
406 XrdSysMutexHelper lck( pMtx );
407 bool ignore = false;
408
409 if( status->IsOK() )
410 {
411 // if the status is OK remove it from
412 // the list of ongoing transfers, if it
413 // was not on the list we ignore the
414 // response (this could happen due to
415 // source change or stealing)
416 ignore = !pOngoing.erase( chunk->GetOffset() );
417 }
418 else if( FilesEqual( pFile, handle ) )
419 {
420 // if the status is NOT OK, and pFile
421 // match the handle it means that we see
422 // an error for the first time, map the
423 // broken file to the number of outstanding
424 // asynchronous operations and reset the pointer
425 pFailed[pFile] = pOngoing.size();
426 pFile = 0;
427 }
428 else
429 DeletePtr( status );
430
431 if( !FilesEqual( pFile, handle ) )
432 {
433 // if the pFile does not match the handle,
434 // it means that this response came from
435 // a broken source, decrement the count of
436 // outstanding async operations for this src,
437 --pFailed[handle];
438 if( pFailed[handle] == 0 )
439 {
440 // if this was the last outstanding operation
441 // close the file and delete it
442 pFailed.erase( handle );
443 XRootDStatus st = handle->Close();
444 delete handle;
445 }
446 }
447
448 lck.UnLock();
449
450 if( status ) pReports.Put( status );
451
452 if( ignore )
453 {
454 DeleteChunk( chunk );
455 return;
456 }
457
458 if( chunk )
459 {
460 pDataTransfered += chunk->GetLength();
461 pCtx->PutChunk( chunk );
462 }
463}
464
465void XCpSrc::Steal( XCpSrc *src )
466{
467 if( !src ) return;
468
469 XrdSysMutexHelper lck1( pMtx ), lck2( src->pMtx );
470
471 Log *log = DefaultEnv::GetLog();
472 std::string myHost = URL( pUrl ).GetHostName(), srcHost = URL( src->pUrl ).GetHostName();
473
474 if( !src->pRunning )
475 {
476 // the source we are stealing from is in error state, we can have everything
477
478 pRecovered.insert( src->pOngoing.begin(), src->pOngoing.end() );
479 pRecovered.insert( src->pRecovered.begin(), src->pRecovered.end() );
480 pCurrentOffset = src->pCurrentOffset;
481 pBlkEnd = src->pBlkEnd;
482
483 src->pOngoing.clear();
484 src->pRecovered.clear();
485 src->pCurrentOffset = 0;
486 src->pBlkEnd = 0;
487
488 // a broken source might be waiting for
489 // someone to take over his data, so we
490 // need to notify
491 pCtx->NotifyIdleSrc();
492
493 log->Debug( UtilityMsg, "%s: Stealing everything from %s", myHost.c_str(), srcHost.c_str() );
494
495 return;
496 }
497
498 // the source we are stealing from is just slower, only take part of its work
499 // so we want a fraction of its work we want for ourself
500 uint64_t myTransferRate = TransferRate(), srcTransferRate = src->TransferRate();
501 if( myTransferRate == 0 ) return;
502 double fraction = double( myTransferRate ) / double( myTransferRate + srcTransferRate );
503
504 if( src->pCurrentOffset < src->pBlkEnd )
505 {
506 // the source still has a block of data
507 uint64_t blkSize = src->pBlkEnd - src->pCurrentOffset;
508 uint64_t steal = static_cast<uint64_t>( round( fraction * blkSize ) );
509 // if after stealing there will be less than one chunk
510 // take everything
511 if( blkSize - steal <= pChunkSize )
512 steal = blkSize;
513
514 pCurrentOffset = src->pBlkEnd - steal;
515 pBlkEnd = src->pBlkEnd;
516 src->pBlkEnd -= steal;
517
518 log->Debug( UtilityMsg, "%s: Stealing fraction (%f) of block from %s", myHost.c_str(), fraction, srcHost.c_str() );
519
520 return;
521 }
522
523 if( !src->pRecovered.empty() )
524 {
525 size_t count = static_cast<size_t>( round( fraction * src->pRecovered.size() ) );
526 while( count-- )
527 {
528 std::map<uint64_t, uint64_t>::iterator itr = src->pRecovered.begin();
529 pRecovered.insert( *itr );
530 src->pRecovered.erase( itr );
531 }
532
533 log->Debug( UtilityMsg, "%s: Stealing fraction (%f) of recovered chunks from %s", myHost.c_str(), fraction, srcHost.c_str() );
534
535 return;
536 }
537
538 // * a fraction < 0.5 means that we are actually slower (so it does
539 // not make sense to steal ongoing's from someone who's faster)
540 // * a fraction ~ 0.5 means that we have more or less the same transfer
541 // rate (similarly, it doesn't make sense to steal)
542 // * the source needs to be really faster (though, this is an arbitrary
543 // choice) to actually steal something
544 if( !src->pOngoing.empty() && fraction > 0.7 )
545 {
546 size_t count = static_cast<size_t>( round( fraction * src->pOngoing.size() ) );
547 while( count-- )
548 {
549 std::map<uint64_t, uint64_t>::iterator itr = src->pOngoing.begin();
550 pRecovered.insert( *itr );
551 src->pOngoing.erase( itr );
552 }
553
554 log->Debug( UtilityMsg, "%s: Stealing fraction (%f) of ongoing chunks from %s", myHost.c_str(), fraction, srcHost.c_str() );
555 }
556}
557
558XRootDStatus XCpSrc::GetWork()
559{
560 std::pair<uint64_t, uint64_t> p = pCtx->GetBlock();
561
562 if( p.second > 0 )
563 {
564 XrdSysMutexHelper lck( pMtx );
565 pCurrentOffset = p.first;
566 pBlkEnd = p.first + p.second;
567
568 Log *log = DefaultEnv::GetLog();
569 std::string myHost = URL( pUrl ).GetHostName();
570 log->Debug( UtilityMsg, "%s got next block", myHost.c_str() );
571
572 return XRootDStatus();
573 }
574
575 XCpSrc *wLink = pCtx->WeakestLink( this );
576 Steal( wLink );
577
578 // if we managed to steal something declare success
579 if( pCurrentOffset < pBlkEnd || !pRecovered.empty() ) return XRootDStatus();
580 // otherwise return an error
581 return XRootDStatus( stError, errInvalidOp );
582}
583
585{
586 time_t duration = pTransferTime + time( 0 ) - pStartTime;
587 return pDataTransfered / ( duration + 1 ); // add one to avoid floating point exception
588}
589
590} /* namespace XrdCl */
XrdOucString File
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
ChunkHandler(XCpSrc *src, uint64_t offset, uint64_t size, char *buffer, File *handle, bool usepgrd)
virtual ~ChunkHandler()
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A file.
Definition XrdClFile.hh:46
XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:206
bool IsSecure() const
Check if the file is using an encrypted connection.
Definition XrdClFile.cc:857
XRootDStatus Open(const std::string &url, OpenFlags::Flags flags, Access::Mode mode, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:99
bool GetProperty(const std::string &name, std::string &value) const
Definition XrdClFile.cc:878
XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:245
XRootDStatus Stat(bool force, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
Definition XrdClFile.cc:177
bool SetProperty(const std::string &name, const std::string &value)
Definition XrdClFile.cc:867
Handle an async response.
static bool HasPgRW(const XrdCl::URL &url)
void NotifyInitExpectant()
void NotifyIdleSrc()
bool GetNextUrl(std::string &url)
void RemoveSrc(XCpSrc *src)
XCpSrc * WeakestLink(XCpSrc *exclude)
void PutChunk(PageInfo *chunk)
void SetFileSize(int64_t size)
std::pair< uint64_t, uint64_t > GetBlock()
friend class ChunkHandler
static void DeleteChunk(PageInfo *&chunk)
XCpSrc(uint32_t chunkSize, uint8_t parallel, int64_t fileSize, XCpCtx *ctx)
uint64_t TransferRate()
const uint16_t suPartial
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint16_t errInvalidOp
const int DefaultCpUsePgWrtRd
const uint64_t UtilityMsg
const uint16_t suDone
const uint16_t suContinue
XrdSysError Log
Definition XrdConfig.cc:112
@ Read
Open only for reading.
uint32_t GetLength() const
Get the data length.
bool IsOK() const
We're fine.