1 #ifndef OSMIUM_IO_WRITER_HPP 2 #define OSMIUM_IO_WRITER_HPP 37 #include <osmium/io/detail/output_format.hpp> 38 #include <osmium/io/detail/queue_util.hpp> 39 #include <osmium/io/detail/read_write.hpp> 40 #include <osmium/io/detail/write_thread.hpp> 56 #include <initializer_list> 71 inline std::size_t get_output_queue_size() noexcept {
103 default_buffer_size = 10UL * 1024UL * 1024UL
108 detail::future_string_queue_type m_output_queue{detail::get_output_queue_size(),
"raw_output"};
110 std::unique_ptr<osmium::io::detail::OutputFormat> m_output{
nullptr};
114 size_t m_buffer_size = default_buffer_size;
116 std::future<std::size_t> m_write_future{};
124 } m_status = status::okay;
127 static void write_thread(detail::future_string_queue_type& output_queue,
128 std::unique_ptr<osmium::io::Compressor>&& compressor,
129 std::promise<std::size_t>&& write_promise) {
130 detail::WriteThread write_thread{output_queue,
131 std::move(compressor),
132 std::move(write_promise)};
137 if (buffer && buffer.committed() > 0) {
138 m_output->write_buffer(std::move(buffer));
144 if (m_buffer && m_buffer.committed() > 0) {
148 swap(m_buffer, buffer);
150 m_output->write_buffer(std::move(buffer));
154 template <
typename TFunction,
typename... TArgs>
156 if (m_status != status::okay) {
157 throw io_error(
"Can not write to writer when in status 'closed' or 'error'");
161 func(std::forward<TArgs>(args)...);
163 m_status = status::error;
164 detail::add_to_queue(m_output_queue, std::current_exception());
165 detail::add_end_of_data_to_queue(m_output_queue);
178 options.
pool = &pool;
190 options.
sync = value;
194 if (m_status == status::okay) {
195 ensure_cleanup([&](){
196 do_write(std::move(m_buffer));
197 m_output->write_end();
198 m_status = status::closed;
199 detail::add_end_of_data_to_queue(m_output_queue);
236 template <
typename... TArgs>
238 m_file(file.check()) {
242 (void)std::initializer_list<int>{
243 (set_option(options, args), 0)...
250 m_output = osmium::io::detail::OutputFormatFactory::instance().create_output(*options.pool, m_file, m_output_queue);
252 if (options.header.get(
"generator").empty()) {
256 std::unique_ptr<osmium::io::Compressor> compressor =
257 CompressionFactory::instance().create_compressor(file.
compression(),
258 osmium::io::detail::open_for_writing(m_file.
filename(), options.allow_overwrite),
261 std::promise<std::size_t> write_promise;
262 m_write_future = write_promise.get_future();
265 ensure_cleanup([&](){
266 m_output->write_header(options.header);
270 template <
typename... TArgs>
271 explicit Writer(
const std::string& filename, TArgs&&... args) :
275 template <
typename... TArgs>
276 explicit Writer(
const char* filename, TArgs&&... args) :
298 return m_buffer_size;
306 m_buffer_size = size;
317 ensure_cleanup([&](){
331 ensure_cleanup([&](){
333 do_write(std::move(buffer));
345 ensure_cleanup([&](){
351 m_buffer.push_back(item);
354 m_buffer.push_back(item);
373 if (m_write_future.valid()) {
374 return m_write_future.get();
386 #endif // OSMIUM_IO_WRITER_HPP fsync sync
Definition: writer.hpp:173
~Writer() noexcept
Definition: writer.hpp:286
Definition: writer.hpp:170
static Pool & default_instance()
Definition: pool.hpp:186
void do_write(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:136
void do_flush()
Definition: writer.hpp:142
void swap(Buffer &lhs, Buffer &rhs)
Definition: buffer.hpp:885
void do_close()
Definition: writer.hpp:193
osmium::io::Header header
Definition: writer.hpp:171
void set_buffer_size(size_t size) noexcept
Definition: writer.hpp:305
void operator()(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:330
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
status
Definition: writer.hpp:120
#define LIBOSMIUM_VERSION_STRING
Definition: version.hpp:40
size_t buffer_size() const noexcept
Definition: writer.hpp:297
fsync
Definition: writer_options.hpp:51
static void set_option(options_type &options, fsync value)
Definition: writer.hpp:189
std::size_t close()
Definition: writer.hpp:370
void flush()
Definition: writer.hpp:316
osmium::io::File m_file
Definition: writer.hpp:106
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
static void set_option(options_type &options, osmium::thread::Pool &pool)
Definition: writer.hpp:177
Definition: buffer.hpp:97
Definition: buffer.hpp:58
const char * buffer() const noexcept
Definition: file.hpp:143
static void set_option(options_type &options, overwrite value)
Definition: writer.hpp:185
Definition: writer.hpp:100
Writer(const char *filename, TArgs &&... args)
Definition: writer.hpp:276
osmium::thread::Pool * pool
Definition: writer.hpp:174
Writer(const osmium::io::File &file, TArgs &&... args)
Definition: writer.hpp:237
void check_for_exception(std::future< T > &future)
Definition: util.hpp:55
Writer(const std::string &filename, TArgs &&... args)
Definition: writer.hpp:271
overwrite allow_overwrite
Definition: writer.hpp:172
static void set_option(options_type &options, const osmium::io::Header &header)
Definition: writer.hpp:181
static void write_thread(detail::future_string_queue_type &output_queue, std::unique_ptr< osmium::io::Compressor > &&compressor, std::promise< std::size_t > &&write_promise)
Definition: writer.hpp:127
file_compression compression() const noexcept
Definition: file.hpp:291
File & filename(const std::string &filename)
Definition: file.hpp:309
void operator()(const osmium::memory::Item &item)
Definition: writer.hpp:344
void ensure_cleanup(TFunction func, TArgs &&... args)
Definition: writer.hpp:155
overwrite
Definition: writer_options.hpp:43