XRootD
Loading...
Searching...
No Matches
XrdClHttpWorker.hh
Go to the documentation of this file.
1/******************************************************************************/
2/* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3/* */
4/* This file is part of the XrdClHttp client plugin for XRootD. */
5/* */
6/* XRootD is free software: you can redistribute it and/or modify it under */
7/* the terms of the GNU Lesser General Public License as published by the */
8/* Free Software Foundation, either version 3 of the License, or (at your */
9/* option) any later version. */
10/* */
11/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14/* License for more details. */
15/* */
16/* The copyright holder's institutional names and contributor's names may not */
17/* be used to endorse or promote products derived from this software without */
18/* specific prior written permission of the institution or contributor. */
19/******************************************************************************/
20
21#ifndef XRDCLHTTPWORKER_HH
22#define XRDCLHTTPWORKER_HH
23
24#include "XrdClHttpOps.hh"
25
26#include <array>
27#include <atomic>
28#include <chrono>
29#include <condition_variable>
30#include <memory>
31#include <mutex>
32#include <unordered_map>
33#include <unordered_set>
34
// Opaque handle type.  In this header it is only used through `CURL*`, as the
// key type of CurlWorker::m_op_map.
// NOTE(review): presumably this mirrors libcurl's own `typedef void CURL;` so
// that <curl/curl.h> does not have to be included here -- confirm.  It is kept
// as a typedef (not an alias-declaration) to match that spelling.
typedef void CURL;
36
// Forward declarations of XrdCl types.  Of these, only XrdCl::Log is referenced
// by the declarations in this header, and only through a pointer, so a complete
// definition is not needed at this point.
namespace XrdCl {

class Env;
class Log;
class ResponseHandler;
class URL;

} // namespace XrdCl
45
46namespace XrdClHttp {
47
48class HandlerQueue;
49class VerbsCache;
50
52public:
53 CurlWorker(std::shared_ptr<HandlerQueue> queue, VerbsCache &cache, XrdCl::Log* logger);
54
55 CurlWorker(const CurlWorker &) = delete;
56
57 void Run();
58 static void RunStatic(CurlWorker *myself);
59
60 // Passes some initial values to the worker so it can start
61 void Start(std::unique_ptr<XrdClHttp::CurlWorker> self, std::thread tid);
62
63 // Returns the configured X509 client certificate and key file name
64 std::tuple<std::string, std::string> ClientX509CertKeyFile() const;
65
66 // Change the period (in seconds) for queue maintenance.
67 //
68 // Defaults to 5 seconds; smaller values are convenient for unit tests.
69 static void SetMaintenancePeriod(unsigned maint) {
70 m_maintenance_period.store(maint, std::memory_order_relaxed);
71 }
72
73 static std::string GetMonitoringJson();
74
75private:
76 // Invoked by the destructor of one of our static members. This triggers when
77 // the plugin is unloaded, triggers the shutdown of each of the worker threads.
78 static void ShutdownAll();
79
80 // Invoked by ShutdownAll, kills off the current object's thread
81 void Shutdown();
82
83 // A list of all known worker threads -- used to shutdown the process
84 static std::vector<std::unique_ptr<XrdClHttp::CurlWorker>> m_workers;
85 // Protects the data in m_workers
86 static std::mutex m_workers_mutex;
87
88 std::chrono::steady_clock::time_point m_last_prefix_log;
89 VerbsCache &m_cache; // Cache mapping server URLs to list of selected HTTP verbs.
90 std::shared_ptr<HandlerQueue> m_queue;
91
92 // Queue for operations that can be unpaused.
93 // Paused operations occur when a PUT is started but cannot be continued
94 // because more data is needed from the caller.
95 std::shared_ptr<HandlerQueue> m_continue_queue;
96
97 std::unordered_map<CURL*, std::pair<std::shared_ptr<CurlOperation>, std::chrono::system_clock::time_point>> m_op_map;
98 XrdCl::Log* m_logger;
99 std::string m_x509_client_cert_file;
100 std::string m_x509_client_key_file;
101
102 const static unsigned m_max_ops{20};
103 static std::atomic<unsigned> m_maintenance_period;
104
105 // File descriptor pair indicating shutdown is requested.
106 int m_shutdown_pipe_r{-1};
107 int m_shutdown_pipe_w{-1};
108 // Mutex for managing the startup of a worker
109 std::mutex m_start_lock;
110 // Condition variable for a worker to indicate to RunStatic that it is ready
111 std::condition_variable m_start_complete_cv;
112 // Flag indicating that Start has been called.
113 bool m_start_complete{false};
114 // The worker's thread object
115 std::thread m_self_tid;
116
117 // Monitoring statistics
118 struct OpStats {
119 std::atomic<uint64_t> m_conncall_timeout{}; // Timeout due to the connection callout mechanism
120 std::atomic<uint64_t> m_client_timeout{};
121 std::atomic<std::chrono::system_clock::duration::rep> m_duration{};
122 std::atomic<uint64_t> m_error{};
123 std::atomic<uint64_t> m_finished{};
124 std::atomic<std::chrono::steady_clock::duration::rep> m_pause_duration{};
125 std::atomic<uint64_t> m_started{};
126 std::atomic<uint64_t> m_server_timeout{};
127 std::atomic<uint64_t> m_bytes{};
128 };
129
130 enum class OpKind {
131 ConncallTimeout,
132 ClientTimeout,
133 Error,
134 Finish,
135 Start,
136 ServerTimeout,
137 Update
138 };
139 void OpRecord(XrdClHttp::CurlOperation &op, OpKind);
140
141 static std::atomic<uint64_t> m_conncall_errors;
142 static std::atomic<uint64_t> m_conncall_req;
143 static std::atomic<uint64_t> m_conncall_success;
144 static std::atomic<uint64_t> m_conncall_timeout;
145 static std::array<std::array<OpStats, 403>, static_cast<size_t>(XrdClHttp::CurlOperation::HttpVerb::Count)> m_ops;
146 std::atomic<std::chrono::system_clock::rep> m_last_completed_cycle;
147 std::atomic<std::chrono::system_clock::rep> m_oldest_op;
148
149 // Vector tracking known worker statistics.
150 static std::vector<std::atomic<std::chrono::system_clock::rep>*> m_workers_last_completed_cycle;
151 static std::vector<std::atomic<std::chrono::system_clock::rep>*> m_workers_oldest_op;
152 size_t m_stats_offset{0};
153 static std::mutex m_worker_stats_mutex;
154
155 // shutdown trigger
156 static struct initcontrol {
157 initcontrol();
158 ~initcontrol();
159 } m_initcontrol;
160};
161
162}
163
164#endif // XRDCLHTTPWORKER_HH
void CURL
std::tuple< std::string, std::string > ClientX509CertKeyFile() const
CurlWorker(std::shared_ptr< HandlerQueue > queue, VerbsCache &cache, XrdCl::Log *logger)
CurlWorker(const CurlWorker &)=delete
static void RunStatic(CurlWorker *myself)
void Start(std::unique_ptr< XrdClHttp::CurlWorker > self, std::thread tid)
static void SetMaintenancePeriod(unsigned maint)
static std::string GetMonitoringJson()
XrdCl::Log: Handle diagnostics.
Definition XrdClLog.hh:101
XrdCl::ResponseHandler: Handle an async response.
XrdCl::URL: URL representation.
Definition XrdClURL.hh:31