54 pJob(job), pProgress(progress), pCurrentJob(currentJob),
55 pTotalJobs(totalJobs), pSem(sem),
68 virtual void Run(
void * )
77 pProgress->BeginJob( pCurrentJob, pTotalJobs,
89 gettimeofday( &bTOD, 0 );
97 st = pJob->Run( pProgress );
104 pJob->GetResults()->Get(
"LastURL", url );
107 auto itr = cgi.find(
"tried" );
108 if( itr != cgi.end() )
110 std::string tried = itr->second;
111 if( tried[tried.size() - 1] !=
',' ) tried +=
',';
112 tried += lastURL.GetHostName();
113 cgi[
"tried"] = tried;
116 cgi[
"tried"] = lastURL.GetHostName();
118 std::string recoveryRedir;
119 pJob->GetResults()->Get(
"WrtRecoveryRedir", recoveryRedir );
123 pJob->GetProperties()->Get(
"target", target );
125 trgURL.SetHostName( recRedirURL.GetHostName() );
126 trgURL.SetPort( recRedirURL.GetPort() );
127 trgURL.SetProtocol( recRedirURL.GetProtocol() );
128 trgURL.SetParams( cgi );
129 pJob->GetProperties()->Set(
"target", trgURL.GetURL() );
139 if( !st.
IsOK() && pRetryCnt > 0 &&
144 if( pRetryPolicy ==
"continue" )
146 pJob->GetProperties()->Set(
"force",
false );
147 pJob->GetProperties()->Set(
"continue",
true );
151 pJob->GetProperties()->Set(
"force",
true );
152 pJob->GetProperties()->Set(
"continue",
false );
162 pJob->GetResults()->Set(
"status", st );
169 std::vector<std::string> sources;
170 pJob->GetResults()->Get(
"sources", sources );
176 gettimeofday( &i.
eTOD, 0 );
182 pProgress->EndJob( pCurrentJob, pJob->GetResults() );
191 uint16_t pCurrentJob;
196 std::string pRetryPolicy;
202 struct CopyProcessImpl
204 std::vector<PropertyList> pJobProperties;
205 std::vector<PropertyList*> pJobResults;
206 std::vector<CopyJob*> pJobs;
237 properties.
Get<std::string>(
"jobType" ) ==
"configuration" )
239 if( pImpl->pJobProperties.size() > 0 &&
240 pImpl->pJobProperties.rbegin()->HasProperty(
"jobType" ) &&
241 pImpl->pJobProperties.rbegin()->Get<std::string>(
"jobType" ) ==
"configuration" )
244 PropertyList::PropertyMap::const_iterator it;
245 for( it = properties.
begin(); it != properties.
end(); ++it )
246 config.
Set( it->first, it->second );
249 pImpl->pJobProperties.push_back( properties );
262 pImpl->pJobProperties.push_back( properties );
265 const char *bools[] = {
"target",
"force",
"posc",
"coerce",
"makeDir",
266 "zipArchive",
"xcp",
"preserveXAttr",
"rmOnBadCksum",
267 "continue",
"zipAppend",
"doServer", 0};
268 for(
int i = 0; bools[i]; ++i )
270 p.
Set( bools[i],
false );
273 p.
Set(
"thirdParty",
"none" );
276 p.
Set(
"checkSumMode",
"none" );
281 pImpl->pJobProperties.pop_back();
283 "checkSumType not specified" );
290 std::string checkSumType;
291 p.
Get(
"checkSumType", checkSumType );
292 std::transform(checkSumType.begin(), checkSumType.end(),
293 checkSumType.begin(), ::tolower);
294 p.
Set(
"checkSumType", checkSumType );
301 env->
GetInt(
"CPParallelChunks", val );
302 p.
Set(
"parallelChunks", val );
308 env->
GetInt(
"CPChunkSize", val );
309 p.
Set(
"chunkSize", val );
315 env->
GetInt(
"XCpBlockSize", val );
316 p.
Set(
"xcpBlockSize", val );
322 env->
GetInt(
"CPInitTimeout", val );
323 p.
Set(
"initTimeout", val );
329 env->
GetInt(
"CPTPCTimeout", val );
330 p.
Set(
"tpcTimeout", val );
336 env->
GetInt(
"CPTimeout", val );
337 p.
Set(
"cpTimeout", val );
341 p.
Set(
"dynamicSource",
false );
346 if( !p.
HasProperty(
"xrateThreshold" ) || p.
Get<
long long>(
"xrateThreshold" ) == 0 )
349 env->
GetInt(
"XRateThreshold", val );
350 p.
Set(
"xrateThreshold", val );
359 pImpl->pJobResults.push_back( results );
369 std::vector<PropertyList>::iterator it;
372 (
unsigned long long) pImpl->pJobProperties.size() );
374 std::map<std::string, uint32_t> targetFlags;
376 for( it = pImpl->pJobProperties.begin(); it != pImpl->pJobProperties.end(); ++it, ++i )
381 props.
Get<std::string>(
"jobType" ) ==
"configuration" )
387 props.
Get(
"source", tmp );
399 if( !st.
IsOK() )
return st;
404 URL::ParamsMap::const_iterator itr = cgi.find(
"xrdcl.unzip" );
405 if( itr != cgi.end() )
407 props.
Set(
"zipArchive",
true );
408 props.
Set(
"zipSource", itr->second );
411 props.
Get(
"target", tmp );
419 bool targetIsDir =
false;
420 props.
Get(
"targetIsDir", targetIsDir );
424 std::string path = target.
GetPath() +
'/';
428 props.
Get(
"zipArchive", isZip );
431 props.
Get(
"zipSource", fn );
444 size_t pos = fn.rfind(
'/' );
445 if( pos != std::string::npos )
446 fn = fn.substr( pos + 1 );
454 props.
Get(
"thirdParty", tmp );
467 res->
Set(
"status", st );
477 res->
Set(
"status", st );
494 pImpl->pJobs.push_back( job );
507 uint8_t parallelThreads = 1;
508 if( pImpl->pJobProperties.size() > 0 &&
509 pImpl->pJobProperties.rbegin()->HasProperty(
"jobType" ) &&
510 pImpl->pJobProperties.rbegin()->Get<std::string>(
"jobType" ) ==
"configuration" )
514 parallelThreads = (uint8_t)config.
Get<
int>(
"parallel" );
520 std::vector<CopyJob *>::iterator it;
521 uint16_t currentJob = 1;
522 uint16_t totalJobs = pImpl->pJobs.size();
527 if( parallelThreads == 1 )
531 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
533 QueuedCopyJob j( *it, progress, currentJob, totalJobs );
544 if( !err.
IsOK() )
return err;
551 uint16_t workers = std::min( (uint16_t)parallelThreads,
552 (uint16_t)pImpl->pJobs.size() );
557 "Unable to start job manager" );
560 std::vector<QueuedCopyJob*> queued;
561 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
563 QueuedCopyJob *j =
new QueuedCopyJob( *it, progress, currentJob,
566 queued.push_back( j );
571 std::vector<QueuedCopyJob*>::iterator itQ;
572 for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
578 "Unable to stop job manager" );
580 for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
583 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
586 if( !st.
IsOK() )
return st;
592 void CopyProcess::CleanUpJobs()
594 std::vector<CopyJob*>::iterator itJ;
595 for( itJ = pImpl->pJobs.begin(); itJ != pImpl->pJobs.end(); ++itJ )
606 pImpl->pJobs.clear();
const URL & GetSource() const
Get source.
virtual ~CopyProcess()
Destructor.
CopyProcess()
Constructor.
XRootDStatus Run(CopyProgressHandler *handler)
Run the copy jobs.
XRootDStatus AddJob(const PropertyList &properties, PropertyList *results)
Interface for copy progress notification.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
bool Finalize()
Finalize the job manager, clear the queues.
bool Start()
Start the workers.
bool Initialize()
Initialize the job manager.
bool Stop()
Stop the workers.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
virtual void Run(void *arg)=0
The job logic.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
An abstract class to describe the client-side monitoring plugin interface.
TransferInfo transfer
The transfer in question.
@ EvCopyBeg
CopyBInfo: Copy operation started.
@ EvCopyEnd
CopyEInfo: Copy operation ended.
virtual void Event(EventCode evCode, void *evData)=0
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
PropertyMap::const_iterator end() const
Get the end iterator.
bool Get(const std::string &name, Item &item) const
bool HasProperty(const std::string &name) const
Check if we know about the given name.
PropertyMap::const_iterator begin() const
Get the begin iterator.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
XRootDStatus RegisterAndWait(const URL &url)
Creates a new virtual redirector and registers it (sync).
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
const std::string & GetPath() const
Get the path.
bool IsMetalink() const
Is it a URL to a metalink.
std::map< std::string, std::string > ParamsMap
std::string GetURL() const
Get the URL.
void SetPath(const std::string &path)
Set the path.
const ParamsMap & GetParams() const
Get the URL params.
const std::string & GetProtocol() const
Get the protocol.
bool IsValid() const
Is the url valid.
static void LogPropertyList(Log *log, uint64_t topic, const char *format, const PropertyList &list)
Log property list.
An interface for metadata redirectors.
virtual std::string GetTargetName() const =0
Gets the file name as specified in the metalink.
const int DefaultCPInitTimeout
const int DefaultXRateThreshold
const uint16_t errOperationExpired
const int DefaultCPChunkSize
const uint16_t stError
An error occurred that could potentially be retried.
const int DefaultRetryWrtAtLBLimit
const int DefaultCPParallelChunks
const uint16_t errOSError
const int DefaultXCpBlockSize
const uint64_t UtilityMsg
const int DefaultCPTimeout
const uint16_t errInvalidArgs
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errThresholdExceeded
const char *const DefaultCpRetryPolicy
const int DefaultCPTPCTimeout
Describe an end of copy event.
TransferInfo transfer
The transfer in question.
int sources
Number of sources used for the copy.
timeval bTOD
Copy start time.
const XRootDStatus * status
Status of the copy.
timeval eTOD
Copy end time.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static bool IsSocketError(uint16_t code)