Stxxl  1.2.1
block_prefetcher.h
1 /***************************************************************************
2  * include/stxxl/bits/mng/block_prefetcher.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2004 Roman Dementiev <dementiev@mpi-sb.mpg.de>
7  *
8  * Distributed under the Boost Software License, Version 1.0.
9  * (See accompanying file LICENSE_1_0.txt or copy at
10  * http://www.boost.org/LICENSE_1_0.txt)
11  **************************************************************************/
12 
13 #ifndef STXXL_BLOCK_PREFETCHER_HEADER
14 #define STXXL_BLOCK_PREFETCHER_HEADER
15 
16 #include <vector>
17 #include <queue>
18 
19 #include <stxxl/bits/common/switch.h>
20 #include <stxxl/bits/io/iobase.h>
21 
22 
23 __STXXL_BEGIN_NAMESPACE
24 
27 
28 
29 class set_switch_handler
30 {
31  onoff_switch & switch_;
32 
33 public:
34  set_switch_handler(onoff_switch & switch__) : switch_(switch__) { }
35  void operator () (request * /*req*/) { switch_.on(); }
36 };
37 
42 template <typename block_type, typename bid_iterator_type>
44 {
45  block_prefetcher() { }
46 
47 protected:
48  bid_iterator_type consume_seq_begin;
49  bid_iterator_type consume_seq_end;
50  unsigned_type seq_length;
51 
52  int_type * prefetch_seq;
53 
54  unsigned_type nextread;
55  unsigned_type nextconsume;
56 
57  const int_type nreadblocks;
58 
59  block_type * read_buffers;
60  request_ptr * read_reqs;
61 
62  onoff_switch * completed;
63  int_type * pref_buffer;
64 
65  block_type * wait(int_type iblock)
66  {
67  STXXL_VERBOSE1("block_prefetcher: waiting block " << iblock);
68  {
69  stats::scoped_wait_timer wait_timer;
70 
71 #ifdef NO_OVERLAPPING
72  read_reqs[pref_buffer[iblock]]->poll();
73 #endif
74 
75  completed[iblock].wait_for_on();
76  }
77  STXXL_VERBOSE1("block_prefetcher: finished waiting block " << iblock);
78  int_type ibuffer = pref_buffer[iblock];
79  STXXL_VERBOSE1("block_prefetcher: returning buffer " << ibuffer);
80  assert(ibuffer >= 0 && ibuffer < nreadblocks);
81  return (read_buffers + ibuffer);
82  }
83 
84 public:
92  bid_iterator_type _cons_begin,
93  bid_iterator_type _cons_end,
94  int_type * _pref_seq,
95  int_type _prefetch_buf_size
96  ) :
97  consume_seq_begin(_cons_begin),
98  consume_seq_end(_cons_end),
99  seq_length(_cons_end - _cons_begin),
100  prefetch_seq(_pref_seq),
101  nextread(STXXL_MIN(unsigned_type(_prefetch_buf_size), seq_length)),
102  nextconsume(0),
103  nreadblocks(nextread)
104  {
105  STXXL_VERBOSE1("block_prefetcher: seq_length=" << seq_length);
106  STXXL_VERBOSE1("block_prefetcher: _prefetch_buf_size=" << _prefetch_buf_size);
107  assert(seq_length > 0);
108  assert(_prefetch_buf_size > 0);
109  int_type i;
110  read_buffers = new block_type[nreadblocks];
111  read_reqs = new request_ptr[nreadblocks];
112  pref_buffer = new int_type[seq_length];
113 
114  std::fill(pref_buffer, pref_buffer + seq_length, -1);
115 
116  completed = new onoff_switch[seq_length];
117 
118  for (i = 0; i < nreadblocks; ++i)
119  {
120  STXXL_VERBOSE1("block_prefetcher: reading block " << i
121  << " prefetch_seq[" << i << "]=" << prefetch_seq[i]);
122  assert(prefetch_seq[i] < int_type(seq_length));
123  assert(prefetch_seq[i] >= 0);
124  read_reqs[i] = read_buffers[i].read(
125  *(consume_seq_begin + prefetch_seq[i]),
126  set_switch_handler(*(completed + prefetch_seq[i])));
127  pref_buffer[prefetch_seq[i]] = i;
128  }
129  }
132  block_type * pull_block()
133  {
134  STXXL_VERBOSE1("block_prefetcher: pulling a block");
135  return wait(nextconsume++);
136  }
142  bool block_consumed(block_type * & buffer)
143  {
144  int_type ibuffer = buffer - read_buffers;
145  STXXL_VERBOSE1("block_prefetcher: buffer " << ibuffer << " consumed");
146  if (read_reqs[ibuffer].valid())
147  read_reqs[ibuffer]->wait();
148 
149  read_reqs[ibuffer] = NULL;
150 
151  if (nextread < seq_length)
152  {
153  assert(ibuffer >= 0 && ibuffer < nreadblocks);
154  int_type next_2_prefetch = prefetch_seq[nextread++];
155  STXXL_VERBOSE1("block_prefetcher: prefetching block " << next_2_prefetch);
156 
157  assert(next_2_prefetch < int_type(seq_length) && next_2_prefetch >= 0);
158  assert(!completed[next_2_prefetch].is_on());
159 
160  pref_buffer[next_2_prefetch] = ibuffer;
161  read_reqs[ibuffer] = read_buffers[ibuffer].read(
162  *(consume_seq_begin + next_2_prefetch),
163  set_switch_handler(*(completed + next_2_prefetch))
164  );
165  }
166 
167  if (nextconsume >= seq_length)
168  return false;
169 
170 
171  buffer = wait(nextconsume++);
172 
173  return true;
174  }
177  {
178  for (int_type i = 0; i < nreadblocks; ++i)
179  if (read_reqs[i].valid())
180  read_reqs[i]->wait();
181 
182 
183  delete[] read_reqs;
184  delete[] completed;
185  delete[] pref_buffer;
186  delete[] read_buffers;
187  }
188 };
189 
191 
192 __STXXL_END_NAMESPACE
193 
194 #endif // !STXXL_BLOCK_PREFETCHER_HEADER