XRootD
Loading...
Searching...
No Matches
XrdClAsyncSocketHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#include "XrdCl/XrdClStream.hh"
21#include "XrdCl/XrdClLog.hh"
22#include "XrdCl/XrdClMessage.hh"
27#include "XrdSys/XrdSysE2T.hh"
28#include <netinet/tcp.h>
29
30namespace XrdCl
31{
32 //----------------------------------------------------------------------------
33 // Constructor
34 //----------------------------------------------------------------------------
36 Poller *poller,
37 TransportHandler *transport,
38 AnyObject *channelData,
39 uint16_t subStreamNum,
40 Stream *strm ):
41 pPoller( poller ),
42 pTransport( transport ),
43 pChannelData( channelData ),
44 pSubStreamNum( subStreamNum ),
45 pStream( strm ),
46 pStreamName( ToStreamName( url, subStreamNum ) ),
47 pSocket( new Socket() ),
48 pHandShakeDone( false ),
51 pHSWaitStarted( 0 ),
52 pHSWaitSeconds( 0 ),
53 pUrl( url ),
54 pTlsHandShakeOngoing( false ),
55 pDoTransportDisc( false )
56 {
57 Env *env = DefaultEnv::GetEnv();
58
59 int timeoutResolution = DefaultTimeoutResolution;
60 env->GetInt( "TimeoutResolution", timeoutResolution );
61 pTimeoutResolution = timeoutResolution;
62
63 pSocket->SetChannelID( pChannelData );
64 pLastActivity = time(0);
65 }
66
67 //----------------------------------------------------------------------------
68 // Destructor
69 //----------------------------------------------------------------------------
75
76 //----------------------------------------------------------------------------
77 // Connect to given address
78 //----------------------------------------------------------------------------
80 {
81 Log *log = DefaultEnv::GetLog();
83 pConnectionTimeout = timeout;
84
85 //--------------------------------------------------------------------------
86 // Initialize the socket
87 //--------------------------------------------------------------------------
88 XRootDStatus st = pSocket->Initialize( pSockAddr.Family() );
89 if( !st.IsOK() )
90 {
91 log->Error( AsyncSockMsg, "[%s] Unable to initialize socket: %s",
92 pStreamName.c_str(), st.ToString().c_str() );
93 st.status = stFatal;
94 return st;
95 }
96
97 //--------------------------------------------------------------------------
98 // Set the keep-alive up
99 //--------------------------------------------------------------------------
100 Env *env = DefaultEnv::GetEnv();
101
102 int keepAlive = DefaultTCPKeepAlive;
103 env->GetInt( "TCPKeepAlive", keepAlive );
104 if( keepAlive )
105 {
106 int param = 1;
107 XRootDStatus st = pSocket->SetSockOpt( SOL_SOCKET, SO_KEEPALIVE, &param,
108 sizeof(param) );
109 if( !st.IsOK() )
110 log->Error( AsyncSockMsg, "[%s] Unable to turn on keepalive: %s",
111 pStreamName.c_str(), st.ToString().c_str() );
112
113#if ( defined(__linux__) || defined(__GNU__) ) && defined( TCP_KEEPIDLE ) && \
114 defined( TCP_KEEPINTVL ) && defined( TCP_KEEPCNT )
115
117 env->GetInt( "TCPKeepAliveTime", param );
118 st = pSocket->SetSockOpt(SOL_TCP, TCP_KEEPIDLE, &param, sizeof(param));
119 if( !st.IsOK() )
120 log->Error( AsyncSockMsg, "[%s] Unable to set keepalive time: %s",
121 pStreamName.c_str(), st.ToString().c_str() );
122
124 env->GetInt( "TCPKeepAliveInterval", param );
125 st = pSocket->SetSockOpt(SOL_TCP, TCP_KEEPINTVL, &param, sizeof(param));
126 if( !st.IsOK() )
127 log->Error( AsyncSockMsg, "[%s] Unable to set keepalive interval: %s",
128 pStreamName.c_str(), st.ToString().c_str() );
129
131 env->GetInt( "TCPKeepAliveProbes", param );
132 st = pSocket->SetSockOpt(SOL_TCP, TCP_KEEPCNT, &param, sizeof(param));
133 if( !st.IsOK() )
134 log->Error( AsyncSockMsg, "[%s] Unable to set keepalive probes: %s",
135 pStreamName.c_str(), st.ToString().c_str() );
136#endif
137 }
138
139 pHandShakeDone = false;
140 pTlsHandShakeOngoing = false;
141 pHSWaitStarted = 0;
142 pHSWaitSeconds = 0;
144
145 //--------------------------------------------------------------------------
146 // Initiate async connection to the address
147 //--------------------------------------------------------------------------
148 char nameBuff[256];
149 pSockAddr.Format( nameBuff, sizeof(nameBuff), XrdNetAddrInfo::fmtAdv6 );
150 log->Debug( AsyncSockMsg, "[%s] Attempting connection to %s",
151 pStreamName.c_str(), nameBuff );
152
153 st = pSocket->ConnectToAddress( pSockAddr, 0 );
154 if( !st.IsOK() )
155 {
156 log->Error( AsyncSockMsg, "[%s] Unable to initiate the connection: %s",
157 pStreamName.c_str(), st.ToString().c_str() );
158 return st;
159 }
160
161 pSocket->SetStatus( Socket::Connecting );
162
163 pOpenChannel = pStream->GetChannel();
164 pDoTransportDisc = true;
165
166 //--------------------------------------------------------------------------
167 // We should get the ready to write event once we're really connected
168 // so we need to listen to it
169 //--------------------------------------------------------------------------
170 if( !pPoller->AddSocket( pSocket, this ) )
171 {
173 pSocket->Close();
174 pDoTransportDisc = false;
175 pOpenChannel.reset();
176 return st;
177 }
178
179 if( !pPoller->EnableWriteNotification( pSocket, true, pTimeoutResolution ) )
180 {
182 pPoller->RemoveSocket( pSocket );
183 pSocket->Close();
184 pDoTransportDisc = false;
185 pOpenChannel.reset();
186 return st;
187 }
188
189 return XRootDStatus();
190 }
191
192 //----------------------------------------------------------------------------
193 // PreClose performs as many close actions as possible with fewer blocking
194 // conditions.
195 //----------------------------------------------------------------------------
197 {
198 Log *log = DefaultEnv::GetLog();
199 log->Debug( AsyncSockMsg, "[%s] PreClosing the socket", pStreamName.c_str() );
200
201 pPoller->ShutdownEvents( pSocket );
202
203 if( pDoTransportDisc )
204 {
205 pTransport->Disconnect( *pChannelData,
207 pDoTransportDisc = false;
208 }
209
210 return XRootDStatus();
211 }
212
213 //----------------------------------------------------------------------------
214 // Close the connection
215 //----------------------------------------------------------------------------
217 {
218 Log *log = DefaultEnv::GetLog();
219 log->Debug( AsyncSockMsg, "[%s] Closing the socket", pStreamName.c_str() );
220
221 pPoller->RemoveSocket( pSocket );
222
223 if( pDoTransportDisc )
224 {
225 pTransport->Disconnect( *pChannelData,
227 pDoTransportDisc = false;
228 }
229
230 pSocket->Close();
231 //--------------------------------------------------------------------------
232 // Releases a reference count on Channel. May possibly cause it to be
233 // destroyed, which will in turn destory pStream and thus us.
234 //--------------------------------------------------------------------------
235 pOpenChannel.reset();
236 return XRootDStatus();
237 }
238
239 std::string AsyncSocketHandler::ToStreamName( const URL &url, uint16_t strmnb )
240 {
241 std::ostringstream o;
242 o << url.GetHostId();
243 o << "." << strmnb;
244 return o.str();
245 }
246
247 //----------------------------------------------------------------------------
248 // Handler a socket event
249 //----------------------------------------------------------------------------
250 void AsyncSocketHandler::Event( uint8_t type, XrdCl::Socket */*socket*/ )
251 {
252 //--------------------------------------------------------------------------
253 // First check if the socket itself wants to apply some mapping on the
254 // event. E.g. in case of TLS socket it might want to map read events to
255 // write events and vice-versa.
256 //--------------------------------------------------------------------------
257 type = pSocket->MapEvent( type );
258
259 //--------------------------------------------------------------------------
260 // Handle any read or write events. If any of the handlers indicate an error
261 // we will have been disconnected. A disconnection may cause the current
262 // object to be asynchronously reused or deleted, so we return immediately.
263 //--------------------------------------------------------------------------
264 if( !EventRead( type ) )
265 return;
266
267 //--------------------------------------------------------------------------
268 // If there's a previosuly noted ECONNRESET error from write we give the
269 // fault now. This gave us the chance to process a read event.
270 //--------------------------------------------------------------------------
271 if( !pReqConnResetError.IsOK() )
272 {
274 return;
275 }
276
277 if( !EventWrite( type ) )
278 return;
279 }
280
281 //----------------------------------------------------------------------------
282 // Handler for read related socket events
283 //----------------------------------------------------------------------------
284 bool AsyncSocketHandler::EventRead( uint8_t type )
285 {
286 //--------------------------------------------------------------------------
287 // Read event
288 //--------------------------------------------------------------------------
289 if( type & ReadyToRead )
290 {
291 pLastActivity = time(0);
293 return OnTLSHandShake();
294
295 if( likely( pHandShakeDone ) )
296 return OnRead();
297
298 return OnReadWhileHandshaking();
299 }
300
301 //--------------------------------------------------------------------------
302 // Read timeout
303 //--------------------------------------------------------------------------
304 else if( type & ReadTimeOut )
305 {
306 if( pHSWaitSeconds )
307 {
308 if( !CheckHSWait() )
309 return false;
310 }
311
312 if( likely( pHandShakeDone ) )
313 return OnReadTimeout();
314
316 }
317
318 return true;
319 }
320
321 //----------------------------------------------------------------------------
322 // Handler for write related socket events
323 //----------------------------------------------------------------------------
325 {
326 //--------------------------------------------------------------------------
327 // Write event
328 //--------------------------------------------------------------------------
329 if( type & ReadyToWrite )
330 {
331 pLastActivity = time(0);
332 if( unlikely( pSocket->GetStatus() == Socket::Connecting ) )
333 return OnConnectionReturn();
334
335 //------------------------------------------------------------------------
336 // Make sure we are not writing anything if we have been told to wait.
337 //------------------------------------------------------------------------
338 if( pHSWaitSeconds != 0 )
339 return true;
340
342 return OnTLSHandShake();
343
344 if( likely( pHandShakeDone ) )
345 return OnWrite();
346
348 }
349
350 //--------------------------------------------------------------------------
351 // Write timeout
352 //--------------------------------------------------------------------------
353 else if( type & WriteTimeOut )
354 {
355 if( likely( pHandShakeDone ) )
356 return OnWriteTimeout();
357
359 }
360
361 return true;
362 }
363
364 //----------------------------------------------------------------------------
365 // Connect returned
366 //----------------------------------------------------------------------------
368 {
369 //--------------------------------------------------------------------------
370 // Check whether we were able to connect
371 //--------------------------------------------------------------------------
372 Log *log = DefaultEnv::GetLog();
373 log->Debug( AsyncSockMsg, "[%s] Async connection call returned",
374 pStreamName.c_str() );
375
376 int errorCode = 0;
377 socklen_t optSize = sizeof( errorCode );
378 XRootDStatus st = pSocket->GetSockOpt( SOL_SOCKET, SO_ERROR, &errorCode,
379 &optSize );
380
381 //--------------------------------------------------------------------------
382 // This is an internal error really (either logic or system fault),
383 // so we call it a day and don't retry
384 //--------------------------------------------------------------------------
385 if( !st.IsOK() )
386 {
387 log->Error( AsyncSockMsg, "[%s] Unable to get the status of the "
388 "connect operation: %s", pStreamName.c_str(),
389 XrdSysE2T( errno ) );
390 pStream->OnConnectError( pSubStreamNum,
392 return false;
393 }
394
395 //--------------------------------------------------------------------------
396 // We were unable to connect
397 //--------------------------------------------------------------------------
398 if( errorCode )
399 {
400 log->Error( AsyncSockMsg, "[%s] Unable to connect: %s",
401 pStreamName.c_str(), XrdSysE2T( errorCode ) );
402 pStream->OnConnectError( pSubStreamNum,
404 return false;
405 }
406 pSocket->SetStatus( Socket::Connected );
407
408 //--------------------------------------------------------------------------
409 // Cork the socket
410 //--------------------------------------------------------------------------
411 st = pSocket->Cork();
412 if( !st.IsOK() )
413 {
414 pStream->OnConnectError( pSubStreamNum, st );
415 return false;
416 }
417
418 //--------------------------------------------------------------------------
419 // Initialize the handshake
420 //--------------------------------------------------------------------------
421 pHandShakeData.reset( new HandShakeData( pStream->GetURL(),
422 pSubStreamNum ) );
423 pHandShakeData->serverAddr = pSocket->GetServerAddress();
424 pHandShakeData->clientName = pSocket->GetSockName();
425 pHandShakeData->streamName = pStreamName;
426
427 st = pTransport->HandShake( pHandShakeData.get(), *pChannelData );
428 if( !st.IsOK() )
429 {
430 log->Error( AsyncSockMsg, "[%s] Connection negotiation failed",
431 pStreamName.c_str() );
432 pStream->OnConnectError( pSubStreamNum, st );
433 return false;
434 }
435
436 if( st.code != suRetry )
437 ++pHandShakeData->step;
438
439 //--------------------------------------------------------------------------
440 // Initialize the hand-shake reader and writer
441 //--------------------------------------------------------------------------
442 hswriter.reset( new AsyncHSWriter( *pSocket, pStreamName ) );
444
445 //--------------------------------------------------------------------------
446 // Transport has given us something to send
447 //--------------------------------------------------------------------------
448 if( pHandShakeData->out )
449 {
450 hswriter->Reset( pHandShakeData->out );
451 pHandShakeData->out = nullptr;
452 }
453
454 //--------------------------------------------------------------------------
455 // Listen to what the server has to say
456 //--------------------------------------------------------------------------
457 if( !pPoller->EnableReadNotification( pSocket, true, pTimeoutResolution ) )
458 {
459 pStream->OnConnectError( pSubStreamNum,
461 return false;
462 }
463 return true;
464 }
465
466 //----------------------------------------------------------------------------
467 // Got a write readiness event
468 //----------------------------------------------------------------------------
470 {
471 if( !reqwriter )
472 {
473 OnFault( XRootDStatus( stError, errInternal, 0, "Request writer is null." ) );
474 return false;
475 }
476 //--------------------------------------------------------------------------
477 // Let's do the writing ...
478 //--------------------------------------------------------------------------
479 XRootDStatus st = reqwriter->Write();
480
481 //--------------------------------------------------------------------------
482 // In the case of ECONNRESET perhaps the server sent us something.
483 // To give a chance to read it in the next event poll we pass this as a
484 // retry, but return the error after the next event.
485 //--------------------------------------------------------------------------
486 if( st.code == errSocketError && st.errNo == ECONNRESET )
487 {
488 if( (DisableUplink()).IsOK() )
489 {
491 st = XRootDStatus( stOK, suRetry );
492 }
493 }
494 if( !st.IsOK() )
495 {
496 //------------------------------------------------------------------------
497 // We failed
498 //------------------------------------------------------------------------
499 OnFault( st );
500 return false;
501 }
502 //--------------------------------------------------------------------------
503 // We are not done yet
504 //--------------------------------------------------------------------------
505 if( st.code == suRetry) return true;
506 //--------------------------------------------------------------------------
507 // Disable the respective substream if empty
508 //--------------------------------------------------------------------------
509 reqwriter->Reset();
510 pStream->DisableIfEmpty( pSubStreamNum );
511 return true;
512 }
513
514 //----------------------------------------------------------------------------
515 // Got a write readiness event while handshaking
516 //----------------------------------------------------------------------------
518 {
519 XRootDStatus st;
520 if( !hswriter || !hswriter->HasMsg() )
521 {
522 if( !(st = DisableUplink()).IsOK() )
523 {
525 return false;
526 }
527 return true;
528 }
529 //--------------------------------------------------------------------------
530 // Let's do the writing ...
531 //--------------------------------------------------------------------------
532 st = hswriter->Write();
533 if( !st.IsOK() )
534 {
535 //------------------------------------------------------------------------
536 // We failed
537 //------------------------------------------------------------------------
539 return false;
540 }
541 //--------------------------------------------------------------------------
542 // We are not done yet
543 //--------------------------------------------------------------------------
544 if( st.code == suRetry ) return true;
545 //--------------------------------------------------------------------------
546 // Disable the uplink
547 // Note: at this point we don't deallocate the HS message as we might need
548 // to re-send it in case of a kXR_wait response
549 //--------------------------------------------------------------------------
550 if( !(st = DisableUplink()).IsOK() )
551 {
553 return false;
554 }
555 return true;
556 }
557
558 //----------------------------------------------------------------------------
559 // Got a read readiness event
560 //----------------------------------------------------------------------------
562 {
563 //--------------------------------------------------------------------------
564 // Make sure the response reader object exists
565 //--------------------------------------------------------------------------
566 if( !rspreader )
567 {
568 OnFault( XRootDStatus( stError, errInternal, 0, "Response reader is null." ) );
569 return false;
570 }
571
572 //--------------------------------------------------------------------------
573 // Readout the data from the socket
574 //--------------------------------------------------------------------------
575 XRootDStatus st = rspreader->Read();
576
577 //--------------------------------------------------------------------------
578 // Handler header corruption
579 //--------------------------------------------------------------------------
580 if( !st.IsOK() && st.code == errCorruptedHeader )
581 {
583 return false;
584 }
585
586 //--------------------------------------------------------------------------
587 // Handler other errors
588 //--------------------------------------------------------------------------
589 if( !st.IsOK() )
590 {
591 OnFault( st );
592 return false;
593 }
594
595 //--------------------------------------------------------------------------
596 // We are not done yet
597 //--------------------------------------------------------------------------
598 if( st.code == suRetry ) return true;
599
600 //--------------------------------------------------------------------------
601 // We are done, reset the response reader so we can read out next message
602 //--------------------------------------------------------------------------
603 rspreader->Reset();
604 return true;
605 }
606
607 //----------------------------------------------------------------------------
608 // Got a read readiness event while handshaking
609 //----------------------------------------------------------------------------
611 {
612 //--------------------------------------------------------------------------
613 // Make sure the response reader object exists
614 //--------------------------------------------------------------------------
615 if( !hsreader )
616 {
617 OnFault( XRootDStatus( stError, errInternal, 0, "Hand-shake reader is null." ) );
618 return false;
619 }
620
621 //--------------------------------------------------------------------------
622 // Read the message and let the transport handler look at it when
623 // reading has finished
624 //--------------------------------------------------------------------------
625 XRootDStatus st = hsreader->Read();
626 if( !st.IsOK() )
627 {
629 return false;
630 }
631
632 if( st.code != suDone )
633 return true;
634
635 return HandleHandShake( hsreader->ReleaseMsg() );
636 }
637
638 //------------------------------------------------------------------------
639 // Handle the handshake message
640 //------------------------------------------------------------------------
641 bool AsyncSocketHandler::HandleHandShake( std::unique_ptr<Message> msg )
642 {
643 //--------------------------------------------------------------------------
644 // OK, we have a new message, let's deal with it;
645 //--------------------------------------------------------------------------
646 pHandShakeData->in = msg.release();
647 XRootDStatus st = pTransport->HandShake( pHandShakeData.get(), *pChannelData );
648
649 //--------------------------------------------------------------------------
650 // Deal with wait responses
651 //--------------------------------------------------------------------------
652 kXR_int32 waitSeconds = HandleWaitRsp( pHandShakeData->in );
653
654 delete pHandShakeData->in;
655 pHandShakeData->in = 0;
656
657 if( !st.IsOK() )
658 {
660 return false;
661 }
662
663 if( st.code == suRetry )
664 {
665 //------------------------------------------------------------------------
666 // We are handling a wait response and the transport handler told
667 // as to retry the request
668 //------------------------------------------------------------------------
669 if( waitSeconds >=0 )
670 {
671 time_t resendTime = ::time( 0 ) + waitSeconds;
672 if( resendTime > pConnectionStarted + pConnectionTimeout )
673 {
674 Log *log = DefaultEnv::GetLog();
675 log->Error( AsyncSockMsg,
676 "[%s] Won't retry kXR_endsess request because would"
677 "reach connection timeout.",
678 pStreamName.c_str() );
679
681 return false;
682 }
683 else
684 {
685 //--------------------------------------------------------------------
686 // We need to wait before replaying the request
687 //--------------------------------------------------------------------
688 Log *log = DefaultEnv::GetLog();
689 log->Debug( AsyncSockMsg, "[%s] Received a wait response to endsess request, "
690 "will wait for %d seconds before replaying the endsess request",
691 pStreamName.c_str(), waitSeconds );
692 pHSWaitStarted = time( 0 );
693 pHSWaitSeconds = waitSeconds;
694 }
695 return true;
696 }
697 //------------------------------------------------------------------------
698 // We are re-sending a protocol request
699 //------------------------------------------------------------------------
700 else if( pHandShakeData->out )
701 {
702 return SendHSMsg();
703 }
704 }
705
706 //--------------------------------------------------------------------------
707 // If now is the time to enable encryption
708 //--------------------------------------------------------------------------
709 if( !pSocket->IsEncrypted() &&
710 pTransport->NeedEncryption( pHandShakeData.get(), *pChannelData ) )
711 {
713 if( !st.IsOK() )
714 return false;
715 if ( st.code == suRetry )
716 return true;
717 }
718
719 //--------------------------------------------------------------------------
720 // Now prepare the next step of the hand-shake procedure
721 //--------------------------------------------------------------------------
722 return HandShakeNextStep( st.IsOK() && st.code == suDone );
723 }
724
725 //------------------------------------------------------------------------
726 // Prepare the next step of the hand-shake procedure
727 //------------------------------------------------------------------------
729 {
730 //--------------------------------------------------------------------------
731 // We successfully proceeded to the next step
732 //--------------------------------------------------------------------------
733 ++pHandShakeData->step;
734
735 //--------------------------------------------------------------------------
736 // The hand shake process is done
737 //--------------------------------------------------------------------------
738 if( done )
739 {
740 pHandShakeData.reset();
741 hswriter.reset();
742 hsreader.reset();
743 //------------------------------------------------------------------------
744 // Initialize the request writer & reader
745 //------------------------------------------------------------------------
748 XRootDStatus st;
749 if( !(st = EnableUplink()).IsOK() )
750 {
752 return false;
753 }
754 pHandShakeDone = true;
755 pStream->OnConnect( pSubStreamNum );
756 }
757 //--------------------------------------------------------------------------
758 // The transport handler gave us something to write
759 //--------------------------------------------------------------------------
760 else if( pHandShakeData->out )
761 {
762 return SendHSMsg();
763 }
764 return true;
765 }
766
767 //----------------------------------------------------------------------------
768 // Handle fault
769 //----------------------------------------------------------------------------
771 {
772 Log *log = DefaultEnv::GetLog();
773 log->Error( AsyncSockMsg, "[%s] Socket error encountered: %s",
774 pStreamName.c_str(), st.ToString().c_str() );
775
776 pStream->OnError( pSubStreamNum, st );
777 }
778
779 //----------------------------------------------------------------------------
780 // Handle fault while handshaking
781 //----------------------------------------------------------------------------
783 {
784 Log *log = DefaultEnv::GetLog();
785 log->Error( AsyncSockMsg, "[%s] Socket error while handshaking: %s",
786 pStreamName.c_str(), st.ToString().c_str() );
787
788 pStream->OnConnectError( pSubStreamNum, st );
789 }
790
791 //----------------------------------------------------------------------------
792 // Handle write timeout
793 //----------------------------------------------------------------------------
795 {
796 return pStream->OnWriteTimeout( pSubStreamNum );
797 }
798
799 //----------------------------------------------------------------------------
800 // Handler read timeout
801 //----------------------------------------------------------------------------
803 {
804 return pStream->OnReadTimeout( pSubStreamNum );
805 }
806
807 //----------------------------------------------------------------------------
808 // Handle timeout while handshaking
809 //----------------------------------------------------------------------------
811 {
812 time_t now = time(0);
814 {
816 return false;
817 }
818 return true;
819 }
820
821 //----------------------------------------------------------------------------
822 // Handle header corruption in case of kXR_status response
823 //----------------------------------------------------------------------------
825 {
826 //--------------------------------------------------------------------------
827 // We need to force a socket error so this is handled in a similar way as
828 // a stream t/o and all requests are retried
829 //--------------------------------------------------------------------------
830 pStream->ForceError( XRootDStatus( stError, errSocketError ), false, 0 );
831 }
832
833 //----------------------------------------------------------------------------
834 // Carry out the TLS hand-shake
835 //----------------------------------------------------------------------------
837 {
838 Log *log = DefaultEnv::GetLog();
839 log->Debug( AsyncSockMsg, "[%s] TLS hand-shake exchange.", pStreamName.c_str() );
840
841 XRootDStatus st;
842 if( !( st = pSocket->TlsHandShake( this, pUrl.GetHostName() ) ).IsOK() )
843 {
844 pTlsHandShakeOngoing = false;
846 return st;
847 }
848
849 if( st.code == suRetry )
850 {
852 return st;
853 }
854
855 pTlsHandShakeOngoing = false;
856 log->Info( AsyncSockMsg, "[%s] TLS hand-shake done.", pStreamName.c_str() );
857
858 return st;
859 }
860
861 //----------------------------------------------------------------------------
862 // Handle read/write event if we are in the middle of a TLS hand-shake
863 //----------------------------------------------------------------------------
865 {
867 if( !st.IsOK() )
868 return false;
869 if ( st.code == suRetry )
870 return true;
871
872 return HandShakeNextStep( pTransport->HandShakeDone( pHandShakeData.get(),
873 *pChannelData ) );
874 }
875
876 //----------------------------------------------------------------------------
877 // Prepare a HS writer for sending and enable uplink
878 //----------------------------------------------------------------------------
880 {
881 if( !hswriter )
882 {
884 "HS writer object missing!" ) );
885 return false;
886 }
887 //--------------------------------------------------------------------------
888 // We only set a new HS message if this is not a replay due to kXR_wait
889 //--------------------------------------------------------------------------
890 if( !pHSWaitSeconds )
891 {
892 hswriter->Reset( pHandShakeData->out );
893 pHandShakeData->out = nullptr;
894 }
895 //--------------------------------------------------------------------------
896 // otherwise we replay the kXR_endsess request
897 //--------------------------------------------------------------------------
898 else
899 hswriter->Replay();
900 //--------------------------------------------------------------------------
901 // Enable writing so we can replay the HS message
902 //--------------------------------------------------------------------------
903 XRootDStatus st;
904 if( !(st = EnableUplink()).IsOK() )
905 {
907 return false;
908 }
909 return true;
910 }
911
913 {
914 // It would be more coherent if this could be done in the
915 // transport layer, unfortunately the API does not allow it.
916 kXR_int32 waitSeconds = -1;
918 if( rsp->hdr.status == kXR_wait )
919 waitSeconds = rsp->body.wait.seconds;
920 return waitSeconds;
921 }
922
923 //----------------------------------------------------------------------------
924 // Check if HS wait time elapsed
925 //----------------------------------------------------------------------------
927 {
928 time_t now = time( 0 );
929 if( now - pHSWaitStarted >= pHSWaitSeconds )
930 {
931 Log *log = DefaultEnv::GetLog();
932 log->Debug( AsyncSockMsg, "[%s] The hand-shake wait time elapsed, will "
933 "replay the endsess request.", pStreamName.c_str() );
934 if( !SendHSMsg() )
935 return false;
936 //------------------------------------------------------------------------
937 // Make sure the wait state is reset
938 //------------------------------------------------------------------------
939 pHSWaitSeconds = 0;
940 pHSWaitStarted = 0;
941 }
942 return true;
943 }
944
945 //------------------------------------------------------------------------
946 // Get the IP stack
947 //------------------------------------------------------------------------
949 {
950 std::string ipstack( ( pSockAddr.isIPType( XrdNetAddr::IPType::IPv6 ) &&
951 !pSockAddr.isMapped() ) ? "IPv6" : "IPv4" );
952 return ipstack;
953 }
954
955 //------------------------------------------------------------------------
956 // Get IP address
957 //------------------------------------------------------------------------
959 {
960 char nameBuff[256];
961 pSockAddr.Format( nameBuff, sizeof(nameBuff), XrdNetAddrInfo::fmtAddr, XrdNetAddrInfo::noPort );
962 return nameBuff;
963 }
964
965 //------------------------------------------------------------------------
967 //------------------------------------------------------------------------
969 {
970 const char *cstr = pSockAddr.Name();
971 if( !cstr )
972 return std::string();
973 return cstr;
974 }
975}
union ServerResponse::@040373375333017131300127053271011057331004327334 body
@ kXR_wait
Definition XProtocol.hh:947
ServerResponseHeader hdr
int kXR_int32
Definition XPtypes.hh:89
#define likely(x)
#define unlikely(x)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
Utility class encapsulating reading hand-shake response logic.
Utility class encapsulating writing hand-shake request logic.
Utility class encapsulating reading response message logic.
Utility class encapsulating writing request logic.
std::shared_ptr< Channel > pOpenChannel
static std::string ToStreamName(const URL &url, uint16_t strmnb)
Convert Stream object and sub-stream number to stream name.
bool OnReadTimeout() XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncHSWriter > hswriter
virtual bool OnConnectionReturn() XRD_WARN_UNUSED_RESULT
bool OnWriteTimeout() XRD_WARN_UNUSED_RESULT
bool OnWrite() XRD_WARN_UNUSED_RESULT
bool OnTimeoutWhileHandshaking() XRD_WARN_UNUSED_RESULT
bool CheckHSWait() XRD_WARN_UNUSED_RESULT
bool EventRead(uint8_t type) XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncHSReader > hsreader
bool HandleHandShake(std::unique_ptr< Message > msg) XRD_WARN_UNUSED_RESULT
bool OnWriteWhileHandshaking() XRD_WARN_UNUSED_RESULT
void OnFaultWhileHandshaking(XRootDStatus st)
virtual void Event(uint8_t type, XrdCl::Socket *)
Handle a socket event.
kXR_int32 HandleWaitRsp(Message *rsp)
bool HandShakeNextStep(bool done) XRD_WARN_UNUSED_RESULT
bool OnReadWhileHandshaking() XRD_WARN_UNUSED_RESULT
std::unique_ptr< AsyncMsgWriter > reqwriter
XRootDStatus EnableUplink()
Enable uplink.
std::string GetIpStack() const
Get the IP stack.
bool EventWrite(uint8_t type) XRD_WARN_UNUSED_RESULT
std::string GetHostName()
Get hostname.
std::unique_ptr< AsyncMsgReader > rspreader
XRootDStatus DisableUplink()
Disable uplink.
std::unique_ptr< HandShakeData > pHandShakeData
bool OnRead() XRD_WARN_UNUSED_RESULT
XRootDStatus Connect(time_t timeout)
Connect to the currently set address.
AsyncSocketHandler(const URL &url, Poller *poller, TransportHandler *transport, AnyObject *channelData, uint16_t subStreamNum, Stream *strm)
Constructors.
bool OnTLSHandShake() XRD_WARN_UNUSED_RESULT
bool SendHSMsg() XRD_WARN_UNUSED_RESULT
std::string GetIpAddr()
Get IP address.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
The message representation used throughout the system.
Interface for socket pollers.
@ ReadTimeOut
Read timeout.
@ ReadyToWrite
Writing won't block.
@ WriteTimeOut
Write timeout.
@ ReadyToRead
New data has arrived.
A network socket.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
Perform the handshake and the authentication for each physical stream.
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port).
Definition XrdClURL.hh:99
static const int noPort
Do not add port number.
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
const uint16_t suRetry
const uint16_t errSocketOptError
const int DefaultTCPKeepAliveProbes
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t errPollerError
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errSocketTimeout
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const int DefaultTimeoutResolution
const uint64_t AsyncSockMsg
const int DefaultTCPKeepAliveInterval
const int DefaultTCPKeepAlive
const uint16_t errConnectionError
const int DefaultTCPKeepAliveTime
const uint16_t errSocketError
const uint16_t errCorruptedHeader
const uint16_t suDone
Data structure that carries the handshake information.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
uint32_t errNo
Errno, if any.