MySQL 5.6.14 Source Code Document
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AsyncFile.cpp
1 /*
2  Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4  This program is free software; you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation; version 2 of the License.
7 
8  This program is distributed in the hope that it will be useful,
9  but WITHOUT ANY WARRANTY; without even the implied warranty of
10  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  GNU General Public License for more details.
12 
13  You should have received a copy of the GNU General Public License
14  along with this program; if not, write to the Free Software
15  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
16 */
17 
18 #include <ndb_global.h>
19 
20 #include "AsyncFile.hpp"
21 #include <ErrorHandlingMacros.hpp>
22 #include <kernel_types.h>
23 #include <ndbd_malloc.hpp>
24 #include <NdbThread.h>
25 #include <signaldata/FsRef.hpp>
26 #include <signaldata/FsOpenReq.hpp>
27 #include <signaldata/FsReadWriteReq.hpp>
28 #include <Configuration.hpp>
29 
30 AsyncFile::AsyncFile(SimulatedBlock& fs) :
31  theFileName(),
32  m_fs(fs)
33 {
34  m_thread = 0;
35 
36  m_resource_group = RNIL;
37  m_page_cnt = 0;
38  m_page_ptr.setNull();
39  theWriteBuffer = 0;
40  theWriteBufferSize = 0;
41 }
42 
43 void
44 AsyncFile::attach(AsyncIoThread* thr)
45 {
46 #if 0
47  ndbout_c("%p:%s attach to %p (m_thread: %p)", this, theFileName.c_str(), thr,
48  m_thread);
49 #endif
50  assert(m_thread == 0);
51  m_thread = thr;
52 }
53 
54 void
55 AsyncFile::detach(AsyncIoThread* thr)
56 {
57 #if 0
58  ndbout_c("%p:%s detach from %p", this, theFileName.c_str(), thr);
59 #endif
60  assert(m_thread == thr);
61  m_thread = 0;
62 }
63 
64 void
66 {
67  for(int i = 0; i < request->par.readWrite.numberOfPages ; i++)
68  {
69  off_t offset = request->par.readWrite.pages[i].offset;
70  size_t size = request->par.readWrite.pages[i].size;
71  char * buf = request->par.readWrite.pages[i].buf;
72 
73  int err = readBuffer(request, buf, size, offset);
74  if(err != 0){
75  request->error = err;
76  return;
77  }
78  }
79 }
80 
81 void
82 AsyncFile::readvReq( Request * request)
83 {
84  readReq(request);
85  return;
86 }
87 
88 void
90 {
91  const Uint32 cnt = request->par.readWrite.numberOfPages;
92  if (theWriteBuffer == 0 || cnt == 1)
93  {
94  for (Uint32 i = 0; i<cnt; i++)
95  {
96  int err = writeBuffer(request->par.readWrite.pages[i].buf,
97  request->par.readWrite.pages[i].size,
98  request->par.readWrite.pages[i].offset);
99  if (err)
100  {
101  request->error = err;
102  return;
103  }
104  }
105  goto done;
106  }
107 
108  {
109  int page_num = 0;
110  bool write_not_complete = true;
111 
112  while(write_not_complete) {
113  int totsize = 0;
114  off_t offset = request->par.readWrite.pages[page_num].offset;
115  char* bufptr = theWriteBuffer;
116 
117  write_not_complete = false;
118  if (request->par.readWrite.numberOfPages > 1) {
119  off_t page_offset = offset;
120 
121  // Multiple page write, copy to buffer for one write
122  for(int i=page_num; i < request->par.readWrite.numberOfPages; i++) {
123  memcpy(bufptr,
124  request->par.readWrite.pages[i].buf,
125  request->par.readWrite.pages[i].size);
126  bufptr += request->par.readWrite.pages[i].size;
127  totsize += request->par.readWrite.pages[i].size;
128  if (((i + 1) < request->par.readWrite.numberOfPages)) {
129  // There are more pages to write
130  // Check that offsets are consequtive
131  off_t tmp = page_offset + request->par.readWrite.pages[i].size;
132  if (tmp != request->par.readWrite.pages[i+1].offset) {
133  // Next page is not aligned with previous, not allowed
134  DEBUG(ndbout_c("Page offsets are not aligned"));
135  request->error = EINVAL;
136  return;
137  }
138  if ((unsigned)(totsize + request->par.readWrite.pages[i+1].size) > (unsigned)theWriteBufferSize) {
139  // We are not finished and the buffer is full
140  write_not_complete = true;
141  // Start again with next page
142  page_num = i + 1;
143  break;
144  }
145  }
146  page_offset += request->par.readWrite.pages[i].size;
147  }
148  bufptr = theWriteBuffer;
149  } else {
150  // One page write, write page directly
151  bufptr = request->par.readWrite.pages[0].buf;
152  totsize = request->par.readWrite.pages[0].size;
153  }
154  int err = writeBuffer(bufptr, totsize, offset);
155  if(err != 0){
156  request->error = err;
157  return;
158  }
159  } // while(write_not_complete)
160  }
161 done:
162  if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq)
163  {
164  syncReq(request);
165  }
166 }
167 
168 void
169 AsyncFile::writevReq(Request * request)
170 {
171  writeReq(request);
172 }
173 
#ifdef DEBUG_ASYNCFILE
/**
 * Append @p src to the NUL-terminated string in @p dst without ever
 * writing past @p cap bytes (terminator included). Excess text is dropped.
 */
