mf_iocache.c
上传用户:tsgydb
上传日期:2007-04-14
资源大小:10674k
文件大小:18k
- /* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
-
- This library is free software; you can redistribute it and/or
- modify it under the terms of the GNU Library General Public
- License as published by the Free Software Foundation; either
- version 2 of the License, or (at your option) any later version.
-
- This library is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Library General Public License for more details.
-
- You should have received a copy of the GNU Library General Public
- License along with this library; if not, write to the Free
- Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
- MA 02111-1307, USA */
- /*
- Cashing of files with only does (sequential) read or writes of fixed-
- length records. A read isn't allowed to go over file-length. A read is ok
- if it ends at file-length and next read can try to read after file-length
- (and get a EOF-error).
- Possibly use of asyncronic io.
- macros for read and writes for faster io.
- Used instead of FILE when reading or writing whole files.
- This will make mf_rec_cache obsolete.
- One can change info->pos_in_file to a higher value to skip bytes in file if
- also info->rc_pos is set to info->rc_end.
- If called through open_cached_file(), then the temporary file will
- only be created if a write exeeds the file buffer or if one calls
- flush_io_cache().
- */
- #define MAP_TO_USE_RAID
- #include "mysys_priv.h"
- #include <m_string.h>
- #ifdef HAVE_AIOWAIT
- #include "mysys_err.h"
- #include <errno.h>
- static void my_aiowait(my_aio_result *result);
- #endif
- /*
- ** if cachesize == 0 then use default cachesize (from s-file)
- ** if file == -1 then real_open_cached_file() will be called.
- ** returns 0 if ok
- */
- int init_io_cache(IO_CACHE *info, File file, uint cachesize,
- enum cache_type type, my_off_t seek_offset,
- pbool use_async_io, myf cache_myflags)
- {
- uint min_cache;
- DBUG_ENTER("init_io_cache");
- DBUG_PRINT("enter",("type: %d pos: %ld",(int) type, (ulong) seek_offset));
- info->file=file;
- if (!cachesize)
- if (! (cachesize= my_default_record_cache_size))
- DBUG_RETURN(1); /* No cache requested */
- min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
- if (type == READ_CACHE)
- { /* Assume file isn't growing */
- if (cache_myflags & MY_DONT_CHECK_FILESIZE)
- {
- cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
- }
- else
- {
- my_off_t file_pos,end_of_file;
- if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR))
- DBUG_RETURN(1);
- end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
- if (end_of_file < seek_offset)
- end_of_file=seek_offset;
- VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0)));
- if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
- {
- cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1;
- use_async_io=0; /* No nead to use async */
- }
- }
- }
- for (;;)
- {
- cachesize=(uint) ((ulong) (cachesize + min_cache-1) &
- (ulong) ~(min_cache-1));
- if (cachesize < min_cache)
- cachesize = min_cache;
- if ((info->buffer=
- (byte*) my_malloc(cachesize,
- MYF((cache_myflags & ~ MY_WME) |
- (cachesize == min_cache ? MY_WME : 0)))) != 0)
- break; /* Enough memory found */
- if (cachesize == min_cache)
- DBUG_RETURN(2); /* Can't alloc cache */
- cachesize= (uint) ((long) cachesize*3/4); /* Try with less memory */
- }
- info->pos_in_file=seek_offset;
- info->read_length=info->buffer_length=cachesize;
- info->seek_not_done= test(file >= 0); /* Seek not done */
- info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
- info->rc_request_pos=info->rc_pos=info->buffer;
- if (type == READ_CACHE)
- {
- info->rc_end=info->buffer; /* Nothing in cache */
- }
- else /* type == WRITE_CACHE */
- {
- info->rc_end=info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
- }
- info->end_of_file=MY_FILEPOS_ERROR; /* May be changed by user */
- info->type=type;
- info->error=0;
- info->read_function=_my_b_read;
- #ifdef HAVE_AIOWAIT
- if (use_async_io && ! my_disable_async_io)
- {
- DBUG_PRINT("info",("Using async io"));
- info->read_length/=2;
- info->read_function=_my_b_async_read;
- }
- info->inited=info->aio_result.pending=0;
- #endif
- DBUG_RETURN(0);
- } /* init_io_cache */
- /* Wait until current request is ready */
- #ifdef HAVE_AIOWAIT
- static void my_aiowait(my_aio_result *result)
- {
- if (result->pending)
- {
- struct aio_result_t *tmp;
- for (;;)
- {
- if ((int) (tmp=aiowait((struct timeval *) 0)) == -1)
- {
- if (errno == EINTR)
- continue;
- DBUG_PRINT("error",("No aio request, error: %d",errno));
- result->pending=0; /* Assume everythings is ok */
- break;
- }
- ((my_aio_result*) tmp)->pending=0;
- if ((my_aio_result*) tmp == result)
- break;
- }
- }
- return;
- }
- #endif
- /* Use this to reset cache to start or other type */
- /* Some simple optimizing is done when reinit in current buffer */
- my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
- my_off_t seek_offset,
- pbool use_async_io __attribute__((unused)),
- pbool clear_cache)
- {
- DBUG_ENTER("reinit_io_cache");
- info->seek_not_done= test(info->file >= 0); /* Seek not done */
- if (! clear_cache &&
- seek_offset >= info->pos_in_file &&
- seek_offset <= info->pos_in_file +
- (uint) (info->rc_end - info->rc_request_pos))
- { /* use current buffer */
- if (info->type == WRITE_CACHE && type == READ_CACHE)
- {
- info->rc_end=info->rc_pos;
- info->end_of_file=my_b_tell(info);
- }
- else if (info->type == READ_CACHE && type == WRITE_CACHE)
- info->rc_end=info->buffer+info->buffer_length;
- info->rc_pos=info->rc_request_pos+(seek_offset-info->pos_in_file);
- #ifdef HAVE_AIOWAIT
- my_aiowait(&info->aio_result); /* Wait for outstanding req */
- #endif
- }
- else
- {
- if (info->type == WRITE_CACHE && type == READ_CACHE)
- info->end_of_file=my_b_tell(info);
- if (flush_io_cache(info))
- DBUG_RETURN(1);
- info->pos_in_file=seek_offset;
- info->rc_request_pos=info->rc_pos=info->buffer;
- if (type == READ_CACHE)
- {
- info->rc_end=info->buffer; /* Nothing in cache */
- }
- else
- {
- info->rc_end=info->buffer+info->buffer_length-
- (seek_offset & (IO_SIZE-1));
- info->end_of_file=MY_FILEPOS_ERROR; /* May be changed by user */
- }
- }
- info->type=type;
- info->error=0;
- info->read_function=_my_b_read;
- #ifdef HAVE_AIOWAIT
- if (use_async_io && ! my_disable_async_io &&
- ((ulong) info->buffer_length <
- (ulong) (info->end_of_file - seek_offset)))
- {
- info->read_length=info->buffer_length/2;
- info->read_function=_my_b_async_read;
- }
- info->inited=0;
- #endif
- DBUG_RETURN(0);
- } /* init_io_cache */
- /* Read buffered. Returns 1 if can't read requested characters */
- /* Returns 0 if record read */
- int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
- {
- uint length,diff_length,left_length;
- my_off_t max_length, pos_in_file;
- memcpy(Buffer,info->rc_pos,
- (size_t) (left_length=(uint) (info->rc_end-info->rc_pos)));
- Buffer+=left_length;
- Count-=left_length;
- pos_in_file=info->pos_in_file+(uint) (info->rc_end - info->buffer);
- if (info->seek_not_done)
- { /* File touched, do seek */
- VOID(my_seek(info->file,pos_in_file,MY_SEEK_SET,MYF(0)));
- info->seek_not_done=0;
- }
- diff_length=(uint) (pos_in_file & (IO_SIZE-1));
- if (Count >= (uint) (IO_SIZE+(IO_SIZE-diff_length)))
- { /* Fill first intern buffer */
- uint read_length;
- if (info->end_of_file == pos_in_file)
- { /* End of file */
- info->error=(int) left_length;
- return 1;
- }
- length=(Count & (uint) ~(IO_SIZE-1))-diff_length;
- if ((read_length=my_read(info->file,Buffer,(uint) length,info->myflags))
- != (uint) length)
- {
- info->error= read_length == (uint) -1 ? -1 :
- (int) (read_length+left_length);
- return 1;
- }
- Count-=length;
- Buffer+=length;
- pos_in_file+=length;
- left_length+=length;
- diff_length=0;
- }
- max_length=info->end_of_file - pos_in_file;
- if (max_length > info->read_length-diff_length)
- max_length=info->read_length-diff_length;
- if (!max_length)
- {
- if (Count)
- {
- info->error= left_length; /* We only got this many char */
- return 1;
- }
- length=0; /* Didn't read any chars */
- }
- else if ((length=my_read(info->file,info->buffer,(uint) max_length,
- info->myflags)) < Count ||
- length == (uint) -1)
- {
- if (length != (uint) -1)
- memcpy(Buffer,info->buffer,(size_t) length);
- info->error= length == (uint) -1 ? -1 : (int) (length+left_length);
- return 1;
- }
- info->rc_pos=info->buffer+Count;
- info->rc_end=info->buffer+length;
- info->pos_in_file=pos_in_file;
- memcpy(Buffer,info->buffer,(size_t) Count);
- return 0;
- }
- #ifdef HAVE_AIOWAIT
- int _my_b_async_read(register IO_CACHE *info, byte *Buffer, uint Count)
- {
- uint length,read_length,diff_length,left_length,use_length,org_Count;
- my_off_t max_length;
- my_off_t next_pos_in_file;
- byte *read_buffer;
- memcpy(Buffer,info->rc_pos,
- (size_t) (left_length=(uint) (info->rc_end-info->rc_pos)));
- Buffer+=left_length;
- org_Count=Count;
- Count-=left_length;
- if (info->inited)
- { /* wait for read block */
- info->inited=0; /* No more block to read */
- my_aiowait(&info->aio_result); /* Wait for outstanding req */
- if (info->aio_result.result.aio_errno)
- {
- if (info->myflags & MY_WME)
- my_error(EE_READ, MYF(ME_BELL+ME_WAITTANG),
- my_filename(info->file),
- info->aio_result.result.aio_errno);
- my_errno=info->aio_result.result.aio_errno;
- info->error= -1;
- return(1);
- }
- if (! (read_length = (uint) info->aio_result.result.aio_return) ||
- read_length == (uint) -1)
- {
- my_errno=0; /* For testing */
- info->error= (read_length == (uint) -1 ? -1 :
- (int) (read_length+left_length));
- return(1);
- }
- info->pos_in_file+=(uint) (info->rc_end - info->rc_request_pos);
- if (info->rc_request_pos != info->buffer)
- info->rc_request_pos=info->buffer;
- else
- info->rc_request_pos=info->buffer+info->read_length;
- info->rc_pos=info->rc_request_pos;
- next_pos_in_file=info->aio_read_pos+read_length;
- /* Check if pos_in_file is changed
- (_ni_read_cache may have skipped some bytes) */
- if (info->aio_read_pos < info->pos_in_file)
- { /* Fix if skipped bytes */
- if (info->aio_read_pos + read_length < info->pos_in_file)
- {
- read_length=0; /* Skipp block */
- next_pos_in_file=info->pos_in_file;
- }
- else
- {
- my_off_t offset= (info->pos_in_file - info->aio_read_pos);
- info->pos_in_file=info->aio_read_pos; /* Whe are here */
- info->rc_pos=info->rc_request_pos+offset;
- read_length-=offset; /* Bytes left from rc_pos */
- }
- }
- #ifndef DBUG_OFF
- if (info->aio_read_pos > info->pos_in_file)
- {
- my_errno=EINVAL;
- return(info->read_length= -1);
- }
- #endif
- /* Copy found bytes to buffer */
- length=min(Count,read_length);
- memcpy(Buffer,info->rc_pos,(size_t) length);
- Buffer+=length;
- Count-=length;
- left_length+=length;
- info->rc_end=info->rc_pos+read_length;
- info->rc_pos+=length;
- }
- else
- next_pos_in_file=(info->pos_in_file+ (uint)
- (info->rc_end - info->rc_request_pos));
- /* If reading large blocks, or first read or read with skipp */
- if (Count)
- {
- if (next_pos_in_file == info->end_of_file)
- {
- info->error=(int) (read_length+left_length);
- return 1;
- }
- VOID(my_seek(info->file,next_pos_in_file,MY_SEEK_SET,MYF(0)));
- read_length=IO_SIZE*2- (uint) (next_pos_in_file & (IO_SIZE-1));
- if (Count < read_length)
- { /* Small block, read to cache */
- if ((read_length=my_read(info->file,info->rc_request_pos,
- read_length, info->myflags)) == (uint) -1)
- return info->error= -1;
- use_length=min(Count,read_length);
- memcpy(Buffer,info->rc_request_pos,(size_t) use_length);
- info->rc_pos=info->rc_request_pos+Count;
- info->rc_end=info->rc_request_pos+read_length;
- info->pos_in_file=next_pos_in_file; /* Start of block in cache */
- next_pos_in_file+=read_length;
- if (Count != use_length)
- { /* Didn't find hole block */
- if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
- my_error(EE_EOFERR, MYF(ME_BELL+ME_WAITTANG),
- my_filename(info->file),my_errno);
- info->error=(int) (read_length+left_length);
- return 1;
- }
- }
- else
- { /* Big block, don't cache it */
- if ((read_length=my_read(info->file,Buffer,(uint) Count,info->myflags))
- != Count)
- {
- info->error= read_length == (uint) -1 ? -1 : read_length+left_length;
- return 1;
- }
- info->rc_pos=info->rc_end=info->rc_request_pos;
- info->pos_in_file=(next_pos_in_file+=Count);
- }
- }
- /* Read next block with asyncronic io */
- max_length=info->end_of_file - next_pos_in_file;
- diff_length=(next_pos_in_file & (IO_SIZE-1));
- if (max_length > (my_off_t) info->read_length - diff_length)
- max_length= (my_off_t) info->read_length - diff_length;
- if (info->rc_request_pos != info->buffer)
- read_buffer=info->buffer;
- else
- read_buffer=info->buffer+info->read_length;
- info->aio_read_pos=next_pos_in_file;
- if (max_length)
- {
- info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */
- DBUG_PRINT("aioread",("filepos: %ld length: %ld",
- (ulong) next_pos_in_file,(ulong) max_length));
- if (aioread(info->file,read_buffer,(int) max_length,
- (my_off_t) next_pos_in_file,MY_SEEK_SET,
- &info->aio_result.result))
- { /* Skipp async io */
- my_errno=errno;
- DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
- errno, info->aio_result.result.aio_errno));
- if (info->rc_request_pos != info->buffer)
- {
- bmove(info->buffer,info->rc_request_pos,
- (uint) (info->rc_end - info->rc_pos));
- info->rc_request_pos=info->buffer;
- info->rc_pos-=info->read_length;
- info->rc_end-=info->read_length;
- }
- info->read_length=info->buffer_length; /* Use hole buffer */
- info->read_function=_my_b_read; /* Use normal IO_READ next */
- }
- else
- info->inited=info->aio_result.pending=1;
- }
- return 0; /* Block read, async in use */
- } /* _my_b_async_read */
- #endif
- /* Read one byte when buffer is empty */
- int _my_b_get(IO_CACHE *info)
- {
- byte buff;
- if ((*(info)->read_function)(info,&buff,1))
- return my_b_EOF;
- return (int) (uchar) buff;
- }
- /* Returns != 0 if error on write */
- int _my_b_write(register IO_CACHE *info, const byte *Buffer, uint Count)
- {
- uint rest_length,length;
- rest_length=(uint) (info->rc_end - info->rc_pos);
- memcpy(info->rc_pos,Buffer,(size_t) rest_length);
- Buffer+=rest_length;
- Count-=rest_length;
- info->rc_pos+=rest_length;
- if (flush_io_cache(info))
- return 1;
- if (Count >= IO_SIZE)
- { /* Fill first intern buffer */
- length=Count & (uint) ~(IO_SIZE-1);
- if (info->seek_not_done)
- { /* File touched, do seek */
- VOID(my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)));
- info->seek_not_done=0;
- }
- if (my_write(info->file,Buffer,(uint) length,info->myflags | MY_NABP))
- return info->error= -1;
- Count-=length;
- Buffer+=length;
- info->pos_in_file+=length;
- }
- memcpy(info->rc_pos,Buffer,(size_t) Count);
- info->rc_pos+=Count;
- return 0;
- }
- /*
- Write a block to disk where part of the data may be inside the record
- buffer. As all write calls to the data goes through the cache,
- we will never get a seek over the end of the buffer
- */
- int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
- my_off_t pos)
- {
- uint length;
- int error=0;
- if (pos < info->pos_in_file)
- {
- /* Of no overlap, write everything without buffering */
- if (pos + Count <= info->pos_in_file)
- return my_pwrite(info->file, Buffer, Count, pos,
- info->myflags | MY_NABP);
- /* Write the part of the block that is before buffer */
- length= (uint) (info->pos_in_file - pos);
- if (my_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
- info->error=error=-1;
- Buffer+=length;
- pos+= length;
- Count-= length;
- }
- /* Check if we want to write inside the used part of the buffer.*/
- length= (uint) (info->rc_end - info->buffer);
- if (pos < info->pos_in_file + length)
- {
- uint offset= (uint) (pos - info->pos_in_file);
- length-=offset;
- if (length > Count)
- length=Count;
- memcpy(info->buffer+offset, Buffer, length);
- Buffer+=length;
- Count-= length;
- /* Fix length of buffer if the new data was larger */
- if (info->buffer+length > info->rc_pos)
- info->rc_pos=info->buffer+length;
- if (!Count)
- return (error);
- }
- /* Write at the end of the current buffer; This is the normal case */
- if (_my_b_write(info, Buffer, Count))
- error= -1;
- return error;
- }
- /* Flush write cache */
- int flush_io_cache(IO_CACHE *info)
- {
- uint length;
- DBUG_ENTER("flush_io_cache");
- if (info->type == WRITE_CACHE)
- {
- if (info->file == -1)
- {
- if (real_open_cached_file(info))
- DBUG_RETURN((info->error= -1));
- }
- if (info->rc_pos != info->buffer)
- {
- length=(uint) (info->rc_pos - info->buffer);
- if (info->seek_not_done)
- { /* File touched, do seek */
- if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)) ==
- MY_FILEPOS_ERROR)
- DBUG_RETURN((info->error= -1));
- info->seek_not_done=0;
- }
- info->rc_pos=info->buffer;
- info->pos_in_file+=length;
- info->rc_end=(info->buffer+info->buffer_length-
- (info->pos_in_file & (IO_SIZE-1)));
- if (my_write(info->file,info->buffer,length,info->myflags | MY_NABP))
- DBUG_RETURN((info->error= -1));
- DBUG_RETURN(0);
- }
- }
- #ifdef HAVE_AIOWAIT
- else
- {
- my_aiowait(&info->aio_result); /* Wait for outstanding req */
- info->inited=0;
- }
- #endif
- DBUG_RETURN(0);
- }
- int end_io_cache(IO_CACHE *info)
- {
- int error=0;
- DBUG_ENTER("end_io_cache");
- if (info->buffer)
- {
- if (info->file != -1) /* File doesn't exist */
- error=flush_io_cache(info);
- my_free((gptr) info->buffer,MYF(MY_WME));
- info->buffer=info->rc_pos=(byte*) 0;
- }
- DBUG_RETURN(error);
- } /* end_io_cache */