1#pragma once 2 3#include "timer.h" 4 5#include <vector> 6 7#include <fcntl.h> 8#include <sys/stat.h> 9 10extern bool g_keep; 11extern const char* g_output; 12 13class TextInput 14{ 15 public: 16 explicit TextInput(const char* filename, int buffer_size = 8 * 1024 * 1024) 17 : fd_(::open(filename, O_RDONLY)), 18 buffer_size_(buffer_size), 19 block_(new Block) 20 { 21 assert(fd_ >= 0); 22 block_->data.reset(new char[buffer_size_]); 23 refill(); 24 } 25 26 ~TextInput() 27 { 28 ::close(fd_); 29 } 30 31 absl::string_view line() const { return line_; } 32 33 bool next(int64_t* count) 34 { 35 // EOF 36 if (block_->records.empty()) 37 { 38 return false; 39 } 40 41 if (index_ < block_->records.size()) 42 { 43 const Record& rec = block_->records[index_]; 44 *count = rec.count; 45 line_ = absl::string_view(block_->data.get() + rec.offset, rec.len); 46 ++index_; 47 return true; 48 } 49 else 50 { 51 refill(); 52 index_ = 0; 53 return next(count); 54 } 55 } 56 57 private: 58 59 struct Record 60 { 61 int64_t count = 0; 62 int32_t offset = 0, len = 0; 63 }; 64 65 struct Block 66 { 67 std::unique_ptr<char[]> data; 68 std::vector<Record> records; 69 }; 70 71 void refill() 72 { 73 block_->records.clear(); 74 char* data = block_->data.get(); 75 ssize_t nr = ::pread(fd_, data, buffer_size_, pos_); 76 if (nr > 0) 77 { 78 char* start = data; 79 size_t len = nr; 80 char* nl = static_cast<char*>(::memchr(start, '\n', len)); 81 while (nl) 82 { 83 Record rec; 84 rec.count = strtol(start, NULL, 10); 85 rec.offset = start - data; 86 rec.len = nl - start + 1; 87 block_->records.push_back(rec); 88 start = nl+1; 89 len -= rec.len; 90 nl = static_cast<char*>(::memchr(start, '\n', len)); 91 } 92 pos_ += start - data; 93 } 94 } 95 96 const int fd_; 97 const int buffer_size_; 98 int64_t pos_ = 0; // file position 99 size_t index_ = 0; // index into block_ 100 std::unique_ptr<Block> block_; 101 absl::string_view line_; 102 103 TextInput(const TextInput&) = delete; 104 void operator=(const TextInput&) = delete; 105}; 106 107class Source // copyable 108{ 109 public: 110 explicit Source(TextInput* in) 111 : input_(in) 112 { 113 } 114 115 bool next() 116 { 117 return input_->next(&count_); 118 } 119 120 bool operator<(const Source& rhs) const 121 { 122 return count_ < rhs.count_; 123 } 124 125 absl::string_view line() const { return input_->line(); } 126 127 private: 128 TextInput* input_; // not owned 129 int64_t count_ = 0; 130}; 131 132int64_t merge(int n) 133{ 134 Timer timer; 135 std::vector<std::unique_ptr<TextInput>> inputs; 136 std::vector<Source> keys; 137 138 int64_t total = 0; 139 for (int i = 0; i < n; ++i) 140 { 141 char buf[256]; 142 snprintf(buf, sizeof buf, "count-%05d", i); 143 struct stat st; 144 if (::stat(buf, &st) == 0) 145 { 146 total += st.st_size; 147 // TODO: select buffer size based on kShards. 148 inputs.push_back(std::make_unique<TextInput>(buf)); 149 Source rec(inputs.back().get()); 150 if (rec.next()) 151 { 152 keys.push_back(rec); 153 } 154 if (!g_keep) 155 ::unlink(buf); 156 } 157 else 158 { 159 perror("Unable to stat file:"); 160 } 161 } 162 LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total"; 163 164 int64_t lines = 0; 165 { 166 OutputFile out(g_output); 167 168 std::make_heap(keys.begin(), keys.end()); 169 while (!keys.empty()) 170 { 171 std::pop_heap(keys.begin(), keys.end()); 172 out.write(keys.back().line()); 173 ++lines; 174 175 if (keys.back().next()) 176 { 177 std::push_heap(keys.begin(), keys.end()); 178 } 179 else 180 { 181 keys.pop_back(); 182 } 183 } 184 185 } 186 LOG_INFO << "Merging done " << timer.report(total) << " lines " << lines; 187 return total; 188} 189 190 191