static void peafAppend(char* dst, size_t cap, const char* src)
{
  const size_t used = strlen(dst);
  if (used + 1 < cap)
    strncat(dst, src, cap - used - 1);
}

/**
 * Debug helper: print the current errno (number and strerror() text)
 * together with a symbolic decoding of the open(2) flags in @p used_flags.
 *
 * @param used_flags  flag word to decode.
 */
void printErrorAndFlags(Uint32 used_flags) {
  // Capture errno first: the formatting calls below are allowed to modify it.
  const int saved_errno = errno;

  char buf[255];
  sprintf(buf, "PEAF: errno=%d \"", saved_errno);

  peafAppend(buf, sizeof(buf), strerror(saved_errno));

  peafAppend(buf, sizeof(buf), "\" ");
  peafAppend(buf, sizeof(buf), " flags: ");

  // The access mode lives in the two low bits and is a value, not a bit mask.
  switch(used_flags & 3){
  case O_RDONLY:
    peafAppend(buf, sizeof(buf), "O_RDONLY, ");
    break;
  case O_WRONLY:
    peafAppend(buf, sizeof(buf), "O_WRONLY, ");
    break;
  case O_RDWR:
    peafAppend(buf, sizeof(buf), "O_RDWR, ");
    break;
  default:
    peafAppend(buf, sizeof(buf), "Unknown!!, ");
  }

  // Remaining flags are reported in this fixed order when all their bits
  // are present in used_flags.
  static const struct {
    Uint32 mask;
    const char* text;
  } flag_names[] = {
    { O_APPEND,   "O_APPEND, " },
    { O_CREAT,    "O_CREAT, " },
    { O_EXCL,     "O_EXCL, " },
    { O_NOCTTY,   "O_NOCTTY, " },
    { O_NONBLOCK, "O_NONBLOCK, " },
    { O_TRUNC,    "O_TRUNC, " },
#ifdef O_DSYNC /* At least Darwin 7.9 doesn't have it */
    { O_DSYNC,    "O_DSYNC, " },
#endif
    { O_NDELAY,   "O_NDELAY, " },
#ifdef O_RSYNC /* At least Darwin 7.9 doesn't have it */
    { O_RSYNC,    "O_RSYNC, " },
#endif
#ifdef O_SYNC
    { O_SYNC,     "O_SYNC, " },
#endif
  };

  for (size_t i = 0; i < sizeof(flag_names) / sizeof(flag_names[0]); i++)
  {
    if ((used_flags & flag_names[i].mask) == flag_names[i].mask)
      peafAppend(buf, sizeof(buf), flag_names[i].text);
  }

  // Never pass assembled text as the format string itself.
  DEBUG(ndbout_c("%s", buf));

}
#endif
227 
228 NdbOut&
229 operator<<(NdbOut& out, const Request& req)
230 {
231  out << "[ Request: file: " << hex << req.file
232  << " userRef: " << hex << req.theUserReference
233  << " userData: " << dec << req.theUserPointer
234  << " theFilePointer: " << req.theFilePointer
235  << " action: ";
236  switch(req.action){
237  case Request::open:
238  out << "open";
239  break;
240  case Request::close:
241  out << "close";
242  break;
243  case Request::closeRemove:
244  out << "closeRemove";
245  break;
246  case Request::read: // Allways leave readv directly after
247  out << "read";
248  break;
249  case Request::readv:
250  out << "readv";
251  break;
252  case Request::write:// Allways leave writev directly after
253  out << "write";
254  break;
255  case Request::writev:
256  out << "writev";
257  break;
258  case Request::writeSync:// Allways leave writevSync directly after
259  out << "writeSync";
260  break;
261  // writeSync because SimblockAsyncFileSystem depends on it
262  case Request::writevSync:
263  out << "writevSync";
264  break;
265  case Request::sync:
266  out << "sync";
267  break;
268  case Request::end:
269  out << "end";
270  break;
271  case Request::append:
272  out << "append";
273  break;
274  case Request::rmrf:
275  out << "rmrf";
276  break;
277  default:
278  out << (Uint32)req.action;
279  break;
280  }
281  out << " ]";
282  return out;
283 }