Libosmium  2.16.0
Fast and flexible C++ library for working with OpenStreetMap data
reader.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_READER_HPP
2 #define OSMIUM_IO_READER_HPP
3 
4 /*
5 
6 This file is part of Osmium (https://osmcode.org/libosmium).
7 
8 Copyright 2013-2021 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
37 #include <osmium/io/detail/input_format.hpp>
38 #include <osmium/io/detail/queue_util.hpp>
39 #include <osmium/io/detail/read_thread.hpp>
40 #include <osmium/io/detail/read_write.hpp>
41 #include <osmium/io/error.hpp>
42 #include <osmium/io/file.hpp>
43 #include <osmium/io/header.hpp>
44 #include <osmium/memory/buffer.hpp>
46 #include <osmium/thread/pool.hpp>
47 #include <osmium/thread/util.hpp>
48 #include <osmium/util/config.hpp>
49 
50 #include <cerrno>
51 #include <cstdlib>
52 #include <fcntl.h>
53 #include <future>
54 #include <memory>
55 #include <string>
56 #include <system_error>
57 #include <thread>
58 #include <utility>
59 
60 #ifndef _WIN32
61 # include <sys/wait.h>
62 #endif
63 
64 #ifndef _MSC_VER
65 # include <unistd.h>
66 #endif
67 
68 namespace osmium {
69 
70  namespace io {
71 
72  namespace detail {
73 
74  inline std::size_t get_input_queue_size() noexcept {
75  return osmium::config::get_max_queue_size("INPUT", 20);
76  }
77 
78  inline std::size_t get_osmdata_queue_size() noexcept {
79  return osmium::config::get_max_queue_size("OSMDATA", 20);
80  }
81 
82  } // namespace detail
83 
90  class Reader {
91 
92  // The Reader::read() function reads from a queue of buffers which
93  // can contain nested buffers. These nested buffers will be in
94  // here, because read() can only return a single unnested buffer.
95  osmium::memory::Buffer m_back_buffers{};
96 
98 
99  osmium::thread::Pool* m_pool = nullptr;
100 
101  detail::ParserFactory::create_parser_type m_creator;
102 
103  enum class status {
104  okay = 0, // normal reading
105  error = 1, // some error occurred while reading
106  closed = 2, // close() called
107  eof = 3 // eof of file was reached without error
108  } m_status = status::okay;
109 
110  int m_childpid = 0;
111 
112  detail::future_string_queue_type m_input_queue;
113 
114  std::unique_ptr<osmium::io::Decompressor> m_decompressor;
115 
116  osmium::io::detail::ReadThreadManager m_read_thread_manager;
117 
118  detail::future_buffer_queue_type m_osmdata_queue;
119  detail::queue_wrapper<osmium::memory::Buffer> m_osmdata_queue_wrapper;
120 
121  std::future<osmium::io::Header> m_header_future{};
122  osmium::io::Header m_header{};
123 
125 
126  std::size_t m_file_size = 0;
127 
130 
131  void set_option(osmium::thread::Pool& pool) noexcept {
132  m_pool = &pool;
133  }
134 
136  m_read_which_entities = value;
137  }
138 
139  void set_option(osmium::io::read_meta value) noexcept {
140  m_read_metadata = value;
141  }
142 
143  // This function will run in a separate thread.
145  const detail::ParserFactory::create_parser_type& creator,
146  detail::future_string_queue_type& input_queue,
147  detail::future_buffer_queue_type& osmdata_queue,
148  std::promise<osmium::io::Header>&& header_promise,
149  osmium::osm_entity_bits::type read_which_entities,
150  osmium::io::read_meta read_metadata) {
151  std::promise<osmium::io::Header> promise{std::move(header_promise)};
152  osmium::io::detail::parser_arguments args = {
153  pool,
154  input_queue,
155  osmdata_queue,
156  promise,
157  read_which_entities,
158  read_metadata
159  };
160  creator(args)->parse();
161  }
162 
163 #ifndef _WIN32
164 
/**
 * Fork and run the given command in the child process with the
 * arguments "-g" and filename. The stdout of the command is connected
 * to a pipe, stdin and stderr of the command are opened on /dev/null.
 *
 * @param command Command to run; looked up by execlp().
 * @param filename Last argument given to the command.
 * @param childpid Out parameter, set to the PID of the child process.
 * @returns File descriptor of the read end of the pipe.
 * @throws std::system_error if creating the pipe or forking failed.
 */
