slave.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:144k
- 2. Then the recorded value for master is 1 and the recorded value for
- slave is 2. At SHOW SLAVE STATUS time, assume that the difference
- between timestamp of slave and rli->last_master_timestamp is 0
- (i.e. they are in the same second), then we get 0-(2-1)=-1 as a result.
- This confuses users, so we don't go below 0: hence the max().
- last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
- special marker to say "consider we have caught up".
- */
- protocol->store((longlong)(mi->rli.last_master_timestamp ? max(0, tmp)
- : 0));
- }
- else
- protocol->store_null();
- pthread_mutex_unlock(&mi->rli.data_lock);
- pthread_mutex_unlock(&mi->data_lock);
-
- if (my_net_write(&thd->net, (char*)thd->packet.ptr(), packet->length()))
- DBUG_RETURN(-1);
- }
- send_eof(thd);
- DBUG_RETURN(0);
- }
- bool flush_master_info(MASTER_INFO* mi, bool flush_relay_log_cache)
- {
- IO_CACHE* file = &mi->file;
- char lbuf[22];
- DBUG_ENTER("flush_master_info");
- DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));
- /*
- Flush the relay log to disk. If we don't do it, then the relay log while
- have some part (its last kilobytes) in memory only, so if the slave server
- dies now, with, say, from master's position 100 to 150 in memory only (not
- on disk), and with position 150 in master.info, then when the slave
- restarts, the I/O thread will fetch binlogs from 150, so in the relay log
- we will have "[0, 100] U [150, infinity[" and nobody will notice it, so the
- SQL thread will jump from 100 to 150, and replication will silently break.
- When we come to this place in code, relay log may or not be initialized;
- the caller is responsible for setting 'flush_relay_log_cache' accordingly.
- */
- if (flush_relay_log_cache)
- flush_io_cache(mi->rli.relay_log.get_log_file());
- /*
- We flushed the relay log BEFORE the master.info file, because if we crash
- now, we will get a duplicate event in the relay log at restart. If we
- flushed in the other order, we would get a hole in the relay log.
- And duplicate is better than hole (with a duplicate, in later versions we
- can add detection and scrap one event; with a hole there's nothing we can
- do).
- */
- /*
- In certain cases this code may create master.info files that seems
- corrupted, because of extra lines filled with garbage in the end
- file (this happens if new contents take less space than previous
- contents of file). But because of number of lines in the first line
- of file we don't care about this garbage.
- */
-
- my_b_seek(file, 0L);
- my_b_printf(file, "%un%sn%sn%sn%sn%sn%dn%dn%dn%sn%sn%sn%sn%sn",
- LINES_IN_MASTER_INFO_WITH_SSL,
- mi->master_log_name, llstr(mi->master_log_pos, lbuf),
- mi->host, mi->user,
- mi->password, mi->port, mi->connect_retry,
- (int)(mi->ssl), mi->ssl_ca, mi->ssl_capath, mi->ssl_cert,
- mi->ssl_cipher, mi->ssl_key);
- flush_io_cache(file);
- DBUG_RETURN(0);
- }
- st_relay_log_info::st_relay_log_info()
- :info_fd(-1), cur_log_fd(-1), save_temporary_tables(0),
- cur_log_old_open_count(0), group_master_log_pos(0), log_space_total(0),
- ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0),
- abort_pos_wait(0), slave_run_id(0), sql_thd(0), last_slave_errno(0),
- inited(0), abort_slave(0), slave_running(0), until_condition(UNTIL_NONE),
- until_log_pos(0), retried_trans(0)
- {
- group_relay_log_name[0]= event_relay_log_name[0]=
- group_master_log_name[0]= 0;
- last_slave_error[0]= until_log_name[0]= ign_master_log_name_end[0]= 0;
- bzero((char*) &info_file, sizeof(info_file));
- bzero((char*) &cache_buf, sizeof(cache_buf));
- pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
- pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
- pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
- pthread_cond_init(&data_cond, NULL);
- pthread_cond_init(&start_cond, NULL);
- pthread_cond_init(&stop_cond, NULL);
- pthread_cond_init(&log_space_cond, NULL);
- relay_log.init_pthread_objects();
- }
- st_relay_log_info::~st_relay_log_info()
- {
- pthread_mutex_destroy(&run_lock);
- pthread_mutex_destroy(&data_lock);
- pthread_mutex_destroy(&log_space_lock);
- pthread_cond_destroy(&data_cond);
- pthread_cond_destroy(&start_cond);
- pthread_cond_destroy(&stop_cond);
- pthread_cond_destroy(&log_space_cond);
- }
- /*
- Waits until the SQL thread reaches (has executed up to) the
- log/position or timed out.
- SYNOPSIS
- wait_for_pos()
- thd client thread that sent SELECT MASTER_POS_WAIT
- log_name log name to wait for
- log_pos position to wait for
- timeout timeout in seconds before giving up waiting
- NOTES
- timeout is longlong whereas it should be ulong ; but this is
- to catch if the user submitted a negative timeout.
- RETURN VALUES
- -2 improper arguments (log_pos<0)
- or slave not running, or master info changed
- during the function's execution,
- or client thread killed. -2 is translated to NULL by caller
- -1 timed out
- >=0 number of log events the function had to wait
- before reaching the desired log/position
- */
- int st_relay_log_info::wait_for_pos(THD* thd, String* log_name,
- longlong log_pos,
- longlong timeout)
- {
- if (!inited)
- return -1;
- int event_count = 0;
- ulong init_abort_pos_wait;
- int error=0;
- struct timespec abstime; // for timeout checking
- set_timespec(abstime,timeout);
- DBUG_ENTER("wait_for_pos");
- DBUG_PRINT("enter",("group_master_log_name: '%s' pos: %lu timeout: %ld",
- group_master_log_name, (ulong) group_master_log_pos,
- (long) timeout));
- pthread_mutex_lock(&data_lock);
- const char *msg= thd->enter_cond(&data_cond, &data_lock,
- "Waiting for the slave SQL thread to "
- "advance position");
- /*
- This function will abort when it notices that some CHANGE MASTER or
- RESET MASTER has changed the master info.
- To catch this, these commands modify abort_pos_wait ; We just monitor
- abort_pos_wait and see if it has changed.
- Why do we have this mechanism instead of simply monitoring slave_running
- in the loop (we do this too), as CHANGE MASTER/RESET SLAVE require that
- the SQL thread be stopped?
- This is becasue if someones does:
- STOP SLAVE;CHANGE MASTER/RESET SLAVE; START SLAVE;
- the change may happen very quickly and we may not notice that
- slave_running briefly switches between 1/0/1.
- */
- init_abort_pos_wait= abort_pos_wait;
- /*
- We'll need to
- handle all possible log names comparisons (e.g. 999 vs 1000).
- We use ulong for string->number conversion ; this is no
- stronger limitation than in find_uniq_filename in sql/log.cc
- */
- ulong log_name_extension;
- char log_name_tmp[FN_REFLEN]; //make a char[] from String
- strmake(log_name_tmp, log_name->ptr(), min(log_name->length(), FN_REFLEN-1));
- char *p= fn_ext(log_name_tmp);
- char *p_end;
- if (!*p || log_pos<0)
- {
- error= -2; //means improper arguments
- goto err;
- }
- // Convert 0-3 to 4
- log_pos= max(log_pos, BIN_LOG_HEADER_SIZE);
- /* p points to '.' */
- log_name_extension= strtoul(++p, &p_end, 10);
- /*
- p_end points to the first invalid character.
- If it equals to p, no digits were found, error.
- If it contains ' ' it means conversion went ok.
- */
- if (p_end==p || *p_end)
- {
- error= -2;
- goto err;
- }
- /* The "compare and wait" main loop */
- while (!thd->killed &&
- init_abort_pos_wait == abort_pos_wait &&
- slave_running)
- {
- bool pos_reached;
- int cmp_result= 0;
- /*
- group_master_log_name can be "", if we are just after a fresh
- replication start or after a CHANGE MASTER TO MASTER_HOST/PORT
- (before we have executed one Rotate event from the master) or
- (rare) if the user is doing a weird slave setup (see next
- paragraph). If group_master_log_name is "", we assume we don't
- have enough info to do the comparison yet, so we just wait until
- more data. In this case master_log_pos is always 0 except if
- somebody (wrongly) sets this slave to be a slave of itself
- without using --replicate-same-server-id (an unsupported
- configuration which does nothing), then group_master_log_pos
- will grow and group_master_log_name will stay "".
- */
- if (*group_master_log_name)
- {
- char *basename= (group_master_log_name +
- dirname_length(group_master_log_name));
- /*
- First compare the parts before the extension.
- Find the dot in the master's log basename,
- and protect against user's input error :
- if the names do not match up to '.' included, return error
- */
- char *q= (char*)(fn_ext(basename)+1);
- if (strncmp(basename, log_name_tmp, (int)(q-basename)))
- {
- error= -2;
- break;
- }
- // Now compare extensions.
- char *q_end;
- ulong group_master_log_name_extension= strtoul(q, &q_end, 10);
- if (group_master_log_name_extension < log_name_extension)
- cmp_result= -1 ;
- else
- cmp_result= (group_master_log_name_extension > log_name_extension) ? 1 : 0 ;
- pos_reached= ((!cmp_result && group_master_log_pos >= (ulonglong)log_pos) ||
- cmp_result > 0);
- if (pos_reached || thd->killed)
- break;
- }
- //wait for master update, with optional timeout.
-
- DBUG_PRINT("info",("Waiting for master update"));
- /*
- We are going to pthread_cond_(timed)wait(); if the SQL thread stops it
- will wake us up.
- */
- if (timeout > 0)
- {
- /*
- Note that pthread_cond_timedwait checks for the timeout
- before for the condition ; i.e. it returns ETIMEDOUT
- if the system time equals or exceeds the time specified by abstime
- before the condition variable is signaled or broadcast, _or_ if
- the absolute time specified by abstime has already passed at the time
- of the call.
- For that reason, pthread_cond_timedwait will do the "timeoutting" job
- even if its condition is always immediately signaled (case of a loaded
- master).
- */
- error=pthread_cond_timedwait(&data_cond, &data_lock, &abstime);
- }
- else
- pthread_cond_wait(&data_cond, &data_lock);
- DBUG_PRINT("info",("Got signal of master update or timed out"));
- if (error == ETIMEDOUT || error == ETIME)
- {
- error= -1;
- break;
- }
- error=0;
- event_count++;
- DBUG_PRINT("info",("Testing if killed or SQL thread not running"));
- }
- err:
- thd->exit_cond(msg);
- DBUG_PRINT("exit",("killed: %d abort: %d slave_running: %d
- improper_arguments: %d timed_out: %d",
- (int) thd->killed,
- (int) (init_abort_pos_wait != abort_pos_wait),
- (int) slave_running,
- (int) (error == -2),
- (int) (error == -1)));
- if (thd->killed || init_abort_pos_wait != abort_pos_wait ||
- !slave_running)
- {
- error= -2;
- }
- DBUG_RETURN( error ? error : event_count );
- }
- /*
- init_slave_thread()
- */
- static int init_slave_thread(THD* thd, SLAVE_THD_TYPE thd_type)
- {
- DBUG_ENTER("init_slave_thread");
- thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
- SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
- thd->host_or_ip= "";
- my_net_init(&thd->net, 0);
- thd->net.read_timeout = slave_net_timeout;
- thd->master_access= ~(ulong)0;
- thd->priv_user = 0;
- thd->slave_thread = 1;
- /*
- It's nonsense to constrain the slave threads with max_join_size; if a
- query succeeded on master, we HAVE to execute it. So set
- OPTION_BIG_SELECTS. Setting max_join_size to HA_POS_ERROR is not enough
- (and it's not needed if we have OPTION_BIG_SELECTS) because an INSERT
- SELECT examining more than 4 billion rows would still fail (yes, because
- when max_join_size is 4G, OPTION_BIG_SELECTS is automatically set, but
- only for client threads.
- */
- thd->options = ((opt_log_slave_updates) ? OPTION_BIN_LOG:0) |
- OPTION_AUTO_IS_NULL | OPTION_BIG_SELECTS;
- thd->client_capabilities = CLIENT_LOCAL_FILES;
- thd->real_id=pthread_self();
- pthread_mutex_lock(&LOCK_thread_count);
- thd->thread_id = thread_id++;
- pthread_mutex_unlock(&LOCK_thread_count);
- if (init_thr_lock() || thd->store_globals())
- {
- thd->cleanup();
- delete thd;
- DBUG_RETURN(-1);
- }
- #if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
- sigset_t set;
- VOID(sigemptyset(&set)); // Get mask in use
- VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
- #endif
- if (thd_type == SLAVE_THD_SQL)
- thd->proc_info= "Waiting for the next event in relay log";
- else
- thd->proc_info= "Waiting for master update";
- thd->version=refresh_version;
- thd->set_time();
- DBUG_RETURN(0);
- }
- static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
- void* thread_killed_arg)
- {
- int nap_time;
- thr_alarm_t alarmed;
- thr_alarm_init(&alarmed);
- time_t start_time= time((time_t*) 0);
- time_t end_time= start_time+sec;
- while ((nap_time= (int) (end_time - start_time)) > 0)
- {
- ALARM alarm_buff;
- /*
- The only reason we are asking for alarm is so that
- we will be woken up in case of murder, so if we do not get killed,
- set the alarm so it goes off after we wake up naturally
- */
- thr_alarm(&alarmed, 2 * nap_time, &alarm_buff);
- sleep(nap_time);
- thr_end_alarm(&alarmed);
-
- if ((*thread_killed)(thd,thread_killed_arg))
- return 1;
- start_time=time((time_t*) 0);
- }
- return 0;
- }
- static int request_dump(MYSQL* mysql, MASTER_INFO* mi,
- bool *suppress_warnings)
- {
- char buf[FN_REFLEN + 10];
- int len;
- int binlog_flags = 0; // for now
- char* logname = mi->master_log_name;
- DBUG_ENTER("request_dump");
- // TODO if big log files: Change next to int8store()
- int4store(buf, (ulong) mi->master_log_pos);
- int2store(buf + 4, binlog_flags);
- int4store(buf + 6, server_id);
- len = (uint) strlen(logname);
- memcpy(buf + 10, logname,len);
- if (simple_command(mysql, COM_BINLOG_DUMP, buf, len + 10, 1))
- {
- /*
- Something went wrong, so we will just reconnect and retry later
- in the future, we should do a better error analysis, but for
- now we just fill up the error log :-)
- */
- if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
- *suppress_warnings= 1; // Suppress reconnect warning
- else
- sql_print_error("Error on COM_BINLOG_DUMP: %d %s, will retry in %d secs",
- mysql_errno(mysql), mysql_error(mysql),
- master_connect_retry);
- DBUG_RETURN(1);
- }
- DBUG_RETURN(0);
- }
- static int request_table_dump(MYSQL* mysql, const char* db, const char* table)
- {
- char buf[1024];
- char * p = buf;
- uint table_len = (uint) strlen(table);
- uint db_len = (uint) strlen(db);
- if (table_len + db_len > sizeof(buf) - 2)
- {
- sql_print_error("request_table_dump: Buffer overrun");
- return 1;
- }
-
- *p++ = db_len;
- memcpy(p, db, db_len);
- p += db_len;
- *p++ = table_len;
- memcpy(p, table, table_len);
-
- if (simple_command(mysql, COM_TABLE_DUMP, buf, p - buf + table_len, 1))
- {
- sql_print_error("request_table_dump: Error sending the table dump
- command");
- return 1;
- }
- return 0;
- }
- /*
- Read one event from the master
-
- SYNOPSIS
- read_event()
- mysql MySQL connection
- mi Master connection information
- suppress_warnings TRUE when a normal net read timeout has caused us to
- try a reconnect. We do not want to print anything to
- the error log in this case because this a anormal
- event in an idle server.
- RETURN VALUES
- 'packet_error' Error
- number Length of packet
- */
- static ulong read_event(MYSQL* mysql, MASTER_INFO *mi, bool* suppress_warnings)
- {
- ulong len;
- *suppress_warnings= 0;
- /*
- my_real_read() will time us out
- We check if we were told to die, and if not, try reading again
- TODO: Move 'events_till_disconnect' to the MASTER_INFO structure
- */
- #ifndef DBUG_OFF
- if (disconnect_slave_event_count && !(events_till_disconnect--))
- return packet_error;
- #endif
-
- len = net_safe_read(mysql);
- if (len == packet_error || (long) len < 1)
- {
- if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
- {
- /*
- We are trying a normal reconnect after a read timeout;
- we suppress prints to .err file as long as the reconnect
- happens without problems
- */
- *suppress_warnings= TRUE;
- }
- else
- sql_print_error("Error reading packet from server: %s (
- server_errno=%d)",
- mysql_error(mysql), mysql_errno(mysql));
- return packet_error;
- }
- /* Check if eof packet */
- if (len < 8 && mysql->net.read_pos[0] == 254)
- {
- sql_print_error("Slave: received end packet from server, apparent
- master shutdown: %s",
- mysql_error(mysql));
- return packet_error;
- }
-
- DBUG_PRINT("info",( "len=%u, net->read_pos[4] = %dn",
- len, mysql->net.read_pos[4]));
- return len - 1;
- }
- int check_expected_error(THD* thd, RELAY_LOG_INFO* rli, int expected_error)
- {
- switch (expected_error) {
- case ER_NET_READ_ERROR:
- case ER_NET_ERROR_ON_WRITE:
- case ER_SERVER_SHUTDOWN:
- case ER_NEW_ABORTING_CONNECTION:
- return 1;
- default:
- return 0;
- }
- }
- /*
- Check if condition stated in UNTIL clause of START SLAVE is reached.
- SYNOPSYS
- st_relay_log_info::is_until_satisfied()
- DESCRIPTION
- Checks if UNTIL condition is reached. Uses caching result of last
- comparison of current log file name and target log file name. So cached
- value should be invalidated if current log file name changes
- (see st_relay_log_info::notify_... functions).
-
- This caching is needed to avoid of expensive string comparisons and
- strtol() conversions needed for log names comparison. We don't need to
- compare them each time this function is called, we only need to do this
- when current log name changes. If we have UNTIL_MASTER_POS condition we
- need to do this only after Rotate_log_event::exec_event() (which is
- rare, so caching gives real benifit), and if we have UNTIL_RELAY_POS
- condition then we should invalidate cached comarison value after
- inc_group_relay_log_pos() which called for each group of events (so we
- have some benefit if we have something like queries that use
- autoincrement or if we have transactions).
-
- Should be called ONLY if until_condition != UNTIL_NONE !
- RETURN VALUE
- true - condition met or error happened (condition seems to have
- bad log file name)
- false - condition not met
- */
- bool st_relay_log_info::is_until_satisfied()
- {
- const char *log_name;
- ulonglong log_pos;
- DBUG_ASSERT(until_condition != UNTIL_NONE);
-
- if (until_condition == UNTIL_MASTER_POS)
- {
- log_name= group_master_log_name;
- log_pos= group_master_log_pos;
- }
- else
- { /* until_condition == UNTIL_RELAY_POS */
- log_name= group_relay_log_name;
- log_pos= group_relay_log_pos;
- }
-
- if (until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_UNKNOWN)
- {
- /*
- We have no cached comaprison results so we should compare log names
- and cache result
- */
- DBUG_ASSERT(*log_name || log_pos == 0);
-
- if (*log_name)
- {
- const char *basename= log_name + dirname_length(log_name);
-
- const char *q= (const char*)(fn_ext(basename)+1);
- if (strncmp(basename, until_log_name, (int)(q-basename)) == 0)
- {
- /* Now compare extensions. */
- char *q_end;
- ulong log_name_extension= strtoul(q, &q_end, 10);
- if (log_name_extension < until_log_name_extension)
- until_log_names_cmp_result= UNTIL_LOG_NAMES_CMP_LESS;
- else
- until_log_names_cmp_result=
- (log_name_extension > until_log_name_extension) ?
- UNTIL_LOG_NAMES_CMP_GREATER : UNTIL_LOG_NAMES_CMP_EQUAL ;
- }
- else
- {
- /* Probably error so we aborting */
- sql_print_error("Slave SQL thread is stopped because UNTIL "
- "condition is bad.");
- return TRUE;
- }
- }
- else
- return until_log_pos == 0;
- }
-
- return ((until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_EQUAL &&
- log_pos >= until_log_pos) ||
- until_log_names_cmp_result == UNTIL_LOG_NAMES_CMP_GREATER);
- }
- static int exec_relay_log_event(THD* thd, RELAY_LOG_INFO* rli)
- {
- /*
- We acquire this mutex since we need it for all operations except
- event execution. But we will release it in places where we will
- wait for something for example inside of next_event().
- */
- pthread_mutex_lock(&rli->data_lock);
-
- /*
- This tests if the position of the end of the last previous executed event
- hits the UNTIL barrier.
- We would prefer to test if the position of the start (or possibly) end of
- the to-be-read event hits the UNTIL barrier, this is different if there
- was an event ignored by the I/O thread just before (BUG#13861 to be
- fixed).
- */
- if (rli->until_condition!=RELAY_LOG_INFO::UNTIL_NONE &&
- rli->is_until_satisfied())
- {
- char buf[22];
- sql_print_error("Slave SQL thread stopped because it reached its"
- " UNTIL position %s", llstr(rli->until_pos(), buf));
- /*
- Setting abort_slave flag because we do not want additional message about
- error in query execution to be printed.
- */
- rli->abort_slave= 1;
- pthread_mutex_unlock(&rli->data_lock);
- return 1;
- }
-
- Log_event * ev = next_event(rli);
-
- DBUG_ASSERT(rli->sql_thd==thd);
-
- if (sql_slave_killed(thd,rli))
- {
- pthread_mutex_unlock(&rli->data_lock);
- delete ev;
- return 1;
- }
- if (ev)
- {
- int type_code = ev->get_type_code();
- int exec_res;
- /*
- Skip queries originating from this server or number of
- queries specified by the user in slave_skip_counter
- We can't however skip event's that has something to do with the
- log files themselves.
- */
- if ((ev->server_id == (uint32) ::server_id && !replicate_same_server_id) ||
- (rli->slave_skip_counter && type_code != ROTATE_EVENT))
- {
- rli->inc_group_relay_log_pos(ev->get_event_len(),
- type_code != STOP_EVENT ? ev->log_pos : LL(0),
- 1/* skip lock*/);
- flush_relay_log_info(rli);
- /*
- Protect against common user error of setting the counter to 1
- instead of 2 while recovering from an failed auto-increment insert
- */
- if (rli->slave_skip_counter &&
- !((type_code == INTVAR_EVENT || type_code == STOP_EVENT) &&
- rli->slave_skip_counter == 1))
- --rli->slave_skip_counter;
- pthread_mutex_unlock(&rli->data_lock);
- delete ev;
- return 0; // avoid infinite update loops
- }
- pthread_mutex_unlock(&rli->data_lock);
-
- thd->server_id = ev->server_id; // use the original server id for logging
- thd->set_time(); // time the query
- thd->lex->current_select= 0;
- if (!ev->when)
- ev->when = time(NULL);
- ev->thd = thd;
- exec_res = ev->exec_event(rli);
- DBUG_ASSERT(rli->sql_thd==thd);
- delete ev;
- if (slave_trans_retries)
- {
- if (exec_res &&
- (thd->net.last_errno == ER_LOCK_DEADLOCK ||
- thd->net.last_errno == ER_LOCK_WAIT_TIMEOUT) &&
- !thd->is_fatal_error)
- {
- const char *errmsg;
- /*
- We were in a transaction which has been rolled back because of a
- deadlock (currently, InnoDB deadlock detected by InnoDB) or lock
- wait timeout (innodb_lock_wait_timeout exceeded); let's seek back to
- BEGIN log event and retry it all again.
- We have to not only seek but also
- a) init_master_info(), to seek back to hot relay log's start for later
- (for when we will come back to this hot log after re-processing the
- possibly existing old logs where BEGIN is: check_binlog_magic() will
- then need the cache to be at position 0 (see comments at beginning of
- init_master_info()).
- b) init_relay_log_pos(), because the BEGIN may be an older relay log.
- */
- if (rli->trans_retries < slave_trans_retries)
- {
- if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
- sql_print_error("Failed to initialize the master info structure");
- else if (init_relay_log_pos(rli,
- rli->group_relay_log_name,
- rli->group_relay_log_pos,
- 1, &errmsg))
- sql_print_error("Error initializing relay log position: %s",
- errmsg);
- else
- {
- exec_res= 0;
- /* chance for concurrent connection to get more locks */
- safe_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
- (CHECK_KILLED_FUNC)sql_slave_killed, (void*)rli);
- pthread_mutex_lock(&rli->data_lock); // because of SHOW STATUS
- rli->trans_retries++;
- rli->retried_trans++;
- pthread_mutex_unlock(&rli->data_lock);
- DBUG_PRINT("info", ("Slave retries transaction "
- "rli->trans_retries: %lu", rli->trans_retries));
- }
- }
- else
- sql_print_error("Slave SQL thread retried transaction %lu time(s) "
- "in vain, giving up. Consider raising the value of "
- "the slave_transaction_retries variable.",
- slave_trans_retries);
- }
- if (!((thd->options & OPTION_BEGIN) && opt_using_transactions))
- rli->trans_retries= 0; // restart from fresh
- }
- return exec_res;
- }
- else
- {
- pthread_mutex_unlock(&rli->data_lock);
- slave_print_error(rli, 0, "
- Could not parse relay log event entry. The possible reasons are: the master's
- binary log is corrupted (you can check this by running 'mysqlbinlog' on the
- binary log), the slave's relay log is corrupted (you can check this by running
- 'mysqlbinlog' on the relay log), a network problem, or a bug in the master's
- or slave's MySQL code. If you want to check the master's binary log or slave's
- relay log, you will be able to know their names by issuing 'SHOW SLAVE STATUS'
- on this slave.
- ");
- return 1;
- }
- }
- /* Slave I/O Thread entry point */
- extern "C" pthread_handler_decl(handle_slave_io,arg)
- {
- THD *thd; // needs to be first for thread_stack
- MYSQL *mysql;
- MASTER_INFO *mi = (MASTER_INFO*)arg;
- RELAY_LOG_INFO *rli= &mi->rli;
- char llbuff[22];
- uint retry_count;
-
- // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
- my_thread_init();
- DBUG_ENTER("handle_slave_io");
- #ifndef DBUG_OFF
- slave_begin:
- #endif
- DBUG_ASSERT(mi->inited);
- mysql= NULL ;
- retry_count= 0;
- pthread_mutex_lock(&mi->run_lock);
- /* Inform waiting threads that slave has started */
- mi->slave_run_id++;
- #ifndef DBUG_OFF
- mi->events_till_abort = abort_slave_event_count;
- #endif
-
- thd= new THD; // note that contructor of THD uses DBUG_ !
- THD_CHECK_SENTRY(thd);
- pthread_detach_this_thread();
- if (init_slave_thread(thd, SLAVE_THD_IO))
- {
- pthread_cond_broadcast(&mi->start_cond);
- pthread_mutex_unlock(&mi->run_lock);
- sql_print_error("Failed during slave I/O thread initialization");
- goto err;
- }
- mi->io_thd = thd;
- thd->thread_stack = (char*)&thd; // remember where our stack is
- pthread_mutex_lock(&LOCK_thread_count);
- threads.append(thd);
- pthread_mutex_unlock(&LOCK_thread_count);
- mi->slave_running = 1;
- mi->abort_slave = 0;
- pthread_mutex_unlock(&mi->run_lock);
- pthread_cond_broadcast(&mi->start_cond);
-
- DBUG_PRINT("master_info",("log_file_name: '%s' position: %s",
- mi->master_log_name,
- llstr(mi->master_log_pos,llbuff)));
-
- if (!(mi->mysql = mysql = mysql_init(NULL)))
- {
- sql_print_error("Slave I/O thread: error in mysql_init()");
- goto err;
- }
-
- thd->proc_info = "Connecting to master";
- // we can get killed during safe_connect
- if (!safe_connect(thd, mysql, mi))
- sql_print_information("Slave I/O thread: connected to master '%s@%s:%d',
- replication started in log '%s' at position %s", mi->user,
- mi->host, mi->port,
- IO_RPL_LOG_NAME,
- llstr(mi->master_log_pos,llbuff));
- else
- {
- sql_print_error("Slave I/O thread killed while connecting to master");
- goto err;
- }
- connected:
- // TODO: the assignment below should be under mutex (5.0)
- mi->slave_running= MYSQL_SLAVE_RUN_CONNECT;
- thd->slave_net = &mysql->net;
- thd->proc_info = "Checking master version";
- if (get_master_version_and_clock(mysql, mi))
- goto err;
- if (!mi->old_format)
- {
- /*
- Register ourselves with the master.
- If fails, this is not fatal - we just print the error message and go
- on with life.
- */
- thd->proc_info = "Registering slave on master";
- if (register_slave_on_master(mysql) || update_slave_list(mysql, mi))
- goto err;
- }
-
- DBUG_PRINT("info",("Starting reading binary log from master"));
- while (!io_slave_killed(thd,mi))
- {
- bool suppress_warnings= 0;
- thd->proc_info = "Requesting binlog dump";
- if (request_dump(mysql, mi, &suppress_warnings))
- {
- sql_print_error("Failed on request_dump()");
- if (io_slave_killed(thd,mi))
- {
- sql_print_error("Slave I/O thread killed while requesting master
- dump");
- goto err;
- }
-
- mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
- thd->proc_info= "Waiting to reconnect after a failed binlog dump request";
- #ifdef SIGNAL_WITH_VIO_CLOSE
- thd->clear_active_vio();
- #endif
- end_server(mysql);
- /*
- First time retry immediately, assuming that we can recover
- right away - if first time fails, sleep between re-tries
- hopefuly the admin can fix the problem sometime
- */
- if (retry_count++)
- {
- if (retry_count > master_retry_count)
- goto err; // Don't retry forever
- safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
- (void*)mi);
- }
- if (io_slave_killed(thd,mi))
- {
- sql_print_error("Slave I/O thread killed while retrying master
- dump");
- goto err;
- }
- thd->proc_info = "Reconnecting after a failed binlog dump request";
- if (!suppress_warnings)
- sql_print_error("Slave I/O thread: failed dump request,
- reconnecting to try again, log '%s' at postion %s", IO_RPL_LOG_NAME,
- llstr(mi->master_log_pos,llbuff));
- if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
- io_slave_killed(thd,mi))
- {
- sql_print_error("Slave I/O thread killed during or
- after reconnect");
- goto err;
- }
- goto connected;
- }
- while (!io_slave_killed(thd,mi))
- {
- bool suppress_warnings= 0;
- /*
- We say "waiting" because read_event() will wait if there's nothing to
- read. But if there's something to read, it will not wait. The important
- thing is to not confuse users by saying "reading" whereas we're in fact
- receiving nothing.
- */
- thd->proc_info = "Waiting for master to send event";
- ulong event_len = read_event(mysql, mi, &suppress_warnings);
- if (io_slave_killed(thd,mi))
- {
- if (global_system_variables.log_warnings)
- sql_print_error("Slave I/O thread killed while reading event");
- goto err;
- }
-
- if (event_len == packet_error)
- {
- uint mysql_error_number= mysql_errno(mysql);
- if (mysql_error_number == ER_NET_PACKET_TOO_LARGE)
- {
- sql_print_error("
- Log entry on master is longer than max_allowed_packet (%ld) on
- slave. If the entry is correct, restart the server with a higher value of
- max_allowed_packet",
- thd->variables.max_allowed_packet);
- goto err;
- }
- if (mysql_error_number == ER_MASTER_FATAL_ERROR_READING_BINLOG)
- {
- sql_print_error(ER(mysql_error_number), mysql_error_number,
- mysql_error(mysql));
- goto err;
- }
- mi->slave_running= MYSQL_SLAVE_RUN_NOT_CONNECT;
- thd->proc_info = "Waiting to reconnect after a failed master event read";
- #ifdef SIGNAL_WITH_VIO_CLOSE
- thd->clear_active_vio();
- #endif
- end_server(mysql);
- if (retry_count++)
- {
- if (retry_count > master_retry_count)
- goto err; // Don't retry forever
- safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
- (void*) mi);
- }
- if (io_slave_killed(thd,mi))
- {
- if (global_system_variables.log_warnings)
- sql_print_error("Slave I/O thread killed while waiting to
- reconnect after a failed read");
- goto err;
- }
- thd->proc_info = "Reconnecting after a failed master event read";
- if (!suppress_warnings)
- sql_print_error("Slave I/O thread: Failed reading log event,
- reconnecting to retry, log '%s' position %s", IO_RPL_LOG_NAME,
- llstr(mi->master_log_pos, llbuff));
- if (safe_reconnect(thd, mysql, mi, suppress_warnings) ||
- io_slave_killed(thd,mi))
- {
- if (global_system_variables.log_warnings)
- sql_print_error("Slave I/O thread killed during or after a
- reconnect done to recover from failed read");
- goto err;
- }
- goto connected;
- } // if (event_len == packet_error)
-
- retry_count=0; // ok event, reset retry counter
- thd->proc_info = "Queueing master event to the relay log";
- if (queue_event(mi,(const char*)mysql->net.read_pos + 1,
- event_len))
- {
- sql_print_error("Slave I/O thread could not queue event from master");
- goto err;
- }
- flush_master_info(mi, 1); /* sure that we can flush the relay log */
- /*
- See if the relay logs take too much space.
- We don't lock mi->rli.log_space_lock here; this dirty read saves time
- and does not introduce any problem:
- - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
- the clean value is 0), then we are reading only one more event as we
- should, and we'll block only at the next event. No big deal.
- - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
- the clean value is 1), then we are going into wait_for_relay_log_space()
- for no reason, but this function will do a clean read, notice the clean
- value and exit immediately.
- */
- #ifndef DBUG_OFF
- {
- char llbuf1[22], llbuf2[22];
- DBUG_PRINT("info", ("log_space_limit=%s log_space_total=%s
- ignore_log_space_limit=%d",
- llstr(rli->log_space_limit,llbuf1),
- llstr(rli->log_space_total,llbuf2),
- (int) rli->ignore_log_space_limit));
- }
- #endif
- if (rli->log_space_limit && rli->log_space_limit <
- rli->log_space_total &&
- !rli->ignore_log_space_limit)
- if (wait_for_relay_log_space(rli))
- {
- sql_print_error("Slave I/O thread aborted while waiting for relay
- log space");
- goto err;
- }
- // TODO: check debugging abort code
- #ifndef DBUG_OFF
- if (abort_slave_event_count && !--events_till_abort)
- {
- sql_print_error("Slave I/O thread: debugging abort");
- goto err;
- }
- #endif
- }
- }
- // error = 0;
- err:
- // print the current replication position
- sql_print_error("Slave I/O thread exiting, read up to log '%s', position %s",
- IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query = thd->db = 0; // extra safety
- thd->query_length= thd->db_length= 0;
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
- if (mysql)
- {
- mysql_close(mysql);
- mi->mysql=0;
- }
- write_ignored_events_info_to_relay_log(thd, mi);
- thd->proc_info = "Waiting for slave mutex on exit";
- pthread_mutex_lock(&mi->run_lock);
- mi->slave_running = 0;
- mi->io_thd = 0;
- // TODO: make rpl_status part of MASTER_INFO
- change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
- mi->abort_slave = 0; // TODO: check if this is needed
- DBUG_ASSERT(thd->net.buff != 0);
- net_end(&thd->net); // destructor will not free it, because net.vio is 0
- close_thread_tables(thd, 0);
- pthread_mutex_lock(&LOCK_thread_count);
- THD_CHECK_SENTRY(thd);
- delete thd;
- pthread_mutex_unlock(&LOCK_thread_count);
- pthread_cond_broadcast(&mi->stop_cond); // tell the world we are done
- pthread_mutex_unlock(&mi->run_lock);
- #ifndef DBUG_OFF
- if (abort_slave_event_count && !events_till_abort)
- goto slave_begin;
- #endif
- my_thread_end();
- pthread_exit(0);
- DBUG_RETURN(0); // Can't return anything here
- }
- /* Slave SQL Thread entry point */
- extern "C" pthread_handler_decl(handle_slave_sql,arg)
- {
- THD *thd; /* needs to be first for thread_stack */
- char llbuff[22],llbuff1[22];
- RELAY_LOG_INFO* rli = &((MASTER_INFO*)arg)->rli;
- const char *errmsg;
- // needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
- my_thread_init();
- DBUG_ENTER("handle_slave_sql");
- #ifndef DBUG_OFF
- slave_begin:
- #endif
- DBUG_ASSERT(rli->inited);
- pthread_mutex_lock(&rli->run_lock);
- DBUG_ASSERT(!rli->slave_running);
- errmsg= 0;
- #ifndef DBUG_OFF
- rli->events_till_abort = abort_slave_event_count;
- #endif
- thd = new THD; // note that contructor of THD uses DBUG_ !
- thd->thread_stack = (char*)&thd; // remember where our stack is
-
- /* Inform waiting threads that slave has started */
- rli->slave_run_id++;
- pthread_detach_this_thread();
- if (init_slave_thread(thd, SLAVE_THD_SQL))
- {
- /*
- TODO: this is currently broken - slave start and change master
- will be stuck if we fail here
- */
- pthread_cond_broadcast(&rli->start_cond);
- pthread_mutex_unlock(&rli->run_lock);
- sql_print_error("Failed during slave thread initialization");
- goto err;
- }
- thd->init_for_queries();
- rli->sql_thd= thd;
- thd->temporary_tables = rli->save_temporary_tables; // restore temp tables
- pthread_mutex_lock(&LOCK_thread_count);
- threads.append(thd);
- pthread_mutex_unlock(&LOCK_thread_count);
- /*
- We are going to set slave_running to 1. Assuming slave I/O thread is
- alive and connected, this is going to make Seconds_Behind_Master be 0
- i.e. "caught up". Even if we're just at start of thread. Well it's ok, at
- the moment we start we can think we are caught up, and the next second we
- start receiving data so we realize we are not caught up and
- Seconds_Behind_Master grows. No big deal.
- */
- rli->slave_running = 1;
- rli->abort_slave = 0;
- pthread_mutex_unlock(&rli->run_lock);
- pthread_cond_broadcast(&rli->start_cond);
- /*
- Reset errors for a clean start (otherwise, if the master is idle, the SQL
- thread may execute no Query_log_event, so the error will remain even
- though there's no problem anymore). Do not reset the master timestamp
- (imagine the slave has caught everything, the STOP SLAVE and START SLAVE:
- as we are not sure that we are going to receive a query, we want to
- remember the last master timestamp (to say how many seconds behind we are
- now.
- But the master timestamp is reset by RESET SLAVE & CHANGE MASTER.
- */
- clear_slave_error(rli);
- //tell the I/O thread to take relay_log_space_limit into account from now on
- pthread_mutex_lock(&rli->log_space_lock);
- rli->ignore_log_space_limit= 0;
- pthread_mutex_unlock(&rli->log_space_lock);
- rli->trans_retries= 0; // start from "no error"
- if (init_relay_log_pos(rli,
- rli->group_relay_log_name,
- rli->group_relay_log_pos,
- 1 /*need data lock*/, &errmsg))
- {
- sql_print_error("Error initializing relay log position: %s",
- errmsg);
- goto err;
- }
- THD_CHECK_SENTRY(thd);
- DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT(my_b_tell(rli->cur_log) == rli->event_relay_log_pos);
- DBUG_ASSERT(rli->sql_thd == thd);
- DBUG_PRINT("master_info",("log_file_name: %s position: %s",
- rli->group_master_log_name,
- llstr(rli->group_master_log_pos,llbuff)));
- if (global_system_variables.log_warnings)
- sql_print_information("Slave SQL thread initialized, starting replication in
- log '%s' at position %s, relay log '%s' position: %s", RPL_LOG_NAME,
- llstr(rli->group_master_log_pos,llbuff),rli->group_relay_log_name,
- llstr(rli->group_relay_log_pos,llbuff1));
- /* execute init_slave variable */
- if (sys_init_slave.value_length)
- {
- execute_init_command(thd, &sys_init_slave, &LOCK_sys_init_slave);
- if (thd->query_error)
- {
- sql_print_error("
- Slave SQL thread aborted. Can't execute init_slave query");
- goto err;
- }
- }
- /* Read queries from the IO/THREAD until this thread is killed */
- while (!sql_slave_killed(thd,rli))
- {
- thd->proc_info = "Reading event from the relay log";
- DBUG_ASSERT(rli->sql_thd == thd);
- THD_CHECK_SENTRY(thd);
- if (exec_relay_log_event(thd,rli))
- {
- // do not scare the user if SQL thread was simply killed or stopped
- if (!sql_slave_killed(thd,rli))
- sql_print_error("
- Error running query, slave SQL thread aborted. Fix the problem, and restart
- the slave SQL thread with "SLAVE START". We stopped at log
- '%s' position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos, llbuff));
- goto err;
- }
- }
- /* Thread stopped. Print the current replication position to the log */
- sql_print_information("Slave SQL thread exiting, replication stopped in log
- '%s' at position %s", RPL_LOG_NAME, llstr(rli->group_master_log_pos,llbuff));
- err:
- VOID(pthread_mutex_lock(&LOCK_thread_count));
- thd->query = thd->db = 0; // extra safety
- thd->query_length= thd->db_length= 0;
- VOID(pthread_mutex_unlock(&LOCK_thread_count));
- thd->proc_info = "Waiting for slave mutex on exit";
- pthread_mutex_lock(&rli->run_lock);
- /* We need data_lock, at least to wake up any waiting master_pos_wait() */
- pthread_mutex_lock(&rli->data_lock);
- DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun
- /* When master_pos_wait() wakes up it will check this and terminate */
- rli->slave_running= 0;
- /*
- Going out of the transaction. Necessary to mark it, in case the user
- restarts replication from a non-transactional statement (with CHANGE
- MASTER).
- */
- /* Wake up master_pos_wait() */
- pthread_mutex_unlock(&rli->data_lock);
- DBUG_PRINT("info",("Signaling possibly waiting master_pos_wait() functions"));
- pthread_cond_broadcast(&rli->data_cond);
- rli->ignore_log_space_limit= 0; /* don't need any lock */
- rli->save_temporary_tables = thd->temporary_tables;
- /*
- TODO: see if we can do this conditionally in next_event() instead
- to avoid unneeded position re-init
- */
- thd->temporary_tables = 0; // remove tempation from destructor to close them
- DBUG_ASSERT(thd->net.buff != 0);
- net_end(&thd->net); // destructor will not free it, because we are weird
- DBUG_ASSERT(rli->sql_thd == thd);
- THD_CHECK_SENTRY(thd);
- rli->sql_thd= 0;
- pthread_mutex_lock(&LOCK_thread_count);
- THD_CHECK_SENTRY(thd);
- delete thd;
- pthread_mutex_unlock(&LOCK_thread_count);
- pthread_cond_broadcast(&rli->stop_cond);
- // tell the world we are done
- pthread_mutex_unlock(&rli->run_lock);
- #ifndef DBUG_OFF // TODO: reconsider the code below
- if (abort_slave_event_count && !rli->events_till_abort)
- goto slave_begin;
- #endif
- my_thread_end();
- pthread_exit(0);
- DBUG_RETURN(0); // Can't return anything here
- }
- /*
- process_io_create_file()
- */
- static int process_io_create_file(MASTER_INFO* mi, Create_file_log_event* cev)
- {
- int error = 1;
- ulong num_bytes;
- bool cev_not_written;
- THD *thd = mi->io_thd;
- NET *net = &mi->mysql->net;
- DBUG_ENTER("process_io_create_file");
- if (unlikely(!cev->is_valid()))
- DBUG_RETURN(1);
- /*
- TODO: fix to honor table rules, not only db rules
- */
- if (!db_ok(cev->db, replicate_do_db, replicate_ignore_db))
- {
- skip_load_data_infile(net);
- DBUG_RETURN(0);
- }
- DBUG_ASSERT(cev->inited_from_old);
- thd->file_id = cev->file_id = mi->file_id++;
- thd->server_id = cev->server_id;
- cev_not_written = 1;
-
- if (unlikely(net_request_file(net,cev->fname)))
- {
- sql_print_error("Slave I/O: failed requesting download of '%s'",
- cev->fname);
- goto err;
- }
- /*
- This dummy block is so we could instantiate Append_block_log_event
- once and then modify it slightly instead of doing it multiple times
- in the loop
- */
- {
- Append_block_log_event aev(thd,0,0,0,0);
-
- for (;;)
- {
- if (unlikely((num_bytes=my_net_read(net)) == packet_error))
- {
- sql_print_error("Network read error downloading '%s' from master",
- cev->fname);
- goto err;
- }
- if (unlikely(!num_bytes)) /* eof */
- {
- net_write_command(net, 0, "", 0, "", 0);/* 3.23 master wants it */
- /*
- If we wrote Create_file_log_event, then we need to write
- Execute_load_log_event. If we did not write Create_file_log_event,
- then this is an empty file and we can just do as if the LOAD DATA
- INFILE had not existed, i.e. write nothing.
- */
- if (unlikely(cev_not_written))
- break;
- Execute_load_log_event xev(thd,0,0);
- xev.log_pos = mi->master_log_pos;
- if (unlikely(mi->rli.relay_log.append(&xev)))
- {
- sql_print_error("Slave I/O: error writing Exec_load event to
- relay log");
- goto err;
- }
- mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
- break;
- }
- if (unlikely(cev_not_written))
- {
- cev->block = (char*)net->read_pos;
- cev->block_len = num_bytes;
- cev->log_pos = mi->master_log_pos;
- if (unlikely(mi->rli.relay_log.append(cev)))
- {
- sql_print_error("Slave I/O: error writing Create_file event to
- relay log");
- goto err;
- }
- cev_not_written=0;
- mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
- }
- else
- {
- aev.block = (char*)net->read_pos;
- aev.block_len = num_bytes;
- aev.log_pos = mi->master_log_pos;
- if (unlikely(mi->rli.relay_log.append(&aev)))
- {
- sql_print_error("Slave I/O: error writing Append_block event to
- relay log");
- goto err;
- }
- mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
- }
- }
- }
- error=0;
- err:
- DBUG_RETURN(error);
- }
- /*
- Start using a new binary log on the master
- SYNOPSIS
- process_io_rotate()
- mi master_info for the slave
- rev The rotate log event read from the binary log
- DESCRIPTION
- Updates the master info with the place in the next binary
- log where we should start reading.
- NOTES
- We assume we already locked mi->data_lock
- RETURN VALUES
- 0 ok
- 1 Log event is illegal
- */
- static int process_io_rotate(MASTER_INFO *mi, Rotate_log_event *rev)
- {
- DBUG_ENTER("process_io_rotate");
- safe_mutex_assert_owner(&mi->data_lock);
- if (unlikely(!rev->is_valid()))
- DBUG_RETURN(1);
- /* Safe copy as 'rev' has been "sanitized" in Rotate_log_event's ctor */
- memcpy(mi->master_log_name, rev->new_log_ident, rev->ident_len+1);
- mi->master_log_pos= rev->pos;
- DBUG_PRINT("info", ("master_log_pos: '%s' %d",
- mi->master_log_name, (ulong) mi->master_log_pos));
- #ifndef DBUG_OFF
- /*
- If we do not do this, we will be getting the first
- rotate event forever, so we need to not disconnect after one.
- */
- if (disconnect_slave_event_count)
- events_till_disconnect++;
- #endif
- DBUG_RETURN(0);
- }
- /*
- queue_old_event()
- Writes a 3.23 event to the relay log.
- TODO:
- Test this code before release - it has to be tested on a separate
- setup with 3.23 master
- */
- static int queue_old_event(MASTER_INFO *mi, const char *buf,
- ulong event_len)
- {
- const char *errmsg = 0;
- ulong inc_pos;
- bool ignore_event= 0;
- char *tmp_buf = 0;
- RELAY_LOG_INFO *rli= &mi->rli;
- DBUG_ENTER("queue_old_event");
- /*
- If we get Load event, we need to pass a non-reusable buffer
- to read_log_event, so we do a trick
- */
- if (buf[EVENT_TYPE_OFFSET] == LOAD_EVENT)
- {
- if (unlikely(!(tmp_buf=(char*)my_malloc(event_len+1,MYF(MY_WME)))))
- {
- sql_print_error("Slave I/O: out of memory for Load event");
- DBUG_RETURN(1);
- }
- memcpy(tmp_buf,buf,event_len);
- /*
- Create_file constructor wants a 0 as last char of buffer, this 0 will
- serve as the string-termination char for the file's name (which is at the
- end of the buffer)
- We must increment event_len, otherwise the event constructor will not see
- this end 0, which leads to segfault.
- */
- tmp_buf[event_len++]=0;
- int4store(tmp_buf+EVENT_LEN_OFFSET, event_len);
- buf = (const char*)tmp_buf;
- }
- /*
- This will transform LOAD_EVENT into CREATE_FILE_EVENT, ask the master to
- send the loaded file, and write it to the relay log in the form of
- Append_block/Exec_load (the SQL thread needs the data, as that thread is not
- connected to the master).
- */
- Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
- 1 /*old format*/ );
- if (unlikely(!ev))
- {
- sql_print_error("Read invalid event from master: '%s',
- master could be corrupt but a more likely cause of this is a bug",
- errmsg);
- my_free((char*) tmp_buf, MYF(MY_ALLOW_ZERO_PTR));
- DBUG_RETURN(1);
- }
- pthread_mutex_lock(&mi->data_lock);
- ev->log_pos = mi->master_log_pos;
- switch (ev->get_type_code()) {
- case STOP_EVENT:
- ignore_event= 1;
- inc_pos= event_len;
- break;
- case ROTATE_EVENT:
- if (unlikely(process_io_rotate(mi,(Rotate_log_event*)ev)))
- {
- delete ev;
- pthread_mutex_unlock(&mi->data_lock);
- DBUG_RETURN(1);
- }
- inc_pos= 0;
- break;
- case CREATE_FILE_EVENT:
- /*
- Yes it's possible to have CREATE_FILE_EVENT here, even if we're in
- queue_old_event() which is for 3.23 events which don't comprise
- CREATE_FILE_EVENT. This is because read_log_event() above has just
- transformed LOAD_EVENT into CREATE_FILE_EVENT.
- */
- {
- /* We come here when and only when tmp_buf != 0 */
- DBUG_ASSERT(tmp_buf);
- int error = process_io_create_file(mi,(Create_file_log_event*)ev);
- delete ev;
- /*
- We had incremented event_len, but now when it is used to calculate the
- position in the master's log, we must use the original value.
- */
- mi->master_log_pos += --event_len;
- DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
- pthread_mutex_unlock(&mi->data_lock);
- my_free((char*)tmp_buf, MYF(0));
- DBUG_RETURN(error);
- }
- default:
- inc_pos= event_len;
- break;
- }
- if (likely(!ignore_event))
- {
- if (unlikely(rli->relay_log.append(ev)))
- {
- delete ev;
- pthread_mutex_unlock(&mi->data_lock);
- DBUG_RETURN(1);
- }
- rli->relay_log.harvest_bytes_written(&rli->log_space_total);
- }
- delete ev;
- mi->master_log_pos+= inc_pos;
- DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
- pthread_mutex_unlock(&mi->data_lock);
- DBUG_RETURN(0);
- }
- /*
- queue_event()
- */
- int queue_event(MASTER_INFO* mi,const char* buf, ulong event_len)
- {
- int error= 0;
- ulong inc_pos;
- RELAY_LOG_INFO *rli= &mi->rli;
- pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
- DBUG_ENTER("queue_event");
- if (mi->old_format)
- DBUG_RETURN(queue_old_event(mi,buf,event_len));
- pthread_mutex_lock(&mi->data_lock);
- switch (buf[EVENT_TYPE_OFFSET]) {
- case STOP_EVENT:
- /*
- We needn't write this event to the relay log. Indeed, it just indicates a
- master server shutdown. The only thing this does is cleaning. But
- cleaning is already done on a per-master-thread basis (as the master
- server is shutting down cleanly, it has written all DROP TEMPORARY TABLE
- and DO RELEASE_LOCK; prepared statements' deletion are TODO).
-
- We don't even increment mi->master_log_pos, because we may be just after
- a Rotate event. Btw, in a few milliseconds we are going to have a Start
- event from the next binlog (unless the master is presently running
- without --log-bin).
- */
- goto err;
- case ROTATE_EVENT:
- {
- Rotate_log_event rev(buf,event_len,0);
- if (unlikely(process_io_rotate(mi,&rev)))
- {
- error= 1;
- goto err;
- }
- /*
- Now the I/O thread has just changed its mi->master_log_name, so
- incrementing mi->master_log_pos is nonsense.
- */
- inc_pos= 0;
- break;
- }
- default:
- inc_pos= event_len;
- break;
- }
- /*
- If this event is originating from this server, don't queue it.
- We don't check this for 3.23 events because it's simpler like this; 3.23
- will be filtered anyway by the SQL slave thread which also tests the
- server id (we must also keep this test in the SQL thread, in case somebody
- upgrades a 4.0 slave which has a not-filtered relay log).
- ANY event coming from ourselves can be ignored: it is obvious for queries;
- for STOP_EVENT/ROTATE_EVENT/START_EVENT: these cannot come from ourselves
- (--log-slave-updates would not log that) unless this slave is also its
- direct master (an unsupported, useless setup!).
- */
- pthread_mutex_lock(log_lock);
- if ((uint4korr(buf + SERVER_ID_OFFSET) == ::server_id) &&
- !replicate_same_server_id)
- {
- /*
- Do not write it to the relay log.
- a) We still want to increment mi->master_log_pos, so that we won't
- re-read this event from the master if the slave IO thread is now
- stopped/restarted (more efficient if the events we are ignoring are big
- LOAD DATA INFILE).
- b) We want to record that we are skipping events, for the information of
- the slave SQL thread, otherwise that thread may let
- rli->group_relay_log_pos stay too small if the last binlog's event is
- ignored.
- */
- mi->master_log_pos+= inc_pos;
- memcpy(rli->ign_master_log_name_end, mi->master_log_name, FN_REFLEN);
- DBUG_ASSERT(rli->ign_master_log_name_end[0]);
- rli->ign_master_log_pos_end= mi->master_log_pos;
- rli->relay_log.signal_update(); // the slave SQL thread needs to re-check
- DBUG_PRINT("info", ("master_log_pos: %d, event originating from the same server, ignored", (ulong) mi->master_log_pos));
- }
- else
- {
- /* write the event to the relay log */
- if (likely(!(error= rli->relay_log.appendv(buf,event_len,0))))
- {
- mi->master_log_pos+= inc_pos;
- DBUG_PRINT("info", ("master_log_pos: %d", (ulong) mi->master_log_pos));
- rli->relay_log.harvest_bytes_written(&rli->log_space_total);
- }
- rli->ign_master_log_name_end[0]= 0; // last event is not ignored
- }
- pthread_mutex_unlock(log_lock);
- err:
- pthread_mutex_unlock(&mi->data_lock);
- DBUG_RETURN(error);
- }
- void end_relay_log_info(RELAY_LOG_INFO* rli)
- {
- DBUG_ENTER("end_relay_log_info");
- if (!rli->inited)
- DBUG_VOID_RETURN;
- if (rli->info_fd >= 0)
- {
- end_io_cache(&rli->info_file);
- (void) my_close(rli->info_fd, MYF(MY_WME));
- rli->info_fd = -1;
- }
- if (rli->cur_log_fd >= 0)
- {
- end_io_cache(&rli->cache_buf);
- (void)my_close(rli->cur_log_fd, MYF(MY_WME));
- rli->cur_log_fd = -1;
- }
- rli->inited = 0;
- rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
- /*
- Delete the slave's temporary tables from memory.
- In the future there will be other actions than this, to ensure persistance
- of slave's temp tables after shutdown.
- */
- rli->close_temporary_tables();
- DBUG_VOID_RETURN;
- }
- /*
- Try to connect until successful or slave killed
- SYNPOSIS
- safe_connect()
- thd Thread handler for slave
- mysql MySQL connection handle
- mi Replication handle
- RETURN
- 0 ok
- # Error
- */
- static int safe_connect(THD* thd, MYSQL* mysql, MASTER_INFO* mi)
- {
- return connect_to_master(thd, mysql, mi, 0, 0);
- }
- /*
- SYNPOSIS
- connect_to_master()
- IMPLEMENTATION
- Try to connect until successful or slave killed or we have retried
- master_retry_count times
- */
- static int connect_to_master(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
- bool reconnect, bool suppress_warnings)
- {
- int slave_was_killed;
- int last_errno= -2; // impossible error
- ulong err_count=0;
- char llbuff[22];
- DBUG_ENTER("connect_to_master");
- #ifndef DBUG_OFF
- events_till_disconnect = disconnect_slave_event_count;
- #endif
- ulong client_flag= CLIENT_REMEMBER_OPTIONS;
- if (opt_slave_compressed_protocol)
- client_flag=CLIENT_COMPRESS; /* We will use compression */
- mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
- mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
-
- #ifdef HAVE_OPENSSL
- if (mi->ssl)
- mysql_ssl_set(mysql,
- mi->ssl_key[0]?mi->ssl_key:0,
- mi->ssl_cert[0]?mi->ssl_cert:0,
- mi->ssl_ca[0]?mi->ssl_ca:0,
- mi->ssl_capath[0]?mi->ssl_capath:0,
- mi->ssl_cipher[0]?mi->ssl_cipher:0);
- #endif
- mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
- /* This one is not strictly needed but we have it here for completeness */
- mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
- while (!(slave_was_killed = io_slave_killed(thd,mi)) &&
- (reconnect ? mysql_reconnect(mysql) != 0 :
- mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
- mi->port, 0, client_flag) == 0))
- {
- /* Don't repeat last error */
- if ((int)mysql_errno(mysql) != last_errno)
- {
- last_errno=mysql_errno(mysql);
- suppress_warnings= 0;
- sql_print_error("Slave I/O thread: error %s to master
- '%s@%s:%d':
- Error: '%s' errno: %d retry-time: %d retries: %d",
- (reconnect ? "reconnecting" : "connecting"),
- mi->user,mi->host,mi->port,
- mysql_error(mysql), last_errno,
- mi->connect_retry,
- master_retry_count);
- }
- /*
- By default we try forever. The reason is that failure will trigger
- master election, so if the user did not set master_retry_count we
- do not want to have election triggered on the first failure to
- connect
- */
- if (++err_count == master_retry_count)
- {
- slave_was_killed=1;
- if (reconnect)
- change_rpl_status(RPL_ACTIVE_SLAVE,RPL_LOST_SOLDIER);
- break;
- }
- safe_sleep(thd,mi->connect_retry,(CHECK_KILLED_FUNC)io_slave_killed,
- (void*)mi);
- }
- if (!slave_was_killed)
- {
- if (reconnect)
- {
- if (!suppress_warnings && global_system_variables.log_warnings)
- sql_print_information("Slave: connected to master '%s@%s:%d',
- replication resumed in log '%s' at position %s", mi->user,
- mi->host, mi->port,
- IO_RPL_LOG_NAME,
- llstr(mi->master_log_pos,llbuff));
- }
- else
- {
- change_rpl_status(RPL_IDLE_SLAVE,RPL_ACTIVE_SLAVE);
- mysql_log.write(thd, COM_CONNECT_OUT, "%s@%s:%d",
- mi->user, mi->host, mi->port);
- }
- #ifdef SIGNAL_WITH_VIO_CLOSE
- thd->set_active_vio(mysql->net.vio);
- #endif
- }
- DBUG_PRINT("exit",("slave_was_killed: %d", slave_was_killed));
- DBUG_RETURN(slave_was_killed);
- }
- /*
- safe_reconnect()
- IMPLEMENTATION
- Try to connect until successful or slave killed or we have retried
- master_retry_count times
- */
- static int safe_reconnect(THD* thd, MYSQL* mysql, MASTER_INFO* mi,
- bool suppress_warnings)
- {
- DBUG_ENTER("safe_reconnect");
- DBUG_RETURN(connect_to_master(thd, mysql, mi, 1, suppress_warnings));
- }
- /*
- Store the file and position where the execute-slave thread are in the
- relay log.
- SYNOPSIS
- flush_relay_log_info()
- rli Relay log information
- NOTES
- - As this is only called by the slave thread, we don't need to
- have a lock on this.
- - If there is an active transaction, then we don't update the position
- in the relay log. This is to ensure that we re-execute statements
- if we die in the middle of an transaction that was rolled back.
- - As a transaction never spans binary logs, we don't have to handle the
- case where we do a relay-log-rotation in the middle of the transaction.
- If this would not be the case, we would have to ensure that we
- don't delete the relay log file where the transaction started when
- we switch to a new relay log file.
- TODO
- - Change the log file information to a binary format to avoid calling
- longlong2str.
- RETURN VALUES
- 0 ok
- 1 write error
- */
- bool flush_relay_log_info(RELAY_LOG_INFO* rli)
- {
- bool error=0;
- IO_CACHE *file = &rli->info_file;
- char buff[FN_REFLEN*2+22*2+4], *pos;
- my_b_seek(file, 0L);
- pos=strmov(buff, rli->group_relay_log_name);
- *pos++='n';
- pos=longlong2str(rli->group_relay_log_pos, pos, 10);
- *pos++='n';
- pos=strmov(pos, rli->group_master_log_name);
- *pos++='n';
- pos=longlong2str(rli->group_master_log_pos, pos, 10);
- *pos='n';
- if (my_b_write(file, (byte*) buff, (ulong) (pos-buff)+1))
- error=1;
- if (flush_io_cache(file))
- error=1;
- /* Flushing the relay log is done by the slave I/O thread */
- return error;
- }
- /*
- Called when we notice that the current "hot" log got rotated under our feet.
- */
- static IO_CACHE *reopen_relay_log(RELAY_LOG_INFO *rli, const char **errmsg)
- {
- DBUG_ASSERT(rli->cur_log != &rli->cache_buf);
- DBUG_ASSERT(rli->cur_log_fd == -1);
- DBUG_ENTER("reopen_relay_log");
- IO_CACHE *cur_log = rli->cur_log=&rli->cache_buf;
- if ((rli->cur_log_fd=open_binlog(cur_log,rli->event_relay_log_name,
- errmsg)) <0)
- DBUG_RETURN(0);
- /*
- We want to start exactly where we was before:
- relay_log_pos Current log pos
- pending Number of bytes already processed from the event
- */
- my_b_seek(cur_log,rli->event_relay_log_pos);
- DBUG_RETURN(cur_log);
- }
- Log_event* next_event(RELAY_LOG_INFO* rli)
- {
- Log_event* ev;
- IO_CACHE* cur_log = rli->cur_log;
- pthread_mutex_t *log_lock = rli->relay_log.get_log_lock();
- const char* errmsg=0;
- THD* thd = rli->sql_thd;
-
- DBUG_ENTER("next_event");
- DBUG_ASSERT(thd != 0);
- /*
- For most operations we need to protect rli members with data_lock,
- so we assume calling function acquired this mutex for us and we will
- hold it for the most of the loop below However, we will release it
- whenever it is worth the hassle, and in the cases when we go into a
- pthread_cond_wait() with the non-data_lock mutex
- */
- safe_mutex_assert_owner(&rli->data_lock);
-
- while (!sql_slave_killed(thd,rli))
- {
- /*
- We can have two kinds of log reading:
- hot_log:
- rli->cur_log points at the IO_CACHE of relay_log, which
- is actively being updated by the I/O thread. We need to be careful
- in this case and make sure that we are not looking at a stale log that
- has already been rotated. If it has been, we reopen the log.
- The other case is much simpler:
- We just have a read only log that nobody else will be updating.
- */
- bool hot_log;
- if ((hot_log = (cur_log != &rli->cache_buf)))
- {
- DBUG_ASSERT(rli->cur_log_fd == -1); // foreign descriptor
- pthread_mutex_lock(log_lock);
- /*
- Reading xxx_file_id is safe because the log will only
- be rotated when we hold relay_log.LOCK_log
- */
- if (rli->relay_log.get_open_count() != rli->cur_log_old_open_count)
- {
- // The master has switched to a new log file; Reopen the old log file
- cur_log=reopen_relay_log(rli, &errmsg);
- pthread_mutex_unlock(log_lock);
- if (!cur_log) // No more log files
- goto err;
- hot_log=0; // Using old binary log
- }
- }
- #ifndef DBUG_OFF
- {
- char llbuf1[22], llbuf2[22];
- DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE);
- /*
- The next assertion sometimes (very rarely) fails, let's try to track
- it
- */
- DBUG_PRINT("info", ("
- Before assert, my_b_tell(cur_log)=%s rli->event_relay_log_pos=%s",
- llstr(my_b_tell(cur_log),llbuf1),
- llstr(rli->group_relay_log_pos,llbuf2)));
- DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
- }
- #endif
- /*
- Relay log is always in new format - if the master is 3.23, the
- I/O thread will convert the format for us
- */
- if ((ev=Log_event::read_log_event(cur_log,0,(bool)0 /* new format */)))
- {
- DBUG_ASSERT(thd==rli->sql_thd);
- if (hot_log)
- pthread_mutex_unlock(log_lock);
- DBUG_RETURN(ev);
- }
- DBUG_ASSERT(thd==rli->sql_thd);
- if (opt_reckless_slave) // For mysql-test
- cur_log->error = 0;
- if (cur_log->error < 0)
- {
- errmsg = "slave SQL thread aborted because of I/O error";
- if (hot_log)
- pthread_mutex_unlock(log_lock);
- goto err;
- }
- if (!cur_log->error) /* EOF */
- {
- /*
- On a hot log, EOF means that there are no more updates to
- process and we must block until I/O thread adds some and
- signals us to continue
- */
- if (hot_log)
- {
- /*
- We say in Seconds_Behind_Master that we have "caught up". Note that
- for example if network link is broken but I/O slave thread hasn't
- noticed it (slave_net_timeout not elapsed), then we'll say "caught
- up" whereas we're not really caught up. Fixing that would require
- internally cutting timeout in smaller pieces in network read, no
- thanks. Another example: SQL has caught up on I/O, now I/O has read
- a new event and is queuing it; the false "0" will exist until SQL
- finishes executing the new event; it will be look abnormal only if
- the events have old timestamps (then you get "many", 0, "many").
- Transient phases like this can't really be fixed.
- */
- time_t save_timestamp= rli->last_master_timestamp;
- rli->last_master_timestamp= 0;
- DBUG_ASSERT(rli->relay_log.get_open_count() ==
- rli->cur_log_old_open_count);
- if (rli->ign_master_log_name_end[0])
- {
- /* We generate and return a Rotate, to make our positions advance */
- DBUG_PRINT("info",("seeing an ignored end segment"));
- ev= new Rotate_log_event(thd, rli->ign_master_log_name_end,
- 0, rli->ign_master_log_pos_end,
- Rotate_log_event::DUP_NAME |
- Rotate_log_event::ZERO_LEN);
- rli->ign_master_log_name_end[0]= 0;
- pthread_mutex_unlock(log_lock);
- if (unlikely(!ev))
- {
- errmsg= "Slave SQL thread failed to create a Rotate event "
- "(out of memory?), SHOW SLAVE STATUS may be inaccurate";
- goto err;
- }
- ev->server_id= 0; // don't be ignored by slave SQL thread
- DBUG_RETURN(ev);
- }
- /*
- We can, and should release data_lock while we are waiting for
- update. If we do not, show slave status will block
- */
- pthread_mutex_unlock(&rli->data_lock);
- /*
- Possible deadlock :
- - the I/O thread has reached log_space_limit
- - the SQL thread has read all relay logs, but cannot purge for some
- reason:
- * it has already purged all logs except the current one
- * there are other logs than the current one but they're involved in
- a transaction that finishes in the current one (or is not finished)
- Solution :
- Wake up the possibly waiting I/O thread, and set a boolean asking
- the I/O thread to temporarily ignore the log_space_limit
- constraint, because we do not want the I/O thread to block because of
- space (it's ok if it blocks for any other reason (e.g. because the
- master does not send anything). Then the I/O thread stops waiting
- and reads more events.
- The SQL thread decides when the I/O thread should take log_space_limit
- into account again : ignore_log_space_limit is reset to 0
- in purge_first_log (when the SQL thread purges the just-read relay
- log), and also when the SQL thread starts. We should also reset
- ignore_log_space_limit to 0 when the user does RESET SLAVE, but in
- fact, no need as RESET SLAVE requires that the slave
- be stopped, and the SQL thread sets ignore_log_space_limit to 0 when
- it stops.
- */
- pthread_mutex_lock(&rli->log_space_lock);
- // prevent the I/O thread from blocking next times
- rli->ignore_log_space_limit= 1;
- /*
- If the I/O thread is blocked, unblock it.
- Ok to broadcast after unlock, because the mutex is only destroyed in
- ~st_relay_log_info(), i.e. when rli is destroyed, and rli will not be
- destroyed before we exit the present function.
- */
- pthread_mutex_unlock(&rli->log_space_lock);
- pthread_cond_broadcast(&rli->log_space_cond);
- // Note that wait_for_update unlocks lock_log !
- rli->relay_log.wait_for_update(rli->sql_thd, 1);
- // re-acquire data lock since we released it earlier
- pthread_mutex_lock(&rli->data_lock);
- rli->last_master_timestamp= save_timestamp;
- continue;
- }
- /*
- If the log was not hot, we need to move to the next log in
- sequence. The next log could be hot or cold, we deal with both
- cases separately after doing some common initialization
- */
- end_io_cache(cur_log);
- DBUG_ASSERT(rli->cur_log_fd >= 0);
- my_close(rli->cur_log_fd, MYF(MY_WME));
- rli->cur_log_fd = -1;
-
- if (relay_log_purge)
- {
- /*
- purge_first_log will properly set up relay log coordinates in rli.
- If the group's coordinates are equal to the event's coordinates
- (i.e. the relay log was not rotated in the middle of a group),
- we can purge this relay log too.
- We do ulonglong and string comparisons, this may be slow but
- - purging the last relay log is nice (it can save 1GB of disk), so we
- like to detect the case where we can do it, and given this,
- - I see no better detection method
- - purge_first_log is not called that often
- */
- if (rli->relay_log.purge_first_log
- (rli,
- rli->group_relay_log_pos == rli->event_relay_log_pos
- && !strcmp(rli->group_relay_log_name,rli->event_relay_log_name)))
- {
- errmsg = "Error purging processed logs";
- goto err;
- }
- }
- else
- {
- /*
- If hot_log is set, then we already have a lock on
- LOCK_log. If not, we have to get the lock.
- According to Sasha, the only time this code will ever be executed
- is if we are recovering from a bug.
- */
- if (rli->relay_log.find_next_log(&rli->linfo, !hot_log))
- {
- errmsg = "error switching to the next log";
- goto err;
- }
- rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
- strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
- sizeof(rli->event_relay_log_name)-1);
- flush_relay_log_info(rli);
- }
- /*
- Now we want to open this next log. To know if it's a hot log (the one
- being written by the I/O thread now) or a cold log, we can use
- is_active(); if it is hot, we use the I/O cache; if it's cold we open
- the file normally. But if is_active() reports that the log is hot, this
- may change between the test and the consequence of the test. So we may
- open the I/O cache whereas the log is now cold, which is nonsense.
- To guard against this, we need to have LOCK_log.
- */
- DBUG_PRINT("info",("hot_log: %d",hot_log));
- if (!hot_log) /* if hot_log, we already have this mutex */
- pthread_mutex_lock(log_lock);
- if (rli->relay_log.is_active(rli->linfo.log_file_name))
- {
- #ifdef EXTRA_DEBUG
- if (global_system_variables.log_warnings)
- sql_print_error("next log '%s' is currently active",
- rli->linfo.log_file_name);
- #endif
- rli->cur_log= cur_log= rli->relay_log.get_log_file();
- rli->cur_log_old_open_count= rli->relay_log.get_open_count();
- DBUG_ASSERT(rli->cur_log_fd == -1);
-
- /*
- Read pointer has to be at the start since we are the only
- reader.
- We must keep the LOCK_log to read the 4 first bytes, as this is a hot
- log (same as when we call read_log_event() above: for a hot log we
- take the mutex).
- */
- if (check_binlog_magic(cur_log,&errmsg))
- {
- if (!hot_log) pthread_mutex_unlock(log_lock);
- goto err;
- }
- if (!hot_log) pthread_mutex_unlock(log_lock);
- continue;
- }
- if (!hot_log) pthread_mutex_unlock(log_lock);
- /*
- if we get here, the log was not hot, so we will have to open it
- ourselves. We are sure that the log is still not hot now (a log can get
- from hot to cold, but not from cold to hot). No need for LOCK_log.
- */
- #ifdef EXTRA_DEBUG
- if (global_system_variables.log_warnings)
- sql_print_error("next log '%s' is not active",
- rli->linfo.log_file_name);
- #endif
- // open_binlog() will check the magic header
- if ((rli->cur_log_fd=open_binlog(cur_log,rli->linfo.log_file_name,
- &errmsg)) <0)
- goto err;
- }
- else
- {
- /*
- Read failed with a non-EOF error.
- TODO: come up with something better to handle this error
- */
- if (hot_log)
- pthread_mutex_unlock(log_lock);
- sql_print_error("Slave SQL thread: I/O error reading
- event(errno: %d cur_log->error: %d)",
- my_errno,cur_log->error);
- // set read position to the beginning of the event
- my_b_seek(cur_log,rli->event_relay_log_pos);
- /* otherwise, we have had a partial read */
- errmsg = "Aborting slave SQL thread because of partial event read";
- break; // To end of function
- }
- }
- if (!errmsg && global_system_variables.log_warnings)
- errmsg = "slave SQL thread was killed";
- err:
- if (errmsg)
- sql_print_error("Error reading relay log event: %s", errmsg);
- DBUG_RETURN(0);
- }
- /*
- Rotate a relay log (this is used only by FLUSH LOGS; the automatic rotation
- because of size is simpler because when we do it we already have all relevant
- locks; here we don't, so this function is mainly taking locks).
- Returns nothing as we cannot catch any error (MYSQL_LOG::new_file() is void).
- */
- void rotate_relay_log(MASTER_INFO* mi)
- {
- DBUG_ENTER("rotate_relay_log");
- RELAY_LOG_INFO* rli= &mi->rli;
- lock_slave_threads(mi);
- pthread_mutex_lock(&mi->data_lock);
- pthread_mutex_lock(&rli->data_lock);
- /*
- We need to test inited because otherwise, new_file() will attempt to lock
- LOCK_log, which may not be inited (if we're not a slave).
- */
- if (!rli->inited)
- {
- DBUG_PRINT("info", ("rli->inited == 0"));
- goto end;
- }
- /* If the relay log is closed, new_file() will do nothing. */
- rli->relay_log.new_file(1);
- /*
- We harvest now, because otherwise BIN_LOG_HEADER_SIZE will not immediately
- be counted, so imagine a succession of FLUSH LOGS and assume the slave
- threads are started:
- relay_log_space decreases by the size of the deleted relay log, but does
- not increase, so flush-after-flush we may become negative, which is wrong.
- Even if this will be corrected as soon as a query is replicated on the
- slave (because the I/O thread will then call harvest_bytes_written() which
- will harvest all these BIN_LOG_HEADER_SIZE we forgot), it may give strange
- output in SHOW SLAVE STATUS meanwhile. So we harvest now.
- If the log is closed, then this will just harvest the last writes, probably
- 0 as they probably have been harvested.
- */
- rli->relay_log.harvest_bytes_written(&rli->log_space_total);
- end:
- pthread_mutex_unlock(&rli->data_lock);
- pthread_mutex_unlock(&mi->data_lock);
- unlock_slave_threads(mi);
- DBUG_VOID_RETURN;
- }
- #ifdef __GNUC__
- template class I_List_iterator<i_string>;
- template class I_List_iterator<i_string_pair>;
- #endif
- #endif /* HAVE_REPLICATION */