31 const std::string &url,
struct timespec timeout,
const std::pair<uint64_t, uint64_t> &op,
34 CurlOperation(handler, url, timeout, logger, callout, header_callout),
35 m_default_handler(default_handler),
// Fragment of what appears to be CurlReadOp::Continue (listing lines 44-95);
// the lines between the numbered statements are not visible in this view.
// Guard: compare the operation pointer supplied by the caller against this object.
44 if (op.get() !=
this) {
// Bytes left over in m_prefetch_buffer are handed to the caller's buffer first.
54 if (!m_prefetch_buffer.empty()) {
// Unconsumed bytes = total buffered minus what earlier calls already copied out.
55 auto prefetch_remaining = m_prefetch_buffer.size() - m_prefetch_buffer_offset;
// Never copy more than the caller's buffer can hold.
56 auto to_copy = prefetch_remaining > buffer_size ? buffer_size : prefetch_remaining;
58 memcpy(buffer, m_prefetch_buffer.data() + m_prefetch_buffer_offset, to_copy);
59 m_prefetch_buffer_offset += to_copy;
// Fully drained: clear the buffer and rewind the offset so the empty() test above
// is false on the next call.
60 if (m_prefetch_buffer_offset == m_prefetch_buffer.size()) {
61 m_prefetch_buffer.clear();
62 m_prefetch_buffer_offset = 0;
// Unpause the curl transfer; any result other than CURLE_OK enters the error
// branch (its body is not visible here).
95 if ((rc = curl_easy_pause(
m_curl.get(), CURLPAUSE_CONT)) != CURLE_OK) {
// Fragment of what appears to be CurlReadOp::Setup (listing lines 108-129);
// intervening lines are not visible in this view.
// Body data is delivered to the static WriteCallback, with this object as the
// callback's context pointer.
108 curl_easy_setopt(
m_curl.get(), CURLOPT_WRITEFUNCTION, CurlReadOp::WriteCallback);
109 curl_easy_setopt(
m_curl.get(), CURLOPT_WRITEDATA,
this);
// A zero-length request is special-cased (branch body not visible here).
113 if (
m_op.second == 0) {
// Pick the curl buffer size from the requested length (m_op.second):
//   >= 1 MiB   -> 128 KiB
//   >= 256 KiB -> 64 KiB
//   >= 128 KiB -> 32 KiB
// Smaller requests set no CURLOPT_BUFFERSIZE in the visible lines.
117 if (
m_op.second >= 1024*1024) {
118 curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 128*1024);
120 else if (
m_op.second >= 256*1024) {
121 curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 64*1024);
123 else if (
m_op.second >= 128*1024) {
124 curl_easy_setopt(curl, CURLOPT_BUFFERSIZE, 32*1024);
// UINT64_MAX as the length skips the range string entirely. Otherwise build
// "bytes=<first>-<last>" where last = offset + length - 1 (inclusive end).
// NOTE(review): how range_req is applied to the request is not visible here.
128 if (
m_op.second != UINT64_MAX) {
129 auto range_req =
"bytes=" + std::to_string(
m_op.first) +
"-" + std::to_string(
m_op.first +
m_op.second - 1);
// Fragment of what appears to be CurlReadOp::Fail (listing lines 139-152);
// some lines between the numbered statements are not visible in this view.
// Work on a copy of the message so operation context can be appended to it.
139 std::string custom_msg = msg;
// With neither a per-request handler nor a default handler there is nobody to
// notify, so return immediately.
141 if (
m_handler ==
nullptr && m_default_handler ==
nullptr) {
return;}
// Non-empty message: log it together with any server-provided text, then append
// the read offset to the copy.
// The temporary std::string built for ", server message: ..." stays alive until
// the end of the Debug() call expression, so passing its c_str() is valid.
142 if (!custom_msg.empty()) {
143 m_logger->Debug(
kLogXrdClHttp,
"curl operation at offset %llu failed with message: %s%s",
static_cast<long long unsigned>(
m_op.first), msg.c_str(),
m_err_msg.empty() ?
"" : (
", server message: " +
m_err_msg).c_str());
144 custom_msg +=
" (read operation at offset " + std::to_string(
static_cast<long long unsigned>(
m_op.first)) +
")";
// NOTE(review): the listing jumps from 144 to 146; presumably this is the
// else-branch for an empty message, logging the numeric status instead — confirm.
146 m_logger->Debug(
kLogXrdClHttp,
"curl operation at offset %llu failed with status code %d%s",
static_cast<long long unsigned>(
m_op.first), errNum,
m_err_msg.empty() ?
"" : (
", server message: " +
m_err_msg).c_str());
// Report the failure through `handle` when it is non-null, otherwise through the
// default handler; no response object accompanies the status.
151 if (handle) handle->HandleResponse(status,
nullptr);
152 else m_default_handler->HandleResponse(status,
nullptr);
// CurlReadOp::DeliverResponse (listing lines 156-201); only a few body lines are
// visible in this view.
156CurlReadOp::DeliverResponse()
// Store chunk_info in the response object.
165 obj->Set(chunk_info);
// NOTE(review): the jump from listing line 165 to 198 may hide a function
// boundary; this second Set() could belong to a different method — confirm.
198 obj->Set(chunk_info);
// Pass the status and the response object to the handler.
201 handle->HandleResponse(status, obj);
// Fragment (listing lines 207-214), presumably from CurlReadOp::ReleaseHandle —
// confirm against the full file.
// Without a curl handle there is nothing to reset.
207 if (
m_curl ==
nullptr)
return;
// Reset the write callback and its context pointer to nullptr (WRITEDATA was
// set to this object where WRITEFUNCTION was installed).
208 curl_easy_setopt(
m_curl.get(), CURLOPT_WRITEFUNCTION,
nullptr);
209 curl_easy_setopt(
m_curl.get(), CURLOPT_WRITEDATA,
nullptr);
// Drop the custom header list from the handle.
210 curl_easy_setopt(
m_curl.get(), CURLOPT_HTTPHEADER,
nullptr);
// Clear the socket-open and socket-option callbacks and their context pointers.
211 curl_easy_setopt(
m_curl.get(), CURLOPT_OPENSOCKETFUNCTION,
nullptr);
212 curl_easy_setopt(
m_curl.get(), CURLOPT_OPENSOCKETDATA,
nullptr);
213 curl_easy_setopt(
m_curl.get(), CURLOPT_SOCKOPTFUNCTION,
nullptr);
214 curl_easy_setopt(
m_curl.get(), CURLOPT_SOCKOPTDATA,
nullptr);
// Static write-callback trampoline: this_ptr is cast back to the CurlReadOp it
// was registered with, and the payload (size * nitems bytes starting at buffer)
// is forwarded to the member function Write(). Write()'s return value is
// returned unchanged.
219CurlReadOp::WriteCallback(
char *buffer,
size_t size,
size_t nitems,
void *this_ptr)
221 return static_cast<CurlReadOp*
>(this_ptr)->Write(buffer, size * nitems);
// CurlReadOp::Write (listing lines 225-270): receives `length` bytes at `buffer`.
// Parts of the body are not visible in this view.
225CurlReadOp::Write(
char *buffer,
size_t length)
// Returns libcurl's pause code (the condition leading here is not visible).
255 return CURL_WRITEFUNC_PAUSE;
// True when the incoming data exceeds the space left in the destination
// (output_remaining is computed on a line not visible here).
259 auto larger_than_result_buffer = length > output_remaining;
// Copy only as much as fits.
260 auto to_copy = larger_than_result_buffer ? output_remaining : length;
267 }
else if (larger_than_result_buffer) {
// Bytes that did not fit are appended to m_prefetch_buffer for later
// consumption.
268 auto input_remaining = length - output_remaining;
269 m_prefetch_buffer.append(buffer + to_copy, input_remaining);
// NOTE(review): append() adds to any existing contents while the offset is
// rewound to 0; this is only consistent if the buffer was empty or fully
// consumed at this point — confirm against the hidden lines.
270 m_prefetch_buffer_offset = 0;
// Fragment (listing lines 282-303); the enclosing function is not visible.
// Presumably a page-read completion path that collects one checksum per page —
// confirm against the full file.
282 std::vector<uint32_t> cksums;
// One slot per page, reserved up front to avoid reallocation in the loop.
285 cksums.reserve(nbpages);
289 for (
size_t pg=0; pg<nbpages; ++pg)
// Clamp the page size to the bytes that remain (applies to a short final page).
292 if (pgsize > size) pgsize = size;
// Pass the status and the response object to the handler.
303 handle->HandleResponse(status, obj);
void SetDone(bool has_failed)
int FailCallback(XErrorCode ecode, const std::string &emsg)
std::unique_ptr< CURL, void(*)(CURL *)> m_curl
virtual void ReleaseHandle()
void UpdateBytes(uint64_t bytes)
std::vector< std::pair< std::string, std::string > > m_headers_list
XrdCl::ResponseHandler * m_handler
CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout)
void SetPaused(bool paused)
virtual bool Setup(CURL *curl, CurlWorker &)
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
std::pair< uint64_t, uint64_t > m_op
bool Setup(CURL *curl, CurlWorker &) override
std::shared_ptr< XrdClHttp::HandlerQueue > m_continue_queue
bool ContinueHandle() override
CurlReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, struct timespec timeout, const std::pair< uint64_t, uint64_t > &op, char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
bool Continue(std::shared_ptr< CurlOperation > op, XrdCl::ResponseHandler *handler, char *buffer, size_t buffer_size)
void ReleaseHandle() override
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
const uint64_t kLogXrdClHttp
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errInternal
Internal error.
static const int PageSize
Describe a data chunk for vector read.