static int execute(const std::string& command, const std::string& filename, int* childpid) {
    int pipefd[2];
    if (pipe(pipefd) < 0) {
        throw std::system_error{errno, std::system_category(), "opening pipe failed"};
    }
    const pid_t pid = fork();
    if (pid < 0) {
        // Do not leak the pipe if there is no child to use it. Keep the
        // errno of fork(), because close() might overwrite it.
        const int fork_errno = errno;
        ::close(pipefd[0]);
        ::close(pipefd[1]);
        throw std::system_error{fork_errno, std::system_category(), "fork failed"};
    }
    if (pid == 0) { // child
        // close all file descriptors except one end of the pipe
        for (int i = 0; i < 32; ++i) {
            if (i != pipefd[1]) {
                ::close(i);
            }
        }
        // The child must leave through _exit(): exit() would run the atexit
        // handlers and static destructors inherited from the parent.
        if (dup2(pipefd[1], 1) < 0) { // put end of pipe as stdout/stdin
            _exit(1);
        }

        ::open("/dev/null", O_RDONLY); // stdin
        ::open("/dev/null", O_WRONLY); // stderr
        // hack: -g switches off globbing in curl which allows [] to be used in file names
        // this is important for XAPI URLs
        // in theory this execute() function could be used for other commands, but it is
        // only used for curl at the moment, so this is okay.
        ::execlp(command.c_str(), command.c_str(), "-g", filename.c_str(), nullptr);
        // execlp() only ever returns if it failed.
        _exit(1);
    }
    // parent
    *childpid = pid;
    ::close(pipefd[1]);
    return pipefd[0];
}
210 #endif
211 
220  static int open_input_file_or_url(const std::string& filename, int* childpid) {
221  const std::string protocol{filename.substr(0, filename.find_first_of(':'))};
222  if (protocol == "http" || protocol == "https" || protocol == "ftp" || protocol == "file") {
223 #ifndef _WIN32
224  return execute("curl", filename, childpid);
225 #else
226  throw io_error{"Reading OSM files from the network currently not supported on Windows."};
227 #endif
228  }
229  return osmium::io::detail::open_for_reading(filename);
230  }
231 
232  public:
233 
262  template <typename... TArgs>
263  explicit Reader(const osmium::io::File& file, TArgs&&... args) :
264  m_file(file.check()),
265  m_creator(detail::ParserFactory::instance().get_creator_function(m_file)),
266  m_input_queue(detail::get_input_queue_size(), "raw_input"),
267  m_decompressor(m_file.buffer() ?
268  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), m_file.buffer(), m_file.buffer_size()) :
269  osmium::io::CompressionFactory::instance().create_decompressor(file.compression(), open_input_file_or_url(m_file.filename(), &m_childpid))),
270  m_read_thread_manager(*m_decompressor, m_input_queue),
271  m_osmdata_queue(detail::get_osmdata_queue_size(), "parser_results"),
272  m_osmdata_queue_wrapper(m_osmdata_queue),
273  m_file_size(m_decompressor->file_size()) {
274 
275  (void)std::initializer_list<int>{
276  (set_option(args), 0)...
277  };
278 
279  if (!m_pool) {
280  m_pool = &thread::Pool::default_instance();
281  }
282 
283  std::promise<osmium::io::Header> header_promise;
284  m_header_future = header_promise.get_future();
285  m_thread = osmium::thread::thread_handler{parser_thread, std::ref(*m_pool), std::ref(m_creator), std::ref(m_input_queue), std::ref(m_osmdata_queue), std::move(header_promise), m_read_which_entities, m_read_metadata};
286  }
287 
288  template <typename... TArgs>
289  explicit Reader(const std::string& filename, TArgs&&... args) :
290  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
291  }
292 
293  template <typename... TArgs>
294  explicit Reader(const char* filename, TArgs&&... args) :
295  Reader(osmium::io::File(filename), std::forward<TArgs>(args)...) {
296  }
297 
298  Reader(const Reader&) = delete;
299  Reader& operator=(const Reader&) = delete;
300 
301  Reader(Reader&&) = delete;
302  Reader& operator=(Reader&&) = delete;
303 
304  ~Reader() noexcept {
305  try {
306  close();
307  } catch (...) {
308  // Ignore any exceptions because destructor must not throw.
309  }
310  }
311 
320  void close() {
321  m_status = status::closed;
322 
323  m_read_thread_manager.stop();
324 
325  m_osmdata_queue_wrapper.drain();
326 
327  try {
328  m_read_thread_manager.close();
329  } catch (...) {
330  // Ignore any exceptions.
331  }
332 
333 #ifndef _WIN32
334  if (m_childpid) {
335  int status = 0;
336  const pid_t pid = ::waitpid(m_childpid, &status, 0);
337 #pragma GCC diagnostic push
338 #pragma GCC diagnostic ignored "-Wold-style-cast"
339  if (pid < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0) { // NOLINT(hicpp-signed-bitwise)
340  throw std::system_error{errno, std::system_category(), "subprocess returned error"};
341  }
342 #pragma GCC diagnostic pop
343  m_childpid = 0;
344  }
345 #endif
346  }
347 
355  if (m_status == status::error) {
356  throw io_error{"Can not get header from reader when in status 'error'"};
357  }
358 
359  try {
360  if (m_header_future.valid()) {
361  m_header = m_header_future.get();
362  }
363  } catch (...) {
364  close();
365  m_status = status::error;
366  throw;
367  }
368 
369  return m_header;
370  }
371 
381  osmium::memory::Buffer buffer;
382 
383  // If there are buffers on the stack, return those first.
384  if (m_back_buffers) {
385  if (m_back_buffers.has_nested_buffers()) {
386  buffer = std::move(*m_back_buffers.get_last_nested());
387  } else {
388  buffer = std::move(m_back_buffers);
389  m_back_buffers = osmium::memory::Buffer{};
390  }
391  return buffer;
392  }
393 
394  if (m_status != status::okay) {
395  throw io_error{"Can not read from reader when in status 'closed', 'eof', or 'error'"};
396  }
397 
398  if (m_read_which_entities == osmium::osm_entity_bits::nothing) {
399  m_status = status::eof;
400  return buffer;
401  }
402 
403  try {
404  // m_input_format.read() can return an invalid buffer to signal EOF,
405  // or a valid buffer with or without data. A valid buffer
406  // without data is not an error, it just means we have to
407  // keep getting the next buffer until there is one with data.
408  while (true) {
409  buffer = m_osmdata_queue_wrapper.pop();
410  if (detail::at_end_of_data(buffer)) {
411  m_status = status::eof;
412  m_read_thread_manager.close();
413  return buffer;
414  }
415  if (buffer.has_nested_buffers()) {
416  m_back_buffers = std::move(buffer);
417  buffer = std::move(*m_back_buffers.get_last_nested());
418  }
419  if (buffer.committed() > 0) {
420  return buffer;
421  }
422  }
423  } catch (...) {
424  close();
425  m_status = status::error;
426  throw;
427  }
428  }
429 
434  bool eof() const {
435  return m_status == status::eof || m_status == status::closed;
436  }
437 
442  std::size_t file_size() const noexcept {
443  return m_file_size;
444  }
445 
460  std::size_t offset() const noexcept {
461  return m_decompressor->offset();
462  }
463 
464  }; // class Reader
465 
474  template <typename... TArgs>
477 
478  Reader reader{std::forward<TArgs>(args)...};
479  while (auto read_buffer = reader.read()) {
480  buffer.add_buffer(read_buffer);
481  buffer.commit();
482  }
483 
484  return buffer;
485  }
486 
487  } // namespace io
488 
489 } // namespace osmium
490 
491 #endif // OSMIUM_IO_READER_HPP
detail::queue_wrapper< osmium::memory::Buffer > m_osmdata_queue_wrapper
Definition: reader.hpp:119
status
Definition: reader.hpp:103
osmium::memory::Buffer read()
Definition: reader.hpp:380
Reader(const osmium::io::File &file, TArgs &&... args)
Definition: reader.hpp:263
type
Definition: entity_bits.hpp:63
std::size_t file_size(int fd)
Definition: file.hpp:109
std::size_t file_size() const noexcept
Definition: reader.hpp:442
bool eof() const
Definition: reader.hpp:434
static Pool & default_instance()
Definition: pool.hpp:186
Reader(const char *filename, TArgs &&... args)
Definition: reader.hpp:294
Definition: location.hpp:551
std::unique_ptr< osmium::io::Decompressor > m_decompressor
Definition: reader.hpp:114
std::size_t committed() const noexcept
Definition: buffer.hpp:356
object or changeset
Definition: entity_bits.hpp:76
detail::future_string_queue_type m_input_queue
Definition: reader.hpp:112
osmium::memory::Buffer read_file(TArgs &&... args)
Definition: reader.hpp:475
osmium::io::File m_file
Definition: reader.hpp:97
Definition: file.hpp:72
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
void set_option(osmium::osm_entity_bits::type value) noexcept
Definition: reader.hpp:135
Definition: attr.hpp:342
osmium::io::detail::ReadThreadManager m_read_thread_manager
Definition: reader.hpp:116
static int execute(const std::string &command, const std::string &filename, int *childpid)
Definition: reader.hpp:175
static void parser_thread(osmium::thread::Pool &pool, const detail::ParserFactory::create_parser_type &creator, detail::future_string_queue_type &input_queue, detail::future_buffer_queue_type &osmdata_queue, std::promise< osmium::io::Header > &&header_promise, osmium::osm_entity_bits::type read_which_entities, osmium::io::read_meta read_metadata)
Definition: reader.hpp:144
Reader(const std::string &filename, TArgs &&... args)
Definition: reader.hpp:289
Definition: reader.hpp:90
~Reader() noexcept
Definition: reader.hpp:304
Definition: error.hpp:44
std::size_t offset() const noexcept
Definition: reader.hpp:460
Definition: pool.hpp:90
osmium::io::Header header()
Definition: reader.hpp:354
void set_option(osmium::io::read_meta value) noexcept
Definition: reader.hpp:139
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
bool has_nested_buffers() const noexcept
Definition: buffer.hpp:437
void set_option(osmium::thread::Pool &pool) noexcept
Definition: reader.hpp:131
Definition: buffer.hpp:97
static int open_input_file_or_url(const std::string &filename, int *childpid)
Definition: reader.hpp:220
detail::future_buffer_queue_type m_osmdata_queue
Definition: reader.hpp:118
read_meta
Definition: file_format.hpp:54
Definition: entity_bits.hpp:67
Definition: compression.hpp:141
detail::ParserFactory::create_parser_type m_creator
Definition: reader.hpp:101
Definition: header.hpp:68
void close()
Definition: reader.hpp:320
Definition: util.hpp:85