slave.cc
上传用户:tsgydb
上传日期:2007-04-14
资源大小:10674k
文件大小:39k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
  2.    
  3.    This program is free software; you can redistribute it and/or modify
  4.    it under the terms of the GNU General Public License as published by
  5.    the Free Software Foundation; either version 2 of the License, or
  6.    (at your option) any later version.
  7.    
  8.    This program is distributed in the hope that it will be useful,
  9.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  10.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  11.    GNU General Public License for more details.
  12.    
  13.    You should have received a copy of the GNU General Public License
  14.    along with this program; if not, write to the Free Software
  15.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  16. #include "mysql_priv.h"
  17. #include <mysql.h>
  18. #include <myisam.h>
  19. #include "mini_client.h"
  20. #include "slave.h"
  21. #include <thr_alarm.h>
  22. #include <my_dir.h>
  23. #define RPL_LOG_NAME (glob_mi.log_file_name[0] ? glob_mi.log_file_name :
  24.  "FIRST")
  25. bool slave_running = 0;
  26. pthread_t slave_real_id;
  27. MASTER_INFO glob_mi;
  28. HASH replicate_do_table, replicate_ignore_table;
  29. DYNAMIC_ARRAY replicate_wild_do_table, replicate_wild_ignore_table;
  30. bool do_table_inited = 0, ignore_table_inited = 0;
  31. bool wild_do_table_inited = 0, wild_ignore_table_inited = 0;
  32. bool table_rules_on = 0;
  33. uint32 slave_skip_counter = 0; 
  34. static TABLE* save_temporary_tables = 0;
  35. THD* slave_thd = 0;
  36. // when slave thread exits, we need to remember the temporary tables so we
  37. // can re-use them on slave start
  38. static int last_slave_errno = 0;
  39. static char last_slave_error[1024] = "";
  40. #ifndef DBUG_OFF
  41. int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
  42. static int events_till_disconnect = -1, events_till_abort = -1;
  43. static int stuck_count = 0;
  44. #endif
  45. inline void skip_load_data_infile(NET* net);
  46. inline bool slave_killed(THD* thd);
  47. static int init_slave_thread(THD* thd);
  48. static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
  49. static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi);
  50. static int safe_sleep(THD* thd, int sec);
  51. static int request_table_dump(MYSQL* mysql, char* db, char* table);
  52. static int create_table_from_dump(THD* thd, NET* net, const char* db,
  53.   const char* table_name);
  54. inline char* rewrite_db(char* db);
  55. static void free_table_ent(TABLE_RULE_ENT* e)
  56. {
  57.   my_free((gptr) e, MYF(0));
  58. }
  59. static byte* get_table_key(TABLE_RULE_ENT* e, uint* len,
  60.    my_bool not_used __attribute__((unused)))
  61. {
  62.   *len = e->key_len;
  63.   return (byte*)e->db;
  64. }
  65. void init_table_rule_hash(HASH* h, bool* h_inited)
  66. {
  67.   hash_init(h, TABLE_RULE_HASH_SIZE,0,0,
  68.     (hash_get_key) get_table_key,
  69.     (void (*)(void*)) free_table_ent, 0);
  70.   *h_inited = 1;
  71. }
  72. void init_table_rule_array(DYNAMIC_ARRAY* a, bool* a_inited)
  73. {
  74.   init_dynamic_array(a, sizeof(TABLE_RULE_ENT*), TABLE_RULE_ARR_SIZE,
  75.      TABLE_RULE_ARR_SIZE);
  76.   *a_inited = 1;
  77. }
  78. static TABLE_RULE_ENT* find_wild(DYNAMIC_ARRAY *a, const char* key, int len)
  79. {
  80.   uint i;
  81.   const char* key_end = key + len;
  82.   
  83.   for(i = 0; i < a->elements; i++)
  84.     {
  85.       TABLE_RULE_ENT* e ;
  86.       get_dynamic(a, (gptr)&e, i);
  87.       if(!wild_case_compare(key, key_end, (const char*)e->db,
  88.     (const char*)(e->db + e->key_len),'\'))
  89. return e;
  90.     }
  91.   
  92.   return 0;
  93. }
  94. int tables_ok(THD* thd, TABLE_LIST* tables)
  95. {
  96.   for (; tables; tables = tables->next)
  97.   {
  98.     if (!tables->updating) 
  99.       continue;
  100.     char hash_key[2*NAME_LEN+2];
  101.     char* p;
  102.     p = strmov(hash_key, tables->db ? tables->db : thd->db);
  103.     *p++ = '.';
  104.     uint len = strmov(p, tables->real_name) - hash_key ;
  105.     if (do_table_inited) // if there are any do's
  106.     {
  107.       if (hash_search(&replicate_do_table, (byte*) hash_key, len))
  108. return 1;
  109.     }
  110.     if (ignore_table_inited) // if there are any do's
  111.     {
  112.       if (hash_search(&replicate_ignore_table, (byte*) hash_key, len))
  113. return 0; 
  114.     }
  115.     if (wild_do_table_inited && find_wild(&replicate_wild_do_table,
  116.   hash_key, len))
  117.       return 1;
  118.     if (wild_ignore_table_inited && find_wild(&replicate_wild_ignore_table,
  119.       hash_key, len))
  120.       return 0;
  121.   }
  122.   // if no explicit rule found
  123.   // and there was a do list, do not replicate. If there was
  124.   // no do list, go ahead
  125.   return !do_table_inited && !wild_do_table_inited;
  126. }
  127. int add_table_rule(HASH* h, const char* table_spec)
  128. {
  129.   const char* dot = strchr(table_spec, '.');
  130.   if(!dot) return 1;
  131.   // len is always > 0 because we know the there exists a '.'
  132.   uint len = (uint)strlen(table_spec);
  133.   TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
  134.  + len, MYF(MY_WME));
  135.   if(!e) return 1;
  136.   e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  137.   e->tbl_name = e->db + (dot - table_spec) + 1;
  138.   e->key_len = len;
  139.   memcpy(e->db, table_spec, len);
  140.   (void)hash_insert(h, (byte*)e);
  141.   return 0;
  142. }
  143. int add_wild_table_rule(DYNAMIC_ARRAY* a, const char* table_spec)
  144. {
  145.   const char* dot = strchr(table_spec, '.');
  146.   if(!dot) return 1;
  147.   uint len = (uint)strlen(table_spec);
  148.   TABLE_RULE_ENT* e = (TABLE_RULE_ENT*)my_malloc(sizeof(TABLE_RULE_ENT)
  149.  + len, MYF(MY_WME));
  150.   if(!e) return 1;
  151.   e->db = (char*)e + sizeof(TABLE_RULE_ENT);
  152.   e->tbl_name = e->db + (dot - table_spec) + 1;
  153.   e->key_len = len;
  154.   memcpy(e->db, table_spec, len);
  155.   insert_dynamic(a, (gptr)&e);
  156.   return 0;
  157. }
  158. static void free_string_array(DYNAMIC_ARRAY *a)
  159. {
  160.   uint i;
  161.   for(i = 0; i < a->elements; i++)
  162.     {
  163.       char* p;
  164.       get_dynamic(a, (gptr) &p, i);
  165.       my_free(p, MYF(MY_WME));
  166.     }
  167.   delete_dynamic(a);
  168. }
  169. void end_slave()
  170. {
  171.   end_master_info(&glob_mi);
  172.   if(do_table_inited)
  173.     hash_free(&replicate_do_table);
  174.   if(ignore_table_inited)
  175.     hash_free(&replicate_ignore_table);
  176.   if(wild_do_table_inited)
  177.     free_string_array(&replicate_wild_do_table);
  178.   if(wild_ignore_table_inited)
  179.     free_string_array(&replicate_wild_ignore_table);
  180. }
  181. inline bool slave_killed(THD* thd)
  182. {
  183.   return abort_slave || abort_loop || thd->killed;
  184. }
  185. inline void skip_load_data_infile(NET* net)
  186. {
  187.   (void)my_net_write(net, "xfb/dev/null", 10);
  188.   (void)net_flush(net);
  189.   (void)my_net_read(net); // discard response
  190.   send_ok(net); // the master expects it
  191. }
  192. inline char* rewrite_db(char* db)
  193. {
  194.   if(replicate_rewrite_db.is_empty() || !db) return db;
  195.   I_List_iterator<i_string_pair> it(replicate_rewrite_db);
  196.   i_string_pair* tmp;
  197.   while((tmp=it++))
  198.     {
  199.       if(!strcmp(tmp->key, db))
  200. return tmp->val;
  201.     }
  202.   return db;
  203. }
  204. int db_ok(const char* db, I_List<i_string> &do_list,
  205.   I_List<i_string> &ignore_list )
  206. {
  207.   if(do_list.is_empty() && ignore_list.is_empty())
  208.     return 1; // ok to replicate if the user puts no constraints
  209.   // if the user has specified restrictions on which databases to replicate
  210.   // and db was not selected, do not replicate
  211.   if(!db)
  212.     return 0;
  213.   if(!do_list.is_empty()) // if the do's are not empty
  214.     {
  215.       I_List_iterator<i_string> it(do_list);
  216.       i_string* tmp;
  217.       while((tmp=it++))
  218. {
  219.   if(!strcmp(tmp->ptr, db))
  220.     return 1; // match
  221. }
  222.       return 0;
  223.     }
  224.   else // there are some elements in the don't, otherwise we cannot get here
  225.     {
  226.       I_List_iterator<i_string> it(ignore_list);
  227.       i_string* tmp;
  228.       while((tmp=it++))
  229. {
  230.   if(!strcmp(tmp->ptr, db))
  231.     return 0; // match
  232. }
  233.       
  234.       return 1;
  235.     }
  236. }
  237. static int init_strvar_from_file(char* var, int max_size, IO_CACHE* f,
  238.        char* default_val)
  239. {
  240.   uint length;
  241.   if ((length=my_b_gets(f,var, max_size)))
  242.   {
  243.     char* last_p = var + length -1;
  244.     if (*last_p == 'n')
  245.       *last_p = 0; // if we stopped on newline, kill it
  246.     else
  247.     {
  248.       // if we truncated a line or stopped on last char, remove all chars
  249.       // up to and including newline
  250.       int c;
  251.       while( ((c=my_b_get(f)) != 'n' && c != my_b_EOF));
  252.     }
  253.     return 0;
  254.   }
  255.   else if (default_val)
  256.   {
  257.     strmake(var,  default_val, max_size);
  258.     return 0;
  259.   }
  260.   return 1;
  261. }
  262. static int init_intvar_from_file(int* var, IO_CACHE* f, int default_val)
  263. {
  264.   char buf[32];
  265.   
  266.   if (my_b_gets(f, buf, sizeof(buf))) 
  267.   {
  268.     *var = atoi(buf);
  269.     return 0;
  270.   }
  271.   else if(default_val)
  272.   {
  273.     *var = default_val;
  274.     return 0;
  275.   }
  276.   return 1;
  277. }
  278. static int create_table_from_dump(THD* thd, NET* net, const char* db,
  279.   const char* table_name)
  280. {
  281.   uint packet_len = my_net_read(net); // read create table statement
  282.   TABLE_LIST tables;
  283.   int error = 0;
  284.   
  285.   if(packet_len == packet_error)
  286.     {
  287.       send_error(&thd->net, ER_MASTER_NET_READ);
  288.       return 1;
  289.     }
  290.   if(net->read_pos[0] == 255) // error from master
  291.     {
  292.       net->read_pos[packet_len] = 0;
  293.       net_printf(&thd->net, ER_MASTER, net->read_pos + 3);
  294.       return 1;
  295.     }
  296.   thd->command = COM_TABLE_DUMP;
  297.   thd->query = sql_alloc(packet_len + 1);
  298.   if(!thd->query)
  299.     {
  300.       sql_print_error("create_table_from_dump: out of memory");
  301.       net_printf(&thd->net, ER_GET_ERRNO, "Out of memory");
  302.       return 1;
  303.     }
  304.   memcpy(thd->query, net->read_pos, packet_len);
  305.   thd->query[packet_len] = 0;
  306.   thd->current_tablenr = 0;
  307.   thd->query_error = 0;
  308.   thd->net.no_send_ok = 1;
  309.   thd->proc_info = "Creating table from master dump";
  310.   // save old db in case we are creating in a different database
  311.   char* save_db = thd->db;
  312.   thd->db = thd->last_nx_db;
  313.   mysql_parse(thd, thd->query, packet_len); // run create table
  314.   thd->db = save_db; // leave things the way the were before
  315.   
  316.   if(thd->query_error)
  317.   {
  318.     close_thread_tables(thd); // mysql_parse takes care of the error send
  319.     return 1;
  320.   }
  321.   bzero((char*) &tables,sizeof(tables));
  322.   tables.db = (char*)db;
  323.   tables.name = tables.real_name = (char*)table_name;
  324.   tables.lock_type = TL_WRITE;
  325.   thd->proc_info = "Opening master dump table";
  326.   if (!open_ltable(thd, &tables, TL_WRITE))
  327.   {
  328.     // open tables will send the error
  329.     sql_print_error("create_table_from_dump: could not open created table");
  330.     close_thread_tables(thd);
  331.     return 1;
  332.   }
  333.   
  334.   handler *file = tables.table->file;
  335.   thd->proc_info = "Reading master dump table data";
  336.   if (file->net_read_dump(net))
  337.   {
  338.     net_printf(&thd->net, ER_MASTER_NET_READ);
  339.     sql_print_error("create_table_from_dump::failed in
  340.  handler::net_read_dump()");
  341.     close_thread_tables(thd);
  342.     return 1;
  343.   }
  344.   HA_CHECK_OPT check_opt;
  345.   check_opt.init();
  346.   check_opt.flags|= T_VERY_SILENT;
  347.   check_opt.quick = 1;
  348.   thd->proc_info = "Rebuilding the index on master dump table";
  349.   Vio* save_vio = thd->net.vio;
  350.   // we do not want repair() to spam us with messages
  351.   // just send them to the error log, and report the failure in case of
  352.   // problems
  353.   thd->net.vio = 0;
  354.   if (file->repair(thd,&check_opt ))
  355.   {
  356.       net_printf(&thd->net, ER_INDEX_REBUILD,tables.table->real_name );
  357.       error = 1;
  358.   }
  359.   thd->net.vio = save_vio;
  360.   close_thread_tables(thd);
  361.   
  362.   thd->net.no_send_ok = 0;
  363.   return error; 
  364. }
  365. int fetch_nx_table(THD* thd, MASTER_INFO* mi)
  366. {
  367.   MYSQL* mysql = mc_mysql_init(NULL);
  368.   int error = 1;
  369.   int nx_errno = 0;
  370.   if(!mysql)
  371.     {
  372.       sql_print_error("fetch_nx_table: Error in mysql_init()");
  373.       nx_errno = ER_GET_ERRNO;
  374.       goto err;
  375.     }
  376.   safe_connect(thd, mysql, mi);
  377.   if(slave_killed(thd))
  378.     goto err;
  379.   if(request_table_dump(mysql, thd->last_nx_db, thd->last_nx_table))
  380.     {
  381.       nx_errno = ER_GET_ERRNO;
  382.       sql_print_error("fetch_nx_table: failed on table dump request ");
  383.       goto err;
  384.     }
  385.   if(create_table_from_dump(thd, &mysql->net, thd->last_nx_db,
  386.     thd->last_nx_table))
  387.     {
  388.       // create_table_from_dump will have sent the error alread
  389.       sql_print_error("fetch_nx_table: failed on create table ");
  390.       goto err;
  391.     }
  392.   
  393.   error = 0;
  394.  err:
  395.   if (mysql)
  396.     mc_mysql_close(mysql);
  397.   if (nx_errno && thd->net.vio)
  398.     send_error(&thd->net, nx_errno, "Error in fetch_nx_table");
  399.   return error;
  400. }
  401. void end_master_info(MASTER_INFO* mi)
  402. {
  403.   if(mi->fd >= 0)
  404.     {
  405.       end_io_cache(&mi->file);
  406.       (void)my_close(mi->fd, MYF(MY_WME));
  407.       mi->fd = -1;
  408.     }
  409.   mi->inited = 0;
  410. }
  411. int init_master_info(MASTER_INFO* mi)
  412. {
  413.   if (mi->inited)
  414.     return 0;
  415.   int fd,length,error;
  416.   MY_STAT stat_area;
  417.   char fname[FN_REFLEN+128];
  418.   const char *msg;
  419.   fn_format(fname, master_info_file, mysql_data_home, "", 4+16+32);
  420.   // we need a mutex while we are changing master info parameters to
  421.   // keep other threads from reading bogus info
  422.   pthread_mutex_lock(&mi->lock);
  423.   mi->pending = 0;
  424.   fd = mi->fd;
  425.   
  426.   // we do not want any messages if the file does not exist
  427.   if (!my_stat(fname, &stat_area, MYF(0)))
  428.   {
  429.     // if someone removed the file from underneath our feet, just close
  430.     // the old descriptor and re-create the old file
  431.     if (fd >= 0)
  432.       my_close(fd, MYF(MY_WME));
  433.     if ((fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0
  434. || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,0,
  435.  MYF(MY_WME)))
  436.     {
  437.       if(fd >= 0)
  438. my_close(fd, MYF(0));
  439.       pthread_mutex_unlock(&mi->lock);
  440.       return 1;
  441.     }
  442.     mi->log_file_name[0] = 0;
  443.     mi->pos = 4; // skip magic number
  444.     mi->fd = fd;
  445.       
  446.     if (master_host)
  447.       strmake(mi->host, master_host, sizeof(mi->host) - 1);
  448.     if (master_user)
  449.       strmake(mi->user, master_user, sizeof(mi->user) - 1);
  450.     if (master_password)
  451.       strmake(mi->password, master_password, sizeof(mi->password) - 1);
  452.     mi->port = master_port;
  453.     mi->connect_retry = master_connect_retry;
  454.   }
  455.   else // file exists
  456.   {
  457.     if(fd >= 0)
  458.       reinit_io_cache(&mi->file, READ_CACHE, 0L,0,0);
  459.     else if((fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0
  460.     || init_io_cache(&mi->file, fd, IO_SIZE*2, READ_CACHE, 0L,
  461.      0, MYF(MY_WME)))
  462.     {
  463.       if(fd >= 0)
  464. my_close(fd, MYF(0));
  465.       pthread_mutex_unlock(&mi->lock);
  466.       return 1;
  467.     }
  468.       
  469.     if ((length=my_b_gets(&mi->file, mi->log_file_name,
  470.    sizeof(mi->log_file_name))) < 1)
  471.     {
  472.       msg="Error reading log file name from master info file ";
  473.       goto error;
  474.     }
  475.     mi->log_file_name[length-1]= 0; // kill n
  476.     char buf[FN_REFLEN];
  477.     if(!my_b_gets(&mi->file, buf, sizeof(buf)))
  478.     {
  479.       msg="Error reading log file position from master info file";
  480.       goto error;
  481.     }
  482.     mi->pos = strtoull(buf,(char**) 0, 10);
  483.     mi->fd = fd;
  484.     if(init_strvar_from_file(mi->host, sizeof(mi->host), &mi->file,
  485.      master_host) ||
  486.        init_strvar_from_file(mi->user, sizeof(mi->user), &mi->file,
  487.      master_user) || 
  488.        init_strvar_from_file(mi->password, sizeof(mi->password), &mi->file,
  489.      master_password) ||
  490.        init_intvar_from_file((int*)&mi->port, &mi->file, master_port) ||
  491.        init_intvar_from_file((int*)&mi->connect_retry, &mi->file,
  492.      master_connect_retry))
  493.     {
  494.       msg="Error reading master configuration";
  495.       goto error;
  496.     }
  497.   }
  498.   
  499.   mi->inited = 1;
  500.   // now change the cache from READ to WRITE - must do this
  501.   // before flush_master_info
  502.   reinit_io_cache(&mi->file, WRITE_CACHE, 0L,0,1);
  503.   error=test(flush_master_info(mi));
  504.   pthread_mutex_unlock(&mi->lock);
  505.   return error;
  506. error:
  507.   sql_print_error(msg);
  508.   end_io_cache(&mi->file);
  509.   my_close(fd, MYF(0));
  510.   pthread_mutex_unlock(&mi->lock);
  511.   return 1;
  512. }
  513. int show_master_info(THD* thd)
  514. {
  515.   DBUG_ENTER("show_master_info");
  516.   List<Item> field_list;
  517.   field_list.push_back(new Item_empty_string("Master_Host",
  518.      sizeof(glob_mi.host)));
  519.   field_list.push_back(new Item_empty_string("Master_User",
  520.      sizeof(glob_mi.user)));
  521.   field_list.push_back(new Item_empty_string("Master_Port", 6));
  522.   field_list.push_back(new Item_empty_string("Connect_retry", 6));
  523.   field_list.push_back( new Item_empty_string("Log_File",
  524.      FN_REFLEN));
  525.   field_list.push_back(new Item_empty_string("Pos", 12));
  526.   field_list.push_back(new Item_empty_string("Slave_Running", 3));
  527.   field_list.push_back(new Item_empty_string("Replicate_do_db", 20));
  528.   field_list.push_back(new Item_empty_string("Replicate_ignore_db", 20));
  529.   field_list.push_back(new Item_empty_string("Last_errno", 4));
  530.   field_list.push_back(new Item_empty_string("Last_error", 20));
  531.   field_list.push_back(new Item_empty_string("Skip_counter", 12));
  532.   if(send_fields(thd, field_list, 1))
  533.     DBUG_RETURN(-1);
  534.   String* packet = &thd->packet;
  535.   packet->length(0);
  536.   
  537.   pthread_mutex_lock(&glob_mi.lock);
  538.   net_store_data(packet, glob_mi.host);
  539.   net_store_data(packet, glob_mi.user);
  540.   net_store_data(packet, (uint32) glob_mi.port);
  541.   net_store_data(packet, (uint32) glob_mi.connect_retry);
  542.   net_store_data(packet, glob_mi.log_file_name);
  543.   net_store_data(packet, (uint32) glob_mi.pos); // QQ: Should be fixed
  544.   pthread_mutex_unlock(&glob_mi.lock);
  545.   pthread_mutex_lock(&LOCK_slave);
  546.   net_store_data(packet, slave_running ? "Yes":"No");
  547.   pthread_mutex_unlock(&LOCK_slave);
  548.   net_store_data(packet, &replicate_do_db);
  549.   net_store_data(packet, &replicate_ignore_db);
  550.   net_store_data(packet, (uint32)last_slave_errno);
  551.   net_store_data(packet, last_slave_error);
  552.   net_store_data(packet, slave_skip_counter);
  553.   
  554.   if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
  555.     DBUG_RETURN(-1);
  556.   send_eof(&thd->net);
  557.   DBUG_RETURN(0);
  558. }
  559. int flush_master_info(MASTER_INFO* mi)
  560. {
  561.   IO_CACHE* file = &mi->file;
  562.   char lbuf[22];
  563.   
  564.   my_b_seek(file, 0L);
  565.   my_b_printf(file, "%sn%sn%sn%sn%sn%dn%dn",
  566.       mi->log_file_name, llstr(mi->pos, lbuf), mi->host, mi->user,
  567.       mi->password, mi->port, mi->connect_retry);
  568.   flush_io_cache(file);
  569.   return 0;
  570. }
  571. int st_master_info::wait_for_pos(THD* thd, String* log_name, ulonglong log_pos)
  572. {
  573.   if (!inited) return -1;
  574.   bool pos_reached;
  575.   int event_count = 0;
  576.   pthread_mutex_lock(&lock);
  577.   while(!thd->killed)
  578.   {
  579.     int cmp_result;
  580.     if (*log_file_name)
  581.     {
  582.       /*
  583. We should use dirname_length() here when we have a version of
  584. this that doesn't modify the argument */
  585.       char *basename = strrchr(log_file_name, FN_LIBCHAR);
  586.       if (basename)
  587. ++basename;
  588.       else
  589. basename = log_file_name;
  590.       cmp_result =  strncmp(basename, log_name->ptr(),
  591.     log_name->length());
  592.     }
  593.     else
  594.       cmp_result = 0;
  595.       
  596.     pos_reached = ((!cmp_result && pos >= log_pos) || cmp_result > 0);
  597.     if (pos_reached || thd->killed)
  598.       break;
  599.     
  600.     const char* msg = thd->enter_cond(&cond, &lock,
  601.       "Waiting for master update");
  602.     pthread_cond_wait(&cond, &lock);
  603.     thd->exit_cond(msg);
  604.     event_count++;
  605.   }
  606.   pthread_mutex_unlock(&lock);
  607.   return thd->killed ? -1 : event_count;
  608. }
  609. static int init_slave_thread(THD* thd)
  610. {
  611.   DBUG_ENTER("init_slave_thread");
  612.   thd->system_thread = thd->bootstrap = 1;
  613.   thd->client_capabilities = 0;
  614.   my_net_init(&thd->net, 0);
  615.   thd->max_packet_length=thd->net.max_packet;
  616.   thd->master_access= ~0;
  617.   thd->priv_user = 0;
  618.   thd->slave_thread = 1;
  619.   thd->options = (((opt_log_slave_updates) ? OPTION_BIN_LOG:0) | OPTION_AUTO_IS_NULL) ;
  620.   thd->system_thread = 1;
  621.   thd->client_capabilities = CLIENT_LOCAL_FILES;
  622.   slave_real_id=thd->real_id=pthread_self();
  623.   pthread_mutex_lock(&LOCK_thread_count);
  624.   thd->thread_id = thread_id++;
  625.   pthread_mutex_unlock(&LOCK_thread_count);
  626.   if (init_thr_lock() ||
  627.       my_pthread_setspecific_ptr(THR_THD,  thd) ||
  628.       my_pthread_setspecific_ptr(THR_MALLOC, &thd->mem_root) ||
  629.       my_pthread_setspecific_ptr(THR_NET,  &thd->net))
  630.   {
  631.     close_connection(&thd->net,ER_OUT_OF_RESOURCES); // is this needed?
  632.     end_thread(thd,0);
  633.     DBUG_RETURN(-1);
  634.   }
  635.   thd->mysys_var=my_thread_var;
  636.   thd->dbug_thread_id=my_thread_id();
  637. #ifndef __WIN__
  638.   sigset_t set;
  639.   VOID(sigemptyset(&set)); // Get mask in use
  640.   VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
  641. #endif
  642.   thd->mem_root.free=thd->mem_root.used=0; // Probably not needed
  643.   if (thd->max_join_size == (ulong) ~0L)
  644.     thd->options |= OPTION_BIG_SELECTS;
  645.   thd->proc_info="Waiting for master update";
  646.   thd->version=refresh_version;
  647.   thd->set_time();
  648.   DBUG_RETURN(0);
  649. }
  650. static int safe_sleep(THD* thd, int sec)
  651. {
  652.   thr_alarm_t alarmed;
  653.   thr_alarm_init(&alarmed);
  654.   time_t start_time= time((time_t*) 0);
  655.   time_t end_time= start_time+sec;
  656.   ALARM  alarm_buff;
  657.   while (start_time < end_time)
  658.   {
  659.     int nap_time = (int) (end_time - start_time);
  660.     /*
  661.       the only reason we are asking for alarm is so that
  662.       we will be woken up in case of murder, so if we do not get killed,
  663.       set the alarm so it goes off after we wake up naturally
  664.     */
  665.     thr_alarm(&alarmed, 2 * nap_time,&alarm_buff);
  666.     sleep(nap_time);
  667.     // if we wake up before the alarm goes off, hit the button
  668.     // so it will not wake up the wife and kids :-)
  669.     if (thr_alarm_in_use(&alarmed))
  670.       thr_end_alarm(&alarmed);
  671.     
  672.     if (slave_killed(thd))
  673.       return 1;
  674.     start_time=time((time_t*) 0);
  675.   }
  676.   return 0;
  677. }
  678. static int request_dump(MYSQL* mysql, MASTER_INFO* mi)
  679. {
  680.   char buf[FN_REFLEN + 10];
  681.   int len;
  682.   int binlog_flags = 0; // for now
  683.   char* logname = mi->log_file_name;
  684.   int4store(buf, mi->pos);
  685.   int2store(buf + 4, binlog_flags);
  686.   int4store(buf + 6, server_id);
  687.   len = (uint) strlen(logname);
  688.   memcpy(buf + 10, logname,len);
  689.   if (mc_simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
  690.   {
  691.     // something went wrong, so we will just reconnect and retry later
  692.     // in the future, we should do a better error analysis, but for
  693.     // now we just fill up the error log :-)
  694.     sql_print_error("Error on COM_BINLOG_DUMP: %s, will retry in %d secs",
  695.     mc_mysql_error(mysql), master_connect_retry);
  696.     return 1;
  697.   }
  698.   return 0;
  699. }
  700. static int request_table_dump(MYSQL* mysql, char* db, char* table)
  701. {
  702.   char buf[1024];
  703.   char * p = buf;
  704.   uint table_len = (uint) strlen(table);
  705.   uint db_len = (uint) strlen(db);
  706.   if(table_len + db_len > sizeof(buf) - 2)
  707.     {
  708.       sql_print_error("request_table_dump: Buffer overrun");
  709.       return 1;
  710.     } 
  711.   
  712.   *p++ = db_len;
  713.   memcpy(p, db, db_len);
  714.   p += db_len;
  715.   *p++ = table_len;
  716.   memcpy(p, table, table_len);
  717.   
  718.   if (mc_simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
  719.   {
  720.     sql_print_error("request_table_dump: Error sending the table dump 
  721. command");
  722.     return 1;
  723.   }
  724.   return 0;
  725. }
  726. static uint read_event(MYSQL* mysql, MASTER_INFO *mi)
  727. {
  728.   uint len = packet_error;
  729.   // for convinience lets think we start by
  730.   // being in the interrupted state :-)
  731.   int read_errno = EINTR;
  732.   // my_real_read() will time us out
  733.   // we check if we were told to die, and if not, try reading again
  734. #ifndef DBUG_OFF
  735.   if (disconnect_slave_event_count && !(events_till_disconnect--))
  736.     return packet_error;      
  737. #endif
  738.   
  739.   while (!abort_loop && !abort_slave && len == packet_error &&
  740.  read_errno == EINTR )
  741.   {
  742.     len = mc_net_safe_read(mysql);
  743.     read_errno = errno;
  744.   }
  745.   if (abort_loop || abort_slave)
  746.     return packet_error;
  747.   if (len == packet_error || (int) len < 1)
  748.   {
  749.     sql_print_error("Error reading packet from server: %s (read_errno %d,
  750. server_errno=%d)",
  751.     mc_mysql_error(mysql), read_errno, mc_mysql_errno(mysql));
  752.     return packet_error;
  753.   }
  754.   if (len == 1)
  755.   {
  756.      sql_print_error("Slave: received 0 length packet from server, apparent
  757.  master shutdown: %s (%d)",
  758.      mc_mysql_error(mysql), read_errno);
  759.      return packet_error;
  760.   }
  761.   
  762.   DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %dn",
  763.       len, mysql->net.read_pos[4]));
  764.   return len - 1;   
  765. }
  766. static int exec_event(THD* thd, NET* net, MASTER_INFO* mi, int event_len)
  767. {
  768.   Log_event * ev = Log_event::read_log_event((const char*)net->read_pos + 1,
  769.      event_len);
  770.   char llbuff[22];
  771.   
  772.   if (ev)
  773.   {
  774.     int type_code = ev->get_type_code();
  775.     if (ev->server_id == ::server_id || slave_skip_counter)
  776.     {
  777.       if(type_code == LOAD_EVENT)
  778. skip_load_data_infile(net);
  779.       mi->inc_pos(event_len);
  780.       flush_master_info(mi);
  781.       if(slave_skip_counter)
  782.         --slave_skip_counter;
  783.       delete ev;     
  784.       return 0; // avoid infinite update loops
  785.     }
  786.   
  787.     thd->server_id = ev->server_id; // use the original server id for logging
  788.     thd->set_time(); // time the query
  789.     if(!ev->when)
  790.       ev->when = time(NULL);
  791.     
  792.     switch(type_code) {
  793.     case QUERY_EVENT:
  794.     {
  795.       Query_log_event* qev = (Query_log_event*)ev;
  796.       int q_len = qev->q_len;
  797.       int expected_error,actual_error = 0;
  798.       init_sql_alloc(&thd->mem_root, 8192,0);
  799.       thd->db = rewrite_db((char*)qev->db);
  800.       if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
  801.       {
  802. thd->query = (char*)qev->query;
  803. thd->set_time((time_t)qev->when);
  804. thd->current_tablenr = 0;
  805. VOID(pthread_mutex_lock(&LOCK_thread_count));
  806. thd->query_id = query_id++;
  807. VOID(pthread_mutex_unlock(&LOCK_thread_count));
  808. thd->last_nx_table = thd->last_nx_db = 0;
  809. thd->query_error = 0; // clear error
  810. thd->net.last_errno = 0;
  811. thd->net.last_error[0] = 0;
  812. thd->slave_proxy_id = qev->thread_id; // for temp tables
  813. mysql_parse(thd, thd->query, q_len);
  814. if ((expected_error = qev->error_code) !=
  815.     (actual_error = thd->net.last_errno) && expected_error)
  816. {
  817.   const char* errmsg = "Slave: did not get the expected error
  818.  running query from master - expected: '%s', got '%s'"; 
  819.   sql_print_error(errmsg, ER(expected_error),
  820.   actual_error ? thd->net.last_error:"no error"
  821.   );
  822.   thd->query_error = 1;
  823. }
  824. else if (expected_error == actual_error)
  825.   {
  826.     thd->query_error = 0;
  827.     *last_slave_error = 0;
  828.     last_slave_errno = 0;
  829.   }
  830.       }
  831.       thd->db = 0; // prevent db from being freed
  832.       thd->query = 0; // just to be sure
  833.       // assume no convert for next query unless set explictly
  834.       thd->convert_set = 0;
  835.       close_thread_tables(thd);
  836.       
  837.       if (thd->query_error || thd->fatal_error)
  838.       {
  839. sql_print_error("Slave:  error running query '%s' ",
  840. qev->query);
  841. last_slave_errno = actual_error ? actual_error : -1;
  842. my_snprintf(last_slave_error, sizeof(last_slave_error),
  843.     "error '%s' on query '%s'",
  844.     actual_error ? thd->net.last_error :
  845.     "unexpected success or fatal error",
  846.     qev->query
  847.     );
  848.         free_root(&thd->mem_root,0);
  849. delete ev;
  850. return 1;
  851.       }
  852.       free_root(&thd->mem_root,0);
  853.       delete ev;
  854.       mi->inc_pos(event_len);
  855.       flush_master_info(mi);
  856.       break;
  857.     }
  858.   
  859.     case LOAD_EVENT:
  860.     {
  861.       Load_log_event* lev = (Load_log_event*)ev;
  862.       init_sql_alloc(&thd->mem_root, 8192,0);
  863.       thd->db = rewrite_db((char*)lev->db);
  864.       thd->query = 0;
  865.       thd->query_error = 0;
  866.     
  867.       if(db_ok(thd->db, replicate_do_db, replicate_ignore_db))
  868.       {
  869. thd->set_time((time_t)lev->when);
  870. thd->current_tablenr = 0;
  871. VOID(pthread_mutex_lock(&LOCK_thread_count));
  872. thd->query_id = query_id++;
  873. VOID(pthread_mutex_unlock(&LOCK_thread_count));
  874. TABLE_LIST tables;
  875. bzero((char*) &tables,sizeof(tables));
  876. tables.db = thd->db;
  877. tables.name = tables.real_name = (char*)lev->table_name;
  878. tables.lock_type = TL_WRITE;
  879. // the table will be opened in mysql_load    
  880.         if(table_rules_on && !tables_ok(thd, &tables))
  881. {
  882.   skip_load_data_infile(net);
  883. }
  884. else
  885. {
  886.   enum enum_duplicates handle_dup = DUP_IGNORE;
  887.   if(lev->sql_ex.opt_flags && REPLACE_FLAG)
  888.     handle_dup = DUP_REPLACE;
  889.   sql_exchange ex((char*)lev->fname, lev->sql_ex.opt_flags &&
  890.   DUMPFILE_FLAG );
  891.   String field_term(&lev->sql_ex.field_term, 1),
  892.     enclosed(&lev->sql_ex.enclosed, 1),
  893.     line_term(&lev->sql_ex.line_term,1),
  894.     escaped(&lev->sql_ex.escaped, 1),
  895.     line_start(&lev->sql_ex.line_start, 1);
  896.     
  897.   ex.field_term = &field_term;
  898.   if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
  899.     ex.field_term->length(0);
  900.     
  901.   ex.enclosed = &enclosed;
  902.   if(lev->sql_ex.empty_flags & ENCLOSED_EMPTY)
  903.     ex.enclosed->length(0);
  904.   ex.line_term = &line_term;
  905.   if(lev->sql_ex.empty_flags & LINE_TERM_EMPTY)
  906.     ex.line_term->length(0);
  907.   ex.line_start = &line_start;
  908.   if(lev->sql_ex.empty_flags & LINE_START_EMPTY)
  909.     ex.line_start->length(0);
  910.   ex.escaped = &escaped;
  911.   if(lev->sql_ex.empty_flags & ESCAPED_EMPTY)
  912.     ex.escaped->length(0);
  913.   ex.opt_enclosed = (lev->sql_ex.opt_flags & OPT_ENCLOSED_FLAG);
  914.   if(lev->sql_ex.empty_flags & FIELD_TERM_EMPTY)
  915.     ex.field_term->length(0);
  916.     
  917.   ex.skip_lines = lev->skip_lines;
  918.     
  919.   List<Item> fields;
  920.   lev->set_fields(fields);
  921.   thd->slave_proxy_id = thd->thread_id;
  922.   thd->net.vio = net->vio;
  923.   // mysql_load will use thd->net to read the file
  924.   thd->net.pkt_nr = net->pkt_nr;
  925.   // make sure the client does get confused
  926.   // about the packet sequence
  927.   if(mysql_load(thd, &ex, &tables, fields, handle_dup, 1,
  928. TL_WRITE))
  929.     thd->query_error = 1;
  930.   if(thd->cuted_fields)
  931.     sql_print_error("Slave: load data infile at position %s in log 
  932. '%s' produced %d warning(s)", llstr(glob_mi.pos,llbuff), RPL_LOG_NAME,
  933.     thd->cuted_fields );
  934.   net->pkt_nr = thd->net.pkt_nr;
  935. }
  936.       }
  937.       else
  938.       {
  939. // we will just ask the master to send us /dev/null if we do not
  940. // want to load the data :-)
  941. skip_load_data_infile(net);
  942.       }
  943.     
  944.       thd->net.vio = 0; 
  945.       thd->db = 0;// prevent db from being freed
  946.       close_thread_tables(thd);
  947.       if(thd->query_error)
  948.       {
  949. int sql_error = thd->net.last_errno;
  950. if(!sql_error)
  951.   sql_error = ER_UNKNOWN_ERROR;
  952. sql_print_error("Slave: Error '%s' running load data infile ",
  953. ER(sql_error));
  954. delete ev;
  955.         free_root(&thd->mem_root,0);
  956. return 1;
  957.       }
  958.       
  959.       delete ev;
  960.       free_root(&thd->mem_root,0);
  961.     
  962.       if(thd->fatal_error)
  963.       {
  964. sql_print_error("Slave: Fatal error running query '%s' ",
  965. thd->query);
  966. return 1;
  967.       }
  968.       mi->inc_pos(event_len);
  969.       flush_master_info(mi);
  970.       break;
  971.     }
  972.     case START_EVENT:
  973.       close_temporary_tables(thd);
  974.       mi->inc_pos(event_len);
  975.       flush_master_info(mi);
  976.       delete ev;
  977.       break;
  978.                   
  979.     case STOP_EVENT:
  980.       if(mi->pos > 4) // stop event should be ignored after rotate event
  981. {
  982.           close_temporary_tables(thd);
  983.           mi->inc_pos(event_len);
  984.           flush_master_info(mi);
  985. }
  986.       delete ev;
  987.       break;
  988.     case ROTATE_EVENT:
  989.     {
  990.       Rotate_log_event* rev = (Rotate_log_event*)ev;
  991.       int ident_len = rev->ident_len;
  992.       pthread_mutex_lock(&mi->lock);
  993.       memcpy(mi->log_file_name, rev->new_log_ident,ident_len );
  994.       mi->log_file_name[ident_len] = 0;
  995.       mi->pos = 4; // skip magic number
  996.       pthread_cond_broadcast(&mi->cond);
  997.       pthread_mutex_unlock(&mi->lock);
  998.       flush_master_info(mi);
  999. #ifndef DBUG_OFF
  1000.       if(abort_slave_event_count)
  1001. ++events_till_abort;
  1002. #endif      
  1003.       delete ev;
  1004.       break;
  1005.     }
  1006.     case INTVAR_EVENT:
  1007.     {
  1008.       Intvar_log_event* iev = (Intvar_log_event*)ev;
  1009.       switch(iev->type)
  1010.       {
  1011.       case LAST_INSERT_ID_EVENT:
  1012. thd->last_insert_id_used = 1;
  1013. thd->last_insert_id = iev->val;
  1014. break;
  1015.       case INSERT_ID_EVENT:
  1016. thd->next_insert_id = iev->val;
  1017. break;
  1018.       }
  1019.       mi->inc_pending(event_len);
  1020.       delete ev;
  1021.       break;
  1022.     }
  1023.     }
  1024.   }
  1025.   else
  1026.   {
  1027.     sql_print_error("
  1028. Could not parse log event entry, check the master for binlog corruptionn
  1029. This may also be a network problem, or just a bug in the master or slave code.
  1030. ");
  1031.     return 1;
  1032.   }
  1033.   return 0;   
  1034. }
  1035.       
  1036. // slave thread
  1037. pthread_handler_decl(handle_slave,arg __attribute__((unused)))
  1038. {
  1039. #ifndef DBUG_OFF
  1040.  slave_begin:  
  1041. #endif  
  1042.   THD *thd; // needs to be first for thread_stack
  1043.   MYSQL *mysql = NULL ;
  1044.   char llbuff[22];
  1045.   pthread_mutex_lock(&LOCK_slave);
  1046.   if (!server_id)
  1047.   {
  1048.     pthread_cond_broadcast(&COND_slave_start);
  1049.     pthread_mutex_unlock(&LOCK_slave);
  1050.     sql_print_error("Server id not set, will not start slave");
  1051.     pthread_exit((void*)1);
  1052.   }
  1053.   
  1054.   if(slave_running)
  1055.     {
  1056.       pthread_cond_broadcast(&COND_slave_start);
  1057.       pthread_mutex_unlock(&LOCK_slave);
  1058.       pthread_exit((void*)1);  // safety just in case
  1059.     }
  1060.   slave_running = 1;
  1061.   abort_slave = 0;
  1062. #ifndef DBUG_OFF  
  1063.   events_till_abort = abort_slave_event_count;
  1064. #endif  
  1065.   pthread_cond_broadcast(&COND_slave_start);
  1066.   pthread_mutex_unlock(&LOCK_slave);
  1067.   
  1068.   int error = 1;
  1069.   bool retried_once = 0;
  1070.   ulonglong last_failed_pos = 0;
  1071.   
  1072.   // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  1073.   my_thread_init();
  1074.   slave_thd = thd = new THD; // note that contructor of THD uses DBUG_ !
  1075.   thd->set_time();
  1076.   DBUG_ENTER("handle_slave");
  1077.   pthread_detach_this_thread();
  1078.   if (init_slave_thread(thd) || init_master_info(&glob_mi))
  1079.     {
  1080.       sql_print_error("Failed during slave thread initialization");
  1081.       goto err;
  1082.     }
  1083.   thd->thread_stack = (char*)&thd; // remember where our stack is
  1084.   thd->temporary_tables = save_temporary_tables; // restore temp tables
  1085.   threads.append(thd);
  1086.   
  1087.   DBUG_PRINT("info",("master info: log_file_name=%s, position=%s",
  1088.      glob_mi.log_file_name, llstr(glob_mi.pos,llbuff)));
  1089.   
  1090.   if (!(mysql = mc_mysql_init(NULL)))
  1091.   {
  1092.     sql_print_error("Slave thread: error in mc_mysql_init()");
  1093.     goto err;
  1094.   }
  1095.   
  1096.   thd->proc_info = "connecting to master";
  1097. #ifndef DBUG_OFF  
  1098.   sql_print_error("Slave thread initialized");
  1099. #endif
  1100.   // we can get killed during safe_connect
  1101.   if (!safe_connect(thd, mysql, &glob_mi))
  1102.    sql_print_error("Slave: connected to master '%s@%s:%d',
  1103.   replication started in log '%s' at position %s", glob_mi.user,
  1104.    glob_mi.host, glob_mi.port,
  1105.    RPL_LOG_NAME,
  1106.    llstr(glob_mi.pos,llbuff));
  1107.   else
  1108.   {
  1109.     sql_print_error("Slave thread killed while connecting to master");
  1110.     goto err;
  1111.   }
  1112.   
  1113.   while (!slave_killed(thd))
  1114.   {
  1115.       thd->proc_info = "Requesting binlog dump";
  1116.       if(request_dump(mysql, &glob_mi))
  1117. {
  1118.   sql_print_error("Failed on request_dump()");
  1119.   if(slave_killed(thd))
  1120.     {
  1121.       sql_print_error("Slave thread killed while requesting master 
  1122. dump");
  1123.               goto err;
  1124.     }
  1125.   
  1126.   thd->proc_info = "Waiiting to reconnect after a failed dump request";
  1127.   if(mysql->net.vio)
  1128.     vio_close(mysql->net.vio);
  1129.   // first time retry immediately, assuming that we can recover
  1130.   // right away - if first time fails, sleep between re-tries
  1131.   // hopefuly the admin can fix the problem sometime
  1132.   if(retried_once)
  1133.     safe_sleep(thd, glob_mi.connect_retry);
  1134.   else
  1135.     retried_once = 1;
  1136.   
  1137.   if(slave_killed(thd))
  1138.     {
  1139.       sql_print_error("Slave thread killed while retrying master 
  1140. dump");
  1141.       goto err;
  1142.     }
  1143.   thd->proc_info = "Reconnecting after a failed dump request";
  1144.   last_failed_pos=glob_mi.pos;
  1145.           sql_print_error("Slave: failed dump request, reconnecting to 
  1146. try again, log '%s' at postion %s", RPL_LOG_NAME,
  1147.   llstr(last_failed_pos,llbuff));
  1148.   if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
  1149.     {
  1150.       sql_print_error("Slave thread killed during or after reconnect");
  1151.       goto err;
  1152.     }
  1153.   continue;
  1154. }
  1155.       while(!slave_killed(thd))
  1156. {
  1157.   thd->proc_info = "Reading master update";
  1158.   uint event_len = read_event(mysql, &glob_mi);
  1159.   if(slave_killed(thd))
  1160.     {
  1161.       sql_print_error("Slave thread killed while reading event");
  1162.       goto err;
  1163.     }
  1164.   
  1165.   if (event_len == packet_error)
  1166.   {
  1167.     thd->proc_info = "Waiting to reconnect after a failed read";
  1168.     if(mysql->net.vio)
  1169.         vio_close(mysql->net.vio);
  1170.     if(retried_once) // punish repeat offender with sleep
  1171.       safe_sleep(thd, glob_mi.connect_retry);
  1172.     else
  1173.       retried_once = 1; 
  1174.     
  1175.     if(slave_killed(thd))
  1176.       {
  1177. sql_print_error("Slave thread killed while waiting to 
  1178. reconnect after a failed read");
  1179.         goto err;
  1180.       }
  1181.     thd->proc_info = "Reconnecting after a failed read";
  1182.     last_failed_pos= glob_mi.pos;
  1183.     sql_print_error("Slave: Failed reading log event, 
  1184. reconnecting to retry, log '%s' position %s", RPL_LOG_NAME,
  1185.     llstr(last_failed_pos, llbuff));
  1186.     if(safe_reconnect(thd, mysql, &glob_mi) || slave_killed(thd))
  1187.       {
  1188. sql_print_error("Slave thread killed during or after a 
  1189. reconnect done to recover from failed read");
  1190.         goto err;
  1191.       }
  1192.     break;
  1193.   }
  1194.   
  1195.   thd->proc_info = "Processing master log event"; 
  1196.   if(exec_event(thd, &mysql->net, &glob_mi, event_len))
  1197.     {
  1198.       sql_print_error("
  1199. Error running query, slave aborted. Fix the problem, and re-start 
  1200. the slave thread with "mysqladmin start-slave". We stopped at log 
  1201. '%s' position %s",
  1202.       RPL_LOG_NAME, llstr(glob_mi.pos, llbuff));
  1203.       goto err;
  1204.       // there was an error running the query
  1205.       // abort the slave thread, when the problem is fixed, the user
  1206.       // should restart the slave with mysqladmin start-slave
  1207.     }
  1208. #ifndef DBUG_OFF
  1209.   if(abort_slave_event_count && !--events_till_abort)
  1210.     {
  1211.       sql_print_error("Slave: debugging abort");
  1212.       goto err;
  1213.     }
  1214. #endif   
  1215.   
  1216.   // successful exec with offset advance,
  1217.   // the slave repents and his sins are forgiven!
  1218.   if(glob_mi.pos > last_failed_pos)
  1219.     {
  1220.      retried_once = 0;
  1221. #ifndef DBUG_OFF
  1222.      stuck_count = 0;
  1223. #endif
  1224.     }
  1225. #ifndef DBUG_OFF
  1226.   else
  1227.     {
  1228.       // show a little mercy, allow slave to read one more event
  1229.       // before cutting him off - otherwise he gets stuck
  1230.       // on Invar events, since they do not advance the offset
  1231.       // immediately
  1232.       if (++stuck_count > 2)
  1233.         events_till_disconnect++;
  1234.     }
  1235. #endif   
  1236. }
  1237.     }
  1238.   error = 0;
  1239.  err:
  1240.   // print the current replication position 
  1241.   sql_print_error("Slave thread exiting, replication stopped in log '%s' at 
  1242. position %s",
  1243.   RPL_LOG_NAME, llstr(glob_mi.pos,llbuff));
  1244.   thd->query = thd->db = 0; // extra safety
  1245.   if(mysql)
  1246.       mc_mysql_close(mysql);
  1247.   thd->proc_info = "Waiting for slave mutex on exit";
  1248.   pthread_mutex_lock(&LOCK_slave);
  1249.   slave_running = 0;
  1250.   abort_slave = 0;
  1251.   save_temporary_tables = thd->temporary_tables;
  1252.   thd->temporary_tables = 0; // remove tempation from destructor to close them
  1253.   pthread_cond_broadcast(&COND_slave_stopped); // tell the world we are done
  1254.   pthread_mutex_unlock(&LOCK_slave);
  1255.   net_end(&thd->net); // destructor will not free it, because we are weird
  1256.   slave_thd = 0;
  1257.   delete thd;
  1258.   my_thread_end();
  1259. #ifndef DBUG_OFF
  1260.   if(abort_slave_event_count && !events_till_abort)
  1261.     goto slave_begin;
  1262. #endif  
  1263.   pthread_exit(0);
  1264.   DBUG_RETURN(0); // Can't return anything here
  1265. }
  1266. /* try to connect until successful or slave killed */
  1267. static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
  1268. {
  1269.   int slave_was_killed;
  1270. #ifndef DBUG_OFF
  1271.   events_till_disconnect = disconnect_slave_event_count;
  1272. #endif  
  1273.   while(!(slave_was_killed = slave_killed(thd)) &&
  1274. !mc_mysql_connect(mysql, mi->host, mi->user, mi->password, 0,
  1275.   mi->port, 0, 0))
  1276.   {
  1277.     sql_print_error("Slave thread: error connecting to master:%s(%d),
  1278.  retry in %d sec", mc_mysql_error(mysql), errno, mi->connect_retry);
  1279.     safe_sleep(thd, mi->connect_retry);
  1280.   }
  1281.   
  1282.   if(!slave_was_killed)
  1283.     {
  1284.       mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
  1285.   mi->user, mi->host, mi->port);
  1286. #ifdef SIGNAL_WITH_VIO_CLOSE
  1287.       thd->set_active_vio(mysql->net.vio);
  1288. #endif      
  1289.     }
  1290.   
  1291.   return slave_was_killed;
  1292. }
  1293. /* try to connect until successful or slave killed */
  1294. static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
  1295. {
  1296.   int slave_was_killed;
  1297.   char llbuff[22];
  1298.  // if we lost connection after reading a state set event
  1299.   // we will be re-reading it, so pending needs to be cleared
  1300.   mi->pending = 0;
  1301. #ifndef DBUG_OFF
  1302.   events_till_disconnect = disconnect_slave_event_count;
  1303. #endif
  1304.   while(!(slave_was_killed = slave_killed(thd)) && mc_mysql_reconnect(mysql))
  1305.   {
  1306.     sql_print_error("Slave thread: error re-connecting to master:
  1307. %s, last_errno=%d, retry in %d sec",
  1308.     mc_mysql_error(mysql), errno, mi->connect_retry);
  1309.      safe_sleep(thd, mi->connect_retry);
  1310.   }
  1311.   if(!slave_was_killed)
  1312.     {
  1313.      sql_print_error("Slave: reconnected to master '%s@%s:%d',
  1314. replication resumed in log '%s' at position %s", glob_mi.user,
  1315.     glob_mi.host, glob_mi.port,
  1316.     RPL_LOG_NAME,
  1317.     llstr(glob_mi.pos,llbuff));
  1318. #ifdef SIGNAL_WITH_VIO_CLOSE
  1319.       thd->set_active_vio(mysql->net.vio);
  1320. #endif      
  1321.     }
  1322.   return slave_was_killed;
  1323. }
  1324. #ifdef __GNUC__
  1325. template class I_List_iterator<i_string>;
  1326. template class I_List_iterator<i_string_pair>;
  1327. #endif