18 #include <ndb_global.h>
20 #include <my_pthread.h>
27 #include "AsyncFile.hpp"
28 #include "PosixAsyncFile.hpp"
30 #include <ErrorHandlingMacros.hpp>
31 #include <kernel_types.h>
32 #include <ndbd_malloc.hpp>
33 #include <NdbThread.h>
34 #include <signaldata/FsRef.hpp>
35 #include <signaldata/FsOpenReq.hpp>
36 #include <signaldata/FsReadWriteReq.hpp>
// Fragment: the header of the enclosing definition is not visible in this view.
// Zero-fills the whole nzf object (sizeof(nzf) bytes).
49 memset(&nzf,0,
sizeof(nzf));
// PosixAsyncFile::init (fragment: several original lines, including the
// braces and the return statement, are not visible in this view).
//
// Sets up the buffers used by the nzf compressed-file state:
//  - one raw allocation that is carved into an aligned input buffer followed
//    by an output buffer, and
//  - a separate memory pool that is handed to nzf.stream via its opaque pointer.
53 int PosixAsyncFile::init()
59 const size_t read_size = ndbz_bufsize_read();
60 const size_t write_size = ndbz_bufsize_write();
// Over-allocate by NDB_O_DIRECT_WRITE_ALIGNMENT-1 bytes so that the start of
// the usable area can be rounded up to an aligned address below.
62 nzfBufferUnaligned= ndbd_malloc(read_size + write_size +
63 NDB_O_DIRECT_WRITE_ALIGNMENT-1);
// Round the raw pointer up to the next multiple of
// NDB_O_DIRECT_WRITE_ALIGNMENT (the add-then-mask form assumes the alignment
// is a power of two -- TODO confirm).
64 nzf.inbuf= (Byte*)(((UintPtr)nzfBufferUnaligned
65 + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
66 ~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
// The output buffer starts read_size bytes after the input buffer.
67 nzf.outbuf= nzf.inbuf + read_size;
// Pool size is the sum of the inflate and deflate requirements; mfree is set
// to the same value as size.
70 nz_mempool.size = nz_mempool.mfree =
71 ndbz_inflate_mem_size() + ndbz_deflate_mem_size();
73 ndbout_c(
"NDBFS/AsyncFile: Allocating %u for In/Deflate buffer",
74 (
unsigned int)nz_mempool.size);
// NOTE(review): no check of either ndbd_malloc() result is visible in this
// fragment -- confirm failures are handled on the lines not shown.
75 nz_mempool.mem = (
char*) ndbd_malloc(nz_mempool.size);
// Hand the pool to the stream through its opaque pointer.
77 nzf.stream.opaque= &nz_mempool;
// File-local scratch buffer used by check_odirect_write() and
// check_odirect_read(). Both round its address up to a GLOBAL_PAGE_SIZE
// boundary and then transfer GLOBAL_PAGE_SIZE bytes, so 2*GLOBAL_PAGE_SIZE-1
// bytes are enough for one aligned page to fit whatever the start address is.
83 static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1];
// PosixAsyncFile::check_odirect_write (fragment: braces, local declarations,
// the failure test and the return statements are not visible in this view).
//
// Probes whether theFd accepts a page-sized write. Per the log message below,
// a failed probe leads to O_DIRECT being cleared from new_flags and the file
// being reopened.
//
// flags     - request open flags (not used on any visible line).
// new_flags - open(2) flags, passed by reference; O_DIRECT may be cleared.
// mode      - permission bits forwarded to ::open().
// Returns an int that the caller in this file stores into request->error; the
// exact values are on lines not shown.
86 int PosixAsyncFile::check_odirect_write(Uint32
flags,
int& new_flags,
int mode)
// The caller must be creating or truncating the file.
88 assert(new_flags & (O_CREAT | O_TRUNC));
// Round the scratch buffer address up to a GLOBAL_PAGE_SIZE boundary.
91 char * bufptr = (
char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1));
// Probe write of one page; the rest of the loop condition is not visible
// (presumably an EINTR retry, as in check_odirect_read -- TODO confirm).
92 while (((ret = ::write(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) &&
// Fallback: drop O_DIRECT and report it.
96 new_flags &= ~O_DIRECT;
97 ndbout_c(
"%s Failed to write using O_DIRECT, disabling",
// Reopen with O_TRUNC added to the flags for this call.
105 theFd = ::open(theFileName.c_str(), new_flags | O_TRUNC,
mode);
// PosixAsyncFile::check_odirect_read (fragment: braces, local declarations,
// some conditions and the return statements are not visible in this view).
//
// Probes whether theFd accepts a page-sized read, rewinds the file, and --
// when OM_CHECK_SIZE is not requested -- checks that the file size is a
// multiple of GLOBAL_PAGE_SIZE. The fallback visible at the end clears
// O_DIRECT from new_flags and reopens the file.
//
// flags     - request open flags (only OM_CHECK_SIZE is examined here).
// new_flags - open(2) flags, passed by reference; O_DIRECT may be cleared.
// mode      - permission bits forwarded to ::open().
// Returns an int that the caller in this file stores into request->error.
113 int PosixAsyncFile::check_odirect_read(Uint32 flags,
int &new_flags,
int mode)
// Round the scratch buffer address up to a GLOBAL_PAGE_SIZE boundary.
117 char * bufptr = (
char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1));
// Probe read of one page, retried for as long as it fails with EINTR.
118 while (((ret = ::read(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) &&
119 (errno == EINTR)) {};
122 ndbout_c(
"%s Failed to read using O_DIRECT, disabling",
123 theFileName.c_str());
// Rewind to offset 0; a non-zero result is treated as a failure (the handling
// itself is not visible).
127 if(lseek(theFd, 0, SEEK_SET) != 0)
// The size-multiple check below runs only when OM_CHECK_SIZE is NOT set.
132 if ((flags & FsOpenReq::OM_CHECK_SIZE) == 0)
135 if ((fstat(theFd, &
buf) == -1))
139 else if ((
buf.st_size % GLOBAL_PAGE_SIZE) != 0)
141 ndbout_c(
"%s filesize not a multiple of %d, disabling O_DIRECT",
142 theFileName.c_str(), GLOBAL_PAGE_SIZE);
// Fallback: drop O_DIRECT and reopen with the remaining flags.
151 new_flags &= ~O_DIRECT;
152 theFd = ::open(theFileName.c_str(), new_flags,
mode);
// Body fragment of the open-request handler. Its header, braces, several
// declarations and many statements are not visible in this view; presumably
// this is PosixAsyncFile::openReq(Request*) -- TODO confirm.
//
// Visible steps:
//  1. translate FsOpenReq::OM_* request flags into open(2) flags,
//  2. open the file, with fallbacks that clear O_DIRECT,
//  3. optionally verify the file size (OM_CHECK_SIZE),
//  4. optionally pre-allocate and fill the file (OM_INIT),
//  5. otherwise probe O_DIRECT usability,
//  6. optionally reopen (OM_SYNC together with OM_INIT), enable directio(),
//     and attach the compressed-stream state via ndbzdopen().
// Failures are reported through request->error.
161 m_auto_sync_freq = 0;
163 m_open_flags = request->par.open.flags;
166 Uint32 flags = request->par.open.flags;
// --- Step 1: map request flags onto open(2) flags -------------------------
170 if (flags & FsOpenReq::OM_CREATE)
172 new_flags |= O_CREAT;
175 if (flags & FsOpenReq::OM_TRUNCATE){
176 new_flags |= O_TRUNC;
// Auto-sync threshold is taken from the request.
179 if (flags & FsOpenReq::OM_AUTOSYNC)
181 m_auto_sync_freq = request->par.open.auto_sync_size;
184 if (flags & FsOpenReq::OM_APPEND){
185 new_flags |= O_APPEND;
189 if (flags & FsOpenReq::OM_DIRECT)
191 new_flags |= O_DIRECT;
// OM_SYNC without OM_INIT; the guarded statement is not visible. The
// OM_SYNC-with-OM_INIT case is handled further down, after the file is filled.
195 if ((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT))
// rw is passed to the "%s %s O_DIRECT" log lines below.
202 const char * rw =
"";
// Access-mode selection (the switch header and breaks are not visible).
204 case FsOpenReq::OM_READONLY:
206 new_flags |= O_RDONLY;
208 case FsOpenReq::OM_WRITEONLY:
210 new_flags |= O_WRONLY;
212 case FsOpenReq::OM_READWRITE:
// presumably the default branch for an unrecognised access mode -- TODO confirm
217 request->error = 1000;
221 if (flags & FsOpenReq::OM_GZ)
// Permission bits for created files; the rest of the expression is not visible.
231 const int mode = S_IRUSR | S_IWUSR |
// --- Step 2: open ---------------------------------------------------------
// OM_CREATE_IF_NONE: first try an open with O_DIRECT cleared. If that open
// succeeds, the request fails with fsErrFileExists. The lines after that add
// O_CREAT / OM_CREATE.
234 if (flags & FsOpenReq::OM_CREATE_IF_NONE)
236 Uint32 tmp_flags = new_flags;
238 tmp_flags &= ~O_DIRECT;
240 if ((theFd = ::open(theFileName.c_str(), tmp_flags,
mode)) != -1)
243 request->error = FsRef::fsErrFileExists;
246 new_flags |= O_CREAT;
247 flags |= FsOpenReq::OM_CREATE;
251 theFd = ::open(theFileName.c_str(), new_flags,
mode);
254 PRINT_ERRORANDFLAGS(new_flags);
// ENOENT while creating: the open is attempted again (whatever happens between
// the two attempts is not visible).
255 if ((errno == ENOENT) && (new_flags & O_CREAT))
258 theFd = ::open(theFileName.c_str(), new_flags,
mode);
// O_DIRECT is cleared on these fallback paths; the retry itself is not visible.
262 if (new_flags & O_DIRECT)
264 new_flags &= ~O_DIRECT;
268 PRINT_ERRORANDFLAGS(new_flags);
269 request->error = errno;
274 else if (new_flags & O_DIRECT)
276 new_flags &= ~O_DIRECT;
282 request->error = errno;
// --- Step 3: optional size verification -----------------------------------
287 if (flags & FsOpenReq::OM_CHECK_SIZE)
290 if ((fstat(theFd, &buf) == -1))
292 request->error = errno;
294 else if((Uint64)buf.st_size != request->par.open.file_size)
296 request->error = FsRef::fsErrInvalidFileSize;
// --- Step 4: OM_INIT -- pre-allocate and fill the file ---------------------
302 if (flags & FsOpenReq::OM_INIT)
305 const off_t sz = request->par.open.file_size;
308 bzero(signal,
sizeof(tmp));
312 Uint32
block = refToMain(request->theUserReference);
313 Uint32 instance = refToInstance(request->theUserReference);
// Space reservation via xfsctl(XFS_IOC_RESVSP64) when the fd is on XFS.
// A failure is only logged.
315 #ifdef HAVE_XFS_XFS_H
316 if(platform_test_xfs_fd(theFd))
318 ndbout_c(
"Using xfsctl(XFS_IOC_RESVSP64) to allocate disk space");
322 fl.l_len= (off64_t)sz;
323 if(xfsctl(NULL, theFd, XFS_IOC_RESVSP64, &fl) < 0)
324 ndbout_c(
"failed to optimally allocate disk space");
// NOTE(review): the posix_fallocate() result is not used on this line.
327 #ifdef HAVE_POSIX_FALLOCATE
328 posix_fallocate(theFd, 0, sz);
336 Uint32 write_cnt = 0;
337 Uint64 start = NdbTick_CurrentMillisecond();
// Inner loop: prepare up to m_page_cnt pages per batch while staying below sz.
343 while (cnt < m_page_cnt && (off + size) < sz)
345 req->filePointer = 0;
346 req->userPointer = request->theUserPointer;
347 req->numberOfPages = 1;
348 req->varIndex = index++;
349 req->data.pageData[0] = m_page_ptr.i + cnt;
352 FsReadWriteReq::FixedLength + 1,
356 size += request->par.open.page_size;
359 off_t save_size =
size;
360 char*
buf = (
char*)m_page_ptr.p;
// The batch is written either through ndbzwrite() or plain write(); the
// condition that selects between them is not visible.
368 n= ndbzwrite(&nzf, buf, size);
370 n= write(theFd, buf, size);
371 if(n == -1 && errno == EINTR)
// Both -1 and 0 are treated as a failed write.
375 if(n == -1 || n == 0)
377 ndbout_c(
"ndbzwrite|write returned %d: errno: %d my_errno: %d",n,errno,my_errno);
// A failure at offset 0 while O_DIRECT is set: disable O_DIRECT and reopen.
387 if ((new_flags & O_DIRECT) && off == 0)
389 ndbout_c(
"error on first write(%d), disable O_DIRECT", err);
390 new_flags &= ~O_DIRECT;
392 theFd = ::open(theFileName.c_str(), new_flags,
mode);
// Remove the file and report the saved error code.
398 unlink(theFileName.c_str());
399 request->error = err;
// Throughput statistics for the initialisation.
406 Uint64 stop = NdbTick_CurrentMillisecond();
407 Uint64 diff = stop - start;
410 ndbout_c(
"wrote %umb in %u writes %us -> %ukb/write %umb/s",
411 Uint32(sz /1024/1024),
// NOTE(review): divides by write_cnt -- confirm it cannot be 0 at this point.
414 Uint32(sz / 1024 / write_cnt),
// Rewind after the fill.
418 if(lseek(theFd, 0, SEEK_SET) != 0)
419 request->error = errno;
// --- Step 5: not OM_INIT but OM_DIRECT -- probe O_DIRECT usability ---------
421 else if (flags & FsOpenReq::OM_DIRECT)
// Newly created or truncated files get the write probe, others the read probe.
424 if (flags & (FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE))
426 request->error = check_odirect_write(flags, new_flags, mode);
430 request->error = check_odirect_read(flags, new_flags, mode);
// Log whether O_DIRECT is still set in new_flags.
439 if (flags & FsOpenReq::OM_DIRECT)
442 ndbout_c(
"%s %s O_DIRECT: %d",
443 theFileName.c_str(), rw,
444 !!(new_flags & O_DIRECT));
446 ndbout_c(
"%s %s O_DIRECT: 0",
447 theFileName.c_str(), rw);
// --- Step 6: post-open adjustments ----------------------------------------
// OM_SYNC together with OM_INIT: reopen without O_CREAT/O_TRUNC (any other
// change to new_flags before the reopen is not visible).
452 if ((flags & FsOpenReq::OM_SYNC) && (flags & FsOpenReq::OM_INIT))
459 new_flags &= ~(O_CREAT | O_TRUNC);
461 theFd = ::open(theFileName.c_str(), new_flags,
mode);
464 request->error = errno;
// Platforms that have directio()/DIRECTIO_ON but no O_DIRECT macro.
469 #if ! defined(O_DIRECT) && defined HAVE_DIRECTIO && defined(DIRECTIO_ON)
470 if (flags & FsOpenReq::OM_DIRECT)
472 if (directio(theFd, DIRECTIO_ON) == -1)
474 ndbout_c(
"%s Failed to set DIRECTIO_ON errno: %u",
475 theFileName.c_str(), errno);
480 ndbout_c(
"%s DIRECTIO_ON", theFileName.c_str());
// Attach the nzf stream state to the fd; a result below 1 is treated as failure.
489 if((err= ndbzdopen(&nzf, theFd, new_flags)) < 1)
491 ndbout_c(
"Stewart's brain broke: %d %d %s",
492 err, my_errno, theFileName.c_str());
// Body fragment of PosixAsyncFile::readBuffer (name taken from the log message
// below; the header, braces, loop header and most return statements are not
// visible in this view).
//
// Reads `size` bytes at `offset` into `buf`, accumulating the number of bytes
// obtained in req->par.readWrite.pages[0].size. Plain and ndbz (compressed)
// reads are both present; use_gz appears in the error handling.
502 req->par.readWrite.pages[0].size = 0;
// Without pread(): take a FileGuard on this object and position the file
// explicitly. Each seek is retried for as long as it fails with EINTR.
504 #if ! defined(HAVE_PREAD)
505 FileGuard guard(
this);
508 while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
509 && errno == EINTR) {};
510 if(seek_val == (off_t)-1)
518 while((seek_val= ndbzseek(&nzf, offset, SEEK_SET)) == (off_t)-1
519 && errno == EINTR) {};
520 if(seek_val == (off_t)-1)
529 size_t bytes_read = 0;
// Read variants: ndbzread(), ::read() after a seek, or ::pread() at offset.
// The conditions that select between them are not visible.
531 #if ! defined(HAVE_PREAD)
533 return_value = ndbzread(&nzf, buf, size, &error);
535 return_value = ::read(theFd, buf, size);
538 return_value = ::pread(theFd, buf, size, offset);
540 return_value = ndbzread(&nzf, buf, size, &error);
542 if (return_value == -1 && errno == EINTR) {
543 DEBUG(ndbout_c(
"EINTR in read"));
545 }
else if (!use_gz) {
546 if (return_value == -1)
// This branch consults the stream state: a result below 1 while z_eof is not 1.
549 else if (return_value < 1 && nzf.z_eof!=1)
// All error indicators clear and z_err == Z_STREAM_END; the handling for this
// case is not visible.
551 if(my_errno==0 && errno==0 && error==0 && nzf.z_err==Z_STREAM_END)
553 DEBUG(ndbout_c(
"ERROR DURING %sRead: %d off: %d from %s",(use_gz)?
"gz":
"",size,offset,theFileName.c_str()));
554 ndbout_c(
"ERROR IN PosixAsyncFile::readBuffer %d %d %d %d",
555 my_errno, errno, nzf.z_err, error);
// Successful read: add the bytes to the running total for the request.
560 bytes_read = return_value;
561 req->par.readWrite.pages[0].size += bytes_read;
// readPartial requests are handled separately from the underflow error below;
// the conditions themselves are not visible.
563 if(req->action == Request::readPartial)
567 DEBUG(ndbout_c(
"Read underflow %d %d\n %x\n%d %d",
568 size, offset, buf, bytes_read, return_value));
569 return ERR_ReadUnderflow;
572 if(bytes_read != size){
573 DEBUG(ndbout_c(
"Warning partial read %d != %d on %s",
574 bytes_read, size, theFileName.c_str()));
// Advance the file offset by what was read.
579 offset += bytes_read;
// Body fragment of a vectored-read handler (the header is not visible;
// presumably PosixAsyncFile::readvReq(Request*) -- TODO confirm).
//
// Builds an iovec array from the request's pages, seeks to the offset of the
// first page and issues a single ::readv(). A -1 result sets request->error
// to errno; a byte count different from the summed page sizes sets it to 1011.
586 #if ! defined(HAVE_PREAD)
// NOTE(review): the array holds 20 entries, and no bound check of
// numberOfPages against 20 is visible in this fragment -- confirm the caller
// guarantees the limit.
592 struct iovec iov[20];
593 for(
int i=0;
i < request->par.readWrite.numberOfPages ;
i++) {
594 iov[
i].iov_base= request->par.readWrite.pages[
i].buf;
595 iov[
i].iov_len= request->par.readWrite.pages[
i].size;
// Sum of all page sizes; compared with the readv() result below.
596 length = length + iov[
i].iov_len;
// NOTE(review): the lseek() result is not used on this line.
598 lseek( theFd, request->par.readWrite.pages[0].offset, SEEK_SET );
599 return_value = ::readv(theFd, iov, request->par.readWrite.numberOfPages);
600 if (return_value == -1) {
601 request->error = errno;
603 }
else if (return_value != length) {
604 request->error = 1011;
// Body fragment of PosixAsyncFile::writeBuffer (name taken from the log
// message below; the header, braces, loop header and return statements are
// not visible in this view).
//
// Writes `size` bytes from `buf` at `offset`, at most chunk_size (256*1024)
// bytes per call, advancing buf/size/offset by the bytes actually written.
612 size_t chunk_size = 256*1024;
613 size_t bytes_to_write = chunk_size;
// Add the request size to the counter of bytes written without a sync.
616 m_write_wo_sync +=
size;
// Without pwrite(): take a FileGuard on this object and seek explicitly,
// retrying for as long as the seek fails with EINTR.
618 #if ! defined(HAVE_PWRITE)
619 FileGuard guard(
this);
621 while((seek_val= lseek(theFd, offset, SEEK_SET)) == (off_t)-1
622 && errno == EINTR) {};
623 if(seek_val == (off_t)-1)
// Clamp the chunk to what is left.
630 if (size < bytes_to_write){
632 bytes_to_write =
size;
634 size_t bytes_written = 0;
// Write variants: ndbzwrite(), ::write() after a seek, or ::pwrite() at
// offset. The conditions that select between them are not visible.
636 #if ! defined(HAVE_PWRITE)
638 return_value= ndbzwrite(&nzf, buf, bytes_to_write);
640 return_value = ::write(theFd, buf, bytes_to_write);
643 return_value= ndbzwrite(&nzf, buf, bytes_to_write);
645 return_value = ::pwrite(theFd, buf, bytes_to_write, offset);
647 if (return_value == -1 && errno == EINTR) {
649 DEBUG(ndbout_c(
"EINTR in write"));
// NOTE(review): `return_value == -1` is already implied by `return_value < 1`,
// so the first operand is redundant.
650 }
else if (return_value == -1 || return_value < 1){
651 ndbout_c(
"ERROR IN PosixAsyncFile::writeBuffer %d %d %d",
652 my_errno, errno, nzf.z_err);
657 bytes_written = return_value;
659 if(bytes_written == 0){
660 DEBUG(ndbout_c(
"no bytes written"));
664 if(bytes_written != bytes_to_write){
665 DEBUG(ndbout_c(
"Warning partial write %d != %d",
666 bytes_written, bytes_to_write));
// Advance past the bytes that were written.
670 buf += bytes_written;
671 size -= bytes_written;
672 offset += bytes_written;
// PosixAsyncFile::closeReq (fragment: braces, the start of the first
// condition, the close call itself and other statements are not visible).
//
// Handles a close request. The visible parts test the write-capable open
// modes, reset the nzf state, and record errno in request->error on failure.
677 void PosixAsyncFile::closeReq(
Request *request)
// Tail of a condition over the write-capable open modes; the tested variable
// and the guarded statement are not visible (presumably a sync before close
// -- TODO confirm).
680 FsOpenReq::OM_WRITEONLY |
681 FsOpenReq::OM_READWRITE |
682 FsOpenReq::OM_APPEND )) {
// Reset nzf to all-zero and re-attach the memory pool to the stream.
694 memset(&nzf,0,
sizeof(nzf));
697 nzf.stream.opaque = (
void*)&nz_mempool;
// Debug trace for a close attempted while the descriptor is -1.
702 DEBUG(ndbout_c(
"close on fd = -1"));
706 request->error = errno;
// Returns true when theFd is not -1, i.e. a descriptor is currently held.
// (The closing brace is not visible in this view.)
711 bool PosixAsyncFile::isOpen(){
712 return (theFd != -1);
// PosixAsyncFile::syncReq (fragment: braces and some statements are not
// visible in this view).
//
// Calls ::fsync() on theFd. On failure errno is stored in request->error.
716 void PosixAsyncFile::syncReq(
Request *request)
// Auto-sync is enabled and the unsynced-bytes counter is 0; the guarded
// statement is not visible (presumably an early return -- TODO confirm).
718 if(m_auto_sync_freq && m_write_wo_sync == 0){
721 if (-1 == ::fsync(theFd)){
722 request->error = errno;
// PosixAsyncFile::appendReq (fragment: braces, the loop header, the condition
// selecting the write variant and several statements are not visible).
//
// Writes request->par.append.size bytes from request->par.append.buf at the
// current position of the file, through either ndbzwrite() or plain write().
// The final test compares the unsynced-bytes counter with the auto-sync
// threshold.
728 void PosixAsyncFile::appendReq(
Request *request)
730 const char *
buf = request->par.append.buf;
731 Uint32
size = request->par.append.size;
// Add the request size to the counter of bytes written without a sync.
733 m_write_wo_sync +=
size;
738 n= ndbzwrite(&nzf,buf,size);
740 n= write(theFd, buf, size);
// Interrupted write; the handling is not visible (presumably a retry).
741 if(n == -1 && errno == EINTR){
// Two error sources are reported: my_errno and errno. The condition that
// selects between them is not visible.
746 request->error = my_errno;
748 request->error = errno;
752 DEBUG(ndbout_c(
"append with n=0"));
// Threshold exceeded; the guarded statement is not visible (presumably a sync).
759 if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
// PosixAsyncFile::removeReq (fragment: braces are not visible in this view).
//
// Removes the file named by theFileName with ::remove(). On failure errno is
// stored in request->error.
764 void PosixAsyncFile::removeReq(
Request *request)
766 if (-1 == ::
remove(theFileName.c_str())) {
767 request->error = errno;
// PosixAsyncFile::rmrfReq (fragment: the return type, braces, local
// declarations including `path`, and parts of the traversal are not visible).
//
// request    - its par.rmrf.directory flag selects file vs. directory
//              handling; request->error receives errno on failure.
// src        - path to remove.
// removePath - when true, src itself is removed with rmdir() at the end.
773 PosixAsyncFile::rmrfReq(
Request *request,
const char * src,
bool removePath)
// Non-directory case: unlink src. A missing file (ENOENT) is not an error.
775 if(!request->par.rmrf.directory)
778 if(unlink(src) != 0 && errno != ENOENT)
779 request->error = errno;
// Directory case: open the directory named by the working buffer `path`.
790 dirp = opendir(path);
794 request->error = errno;
798 while ((dp = readdir(dirp)) != NULL)
// Skip the "." and ".." entries.
800 if ((strcmp(
".", dp->d_name) != 0) && (strcmp(
"..", dp->d_name) != 0))
// The length before appending is recorded in len; its later use is not
// visible (presumably to truncate path back -- TODO confirm).
// NOTE(review): strcat() appends the entry name with no bounds check visible
// here -- confirm `path` is large enough for the deepest entry.
802 int len = strlen(path);
803 strcat(path, dp->d_name);
804 if (
remove(path) == 0)
// Drop the last character of path.
816 path[strlen(path)-1] = 0;
// While path differs from src, locate its last '/'; the use of t is not visible.
817 if (strcmp(src, path) != 0)
819 char * t = strrchr(path,
'/');
824 if(removePath && rmdir(src) != 0)
826 request->error = errno;
// Destructor (fragment: braces and at least one condition are not visible).
//
// Releases the two allocations made in init(): the raw ndbz buffer and the
// memory pool.
830 PosixAsyncFile::~PosixAsyncFile()
// The size passed to ndbd_free() is the same expression that init() passes to
// ndbd_malloc() for this buffer.
833 if (nzfBufferUnaligned)
834 ndbd_free(nzfBufferUnaligned,
835 ndbz_bufsize_read() +
836 ndbz_bufsize_write() +
837 NDB_O_DIRECT_WRITE_ALIGNMENT-1);
838 nzfBufferUnaligned = NULL;
// Release the memory pool; any guard around this call is not visible.
842 ndbd_free(nz_mempool.mem, nz_mempool.size);
843 nz_mempool.mem = NULL;
// PosixAsyncFile::createDirectories (fragment: braces, the declaration of tmp
// and the statements around mkdir() are not visible in this view).
//
// Scans the file name for DIR_SEPARATOR, starting at the position returned by
// theFileName.get_base_name(), and calls mkdir() on `name` once per separator
// found.
848 void PosixAsyncFile::createDirectories()
851 const char *
name = theFileName.c_str();
852 const char * base = theFileName.get_base_name();
853 while((tmp = (
char *)strstr(base, DIR_SEPARATOR)))
// Mode: read/write/execute for the owner, read/execute for the group.
// NOTE(review): the mkdir() result is not used on this line.
857 mkdir(name, S_IRUSR | S_IWUSR | S_IXUSR | S_IXGRP | S_IRGRP);
// Continue the search after the separator just handled.
// NOTE(review): if DIR_SEPARATOR is a string literal, sizeof() counts its
// terminating NUL, so this advances one character further than the
// separator's length -- confirm that is intended.
859 base = tmp +
sizeof(DIR_SEPARATOR);