Pyrogenesis  trunk
io.h
Go to the documentation of this file.
1 /* Copyright (c) 2011 Wildfire Games
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining
4  * a copy of this software and associated documentation files (the
5  * "Software"), to deal in the Software without restriction, including
6  * without limitation the rights to use, copy, modify, merge, publish,
7  * distribute, sublicense, and/or sell copies of the Software, and to
8  * permit persons to whom the Software is furnished to do so, subject to
9  * the following conditions:
10  *
11  * The above copyright notice and this permission notice shall be included
12  * in all copies or substantial portions of the Software.
13  *
14  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
17  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
18  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
19  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
20  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21  */
22 
23 /*
24  * provide asynchronous and synchronous I/O with hooks to allow
25  * overlapped processing or progress reporting.
26  */
27 
28 #ifndef INCLUDED_IO
29 #define INCLUDED_IO
30 
31 #include "lib/config2.h"
32 #include "lib/alignment.h"
33 #include "lib/bits.h"
34 #include "lib/timer.h"
35 #include "lib/file/file.h"
36 #include "lib/sysdep/filesystem.h" // wtruncate
37 #include "lib/posix/posix_aio.h" // LIO_READ, LIO_WRITE
38 
40 
41 namespace ERR
42 {
43  const Status IO = -110301;
44 }
45 
46 namespace io {
47 
48 // @return memory suitable for use as an I/O buffer (address is a
49 // multiple of alignment, size is rounded up to a multiple of alignment)
50 // @param alignment is automatically increased if smaller than the
51 // UniqueRange requirement.
52 //
53 // use this instead of the file cache for write buffers that are
54 // never reused (avoids displacing other items).
55 static inline UniqueRange Allocate(size_t size, size_t alignment = maxSectorSize)
56 {
57  return std::move(AllocateAligned(size, alignment));
58 }
59 
60 
61 #pragma pack(push, 1)
62 
63 // required information for any I/O (this is basically the same as aiocb,
64 // but also applies to synchronous I/O and has shorter/nicer names.)
65 struct Operation
66 {
67  // @param buf can be 0, in which case temporary block buffers are allocated.
68  // otherwise, it must be aligned and padded to the I/O alignment, e.g. via
69  // io::Allocate.
70  Operation(const File& file, void* buf, off_t size, off_t offset = 0)
71  : fd(file.Descriptor()), opcode((file.Flags() & O_WRONLY)? LIO_WRITE : LIO_READ)
72  , offset(offset), size(size), buf((void*)buf)
73  {
74  }
75 
76  void Validate() const
77  {
78  ENSURE(fd >= 0);
79  ENSURE(opcode == LIO_READ || opcode == LIO_WRITE);
80 
81  ENSURE(offset >= 0);
82  ENSURE(size >= 0);
83  // buf can legitimately be 0 (see above)
84  }
85 
86  int fd;
87  int opcode;
88 
91  void* buf;
92 };
93 
94 
95 // optional information how an Operation is to be carried out
96 struct Parameters
97 {
98  // default to single blocking I/Os
100  : alignment(1) // no alignment requirements
101  , blockSize(0) // do not split into blocks
102  , queueDepth(1) // disable aio
103  {
104  }
105 
106  // parameters for asynchronous I/O that maximize throughput on current drives
107  struct OverlappedTag {};
109  : alignment(maxSectorSize), blockSize(128*KiB), queueDepth(32)
110  {
111  }
112 
113  Parameters(size_t blockSize, size_t queueDepth, off_t alignment = maxSectorSize)
114  : alignment(alignment), blockSize(blockSize), queueDepth(queueDepth)
115  {
116  }
117 
118  void Validate(const Operation& op) const
119  {
120  ENSURE(is_pow2(alignment));
121  ENSURE(alignment > 0);
122 
123  if(blockSize != 0)
124  {
125  ENSURE(is_pow2(blockSize));
126  ENSURE(pageSize <= blockSize); // (don't bother checking an upper bound)
127  }
128 
129  ENSURE(1 <= queueDepth && queueDepth <= maxQueueDepth);
130 
131  ENSURE(IsAligned(op.offset, alignment));
132  // op.size doesn't need to be aligned
133  ENSURE(IsAligned(op.buf, alignment));
134  }
135 
136  // (ATTO only allows 10, which improves upon 8)
137  static const size_t maxQueueDepth = 32;
138 
140 
141  size_t blockSize; // 0 for one big "block"
142 
143  size_t queueDepth;
144 };
145 
146 #define IO_OVERLAPPED io::Parameters(io::Parameters::OverlappedTag())
147 
148 
150 {
151  /**
152  * called after a block I/O has completed.
153  * @return Status (see RETURN_STATUS_FROM_CALLBACK).
154  *
155  * allows progress notification and processing data while waiting for
156  * previous I/Os to complete.
157  **/
158  Status operator()(const u8* UNUSED(block), size_t UNUSED(blockSize)) const
159  {
160  return INFO::OK;
161  }
162 };
163 
164 
166 {
167  /**
168  * called before a block I/O is issued.
169  * @return Status (see RETURN_STATUS_FROM_CALLBACK).
170  *
171  * allows generating the data to write while waiting for
172  * previous I/Os to complete.
173  **/
175  {
176  return INFO::OK;
177  }
178 };
179 
180 
181 // ring buffer of partially initialized aiocb that can be passed
182 // directly to aio_write etc. after setting offset and buffer.
184 {
185 public:
187  : controlBlocks() // zero-initialize
188  {
189  const size_t blockSize = p.blockSize? p.blockSize : (size_t)op.size;
190 
191  const bool temporaryBuffersRequested = (op.buf == 0);
192  if(temporaryBuffersRequested)
193  buffers = std::move(io::Allocate(blockSize * p.queueDepth, p.alignment));
194 
195  for(size_t i = 0; i < ARRAY_SIZE(controlBlocks); i++)
196  {
197  aiocb& cb = operator[](i);
198  cb.aio_fildes = op.fd;
199  cb.aio_nbytes = blockSize;
200  cb.aio_lio_opcode = op.opcode;
201  if(temporaryBuffersRequested)
202  cb.aio_buf = (volatile void*)(uintptr_t(buffers.get()) + i * blockSize);
203  }
204  }
205 
207  {
208  return controlBlocks[counter % ARRAY_SIZE(controlBlocks)];
209  }
210 
211 private:
213  aiocb controlBlocks[Parameters::maxQueueDepth];
214 };
215 
216 #pragma pack(pop)
217 
218 
219 LIB_API Status Issue(aiocb& cb, size_t queueDepth);
220 LIB_API Status WaitUntilComplete(aiocb& cb, size_t queueDepth);
221 
222 
223 //-----------------------------------------------------------------------------
224 // Run
225 
226 #ifndef ENABLE_IO_STATS
227 #define ENABLE_IO_STATS 0
228 #endif
229 
230 // (hooks must be passed by const reference to allow passing rvalues.
231 // functors with non-const member data can mark them as mutable.)
232 template<class CompletedHook, class IssueHook>
233 static inline Status Run(const Operation& op, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
234 {
235  op.Validate();
236  p.Validate(op);
237 
238  ControlBlockRingBuffer controlBlockRingBuffer(op, p);
239 
240 #if ENABLE_IO_STATS
241  const double t0 = timer_Time();
243 #endif
244 
245  const off_t numBlocks = p.blockSize? (off_t)DivideRoundUp((u64)op.size, (u64)p.blockSize) : 1;
246  for(off_t blocksIssued = 0, blocksCompleted = 0; blocksCompleted < numBlocks; blocksCompleted++)
247  {
248  for(; blocksIssued != numBlocks && blocksIssued < blocksCompleted + (off_t)p.queueDepth; blocksIssued++)
249  {
250  aiocb& cb = controlBlockRingBuffer[blocksIssued];
251  cb.aio_offset = op.offset + blocksIssued * p.blockSize;
252  if(op.buf)
253  cb.aio_buf = (volatile void*)(uintptr_t(op.buf) + blocksIssued * p.blockSize);
254  if(blocksIssued == numBlocks-1)
255  cb.aio_nbytes = round_up(size_t(op.size - blocksIssued * p.blockSize), size_t(p.alignment));
256 
257  RETURN_STATUS_FROM_CALLBACK(issueHook(cb));
258 
259  RETURN_STATUS_IF_ERR(Issue(cb, p.queueDepth));
260  }
261 
262  aiocb& cb = controlBlockRingBuffer[blocksCompleted];
263  RETURN_STATUS_IF_ERR(WaitUntilComplete(cb, p.queueDepth));
264 
265  RETURN_STATUS_FROM_CALLBACK(completedHook((u8*)cb.aio_buf, cb.aio_nbytes));
266  }
267 
268 #if ENABLE_IO_STATS
270  const double t1 = timer_Time();
271  const off_t totalSize = p.blockSize? numBlocks*p.blockSize : op.size;
272  debug_printf("IO: %.2f MB/s (%.2f)\n", totalSize/(t1-t0)/1e6, (t1-t0)*1e3);
273 #endif
274 
275  return INFO::OK;
276 }
277 
278 // (overloads allow omitting parameters without requiring a template argument list)
279 template<class CompletedHook>
280 static inline Status Run(const Operation& op, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
281 {
282  return Run(op, p, completedHook, DefaultIssueHook());
283 }
284 
285 static inline Status Run(const Operation& op, const Parameters& p = Parameters())
286 {
287  return Run(op, p, DefaultCompletedHook(), DefaultIssueHook());
288 }
289 
290 
291 //-----------------------------------------------------------------------------
292 // Store
293 
294 // efficient writing requires preallocation; the resulting file is
295 // padded to the sector size and needs to be truncated afterwards.
296 // this function takes care of both.
297 template<class CompletedHook, class IssueHook>
298 static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
299 {
300  File file;
301  int oflag = O_WRONLY;
302  if(p.queueDepth != 1)
303  oflag |= O_DIRECT;
304  RETURN_STATUS_IF_ERR(file.Open(pathname, oflag));
305  io::Operation op(file, (void*)data, size);
306 
307 #if OS_WIN
308  (void)waio_Preallocate(op.fd, (off_t)size);
309 #endif
310 
311  RETURN_STATUS_IF_ERR(io::Run(op, p, completedHook, issueHook));
312 
313  file.Close(); // (required by wtruncate)
314 
315  RETURN_STATUS_IF_ERR(wtruncate(pathname, size));
316 
317  return INFO::OK;
318 }
319 
320 template<class CompletedHook>
321 static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
322 {
323  return Store(pathname, data, size, p, completedHook, DefaultIssueHook());
324 }
325 
326 static inline Status Store(const OsPath& pathname, const void* data, size_t size, const Parameters& p = Parameters())
327 {
328  return Store(pathname, data, size, p, DefaultCompletedHook(), DefaultIssueHook());
329 }
330 
331 
332 //-----------------------------------------------------------------------------
333 // Load
334 
335 // convenience function provided for symmetry with Store.
336 template<class CompletedHook, class IssueHook>
337 static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook(), const IssueHook& issueHook = IssueHook())
338 {
339  File file;
340  int oflag = O_RDONLY;
341  if(p.queueDepth != 1)
342  oflag |= O_DIRECT;
343  RETURN_STATUS_IF_ERR(file.Open(pathname, oflag));
344  io::Operation op(file, buf, size);
345  return io::Run(op, p, completedHook, issueHook);
346 }
347 
348 template<class CompletedHook>
349 static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters(), const CompletedHook& completedHook = CompletedHook())
350 {
351  return Load(pathname, buf, size, p, completedHook, DefaultIssueHook());
352 }
353 
354 static inline Status Load(const OsPath& pathname, void* buf, size_t size, const Parameters& p = Parameters())
355 {
356  return Load(pathname, buf, size, p, DefaultCompletedHook(), DefaultIssueHook());
357 }
358 
359 } // namespace io
360 
361 #endif // #ifndef INCLUDED_IO
#define RETURN_STATUS_FROM_CALLBACK(expression)
Definition: status.h:338
#define O_DIRECT
Definition: filesystem.h:67
ControlBlockRingBuffer(const Operation &op, const Parameters &p)
Definition: io.h:186
Definition: io.h:165
static const size_t pageSize
Definition: alignment.h:83
#define UNUSED(param)
mark a function parameter as unused and avoid the corresponding compiler warning. ...
Definition: code_annotation.h:38
size_t aio_nbytes
Definition: waio.h:68
Status operator()(const u8 *block, size_t blockSize) const
called after a block I/O has completed.
Definition: io.h:158
#define COMPILER_FENCE
prevent the compiler from reordering loads or stores across this point.
Definition: code_annotation.h:269
Status waio_Preallocate(int fd, off_t size)
Definition: waio.cpp:437
T DivideRoundUp(T dividend, T divisor)
Definition: lib.h:76
Flags
Definition: cache.cpp:308
size_t queueDepth
Definition: io.h:143
const Status OK
Definition: status.h:386
const Status IO
Definition: io.h:43
static ICounter * counter
Definition: whrt.cpp:96
Definition: unique_range.h:75
Definition: io.h:65
static const uintptr_t maxSectorSize
Definition: alignment.h:104
off_t alignment
Definition: io.h:139
T round_up(T n, T multiple)
round number up/down to the next given multiple.
Definition: bits.h:265
Status WaitUntilComplete(aiocb &cb, size_t queueDepth)
Definition: io.cpp:67
static Status Store(const OsPath &pathname, const void *data, size_t size, const Parameters &p=Parameters())
Definition: io.h:326
Parameters()
Definition: io.h:99
void Close()
Definition: file.h:73
Definition: io.h:149
uint8_t u8
Definition: types.h:37
LIB_API int wtruncate(const OsPath &pathname, off_t length)
Definition: ufilesystem.cpp:123
volatile void * aio_buf
Definition: waio.h:67
#define ARRAY_SIZE(name)
Definition: code_annotation.h:336
bool IsAligned(T t, uintptr_t multiple)
Definition: alignment.h:30
uint64_t u64
Definition: types.h:40
Definition: io.cpp:33
#define ENSURE(expr)
ensure the expression <expr> evaluates to non-zero.
Definition: debug.h:287
INLINE aiocb & operator[](size_t counter)
Definition: io.h:206
int aio_lio_opcode
Definition: waio.h:71
__int64 off_t
Definition: wposix_types.h:91
Definition: path.h:77
Definition: waio.h:93
static const size_t KiB
Definition: alignment.h:93
UniqueRange buffers
Definition: io.h:212
size_t blockSize
Definition: io.h:141
int fd
Definition: io.h:86
static UniqueRange Allocate(size_t size, size_t alignment=maxSectorSize)
Definition: io.h:55
i64 Status
Error handling system.
Definition: status.h:171
void debug_printf(const char *fmt,...)
write a formatted string to the debug channel, subject to filtering (see below).
Definition: debug.cpp:142
double timer_Time()
Definition: timer.cpp:98
Definition: io.h:107
u8 Descriptor
Definition: cache.cpp:255
Status Open(const OsPath &pathname, int oflag)
Definition: file.h:63
Introduction
Definition: debug.h:404
Parameters(OverlappedTag)
Definition: io.h:108
static Status AllocateAligned(shared_ptr< T > &p, size_t size, size_t alignment=cacheLineSize)
Definition: shared_ptr.h:66
Definition: waio.h:63
Definition: io.h:183
off_t size
Definition: io.h:90
bool is_pow2(T n)
Definition: bits.h:164
off_t aio_offset
Definition: waio.h:66
void Validate() const
Definition: io.h:76
Parameters(size_t blockSize, size_t queueDepth, off_t alignment=maxSectorSize)
Definition: io.h:113
Operation(const File &file, void *buf, off_t size, off_t offset=0)
Definition: io.h:70
off_t offset
Definition: io.h:89
Definition: waio.h:92
Definition: io.h:96
void * buf
Definition: io.h:91
static Status Run(const Operation &op, const Parameters &p=Parameters())
Definition: io.h:285
Status operator()(aiocb &cb) const
called before a block I/O is issued.
Definition: io.h:174
Definition: file.h:45
static int Issue(aiocb *cb)
Definition: waio.cpp:494
static Status Load(const OsPath &pathname, void *buf, size_t size, const Parameters &p=Parameters())
Definition: io.h:354
int opcode
Definition: io.h:87
int aio_fildes
Definition: waio.h:65
void Validate(const Operation &op) const
Definition: io.h:118
#define RETURN_STATUS_IF_ERR(expression)
Definition: status.h:276
#define INLINE
Definition: code_annotation.h:363
static Status Run(const Operation &op, const Parameters &p=Parameters(), const CompletedHook &completedHook=CompletedHook(), const IssueHook &issueHook=IssueHook())
Definition: io.h:233