38 #include "sql_class.h"
40 int unique_write_to_file(uchar* key, element_count count, Unique *unique)
48 return my_b_write(&unique->file, key, unique->size) ? 1 : 0;
51 int unique_write_to_ptrs(uchar* key, element_count count, Unique *unique)
53 memcpy(unique->record_pointers, key, unique->size);
54 unique->record_pointers+=unique->size;
58 Unique::Unique(qsort_cmp2 comp_func,
void * comp_func_fixed_arg,
59 uint size_arg, ulonglong max_in_memory_size_arg)
60 :max_in_memory_size(max_in_memory_size_arg),
61 record_pointers(NULL),
66 init_tree(&tree, (ulong) (max_in_memory_size / 16), 0,
size, comp_func, 0,
67 NULL, comp_func_fixed_arg);
69 my_init_dynamic_array(&file_ptrs,
sizeof(
BUFFPEK), 16, 16);
73 max_elements= (ulong) (max_in_memory_size /
75 (void) open_cached_file(&
file, mysql_tmpdir,TEMP_PREFIX, DISK_BUFFER_SIZE,
95 inline double log2_n_fact(
double x)
97 return (log(2*M_PI*x)/2 + x*log(x/M_E)) / M_LN2;
131 static double get_merge_buffers_cost(uint *buff_elems, uint elem_size,
132 uint *first, uint *last)
134 uint total_buf_elems= 0;
135 for (uint *pbuf= first; pbuf <= last; pbuf++)
136 total_buf_elems+= *pbuf;
137 *last= total_buf_elems;
139 size_t n_buffers= last - first + 1;
142 return 2*((double)total_buf_elems*elem_size) / IO_SIZE +
174 static double get_merge_many_buffs_cost(uint *buffer,
175 uint maxbuffer, uint max_n_elems,
176 uint last_n_elems,
int elem_size)
179 double total_cost= 0.0;
180 uint *buff_elems= buffer;
186 for (i = 0; i < (int)maxbuffer; i++)
187 buff_elems[i]= max_n_elems;
188 buff_elems[maxbuffer]= last_n_elems;
194 if (maxbuffer >= MERGEBUFF2)
196 while (maxbuffer >= MERGEBUFF2)
199 for (i = 0; i <= (int) maxbuffer - MERGEBUFF*3/2; i += MERGEBUFF)
201 total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
203 buff_elems + i + MERGEBUFF-1);
206 total_cost+=get_merge_buffers_cost(buff_elems, elem_size,
208 buff_elems + maxbuffer);
214 total_cost += get_merge_buffers_cost(buff_elems, elem_size,
215 buff_elems, buff_elems + maxbuffer);
267 double Unique::get_use_cost(uint *buffer, uint nkeys, uint key_size,
268 ulonglong max_in_memory_size)
270 ulong max_elements_in_tree;
271 ulong last_tree_elems;
274 max_elements_in_tree= ((ulong) max_in_memory_size /
277 n_full_trees= nkeys / max_elements_in_tree;
278 last_tree_elems= nkeys % max_elements_in_tree;
281 double result= 2 * log2_n_fact(last_tree_elems + 1.0);
283 result+= n_full_trees * log2_n_fact(max_elements_in_tree + 1.0);
286 DBUG_PRINT(
"info",(
"unique trees sizes: %u=%u*%lu + %lu", nkeys,
287 n_full_trees, n_full_trees?max_elements_in_tree:0,
298 result += DISK_SEEK_BASE_COST * n_full_trees *
299 ceil(((
double) key_size)*max_elements_in_tree / IO_SIZE);
300 result += DISK_SEEK_BASE_COST * ceil(((
double) key_size)*last_tree_elems / IO_SIZE);
303 double merge_cost= get_merge_many_buffs_cost(buffer, n_full_trees,
304 max_elements_in_tree,
305 last_tree_elems, key_size);
306 if (merge_cost < 0.0)
309 result += merge_cost;
314 result += ceil((
double)key_size*nkeys/IO_SIZE);
321 close_cached_file(&
file);
323 delete_dynamic(&file_ptrs);
331 elements+= tree.elements_in_tree;
332 file_ptr.count=tree.elements_in_tree;
333 file_ptr.file_pos=my_b_tell(&
file);
335 if (tree_walk(&tree, (tree_walk_action) unique_write_to_file,
336 (
void*)
this, left_root_right) ||
337 insert_dynamic(&file_ptrs, &file_ptr))
361 reset_dynamic(&file_ptrs);
362 reinit_io_cache(&
file, WRITE_CACHE, 0L, 0, 1);
376 static int buffpek_compare(
void *arg, uchar *key_ptr1, uchar *key_ptr2)
379 return ctx->key_compare(ctx->key_compare_arg,
380 *((uchar **) key_ptr1), *((uchar **)key_ptr2));
420 static bool merge_walk(uchar *merge_buffer, ulong merge_buffer_size,
422 tree_walk_action walk_action,
void *walk_action_arg,
423 qsort_cmp2
compare,
const void *compare_arg,
429 merge_buffer_size < (ulong) (key_length * (end - begin + 1)) ||
430 init_queue(&queue, (uint) (end - begin), offsetof(
BUFFPEK, key), 0,
431 buffpek_compare, &compare_context))
434 merge_buffer_size-= key_length;
435 uchar *save_key_buff= merge_buffer + merge_buffer_size;
436 uint max_key_count_per_piece= (uint) (merge_buffer_size/(end-begin) /
439 uint piece_size= max_key_count_per_piece * key_length;
449 for (top= begin; top != end; ++top)
451 top->base= merge_buffer + (top - begin) * piece_size;
452 top->max_keys= max_key_count_per_piece;
454 if (bytes_read == (uint) (-1))
456 DBUG_ASSERT(bytes_read);
457 queue_insert(&queue, (uchar *) top);
459 top= (
BUFFPEK *) queue_top(&queue);
460 while (queue.elements > 1)
469 void *old_key= top->key;
474 top->key+= key_length;
475 if (--top->mem_count)
476 queue_replaced(&queue);
480 memcpy(save_key_buff, old_key, key_length);
481 old_key= save_key_buff;
483 if (bytes_read == (uint) (-1))
485 else if (bytes_read > 0)
486 queue_replaced(&queue);
493 queue_remove(&queue, 0);
497 top= (
BUFFPEK *) queue_top(&queue);
499 if (
compare(compare_arg, old_key, top->key))
501 if (walk_action(old_key, 1, walk_action_arg))
514 if (walk_action(top->key, 1, walk_action_arg))
516 top->key+= key_length;
518 while (--top->mem_count);
520 if (bytes_read == (uint) (-1))
526 delete_queue(&queue);
551 bool Unique::walk(tree_walk_action action,
void *walk_action_arg)
557 return tree_walk(&tree, action, walk_action_arg, left_root_right);
562 if (flush_io_cache(&file) || reinit_io_cache(&file, READ_CACHE, 0L, 0, 0))
564 if (!(merge_buffer= (uchar *) my_malloc((ulong) max_in_memory_size, MYF(0))))
566 res= merge_walk(merge_buffer, (ulong) max_in_memory_size,
size,
568 (
BUFFPEK *) file_ptrs.buffer + file_ptrs.elements,
569 action, walk_action_arg,
570 tree.compare, tree.custom_arg, &file);
571 my_free(merge_buffer);
582 table->sort.found_records=elements+tree.elements_in_tree;
584 if (my_b_tell(&file) == 0)
587 DBUG_ASSERT(table->sort.record_pointers == NULL);
588 if ((record_pointers=table->sort.record_pointers= (uchar*)
589 my_malloc(
size * tree.elements_in_tree, MYF(0))))
591 (void) tree_walk(&tree, (tree_walk_action) unique_write_to_ptrs,
592 this, left_root_right);
600 IO_CACHE *outfile=table->sort.io_cache;
602 uint maxbuffer= file_ptrs.elements - 1;
608 DBUG_ASSERT(table->sort.io_cache == NULL);
612 if (!outfile || (! my_b_inited(outfile) &&
613 open_cached_file(outfile,mysql_tmpdir,TEMP_PREFIX,READ_RECORD_BUFFER,
616 reinit_io_cache(outfile,WRITE_CACHE,0L,0,0);
619 sort_param.max_rows= elements;
620 sort_param.sort_form=
table;
621 sort_param.rec_length= sort_param.sort_length= sort_param.ref_length=
size;
622 sort_param.max_keys_per_buffer=
623 (uint) (max_in_memory_size / sort_param.sort_length);
624 sort_param.not_killable=1;
626 if (!(sort_buffer=(uchar*) my_malloc((sort_param.max_keys_per_buffer + 1) *
627 sort_param.sort_length,
630 sort_param.unique_buff= sort_buffer+(sort_param.max_keys_per_buffer *
631 sort_param.sort_length);
633 sort_param.compare= (qsort2_cmp) buffpek_compare;
634 sort_param.cmp_context.key_compare= tree.compare;
635 sort_param.cmp_context.key_compare_arg= tree.custom_arg;
638 if (
merge_many_buff(&sort_param,sort_buffer,file_ptr,&maxbuffer,&file))
640 if (flush_io_cache(&file) ||
641 reinit_io_cache(&file,READ_CACHE,0L,0,0))
643 if (
merge_buffers(&sort_param, &file, outfile, sort_buffer, file_ptr,
644 file_ptr, file_ptr+maxbuffer,0))
648 my_free(sort_buffer);
649 if (flush_io_cache(outfile))
653 save_pos=outfile->pos_in_file;
654 if (reinit_io_cache(outfile,READ_CACHE,0L,0,0))
656 outfile->end_of_file=save_pos;