XRootD
Loading...
Searching...
No Matches
XrdClStream.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef __XRD_CL_STREAM_HH__
20#define __XRD_CL_STREAM_HH__
21
22#include "XrdCl/XrdClPoller.hh"
23#include "XrdCl/XrdClStatus.hh"
24#include "XrdCl/XrdClURL.hh"
28#include "XrdCl/XrdClInQueue.hh"
29#include "XrdCl/XrdClUtils.hh"
30
33#include "XrdNet/XrdNetAddr.hh"
35#include <list>
36#include <vector>
37#include <functional>
38#include <memory>
39
40#include <assert.h>
41
42namespace XrdCl
43{
44 class Message;
45 class Channel;
46 class TransportHandler;
47 class TaskManager;
48 struct SubStreamData;
49
50 //----------------------------------------------------------------------------
60 //----------------------------------------------------------------------------
62 {
63 public:
64 StreamMutex(): mcv(0), hasfn(false) { }
66 {
67 assert( mlist.empty() );
68 assert( mclosing.empty() );
69 assert( mthmap.empty() );
70 assert( hasfn == false );
71 }
72
73 //------------------------------------------------------------------------
75 //------------------------------------------------------------------------
76 void AddClosing( uint16_t subStream );
77
78 //------------------------------------------------------------------------
80 //------------------------------------------------------------------------
81 void RemoveClosing( uint16_t subStream );
82
83 //------------------------------------------------------------------------
85 //------------------------------------------------------------------------
86 void Lock();
87
88 //------------------------------------------------------------------------
90 //------------------------------------------------------------------------
91 void Lock( uint16_t subStream, bool &isclosing );
92
93 //------------------------------------------------------------------------
98 //------------------------------------------------------------------------
99 void Lock( const std::function<void()> &func, bool &isclosing );
100
101 //------------------------------------------------------------------------
103 //------------------------------------------------------------------------
104 void UnLock();
105
106 struct MtxInfo
107 {
108 MtxInfo(): cnt( 0 ) { }
109 MtxInfo( const std::function<void()> &func ): cnt( 0 ), fn( func ) { }
111
112 size_t cnt;
113 std::function<void()> fn;
114 };
115
117 std::list<MtxInfo> mlist;
118 std::map<uint16_t, size_t> mclosing;
119 std::map<pthread_t, std::list<MtxInfo>::iterator> mthmap;
120 bool hasfn;
121 std::list<MtxInfo>::iterator fnlistit;
122 };
123
124 //----------------------------------------------------------------------------
128 //----------------------------------------------------------------------------
130 {
131 public:
133 {
134 mtx->Lock();
135 }
136
137 StreamMutexHelper( StreamMutex &sm, uint16_t idx,
138 bool &isclosing ): mtx( &sm )
139 {
140 mtx->Lock( idx, isclosing );
141 if( isclosing ) mtx = nullptr;
142 }
143
144 StreamMutexHelper( StreamMutex &sm, const std::function<void()> &func,
145 bool &isclosing ): mtx( &sm )
146 {
147 mtx->Lock( func, isclosing );
148 if( isclosing ) mtx = nullptr;
149 }
150
152 {
153 UnLock();
154 }
155
156 void UnLock()
157 {
158 if( !mtx ) return;
159 mtx->UnLock();
160 mtx = nullptr;
161 }
162
164 };
165
166 //----------------------------------------------------------------------------
168 //----------------------------------------------------------------------------
169 class Stream
170 {
171 public:
172 //------------------------------------------------------------------------
174 //------------------------------------------------------------------------
182
183 //------------------------------------------------------------------------
185 //------------------------------------------------------------------------
186 Stream( const URL *url, const URL &prefer = URL() );
187
188 //------------------------------------------------------------------------
190 //------------------------------------------------------------------------
191 ~Stream();
192
193 //------------------------------------------------------------------------
195 //------------------------------------------------------------------------
197
198 //------------------------------------------------------------------------
200 //------------------------------------------------------------------------
202 MsgHandler *handler,
203 bool stateful,
204 time_t expires );
205
206 //------------------------------------------------------------------------
208 //------------------------------------------------------------------------
210 {
211 pTransport = transport;
212 }
213
214 //------------------------------------------------------------------------
216 //------------------------------------------------------------------------
217 void SetPoller( Poller *poller )
218 {
219 pPoller = poller;
220 }
221
222 //------------------------------------------------------------------------
224 //------------------------------------------------------------------------
225 void SetIncomingQueue( InQueue *incomingQueue )
226 {
227 pIncomingQueue = incomingQueue;
228 }
229
230 //------------------------------------------------------------------------
232 //------------------------------------------------------------------------
233 void SetChannel( std::weak_ptr<Channel> &channel )
234 {
235 pChannel = channel;
236 }
237
238 //------------------------------------------------------------------------
240 //------------------------------------------------------------------------
241 void SetChannelData( AnyObject *channelData )
242 {
243 pChannelData = channelData;
244 }
245
246 //------------------------------------------------------------------------
248 //------------------------------------------------------------------------
249 void SetTaskManager( TaskManager *taskManager )
250 {
251 pTaskManager = taskManager;
252 }
253
254 //------------------------------------------------------------------------
256 //------------------------------------------------------------------------
257 void SetJobManager( JobManager *jobManager )
258 {
259 pJobManager = jobManager;
260 }
261
262 //------------------------------------------------------------------------
266 //------------------------------------------------------------------------
268
269 //------------------------------------------------------------------------
272 //------------------------------------------------------------------------
273 void Finalize();
274
275 //------------------------------------------------------------------------
278 //------------------------------------------------------------------------
279 void Tick( time_t now );
280
281 //------------------------------------------------------------------------
283 //------------------------------------------------------------------------
284 const URL *GetURL() const
285 {
286 return pUrl;
287 }
288
289 //------------------------------------------------------------------------
291 //------------------------------------------------------------------------
292 void ForceConnect();
293
294 //------------------------------------------------------------------------
296 //------------------------------------------------------------------------
297 const std::string &GetName() const
298 {
299 return pStreamName;
300 }
301
302 //------------------------------------------------------------------------
304 //------------------------------------------------------------------------
305 void DisableIfEmpty( uint16_t subStream );
306
307 //------------------------------------------------------------------------
309 //------------------------------------------------------------------------
310 void OnIncoming( uint16_t subStream,
311 std::shared_ptr<Message> msg,
312 uint32_t bytesReceived );
313
314 //------------------------------------------------------------------------
315 // Call when one of the sockets is ready to accept a new message
316 //------------------------------------------------------------------------
317 std::pair<Message *, MsgHandler *>
318 OnReadyToWrite( uint16_t subStream );
319
320 //------------------------------------------------------------------------
321 // Call when a message is written to the socket
322 //------------------------------------------------------------------------
323 void OnMessageSent( uint16_t subStream,
324 Message *msg,
325 uint32_t bytesSent );
326
327 //------------------------------------------------------------------------
329 //------------------------------------------------------------------------
330 void OnConnect( uint16_t subStream );
331
332 //------------------------------------------------------------------------
334 //------------------------------------------------------------------------
335 void OnConnectError( uint16_t subStream, XRootDStatus status );
336
337 //------------------------------------------------------------------------
339 //------------------------------------------------------------------------
340 void OnError( uint16_t subStream, XRootDStatus status );
341
342 //------------------------------------------------------------------------
344 //------------------------------------------------------------------------
345 void ForceError( XRootDStatus status, const bool hush,
346 const uint64_t sess );
347
348 //------------------------------------------------------------------------
350 //------------------------------------------------------------------------
351 bool OnReadTimeout( uint16_t subStream ) XRD_WARN_UNUSED_RESULT;
352
353 //------------------------------------------------------------------------
355 //------------------------------------------------------------------------
356 bool OnWriteTimeout( uint16_t subStream ) XRD_WARN_UNUSED_RESULT;
357
358 //------------------------------------------------------------------------
360 //------------------------------------------------------------------------
362
363 //------------------------------------------------------------------------
365 //------------------------------------------------------------------------
367
368 //------------------------------------------------------------------------
377 //------------------------------------------------------------------------
379 InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream );
380
381 //------------------------------------------------------------------------
385 //------------------------------------------------------------------------
386 uint16_t InspectStatusRsp( uint16_t stream, MsgHandler *&incHandler );
387
388 //------------------------------------------------------------------------
390 //------------------------------------------------------------------------
391 void SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob )
392 {
393 StreamMutexHelper scopedLock( pMutex );
394 pOnDataConnJob = onConnJob;
395 }
396
397 //------------------------------------------------------------------------
400 //------------------------------------------------------------------------
401 bool CanCollapse( const URL &url );
402
403 //------------------------------------------------------------------------
405 //------------------------------------------------------------------------
406 Status Query( uint16_t query, AnyObject &result );
407
408 //------------------------------------------------------------------------
413 //------------------------------------------------------------------------
414 std::shared_ptr<Channel> GetChannel()
415 {
416 return pChannel.lock();
417 }
418
419 private:
420
421 //------------------------------------------------------------------------
423 //------------------------------------------------------------------------
424 static bool IsPartial( Message &msg );
425
426 //------------------------------------------------------------------------
428 //------------------------------------------------------------------------
429 inline static bool HasNetAddr( const XrdNetAddr &addr,
430 std::vector<XrdNetAddr> &addresses )
431 {
432 auto itr = addresses.begin();
433 for( ; itr != addresses.end() ; ++itr )
434 {
435 if( itr->Same( &addr ) ) return true;
436 }
437
438 return false;
439 }
440
441 //------------------------------------------------------------------------
442 // Used under error conditions to move handlers from the out & in queue
443 // helpers back to main out queue for the subStream or the in queue.
444 //------------------------------------------------------------------------
445 void Reinsert( uint16_t subStream );
446
447 //------------------------------------------------------------------------
448 // Job handling the incoming messages
449 //------------------------------------------------------------------------
450 class HandleIncMsgJob: public Job
451 {
452 public:
453 HandleIncMsgJob( MsgHandler *handler ): pHandler( handler ) {};
454 virtual ~HandleIncMsgJob() {};
455 virtual void Run( void* )
456 {
457 pHandler->Process();
458 delete this;
459 }
460 private:
461 MsgHandler *pHandler;
462 };
463
464 //------------------------------------------------------------------------
466 //------------------------------------------------------------------------
467 void OnFatalError( uint16_t subStream,
468 XRootDStatus status,
469 StreamMutexHelper &lock );
470
471 //------------------------------------------------------------------------
473 //------------------------------------------------------------------------
474 void MonitorDisconnection( XRootDStatus status );
475
476 //------------------------------------------------------------------------
478 //------------------------------------------------------------------------
479 XRootDStatus RequestClose( Message &resp );
480
481 //------------------------------------------------------------------------
483 //------------------------------------------------------------------------
484 void SockHandlerClose( uint16_t subStream );
485
486
487 typedef std::vector<SubStreamData*> SubStreamList;
488
489 //------------------------------------------------------------------------
490 // Data members
491 //------------------------------------------------------------------------
492 const URL *pUrl;
493 const URL pPrefer;
494 std::string pStreamName;
495 TransportHandler *pTransport;
496 Poller *pPoller;
497 TaskManager *pTaskManager;
498 JobManager *pJobManager;
499 StreamMutex pMutex;
500 InQueue *pIncomingQueue;
501 AnyObject *pChannelData;
502 uint32_t pLastStreamError;
503 XRootDStatus pLastFatalError;
504 uint16_t pStreamErrorWindow;
505 uint16_t pConnectionCount;
506 uint16_t pConnectionRetry;
507 time_t pConnectionInitTime;
508 uint16_t pConnectionWindow;
509 SubStreamList pSubStreams;
510 std::vector<XrdNetAddr> pAddresses;
511 Utils::AddressType pAddressType;
512 ChannelHandlerList pChannelEvHandlers;
513 uint64_t pSessionId;
514
515 //------------------------------------------------------------------------
516 // Monitoring info
517 //------------------------------------------------------------------------
518 timeval pConnectionStarted;
519 timeval pConnectionDone;
520 std::atomic<uint64_t> pBytesSent;
521 std::atomic<uint64_t> pBytesReceived;
522
523 //------------------------------------------------------------------------
524 // Data stream on-connect handler
525 //------------------------------------------------------------------------
526 std::shared_ptr<Job> pOnDataConnJob;
527
528 //------------------------------------------------------------------------
529 // Track last assigned Id across all Streams, to ensure unique sessionId
530 //------------------------------------------------------------------------
531 static RAtomic_uint64_t sSessCntGen;
532
533 //------------------------------------------------------------------------
534 // Non owning copy of the shared_ptr PostMaster creates for our Channel
535 //------------------------------------------------------------------------
536 std::weak_ptr<Channel> pChannel;
537 };
538}
539
540#endif // __XRD_CL_STREAM_HH__
#define XRD_WARN_UNUSED_RESULT
XrdSys::RAtomic< uint64_t > RAtomic_uint64_t
A synchronize queue for incoming data.
A synchronized queue.
The message representation used throughout the system.
Interface for socket pollers.
StreamMutexHelper(StreamMutex &sm, const std::function< void()> &func, bool &isclosing)
StreamMutexHelper(StreamMutex &sm)
StreamMutexHelper(StreamMutex &sm, uint16_t idx, bool &isclosing)
void UnLock()
UnLock.
void AddClosing(uint16_t subStream)
AddClosing. Notified that subStream will be closed.
std::map< pthread_t, std::list< MtxInfo >::iterator > mthmap
std::list< MtxInfo >::iterator fnlistit
void Lock()
Lock. Regular, non-subStream aware recursive lock.
XrdSysCondVar mcv
void RemoveClosing(uint16_t subStream)
RemoveClosing. Notified that subStream close has completed.
std::map< uint16_t, size_t > mclosing
std::list< MtxInfo > mlist
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
void SetTransport(TransportHandler *transport)
Set the transport.
StreamStatus
Status of the stream.
@ Disconnected
Not connected.
@ Connected
Connected.
@ Connecting
In the process of being connected.
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
void SetChannel(std::weak_ptr< Channel > &channel)
Sets a weak_ptr of our owning Channel.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void SetPoller(Poller *poller)
Set the poller.
void ForceConnect()
Force connection.
void SetTaskManager(TaskManager *taskManager)
Set task manager.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
void SetJobManager(JobManager *jobManager)
Set job manager.
XRootDStatus EnableLink(PathID &path)
Stream(const URL *url, const URL &prefer=URL())
Constructor.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
const std::string & GetName() const
Return stream name.
void Tick(time_t now)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
~Stream()
Destructor.
std::shared_ptr< Channel > GetChannel()
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
const URL * GetURL() const
Get the URL.
void ForceError(XRootDStatus status, const bool hush, const uint64_t sess)
Force error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
void SetChannelData(AnyObject *channelData)
Set the channel data.
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
Perform the handshake and the authentication for each physical stream.
URL representation.
Definition XrdClURL.hh:31
AddressType
Address type.
Definition XrdClUtils.hh:87
QueryImpl< false > Query
Procedure execution status.
std::function< void()> fn
MtxInfo(const std::function< void()> &func)