sort.c
上传用户:tsgydb
上传日期:2007-04-14
资源大小:10674k
文件大小:14k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
  2.    
  3.    This program is free software; you can redistribute it and/or modify
  4.    it under the terms of the GNU General Public License as published by
  5.    the Free Software Foundation; either version 2 of the License, or
  6.    (at your option) any later version.
  7.    
  8.    This program is distributed in the hope that it will be useful,
  9.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  10.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  11.    GNU General Public License for more details.
  12.    
  13.    You should have received a copy of the GNU General Public License
  14.    along with this program; if not, write to the Free Software
  15.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  16. /*
  17.   Creates a index for a database by reading keys, sorting them and outputing
  18.   them in sorted order through SORT_INFO functions.
  19. */
  20. #include "myisamdef.h"
  21. #if defined(MSDOS) || defined(__WIN__)
  22. #include <fcntl.h>
  23. #else
  24. #include <stddef.h>
  25. #endif
  26. #include <queues.h>
  27. /* static variabels */
  28. #define MERGEBUFF 15
  29. #define MERGEBUFF2 31
  30. #define MIN_SORT_MEMORY (4096-MALLOC_OVERHEAD)
  31. #define MYF_RW MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL)
  32. #define DISK_BUFFER_SIZE (IO_SIZE*16)
  33. typedef struct st_buffpek {
  34.   my_off_t file_pos; /* position to buffer */
  35.   ha_rows count; /* keys in buffer */
  36.   uchar *base,*key; /* Pekare inom sort_key - indexdel */
  37.   uint mem_count; /* keys left in memory */
  38.   uint max_keys; /* Max keys in buffert */
  39. } BUFFPEK;
  40. extern void print_error _VARARGS((const char *fmt,...));
  41. /* functions defined in this file */
  42. static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info,uint keys,
  43.     uchar **sort_keys,
  44.     BUFFPEK *buffpek,int *maxbuffer,
  45.     IO_CACHE *tempfile);
  46. static int NEAR_F write_keys(MI_SORT_PARAM *info,uchar * *sort_keys,
  47.      uint count, BUFFPEK *buffpek,IO_CACHE *tempfile);
  48. static int NEAR_F write_index(MI_SORT_PARAM *info,uchar * *sort_keys,
  49.       uint count);
  50. static int NEAR_F merge_many_buff(MI_SORT_PARAM *info,uint keys,
  51.   uchar * *sort_keys,
  52.   BUFFPEK *buffpek,int *maxbuffer,
  53.   IO_CACHE *t_file);
  54. static uint NEAR_F read_to_buffer(IO_CACHE *fromfile,BUFFPEK *buffpek,
  55.   uint sort_length);
  56. static int NEAR_F merge_buffers(MI_SORT_PARAM *info,uint keys,
  57. IO_CACHE *from_file, IO_CACHE *to_file,
  58. uchar * *sort_keys, BUFFPEK *lastbuff,
  59. BUFFPEK *Fb, BUFFPEK *Tb);
  60. static int NEAR_F merge_index(MI_SORT_PARAM *,uint,uchar **,BUFFPEK *, int,
  61.       IO_CACHE *);
  62. static char **make_char_array(uint fields,uint length,myf my_flag);
  63. /* Creates a index of sorted keys */
  64. /* Returns 0 if everything went ok */
  65. int _create_index_by_sort(MI_SORT_PARAM *info,my_bool no_messages,
  66.   ulong sortbuff_size)
  67. {
  68.   int error,maxbuffer,skr;
  69.   uint memavl,old_memavl,keys,sort_length;
  70.   BUFFPEK *buffpek;
  71.   ha_rows records;
  72.   uchar **sort_keys;
  73.   IO_CACHE tempfile;
  74.   DBUG_ENTER("_create_index_by_sort");
  75.   DBUG_PRINT("enter",("sort_length: %d", info->key_length));
  76.   my_b_clear(&tempfile);
  77.   buffpek= (BUFFPEK *) NULL; sort_keys= (uchar **) NULL; error= 1;
  78.   maxbuffer=1;
  79.   memavl=max(sortbuff_size,MIN_SORT_MEMORY);
  80.   records= info->max_records;
  81.   sort_length= info->key_length;
  82.   LINT_INIT(keys);
  83.   while (memavl >= MIN_SORT_MEMORY)
  84.   {
  85.     if ((my_off_t) (records+1)*(sort_length+sizeof(char*)) <=
  86. (my_off_t) memavl)
  87.       keys= records+1;
  88.     else
  89.       do
  90.       {
  91. skr=maxbuffer;
  92. if (memavl < sizeof(BUFFPEK)*(uint) maxbuffer ||
  93.     (keys=(memavl-sizeof(BUFFPEK)*(uint) maxbuffer)/
  94.      (sort_length+sizeof(char*))) <= 1)
  95. {
  96.   mi_check_print_error(info->sort_info->param,
  97.        "sort_buffer_size is to small");
  98.   goto err;
  99. }
  100.       }
  101.       while ((maxbuffer= (int) (records/(keys-1)+1)) != skr);
  102.     if ((sort_keys= (uchar **) make_char_array(keys,sort_length,MYF(0))))
  103.     {
  104.       if ((buffpek = (BUFFPEK*) my_malloc((uint) (sizeof(BUFFPEK)*
  105.   (uint) maxbuffer),
  106.   MYF(0))))
  107. break;
  108.       else
  109. my_free((gptr) sort_keys,MYF(0));
  110.     }
  111.     old_memavl=memavl;
  112.     if ((memavl=memavl/4*3) < MIN_SORT_MEMORY && old_memavl > MIN_SORT_MEMORY)
  113.       memavl=MIN_SORT_MEMORY;
  114.   }
  115.   if (memavl < MIN_SORT_MEMORY)
  116.   {
  117.     mi_check_print_error(info->sort_info->param,"Sort buffer to small"); /* purecov: tested */
  118.     goto err; /* purecov: tested */
  119.   }
  120.   (*info->lock_in_memory)(info->sort_info->param);/* Everything is allocated */
  121.   if (!no_messages)
  122.     printf("  - Searching for keys, allocating buffer for %d keysn",keys);
  123.   if ((records=find_all_keys(info,keys,sort_keys,buffpek,&maxbuffer,&tempfile))
  124.       == HA_POS_ERROR)
  125.     goto err; /* purecov: tested */
  126.   if (maxbuffer == 0)
  127.   {
  128.     if (!no_messages)
  129.       printf("  - Dumping %lu keysn",records);
  130.     if (write_index(info,sort_keys,(uint) records))
  131.       goto err; /* purecov: inspected */
  132.   }
  133.   else
  134.   {
  135.     keys=(keys*(sort_length+sizeof(char*)))/sort_length;
  136.     if (maxbuffer >= MERGEBUFF2)
  137.     {
  138.       if (!no_messages)
  139. printf("  - Merging %lu keysn",records); /* purecov: tested */
  140.       if (merge_many_buff(info,keys,sort_keys,buffpek,&maxbuffer,&tempfile))
  141. goto err; /* purecov: inspected */
  142.     }
  143.     if (flush_io_cache(&tempfile) ||
  144. reinit_io_cache(&tempfile,READ_CACHE,0L,0,0))
  145.       goto err; /* purecov: inspected */
  146.     if (!no_messages)
  147.       puts("  - Last merge and dumping keys"); /* purecov: tested */
  148.     if (merge_index(info,keys,sort_keys,buffpek,maxbuffer,&tempfile))
  149.       goto err; /* purecov: inspected */
  150.   }
  151.   error =0;
  152. err:
  153.   if (sort_keys)
  154.     my_free((gptr) sort_keys,MYF(0));
  155.   if (buffpek)
  156.     my_free((gptr) buffpek,MYF(0));
  157.   close_cached_file(&tempfile);
  158.   DBUG_RETURN(error ? -1 : 0);
  159. } /* _create_index_by_sort */
  160. /* Search after all keys and place them in a temp. file */
  161. static ha_rows NEAR_F find_all_keys(MI_SORT_PARAM *info, uint keys,
  162.     uchar **sort_keys, BUFFPEK *buffpek,
  163.     int *maxbuffer, IO_CACHE *tempfile)
  164. {
  165.   int error;
  166.   uint idx,indexpos;
  167.   DBUG_ENTER("find_all_keys");
  168.   idx=indexpos=error=0;
  169.   while (!(error=(*info->key_read)(info->sort_info,sort_keys[idx])))
  170.   {
  171.     if ((uint) ++idx == keys)
  172.     {
  173.       if (indexpos >= (uint) *maxbuffer ||
  174.   write_keys(info,sort_keys,idx-1,buffpek+indexpos,tempfile))
  175. DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
  176.       memcpy(sort_keys[0],sort_keys[idx-1],(size_t) info->key_length);
  177.       idx=1; indexpos++;
  178.     }
  179.   }
  180.   if (error > 0)
  181.     DBUG_RETURN(HA_POS_ERROR); /* Aborted by get_key */ /* purecov: inspected */
  182.   if (indexpos)
  183.     if (indexpos >= (uint) *maxbuffer ||
  184. write_keys(info,sort_keys,idx,buffpek+indexpos,tempfile))
  185.       DBUG_RETURN(HA_POS_ERROR); /* purecov: inspected */
  186.   *maxbuffer=(int) indexpos;
  187.   DBUG_RETURN(indexpos*(keys-1)+idx);
  188. } /* find_all_keys */
  189. /* Write all keys in memory to file for later merge */
  190. static int NEAR_F write_keys(MI_SORT_PARAM *info, register uchar **sort_keys,
  191.      uint count, BUFFPEK *buffpek,
  192.      IO_CACHE *tempfile)
  193. {
  194.   uchar **end;
  195.   uint sort_length=info->key_length;
  196.   DBUG_ENTER("write_keys");
  197.   qsort2((byte*) sort_keys,count,sizeof(byte*),(qsort2_cmp) info->key_cmp,
  198. info->sort_info);
  199.   if (!my_b_inited(tempfile) &&
  200.       open_cached_file(tempfile, info->tmpdir, "ST", DISK_BUFFER_SIZE,
  201.        info->myf_rw))
  202.     DBUG_RETURN(1); /* purecov: inspected */
  203.   buffpek->file_pos=my_b_tell(tempfile);
  204.   buffpek->count=count;
  205.   for (end=sort_keys+count ; sort_keys != end ; sort_keys++)
  206.     if (my_b_write(tempfile,(byte*) *sort_keys,(uint) sort_length))
  207.       DBUG_RETURN(1); /* purecov: inspected */
  208.   DBUG_RETURN(0);
  209. } /* write_keys */
  210. /* Write index */
  211. static int NEAR_F write_index(MI_SORT_PARAM *info, register uchar **sort_keys,
  212.       register uint count)
  213. {
  214.   DBUG_ENTER("write_index");
  215.   qsort2((gptr) sort_keys,(size_t) count,sizeof(byte*),
  216. (qsort2_cmp) info->key_cmp,info->sort_info);
  217.   while (count--)
  218.     if ((*info->key_write)(info->sort_info,*sort_keys++))
  219.       DBUG_RETURN(-1); /* purecov: inspected */
  220.   DBUG_RETURN(0);
  221. } /* write_index */
  222. /* Merge buffers to make < MERGEBUFF2 buffers */
  223. static int NEAR_F merge_many_buff(MI_SORT_PARAM *info, uint keys,
  224.   uchar **sort_keys, BUFFPEK *buffpek,
  225.   int *maxbuffer, IO_CACHE *t_file)
  226. {
  227.   register int i;
  228.   IO_CACHE t_file2, *from_file, *to_file, *temp;
  229.   BUFFPEK *lastbuff;
  230.   DBUG_ENTER("merge_many_buff");
  231.   if (*maxbuffer < MERGEBUFF2)
  232.     DBUG_RETURN(0); /* purecov: inspected */
  233.   if (flush_io_cache(t_file) ||
  234.       open_cached_file(&t_file2,info->tmpdir,"ST",DISK_BUFFER_SIZE,
  235.        info->myf_rw))
  236.     DBUG_RETURN(1); /* purecov: inspected */
  237.   from_file= t_file ; to_file= &t_file2;
  238.   while (*maxbuffer >= MERGEBUFF2)
  239.   {
  240.     reinit_io_cache(from_file,READ_CACHE,0L,0,0);
  241.     reinit_io_cache(to_file,WRITE_CACHE,0L,0,0);
  242.     lastbuff=buffpek;
  243.     for (i=0 ; i <= *maxbuffer-MERGEBUFF*3/2 ; i+=MERGEBUFF)
  244.     {
  245.       if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
  246. buffpek+i,buffpek+i+MERGEBUFF-1))
  247. break; /* purecov: inspected */
  248.     }
  249.     if (merge_buffers(info,keys,from_file,to_file,sort_keys,lastbuff++,
  250.       buffpek+i,buffpek+ *maxbuffer))
  251.       break; /* purecov: inspected */
  252.     if (flush_io_cache(to_file))
  253.       break; /* purecov: inspected */
  254.     temp=from_file; from_file=to_file; to_file=temp;
  255.     *maxbuffer= (int) (lastbuff-buffpek)-1;
  256.   }
  257.   close_cached_file(to_file); /* This holds old result */
  258.   if (to_file == t_file)
  259.     *t_file=t_file2; /* Copy result file */
  260.   DBUG_RETURN(*maxbuffer >= MERGEBUFF2); /* Return 1 if interrupted */
  261. } /* merge_many_buff */
  262. /* Read data to buffer */
  263. /* This returns (uint) -1 if something goes wrong */
  264. static uint NEAR_F read_to_buffer(IO_CACHE *fromfile, BUFFPEK *buffpek,
  265.   uint sort_length)
  266. {
  267.   register uint count;
  268.   uint length;
  269.   if ((count=(uint) min((ha_rows) buffpek->max_keys,buffpek->count)))
  270.   {
  271.     if (my_pread(fromfile->file,(byte*) buffpek->base,
  272.  (length= sort_length*count),buffpek->file_pos,MYF_RW))
  273.       return((uint) -1); /* purecov: inspected */
  274.     buffpek->key=buffpek->base;
  275.     buffpek->file_pos+= length; /* New filepos */
  276.     buffpek->count-= count;
  277.     buffpek->mem_count= count;
  278.   }
  279.   return (count*sort_length);
  280. } /* read_to_buffer */
  281. /* Merge buffers to one buffer */
  282. /* If to_file == 0 then use info->key_write */
  283. static int NEAR_F
  284. merge_buffers(MI_SORT_PARAM *info, uint keys, IO_CACHE *from_file, 
  285.       IO_CACHE *to_file, uchar **sort_keys, BUFFPEK *lastbuff,
  286.       BUFFPEK *Fb, BUFFPEK *Tb)
  287. {
  288.   int error;
  289.   uint sort_length,maxcount;
  290.   ha_rows count;
  291.   my_off_t to_start_filepos;
  292.   uchar *strpos;
  293.   BUFFPEK *buffpek,**refpek;
  294.   QUEUE queue;
  295.   DBUG_ENTER("merge_buffers");
  296.   count=error=0;
  297.   maxcount=keys/((uint) (Tb-Fb) +1);
  298.   LINT_INIT(to_start_filepos);
  299.   if (to_file)
  300.     to_start_filepos=my_b_tell(to_file);
  301.   strpos=(uchar*) sort_keys;
  302.   sort_length=info->key_length;
  303.   if (init_queue(&queue,(uint) (Tb-Fb)+1,offsetof(BUFFPEK,key),0,
  304.  (int (*)(void*, byte *,byte*)) info->key_cmp,
  305.  (void*) info->sort_info))
  306.     DBUG_RETURN(1); /* purecov: inspected */
  307.   for (buffpek= Fb ; buffpek <= Tb ; buffpek++)
  308.   {
  309.     count+= buffpek->count;
  310.     buffpek->base= strpos;
  311.     buffpek->max_keys=maxcount;
  312.     strpos+= (uint) (error=(int) read_to_buffer(from_file,buffpek,
  313. sort_length));
  314.     if (error == -1)
  315.       goto err; /* purecov: inspected */
  316.     queue_insert(&queue,(void*) buffpek);
  317.   }
  318.   while (queue.elements > 1)
  319.   {
  320.     for (;;)
  321.     {
  322.       buffpek=(BUFFPEK*) queue_top(&queue);
  323.       if (to_file)
  324.       {
  325. if (my_b_write(to_file,(byte*) buffpek->key,(uint) sort_length))
  326. {
  327.   error=1; goto err; /* purecov: inspected */
  328. }
  329.       }
  330.       else
  331.       {
  332. if ((*info->key_write)(info->sort_info,(void*) buffpek->key))
  333. {
  334.   error=1; goto err; /* purecov: inspected */
  335. }
  336.       }
  337.       buffpek->key+=sort_length;
  338.       if (! --buffpek->mem_count)
  339.       {
  340. if (!(error=(int) read_to_buffer(from_file,buffpek,sort_length)))
  341. {
  342.   uchar *base=buffpek->base;
  343.   uint max_keys=buffpek->max_keys;
  344.   VOID(queue_remove(&queue,0));
  345.   /* Put room used by buffer to use in other buffer */
  346.   for (refpek= (BUFFPEK**) &queue_top(&queue);
  347.        refpek <= (BUFFPEK**) &queue_end(&queue);
  348.        refpek++)
  349.   {
  350.     buffpek= *refpek;
  351.     if (buffpek->base+buffpek->max_keys*sort_length == base)
  352.     {
  353.       buffpek->max_keys+=max_keys;
  354.       break;
  355.     }
  356.     else if (base+max_keys*sort_length == buffpek->base)
  357.     {
  358.       buffpek->base=base;
  359.       buffpek->max_keys+=max_keys;
  360.       break;
  361.     }
  362.   }
  363.   break; /* One buffer have been removed */
  364. }
  365.       }
  366.       else if (error == -1)
  367. goto err; /* purecov: inspected */
  368.       queue_replaced(&queue); /* Top element has been replaced */
  369.     }
  370.   }
  371.   buffpek=(BUFFPEK*) queue_top(&queue);
  372.   buffpek->base=(uchar *) sort_keys;
  373.   buffpek->max_keys=keys;
  374.   do
  375.   {
  376.     if (to_file)
  377.     {
  378.       if (my_b_write(to_file,(byte*) buffpek->key,
  379.      (sort_length*buffpek->mem_count)))
  380.       {
  381. error=1; goto err; /* purecov: inspected */
  382.       }
  383.     }
  384.     else
  385.     {
  386.       register uchar *end;
  387.       strpos= buffpek->key;
  388.       for (end=strpos+buffpek->mem_count*sort_length;
  389.    strpos != end ;
  390.    strpos+=sort_length)
  391.       {
  392. if ((*info->key_write)(info->sort_info,(void*) strpos))
  393. {
  394.   error=1; goto err; /* purecov: inspected */
  395. }
  396.       }
  397.     }
  398.   }
  399.   while ((error=(int) read_to_buffer(from_file,buffpek,sort_length)) != -1 &&
  400.  error != 0);
  401.   lastbuff->count=count;
  402.   if (to_file)
  403.     lastbuff->file_pos=to_start_filepos;
  404. err:
  405.   delete_queue(&queue);
  406.   DBUG_RETURN(error);
  407. } /* merge_buffers */
  408. /* Do a merge to output-file (save only positions) */
  409. static int NEAR_F
  410. merge_index(MI_SORT_PARAM *info, uint keys, uchar **sort_keys,
  411.     BUFFPEK *buffpek, int maxbuffer, IO_CACHE *tempfile)
  412. {
  413.   DBUG_ENTER("merge_index");
  414.   if (merge_buffers(info,keys,tempfile,(IO_CACHE*) 0,sort_keys,buffpek,buffpek,
  415.     buffpek+maxbuffer))
  416.     DBUG_RETURN(1); /* purecov: inspected */
  417.   DBUG_RETURN(0);
  418. } /* merge_index */
  419. /* Make a pointer of arrays to keys */
  420. static char **make_char_array(register uint fields, uint length, myf my_flag)
  421. {
  422.   register char **pos;
  423.   char **old_pos,*char_pos;
  424.   DBUG_ENTER("make_char_array");
  425.   if ((old_pos= (char**) my_malloc( fields*(length+sizeof(char*)), my_flag)))
  426.   {
  427.     pos=old_pos; char_pos=((char*) (pos+fields)) -length;
  428.     while (fields--)
  429.       *(pos++) = (char_pos+= length);
  430.   }
  431.   DBUG_RETURN(old_pos);
  432. } /* make_char_array */