50 #include "mysys_priv.h"
53 #include "mysys_err.h"
54 static void my_aiowait(my_aio_result *result);
58 #define lock_append_buffer(info) \
59 mysql_mutex_lock(&(info)->append_buffer_lock)
60 #define unlock_append_buffer(info) \
61 mysql_mutex_unlock(&(info)->append_buffer_lock)
63 #define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
64 #define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
82 if (info->type == WRITE_CACHE)
84 info->current_pos= &info->write_pos;
85 info->current_end= &info->write_end;
89 info->current_pos= &info->read_pos;
90 info->current_end= &info->read_end;
98 enum cache_type
type= info->type;
109 case SEQ_READ_APPEND:
110 info->read_function = _my_b_seq_read;
111 info->write_function = 0;
114 info->read_function = info->share ? _my_b_read_r : _my_b_read;
115 info->write_function = _my_b_write;
118 setup_io_cache(info);
145 int init_io_cache(
IO_CACHE *info, File
file,
size_t cachesize,
146 enum cache_type type, my_off_t seek_offset,
147 pbool use_async_io, myf cache_myflags)
151 my_off_t end_of_file= ~(my_off_t) 0;
152 DBUG_ENTER(
"init_io_cache");
153 DBUG_PRINT(
"enter",(
"cache: 0x%lx type: %d pos: %ld",
154 (ulong) info, (
int) type, (ulong) seek_offset));
157 info->type= TYPE_NOT_SET;
158 info->pos_in_file= seek_offset;
159 info->pre_close = info->pre_read = info->post_read = 0;
161 info->alloced_buffer = 0;
163 info->seek_not_done= 0;
168 if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
174 info->seek_not_done= 0;
180 DBUG_ASSERT(seek_offset == 0);
183 info->seek_not_done=
test(seek_offset != pos);
186 info->disk_writes= 0;
189 if (!cachesize && !(cachesize= my_default_record_cache_size))
191 min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
192 if (type == READ_CACHE || type == SEQ_READ_APPEND)
194 if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
199 info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
200 if (end_of_file < seek_offset)
201 end_of_file=seek_offset;
203 if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
205 cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
210 cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
211 if (type != READ_NET && type != WRITE_NET)
214 cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
222 myf
flags= (myf) (cache_myflags & ~(MY_WME | MY_WAIT_IF_FULL));
224 if (cachesize < min_cache)
225 cachesize = min_cache;
226 buffer_block= cachesize;
227 if (type == SEQ_READ_APPEND)
229 if (cachesize == min_cache)
230 flags|= (myf) MY_WME;
232 if ((info->buffer= (uchar*) my_malloc(buffer_block, flags)) != 0)
234 info->write_buffer=info->buffer;
235 if (type == SEQ_READ_APPEND)
236 info->write_buffer = info->buffer + cachesize;
237 info->alloced_buffer=1;
240 if (cachesize == min_cache)
243 cachesize= (cachesize*3/4 & ~(min_cache-1));
247 DBUG_PRINT(
"info",(
"init_io_cache: cachesize = %lu", (ulong) cachesize));
248 info->read_length=info->buffer_length=cachesize;
249 info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
250 info->request_pos= info->read_pos= info->write_pos = info->buffer;
251 if (type == SEQ_READ_APPEND)
253 info->append_read_pos = info->write_pos = info->write_buffer;
254 info->write_end = info->write_buffer + info->buffer_length;
256 &info->append_buffer_lock, MY_MUTEX_INIT_FAST);
258 #if defined(SAFE_MUTEX)
262 memset(&info->append_buffer_lock, 0,
sizeof(info->append_buffer_lock));
266 if (type == WRITE_CACHE)
268 info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
270 info->read_end=info->buffer;
273 info->end_of_file= end_of_file;
276 init_functions(info);
278 if (use_async_io && ! my_disable_async_io)
280 DBUG_PRINT(
"info",(
"Using async io"));
281 info->read_length/=2;
282 info->read_function=_my_b_async_read;
284 info->inited=info->aio_result.pending=0;
292 static void my_aiowait(my_aio_result *result)
296 struct aio_result_t *tmp;
299 if ((
int) (tmp=aiowait((
struct timeval *) 0)) == -1)
303 DBUG_PRINT(
"error",(
"No aio request, error: %d",errno));
307 ((my_aio_result*) tmp)->pending=0;
308 if ((my_aio_result*) tmp == result)
324 my_bool reinit_io_cache(
IO_CACHE *info,
enum cache_type type,
325 my_off_t seek_offset,
326 pbool use_async_io __attribute__((unused)),
329 DBUG_ENTER(
"reinit_io_cache");
330 DBUG_PRINT(
"enter",(
"cache: 0x%lx type: %d seek_offset: %lu clear_cache: %d",
331 (ulong) info, type, (ulong) seek_offset,
335 DBUG_ASSERT(type != READ_NET && info->type != READ_NET &&
336 type != WRITE_NET && info->type != WRITE_NET &&
337 type != SEQ_READ_APPEND && info->type != SEQ_READ_APPEND);
341 seek_offset >= info->pos_in_file &&
342 seek_offset <= my_b_tell(info))
346 if (info->type == WRITE_CACHE && type == READ_CACHE)
348 info->read_end=info->write_pos;
349 info->end_of_file=my_b_tell(info);
354 info->seek_not_done= (info->file != -1);
356 else if (type == WRITE_CACHE)
358 if (info->type == READ_CACHE)
360 info->write_end=info->write_buffer+info->buffer_length;
361 info->seek_not_done=1;
363 info->end_of_file = ~(my_off_t) 0;
365 pos=info->request_pos+(seek_offset-info->pos_in_file);
366 if (type == WRITE_CACHE)
371 my_aiowait(&info->aio_result);
380 if (info->type == WRITE_CACHE && type == READ_CACHE)
381 info->end_of_file=my_b_tell(info);
383 if (!clear_cache && my_b_flush_io_cache(info,1))
385 info->pos_in_file=seek_offset;
387 info->seek_not_done=1;
388 info->request_pos=info->read_pos=info->write_pos=info->buffer;
389 if (type == READ_CACHE)
391 info->read_end=info->buffer;
395 info->write_end=(info->buffer + info->buffer_length -
396 (seek_offset & (IO_SIZE-1)));
397 info->end_of_file= ~(my_off_t) 0;
402 init_functions(info);
405 if (use_async_io && ! my_disable_async_io &&
406 ((ulong) info->buffer_length <
407 (ulong) (info->end_of_file - seek_offset)))
409 info->read_length=info->buffer_length/2;
410 info->read_function=_my_b_async_read;
449 int _my_b_read(
register IO_CACHE *info, uchar *
Buffer,
size_t Count)
451 size_t length,diff_length,left_length, max_length;
452 my_off_t pos_in_file;
453 DBUG_ENTER(
"_my_b_read");
456 if ((left_length= (
size_t) (info->read_end-info->read_pos)))
458 DBUG_ASSERT(Count >= left_length);
459 memcpy(Buffer,info->read_pos, left_length);
465 pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
473 if (info->seek_not_done)
476 != MY_FILEPOS_ERROR))
479 info->seek_not_done= 0;
488 DBUG_ASSERT(my_errno != ESPIPE);
498 diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
504 if (Count >= (
size_t) (IO_SIZE+(IO_SIZE-diff_length)))
507 if (info->end_of_file <= pos_in_file)
510 info->error= (int) left_length;
518 length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
519 if ((read_length=
mysql_file_read(info->file,Buffer, length, info->myflags))
526 info->error= (read_length == (size_t) -1 ? -1 :
527 (
int) (read_length+left_length));
542 max_length= info->read_length-diff_length;
544 if (info->type != READ_FIFO &&
545 max_length > (info->end_of_file - pos_in_file))
546 max_length= (size_t) (info->end_of_file - pos_in_file);
557 info->error= left_length;
563 info->myflags)) < Count ||
564 length == (
size_t) -1)
570 if (length != (
size_t) -1)
571 memcpy(Buffer, info->buffer, length);
572 info->pos_in_file= pos_in_file;
574 info->error= length == (size_t) -1 ? -1 : (
int) (length+left_length);
575 info->read_pos=info->read_end=info->buffer;
583 info->read_pos=info->buffer+Count;
584 info->read_end=info->buffer+length;
585 info->pos_in_file=pos_in_file;
586 memcpy(Buffer, info->buffer, Count);
660 IO_CACHE *write_cache, uint num_threads)
662 DBUG_ENTER(
"init_io_cache_share");
663 DBUG_PRINT(
"io_cache_share", (
"read_cache: 0x%lx share: 0x%lx "
664 "write_cache: 0x%lx threads: %u",
665 (
long) read_cache, (
long) cshare,
666 (
long) write_cache, num_threads));
668 DBUG_ASSERT(num_threads > 1);
669 DBUG_ASSERT(read_cache->type == READ_CACHE);
670 DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
673 &cshare->mutex, MY_MUTEX_INIT_FAST);
675 mysql_cond_init(key_IO_CACHE_SHARE_cond_writer, &cshare->cond_writer, 0);
677 cshare->running_threads= num_threads;
678 cshare->total_threads= num_threads;
680 cshare->buffer= read_cache->buffer;
681 cshare->read_end= NULL;
682 cshare->pos_in_file= 0;
683 cshare->source_cache= write_cache;
685 read_cache->share= cshare;
686 read_cache->read_function= _my_b_read_r;
687 read_cache->current_pos= NULL;
688 read_cache->current_end= NULL;
691 write_cache->share= cshare;
716 void remove_io_thread(
IO_CACHE *cache)
720 DBUG_ENTER(
"remove_io_thread");
723 if (cache == cshare->source_cache)
724 flush_io_cache(cache);
727 DBUG_PRINT(
"io_cache_share", (
"%s: 0x%lx",
728 (cache == cshare->source_cache) ?
729 "writer" :
"reader", (
long) cache));
732 total= --cshare->total_threads;
733 DBUG_PRINT(
"io_cache_share", (
"remaining threads: %u", total));
739 if (cache == cshare->source_cache)
741 DBUG_PRINT(
"io_cache_share", (
"writer leaves"));
742 cshare->source_cache= NULL;
746 if (!--cshare->running_threads)
748 DBUG_PRINT(
"io_cache_share", (
"the last running thread leaves, wake all"));
757 DBUG_PRINT(
"io_cache_share", (
"last thread removed, destroy share"));
795 static int lock_io_cache(
IO_CACHE *cache, my_off_t pos)
798 DBUG_ENTER(
"lock_io_cache");
802 cshare->running_threads--;
803 DBUG_PRINT(
"io_cache_share", (
"%s: 0x%lx pos: %lu running: %u",
804 (cache == cshare->source_cache) ?
805 "writer" :
"reader", (
long) cache, (ulong) pos,
806 cshare->running_threads));
808 if (cshare->source_cache)
812 if (cache == cshare->source_cache)
815 while (cshare->running_threads)
817 DBUG_PRINT(
"io_cache_share", (
"writer waits in lock"));
820 DBUG_PRINT(
"io_cache_share", (
"writer awoke, going to copy"));
827 if (!cshare->running_threads)
829 DBUG_PRINT(
"io_cache_share", (
"waking writer"));
838 while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
839 cshare->source_cache)
841 DBUG_PRINT(
"io_cache_share", (
"reader waits in lock"));
853 if (!cshare->read_end || (cshare->pos_in_file < pos))
855 DBUG_PRINT(
"io_cache_share", (
"reader found writer removed. EOF"));
856 cshare->read_end= cshare->buffer;
866 if (!cshare->running_threads)
868 DBUG_PRINT(
"io_cache_share", (
"last thread joined, going to read"));
880 while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
881 cshare->running_threads)
883 DBUG_PRINT(
"io_cache_share", (
"reader waits in lock"));
888 if (!cshare->read_end || (cshare->pos_in_file < pos))
890 DBUG_PRINT(
"io_cache_share", (
"reader awoke, going to read"));
897 DBUG_PRINT(
"io_cache_share", (
"reader awoke, going to process %u bytes",
898 (uint) (cshare->read_end ? (
size_t)
899 (cshare->read_end - cshare->buffer) :
938 static void unlock_io_cache(
IO_CACHE *cache)
941 DBUG_ENTER(
"unlock_io_cache");
942 DBUG_PRINT(
"io_cache_share", (
"%s: 0x%lx pos: %lu running: %u",
943 (cache == cshare->source_cache) ?
945 (
long) cache, (ulong) cshare->pos_in_file,
946 cshare->total_threads));
948 cshare->running_threads= cshare->total_threads;
992 int _my_b_read_r(
register IO_CACHE *cache, uchar *Buffer,
size_t Count)
994 my_off_t pos_in_file;
995 size_t length, diff_length, left_length;
997 DBUG_ENTER(
"_my_b_read_r");
999 if ((left_length= (
size_t) (cache->read_end - cache->read_pos)))
1001 DBUG_ASSERT(Count >= left_length);
1002 memcpy(Buffer, cache->read_pos, left_length);
1003 Buffer+= left_length;
1004 Count-= left_length;
1010 pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
1011 diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1012 length=IO_ROUND_UP(Count+diff_length)-diff_length;
1013 length= ((length <= cache->read_length) ?
1014 length + IO_ROUND_DN(cache->read_length - length) :
1015 length - IO_ROUND_UP(length - cache->read_length));
1016 if (cache->type != READ_FIFO &&
1017 (length > (cache->end_of_file - pos_in_file)))
1018 length= (
size_t) (cache->end_of_file - pos_in_file);
1021 cache->error= (int) left_length;
1024 if (lock_io_cache(cache, pos_in_file))
1027 DBUG_ASSERT(!cshare->source_cache);
1033 if (cache->file < 0)
1043 if (cache->seek_not_done)
1046 == MY_FILEPOS_ERROR)
1049 unlock_io_cache(cache);
1053 len=
mysql_file_read(cache->file, cache->buffer, length, cache->myflags);
1055 DBUG_PRINT(
"io_cache_share", (
"read %lu bytes", (ulong) len));
1057 cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
1058 cache->error= (len == length ? 0 : (int) len);
1059 cache->pos_in_file= pos_in_file;
1062 cshare->error= cache->error;
1063 cshare->read_end= cache->read_end;
1064 cshare->pos_in_file= pos_in_file;
1067 unlock_io_cache(cache);
1075 cache->error= cshare->error;
1076 cache->read_end= cshare->read_end;
1077 cache->pos_in_file= cshare->pos_in_file;
1079 len= ((cache->error == -1) ? (
size_t) -1 :
1080 (size_t) (cache->read_end - cache->buffer));
1082 cache->read_pos= cache->buffer;
1083 cache->seek_not_done= 0;
1084 if (len == 0 || len == (
size_t) -1)
1086 DBUG_PRINT(
"io_cache_share", (
"reader error. len %lu left %lu",
1087 (ulong) len, (ulong) left_length));
1088 cache->error= (int) left_length;
1091 cnt= (len > Count) ? Count : len;
1092 memcpy(Buffer, cache->read_pos, cnt);
1096 cache->read_pos+= cnt;
1119 static void copy_to_read_buffer(
IO_CACHE *write_cache,
1120 const uchar *write_buffer,
size_t write_length)
1124 DBUG_ASSERT(cshare->source_cache == write_cache);
1129 while (write_length)
1131 size_t copy_length= MY_MIN(write_length, write_cache->buffer_length);
1132 int __attribute__((unused)) rc;
1134 rc= lock_io_cache(write_cache, write_cache->pos_in_file);
1138 memcpy(cshare->buffer, write_buffer, copy_length);
1141 cshare->read_end= cshare->buffer + copy_length;
1142 cshare->pos_in_file= write_cache->pos_in_file;
1145 unlock_io_cache(write_cache);
1147 write_buffer+= copy_length;
1148 write_length-= copy_length;
1166 int _my_b_seq_read(register
IO_CACHE *info, uchar *Buffer,
size_t Count)
1168 size_t length, diff_length, left_length, save_count, max_length;
1169 my_off_t pos_in_file;
1173 if ((left_length=(
size_t) (info->read_end-info->read_pos)))
1175 DBUG_ASSERT(Count > left_length);
1176 memcpy(Buffer,info->read_pos, left_length);
1177 Buffer+=left_length;
1180 lock_append_buffer(info);
1183 if ((pos_in_file=info->pos_in_file +
1184 (
size_t) (info->read_end - info->buffer)) >= info->end_of_file)
1185 goto read_append_buffer;
1191 if (
mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR)
1194 unlock_append_buffer(info);
1197 info->seek_not_done=0;
1199 diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
1202 if (Count >= (
size_t) (IO_SIZE+(IO_SIZE-diff_length)))
1207 length=(Count & (size_t) ~(IO_SIZE-1))-diff_length;
1209 info->myflags)) == (
size_t) -1)
1212 unlock_append_buffer(info);
1216 Buffer+=read_length;
1217 pos_in_file+=read_length;
1219 if (read_length != length)
1225 goto read_append_buffer;
1227 left_length+=length;
1231 max_length= info->read_length-diff_length;
1232 if (max_length > (info->end_of_file - pos_in_file))
1233 max_length= (size_t) (info->end_of_file - pos_in_file);
1237 goto read_append_buffer;
1242 length=
mysql_file_read(info->file,info->buffer, max_length, info->myflags);
1243 if (length == (
size_t) -1)
1246 unlock_append_buffer(info);
1251 memcpy(Buffer, info->buffer, length);
1260 pos_in_file += length;
1261 goto read_append_buffer;
1264 unlock_append_buffer(info);
1265 info->read_pos=info->buffer+Count;
1266 info->read_end=info->buffer+length;
1267 info->pos_in_file=pos_in_file;
1268 memcpy(Buffer,info->buffer,(
size_t) Count);
1280 size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
1282 size_t transfer_len;
1284 DBUG_ASSERT(info->append_read_pos <= info->write_pos);
1288 DBUG_ASSERT(pos_in_file == info->end_of_file);
1289 copy_len= MY_MIN(Count, len_in_buff);
1290 memcpy(Buffer, info->append_read_pos, copy_len);
1291 info->append_read_pos += copy_len;
1294 info->error = save_count - Count;
1297 memcpy(info->buffer, info->append_read_pos,
1298 (
size_t) (transfer_len=len_in_buff - copy_len));
1299 info->read_pos= info->buffer;
1300 info->read_end= info->buffer+transfer_len;
1301 info->append_read_pos=info->write_pos;
1302 info->pos_in_file=pos_in_file+copy_len;
1303 info->end_of_file+=len_in_buff;
1305 unlock_append_buffer(info);
1306 return Count ? 1 : 0;
1328 int _my_b_async_read(
register IO_CACHE *info, uchar *Buffer,
size_t Count)
1330 size_t length,read_length,diff_length,left_length,use_length,org_Count;
1332 my_off_t next_pos_in_file;
1335 memcpy(Buffer,info->read_pos,
1336 (left_length= (
size_t) (info->read_end-info->read_pos)));
1337 Buffer+=left_length;
1344 my_aiowait(&info->aio_result);
1345 if (info->aio_result.result.aio_errno)
1347 if (info->myflags & MY_WME)
1349 char errbuf[MYSYS_STRERROR_SIZE];
1350 my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
1351 my_filename(info->file),
1352 info->aio_result.result.aio_errno,
1353 my_strerror(errbuf,
sizeof(errbuf),
1354 info->aio_result.result.aio_errno));
1355 my_errno=info->aio_result.result.aio_errno;
1359 if (! (read_length= (
size_t) info->aio_result.result.aio_return) ||
1360 read_length == (
size_t) -1)
1363 info->error= (read_length == (size_t) -1 ? -1 :
1364 (
int) (read_length+left_length));
1367 info->pos_in_file+= (size_t) (info->read_end - info->request_pos);
1369 if (info->request_pos != info->buffer)
1370 info->request_pos=info->buffer;
1372 info->request_pos=info->buffer+info->read_length;
1373 info->read_pos=info->request_pos;
1374 next_pos_in_file=info->aio_read_pos+read_length;
1379 if (info->aio_read_pos < info->pos_in_file)
1381 if (info->aio_read_pos + read_length < info->pos_in_file)
1384 next_pos_in_file=info->pos_in_file;
1388 my_off_t
offset= (info->pos_in_file - info->aio_read_pos);
1389 info->pos_in_file=info->aio_read_pos;
1390 info->read_pos=info->request_pos+
offset;
1395 if (info->aio_read_pos > info->pos_in_file)
1398 return(info->read_length= (
size_t) -1);
1402 length= MY_MIN(Count, read_length);
1403 memcpy(Buffer,info->read_pos,(
size_t) length);
1406 left_length+=length;
1407 info->read_end=info->rc_pos+read_length;
1408 info->read_pos+=length;
1411 next_pos_in_file=(info->pos_in_file+ (size_t)
1412 (info->read_end - info->request_pos));
1417 if (next_pos_in_file == info->end_of_file)
1419 info->error=(int) (read_length+left_length);
1423 if (
mysql_file_seek(info->file, next_pos_in_file, MY_SEEK_SET, MYF(0))
1424 == MY_FILEPOS_ERROR)
1430 read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
1431 if (Count < read_length)
1434 read_length, info->myflags)) == (
size_t) -1)
1435 return info->error= -1;
1436 use_length= MY_MIN(Count, read_length);
1437 memcpy(Buffer,info->request_pos,(
size_t) use_length);
1438 info->read_pos=info->request_pos+Count;
1439 info->read_end=info->request_pos+read_length;
1440 info->pos_in_file=next_pos_in_file;
1441 next_pos_in_file+=read_length;
1443 if (Count != use_length)
1445 if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
1447 char errbuf[MYSYS_STRERROR_SIZE];
1448 my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG), my_filename(info->file),
1449 my_errno, my_strerror(errbuf,
sizeof(errbuf), my_errno));
1451 info->error=(int) (read_length+left_length);
1457 if ((read_length=
mysql_file_read(info->file, Buffer, Count,info->myflags))
1460 info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
1463 info->read_pos=info->read_end=info->request_pos;
1464 info->pos_in_file=(next_pos_in_file+=Count);
1469 diff_length=(next_pos_in_file & (IO_SIZE-1));
1470 max_length= info->read_length - diff_length;
1471 if (max_length > info->end_of_file - next_pos_in_file)
1472 max_length= (size_t) (info->end_of_file - next_pos_in_file);
1474 if (info->request_pos != info->buffer)
1475 read_buffer=info->buffer;
1477 read_buffer=info->buffer+info->read_length;
1478 info->aio_read_pos=next_pos_in_file;
1481 info->aio_result.result.aio_errno=AIO_INPROGRESS;
1482 DBUG_PRINT(
"aioread",(
"filepos: %ld length: %lu",
1483 (ulong) next_pos_in_file, (ulong) max_length));
1484 if (aioread(info->file,read_buffer, max_length,
1485 (my_off_t) next_pos_in_file,MY_SEEK_SET,
1486 &info->aio_result.result))
1489 DBUG_PRINT(
"error",(
"got error: %d, aio_result: %d from aioread, async skipped",
1490 errno, info->aio_result.result.aio_errno));
1491 if (info->request_pos != info->buffer)
1493 bmove(info->buffer,info->request_pos,
1494 (
size_t) (info->read_end - info->read_pos));
1495 info->request_pos=info->buffer;
1496 info->read_pos-=info->read_length;
1497 info->read_end-=info->read_length;
1499 info->read_length=info->buffer_length;
1500 info->read_function=_my_b_read;
1503 info->inited=info->aio_result.pending=1;
1515 IO_CACHE_CALLBACK pre_read,post_read;
1516 if ((pre_read = info->pre_read))
1518 if ((*(info)->read_function)(info,&buff,1))
1520 if ((post_read = info->post_read))
1522 return (
int) (uchar) buff;
1535 int _my_b_write(
register IO_CACHE *info,
const uchar *Buffer,
size_t Count)
1537 size_t rest_length,length;
1538 my_off_t pos_in_file= info->pos_in_file;
1540 DBUG_EXECUTE_IF(
"simulate_huge_load_data_file",
1542 pos_in_file=(my_off_t)(5000000000ULL);
1544 if (pos_in_file+info->buffer_length > info->end_of_file)
1546 my_errno=errno=EFBIG;
1547 return info->error = -1;
1550 rest_length= (size_t) (info->write_end - info->write_pos);
1551 memcpy(info->write_pos,Buffer,(
size_t) rest_length);
1552 Buffer+=rest_length;
1554 info->write_pos+=rest_length;
1556 if (my_b_flush_io_cache(info,1))
1558 if (Count >= IO_SIZE)
1560 length=Count & (size_t) ~(IO_SIZE-1);
1561 if (info->seek_not_done)
1569 if (
mysql_file_seek(info->file, info->pos_in_file, MY_SEEK_SET, MYF(0)))
1574 info->seek_not_done=0;
1577 return info->error= -1;
1591 copy_to_read_buffer(info, Buffer, length);
1595 info->pos_in_file+=length;
1597 memcpy(info->write_pos,Buffer,(
size_t) Count);
1598 info->write_pos+=Count;
1609 int my_b_append(
register IO_CACHE *info,
const uchar *Buffer,
size_t Count)
1611 size_t rest_length,length;
1617 DBUG_ASSERT(!info->share);
1619 lock_append_buffer(info);
1620 rest_length= (size_t) (info->write_end - info->write_pos);
1621 if (Count <= rest_length)
1623 memcpy(info->write_pos, Buffer, rest_length);
1624 Buffer+=rest_length;
1626 info->write_pos+=rest_length;
1627 if (my_b_flush_io_cache(info,0))
1629 unlock_append_buffer(info);
1632 if (Count >= IO_SIZE)
1634 length=Count & (size_t) ~(IO_SIZE-1);
1637 unlock_append_buffer(info);
1638 return info->error= -1;
1642 info->end_of_file+=length;
1646 memcpy(info->write_pos,Buffer,(
size_t) Count);
1647 info->write_pos+=Count;
1648 unlock_append_buffer(info);
1653 int my_b_safe_write(
IO_CACHE *info,
const uchar *Buffer,
size_t Count)
1661 if (info->type == SEQ_READ_APPEND)
1662 return my_b_append(info, Buffer, Count);
1663 return my_b_write(info, Buffer, Count);
1673 int my_block_write(
register IO_CACHE *info,
const uchar *Buffer,
size_t Count,
1683 DBUG_ASSERT(!info->share);
1685 if (pos < info->pos_in_file)
1688 if (pos + Count <= info->pos_in_file)
1690 info->myflags | MY_NABP);
1692 length= (uint) (info->pos_in_file - pos);
1693 if (
mysql_file_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
1694 info->error= error= -1;
1699 info->seek_not_done=1;
1704 length= (size_t) (info->write_end - info->buffer);
1705 if (pos < info->pos_in_file + length)
1707 size_t offset= (size_t) (pos - info->pos_in_file);
1711 memcpy(info->buffer+offset, Buffer, length);
1715 if (info->buffer+length > info->write_pos)
1716 info->write_pos=info->buffer+length;
1721 if (_my_b_write(info, Buffer, Count))
1729 #define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1730 lock_append_buffer(info);
1731 #define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1732 unlock_append_buffer(info);
1734 int my_b_flush_io_cache(
IO_CACHE *info,
1735 int need_append_buffer_lock __attribute__((unused)))
1738 my_off_t pos_in_file;
1739 my_bool append_cache= (info->type == SEQ_READ_APPEND);
1740 DBUG_ENTER(
"my_b_flush_io_cache");
1741 DBUG_PRINT(
"enter", (
"cache: 0x%lx", (
long) info));
1744 need_append_buffer_lock= 0;
1746 if (info->type == WRITE_CACHE || append_cache)
1748 if (info->file == -1)
1750 if (real_open_cached_file(info))
1751 DBUG_RETURN((info->error= -1));
1755 if ((length=(
size_t) (info->write_pos - info->write_buffer)))
1764 copy_to_read_buffer(info, info->write_buffer, length);
1766 pos_in_file=info->pos_in_file;
1771 if (!append_cache && info->seek_not_done)
1776 UNLOCK_APPEND_BUFFER;
1777 DBUG_RETURN((info->error= -1));
1780 info->seek_not_done=0;
1783 info->pos_in_file+=length;
1784 info->write_end= (info->write_buffer+info->buffer_length-
1785 ((pos_in_file+length) & (IO_SIZE-1)));
1788 info->myflags | MY_NABP))
1794 set_if_bigger(info->end_of_file,(pos_in_file+length));
1798 info->end_of_file+=(info->write_pos-info->append_read_pos);
1799 DBUG_ASSERT(info->end_of_file ==
mysql_file_tell(info->file, MYF(0)));
1802 info->append_read_pos=info->write_pos=info->write_buffer;
1803 ++info->disk_writes;
1804 UNLOCK_APPEND_BUFFER;
1805 DBUG_RETURN(info->error);
1809 else if (info->type != READ_NET)
1811 my_aiowait(&info->aio_result);
1815 UNLOCK_APPEND_BUFFER;
1839 IO_CACHE_CALLBACK pre_close;
1840 DBUG_ENTER(
"end_io_cache");
1841 DBUG_PRINT(
"enter",(
"cache: 0x%lx", (ulong) info));
1847 DBUG_ASSERT(!info->share || !info->share->total_threads);
1849 if ((pre_close=info->pre_close))
1854 if (info->alloced_buffer)
1856 info->alloced_buffer=0;
1857 if (info->file != -1)
1858 error= my_b_flush_io_cache(info,1);
1859 my_free(info->buffer);
1860 info->buffer=info->read_pos=(uchar*) 0;
1862 if (info->type == SEQ_READ_APPEND)
1865 info->type= TYPE_NOT_SET;
1880 void die(
const char*
fmt, ...)
1883 va_start(va_args,fmt);
1884 fprintf(stderr,
"Error:");
1885 vfprintf(stderr, fmt,va_args);
1886 fprintf(stderr,
", errno=%d\n", errno);
1890 int open_file(
const char* fname,
IO_CACHE* info,
int cache_size)
1893 if ((fd=my_open(fname,O_CREAT | O_RDWR,MYF(MY_WME))) < 0)
1894 die(
"Could not open %s", fname);
1895 if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0,0,MYF(MY_WME)))
1896 die(
"failed in init_io_cache()");
1903 my_close(info->file, MYF(MY_WME));
1906 int main(
int argc,
char** argv)
1910 const char* fname=
"/tmp/iocache.test";
1911 int cache_size=16384;
1913 int max_block,total_bytes=0;
1914 int i,num_loops=100,error=0;
1916 char*
block, *block_end;
1918 max_block = cache_size*3;
1919 if (!(block=(
char*)my_malloc(max_block,MYF(MY_WME))))
1920 die(
"Not enough memory to allocate test block");
1921 block_end = block + max_block;
1922 for (p = block,i=0; p < block_end;i++)
1926 if (my_stat(fname,&status, MYF(0)) &&
1927 my_delete(fname,MYF(MY_WME)))
1929 die(
"Delete of %s failed, aborting", fname);
1931 open_file(fname,&sra_cache, cache_size);
1932 for (i = 0; i < num_loops; i++)
1935 int block_size = abs(rand() % max_block);
1936 int4store(buf, block_size);
1937 if (my_b_append(&sra_cache,buf,4) ||
1938 my_b_append(&sra_cache, block, block_size))
1939 die(
"write failed");
1940 total_bytes += 4+block_size;
1942 close_file(&sra_cache);
1944 if (!my_stat(fname,&status,MYF(MY_WME)))
1945 die(
"%s failed to stat, but I had just closed it,\
1946 wonder how that happened");
1947 printf(
"Final size of %s is %s, wrote %d bytes\n",fname,
1948 llstr(status.st_size,llstr_buf),
1950 my_delete(fname, MYF(MY_WME));
1952 if (total_bytes != status.st_size)
1954 fprintf(stderr,
"Not the same number of bytes acutally in file as bytes \
1955 supposedly written\n");