XRootD
Loading...
Searching...
No Matches
XrdClThirdPartyCopyJob.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
27#include "XrdCl/XrdClLog.hh"
29#include "XrdCl/XrdClUtils.hh"
31#include "XrdCl/XrdClMonitor.hh"
33#include "XrdCl/XrdClDlgEnv.hh"
34#include "XrdOuc/XrdOucTPC.hh"
36#include "XrdSys/XrdSysTimer.hh"
37
38#include <iostream>
39#include <chrono>
40
41#include <cctype>
42#include <sstream>
43#include <cstdlib>
44#include <cstdio>
45#include <sys/time.h>
46#include <sys/types.h>
47#include <unistd.h>
48
49namespace
50{
51 //----------------------------------------------------------------------------
53 //----------------------------------------------------------------------------
54 class TPCStatusHandler: public XrdCl::ResponseHandler
55 {
56 public:
57 //------------------------------------------------------------------------
58 // Constructor
59 //------------------------------------------------------------------------
60 TPCStatusHandler():
61 pSem( new XrdSysSemaphore(0) ), pStatus(0)
62 {
63 }
64
65 //------------------------------------------------------------------------
66 // Destructor
67 //------------------------------------------------------------------------
68 virtual ~TPCStatusHandler()
69 {
70 delete pStatus;
71 delete pSem;
72 }
73
74 //------------------------------------------------------------------------
75 // Handle Response
76 //------------------------------------------------------------------------
77 virtual void HandleResponse( XrdCl::XRootDStatus *status,
78 XrdCl::AnyObject *response )
79 {
80 delete response;
81 pStatus = status;
82 pSem->Post();
83 }
84
85 //------------------------------------------------------------------------
86 // Get Mutex
87 //------------------------------------------------------------------------
88 XrdSysSemaphore *GetXrdSysSemaphore()
89 {
90 return pSem;
91 }
92
93 //------------------------------------------------------------------------
94 // Get status
95 //------------------------------------------------------------------------
96 XrdCl::XRootDStatus *GetStatus()
97 {
98 return pStatus;
99 }
100
101 private:
102 TPCStatusHandler(const TPCStatusHandler &other);
103 TPCStatusHandler &operator = (const TPCStatusHandler &other);
104
105 XrdSysSemaphore *pSem;
106 XrdCl::XRootDStatus *pStatus;
107 };
108
109 class InitTimeoutCalc
110 {
111 public:
112
113 InitTimeoutCalc( time_t timeLeft ) :
114 hasInitTimeout( timeLeft ), start( time( 0 ) ), timeLeft( timeLeft )
115 {
116
117 }
118
119 XrdCl::XRootDStatus operator()()
120 {
121 if( !hasInitTimeout ) return XrdCl::XRootDStatus();
122
123 time_t now = time( 0 );
124 if( now - start > timeLeft )
125 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errOperationExpired );
126
127 timeLeft -= ( now - start );
128 return XrdCl::XRootDStatus();
129 }
130
131 // used to fetch a timeout count in 2 situations: to pass to XrdCl methods
132 // and preserve remaining timeout at end of CanDo(). Zero has special
133 // meaning in both these contexts, so if we had an initial timeout we
134 // return a current timeout of at least 1.
135 operator time_t()
136 {
137 if( !hasInitTimeout ) return timeLeft;
138 return timeLeft ? timeLeft : 1;
139 }
140
141 private:
142 bool hasInitTimeout;
143 time_t start;
144 time_t timeLeft;
145 };
146
147 static XrdCl::XRootDStatus& UpdateErrMsg( XrdCl::XRootDStatus &status, const std::string &str )
148 {
149 std::string msg = status.GetErrorMessage();
150 msg += " (" + str + ")";
151 status.SetErrorMessage( msg );
152 return status;
153 }
154}
155
156namespace XrdCl
157{
158 //----------------------------------------------------------------------------
159 // Constructor
160 //----------------------------------------------------------------------------
162 PropertyList *jobProperties,
163 PropertyList *jobResults ):
164 CopyJob( jobId, jobProperties, jobResults ),
165 dstFile( File::DisableVirtRedirect ),
166 sourceSize( 0 ),
167 initTimeout( 0 ),
168 force( false ),
169 coerce( false ),
170 delegate( false ),
171 nbStrm( 0 ),
172 tpcLite( false )
173 {
174 Log *log = DefaultEnv::GetLog();
175 log->Debug( UtilityMsg, "Creating a third party copy job, from %s to %s",
176 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
177 }
178
179 //----------------------------------------------------------------------------
180 // Run the copy job
181 //----------------------------------------------------------------------------
183 {
184 Log *log = DefaultEnv::GetLog();
185
186 XRootDStatus st = CanDo();
187 if( !st.IsOK() ) return st;
188
189 if( tpcLite )
190 {
191 //------------------------------------------------------------------------
192 // Run TPC-lite algorithm
193 //------------------------------------------------------------------------
194 XRootDStatus st = RunLite( progress );
195 if( !st.IsOK() ) return st;
196 }
197 else
198 {
199 //------------------------------------------------------------------------
200 // Run vanilla TPC algorithm
201 //------------------------------------------------------------------------
202 XRootDStatus st = RunTPC( progress );
203 if( !st.IsOK() ) return st;
204 }
205
206 //--------------------------------------------------------------------------
207 // Verify the checksums if needed
208 //--------------------------------------------------------------------------
209 if( checkSumMode != "none" )
210 {
211 log->Debug( UtilityMsg, "Attempting checksum calculation." );
212 std::string sourceCheckSum;
213 std::string targetCheckSum;
214
215 //------------------------------------------------------------------------
216 // Get the check sum at source
217 //------------------------------------------------------------------------
218 timeval oStart, oEnd;
219 XRootDStatus st;
220 if( checkSumMode == "end2end" || checkSumMode == "source" ||
221 !checkSumPreset.empty() )
222 {
223 gettimeofday( &oStart, 0 );
224 if( !checkSumPreset.empty() )
225 {
226 sourceCheckSum = checkSumType + ":";
227 sourceCheckSum += Utils::NormalizeChecksum( checkSumType,
228 checkSumPreset );
229 }
230 else
231 {
232 VirtualRedirector *redirector = 0;
233 std::string vrCheckSum;
234 if( GetSource().IsMetalink() &&
235 ( redirector = RedirectorRegistry::Instance().Get( GetSource() ) ) &&
236 !( vrCheckSum = redirector->GetCheckSum( checkSumType ) ).empty() )
237 sourceCheckSum = vrCheckSum;
238 else
239 st = Utils::GetRemoteCheckSum( sourceCheckSum, checkSumType, tpcSource );
240 }
241 gettimeofday( &oEnd, 0 );
242 if( !st.IsOK() )
243 return UpdateErrMsg( st, "source" );
244
245 pResults->Set( "sourceCheckSum", sourceCheckSum );
246 }
247
248 //------------------------------------------------------------------------
249 // Get the check sum at destination
250 //------------------------------------------------------------------------
251 timeval tStart, tEnd;
252
253 if( checkSumMode == "end2end" || checkSumMode == "target" )
254 {
255 gettimeofday( &tStart, 0 );
256 st = Utils::GetRemoteCheckSum( targetCheckSum, checkSumType, realTarget );
257
258 gettimeofday( &tEnd, 0 );
259 if( !st.IsOK() )
260 return UpdateErrMsg( st, "destination" );
261 pResults->Set( "targetCheckSum", targetCheckSum );
262 }
263
264 //------------------------------------------------------------------------
265 // Make sure the checksums are both lower case
266 //------------------------------------------------------------------------
267 auto sanitize_cksum = []( char c )
268 {
269 std::locale loc;
270 if( std::isalpha( c ) ) return std::tolower( c, loc );
271 return c;
272 };
273
274 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
275 sourceCheckSum.begin(), sanitize_cksum );
276
277 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
278 targetCheckSum.begin(), sanitize_cksum );
279
280 //------------------------------------------------------------------------
281 // Compare and inform monitoring
282 //------------------------------------------------------------------------
283 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
284 {
285 bool match = false;
286 if( sourceCheckSum == targetCheckSum )
287 match = true;
288
290 if( mon )
291 {
295 i.cksum = sourceCheckSum;
296 i.oTime = Utils::GetElapsedMicroSecs( oStart, oEnd );
297 i.tTime = Utils::GetElapsedMicroSecs( tStart, tEnd );
298 i.isOK = match;
299 mon->Event( Monitor::EvCheckSum, &i );
300 }
301
302 if( !match )
304
305 log->Info(UtilityMsg, "Checksum verification: succeeded." );
306 }
307 }
308
309 return XRootDStatus();
310 }
311
312 //----------------------------------------------------------------------------
313 // Check whether doing a third party copy is feasible for given
314 // job descriptor
315 //----------------------------------------------------------------------------
316 XRootDStatus ThirdPartyCopyJob::CanDo()
317 {
318 const URL &source = GetSource();
319 const URL &target = GetTarget();
320
321 //--------------------------------------------------------------------------
322 // We can only do a TPC if both source and destination are remote files
323 //--------------------------------------------------------------------------
324 if( source.IsLocalFile() || target.IsLocalFile() )
326 "Cannot do a third-party-copy for local file." );
327
328 //--------------------------------------------------------------------------
329 // Check the initial settings
330 //--------------------------------------------------------------------------
331 Log *log = DefaultEnv::GetLog();
332 log->Debug( UtilityMsg, "Check if third party copy between %s and %s "
333 "is possible", source.GetObfuscatedURL().c_str(),
334 target.GetObfuscatedURL().c_str() );
335
336 if( target.GetProtocol() != "root" &&
337 target.GetProtocol() != "xroot" &&
338 target.GetProtocol() != "roots" &&
339 target.GetProtocol() != "xroots" )
340 return XRootDStatus( stError, errNotSupported, 0, "Third-party-copy "
341 "is only supported for root/xroot protocol." );
342
343 pProperties->Get( "initTimeout", initTimeout );
344 InitTimeoutCalc timeLeft( initTimeout );
345
346 pProperties->Get( "checkSumMode", checkSumMode );
347 pProperties->Get( "checkSumType", checkSumType );
348 pProperties->Get( "checkSumPreset", checkSumPreset );
349 pProperties->Get( "force", force );
350 pProperties->Get( "coerce", coerce );
351 pProperties->Get( "delegate", delegate );
352
354 env->GetInt( "SubStreamsPerChannel", nbStrm );
355
356 // account for the control stream
357 if (nbStrm > 0) --nbStrm;
358
359 bool tpcLiteOnly = false;
360
361 if( !delegate )
362 log->Info( UtilityMsg, "We are NOT using delegation" );
363
364 //--------------------------------------------------------------------------
365 // Resolve the 'auto' checksum type.
366 //--------------------------------------------------------------------------
367 if( checkSumType == "auto" )
368 {
369 checkSumType = Utils::InferChecksumType( GetSource(), GetTarget() );
370 if( checkSumType.empty() )
371 log->Info( UtilityMsg, "Could not infer checksum type." );
372 else
373 log->Info( UtilityMsg, "Using inferred checksum type: %s.", checkSumType.c_str() );
374 }
375
376 //--------------------------------------------------------------------------
377 // Check if we can open the source. Note in TPC-lite scenario it is optional
378 // for this step to be successful.
379 //--------------------------------------------------------------------------
380 File sourceFile;
381 XRootDStatus st;
382 URL sourceURL = source;
383 URL::ParamsMap params;
384
385 // set ReadRecovery property
386 std::string value;
387 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
388 sourceFile.SetProperty( "ReadRecovery", value );
389
390 // save the original opaque parameter list as specified by the user for later
391 const URL::ParamsMap &srcparams = sourceURL.GetParams();
392
393 //--------------------------------------------------------------------------
394 // Do the facultative step at source only if the protocol is root/xroot,
395 // otherwise don't bother
396 //--------------------------------------------------------------------------
397 if( sourceURL.GetProtocol() == "root" || sourceURL.GetProtocol() == "xroot" ||
398 sourceURL.GetProtocol() == "roots" || sourceURL.GetProtocol() == "xroots" )
399 {
400 params = sourceURL.GetParams();
401 params["tpc.stage"] = "placement";
402 sourceURL.SetParams( params );
403 log->Debug( UtilityMsg, "Trying to open %s for reading",
404 sourceURL.GetObfuscatedURL().c_str() );
405 st = sourceFile.Open( sourceURL.GetURL(), OpenFlags::Read, Access::None,
406 timeLeft );
407 }
408 else
409 st = XRootDStatus( stError, errNotSupported );
410
411 if( st.IsOK() )
412 {
413 std::string sourceUrl;
414 sourceFile.GetProperty( "LastURL", sourceUrl );
415 tpcSource = sourceUrl;
416
417 VirtualRedirector *redirector = 0;
418 long long size = -1;
419 if( source.IsMetalink() &&
420 ( redirector = RedirectorRegistry::Instance().Get( tpcSource ) ) &&
421 ( size = redirector->GetSize() ) >= 0 )
422 sourceSize = size;
423 else
424 {
425 StatInfo *statInfo;
426 st = sourceFile.Stat( false, statInfo );
427 if (st.IsOK()) sourceSize = statInfo->GetSize();
428 delete statInfo;
429 }
430 }
431 else
432 {
433 log->Info( UtilityMsg, "Cannot open source file %s: %s",
434 source.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
435 if( !delegate )
436 {
437 //----------------------------------------------------------------------
438 // If we cannot contact the source and there is no credential to delegate
439 // it cannot possibly work
440 //----------------------------------------------------------------------
441 st.status = stFatal;
442 return st;
443 }
444
445 tpcSource = sourceURL;
446 tpcLiteOnly = true;
447 }
448
449 // get the opaque parameters as returned by the redirector
450 URL tpcSourceUrl = tpcSource;
451 URL::ParamsMap tpcsrcparams = tpcSourceUrl.GetParams();
452 // merge the original cgi with the one returned by the redirector,
453 // the original values take precedence
454 URL::ParamsMap::const_iterator itr = srcparams.begin();
455 for( ; itr != srcparams.end(); ++itr )
456 tpcsrcparams[itr->first] = itr->second;
457 tpcSourceUrl.SetParams( tpcsrcparams );
458 // save the merged opaque parameter list for later
459 std::string scgi;
460 const URL::ParamsMap &scgiparams = tpcSourceUrl.GetParams();
461 itr = scgiparams.begin();
462 for( ; itr != scgiparams.end(); ++itr )
463 if( itr->first.compare( 0, 6, "xrdcl." ) != 0 )
464 {
465 if( !scgi.empty() ) scgi += '\t';
466 scgi += itr->first + '=' + itr->second;
467 }
468
469 if( !timeLeft().IsOK() )
470 {
471 // we still want to send a close, but we time it out quickly
472 st = sourceFile.Close( 1 );
473 return XRootDStatus( stError, errOperationExpired );
474 }
475
476 st = sourceFile.Close( timeLeft );
477
478 if( !timeLeft().IsOK() )
479 return XRootDStatus( stError, errOperationExpired );
480
481 //--------------------------------------------------------------------------
482 // Now we need to check the destination !!!
483 //--------------------------------------------------------------------------
484 if( delegate )
486 else
488
489 //--------------------------------------------------------------------------
490 // Generate the destination CGI
491 //--------------------------------------------------------------------------
492 log->Debug( UtilityMsg, "Generating the destination TPC URL" );
493
494 tpcKey = GenerateKey();
495
496 char *cgiBuff = new char[2048];
497 const char *cgiP = XrdOucTPC::cgiC2Dst( tpcKey.c_str(),
498 tpcSource.GetHostId().c_str(),
499 tpcSource.GetPath().c_str(),
500 0, cgiBuff, 2048, nbStrm,
501 GetSource().GetHostId().c_str(),
502 GetSource().GetProtocol().c_str(),
503 GetTarget().GetProtocol().c_str(),
504 delegate );
505
506 if( *cgiP == '!' )
507 {
508 log->Error( UtilityMsg, "Unable to setup target url: %s", cgiP+1 );
509 delete [] cgiBuff;
510 return XRootDStatus( stError, errNotSupported );
511 }
512
513 URL cgiURL; cgiURL.SetParams( cgiBuff );
514 delete [] cgiBuff;
515
516 realTarget = GetTarget();
517 params = realTarget.GetParams();
518 MessageUtils::MergeCGI( params, cgiURL.GetParams(), true );
519
520 if( !tpcLiteOnly ) // we only append oss.asize if it source file size is actually known
521 {
522 std::ostringstream o; o << sourceSize;
523 params["oss.asize"] = o.str();
524 }
525 params["tpc.stage"] = "copy";
526
527 // forward source cgi info to the destination in case we are going to do delegation
528 if( !scgi.empty() && delegate )
529 params["tpc.scgi"] = scgi;
530
531 realTarget.SetParams( params );
532
533 log->Debug( UtilityMsg, "Target url is: %s", realTarget.GetObfuscatedURL().c_str() );
534
535 //--------------------------------------------------------------------------
536 // Open the target file
537 //--------------------------------------------------------------------------
538 // set WriteRecovery property
539 DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
540 dstFile.SetProperty( "WriteRecovery", value );
541
543 if( force )
544 targetFlags |= OpenFlags::Delete;
545 else
546 targetFlags |= OpenFlags::New;
547
548 if( coerce )
549 targetFlags |= OpenFlags::Force;
550
552 st = dstFile.Open( realTarget.GetURL(), targetFlags, mode, timeLeft );
553 if( !st.IsOK() )
554 {
555 log->Error( UtilityMsg, "Unable to open target %s: %s",
556 realTarget.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
557 if( st.code == errErrorResponse &&
558 st.errNo == kXR_FSError &&
559 st.GetErrorMessage().find( "tpc not supported" ) != std::string::npos )
560 return XRootDStatus( stError, errNotSupported, 0, // the open failed due to lack of TPC support on the server side
561 "Destination does not support third-party-copy." );
562 return UpdateErrMsg( st, "destination" );
563 }
564
565 std::string lastUrl; dstFile.GetProperty( "LastURL", lastUrl );
566 realTarget = lastUrl;
567
568 if( !timeLeft().IsOK() )
569 {
570 // we still want to send a close, but we time it out fast
571 st = dstFile.Close( 1 );
572 return XRootDStatus( stError, errOperationExpired );
573 }
574
575 //--------------------------------------------------------------------------
576 // Verify if the destination supports TPC / TPC-lite
577 //--------------------------------------------------------------------------
578 st = Utils::CheckTPCLite( realTarget.GetURL() );
579 if( !st.IsOK() )
580 {
581 // we still want to send a close, but we time it out fast
582 st = dstFile.Close( 1 );
583 return XRootDStatus( stError, errNotSupported, 0, // doesn't support TPC
584 "Destination does not support third-party-copy.");
585 }
586
587 //--------------------------------------------------------------------------
588 // if target supports TPC-lite and we have a credential to delegate we can
589 // go ahead and use TPC-lite
590 //--------------------------------------------------------------------------
591 tpcLite = ( st.code != suPartial ) && delegate;
592
593 if( !tpcLite && tpcLiteOnly ) // doesn't support TPC-lite and it was our only hope
594 {
595 st = dstFile.Close( 1 );
596 return XRootDStatus( stError, errNotSupported, 0, "Destination does not "
597 "support delegation." );
598 }
599
600 //--------------------------------------------------------------------------
601 // adjust the InitTimeout
602 //--------------------------------------------------------------------------
603 if( !timeLeft().IsOK() )
604 {
605 // we still want to send a close, but we time it out fast
606 st = dstFile.Close( 1 );
607 return XRootDStatus( stError, errOperationExpired );
608 }
609
610 //--------------------------------------------------------------------------
611 // If we don't use delegation the source has to support TPC
612 //--------------------------------------------------------------------------
613 if( !tpcLite )
614 {
615 st = Utils::CheckTPC( URL( tpcSource ).GetURL(), timeLeft );
616 if( !st.IsOK() )
617 {
618 log->Error( UtilityMsg, "Source (%s) does not support TPC",
619 tpcSource.GetURL().c_str() );
620 return XRootDStatus( stError, errNotSupported, 0, "Source does not "
621 "support third-party-copy" );
622 }
623
624 if( !timeLeft().IsOK() )
625 {
626 // we still want to send a close, but we time it out quickly
627 st = sourceFile.Close( 1 );
628 return XRootDStatus( stError, errOperationExpired );
629 }
630 }
631
632 initTimeout = time_t( timeLeft );
633
634 return XRootDStatus();
635 }
636
637 //----------------------------------------------------------------------------
638 // Run vanilla copy job
639 //----------------------------------------------------------------------------
640 XRootDStatus ThirdPartyCopyJob::RunTPC( CopyProgressHandler *progress )
641 {
642 Log *log = DefaultEnv::GetLog();
643
644 //--------------------------------------------------------------------------
645 // Generate the source CGI
646 //--------------------------------------------------------------------------
647 char *cgiBuff = new char[2048];
648 const char *cgiP = XrdOucTPC::cgiC2Src( tpcKey.c_str(),
649 realTarget.GetHostName().c_str(), -1, cgiBuff,
650 2048 );
651 if( *cgiP == '!' )
652 {
653 log->Error( UtilityMsg, "Unable to setup source url: %s", cgiP+1 );
654 delete [] cgiBuff;
655 return XRootDStatus( stError, errInvalidArgs );
656 }
657
658 URL cgiURL; cgiURL.SetParams( cgiBuff );
659 delete [] cgiBuff;
660 URL::ParamsMap params = tpcSource.GetParams();
661 MessageUtils::MergeCGI( params, cgiURL.GetParams(), true );
662 params["tpc.stage"] = "copy";
663 tpcSource.SetParams( params );
664
665 log->Debug( UtilityMsg, "Source url is: %s", tpcSource.GetObfuscatedURL().c_str() );
666
667 // Set the close timeout to the default value of the stream timeout
668 int closeTimeout = 0;
669 (void) DefaultEnv::GetEnv()->GetInt( "StreamTimeout", closeTimeout);
670
671 //--------------------------------------------------------------------------
672 // Set up the rendez-vous and open the source
673 //--------------------------------------------------------------------------
674 InitTimeoutCalc timeLeft( initTimeout );
675 XRootDStatus st = dstFile.Sync( timeLeft );
676 if( !st.IsOK() )
677 {
678 log->Error( UtilityMsg, "Unable set up rendez-vous: %s",
679 st.ToStr().c_str() );
680 XRootDStatus status = dstFile.Close( closeTimeout );
681 return UpdateErrMsg( st, "destination" );
682 }
683
684 //--------------------------------------------------------------------------
685 // Calculate the time we have left to perform source open
686 //--------------------------------------------------------------------------
687 if( !timeLeft().IsOK() )
688 {
689 XRootDStatus status = dstFile.Close( closeTimeout );
690 return XRootDStatus( stError, errOperationExpired );
691 }
692
693 File sourceFile( File::DisableVirtRedirect );
694 // set ReadRecovery property
695 std::string value;
696 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
697 sourceFile.SetProperty( "ReadRecovery", value );
698
699 st = sourceFile.Open( tpcSource.GetURL(), OpenFlags::Read, Access::None,
700 timeLeft );
701
702 if( !st.IsOK() )
703 {
704 log->Error( UtilityMsg, "Unable to open source %s: %s",
705 tpcSource.GetObfuscatedURL().c_str(), st.ToStr().c_str() );
706 XRootDStatus status = dstFile.Close( closeTimeout );
707 return UpdateErrMsg( st, "source" );
708 }
709
710 //--------------------------------------------------------------------------
711 // Do the copy and follow progress
712 //--------------------------------------------------------------------------
713 TPCStatusHandler statusHandler;
714 XrdSysSemaphore *sem = statusHandler.GetXrdSysSemaphore();
715 StatInfo *info = 0;
716
717 time_t tpcTimeout = 0;
718 pProperties->Get( "tpcTimeout", tpcTimeout );
719
720 st = dstFile.Sync( &statusHandler, tpcTimeout );
721 if( !st.IsOK() )
722 {
723 log->Error( UtilityMsg, "Unable start the copy: %s",
724 st.ToStr().c_str() );
725 XRootDStatus statusS = sourceFile.Close( closeTimeout );
726 XRootDStatus statusT = dstFile.Close( closeTimeout );
727 return UpdateErrMsg( st, "destination" );
728 }
729
730 //--------------------------------------------------------------------------
731 // Stat the file every second until sync returns
732 //--------------------------------------------------------------------------
733 bool canceled = false;
734 while( 1 )
735 {
736 XrdSysTimer::Wait( 2500 );
737
738 if( progress )
739 {
740 st = dstFile.Stat( true, info );
741 if( st.IsOK() )
742 {
743 progress->JobProgress( pJobId, info->GetSize(), sourceSize );
744 delete info;
745 info = 0;
746 }
747 bool shouldCancel = progress->ShouldCancel( pJobId );
748 if( shouldCancel )
749 {
750 log->Debug( UtilityMsg, "Cancellation requested by progress handler" );
751 Buffer arg, *response = 0; arg.FromString( "ofs.tpc cancel" );
752 XRootDStatus st = dstFile.Fcntl( arg, response );
753 if( !st.IsOK() )
754 log->Debug( UtilityMsg, "Error while trying to cancel tpc: %s",
755 st.ToStr().c_str() );
756
757 delete response;
758 canceled = true;
759 break;
760 }
761 }
762
763 if( sem->CondWait() )
764 break;
765 }
766
767 //--------------------------------------------------------------------------
768 // Sync has returned so we can check if it was successful
769 //--------------------------------------------------------------------------
770 if( canceled )
771 sem->Wait();
772
773 st = *statusHandler.GetStatus();
774
775 if( !st.IsOK() )
776 {
777 log->Error( UtilityMsg, "Third party copy from %s to %s failed: %s",
778 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str(),
779 st.ToStr().c_str() );
780
781 // Ignore close response
782 XRootDStatus statusS = sourceFile.Close( closeTimeout );
783 XRootDStatus statusT = dstFile.Close( closeTimeout );
784 return st;
785 }
786
787 XRootDStatus statusS = sourceFile.Close( closeTimeout );
788 XRootDStatus statusT = dstFile.Close( closeTimeout );
789
790 if ( !statusS.IsOK() || !statusT.IsOK() )
791 {
792 st = (statusS.IsOK() ? statusT : statusS);
793 log->Error( UtilityMsg, "Third party copy from %s to %s failed during "
794 "close of %s: %s", GetSource().GetObfuscatedURL().c_str(),
795 GetTarget().GetObfuscatedURL().c_str(),
796 (statusS.IsOK() ? "destination" : "source"), st.ToStr().c_str() );
797 return UpdateErrMsg( st, statusS.IsOK() ? "source" : "destination" );
798 }
799
800 log->Debug( UtilityMsg, "Third party copy from %s to %s successful",
801 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
802
803 pResults->Set( "size", sourceSize );
804
805 return XRootDStatus();
806 }
807
808 XRootDStatus ThirdPartyCopyJob::RunLite( CopyProgressHandler *progress )
809 {
810 Log *log = DefaultEnv::GetLog();
811
812 // Set the close timeout to the default value of the stream timeout
813 int closeTimeout = 0;
814 (void) DefaultEnv::GetEnv()->GetInt( "StreamTimeout", closeTimeout);
815
816 //--------------------------------------------------------------------------
817 // Set up the rendez-vous
818 //--------------------------------------------------------------------------
819 InitTimeoutCalc timeLeft( initTimeout );
820 XRootDStatus st = dstFile.Sync( timeLeft );
821 if( !st.IsOK() )
822 {
823 log->Error( UtilityMsg, "Unable set up rendez-vous: %s",
824 st.ToStr().c_str() );
825 XRootDStatus status = dstFile.Close( closeTimeout );
826 return UpdateErrMsg( st, "destination" );
827 }
828
829 //--------------------------------------------------------------------------
830 // Do the copy and follow progress
831 //--------------------------------------------------------------------------
832 TPCStatusHandler statusHandler;
833 XrdSysSemaphore *sem = statusHandler.GetXrdSysSemaphore();
834 StatInfo *info = 0;
835
836 time_t tpcTimeout = 0;
837 pProperties->Get( "tpcTimeout", tpcTimeout );
838
839 st = dstFile.Sync( &statusHandler, tpcTimeout );
840 if( !st.IsOK() )
841 {
842 log->Error( UtilityMsg, "Unable start the copy: %s",
843 st.ToStr().c_str() );
844 XRootDStatus statusT = dstFile.Close( closeTimeout );
845 return UpdateErrMsg( st, "destination" );
846 }
847
848 //--------------------------------------------------------------------------
849 // Stat the file every second until sync returns
850 //--------------------------------------------------------------------------
851 bool canceled = false;
852 while( 1 )
853 {
854 XrdSysTimer::Wait( 2500 );
855
856 if( progress )
857 {
858 st = dstFile.Stat( true, info );
859 if( st.IsOK() )
860 {
861 progress->JobProgress( pJobId, info->GetSize(), sourceSize );
862 delete info;
863 info = 0;
864 }
865 bool shouldCancel = progress->ShouldCancel( pJobId );
866 if( shouldCancel )
867 {
868 log->Debug( UtilityMsg, "Cancellation requested by progress handler" );
869 Buffer arg, *response = 0; arg.FromString( "ofs.tpc cancel" );
870 XRootDStatus st = dstFile.Fcntl( arg, response );
871 if( !st.IsOK() )
872 log->Debug( UtilityMsg, "Error while trying to cancel tpc: %s",
873 st.ToStr().c_str() );
874
875 delete response;
876 canceled = true;
877 break;
878 }
879 }
880
881 if( sem->CondWait() )
882 break;
883 }
884
885 //--------------------------------------------------------------------------
886 // Sync has returned so we can check if it was successful
887 //--------------------------------------------------------------------------
888 if( canceled )
889 sem->Wait();
890
891 st = *statusHandler.GetStatus();
892
893 if( !st.IsOK() )
894 {
895 log->Error( UtilityMsg, "Third party copy from %s to %s failed: %s",
896 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str(),
897 st.ToStr().c_str() );
898
899 // Ignore close response
900 XRootDStatus statusT = dstFile.Close( closeTimeout );
901 return st;
902 }
903
904 st = dstFile.Close( closeTimeout );
905
906 if ( !st.IsOK() )
907 {
908 log->Error( UtilityMsg, "Third party copy from %s to %s failed during "
909 "close of %s: %s", GetSource().GetObfuscatedURL().c_str(),
910 GetTarget().GetObfuscatedURL().c_str(),
911 "destination", st.ToStr().c_str() );
912 return UpdateErrMsg( st, "destination" );
913 }
914
915 log->Debug( UtilityMsg, "Third party copy from %s to %s successful",
916 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
917
918 pResults->Set( "size", sourceSize );
919
920 return XRootDStatus();
921 }
922
923
924 //----------------------------------------------------------------------------
925 // Generate a rendez-vous key
926 //----------------------------------------------------------------------------
927 std::string ThirdPartyCopyJob::GenerateKey()
928 {
929 static const int _10to9 = 1000000000;
930
931 char tpcKey[25];
932
933 auto tp = std::chrono::high_resolution_clock::now();
934 auto d = tp.time_since_epoch();
935 auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>( d );
936 auto s = std::chrono::duration_cast<std::chrono::seconds>( d );
937 uint32_t k1 = ns.count() - s.count() * _10to9;
938 uint32_t k2 = getpid() | (getppid() << 16);
939 uint32_t k3 = s.count();
940 snprintf( tpcKey, 25, "%08x%08x%08x", k1, k2, k3 );
941 return std::string(tpcKey);
942 }
943}
@ kXR_FSError
XrdOucString File
PropertyList * pResults
CopyJob(uint32_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
Constructor.
const URL & GetSource() const
Get source.
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
void Enable()
Enable delegation in the environment.
static DlgEnv & Instance()
void Disable()
Disable delegation in the environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A file.
Definition XrdClFile.hh:52
@ DisableVirtRedirect
Definition XrdClFile.hh:59
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void MergeCGI(URL::ParamsMap &cgi1, const URL::ParamsMap &cgi2, bool replace)
Merge cgi2 into cgi1.
An abstract class to describe the client-side monitoring plugin interface.
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
A key-value pair map storing both keys and values as strings.
bool Get(const std::string &name, Item &item) const
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Handle an async response.
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
ThirdPartyCopyJob(uint32_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
Constructor.
URL representation.
Definition XrdClURL.hh:31
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:465
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
std::string GetObfuscatedURL() const
Get the URL with authz information obfuscated.
Definition XrdClURL.cc:498
bool IsLocalFile() const
Definition XrdClURL.cc:474
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static XRootDStatus CheckTPCLite(const std::string &server, time_t timeout=0)
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
static XRootDStatus CheckTPC(const std::string &server, time_t timeout=0)
Check if peer supports tpc.
An interface for metadata redirectors.
virtual std::string GetCheckSum(const std::string &type) const =0
const std::string & GetErrorMessage() const
Get error message.
void SetErrorMessage(const std::string &message)
Set the error message.
static const char * cgiC2Dst(const char *cKey, const char *xSrc, const char *xLfn, const char *xCks, char *Buff, int Blen, int strms=0, const char *iHst=0, const char *sprt=0, const char *tprt=0, bool dlgon=false, bool push=false)
Definition XrdOucTPC.cc:62
static const char * cgiC2Src(const char *cKey, const char *xDst, int xTTL, char *Buff, int Blen)
Definition XrdOucTPC.cc:136
static void Wait(int milliseconds)
const uint16_t suPartial
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
const uint16_t errNotSupported
const uint16_t errCheckSumError
XrdSysError Log
Definition XrdConfig.cc:113
@ UR
owner readable
@ GR
group readable
@ UW
owner writable
@ OR
world readable
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Update
Open for reading and writing.
bool IsOK() const
We're fine.