slave.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:144k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1.         2. Then the recorded value for master is 1 and the recorded value for
  2.         slave is 2. At SHOW SLAVE STATUS time, assume that the difference
  3.         between timestamp of slave and rli->last_master_timestamp is 0
  4.         (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
  5.         This confuses users, so we don't go below 0: hence the max().
  6.         last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
  7.         special marker to say "consider we have caught up".
  8.       */
  9.       protocol->store((longlong)(mi->rli.last_master_timestamp ? max(0, tmp)
  10.                                  : 0));
  11.     }
  12.     else
  13.       protocol->store_null();
  14.     pthread_mutex_unlock(&mi->rli.data_lock);
  15.     pthread_mutex_unlock(&mi->data_lock);
  16.   
  17.     if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
  18.       DBUG_RETURN(-1);
  19.   }
  20.   send_eof(thd);
  21.   DBUG_RETURN(0);
  22. }
  23. bool flush_master_info(MASTER_INFO* mi, bool flush_relay_log_cache)
  24. {
  25.   IO_CACHE* file = &mi->file;
  26.   char lbuf[22];
  27.   DBUG_ENTER("flush_master_info");
  28.   DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));
  29.   /*
  30.     Flush the relay log to disk. If we don't do it, then the relay log while
  31.     have some part (its last kilobytes) in memory only, so if the slave server
  32.     dies now, with, say, from master's position 100 to 150 in memory only (not
  33.     on disk), and with position 150 in master.info, then when the slave
  34.     restarts, the I/O thread will fetch binlogs from 150, so in the relay log
  35.     we will have "[0, 100] U [150, infinity[" and nobody will notice it, so the
  36.     SQL thread will jump from 100 to 150, and replication will silently break.
  37.     When we come to this place in code, relay log may or not be initialized;
  38.     the caller is responsible for setting 'flush_relay_log_cache' accordingly.
  39.   */
  40.   if (flush_relay_log_cache)
  41.     flush_io_cache(mi->rli.relay_log.get_log_file());
  42.   /*
  43.     We flushed the relay log BEFORE the master.info file, because if we crash
  44.     now, we will get a duplicate event in the relay log at restart. If we
  45.     flushed in the other order, we would get a hole in the relay log.
  46.     And duplicate is better than hole (with a duplicate, in later versions we
  47.     can add detection and scrap one event; with a hole there's nothing we can
  48.     do).
  49.   */
  50.   /*
  51.      In certain cases this code may create master.info files that seems 
  52.      corrupted, because of extra lines filled with garbage in the end 
  53.      file (this happens if new contents take less space than previous 
  54.      contents of file). But because of number of lines in the first line 
  55.      of file we don't care about this garbage.
  56.   */
  57.   
  58.   my_b_seek(file, 0L);
  59.   my_b_printf(file, "%un%sn%sn%sn%sn%sn%dn%dn%dn%sn%sn%sn%sn%sn",
  60.       LINES_IN_MASTER_INFO_WITH_SSL,
  61.               mi->master_log_name, llstr(mi->master_log_pos, lbuf),
  62.       mi->host, mi->user,
  63.       mi->password, mi->port, mi->connect_retry,
  64.               (int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
  65.               mi->ssl_cipher, mi->ssl_key);
  66.   flush_io_cache(file);
  67.   DBUG_RETURN(0);
  68. }
  69. st_relay_log_info::st_relay_log_info()
  70.   :info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
  71.    cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
  72.    ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0),
  73.    abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0),
  74.    inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
  75.    until_log_pos(0), retried_trans(0)
  76. {
  77.   group_relay_log_name[0]= event_relay_log_name[0]=
  78.     group_master_log_name[0]= 0;
  79.   last_slave_error[0]= until_log_name[0]= ign_master_log_name_end[0]= 0;
  80.   bzero((char*) &info_file, sizeof(info_file));
  81.   bzero((char*) &cache_buf, sizeof(cache_buf));
  82.   pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
  83.   pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
  84.   pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
  85.   pthread_cond_init(&data_cond, NULL);
  86.   pthread_cond_init(&start_cond, NULL);
  87.   pthread_cond_init(&stop_cond, NULL);
  88.   pthread_cond_init(&log_space_cond, NULL);
  89.   relay_log.init_pthread_objects();
  90. }
  91. st_relay_log_info::~st_relay_log_info()
  92. {
  93.   pthread_mutex_destroy(&run_lock);
  94.   pthread_mutex_destroy(&data_lock);
  95.   pthread_mutex_destroy(&log_space_lock);
  96.   pthread_cond_destroy(&data_cond);
  97.   pthread_cond_destroy(&start_cond);
  98.   pthread_cond_destroy(&stop_cond);
  99.   pthread_cond_destroy(&log_space_cond);
  100. }
  101. /*
  102.   Waits until the SQL thread reaches (has executed up to) the
  103.   log/position or timed out.
  104.   SYNOPSIS
  105.     wait_for_pos()
  106.     thd             client thread that sent SELECT MASTER_POS_WAIT
  107.     log_name        log name to wait for
  108.     log_pos         position to wait for 
  109.     timeout         timeout in seconds before giving up waiting
  110.   NOTES
  111.     timeout is longlong whereas it should be ulong ; but this is
  112.     to catch if the user submitted a negative timeout.
  113.   RETURN VALUES
  114.     -2          improper arguments (log_pos<0)
  115.                 or slave not running, or master info changed
  116.                 during the function's execution,
  117.                 or client thread killed. -2 is translated to NULL by caller
  118.     -1          timed out
  119.     >=0         number of log events the function had to wait
  120.                 before reaching the desired log/position
  121.  */
  122. int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
  123.                                     longlong log_pos,
  124.                                     longlong timeout)
  125. {
  126.   if (!inited)
  127.     return -1;
  128.   int event_count = 0;
  129.   ulong init_abort_pos_wait;
  130.   int error=0;
  131.   struct timespec abstime; // for timeout checking
  132.   set_timespec(abstime,timeout);
  133.   DBUG_ENTER("wait_for_pos");
  134.   DBUG_PRINT("enter",("group_master_log_name: '%s'  pos: %lu timeout: %ld",
  135.                       group_master_log_name, (ulong) group_master_log_pos, 
  136.                       (long) timeout));
  137.   pthread_mutex_lock(&data_lock);
  138.   const char *msg= thd->enter_cond(&data_cond, &data_lock,
  139.                                    "Waiting for the slave SQL thread to "
  140.                                    "advance position");
  141.   /* 
  142.      This function will abort when it notices that some CHANGE MASTER or
  143.      RESET MASTER has changed the master info.
  144.      To catch this, these commands modify abort_pos_wait ; We just monitor
  145.      abort_pos_wait and see if it has changed.
  146.      Why do we have this mechanism instead of simply monitoring slave_running
  147.      in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
  148.      the SQL thread be stopped?
  149.      This is becasue if someones does:
  150.      STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
  151.      the change may happen very quickly and we may not notice that
  152.      slave_running briefly switches between 1/0/1.
  153.   */
  154.   init_abort_pos_wait= abort_pos_wait;
  155.   /*
  156.     We'll need to
  157.     handle all possible log names comparisons (e.g. 999 vs 1000).
  158.     We use ulong for string->number conversion ; this is no
  159.     stronger limitation than in find_uniq_filename in sql/log.cc
  160.   */
  161.   ulong log_name_extension;
  162.   char log_name_tmp[FN_REFLEN]; //make a char[] from String
  163.   strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));
  164.   char *p= fn_ext(log_name_tmp);
  165.   char *p_end;
  166.   if (!*p || log_pos<0)
  167.   {
  168.     error= -2; //means improper arguments
  169.     goto err;
  170.   }
  171.   // Convert 0-3 to 4
  172.   log_pos= max(log_pos, BIN_LOG_HEADER_SIZE);
  173.   /* p points to '.' */
  174.   log_name_extension= strtoul(++p, &p_end, 10);
  175.   /*
  176.     p_end points to the first invalid character.
  177.     If it equals to p, no digits were found, error.
  178.     If it contains '' it means conversion went ok.
  179.   */
  180.   if (p_end==p || *p_end)
  181.   {
  182.     error= -2;
  183.     goto err;
  184.   }    
  185.   /* The "compare and wait" main loop */
  186.   while (!thd->killed &&
  187.          init_abort_pos_wait == abort_pos_wait &&
  188.          slave_running)
  189.   {
  190.     bool pos_reached;
  191.     int cmp_result= 0;
  192.     /*
  193.       group_master_log_name can be "", if we are just after a fresh
  194.       replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
  195.       (before we have executed one Rotate event from the master) or
  196.       (rare) if the user is doing a weird slave setup (see next
  197.       paragraph).  If group_master_log_name is "", we assume we don't
  198.       have enough info to do the comparison yet, so we just wait until
  199.       more data. In this case master_log_pos is always 0 except if
  200.       somebody (wrongly) sets this slave to be a slave of itself
  201.       without using --replicate-same-server-id (an unsupported
  202.       configuration which does nothing), then group_master_log_pos
  203.       will grow and group_master_log_name will stay "".
  204.     */
  205.     if (*group_master_log_name)
  206.     {
  207.       char *basename= (group_master_log_name +
  208.                        dirname_length(group_master_log_name));
  209.       /*
  210.         First compare the parts before the extension.
  211.         Find the dot in the master's log basename,
  212.         and protect against user's input error :
  213.         if the names do not match up to '.' included, return error
  214.       */
  215.       char *q= (char*)(fn_ext(basename)+1);
  216.       if (strncmp(basename, log_name_tmp, (int)(q-basename)))
  217.       {
  218.         error= -2;
  219.         break;
  220.       }
  221.       // Now compare extensions.
  222.       char *q_end;
  223.       ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
  224.       if (group_master_log_name_extension < log_name_extension)
  225.         cmp_result= -1 ;
  226.       else
  227.         cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
  228.       pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
  229.                     cmp_result > 0);
  230.       if (pos_reached || thd->killed)
  231.         break;
  232.     }
  233.     //wait for master update, with optional timeout.
  234.     
  235.     DBUG_PRINT("info",("Waiting for master update"));
  236.     /*
  237.       We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
  238.       will wake us up.
  239.     */
  240.     if (timeout > 0)
  241.     {
  242.       /*
  243.         Note that pthread_cond_timedwait checks for the timeout
  244.         before for the condition ; i.e. it returns ETIMEDOUT 
  245.         if the system time equals or exceeds the time specified by abstime
  246.         before the condition variable is signaled or broadcast, _or_ if
  247.         the absolute time specified by abstime has already passed at the time
  248.         of the call.
  249.         For that reason, pthread_cond_timedwait will do the "timeoutting" job
  250.         even if its condition is always immediately signaled (case of a loaded
  251.         master).
  252.       */
  253.       error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
  254.     }
  255.     else
  256.       pthread_cond_wait(&data_cond, &data_lock);
  257.     DBUG_PRINT("info",("Got signal of master update or timed out"));
  258.     if (error == ETIMEDOUT || error == ETIME)
  259.     {
  260.       error= -1;
  261.       break;
  262.     }
  263.     error=0;
  264.     event_count++;
  265.     DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
  266.   }
  267. err:
  268.   thd->exit_cond(msg);
  269.   DBUG_PRINT("exit",("killed: %d  abort: %d  slave_running: %d 
  270. improper_arguments: %d  timed_out: %d",
  271.                      (int) thd->killed,
  272.                      (int) (init_abort_pos_wait != abort_pos_wait),
  273.                      (int) slave_running,
  274.                      (int) (error == -2),
  275.                      (int) (error == -1)));
  276.   if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
  277.       !slave_running) 
  278.   {
  279.     error= -2;
  280.   }
  281.   DBUG_RETURN( error ? error : event_count );
  282. }
  283. /*
  284.   init_slave_thread()
  285. */
  286. static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
  287. {
  288.   DBUG_ENTER("init_slave_thread");
  289.   thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
  290.     SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO; 
  291.   thd->host_or_ip= "";
  292.   my_net_init(&thd->net, 0);
  293.   thd->net.read_timeout = slave_net_timeout;
  294.   thd->master_access= ~(ulong)0;
  295.   thd->priv_user = 0;
  296.   thd->slave_thread = 1;
  297.   /* 
  298.      It's nonsense to constrain the slave threads with max_join_size; if a
  299.      query succeeded on master, we HAVE to execute it. So set
  300.      OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
  301.      (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
  302.      SELECT examining more than 4 billion rows would still fail (yes, because
  303.      when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
  304.      only for client threads.
  305.   */
  306.   thd->options = ((opt_log_slave_updates) ? OPTION_BIN_LOG:0) |
  307.     OPTION_AUTO_IS_NULL | OPTION_BIG_SELECTS;
  308.   thd->client_capabilities = CLIENT_LOCAL_FILES;
  309.   thd->real_id=pthread_self();
  310.   pthread_mutex_lock(&LOCK_thread_count);
  311.   thd->thread_id = thread_id++;
  312.   pthread_mutex_unlock(&LOCK_thread_count);
  313.   if (init_thr_lock() || thd->store_globals())
  314.   {
  315.     thd->cleanup();
  316.     delete thd;
  317.     DBUG_RETURN(-1);
  318.   }
  319. #if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
  320.   sigset_t set;
  321.   VOID(sigemptyset(&set)); // Get mask in use
  322.   VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
  323. #endif
  324.   if (thd_type == SLAVE_THD_SQL)
  325.     thd->proc_info= "Waiting for the next event in relay log";
  326.   else
  327.     thd->proc_info= "Waiting for master update";
  328.   thd->version=refresh_version;
  329.   thd->set_time();
  330.   DBUG_RETURN(0);
  331. }
  332. static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
  333.       void* thread_killed_arg)
  334. {
  335.   int nap_time;
  336.   thr_alarm_t alarmed;
  337.   thr_alarm_init(&alarmed);
  338.   time_t start_time= time((time_t*) 0);
  339.   time_t end_time= start_time+sec;
  340.   while ((nap_time= (int) (end_time - start_time)) > 0)
  341.   {
  342.     ALARM alarm_buff;
  343.     /*
  344.       The only reason we are asking for alarm is so that
  345.       we will be woken up in case of murder, so if we do not get killed,
  346.       set the alarm so it goes off after we wake up naturally
  347.     */
  348.     thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
  349.     sleep(nap_time);
  350.     thr_end_alarm(&alarmed);
  351.     
  352.     if ((*thread_killed)(thd,thread_killed_arg))
  353.       return 1;
  354.     start_time=time((time_t*) 0);
  355.   }
  356.   return 0;
  357. }
  358. static int request_dump(MYSQL* mysql, MASTER_INFO* mi,
  359. bool *suppress_warnings)
  360. {
  361.   char buf[FN_REFLEN + 10];
  362.   int len;
  363.   int binlog_flags = 0; // for now
  364.   char* logname = mi->master_log_name;
  365.   DBUG_ENTER("request_dump");
  366.   // TODO if big log files: Change next to int8store()
  367.   int4store(buf, (ulong) mi->master_log_pos);
  368.   int2store(buf + 4, binlog_flags);
  369.   int4store(buf + 6, server_id);
  370.   len = (uint) strlen(logname);
  371.   memcpy(buf + 10, logname,len);
  372.   if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
  373.   {
  374.     /*
  375.       Something went wrong, so we will just reconnect and retry later
  376.       in the future, we should do a better error analysis, but for
  377.       now we just fill up the error log :-)
  378.     */
  379.     if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
  380.       *suppress_warnings= 1; // Suppress reconnect warning
  381.     else
  382.       sql_print_error("Error on COM_BINLOG_DUMP: %d  %s, will retry in %d secs",
  383.       mysql_errno(mysql), mysql_error(mysql),
  384.       master_connect_retry);
  385.     DBUG_RETURN(1);
  386.   }
  387.   DBUG_RETURN(0);
  388. }
  389. static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
  390. {
  391.   char buf[1024];
  392.   char * p = buf;
  393.   uint table_len = (uint) strlen(table);
  394.   uint db_len = (uint) strlen(db);
  395.   if (table_len + db_len > sizeof(buf) - 2)
  396.   {
  397.     sql_print_error("request_table_dump: Buffer overrun");
  398.     return 1;
  399.   } 
  400.   
  401.   *p++ = db_len;
  402.   memcpy(p, db, db_len);
  403.   p += db_len;
  404.   *p++ = table_len;
  405.   memcpy(p, table, table_len);
  406.   
  407.   if (simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
  408.   {
  409.     sql_print_error("request_table_dump: Error sending the table dump 
  410. command");
  411.     return 1;
  412.   }
  413.   return 0;
  414. }
  415. /*
  416.   Read one event from the master
  417.   
  418.   SYNOPSIS
  419.     read_event()
  420.     mysql MySQL connection
  421.     mi Master connection information
  422.     suppress_warnings TRUE when a normal net read timeout has caused us to
  423. try a reconnect.  We do not want to print anything to
  424. the error log in this case because this a anormal
  425. event in an idle server.
  426.     RETURN VALUES
  427.     'packet_error' Error
  428.     number Length of packet
  429. */
  430. static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
  431. {
  432.   ulong len;
  433.   *suppress_warnings= 0;
  434.   /*
  435.     my_real_read() will time us out
  436.     We check if we were told to die, and if not, try reading again
  437.     TODO:  Move 'events_till_disconnect' to the MASTER_INFO structure
  438.   */
  439. #ifndef DBUG_OFF
  440.   if (disconnect_slave_event_count && !(events_till_disconnect--))
  441.     return packet_error;      
  442. #endif
  443.   
  444.   len = net_safe_read(mysql);
  445.   if (len == packet_error || (long) len < 1)
  446.   {
  447.     if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
  448.     {
  449.       /*
  450. We are trying a normal reconnect after a read timeout;
  451. we suppress prints to .err file as long as the reconnect
  452. happens without problems
  453.       */
  454.       *suppress_warnings= TRUE;
  455.     }
  456.     else
  457.       sql_print_error("Error reading packet from server: %s (
  458. server_errno=%d)",
  459.       mysql_error(mysql), mysql_errno(mysql));
  460.     return packet_error;
  461.   }
  462.   /* Check if eof packet */
  463.   if (len < 8 && mysql->net.read_pos[0] == 254)
  464.   {
  465.      sql_print_error("Slave: received end packet from server, apparent
  466.  master shutdown: %s",
  467.      mysql_error(mysql));
  468.      return packet_error;
  469.   }
  470.   
  471.   DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %dn",
  472.       len, mysql->net.read_pos[4]));
  473.   return len - 1;   
  474. }
  475. int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
  476. {
  477.   switch (expected_error) {
  478.   case ER_NET_READ_ERROR:
  479.   case ER_NET_ERROR_ON_WRITE:  
  480.   case ER_SERVER_SHUTDOWN:  
  481.   case ER_NEW_ABORTING_CONNECTION:
  482.     return 1;
  483.   default:
  484.     return 0;
  485.   }
  486. }
  487. /*
  488.      Check if condition stated in UNTIL clause of START SLAVE is reached.
  489.    SYNOPSYS
  490.      st_relay_log_info::is_until_satisfied()
  491.    DESCRIPTION
  492.      Checks if UNTIL condition is reached. Uses caching result of last 
  493.      comparison of current log file name and target log file name. So cached 
  494.      value should be invalidated if current log file name changes 
  495.      (see st_relay_log_info::notify_... functions).
  496.      
  497.      This caching is needed to avoid of expensive string comparisons and 
  498.      strtol() conversions needed for log names comparison. We don't need to
  499.      compare them each time this function is called, we only need to do this 
  500.      when current log name changes. If we have UNTIL_MASTER_POS condition we 
  501.      need to do this only after Rotate_log_event::exec_event() (which is 
  502.      rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS 
  503.      condition then we should invalidate cached comarison value after 
  504.      inc_group_relay_log_pos() which called for each group of events (so we
  505.      have some benefit if we have something like queries that use 
  506.      autoincrement or if we have transactions).
  507.      
  508.      Should be called ONLY if until_condition != UNTIL_NONE !
  509.    RETURN VALUE
  510.      true - condition met or error happened (condition seems to have 
  511.             bad log file name)
  512.      false - condition not met
  513. */
  514. bool st_relay_log_info::is_until_satisfied()
  515. {
  516.   const char *log_name;
  517.   ulonglong log_pos;
  518.   DBUG_ASSERT(until_condition != UNTIL_NONE);
  519.   
  520.   if (until_condition == UNTIL_MASTER_POS)
  521.   {
  522.     log_name= group_master_log_name;
  523.     log_pos= group_master_log_pos;
  524.   }
  525.   else
  526.   { /* until_condition == UNTIL_RELAY_POS */
  527.     log_name= group_relay_log_name;
  528.     log_pos= group_relay_log_pos;
  529.   }
  530.   
  531.   if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
  532.   {
  533.     /* 
  534.        We have no cached comaprison results so we should compare log names
  535.        and cache result
  536.     */
  537.     DBUG_ASSERT(*log_name || log_pos == 0);
  538.     
  539.     if (*log_name)
  540.     {
  541.       const char *basename= log_name + dirname_length(log_name);
  542.       
  543.       const char *q= (const char*)(fn_ext(basename)+1);
  544.       if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
  545.       {
  546.         /* Now compare extensions. */
  547.         char *q_end;
  548.         ulong log_name_extension= strtoul(q, &q_end, 10);
  549.         if (log_name_extension < until_log_name_extension)
  550.           until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
  551.         else
  552.           until_log_names_cmp_result= 
  553.             (log_name_extension > until_log_name_extension) ? 
  554.             UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
  555.       }
  556.       else  
  557.       {
  558.         /* Probably error so we aborting */
  559.         sql_print_error("Slave SQL thread is stopped because UNTIL "
  560.                         "condition is bad.");
  561.         return TRUE;
  562.       }
  563.     }
  564.     else
  565.       return until_log_pos == 0;
  566.   }
  567.     
  568.   return ((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL && 
  569.            log_pos >= until_log_pos) ||
  570.           until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER);
  571. }
  572. static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
  573. {
  574.   /*
  575.      We acquire this mutex since we need it for all operations except
  576.      event execution. But we will release it in places where we will 
  577.      wait for something for example inside of next_event().
  578.    */
  579.   pthread_mutex_lock(&rli->data_lock);
  580.   
  581.   /*
  582.     This tests if the position of the end of the last previous executed event
  583.     hits the UNTIL barrier.
  584.     We would prefer to test if the position of the start (or possibly) end of
  585.     the to-be-read event hits the UNTIL barrier, this is different if there
  586.     was an event ignored by the I/O thread just before (BUG#13861 to be
  587.     fixed).
  588.   */
  589.   if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE && 
  590.       rli->is_until_satisfied()) 
  591.   {
  592.     char buf[22];
  593.     sql_print_error("Slave SQL thread stopped because it reached its"
  594.                     " UNTIL position %s", llstr(rli->until_pos(), buf));
  595.     /* 
  596.       Setting abort_slave flag because we do not want additional message about
  597.       error in query execution to be printed.
  598.     */
  599.     rli->abort_slave= 1;
  600.     pthread_mutex_unlock(&rli->data_lock);
  601.     return 1;
  602.   }
  603.   
  604.   Log_event * ev = next_event(rli);
  605.   
  606.   DBUG_ASSERT(rli->sql_thd==thd);
  607.   
  608.   if (sql_slave_killed(thd,rli))
  609.   {
  610.     pthread_mutex_unlock(&rli->data_lock);
  611.     delete ev;
  612.     return 1;
  613.   }
  614.   if (ev)
  615.   {
  616.     int type_code = ev->get_type_code();
  617.     int exec_res;
  618.     /*
  619.       Skip queries originating from this server or number of
  620.       queries specified by the user in slave_skip_counter
  621.       We can't however skip event's that has something to do with the
  622.       log files themselves.
  623.     */
  624.     if ((ev->server_id == (uint32) ::server_id && !replicate_same_server_id) ||
  625. (rli->slave_skip_counter && type_code != ROTATE_EVENT))
  626.     {
  627.       rli->inc_group_relay_log_pos(ev->get_event_len(),
  628.    type_code != STOP_EVENT ? ev->log_pos : LL(0),
  629.    1/* skip lock*/);
  630.       flush_relay_log_info(rli);
  631.       /*
  632. Protect against common user error of setting the counter to 1
  633. instead of 2 while recovering from an failed auto-increment insert
  634.       */
  635.       if (rli->slave_skip_counter && 
  636.   !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
  637.     rli->slave_skip_counter == 1))
  638.         --rli->slave_skip_counter;
  639.       pthread_mutex_unlock(&rli->data_lock);
  640.       delete ev;     
  641.       return 0; // avoid infinite update loops
  642.     } 
  643.     pthread_mutex_unlock(&rli->data_lock);
  644.   
  645.     thd->server_id = ev->server_id; // use the original server id for logging
  646.     thd->set_time(); // time the query
  647.     thd->lex->current_select= 0;
  648.     if (!ev->when)
  649.       ev->when = time(NULL);
  650.     ev->thd = thd;
  651.     exec_res = ev->exec_event(rli);
  652.     DBUG_ASSERT(rli->sql_thd==thd);
  653.     delete ev;
  654.     if (slave_trans_retries)
  655.     {
  656.       if (exec_res &&
  657.           (thd->net.last_errno == ER_LOCK_DEADLOCK ||
  658.            thd->net.last_errno == ER_LOCK_WAIT_TIMEOUT) &&
  659.           !thd->is_fatal_error)
  660.       {
  661.         const char *errmsg;
  662.         /*
  663.           We were in a transaction which has been rolled back because of a
  664.           deadlock (currently, InnoDB deadlock detected by InnoDB) or lock
  665.           wait timeout (innodb_lock_wait_timeout exceeded); let's seek back to
  666.           BEGIN log event and retry it all again.
  667.           We have to not only seek but also
  668.           a) init_master_info(), to seek back to hot relay log's start for later
  669.           (for when we will come back to this hot log after re-processing the
  670.           possibly existing old logs where BEGIN is: check_binlog_magic() will
  671.           then need the cache to be at position 0 (see comments at beginning of
  672.           init_master_info()).
  673.           b) init_relay_log_pos(), because the BEGIN may be an older relay log.
  674.         */
  675.         if (rli->trans_retries < slave_trans_retries)
  676.         {
  677.           if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
  678.             sql_print_error("Failed to initialize the master info structure");
  679.           else if (init_relay_log_pos(rli,
  680.                                       rli->group_relay_log_name,
  681.                                       rli->group_relay_log_pos,
  682.                                       1, &errmsg))
  683.             sql_print_error("Error initializing relay log position: %s",
  684.                             errmsg);
  685.           else
  686.           {
  687.             exec_res= 0;
  688.     /* chance for concurrent connection to get more locks */
  689.             safe_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
  690.        (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
  691.             pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
  692.     rli->trans_retries++;
  693.             rli->retried_trans++;
  694.             pthread_mutex_unlock(&rli->data_lock);
  695.             DBUG_PRINT("info", ("Slave retries transaction "
  696.                                 "rli->trans_retries: %lu", rli->trans_retries));
  697.   }
  698.         }
  699.         else
  700.           sql_print_error("Slave SQL thread retried transaction %lu time(s) "
  701.                           "in vain, giving up. Consider raising the value of "
  702.                           "the slave_transaction_retries variable.",
  703.                           slave_trans_retries);
  704.       }
  705.       if (!((thd->options & OPTION_BEGIN) && opt_using_transactions))
  706.          rli->trans_retries= 0; // restart from fresh
  707.      }
  708.     return exec_res;
  709.   }
  710.   else
  711.   {
  712.     pthread_mutex_unlock(&rli->data_lock);
  713.     slave_print_error(rli, 0, "
  714. Could not parse relay log event entry. The possible reasons are: the master's 
  715. binary log is corrupted (you can check this by running 'mysqlbinlog' on the 
  716. binary log), the slave's relay log is corrupted (you can check this by running 
  717. 'mysqlbinlog' on the relay log), a network problem, or a bug in the master's 
  718. or slave's MySQL code. If you want to check the master's binary log or slave's 
  719. relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS' 
  720. on this slave.
  721. ");
  722.     return 1;
  723.   }
  724. }
  725. /* Slave I/O Thread entry point */
  726. extern "C" pthread_handler_decl(handle_slave_io,arg)
  727. {
  728.   THD *thd; // needs to be first for thread_stack
  729.   MYSQL *mysql;
  730.   MASTER_INFO *mi = (MASTER_INFO*)arg;
  731.   RELAY_LOG_INFO *rli= &mi->rli;
  732.   char llbuff[22];
  733.   uint retry_count;
  734.   
  735.   // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  736.   my_thread_init();
  737.   DBUG_ENTER("handle_slave_io");
  738. #ifndef DBUG_OFF
  739. slave_begin:
  740. #endif  
  741.   DBUG_ASSERT(mi->inited);
  742.   mysql= NULL ;
  743.   retry_count= 0;
  744.   pthread_mutex_lock(&mi->run_lock);
  745.   /* Inform waiting threads that slave has started */
  746.   mi->slave_run_id++;
  747. #ifndef DBUG_OFF  
  748.   mi->events_till_abort = abort_slave_event_count;
  749. #endif  
  750.   
  751.   thd= new THD; // note that contructor of THD uses DBUG_ !
  752.   THD_CHECK_SENTRY(thd);
  753.   pthread_detach_this_thread();
  754.   if (init_slave_thread(thd, SLAVE_THD_IO))
  755.   {
  756.     pthread_cond_broadcast(&mi->start_cond);
  757.     pthread_mutex_unlock(&mi->run_lock);
  758.     sql_print_error("Failed during slave I/O thread initialization");
  759.     goto err;
  760.   }
  761.   mi->io_thd = thd;
  762.   thd->thread_stack = (char*)&thd; // remember where our stack is
  763.   pthread_mutex_lock(&LOCK_thread_count);
  764.   threads.append(thd);
  765.   pthread_mutex_unlock(&LOCK_thread_count);
  766.   mi->slave_running = 1;
  767.   mi->abort_slave = 0;
  768.   pthread_mutex_unlock(&mi->run_lock);
  769.   pthread_cond_broadcast(&mi->start_cond);
  770.   
  771.   DBUG_PRINT("master_info",("log_file_name: '%s'  position: %s",
  772.     mi->master_log_name,
  773.     llstr(mi->master_log_pos,llbuff)));
  774.   
  775.   if (!(mi->mysql = mysql = mysql_init(NULL)))
  776.   {
  777.     sql_print_error("Slave I/O thread: error in mysql_init()");
  778.     goto err;
  779.   }
  780.   
  781.   thd->proc_info = "Connecting to master";
  782.   // we can get killed during safe_connect
  783.   if (!safe_connect(thd, mysql, mi))
  784.     sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',
  785.   replication started in log '%s' at position %s", mi->user,
  786.     mi->host, mi->port,
  787.     IO_RPL_LOG_NAME,
  788.     llstr(mi->master_log_pos,llbuff));
  789.   else
  790.   {
  791.     sql_print_error("Slave I/O thread killed while connecting to master");
  792.     goto err;
  793.   }
  794. connected:
  795.   // TODO: the assignment below should be under mutex (5.0)
  796.   mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
  797.   thd->slave_net = &mysql->net;
  798.   thd->proc_info = "Checking master version";
  799.   if (get_master_version_and_clock(mysql, mi))
  800.     goto err;
  801.   if (!mi->old_format)
  802.   {
  803.     /*
  804.       Register ourselves with the master.
  805.       If fails, this is not fatal - we just print the error message and go
  806.       on with life.
  807.     */
  808.     thd->proc_info = "Registering slave on master";
  809.     if (register_slave_on_master(mysql) ||  update_slave_list(mysql, mi))
  810.       goto err;
  811.   }
  812.   
  813.   DBUG_PRINT("info",("Starting reading binary log from master"));
  814.   while (!io_slave_killed(thd,mi))
  815.   {
  816.     bool suppress_warnings= 0;    
  817.     thd->proc_info = "Requesting binlog dump";
  818.     if (request_dump(mysql, mi, &suppress_warnings))
  819.     {
  820.       sql_print_error("Failed on request_dump()");
  821.       if (io_slave_killed(thd,mi))
  822.       {
  823. sql_print_error("Slave I/O thread killed while requesting master 
  824. dump");
  825. goto err;
  826.       }
  827.   
  828.       mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
  829.       thd->proc_info= "Waiting to reconnect after a failed binlog dump request";
  830. #ifdef SIGNAL_WITH_VIO_CLOSE
  831.       thd->clear_active_vio();
  832. #endif
  833.       end_server(mysql);
  834.       /*
  835. First time retry immediately, assuming that we can recover
  836. right away - if first time fails, sleep between re-tries
  837. hopefuly the admin can fix the problem sometime
  838.       */
  839.       if (retry_count++)
  840.       {
  841. if (retry_count > master_retry_count)
  842.   goto err; // Don't retry forever
  843. safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
  844.    (void*)mi);
  845.       }
  846.       if (io_slave_killed(thd,mi))
  847.       {
  848. sql_print_error("Slave I/O thread killed while retrying master 
  849. dump");
  850. goto err;
  851.       }
  852.       thd->proc_info = "Reconnecting after a failed binlog dump request";
  853.       if (!suppress_warnings)
  854. sql_print_error("Slave I/O thread: failed dump request, 
  855. reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
  856. llstr(mi->master_log_pos,llbuff));
  857.       if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
  858.   io_slave_killed(thd,mi))
  859.       {
  860. sql_print_error("Slave I/O thread killed during or 
  861. after reconnect");
  862. goto err;
  863.       }
  864.       goto connected;
  865.     }
  866.     while (!io_slave_killed(thd,mi))
  867.     {
  868.       bool suppress_warnings= 0;    
  869.       /* 
  870.          We say "waiting" because read_event() will wait if there's nothing to
  871.          read. But if there's something to read, it will not wait. The important
  872.          thing is to not confuse users by saying "reading" whereas we're in fact
  873.          receiving nothing.
  874.       */
  875.       thd->proc_info = "Waiting for master to send event";
  876.       ulong event_len = read_event(mysql, mi, &suppress_warnings);
  877.       if (io_slave_killed(thd,mi))
  878.       {
  879. if (global_system_variables.log_warnings)
  880.   sql_print_error("Slave I/O thread killed while reading event");
  881. goto err;
  882.       }
  883.      
  884.       if (event_len == packet_error)
  885.       {
  886. uint mysql_error_number= mysql_errno(mysql);
  887. if (mysql_error_number == ER_NET_PACKET_TOO_LARGE)
  888. {
  889.   sql_print_error("
  890. Log entry on master is longer than max_allowed_packet (%ld) on 
  891. slave. If the entry is correct, restart the server with a higher value of 
  892. max_allowed_packet",
  893.   thd->variables.max_allowed_packet);
  894.   goto err;
  895. }
  896. if (mysql_error_number == ER_MASTER_FATAL_ERROR_READING_BINLOG)
  897. {
  898.   sql_print_error(ER(mysql_error_number), mysql_error_number,
  899.   mysql_error(mysql));
  900.   goto err;
  901. }
  902.         mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
  903. thd->proc_info = "Waiting to reconnect after a failed master event read";
  904. #ifdef SIGNAL_WITH_VIO_CLOSE
  905.         thd->clear_active_vio();
  906. #endif
  907. end_server(mysql);
  908. if (retry_count++)
  909. {
  910.   if (retry_count > master_retry_count)
  911.     goto err; // Don't retry forever
  912.   safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
  913.      (void*) mi);
  914. }     
  915. if (io_slave_killed(thd,mi))
  916. {
  917.   if (global_system_variables.log_warnings)
  918.     sql_print_error("Slave I/O thread killed while waiting to 
  919. reconnect after a failed read");
  920.   goto err;
  921. }
  922. thd->proc_info = "Reconnecting after a failed master event read";
  923. if (!suppress_warnings)
  924.   sql_print_error("Slave I/O thread: Failed reading log event, 
  925. reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
  926.   llstr(mi->master_log_pos, llbuff));
  927. if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
  928.     io_slave_killed(thd,mi))
  929. {
  930.   if (global_system_variables.log_warnings)
  931.     sql_print_error("Slave I/O thread killed during or after a 
  932. reconnect done to recover from failed read");
  933.   goto err;
  934. }
  935. goto connected;
  936.       } // if (event_len == packet_error)
  937.   
  938.       retry_count=0; // ok event, reset retry counter
  939.       thd->proc_info = "Queueing master event to the relay log";
  940.       if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
  941.       event_len))
  942.       {
  943. sql_print_error("Slave I/O thread could not queue event from master");
  944. goto err;
  945.       }
  946.       flush_master_info(mi, 1); /* sure that we can flush the relay log */
  947.       /*
  948.         See if the relay logs take too much space.
  949.         We don't lock mi->rli.log_space_lock here; this dirty read saves time
  950.         and does not introduce any problem:
  951.         - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
  952.         the clean value is 0), then we are reading only one more event as we
  953.         should, and we'll block only at the next event. No big deal.
  954.         - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
  955.         the clean value is 1), then we are going into wait_for_relay_log_space()
  956.         for no reason, but this function will do a clean read, notice the clean
  957.         value and exit immediately.
  958.       */
  959. #ifndef DBUG_OFF
  960.       {
  961.         char llbuf1[22], llbuf2[22];
  962.         DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s 
  963. ignore_log_space_limit=%d",
  964.                             llstr(rli->log_space_limit,llbuf1),
  965.                             llstr(rli->log_space_total,llbuf2),
  966.                             (int) rli->ignore_log_space_limit)); 
  967.       }
  968. #endif
  969.       if (rli->log_space_limit && rli->log_space_limit <
  970.   rli->log_space_total &&
  971.           !rli->ignore_log_space_limit)
  972. if (wait_for_relay_log_space(rli))
  973. {
  974.   sql_print_error("Slave I/O thread aborted while waiting for relay 
  975. log space");
  976.   goto err;
  977. }
  978.       // TODO: check debugging abort code
  979. #ifndef DBUG_OFF
  980.       if (abort_slave_event_count && !--events_till_abort)
  981.       {
  982. sql_print_error("Slave I/O thread: debugging abort");
  983. goto err;
  984.       }
  985. #endif
  986.     } 
  987.   }
  988.   // error = 0;
  989. err:
  990.   // print the current replication position
  991.   sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
  992.   IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
  993.   VOID(pthread_mutex_lock(&LOCK_thread_count));
  994.   thd->query = thd->db = 0; // extra safety
  995.   thd->query_length= thd->db_length= 0;
  996.   VOID(pthread_mutex_unlock(&LOCK_thread_count));
  997.   if (mysql)
  998.   {
  999.     mysql_close(mysql);
  1000.     mi->mysql=0;
  1001.   }
  1002.   write_ignored_events_info_to_relay_log(thd, mi);
  1003.   thd->proc_info = "Waiting for slave mutex on exit";
  1004.   pthread_mutex_lock(&mi->run_lock);
  1005.   mi->slave_running = 0;
  1006.   mi->io_thd = 0;
  1007.   // TODO: make rpl_status part of MASTER_INFO
  1008.   change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
  1009.   mi->abort_slave = 0; // TODO: check if this is needed
  1010.   DBUG_ASSERT(thd->net.buff != 0);
  1011.   net_end(&thd->net); // destructor will not free it, because net.vio is 0
  1012.   close_thread_tables(thd, 0);
  1013.   pthread_mutex_lock(&LOCK_thread_count);
  1014.   THD_CHECK_SENTRY(thd);
  1015.   delete thd;
  1016.   pthread_mutex_unlock(&LOCK_thread_count);
  1017.   pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
  1018.   pthread_mutex_unlock(&mi->run_lock);
  1019. #ifndef DBUG_OFF
  1020.   if (abort_slave_event_count && !events_till_abort)
  1021.     goto slave_begin;
  1022. #endif  
  1023.   my_thread_end();
  1024.   pthread_exit(0);
  1025.   DBUG_RETURN(0); // Can't return anything here
  1026. }
  1027. /* Slave SQL Thread entry point */
  1028. extern "C" pthread_handler_decl(handle_slave_sql,arg)
  1029. {
  1030.   THD *thd; /* needs to be first for thread_stack */
  1031.   char llbuff[22],llbuff1[22];
  1032.   RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli; 
  1033.   const char *errmsg;
  1034.   // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
  1035.   my_thread_init();
  1036.   DBUG_ENTER("handle_slave_sql");
  1037. #ifndef DBUG_OFF
  1038. slave_begin:  
  1039. #endif  
  1040.   DBUG_ASSERT(rli->inited);
  1041.   pthread_mutex_lock(&rli->run_lock);
  1042.   DBUG_ASSERT(!rli->slave_running);
  1043.   errmsg= 0;
  1044. #ifndef DBUG_OFF  
  1045.   rli->events_till_abort = abort_slave_event_count;
  1046. #endif  
  1047.   thd = new THD; // note that contructor of THD uses DBUG_ !
  1048.   thd->thread_stack = (char*)&thd; // remember where our stack is
  1049.   
  1050.   /* Inform waiting threads that slave has started */
  1051.   rli->slave_run_id++;
  1052.   pthread_detach_this_thread();
  1053.   if (init_slave_thread(thd, SLAVE_THD_SQL))
  1054.   {
  1055.     /*
  1056.       TODO: this is currently broken - slave start and change master
  1057.       will be stuck if we fail here
  1058.     */
  1059.     pthread_cond_broadcast(&rli->start_cond);
  1060.     pthread_mutex_unlock(&rli->run_lock);
  1061.     sql_print_error("Failed during slave thread initialization");
  1062.     goto err;
  1063.   }
  1064.   thd->init_for_queries();
  1065.   rli->sql_thd= thd;
  1066.   thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
  1067.   pthread_mutex_lock(&LOCK_thread_count);
  1068.   threads.append(thd);
  1069.   pthread_mutex_unlock(&LOCK_thread_count);
  1070.   /*
  1071.     We are going to set slave_running to 1. Assuming slave I/O thread is
  1072.     alive and connected, this is going to make Seconds_Behind_Master be 0
  1073.     i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
  1074.     the moment we start we can think we are caught up, and the next second we
  1075.     start receiving data so we realize we are not caught up and
  1076.     Seconds_Behind_Master grows. No big deal.
  1077.   */
  1078.   rli->slave_running = 1;
  1079.   rli->abort_slave = 0;
  1080.   pthread_mutex_unlock(&rli->run_lock);
  1081.   pthread_cond_broadcast(&rli->start_cond);
  1082.   /*
  1083.     Reset errors for a clean start (otherwise, if the master is idle, the SQL
  1084.     thread may execute no Query_log_event, so the error will remain even
  1085.     though there's no problem anymore). Do not reset the master timestamp
  1086.     (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
  1087.     as we are not sure that we are going to receive a query, we want to
  1088.     remember the last master timestamp (to say how many seconds behind we are
  1089.     now.
  1090.     But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
  1091.   */
  1092.   clear_slave_error(rli);
  1093.   //tell the I/O thread to take relay_log_space_limit into account from now on
  1094.   pthread_mutex_lock(&rli->log_space_lock);
  1095.   rli->ignore_log_space_limit= 0;
  1096.   pthread_mutex_unlock(&rli->log_space_lock);
  1097.   rli->trans_retries= 0; // start from "no error"
  1098.   if (init_relay_log_pos(rli,
  1099.  rli->group_relay_log_name,
  1100.  rli->group_relay_log_pos,
  1101.  1 /*need data lock*/, &errmsg))
  1102.   {
  1103.     sql_print_error("Error initializing relay log position: %s",
  1104.     errmsg);
  1105.     goto err;
  1106.   }
  1107.   THD_CHECK_SENTRY(thd);
  1108.   DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
  1109.   DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
  1110.   DBUG_ASSERT(rli->sql_thd == thd);
  1111.   DBUG_PRINT("master_info",("log_file_name: %s  position: %s",
  1112.     rli->group_master_log_name,
  1113.     llstr(rli->group_master_log_pos,llbuff)));
  1114.   if (global_system_variables.log_warnings)
  1115.     sql_print_information("Slave SQL thread initialized, starting replication in 
  1116. log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
  1117.     llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
  1118.     llstr(rli->group_relay_log_pos,llbuff1));
  1119.   /* execute init_slave variable */
  1120.   if (sys_init_slave.value_length)
  1121.   {
  1122.     execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
  1123.     if (thd->query_error)
  1124.     {
  1125.       sql_print_error("
  1126. Slave SQL thread aborted. Can't execute init_slave query");
  1127.       goto err;
  1128.     }
  1129.   }
  1130.   /* Read queries from the IO/THREAD until this thread is killed */
  1131.   while (!sql_slave_killed(thd,rli))
  1132.   {
  1133.     thd->proc_info = "Reading event from the relay log";
  1134.     DBUG_ASSERT(rli->sql_thd == thd);
  1135.     THD_CHECK_SENTRY(thd);
  1136.     if (exec_relay_log_event(thd,rli))
  1137.     {
  1138.       // do not scare the user if SQL thread was simply killed or stopped
  1139.       if (!sql_slave_killed(thd,rli))
  1140.         sql_print_error("
  1141. Error running query, slave SQL thread aborted. Fix the problem, and restart 
  1142. the slave SQL thread with "SLAVE START". We stopped at log 
  1143. '%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
  1144.       goto err;
  1145.     }
  1146.   }
  1147.   /* Thread stopped. Print the current replication position to the log */
  1148.   sql_print_information("Slave SQL thread exiting, replication stopped in log 
  1149.  '%s' at position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
  1150.  err:
  1151.   VOID(pthread_mutex_lock(&LOCK_thread_count));
  1152.   thd->query = thd->db = 0; // extra safety
  1153.   thd->query_length= thd->db_length= 0;
  1154.   VOID(pthread_mutex_unlock(&LOCK_thread_count));
  1155.   thd->proc_info = "Waiting for slave mutex on exit";
  1156.   pthread_mutex_lock(&rli->run_lock);
  1157.   /* We need data_lock, at least to wake up any waiting master_pos_wait() */
  1158.   pthread_mutex_lock(&rli->data_lock);
  1159.   DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
  1160.   /* When master_pos_wait() wakes up it will check this and terminate */
  1161.   rli->slave_running= 0; 
  1162.   /* 
  1163.      Going out of the transaction. Necessary to mark it, in case the user
  1164.      restarts replication from a non-transactional statement (with CHANGE
  1165.      MASTER).
  1166.   */
  1167.   /* Wake up master_pos_wait() */
  1168.   pthread_mutex_unlock(&rli->data_lock);
  1169.   DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
  1170.   pthread_cond_broadcast(&rli->data_cond);
  1171.   rli->ignore_log_space_limit= 0; /* don't need any lock */
  1172.   rli->save_temporary_tables = thd->temporary_tables;
  1173.   /*
  1174.     TODO: see if we can do this conditionally in next_event() instead
  1175.     to avoid unneeded position re-init
  1176.   */
  1177.   thd->temporary_tables = 0; // remove tempation from destructor to close them
  1178.   DBUG_ASSERT(thd->net.buff != 0);
  1179.   net_end(&thd->net); // destructor will not free it, because we are weird
  1180.   DBUG_ASSERT(rli->sql_thd == thd);
  1181.   THD_CHECK_SENTRY(thd);
  1182.   rli->sql_thd= 0;
  1183.   pthread_mutex_lock(&LOCK_thread_count);
  1184.   THD_CHECK_SENTRY(thd);
  1185.   delete thd;
  1186.   pthread_mutex_unlock(&LOCK_thread_count);
  1187.   pthread_cond_broadcast(&rli->stop_cond);
  1188.   // tell the world we are done
  1189.   pthread_mutex_unlock(&rli->run_lock);
  1190. #ifndef DBUG_OFF // TODO: reconsider the code below
  1191.   if (abort_slave_event_count && !rli->events_till_abort)
  1192.     goto slave_begin;
  1193. #endif  
  1194.   my_thread_end();
  1195.   pthread_exit(0);
  1196.   DBUG_RETURN(0); // Can't return anything here
  1197. }
  1198. /*
  1199.   process_io_create_file()
  1200. */
  1201. static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
  1202. {
  1203.   int error = 1;
  1204.   ulong num_bytes;
  1205.   bool cev_not_written;
  1206.   THD *thd = mi->io_thd;
  1207.   NET *net = &mi->mysql->net;
  1208.   DBUG_ENTER("process_io_create_file");
  1209.   if (unlikely(!cev->is_valid()))
  1210.     DBUG_RETURN(1);
  1211.   /*
  1212.     TODO: fix to honor table rules, not only db rules
  1213.   */
  1214.   if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
  1215.   {
  1216.     skip_load_data_infile(net);
  1217.     DBUG_RETURN(0);
  1218.   }
  1219.   DBUG_ASSERT(cev->inited_from_old);
  1220.   thd->file_id = cev->file_id = mi->file_id++;
  1221.   thd->server_id = cev->server_id;
  1222.   cev_not_written = 1;
  1223.   
  1224.   if (unlikely(net_request_file(net,cev->fname)))
  1225.   {
  1226.     sql_print_error("Slave I/O: failed requesting download of '%s'",
  1227.     cev->fname);
  1228.     goto err;
  1229.   }
  1230.   /*
  1231.     This dummy block is so we could instantiate Append_block_log_event
  1232.     once and then modify it slightly instead of doing it multiple times
  1233.     in the loop
  1234.   */
  1235.   {
  1236.     Append_block_log_event aev(thd,0,0,0,0);
  1237.   
  1238.     for (;;)
  1239.     {
  1240.       if (unlikely((num_bytes=my_net_read(net)) == packet_error))
  1241.       {
  1242. sql_print_error("Network read error downloading '%s' from master",
  1243. cev->fname);
  1244. goto err;
  1245.       }
  1246.       if (unlikely(!num_bytes)) /* eof */
  1247.       {
  1248. net_write_command(net, 0, "", 0, "", 0);/* 3.23 master wants it */
  1249.         /*
  1250.           If we wrote Create_file_log_event, then we need to write
  1251.           Execute_load_log_event. If we did not write Create_file_log_event,
  1252.           then this is an empty file and we can just do as if the LOAD DATA
  1253.           INFILE had not existed, i.e. write nothing.
  1254.         */
  1255.         if (unlikely(cev_not_written))
  1256.   break;
  1257. Execute_load_log_event xev(thd,0,0);
  1258. xev.log_pos = mi->master_log_pos;
  1259. if (unlikely(mi->rli.relay_log.append(&xev)))
  1260. {
  1261.   sql_print_error("Slave I/O: error writing Exec_load event to 
  1262. relay log");
  1263.   goto err;
  1264. }
  1265. mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
  1266. break;
  1267.       }
  1268.       if (unlikely(cev_not_written))
  1269.       {
  1270. cev->block = (char*)net->read_pos;
  1271. cev->block_len = num_bytes;
  1272. cev->log_pos = mi->master_log_pos;
  1273. if (unlikely(mi->rli.relay_log.append(cev)))
  1274. {
  1275.   sql_print_error("Slave I/O: error writing Create_file event to 
  1276. relay log");
  1277.   goto err;
  1278. }
  1279. cev_not_written=0;
  1280. mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
  1281.       }
  1282.       else
  1283.       {
  1284. aev.block = (char*)net->read_pos;
  1285. aev.block_len = num_bytes;
  1286. aev.log_pos = mi->master_log_pos;
  1287. if (unlikely(mi->rli.relay_log.append(&aev)))
  1288. {
  1289.   sql_print_error("Slave I/O: error writing Append_block event to 
  1290. relay log");
  1291.   goto err;
  1292. }
  1293. mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
  1294.       }
  1295.     }
  1296.   }
  1297.   error=0;
  1298. err:
  1299.   DBUG_RETURN(error);
  1300. }
  1301. /*
  1302.   Start using a new binary log on the master
  1303.   SYNOPSIS
  1304.     process_io_rotate()
  1305.     mi master_info for the slave
  1306.     rev The rotate log event read from the binary log
  1307.   DESCRIPTION
  1308.     Updates the master info with the place in the next binary
  1309.     log where we should start reading.
  1310.   NOTES
  1311.     We assume we already locked mi->data_lock
  1312.   RETURN VALUES
  1313.     0 ok
  1314.     1         Log event is illegal
  1315. */
  1316. static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
  1317. {
  1318.   DBUG_ENTER("process_io_rotate");
  1319.   safe_mutex_assert_owner(&mi->data_lock);
  1320.   if (unlikely(!rev->is_valid()))
  1321.     DBUG_RETURN(1);
  1322.   /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
  1323.   memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
  1324.   mi->master_log_pos= rev->pos;
  1325.   DBUG_PRINT("info", ("master_log_pos: '%s' %d",
  1326.       mi->master_log_name, (ulong) mi->master_log_pos));
  1327. #ifndef DBUG_OFF
  1328.   /*
  1329.     If we do not do this, we will be getting the first
  1330.     rotate event forever, so we need to not disconnect after one.
  1331.   */
  1332.   if (disconnect_slave_event_count)
  1333.     events_till_disconnect++;
  1334. #endif
  1335.   DBUG_RETURN(0);
  1336. }
  1337. /*
  1338.   queue_old_event()
  1339.   Writes a 3.23 event to the relay log.
  1340.   TODO: 
  1341.     Test this code before release - it has to be tested on a separate
  1342.     setup with 3.23 master 
  1343. */
  1344. static int queue_old_event(MASTER_INFO *mi, const char *buf,
  1345.    ulong event_len)
  1346. {
  1347.   const char *errmsg = 0;
  1348.   ulong inc_pos;
  1349.   bool ignore_event= 0;
  1350.   char *tmp_buf = 0;
  1351.   RELAY_LOG_INFO *rli= &mi->rli;
  1352.   DBUG_ENTER("queue_old_event");
  1353.   /*
  1354.     If we get Load event, we need to pass a non-reusable buffer
  1355.     to read_log_event, so we do a trick
  1356.   */
  1357.   if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
  1358.   {
  1359.     if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
  1360.     {
  1361.       sql_print_error("Slave I/O: out of memory for Load event");
  1362.       DBUG_RETURN(1);
  1363.     }
  1364.     memcpy(tmp_buf,buf,event_len);
  1365.     /*
  1366.       Create_file constructor wants a 0 as last char of buffer, this 0 will
  1367.       serve as the string-termination char for the file's name (which is at the
  1368.       end of the buffer)
  1369.       We must increment event_len, otherwise the event constructor will not see
  1370.       this end 0, which leads to segfault.
  1371.     */
  1372.     tmp_buf[event_len++]=0;
  1373.     int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
  1374.     buf = (const char*)tmp_buf;
  1375.   }
  1376.   /*
  1377.     This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
  1378.     send the loaded file, and write it to the relay log in the form of
  1379.     Append_block/Exec_load (the SQL thread needs the data, as that thread is not
  1380.     connected to the master).
  1381.   */
  1382.   Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
  1383.     1 /*old format*/ );
  1384.   if (unlikely(!ev))
  1385.   {
  1386.     sql_print_error("Read invalid event from master: '%s',
  1387.  master could be corrupt but a more likely cause of this is a bug",
  1388.     errmsg);
  1389.     my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
  1390.     DBUG_RETURN(1);
  1391.   }
  1392.   pthread_mutex_lock(&mi->data_lock);
  1393.   ev->log_pos = mi->master_log_pos;
  1394.   switch (ev->get_type_code()) {
  1395.   case STOP_EVENT:
  1396.     ignore_event= 1;
  1397.     inc_pos= event_len;
  1398.     break;
  1399.   case ROTATE_EVENT:
  1400.     if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
  1401.     {
  1402.       delete ev;
  1403.       pthread_mutex_unlock(&mi->data_lock);
  1404.       DBUG_RETURN(1);
  1405.     }
  1406.     inc_pos= 0;
  1407.     break;
  1408.   case CREATE_FILE_EVENT:
  1409.     /*
  1410.       Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
  1411.       queue_old_event() which is for 3.23 events which don't comprise
  1412.       CREATE_FILE_EVENT. This is because read_log_event() above has just
  1413.       transformed LOAD_EVENT into CREATE_FILE_EVENT.
  1414.     */
  1415.   {
  1416.     /* We come here when and only when tmp_buf != 0 */
  1417.     DBUG_ASSERT(tmp_buf);
  1418.     int error = process_io_create_file(mi,(Create_file_log_event*)ev);
  1419.     delete ev;
  1420.     /*
  1421.       We had incremented event_len, but now when it is used to calculate the
  1422.       position in the master's log, we must use the original value.
  1423.     */
  1424.     mi->master_log_pos += --event_len;
  1425.     DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
  1426.     pthread_mutex_unlock(&mi->data_lock);
  1427.     my_free((char*)tmp_buf, MYF(0));
  1428.     DBUG_RETURN(error);
  1429.   }
  1430.   default:
  1431.     inc_pos= event_len;
  1432.     break;
  1433.   }
  1434.   if (likely(!ignore_event))
  1435.   {
  1436.     if (unlikely(rli->relay_log.append(ev)))
  1437.     {
  1438.       delete ev;
  1439.       pthread_mutex_unlock(&mi->data_lock);
  1440.       DBUG_RETURN(1);
  1441.     }
  1442.     rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  1443.   }
  1444.   delete ev;
  1445.   mi->master_log_pos+= inc_pos;
  1446.   DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
  1447.   pthread_mutex_unlock(&mi->data_lock);
  1448.   DBUG_RETURN(0);
  1449. }
  1450. /*
  1451.   queue_event()
  1452. */
  1453. int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
  1454. {
  1455.   int error= 0;
  1456.   ulong inc_pos;
  1457.   RELAY_LOG_INFO *rli= &mi->rli;
  1458.   pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
  1459.   DBUG_ENTER("queue_event");
  1460.   if (mi->old_format)
  1461.     DBUG_RETURN(queue_old_event(mi,buf,event_len));
  1462.   pthread_mutex_lock(&mi->data_lock);
  1463.   switch (buf[EVENT_TYPE_OFFSET]) {
  1464.   case STOP_EVENT:
  1465.     /*
  1466.       We needn't write this event to the relay log. Indeed, it just indicates a
  1467.       master server shutdown. The only thing this does is cleaning. But
  1468.       cleaning is already done on a per-master-thread basis (as the master
  1469.       server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
  1470.       and DO RELEASE_LOCK; prepared statements' deletion are TODO).
  1471.       
  1472.       We don't even increment mi->master_log_pos, because we may be just after
  1473.       a Rotate event. Btw, in a few milliseconds we are going to have a Start
  1474.       event from the next binlog (unless the master is presently running
  1475.       without --log-bin).
  1476.     */
  1477.     goto err;
  1478.   case ROTATE_EVENT:
  1479.   {
  1480.     Rotate_log_event rev(buf,event_len,0);
  1481.     if (unlikely(process_io_rotate(mi,&rev)))
  1482.     {
  1483.       error= 1;
  1484.       goto err;
  1485.     }
  1486.     /*
  1487.       Now the I/O thread has just changed its mi->master_log_name, so
  1488.       incrementing mi->master_log_pos is nonsense.
  1489.     */
  1490.     inc_pos= 0;
  1491.     break;
  1492.   }
  1493.   default:
  1494.     inc_pos= event_len;
  1495.     break;
  1496.   }
  1497.   /* 
  1498.      If this event is originating from this server, don't queue it. 
  1499.      We don't check this for 3.23 events because it's simpler like this; 3.23
  1500.      will be filtered anyway by the SQL slave thread which also tests the
  1501.      server id (we must also keep this test in the SQL thread, in case somebody
  1502.      upgrades a 4.0 slave which has a not-filtered relay log).
  1503.      ANY event coming from ourselves can be ignored: it is obvious for queries;
  1504.      for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
  1505.      (--log-slave-updates would not log that) unless this slave is also its
  1506.      direct master (an unsupported, useless setup!).
  1507.   */
  1508.   pthread_mutex_lock(log_lock);
  1509.   if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
  1510.       !replicate_same_server_id)
  1511.   {
  1512.     /*
  1513.       Do not write it to the relay log.
  1514.       a) We still want to increment mi->master_log_pos, so that we won't
  1515.       re-read this event from the master if the slave IO thread is now
  1516.       stopped/restarted (more efficient if the events we are ignoring are big
  1517.       LOAD DATA INFILE).
  1518.       b) We want to record that we are skipping events, for the information of
  1519.       the slave SQL thread, otherwise that thread may let
  1520.       rli->group_relay_log_pos stay too small if the last binlog's event is
  1521.       ignored.
  1522.     */
  1523.     mi->master_log_pos+= inc_pos;
  1524.     memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
  1525.     DBUG_ASSERT(rli->ign_master_log_name_end[0]);
  1526.     rli->ign_master_log_pos_end= mi->master_log_pos;
  1527.     rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
  1528.     DBUG_PRINT("info", ("master_log_pos: %d, event originating from the same server, ignored", (ulong) mi->master_log_pos));
  1529.   }  
  1530.   else
  1531.   {
  1532.     /* write the event to the relay log */
  1533.     if (likely(!(error= rli->relay_log.appendv(buf,event_len,0))))
  1534.     {
  1535.       mi->master_log_pos+= inc_pos;
  1536.       DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
  1537.       rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  1538.     }
  1539.     rli->ign_master_log_name_end[0]= 0; // last event is not ignored
  1540.   }
  1541.   pthread_mutex_unlock(log_lock);
  1542. err:
  1543.   pthread_mutex_unlock(&mi->data_lock);
  1544.   DBUG_RETURN(error);
  1545. }
  1546. void end_relay_log_info(RELAY_LOG_INFO* rli)
  1547. {
  1548.   DBUG_ENTER("end_relay_log_info");
  1549.   if (!rli->inited)
  1550.     DBUG_VOID_RETURN;
  1551.   if (rli->info_fd >= 0)
  1552.   {
  1553.     end_io_cache(&rli->info_file);
  1554.     (void) my_close(rli->info_fd, MYF(MY_WME));
  1555.     rli->info_fd = -1;
  1556.   }
  1557.   if (rli->cur_log_fd >= 0)
  1558.   {
  1559.     end_io_cache(&rli->cache_buf);
  1560.     (void)my_close(rli->cur_log_fd, MYF(MY_WME));
  1561.     rli->cur_log_fd = -1;
  1562.   }
  1563.   rli->inited = 0;
  1564.   rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
  1565.   /*
  1566.     Delete the slave's temporary tables from memory.
  1567.     In the future there will be other actions than this, to ensure persistance
  1568.     of slave's temp tables after shutdown.
  1569.   */
  1570.   rli->close_temporary_tables();
  1571.   DBUG_VOID_RETURN;
  1572. }
  1573. /*
  1574.   Try to connect until successful or slave killed
  1575.   SYNPOSIS
  1576.     safe_connect()
  1577.     thd Thread handler for slave
  1578.     mysql MySQL connection handle
  1579.     mi Replication handle
  1580.   RETURN
  1581.     0 ok
  1582.     # Error
  1583. */
  1584. static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
  1585. {
  1586.   return connect_to_master(thd, mysql, mi, 0, 0);
  1587. }
  1588. /*
  1589.   SYNPOSIS
  1590.     connect_to_master()
  1591.   IMPLEMENTATION
  1592.     Try to connect until successful or slave killed or we have retried
  1593.     master_retry_count times
  1594. */
  1595. static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
  1596.      bool reconnect, bool suppress_warnings)
  1597. {
  1598.   int slave_was_killed;
  1599.   int last_errno= -2; // impossible error
  1600.   ulong err_count=0;
  1601.   char llbuff[22];
  1602.   DBUG_ENTER("connect_to_master");
  1603. #ifndef DBUG_OFF
  1604.   events_till_disconnect = disconnect_slave_event_count;
  1605. #endif
  1606.   ulong client_flag= CLIENT_REMEMBER_OPTIONS;
  1607.   if (opt_slave_compressed_protocol)
  1608.     client_flag=CLIENT_COMPRESS; /* We will use compression */
  1609.   mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
  1610.   mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
  1611.  
  1612. #ifdef HAVE_OPENSSL
  1613.   if (mi->ssl)
  1614.     mysql_ssl_set(mysql, 
  1615.                   mi->ssl_key[0]?mi->ssl_key:0,
  1616.                   mi->ssl_cert[0]?mi->ssl_cert:0, 
  1617.                   mi->ssl_ca[0]?mi->ssl_ca:0,
  1618.                   mi->ssl_capath[0]?mi->ssl_capath:0,
  1619.                   mi->ssl_cipher[0]?mi->ssl_cipher:0);
  1620. #endif
  1621.   mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
  1622.   /* This one is not strictly needed but we have it here for completeness */
  1623.   mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
  1624.   while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
  1625.  (reconnect ? mysql_reconnect(mysql) != 0 :
  1626.   mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
  1627.      mi->port, 0, client_flag) == 0))
  1628.   {
  1629.     /* Don't repeat last error */
  1630.     if ((int)mysql_errno(mysql) != last_errno)
  1631.     {
  1632.       last_errno=mysql_errno(mysql);
  1633.       suppress_warnings= 0;
  1634.       sql_print_error("Slave I/O thread: error %s to master 
  1635. '%s@%s:%d': 
  1636. Error: '%s'  errno: %d  retry-time: %d  retries: %d",
  1637.       (reconnect ? "reconnecting" : "connecting"),
  1638.       mi->user,mi->host,mi->port,
  1639.       mysql_error(mysql), last_errno,
  1640.       mi->connect_retry,
  1641.       master_retry_count);
  1642.     }
  1643.     /*
  1644.       By default we try forever. The reason is that failure will trigger
  1645.       master election, so if the user did not set master_retry_count we
  1646.       do not want to have election triggered on the first failure to
  1647.       connect
  1648.     */
  1649.     if (++err_count == master_retry_count)
  1650.     {
  1651.       slave_was_killed=1;
  1652.       if (reconnect)
  1653.         change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
  1654.       break;
  1655.     }
  1656.     safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
  1657.        (void*)mi);
  1658.   }
  1659.   if (!slave_was_killed)
  1660.   {
  1661.     if (reconnect)
  1662.     { 
  1663.       if (!suppress_warnings && global_system_variables.log_warnings)
  1664. sql_print_information("Slave: connected to master '%s@%s:%d',
  1665. replication resumed in log '%s' at position %s", mi->user,
  1666. mi->host, mi->port,
  1667. IO_RPL_LOG_NAME,
  1668. llstr(mi->master_log_pos,llbuff));
  1669.     }
  1670.     else
  1671.     {
  1672.       change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
  1673.       mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
  1674.       mi->user, mi->host, mi->port);
  1675.     }
  1676. #ifdef SIGNAL_WITH_VIO_CLOSE
  1677.     thd->set_active_vio(mysql->net.vio);
  1678. #endif      
  1679.   }
  1680.   DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
  1681.   DBUG_RETURN(slave_was_killed);
  1682. }
  1683. /*
  1684.   safe_reconnect()
  1685.   IMPLEMENTATION
  1686.     Try to connect until successful or slave killed or we have retried
  1687.     master_retry_count times
  1688. */
  1689. static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
  1690.   bool suppress_warnings)
  1691. {
  1692.   DBUG_ENTER("safe_reconnect");
  1693.   DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
  1694. }
  1695. /*
  1696.   Store the file and position where the execute-slave thread are in the
  1697.   relay log.
  1698.   SYNOPSIS
  1699.     flush_relay_log_info()
  1700.     rli Relay log information
  1701.   NOTES
  1702.     - As this is only called by the slave thread, we don't need to
  1703.       have a lock on this.
  1704.     - If there is an active transaction, then we don't update the position
  1705.       in the relay log.  This is to ensure that we re-execute statements
  1706.       if we die in the middle of an transaction that was rolled back.
  1707.     - As a transaction never spans binary logs, we don't have to handle the
  1708.       case where we do a relay-log-rotation in the middle of the transaction.
  1709.       If this would not be the case, we would have to ensure that we
  1710.       don't delete the relay log file where the transaction started when
  1711.       we switch to a new relay log file.
  1712.   TODO
  1713.     - Change the log file information to a binary format to avoid calling
  1714.       longlong2str.
  1715.   RETURN VALUES
  1716.     0 ok
  1717.     1 write error
  1718. */
  1719. bool flush_relay_log_info(RELAY_LOG_INFO* rli)
  1720. {
  1721.   bool error=0;
  1722.   IO_CACHE *file = &rli->info_file;
  1723.   char buff[FN_REFLEN*2+22*2+4], *pos;
  1724.   my_b_seek(file, 0L);
  1725.   pos=strmov(buff, rli->group_relay_log_name);
  1726.   *pos++='n';
  1727.   pos=longlong2str(rli->group_relay_log_pos, pos, 10);
  1728.   *pos++='n';
  1729.   pos=strmov(pos, rli->group_master_log_name);
  1730.   *pos++='n';
  1731.   pos=longlong2str(rli->group_master_log_pos, pos, 10);
  1732.   *pos='n';
  1733.   if (my_b_write(file, (byte*) buff, (ulong) (pos-buff)+1))
  1734.     error=1;
  1735.   if (flush_io_cache(file))
  1736.     error=1;
  1737.   /* Flushing the relay log is done by the slave I/O thread */
  1738.   return error;
  1739. }
  1740. /*
  1741.   Called when we notice that the current "hot" log got rotated under our feet.
  1742. */
  1743. static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
  1744. {
  1745.   DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
  1746.   DBUG_ASSERT(rli->cur_log_fd == -1);
  1747.   DBUG_ENTER("reopen_relay_log");
  1748.   IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
  1749.   if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
  1750.    errmsg)) <0)
  1751.     DBUG_RETURN(0);
  1752.   /*
  1753.     We want to start exactly where we was before:
  1754.     relay_log_pos Current log pos
  1755.     pending Number of bytes already processed from the event
  1756.   */
  1757.   my_b_seek(cur_log,rli->event_relay_log_pos);
  1758.   DBUG_RETURN(cur_log);
  1759. }
  1760. Log_event* next_event(RELAY_LOG_INFO* rli)
  1761. {
  1762.   Log_event* ev;
  1763.   IO_CACHE* cur_log = rli->cur_log;
  1764.   pthread_mutex_t *log_lock = rli->relay_log.get_log_lock(); 
  1765.   const char* errmsg=0;
  1766.   THD* thd = rli->sql_thd;
  1767.   
  1768.   DBUG_ENTER("next_event");
  1769.   DBUG_ASSERT(thd != 0);
  1770.   /*
  1771.     For most operations we need to protect rli members with data_lock,
  1772.     so we assume calling function acquired this mutex for us and we will
  1773.     hold it for the most of the loop below However, we will release it
  1774.     whenever it is worth the hassle,  and in the cases when we go into a
  1775.     pthread_cond_wait() with the non-data_lock mutex
  1776.   */
  1777.   safe_mutex_assert_owner(&rli->data_lock);
  1778.   
  1779.   while (!sql_slave_killed(thd,rli))
  1780.   {
  1781.     /*
  1782.       We can have two kinds of log reading:
  1783.       hot_log:
  1784.         rli->cur_log points at the IO_CACHE of relay_log, which
  1785.         is actively being updated by the I/O thread. We need to be careful
  1786.         in this case and make sure that we are not looking at a stale log that
  1787.         has already been rotated. If it has been, we reopen the log.
  1788.       The other case is much simpler:
  1789.         We just have a read only log that nobody else will be updating.
  1790.     */
  1791.     bool hot_log;
  1792.     if ((hot_log = (cur_log != &rli->cache_buf)))
  1793.     {
  1794.       DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
  1795.       pthread_mutex_lock(log_lock);
  1796.       /*
  1797. Reading xxx_file_id is safe because the log will only
  1798. be rotated when we hold relay_log.LOCK_log
  1799.       */
  1800.       if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
  1801.       {
  1802. // The master has switched to a new log file; Reopen the old log file
  1803. cur_log=reopen_relay_log(rli, &errmsg);
  1804. pthread_mutex_unlock(log_lock);
  1805. if (!cur_log) // No more log files
  1806.   goto err;
  1807. hot_log=0; // Using old binary log
  1808.       }
  1809.     }
  1810. #ifndef DBUG_OFF
  1811.     {
  1812.       char llbuf1[22], llbuf2[22];
  1813.       DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
  1814.       /*
  1815. The next assertion sometimes (very rarely) fails, let's try to track
  1816. it
  1817.       */
  1818.       DBUG_PRINT("info", ("
  1819. Before assert, my_b_tell(cur_log)=%s  rli->event_relay_log_pos=%s",
  1820.                           llstr(my_b_tell(cur_log),llbuf1), 
  1821.                           llstr(rli->group_relay_log_pos,llbuf2)));
  1822.        DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
  1823.     }
  1824. #endif
  1825.     /*
  1826.       Relay log is always in new format - if the master is 3.23, the
  1827.       I/O thread will convert the format for us
  1828.     */
  1829.     if ((ev=Log_event::read_log_event(cur_log,0,(bool)0 /* new format */)))
  1830.     {
  1831.       DBUG_ASSERT(thd==rli->sql_thd);
  1832.       if (hot_log)
  1833. pthread_mutex_unlock(log_lock);
  1834.       DBUG_RETURN(ev);
  1835.     }
  1836.     DBUG_ASSERT(thd==rli->sql_thd);
  1837.     if (opt_reckless_slave) // For mysql-test
  1838.       cur_log->error = 0;
  1839.     if (cur_log->error < 0)
  1840.     {
  1841.       errmsg = "slave SQL thread aborted because of I/O error";
  1842.       if (hot_log)
  1843. pthread_mutex_unlock(log_lock);
  1844.       goto err;
  1845.     }
  1846.     if (!cur_log->error) /* EOF */
  1847.     {
  1848.       /*
  1849. On a hot log, EOF means that there are no more updates to
  1850. process and we must block until I/O thread adds some and
  1851. signals us to continue
  1852.       */
  1853.       if (hot_log)
  1854.       {
  1855.         /*
  1856.           We say in Seconds_Behind_Master that we have "caught up". Note that
  1857.           for example if network link is broken but I/O slave thread hasn't
  1858.           noticed it (slave_net_timeout not elapsed), then we'll say "caught
  1859.           up" whereas we're not really caught up. Fixing that would require
  1860.           internally cutting timeout in smaller pieces in network read, no
  1861.           thanks. Another example: SQL has caught up on I/O, now I/O has read
  1862.           a new event and is queuing it; the false "0" will exist until SQL
  1863.           finishes executing the new event; it will be look abnormal only if
  1864.           the events have old timestamps (then you get "many", 0, "many").
  1865.           Transient phases like this can't really be fixed.
  1866.         */
  1867.         time_t save_timestamp= rli->last_master_timestamp;
  1868.         rli->last_master_timestamp= 0;
  1869. DBUG_ASSERT(rli->relay_log.get_open_count() ==
  1870.                     rli->cur_log_old_open_count);
  1871.         if (rli->ign_master_log_name_end[0])
  1872.         {
  1873.           /* We generate and return a Rotate, to make our positions advance */
  1874.           DBUG_PRINT("info",("seeing an ignored end segment"));
  1875.           ev= new Rotate_log_event(thd, rli->ign_master_log_name_end,
  1876.                                    0, rli->ign_master_log_pos_end,
  1877.                                    Rotate_log_event::DUP_NAME |
  1878.                                    Rotate_log_event::ZERO_LEN);
  1879.           rli->ign_master_log_name_end[0]= 0;
  1880.           pthread_mutex_unlock(log_lock);
  1881.           if (unlikely(!ev))
  1882.           {
  1883.             errmsg= "Slave SQL thread failed to create a Rotate event "
  1884.               "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
  1885.             goto err;
  1886.           }
  1887.           ev->server_id= 0; // don't be ignored by slave SQL thread
  1888.           DBUG_RETURN(ev);
  1889.         }
  1890. /*
  1891.   We can, and should release data_lock while we are waiting for
  1892.   update. If we do not, show slave status will block
  1893. */
  1894. pthread_mutex_unlock(&rli->data_lock);
  1895.         /*
  1896.           Possible deadlock : 
  1897.           - the I/O thread has reached log_space_limit
  1898.           - the SQL thread has read all relay logs, but cannot purge for some
  1899.           reason:
  1900.             * it has already purged all logs except the current one
  1901.             * there are other logs than the current one but they're involved in
  1902.             a transaction that finishes in the current one (or is not finished)
  1903.           Solution :
  1904.           Wake up the possibly waiting I/O thread, and set a boolean asking
  1905.           the I/O thread to temporarily ignore the log_space_limit
  1906.           constraint, because we do not want the I/O thread to block because of
  1907.           space (it's ok if it blocks for any other reason (e.g. because the
  1908.           master does not send anything). Then the I/O thread stops waiting 
  1909.           and reads more events.
  1910.           The SQL thread decides when the I/O thread should take log_space_limit
  1911.           into account again : ignore_log_space_limit is reset to 0 
  1912.           in purge_first_log (when the SQL thread purges the just-read relay
  1913.           log), and also when the SQL thread starts. We should also reset
  1914.           ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
  1915.           fact, no need as RESET SLAVE requires that the slave
  1916.           be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
  1917.           it stops.
  1918.         */
  1919.         pthread_mutex_lock(&rli->log_space_lock);
  1920.         // prevent the I/O thread from blocking next times
  1921.         rli->ignore_log_space_limit= 1; 
  1922.         /*
  1923.           If the I/O thread is blocked, unblock it.
  1924.           Ok to broadcast after unlock, because the mutex is only destroyed in
  1925.           ~st_relay_log_info(), i.e. when rli is destroyed, and rli will not be
  1926.           destroyed before we exit the present function.
  1927.         */
  1928.         pthread_mutex_unlock(&rli->log_space_lock);
  1929.         pthread_cond_broadcast(&rli->log_space_cond);
  1930.         // Note that wait_for_update unlocks lock_log !
  1931.         rli->relay_log.wait_for_update(rli->sql_thd, 1);
  1932.         // re-acquire data lock since we released it earlier
  1933.         pthread_mutex_lock(&rli->data_lock);
  1934.         rli->last_master_timestamp= save_timestamp;
  1935. continue;
  1936.       }
  1937.       /*
  1938. If the log was not hot, we need to move to the next log in
  1939. sequence. The next log could be hot or cold, we deal with both
  1940. cases separately after doing some common initialization
  1941.       */
  1942.       end_io_cache(cur_log);
  1943.       DBUG_ASSERT(rli->cur_log_fd >= 0);
  1944.       my_close(rli->cur_log_fd, MYF(MY_WME));
  1945.       rli->cur_log_fd = -1;
  1946.       if (relay_log_purge)
  1947.       {
  1948. /*
  1949.           purge_first_log will properly set up relay log coordinates in rli.
  1950.           If the group's coordinates are equal to the event's coordinates
  1951.           (i.e. the relay log was not rotated in the middle of a group),
  1952.           we can purge this relay log too.
  1953.           We do ulonglong and string comparisons, this may be slow but
  1954.           - purging the last relay log is nice (it can save 1GB of disk), so we
  1955.           like to detect the case where we can do it, and given this,
  1956.           - I see no better detection method
  1957.           - purge_first_log is not called that often
  1958.         */
  1959. if (rli->relay_log.purge_first_log
  1960.             (rli,
  1961.              rli->group_relay_log_pos == rli->event_relay_log_pos
  1962.              && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
  1963. {
  1964.   errmsg = "Error purging processed logs";
  1965.   goto err;
  1966. }
  1967.       }
  1968.       else
  1969.       {
  1970. /*
  1971.   If hot_log is set, then we already have a lock on
  1972.   LOCK_log.  If not, we have to get the lock.
  1973.   According to Sasha, the only time this code will ever be executed
  1974.   is if we are recovering from a bug.
  1975. */
  1976. if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
  1977. {
  1978.   errmsg = "error switching to the next log";
  1979.   goto err;
  1980. }
  1981. rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
  1982. strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
  1983. sizeof(rli->event_relay_log_name)-1);
  1984. flush_relay_log_info(rli);
  1985.       }
  1986.       /*
  1987.         Now we want to open this next log. To know if it's a hot log (the one
  1988.         being written by the I/O thread now) or a cold log, we can use
  1989.         is_active(); if it is hot, we use the I/O cache; if it's cold we open
  1990.         the file normally. But if is_active() reports that the log is hot, this
  1991.         may change between the test and the consequence of the test. So we may
  1992.         open the I/O cache whereas the log is now cold, which is nonsense.
  1993.         To guard against this, we need to have LOCK_log.
  1994.       */
  1995.       DBUG_PRINT("info",("hot_log: %d",hot_log));
  1996.       if (!hot_log) /* if hot_log, we already have this mutex */
  1997.         pthread_mutex_lock(log_lock);
  1998.       if (rli->relay_log.is_active(rli->linfo.log_file_name))
  1999.       {
  2000. #ifdef EXTRA_DEBUG
  2001. if (global_system_variables.log_warnings)
  2002.   sql_print_error("next log '%s' is currently active",
  2003.   rli->linfo.log_file_name);
  2004. #endif   
  2005. rli->cur_log= cur_log= rli->relay_log.get_log_file();
  2006. rli->cur_log_old_open_count= rli->relay_log.get_open_count();
  2007. DBUG_ASSERT(rli->cur_log_fd == -1);
  2008.   
  2009. /*
  2010.   Read pointer has to be at the start since we are the only
  2011.   reader.
  2012.           We must keep the LOCK_log to read the 4 first bytes, as this is a hot
  2013.           log (same as when we call read_log_event() above: for a hot log we
  2014.           take the mutex).
  2015. */
  2016. if (check_binlog_magic(cur_log,&errmsg))
  2017.         {
  2018.           if (!hot_log) pthread_mutex_unlock(log_lock);
  2019.   goto err;
  2020.         }
  2021.         if (!hot_log) pthread_mutex_unlock(log_lock);
  2022. continue;
  2023.       }
  2024.       if (!hot_log) pthread_mutex_unlock(log_lock);
  2025.       /*
  2026. if we get here, the log was not hot, so we will have to open it
  2027. ourselves. We are sure that the log is still not hot now (a log can get
  2028. from hot to cold, but not from cold to hot). No need for LOCK_log.
  2029.       */
  2030. #ifdef EXTRA_DEBUG
  2031.       if (global_system_variables.log_warnings)
  2032. sql_print_error("next log '%s' is not active",
  2033. rli->linfo.log_file_name);
  2034. #endif   
  2035.       // open_binlog() will check the magic header
  2036.       if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
  2037.        &errmsg)) <0)
  2038. goto err;
  2039.     }
  2040.     else
  2041.     {
  2042.       /*
  2043. Read failed with a non-EOF error.
  2044. TODO: come up with something better to handle this error
  2045.       */
  2046.       if (hot_log)
  2047. pthread_mutex_unlock(log_lock);
  2048.       sql_print_error("Slave SQL thread: I/O error reading 
  2049. event(errno: %d  cur_log->error: %d)",
  2050.       my_errno,cur_log->error);
  2051.       // set read position to the beginning of the event
  2052.       my_b_seek(cur_log,rli->event_relay_log_pos);
  2053.       /* otherwise, we have had a partial read */
  2054.       errmsg = "Aborting slave SQL thread because of partial event read";
  2055.       break; // To end of function
  2056.     }
  2057.   }
  2058.   if (!errmsg && global_system_variables.log_warnings)
  2059.     errmsg = "slave SQL thread was killed";
  2060. err:
  2061.   if (errmsg)
  2062.     sql_print_error("Error reading relay log event: %s", errmsg);
  2063.   DBUG_RETURN(0);
  2064. }
  2065. /*
  2066.   Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
  2067.   because of size is simpler because when we do it we already have all relevant
  2068.   locks; here we don't, so this function is mainly taking locks). 
  2069.   Returns nothing as we cannot catch any error (MYSQL_LOG::new_file() is void).
  2070. */
  2071. void rotate_relay_log(MASTER_INFO* mi)
  2072. {
  2073.   DBUG_ENTER("rotate_relay_log");
  2074.   RELAY_LOG_INFO* rli= &mi->rli;
  2075.   lock_slave_threads(mi);
  2076.   pthread_mutex_lock(&mi->data_lock);
  2077.   pthread_mutex_lock(&rli->data_lock);
  2078.   /* 
  2079.      We need to test inited because otherwise, new_file() will attempt to lock
  2080.      LOCK_log, which may not be inited (if we're not a slave).
  2081.   */
  2082.   if (!rli->inited)
  2083.   {
  2084.     DBUG_PRINT("info", ("rli->inited == 0"));
  2085.     goto end;
  2086.   }
  2087.   /* If the relay log is closed, new_file() will do nothing. */
  2088.   rli->relay_log.new_file(1);
  2089.   /*
  2090.     We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
  2091.     be counted, so imagine a succession of FLUSH LOGS  and assume the slave
  2092.     threads are started:
  2093.     relay_log_space decreases by the size of the deleted relay log, but does
  2094.     not increase, so flush-after-flush we may become negative, which is wrong.
  2095.     Even if this will be corrected as soon as a query is replicated on the
  2096.     slave (because the I/O thread will then call harvest_bytes_written() which
  2097.     will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
  2098.     output in SHOW SLAVE STATUS meanwhile. So we harvest now.
  2099.     If the log is closed, then this will just harvest the last writes, probably
  2100.     0 as they probably have been harvested.
  2101.   */
  2102.   rli->relay_log.harvest_bytes_written(&rli->log_space_total);
  2103. end:
  2104.   pthread_mutex_unlock(&rli->data_lock);
  2105.   pthread_mutex_unlock(&mi->data_lock);
  2106.   unlock_slave_threads(mi);
  2107.   DBUG_VOID_RETURN;
  2108. }
  2109. #ifdef __GNUC__
  2110. template class I_List_iterator<i_string>;
  2111. template class I_List_iterator<i_string_pair>;
  2112. #endif
  2113. #endif /* HAVE_REPLICATION */