1#pragma once
2
3#include "timer.h"
4
5#include <vector>
6
7#include <fcntl.h>
8#include <sys/stat.h>
9
10extern bool g_keep;
11extern const char* g_output;
12
13class TextInput
14{
15 public:
16  explicit TextInput(const char* filename, int buffer_size = 8 * 1024 * 1024)
17    : fd_(::open(filename, O_RDONLY)),
18      buffer_size_(buffer_size),
19      block_(new Block)
20  {
21    assert(fd_ >= 0);
22    block_->data.reset(new char[buffer_size_]);
23    refill();
24  }
25
26  ~TextInput()
27  {
28    ::close(fd_);
29  }
30
31  absl::string_view line() const { return line_; }
32
33  bool next(int64_t* count)
34  {
35    // EOF
36    if (block_->records.empty())
37    {
38      return false;
39    }
40
41    if (index_ < block_->records.size())
42    {
43      const Record& rec = block_->records[index_];
44      *count = rec.count;
45      line_ = absl::string_view(block_->data.get() + rec.offset, rec.len);
46      ++index_;
47      return true;
48    }
49    else
50    {
51      refill();
52      index_ = 0;
53      return next(count);
54    }
55  }
56
57 private:
58
59  struct Record
60  {
61    int64_t count = 0;
62    int32_t offset = 0, len = 0;
63  };
64
65  struct Block
66  {
67    std::unique_ptr<char[]> data;
68    std::vector<Record> records;
69  };
70
71  void refill()
72  {
73    block_->records.clear();
74    char* data = block_->data.get();
75    ssize_t nr = ::pread(fd_, data, buffer_size_, pos_);
76    if (nr > 0)
77    {
78      char* start = data;
79      size_t len = nr;
80      char* nl = static_cast<char*>(::memchr(start, '\n', len));
81      while (nl)
82      {
83        Record rec;
84        rec.count = strtol(start, NULL, 10);
85        rec.offset = start - data;
86        rec.len = nl - start + 1;
87        block_->records.push_back(rec);
88        start = nl+1;
89        len -= rec.len;
90        nl = static_cast<char*>(::memchr(start, '\n', len));
91      }
92      pos_ += start - data;
93    }
94  }
95
96  const int fd_;
97  const int buffer_size_;
98  int64_t pos_ = 0;  // file position
99  size_t index_ = 0; // index into block_
100  std::unique_ptr<Block> block_;
101  absl::string_view line_;
102
103  TextInput(const TextInput&) = delete;
104  void operator=(const TextInput&) = delete;
105};
106
107class Source  // copyable
108{
109 public:
110  explicit Source(TextInput* in)
111    : input_(in)
112  {
113  }
114
115  bool next()
116  {
117    return input_->next(&count_);
118  }
119
120  bool operator<(const Source& rhs) const
121  {
122    return count_ < rhs.count_;
123  }
124
125  absl::string_view line() const { return input_->line(); }
126
127 private:
128  TextInput* input_;  // not owned
129  int64_t count_ = 0;
130};
131
132int64_t merge(int n)
133{
134  Timer timer;
135  std::vector<std::unique_ptr<TextInput>> inputs;
136  std::vector<Source> keys;
137
138  int64_t total = 0;
139  for (int i = 0; i < n; ++i)
140  {
141    char buf[256];
142    snprintf(buf, sizeof buf, "count-%05d", i);
143    struct stat st;
144    if (::stat(buf, &st) == 0)
145    {
146      total += st.st_size;
147      // TODO: select buffer size based on kShards.
148      inputs.push_back(std::make_unique<TextInput>(buf));
149      Source rec(inputs.back().get());
150      if (rec.next())
151      {
152        keys.push_back(rec);
153      }
154      if (!g_keep)
155        ::unlink(buf);
156    }
157    else
158    {
159      perror("Unable to stat file:");
160    }
161  }
162  LOG_INFO << "merging " << inputs.size() << " files of " << total << " bytes in total";
163
164  int64_t lines = 0;
165  {
166  OutputFile out(g_output);
167
168  std::make_heap(keys.begin(), keys.end());
169  while (!keys.empty())
170  {
171    std::pop_heap(keys.begin(), keys.end());
172    out.write(keys.back().line());
173    ++lines;
174
175    if (keys.back().next())
176    {
177      std::push_heap(keys.begin(), keys.end());
178    }
179    else
180    {
181      keys.pop_back();
182    }
183  }
184
185  }
186  LOG_INFO << "Merging done " << timer.report(total) << " lines " << lines;
187  return total;
188}
189
190
191