template <typename Pool, typename LockStrategy> structPoolManager { typedef Pool PoolType; typedeftypename PoolType::value_type value_type; PoolManager(size_t size) : m_size(size) { create(); } ~PoolManager() { destroy(); ut_a(m_pools.empty()); } /** Get an element from one of the pools.@return instance or NULL if pool is empty. */
value_type* get() { size_t index = 0; size_t delay = 1; value_type* ptr = NULL; do { m_lock_strategy.enter(); ut_ad(!m_pools.empty()); size_t n_pools = m_pools.size(); PoolType* pool = m_pools[index % n_pools]; m_lock_strategy.exit(); ptr = pool->get(); if (ptr == 0 && (index / n_pools) > 2) { if (!add_pool(n_pools)) { ib::error() << "Failed to allocate" " memory for a pool of size " << m_size << " bytes. Will" " wait for " << delay << " seconds for a thread to" " free a resource"; /* There is nothing much we can do except crash and burn, however lets be a little optimistic and wait for a resource to be freed. */ os_thread_sleep(delay * 1000000); if (delay < 32) { delay <<= 1; } } else { delay = 1; } } ++index; } while (ptr == NULL); return(ptr); } staticvoidmem_free(value_type* ptr) { PoolType::mem_free(ptr); } private: /** Add a new pool @param n_pools Number of pools that existed when the add pool was called. @return true on success */
booladd_pool(size_t n_pools) { bool added = false; m_lock_strategy.enter(); if (n_pools < m_pools.size()) { /* Some other thread already added a pool. */ added = true; } else { PoolType* pool; ut_ad(n_pools == m_pools.size()); pool = UT_NEW_NOKEY(PoolType(m_size)); if (pool != NULL) { ut_ad(n_pools <= m_pools.size()); m_pools.push_back(pool); ib::info() << "Number of pools: " << m_pools.size(); added = true; } } ut_ad(n_pools < m_pools.size() || !added); m_lock_strategy.exit(); return(added); } /** Create the pool manager. */ voidcreate() { ut_a(m_size > sizeof(value_type)); m_lock_strategy.create(); add_pool(0); } /** Release the resources. */ voiddestroy() { typename Pools::iterator it; typename Pools::iterator end = m_pools.end(); for (it = m_pools.begin(); it != end; ++it) { PoolType* pool = *it; UT_DELETE(pool); } m_pools.clear(); m_lock_strategy.destroy(); } private: // Disable copying PoolManager(const PoolManager&); PoolManager& operator=(const PoolManager&); typedefstd::vector<PoolType*, ut_allocator<PoolType*> > Pools; /** Size of each block */ size_t m_size; /** Pools managed this manager */ Pools m_pools; /** Lock strategy to use */ LockStrategy m_lock_strategy; };
template <typename Type, typename Factory, typename LockStrategy> structPool { typedef Type value_type; // FIXME: Add an assertion to check alignment and offset is // as we expect it. Also, sizeof(void*) can be 8, can we impove on this. structElement { Pool* m_pool; value_type m_type; }; /** Constructor @(Database)param size size of the memory block */ Pool(size_t size) : m_end(), m_start(), m_size(size), m_last() { ut_a(size >= sizeof(Element)); m_lock_strategy.create(); ut_a(m_start == 0); m_start = reinterpret_cast<Element*>(ut_zalloc_nokey(m_size)); m_last = m_start; m_end = &m_start[m_size / sizeof(*m_start)]; /* Note: Initialise only a small subset, even though we have allocated all the memory. This is required only because PFS (MTR) results change if we instantiate too many mutexes up front. */
init(ut_min(size_t(16), size_t(m_end - m_start))); ut_ad(m_pqueue.size() <= size_t(m_last - m_start)); } /** Destructor */ ~Pool() { m_lock_strategy.destroy(); for (Element* elem = m_start; elem != m_last; ++elem) { ut_ad(elem->m_pool == this); Factory::destroy(&elem->m_type); } ut_free(m_start); m_end = m_last = m_start = 0; m_size = 0; } /** Get an object from the pool.@retrun a free instance or NULL if exhausted. */
staticvoidmem_free(value_type* ptr) { Element* elem; byte* p = reinterpret_cast<byte*>(ptr + 1); elem = reinterpret_cast<Element*>(p - sizeof(*elem)); elem->m_pool->put(elem); } protected: // Disable copying Pool(const Pool&); Pool& operator=(const Pool&); private: /* We only need to compare on pointer address. */ typedefstd::priority_queue< Element*, std::vector<Element*, ut_allocator<Element*> >, std::greater<Element*> > pqueue_t; /** Release the object to the free pool @param elem element to free */
voidput(Element* elem) { m_lock_strategy.enter(); ut_ad(elem >= m_start && elem < m_last); ut_ad(Factory::debug(&elem->m_type)); m_pqueue.push(elem); m_lock_strategy.exit(); } /** Initialise the elements.@param n_elems Number of elements to initialise */
voidinit(size_t n_elems) { ut_ad(size_t(m_end - m_last) >= n_elems); for (size_t i = 0; i < n_elems; ++i, ++m_last) { m_last->m_pool = this; Factory::init(&m_last->m_type); m_pqueue.push(m_last); } ut_ad(m_last <= m_end); } private: /** Pointer to the last element */ Element* m_end; /** Pointer to the first element */ Element* m_start; /** Size of the block in bytes */ size_t m_size; /** Upper limit of used space */ Element* m_last; /** Priority queue ordered on the pointer addresse. */ pqueue_t m_pqueue; /** Lock strategy to use */ LockStrategy m_lock_strategy; };
/** For managing the life-cycle of the trx_t instance that we get from the pool. */ structTrxFactory { /** Initializes a transaction object. It must be explicitly started with trx_start_if_not_started() before using it. The default isolation level is TRX_ISO_REPEATABLE_READ. @param trx Transaction instance to initialise */
staticvoidinit(trx_t* trx) { /* Explicitly call the constructor of the already allocated object. trx_t objects are allocated by ut_zalloc() in Pool::Pool() which would not call the constructors of the trx_t members. */
new(&trx->mod_tables) trx_mod_tables_t(); new(&trx->lock.rec_pool) lock_pool_t(); new(&trx->lock.table_pool) lock_pool_t(); new(&trx->lock.table_locks) lock_pool_t(); new(&trx->hit_list) hit_list_t(); trx_init(trx); trx->state = TRX_STATE_NOT_STARTED; trx->dict_operation_lock_mode = 0; trx->xid = UT_NEW_NOKEY(xid_t()); trx->detailed_error = reinterpret_cast<char*>( ut_zalloc_nokey(MAX_DETAILED_ERROR_LEN)); trx->lock.lock_heap = mem_heap_create_typed( 1024, MEM_HEAP_FOR_LOCK_HEAP); lock_trx_lock_list_init(&trx->lock.trx_locks); UT_LIST_INIT( trx->trx_savepoints, &trx_named_savept_t::trx_savepoints); mutex_create(LATCH_ID_TRX, &trx->mutex); mutex_create(LATCH_ID_TRX_UNDO, &trx->undo_mutex); lock_trx_alloc_locks(trx); } /** Release resources held by the transaction object. @param trx the transaction for which to release resources */
staticvoiddestroy(trx_t* trx) { ut_ad(!trx->in_rw_trx_list); ut_ad(!trx->in_mysql_trx_list); ut_a(trx->lock.wait_lock == NULL); ut_a(trx->lock.wait_thr == NULL); ut_a(!trx->has_search_latch); ut_a(trx->dict_operation_lock_mode == 0); if (trx->lock.lock_heap != NULL) { mem_heap_free(trx->lock.lock_heap); trx->lock.lock_heap = NULL; } ut_a(UT_LIST_GET_LEN(trx->lock.trx_locks) == 0); UT_DELETE(trx->xid); ut_free(trx->detailed_error); mutex_free(&trx->mutex); mutex_free(&trx->undo_mutex); trx->mod_tables.~trx_mod_tables_t(); ut_ad(trx->read_view == NULL); if (!trx->lock.rec_pool.empty()) { /* See lock_trx_alloc_locks() why we only free the first element. */
ut_free(trx->lock.rec_pool[0]); } if (!trx->lock.table_pool.empty()) { /* See lock_trx_alloc_locks() why we only free the first element. */
ut_free(trx->lock.table_pool[0]); } trx->lock.rec_pool.~lock_pool_t(); trx->lock.table_pool.~lock_pool_t(); trx->lock.table_locks.~lock_pool_t(); trx->hit_list.~hit_list_t(); } /** Enforce any invariants here, this is called before the transaction is added to the pool. @return true if all OK */
/****************************************************************/ /**Commits a transaction in memory. */ static void trx_commit_in_memory( /*=================*/ trx_t* trx, /*!< in/out: transaction */ constmtr_t* mtr, /*!< in: mini-transaction of trx_write_serialisation_history(), or NULL if the transaction did not modify anything */ bool serialised)/*!< in: true if serialisation log was written */ { trx->must_flush_log_later = false; if (trx_is_autocommit_non_locking(trx)) { ut_ad(trx->id == 0); ut_ad(trx->read_only); ut_a(!trx->is_recovered); ut_ad(trx->rsegs.m_redo.rseg == NULL); ut_ad(!trx->in_rw_trx_list); /* Note: We are asserting without holding the lock mutex. But that is OK because this transaction is not waiting and cannot be rolled back and no new locks can (or should not) be added becuase it is flagged as a non-locking read-only transaction. */
ut_a(UT_LIST_GET_LEN(trx->lock.trx_locks) == 0); /* This state change is not protected by any mutex, therefore there is an inherent race here around state transition during printouts. We ignore this race for the sake of efficiency. However, the trx_sys_t::mutex will protect the trx_t instance and it cannot be removed from the mysql_trx_list and freed without first acquiring the trx_sys_t::mutex. */
ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE)); if (trx->read_view != NULL) { trx_sys->mvcc->view_close(trx->read_view, false); } MONITOR_INC(MONITOR_TRX_NL_RO_COMMIT); /* AC-NL-RO transactions can't be rolled back asynchronously. */ ut_ad(!trx->abort); ut_ad(!(trx->in_innodb & (TRX_FORCE_ROLLBACK | TRX_FORCE_ROLLBACK_ASYNC))); trx->state = TRX_STATE_NOT_STARTED; } else { if (trx->id > 0) { /* For consistent snapshot, we need to remove current transaction from running transaction id list for mvcc before doing commit and releasing locks. */
trx_erase_lists(trx, serialised); } lock_trx_release_locks(trx); /* Remove the transaction from the list of active transactions now that it no longer holds any user locks. */
ut_ad(trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY)); DEBUG_SYNC_C("after_trx_committed_in_memory"); if (trx->read_only || trx->rsegs.m_redo.rseg == NULL) { MONITOR_INC(MONITOR_TRX_RO_COMMIT); if (trx->read_view != NULL) { trx_sys->mvcc->view_close( trx->read_view, false); } } else { ut_ad(trx->id > 0); MONITOR_INC(MONITOR_TRX_RW_COMMIT); } } if (trx->rsegs.m_redo.rseg != NULL) { trx_rseg_t* rseg = trx->rsegs.m_redo.rseg; mutex_enter(&rseg->mutex); ut_ad(rseg->trx_ref_count > 0); --rseg->trx_ref_count; mutex_exit(&rseg->mutex); } if (mtr != NULL) { if (trx->rsegs.m_redo.insert_undo != NULL) { trx_undo_insert_cleanup(&trx->rsegs.m_redo, false); } if (trx->rsegs.m_noredo.insert_undo != NULL) { trx_undo_insert_cleanup(&trx->rsegs.m_noredo, true); } /* NOTE that we could possibly make a group commit more efficient here: call os_thread_yield here to allow also other trxs to come to commit! */ /*-------------------------------------*/ /* Depending on the my.cnf options, we may now write the log buffer to the log files, making the transaction durable if the OS does not crash. We may also flush the log files to disk, making the transaction durable also at an OS crash or a power outage. The idea in InnoDB's group commit is that a group of transactions gather behind a trx doing a physical disk write transactions gather behind a trx doing a physical disk write one of those transactions does a write which commits the whole group. Note that this group commit will only bring benefit if there are > 2 users in the database. Then at least 2 users can gather behind one doing the physical log write to disk. If we are calling trx_commit() under prepare_commit_mutex, we will delay possible log write and flush to a separate function trx_commit_complete_for_mysql(), which is only called when the thread has released the mutex. This is to make the group commit algorithm to work. 
Otherwise, the prepare_commit mutex would serialize all commits and prevent a group of transactions from gathering. */ lsn_t lsn = mtr->commit_lsn(); if (lsn == 0) { /* Nothing to be done. */ } elseif (trx->flush_log_later) { /* Do nothing yet */ trx->must_flush_log_later = true; } elseif (srv_flush_log_at_trx_commit == 0 || thd_requested_durability(trx->mysql_thd) == HA_IGNORE_DURABILITY) { /* Do nothing */ } else { } trx->commit_lsn = lsn; //把mtr提交的lsn赋值给事务trx /* Tell server some activity has happened, since the trx does changes something. Background utility threads like master thread, purge thread or page_cleaner thread might have some work to do. */
srv_active_wake_master_thread(); } /* Free all savepoints, starting from the first. */ trx_named_savept_t* savep = UT_LIST_GET_FIRST(trx->trx_savepoints); trx_roll_savepoints_free(trx, savep); if (trx->fts_trx != NULL) { trx_finalize_for_fts(trx, trx->undo_no != 0); } trx_mutex_enter(trx); trx->dict_operation = TRX_DICT_OP_NONE;
/* Because we can rollback transactions asynchronously, we change the state at the last step. trx_t::abort cannot change once commit or rollback has started because we will have released the locks by the time we get here. */
if (trx->abort) { trx->abort = false; trx->state = TRX_STATE_FORCED_ROLLBACK; } else { trx->state = TRX_STATE_NOT_STARTED; } /* trx->in_mysql_trx_list would hold between trx_allocate_for_mysql() and trx_free_for_mysql(). It does not hold for recovered transactions or system transactions. */ assert_trx_is_free(trx); trx_init(trx); trx_mutex_exit(trx); ut_a(trx->error_state == DB_SUCCESS); }
/** Undo node structure */ structundo_node_t{ que_common_t common; /*!< node type: QUE_NODE_UNDO */ enum undo_exec state; /*!< node execution state */ trx_t* trx; /*!< trx for which undo is done */ roll_ptr_t roll_ptr;/*!< roll pointer to undo log record */ trx_undo_rec_t* undo_rec;/*!< undo log record */ undo_no_t undo_no;/*!< undo number of the record */ ulint rec_type;/*!< undo log record type: TRX_UNDO_INSERT_REC, ... */
trx_id_t new_trx_id; /*!< trx id to restore to clustered index record */
btr_pcur_t pcur; /*!< persistent cursor used in searching the clustered index record */
dict_table_t* table; /*!< table where undo is done */ ulint cmpl_info;/*!< compiler analysis of an update */ upd_t* update; /*!< update vector for a clustered index record */
dtuple_t* ref; /*!< row reference to the next row to handle */ dtuple_t* row; /*!< a copy (also fields copied to heap) of the row to handle */
row_ext_t* ext; /*!< NULL, or prefixes of the externally stored columns of the row */
dtuple_t* undo_row;/*!< NULL, or the row after undo */ row_ext_t* undo_ext;/*!< NULL, or prefixes of the externally stored columns of undo_row */
dict_index_t* index; /*!< the next index whose record should be handled */
mem_heap_t* heap; /*!< memory heap used as auxiliary storage for row; this must be emptied after undo is tried on a row */
};
事务流程分析
实验一:包含select、update和insert的事务执行
执行下面sql语句,逐条跟踪:
1 2 3 4 5 6 7 8 9 10 11
CREATE TABLE trx(
    pk INT NOT NULL,
    DATA INT
) ENGINE = innodb;
SET AUTOCOMMIT = false;
-- 从此处跟踪
BEGIN;
SELECT * FROM trx where pk = 1;
UPDATE trx set data = data +1 where pk = 1;
INSERT INTO trx VALUES(6,1);
COMMIT;
thd->mdl_context.release_transactional_locks(); /* 根据传入的flags判断开启的是RO还是RW事务 *{ */ //如果以显式的READ_ONLY开启,则设置线程thd的RO属性 if (flags & MYSQL_START_TRANS_OPT_READ_ONLY) { thd->tx_read_only= true; if (tst) tst->set_read_flags(thd, TX_READ_ONLY); } //如果以显式的READ_WRITE开启,则设置线程thd的RW属性 elseif (flags & MYSQL_START_TRANS_OPT_READ_WRITE) { /*Explicitly starting a RW transaction when the server is in read-only mode, is not allowed unless the user has SUPER priv.Implicitly starting a RW transaction is allowed for backward compatibility. */
if (check_readonly(thd, true)) DBUG_RETURN(true); thd->tx_read_only= false; /*This flags that tx_read_only was set explicitly, rather than just from the session's default.*/
if (tst) tst->set_read_flags(thd, TX_READ_WRITE); } //设置thd的标志位,标识事务开始 thd->variables.option_bits|= OPTION_BEGIN; thd->server_status|= SERVER_STATUS_IN_TRANS; if (thd->tx_read_only) thd->server_status|= SERVER_STATUS_IN_TRANS_READONLY; DBUG_PRINT("info", ("setting SERVER_STATUS_IN_TRANS")); if (tst) tst->add_trx_state(thd, TX_EXPLICIT); /* ha_start_consistent_snapshot() relies on OPTION_BEGIN flag set. */ /* 如果以START TRANSACTION WITH consistent snapshot 开启事务*/ if (flags & MYSQL_START_TRANS_OPT_WITH_CONS_SNAPSHOT) { if (tst) tst->add_trx_state(thd, TX_WITH_SNAPSHOT); //下面这句代码分配ReadView? res= ha_start_consistent_snapshot(thd); } DBUG_RETURN(MY_TEST(res)); }
/**提交单语句事务.*/ //@note Note that if the autocommit is on, then the following call //inside InnoDB will commit or rollback the whole transaction //(= the statement). The autocommit mechanism built into InnoDB //is based on counting locks, but if the user has used LOCK //TABLES then that mechanism does not know to do the commit. //@param thd Current thread //@retval FALSE Success //@retval TRUE Failure
booltrans_commit_stmt(THD *thd) { DBUG_ENTER("trans_commit_stmt"); int res= FALSE; /*We currently don't invoke commit/rollback at end of a sub-statement. In future, we perhaps should take a savepoint for each nested statement, and release the savepoint when statement has succeeded.*/
DBUG_ASSERT(! thd->in_sub_stmt); /*Some code in MYSQL_BIN_LOG::commit and ha_commit_low() is not safefor attachable transactions.*/
DBUG_ASSERT(!thd->is_attachable_ro_transaction_active()); thd->get_transaction()->merge_unsafe_rollback_flags(); /* 如果是单语句事务则调用下面的if提交,因为单语句事务没有显式的commit命令,所有要在最后统一提交{*/ if (thd->get_transaction()->is_active(Transaction_ctx::STMT)) { res= ha_commit_trans(thd, FALSE); if (! thd->in_active_multi_stmt_transaction()) trans_reset_one_shot_chistics(thd); } elseif (tc_log) tc_log->commit(thd, false); if (res == FALSE && !thd->in_active_multi_stmt_transaction()) if (thd->rpl_thd_ctx.session_gtids_ctx(). notify_after_transaction_commit(thd)) sql_print_warning("Failed to collect GTID to send in the response packet!"); /* In autocommit=1 mode the transaction should be marked as complete in P_S */ DBUG_ASSERT(thd->in_active_multi_stmt_transaction() || thd->m_transaction_psi == NULL); thd->get_transaction()->reset(Transaction_ctx::STMT); DBUG_RETURN(MY_TEST(res)); }
/*****************************************************************/ /**Commits a transaction in an InnoDB database or marks an SQL statement ended.*/ //@return 0 or deadlock error if the transaction was aborted by another //higher priority transaction. static int innobase_commit( /*============*/ handlerton* hton, /*!< in: InnoDB handlerton */ THD* thd, /*!< in: MySQL thread handle of the user for whom the transaction should be committed */ bool commit_trx) /*!< in: true - commit transaction false - the current SQL statement ended */ { DBUG_ENTER("innobase_commit"); DBUG_ASSERT(hton == innodb_hton_ptr); DBUG_PRINT("trans", ("ending transaction")); trx_t* trx = check_trx_exists(thd); TrxInInnoDB trx_in_innodb(trx); if (trx_in_innodb.is_aborted()) { innobase_rollback(hton, thd, commit_trx); DBUG_RETURN(convert_error_code_to_mysql( DB_FORCED_ABORT, 0, thd)); } ut_ad(trx->dict_operation_lock_mode == 0); ut_ad(trx->dict_operation == TRX_DICT_OP_NONE); /* Transaction is deregistered only in a commit or a rollback. If it is deregistered we know there cannot be resources to be freed and we could return immediately. For the time being, we play safe and do the cleanup though there should be nothing to clean up. */
if (!trx_is_registered_for_2pc(trx) && trx_is_started(trx)) { sql_print_error("Transaction not registered for MySQL 2PC," " but transaction is active"); } bool read_only = trx->read_only || trx->id == 0; /**下面这个段代码实现了单语句事务和多语句事务(BEGIN...COMMIT)之间提交的区别。大致流程如下:1)if判断是否提交(commit_trx针对多语句事务)或是否autocommit = 1如果有一个满足,则进入提交流程。2)否则,则用trx_mark_sql_stat_end(trx)标记本条语句结束。{*/
if (commit_trx || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) { /* We were instructed to commit the whole transaction, or this is an SQL statement end and autocommit is on */
/* We need current binlog position for mysqlbackup to work. */ if (!read_only) { while (innobase_commit_concurrency > 0) { mysql_mutex_lock(&commit_cond_m); ++commit_threads; if (commit_threads <= innobase_commit_concurrency) { mysql_mutex_unlock(&commit_cond_m); break; } --commit_threads; mysql_cond_wait(&commit_cond, &commit_cond_m); TrxInInnoDB trx_in_innodb(trx); if (trx_in_innodb.is_aborted()) { innobase_rollback(hton, thd, commit_trx); DBUG_RETURN(convert_error_code_to_mysql( DB_FORCED_ABORT, 0, thd)); } ut_ad(trx->dict_operation_lock_mode == 0); ut_ad(trx->dict_operation == TRX_DICT_OP_NONE); /* Transaction is deregistered only in a commit or a rollback. If it is deregistered we know there cannot be resources to be freed and we could return immediately. For the time being, we play safe and do the cleanup though there should be nothing to clean up. */
if (!trx_is_registered_for_2pc(trx) && trx_is_started(trx)) { sql_print_error("Transaction not registered for MySQL 2PC," " but transaction is active"); } bool read_only = trx->read_only || trx->id == 0; /**下面这个段代码实现了单语句事务和多语句事务(BEGIN...COMMIT)之间提交的区别。大致流程如下:1)if判断是否提交(commit_trx针对多语句事务)或是否autocommit = 1如果有一个满足,则进入提交流程。2)否则,则用trx_mark_sql_stat_end(trx)标记本条语句结束。{*/
if (commit_trx || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) { /* We were instructed to commit the whole transaction, or this is an SQL statement end and autocommit is on */ /* We need current binlog position for mysqlbackup to work. */ if (!read_only) { while (innobase_commit_concurrency > 0) { mysql_mutex_lock(&commit_cond_m); ++commit_threads; if (commit_threads <= innobase_commit_concurrency) { mysql_mutex_unlock(&commit_cond_m); break; } --commit_threads; mysql_cond_wait(&commit_cond, &commit_cond_m); if (!read_only) { lock_unlock_table_autoinc(trx); } /* Store the current undo_no of the transaction so that we know where to roll back if we have to roll back the next SQL statement */
trx_mark_sql_stat_end(trx); } /* }*/ /* Reset the number AUTO-INC rows required */ trx->n_autoinc_rows = 0; /* This is a statement level variable. */ trx->fts_next_doc_id = 0; innobase_srv_conc_force_exit_innodb(trx); DBUG_RETURN(0); }
void trx_set_rw_mode( /*============*/ trx_t* trx) /*!< in/out: transaction that is RW */ { ut_ad(trx->rsegs.m_redo.rseg == 0); ut_ad(!trx->in_rw_trx_list); ut_ad(!trx_is_autocommit_non_locking(trx)); if (srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO) { return; } /* Function is promoting existing trx from ro mode to rw mode.In this process it has acquired trx_sys->mutex as it plan to move trx from ro list to rw list. If in future, some other threadlooks at this trx object while it is being promoted then ensure that both threads are synced by acquring trx->mutex to avoid decision based on in-consistent view formed during promotion. */
/**********************************************************************/ /**Assigns an undo log for a transaction. A new undo log is created or a cached undo log reused. @return DB_SUCCESS if undo log assign successful, possible error codes are: DB_TOO_MANY_CONCURRENT_TRXS DB_OUT_OF_FILE_SPACE DB_READ_ONLY DB_OUT_OF_MEMORY */
//@param thd Current thread //@retval FALSE Success //@retval TRUE Failure
booltrans_commit(THD *thd) { int res; if (trans_check_state(thd)) DBUG_RETURN(TRUE); //首先清除thd的服务器状态变量 thd->server_status&= ~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY); /* 调用ha_commit_trans()来提交该事务 */ res= ha_commit_trans(thd, TRUE); if (res == FALSE) if (thd->rpl_thd_ctx.session_gtids_ctx(). notify_after_transaction_commit(thd)) sql_print_warning("Failed to collect GTID to send in the response packet!"); /*When gtid mode is enabled, a transaction may cause binlog rotation, which inserts a record into the gtid system table (which is probably a transactional table). Thence, the flag SERVER_STATUS_IN_TRANS may be set again while calling ha_commit_trans(...) Consequently, we need to reset it back,much like we are doing before calling ha_commit_trans(...).We would really only need to do this when gtid_mode=on. However,checking gtid_mode requires holding a lock, which is costly. So we clear the bit unconditionally. This has no side effects since if gtid_mode=off the bit is already cleared.*/
/* 清除SERVER_STATUS_IN_TRANS和OPTION_BEGIN标识 */
thd->server_status&= ~SERVER_STATUS_IN_TRANS; thd->variables.option_bits&= ~OPTION_BEGIN; thd->get_transaction()->reset_unsafe_rollback_flags(Transaction_ctx::SESSION); thd->lex->start_transaction_opt= 0; /* The transaction should be marked as complete in P_S. */ DBUG_ASSERT(thd->m_transaction_psi == NULL); thd->tx_priority= 0; trans_track_end_trx(thd); DBUG_RETURN(MY_TEST(res)); }
/****************************************************************/ /**Assign the transaction its history serialisation number and write the update UNDO log record to the assigned rollback segment.@return true if a serialisation log was written */
/* 因为修改后的undo log headers需要按UNDO trx number来有序放入history list 以便purge,所以下面需要获取rseg mutex。 */
bool own_redo_rseg_mutex = false; bool own_noredo_rseg_mutex = false; /* Get rollback segment mutex. */ if (trx->rsegs.m_redo.rseg != NULL && trx_is_redo_rseg_updated(trx)) { mutex_enter(&trx->rsegs.m_redo.rseg->mutex); own_redo_rseg_mutex = true; } mtr_t temp_mtr; if (trx->rsegs.m_noredo.rseg != NULL && trx_is_noredo_rseg_updated(trx)) { mutex_enter(&trx->rsegs.m_noredo.rseg->mutex); own_noredo_rseg_mutex = true; mtr_start(&temp_mtr); temp_mtr.set_log_mode(MTR_LOG_NO_REDO); } /* If transaction involves insert then truncate undo logs. */ if (trx->rsegs.m_redo.insert_undo != NULL) { trx_undo_set_state_at_finish( trx->rsegs.m_redo.insert_undo, mtr); } if (trx->rsegs.m_noredo.insert_undo != NULL) { trx_undo_set_state_at_finish( trx->rsegs.m_noredo.insert_undo, &temp_mtr); } bool serialised = false; /* If transaction involves update then add rollback segments to purge queue. */
if (trx->rsegs.m_redo.update_undo != NULL || trx->rsegs.m_noredo.update_undo != NULL) { /* Assign the transaction serialisation number and add these rollback segments to purge trx-no sorted priority queue if this is the first UNDO log being written to assigned rollback segments. */
trx_undo_ptr_t* redo_rseg_undo_ptr = trx->rsegs.m_redo.update_undo != NULL ? &trx->rsegs.m_redo : NULL; trx_undo_ptr_t* noredo_rseg_undo_ptr = trx->rsegs.m_noredo.update_undo != NULL ? &trx->rsegs.m_noredo : NULL; /* Will set trx->no and will add rseg to purge queue. */
/* It is not necessary to obtain trx->undo_mutex here because only a single OS thread is allowed to do the transaction commit for this transaction. */ if (trx->rsegs.m_redo.update_undo != NULL) { page_t* undo_hdr_page; undo_hdr_page = trx_undo_set_state_at_finish( trx->rsegs.m_redo.update_undo, mtr); /* Delay update of rseg_history_len if we plan to add non-redo update_undo too. This is to avoid immediate invocation of purge as we need to club these 2 segments with same trx-no as single unit. */
bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != NULL); trx_undo_update_cleanup( trx, &trx->rsegs.m_redo, undo_hdr_page, update_rseg_len, (update_rseg_len ? 1 : 0), mtr); } DBUG_EXECUTE_IF("ib_trx_crash_during_commit", DBUG_SUICIDE();); if (trx->rsegs.m_noredo.update_undo != NULL) { page_t* undo_hdr_page; undo_hdr_page = trx_undo_set_state_at_finish( trx->rsegs.m_noredo.update_undo, &temp_mtr); ulint n_added_logs = (redo_rseg_undo_ptr != NULL) ? 2 : 1; trx_undo_update_cleanup( trx, &trx->rsegs.m_noredo, undo_hdr_page, true, n_added_logs, &temp_mtr); } } if (own_redo_rseg_mutex) { mutex_exit(&trx->rsegs.m_redo.rseg->mutex); own_redo_rseg_mutex = false; } if (own_noredo_rseg_mutex) { mutex_exit(&trx->rsegs.m_noredo.rseg->mutex); own_noredo_rseg_mutex = false; mtr_commit(&temp_mtr); } MONITOR_INC(MONITOR_TRX_COMMIT_UNDO); /* Update the latest MySQL binlog name and offset info in trx sys header if MySQL binlogging is on or the database server is a MySQL replication slave */
classReadView { /** 下面的ids_t类似于std中的vector,用以存放事务号. */ classids_t { typedeftrx_ids_t::value_type value_type; //value_type为trx_id_t private: // Prevent copying ids_t(constids_t&); ids_t& operator=(constids_t&); private: /** Memory for the array */ value_type* m_ptr; /** Number of active elements in the array */ ulint m_size; /** Size of m_ptr in elements */ ulint m_reserved; friendclassReadView; }; public: ReadView(); ~ReadView(); /** 检查传入trx id是否大于系统当前最大事务号 @param[in] id transaction id to check @param[in] name table name */
staticvoidcheck_trx_id_sanity( trx_id_t id, consttable_name_t& name); /************注意重点看下此函数*****************/ boolchange_visible(trx_id_t id, consttable_name_t& name) //检查传入id的事务所做的修改对该ReadView可见! { ut_ad(id > 0); //先检查是否是小于m_up_limit_id或是创建此ReadView的id,如果是则返回true。 if (id < m_up_limit_id || id == m_creator_trx_id) { return(true); } check_trx_id_sanity(id, name); //检查是否大于m_low_limit_id,大于则直接返回false,不可见,如果小于则查看 //m_ids活动列表是否为空,如果为空说明此事务虽然在系统可见最小事务号之后发生 //但在此事务开始前已经不是活动的了,此时也是可以看见的。 if (id >= m_low_limit_id) { return(false); } elseif (m_ids.empty()) { return(true); } constids_t::value_type* p = m_ids.data(); //最后如果m_ids活动事务列表不为空,则二分查找m_ids,如果找到,说明拍摄此ReadView //时此id是活动的,直接对找到结果的取反。 return(!std::binary_search(p, p + m_ids.size(), id)); } /**@param id transaction to check*/ //@return true if view sees transaction id
voidclose() { ut_ad(m_creator_trx_id != TRX_ID_MAX); m_creator_trx_id = TRX_ID_MAX; } /**@return true if the view is closed */
boolis_closed()const { return(m_closed); } /**Write the limits to the file.*/ //@param file file to write to
voidprint_limits(FILE* file)const { fprintf(file, "Trx read view will not see trx with" " id >= " TRX_ID_FMT ", sees < " TRX_ID_FMT "\n", m_low_limit_id, m_up_limit_id); } #ifdef UNIV_DEBUG /**/ //@param rhs view to compare with //@return true if this view is less than or equal rhs */ boolle(const ReadView* rhs)const { return(m_low_limit_no <= rhs->m_low_limit_no); } #endif/* UNIV_DEBUG */ private: /**Copy the transaction ids from the source vector */
inlinevoidcopy_trx_ids(consttrx_ids_t& trx_ids);
/**打开一个ReadView在此之前被序列化的事务都对本ReadView可见*/ //@param id Creator transaction id */
inlinevoidprepare(trx_id_t id); /** Complete the read view creation */ inlinevoidcomplete(); /**Copy state from another view. Must call copy_complete() to finish.*/
//@param other view to copy from */
inlinevoidcopy_prepare(const ReadView& other);
/**Complete the copy, insert the creator transaction id into the m_trx_ids too and adjust the m_up_limit_id *, if required */
inlinevoidcopy_complete(); /**Set the creator transaction id, existing id must be 0 */
voidcreator_trx_id(trx_id_t id) { ut_ad(m_creator_trx_id == 0); m_creator_trx_id = id; } friendclassMVCC; private: /** The read should not see any transaction with trx id >= this value. In other words, this is the "high water mark". */
trx_id_t m_low_limit_id; /** The read should see all trx ids which are strictly smaller (<) than this value. In other words, this is the low water mark". */ trx_id_t m_up_limit_id; /** trx id of creating transaction, set to TRX_ID_MAX for free views. */
trx_id_t m_creator_trx_id; /** Set of RW transactions that was active when this snapshot was taken */
ids_t m_ids; /** The view does not need to see the undo logs for transactionswhose transaction number is strictly smaller (<) than this value:they can be removed in purge if not needed by other views */
trx_id_t m_low_limit_no; /** AC-NL-RO transaction view that has been "closed". */ bool m_closed; typedefUT_LIST_NODE_T(ReadView) node_t; /** List of read views in trx_sys */ byte pad1[64 - sizeof(node_t)]; node_t m_view_list; };
Consistent Read
1 2 3 4
ToDo list:
row/page/space format
row_search_mvcc();
struct row_prebuilt_t;
/*------------------------------*/ UT_LIST_BASE_NODE_T(trx_named_savept_t) trx_savepoints; /*!< savepoints set with SAVEPOINT ...,oldest first */
/*------------------------------*/ UndoMutex undo_mutex; // mutex保护下部分变量,last_sql_stat_start除外 undo_no_t undo_no; /*!< next undo log record number to assign; since the undo log is private for a transaction, this is a simple ascending sequence with no gaps; thus it represents the number of modified/inserted rows in a transaction */
ulint undo_rseg_space; /*!< 最后一个undo log所在的space_id */ trx_savept_t last_sql_stat_start; /*!< 上一条语句开始时的undo_no值,如果发生错误,则回滚到此处 */ trx_rsegs_t rsegs; /* 事务的回滚段 */ undo_no_t roll_limit; /*!< least undo number to undo during a partial rollback; 0 otherwise */
/*Either statement transaction or normal transaction - related thread-specific storage engine data.If a storage engine participates in a statement/transaction,an instance of this class is present in thd->m_transaction.m_scope_info[STMT|SESSION].ha_list. The addition this list is made by trans_register_ha().When it's time to commit or rollback, each element of ha_list is used to access storage engine's prepare()/commit()/rollback() methods, and also to evaluate if a full two phase commit is necessary.@sa General description of transaction handling in handler.cc.*/
classHa_trx_info { public: /**Register this storage engine in the given transaction context.*/ voidregister_ha(Ha_trx_info *ha_info, handlerton *ht_arg) { DBUG_ENTER("Ha_trx_info::register_ha"); DBUG_PRINT("enter", ("ht: 0x%llx (%s)", (ulonglong) ht_arg, ha_legacy_type_name(ht_arg->db_type))); DBUG_ASSERT(m_flags == 0); DBUG_ASSERT(m_ht == NULL); DBUG_ASSERT(m_next == NULL); m_ht= ht_arg; m_flags= (int) TRX_READ_ONLY; /* Assume read-only at start. */ m_next= ha_info; DBUG_VOID_RETURN; } /**Clear, prepare for reuse.*/
voidcoalesce_trx_with(const Ha_trx_info *stmt_trx) { /*Must be called only after the transaction has been started. Can be called many times, e.g. when we have many read-write statements in a transaction.*/
/**Although a given Ha_trx_info instance is currently always used for the same storage engine, 'ht' is not-NULL only when the corresponding storage is a part of a transaction.*/
handlerton *m_ht; /**Transaction flags related to this engine.Not-null only if this instance is a part of transaction.May assume a combination of enum values above.*/ uchar m_flags; };
部分常量释义
OPTION_NOT_AUTOCOMMIT
// When autocommit is off, a multi-statement transaction is implicitly started on the
// first statement after a previous transaction has been ended.
OPTION_BEGIN
// Regardless of the autocommit status, a multi-statement transaction can be explicitly
// started with the statements "START TRANSACTION", "BEGIN [WORK]",
// "[COMMIT | ROLLBACK] AND CHAIN", etc.
bool check_flush_or_checkpoint; /*!< this is set when there may be need to flush the log buffer, or preflush buffer pool pages, or make a checkpoint; this MUST be TRUE when lsn - last_checkpoint_lsn > max_checkpoint_age; this flag is peeked at by log_free_check(), which does not reserve the log mutex */
lsn_t flushed_to_disk_lsn; /*!< lsn that has already been flushed to disk */
ulint n_pending_flushes;/*!< number of flushes currently in progress; protected by the log mutex */
os_event_t flush_event; /*!< this event is in the reset state when a flush is running; a thread should wait for this without owning the log mutex, but NOTE that to set this event, the thread MUST own the log mutex! */
ulint n_log_ios; /*!< number of log i/os initiated thus far */
ulint n_log_ios_old; /*!< number of log i/o's at the previous printout */
time_t last_printout_time;/*!< when log_print was last time called */
/* @} */ /** Fields involved in checkpoints @{ */ lsn_t log_group_capacity; /*!< capacity of the log group; if the checkpoint age exceeds this, it is a serious error because it is possible we will then overwrite log and spoil crash recovery */
lsn_t max_modified_age_async; /*!< when this recommended value for lsn - buf_pool_get_oldest_modification() is exceeded, we start an asynchronous preflush of pool pages */
lsn_t max_modified_age_sync; /*!< when this recommended value for lsn - buf_pool_get_oldest_modification() is exceeded, we start a synchronous preflush of pool pages */
lsn_t max_checkpoint_age_async; /*!< when this checkpoint age is exceeded we start an asynchronous writing of a new checkpoint */
lsn_t max_checkpoint_age; /*!< this is the maximum allowed value for lsn - last_checkpoint_lsn when a new query step is started */
ib_uint64_t next_checkpoint_no; /*!< next checkpoint number */ lsn_t last_checkpoint_lsn; /*!< latest checkpoint lsn */ lsn_t next_checkpoint_lsn; /*!< next checkpoint lsn */ mtr_buf_t* append_on_checkpoint; /*!< extra redo log records to write during a checkpoint, or NULL if none. The pointer is protected by log_sys->mutex, and the data must remain constant as long as this pointer is not NULL. */ ulint n_pending_checkpoint_writes; /*!< number of currently pending checkpoint writes */ rw_lock_t checkpoint_lock;/*!< this latch is x-locked when a checkpoint write is running; a thread should wait for this without owning the log mutex */ #endif/* !UNIV_HOTBACKUP */ byte* checkpoint_buf_ptr;/* unaligned checkpoint header */ byte* checkpoint_buf; /*!< checkpoint header is read to this buffer */ /* @} */ };
/** Prepare to write the mini-transaction log to the redo log buffer.
@return number of bytes to write in finish_write() */
ulint mtr_t::Command::prepare_write() { switch (m_impl->m_log_mode) { case MTR_LOG_NONE选项开启 直接返回0,即不写redo; case MTR_LOG_ALL: break; } ulint len = mtr中redo日志大小->m_log.size(); ulint n_recs = m_impl->m_n_log_recs; if (len长度超过redo buf缓冲区大小的1/2) { log_buffer_extend((len + 1) * 2); //进行缓冲区扩展,因为在当前redo buf使用空间 //没到1/2时不会被flush,所以当本次mtr的redo //log长度大于buf的1/2时,可能会写不下,防止 //缓冲区溢出而进行扩展。 } fil_space_t* space = m_impl->m_user_space; if (space != NULL && is_system_or_undo_tablespace(space->id)) { /* Omit MLOG_FILE_NAME for predefined tablespaces. */ space = NULL; } log_mutex_enter(); //下面的逻辑注意看下,MLOG_FILE_NAME是什么意思? if (fil_names_write_if_was_clean(space, m_impl->m_mtr)) { /* This mini-transaction was the first one to modify this tablespace since the latest checkpoint, so some MLOG_FILE_NAME records were appended to m_log. */
ut_ad(m_impl->m_n_log_recs > n_recs); mlog_catenate_ulint( &m_impl->m_log, MLOG_MULTI_REC_END, MLOG_1BYTE); len = m_impl->m_log.size(); } else { /* This was not the first time of dirtying a tablespace since the latest checkpoint. */ if (n_recs <= 1) { ut_ad(n_recs == 1); /* Flag the single log record as the only record in this mini-transaction. */ *m_impl->m_log.front()->begin() |= MLOG_SINGLE_REC_FLAG; } else { /* Because this mini-transaction comprises multiple log records, append MLOG_MULTI_REC_END at the end. */
/** Log group consists of a number of log files, each of the same size; a log
group is implemented as a space in the sense of the module fil0fil.
Currently, this is only protected by log_sys->mutex. However, in the case of
log_write_up_to(), we will access some members only with the protection of
log_sys->write_mutex, which should affect nothing for now. */
structlog_group_t{ /** 组id,因为5.7只有一个日志文件组,所以永远是0 */ ulint id; /** 组文件个数*/ ulint n_files; /** format of the redo log: e.g., LOG_HEADER_FORMAT_CURRENT */ ulint format; /** 单个文件大小(单位字节) */ lsn_t file_size /** file space which implements the log group */; ulint space_id; /** corruption status */ log_group_state_t state; /** lsn used to fix coordinates within the log group */ lsn_t lsn; /** the byte offset of the above lsn */ lsn_t lsn_offset; /** unaligned buffers */ byte** file_header_bufs_ptr; /** buffers for each file header in the group */ byte** file_header_bufs; /** used only in recovery: recovery scan succeeded up to this lsn in this log group */ lsn_t scanned_lsn; /** unaligned checkpoint header */ byte* checkpoint_buf_ptr; /** buffer for writing a checkpoint header */ byte* checkpoint_buf; /** 日志组链表,为空 */ UT_LIST_NODE_T(log_group_t) log_groups; };
/*********************************************************************/
/** Perform the tasks that the master thread is supposed to do when the
server is active. There are two types of tasks. The first category is
of such tasks which are performed at each invocation of this function.
We assume that this function is called roughly every second when the
server is active. The second category is of such tasks which are
performed at some interval e.g.: purge, dict_LRU cleanup etc. */