Transactions in depth

This part uses hands-on SQL statements to dig further into how transactions are executed.

Tracing a SQL statement

Let's try tracing a SELECT statement.

Experimental setup:

  • Isolation level: REPEATABLE READ (RR);
  • autocommit=true, i.e. every SQL statement is treated as a transaction of its own;
  • SQL query: SELECT * FROM trx;
    The commit path observed for it is: [trx0trx.cc] trx_commit_for_mysql(trx_t* trx) --> [trx0trx.cc] trx_commit(trx_t* trx) // this function checks whether the transaction generated redo; this SELECT did not --> [trx0trx.cc] trx_commit_low(trx, mtr) // since this is a SELECT, the mtr argument is NULL

    For a transaction that has executed an UPDATE, mtr is no longer NULL and mtr_commit(mtr) is called. Its comment explains that
    mtr_commit() commits the mini-transaction, stamping the changes with the current LSN so they can be pushed towards the file system. The transaction's durability is ultimately guaranteed by actually writing to disk, but logically its persistence starts from this function.

trx_commit_low() then goes on to call trx_commit_in_memory().

Tracing the transaction flow

We start tracing from the following function:

/*********************************************************************/
/** Initialize the synchronization primitives, memory system and threads */
void
srv_general_init(void)
/*==================*/
{
sync_check_init();
/* Reset the system variables in the recovery module. */
recv_sys_var_init();
os_thread_init();
trx_pool_init();
que_init();
row_mysql_init();
}

Reading trx_pool_init()

Next let's look at trx_pool_init(), which creates the pool of trx_t objects; trx_t is the handle describing a transaction that we discussed above.

/** Use explicit mutexes for the trx_t pool and its manager. */
typedef Pool<trx_t, TrxFactory, TrxPoolLock> trx_pool_t;
typedef PoolManager<trx_pool_t, TrxPoolManagerLock > trx_pools_t;
/** The trx_t pool manager */
static trx_pools_t* trx_pools;
/** Create the trx_t pool */
void trx_pool_init()
{
trx_pools = UT_NEW_NOKEY(trx_pools_t(MAX_TRX_BLOCK_SIZE));
ut_a(trx_pools != 0);
}

Reading the trx_pools type

trx_pools is a pointer of type trx_pools_t, and trx_pools_t is a template class, shown below. Its job is to manage the pools: whenever the existing pools run out, it adds one more. When a request comes in (for example, to allocate a trx_t), it picks one pool from its list and calls Pool->get() on it to obtain a trx_t. The value passed to the trx_pools_t constructor is the size used when creating a new Pool (4 MB).
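As a minimal sketch of the get()/mem_free() usage pattern only (the real manager is the global trx_pools, used by trx_create_low()/trx_free() in trx0trx.cc; the toy names below are stand-ins invented for this sketch):

#include <stack>

struct Handle { int state = 0; };          // stand-in for trx_t

class ToyPoolManager {                      // stand-in for trx_pools_t
public:
    ~ToyPoolManager() { while (!m_free.empty()) { delete m_free.top(); m_free.pop(); } }
    Handle* get() {
        if (m_free.empty()) return new Handle();   // "add another pool" path, simplified
        Handle* h = m_free.top(); m_free.pop(); return h;
    }
    void mem_free(Handle* h) { m_free.push(h); }   // return the object to the free list
private:
    std::stack<Handle*> m_free;
};

int main() {
    ToyPoolManager pools;              // plays the role of trx_pools
    Handle* trx = pools.get();         // like trx_pools->get() when a session needs a trx_t
    trx->state = 1;                    // use the handle
    pools.mem_free(trx);               // like trx_pools->mem_free(trx) when it is released
    return 0;
}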

template <typename Pool, typename LockStrategy>
struct PoolManager {
typedef Pool PoolType;
typedef typename PoolType::value_type value_type;
PoolManager(size_t size)
:
m_size(size)
{
create();
}
~PoolManager()
{
destroy();
ut_a(m_pools.empty());
}
/** Get an element from one of the pools.@return instance or NULL if pool is empty. */

value_type* get()
{
size_t index = 0;
size_t delay = 1;
value_type* ptr = NULL;
do {
m_lock_strategy.enter();
ut_ad(!m_pools.empty());
size_t n_pools = m_pools.size();
PoolType* pool = m_pools[index % n_pools];
m_lock_strategy.exit();
ptr = pool->get();
if (ptr == 0 && (index / n_pools) > 2) {
if (!add_pool(n_pools)) {
ib::error() << "Failed to allocate"
" memory for a pool of size "
<< m_size << " bytes. Will"
" wait for " << delay
<< " seconds for a thread to"
" free a resource";
/* There is nothing much we can do except crash and burn, however lets be a little optimistic and wait for a resource to be freed. */
os_thread_sleep(delay * 1000000);
if (delay < 32) {
delay <<= 1;
}
} else {
delay = 1;
}
}
++index;
} while (ptr == NULL);
return(ptr);
}
static void mem_free(value_type* ptr)
{
PoolType::mem_free(ptr);
}
private:
/** Add a new pool @param n_pools Number of pools that existed when the add pool was called. @return true on success */

bool add_pool(size_t n_pools)
{
bool added = false;
m_lock_strategy.enter();
if (n_pools < m_pools.size()) {
/* Some other thread already added a pool. */
added = true;
} else {
PoolType* pool;
ut_ad(n_pools == m_pools.size());
pool = UT_NEW_NOKEY(PoolType(m_size));
if (pool != NULL) {
ut_ad(n_pools <= m_pools.size());
m_pools.push_back(pool);
ib::info() << "Number of pools: "
<< m_pools.size();
added = true;
}
}
ut_ad(n_pools < m_pools.size() || !added);
m_lock_strategy.exit();
return(added);
}
/** Create the pool manager. */
void create()
{
ut_a(m_size > sizeof(value_type));
m_lock_strategy.create();
add_pool(0);
}
/** Release the resources. */
void destroy()
{
typename Pools::iterator it;
typename Pools::iterator end = m_pools.end();
for (it = m_pools.begin(); it != end; ++it) {
PoolType* pool = *it;
UT_DELETE(pool);
}
m_pools.clear();
m_lock_strategy.destroy();
}
private:
// Disable copying
PoolManager(const PoolManager&);
PoolManager& operator=(const PoolManager&);
typedef std::vector<PoolType*, ut_allocator<PoolType*> > Pools;
/** Size of each block */
size_t m_size;
/** Pools managed this manager */
Pools m_pools;
/** Lock strategy to use */
LockStrategy m_lock_strategy;
};

The code frequently uses the following macro to allocate new objects. What is new(std::nothrow), and how does it differ from plain new?

#define UT_NEW_NOKEY(expr) ::new(std::nothrow) expr

Plain new throws an exception of type std::bad_alloc on failure; that is the standard-conforming behaviour. In early C++ the behaviour was quite different: new returned 0 to signal failure, much like malloc(). new (std::nothrow) does not throw when memory runs out; instead it returns a NULL pointer. In some situations signalling failure with a NULL pointer is still the more convenient choice, so the C++ standards committee defined this special form of the new operator that returns 0 on failure.
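As a small, self-contained illustration of the difference (plain C++, not InnoDB code):

#include <new>      // std::nothrow
#include <cstdio>

int main() {
    // Ordinary new throws std::bad_alloc on failure instead of returning NULL.
    // Nothrow new returns NULL on failure, so the caller must check the pointer,
    // which is exactly what ut_a(trx_pools != 0) does after UT_NEW_NOKEY().
    int* p = new (std::nothrow) int[16];
    if (p == NULL) {
        std::printf("allocation failed\n");
        return 1;
    }
    delete[] p;
    return 0;
}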

trx_pool_t is an instantiation of the Pool class template, whose definition follows.
In short, Pool takes a size argument, allocates one block of memory of that size, and treats it as an array of trx_t elements.
We will first look at what the Pool template does, and then at what the second template argument, TrxFactory, is responsible for.

typedef Pool<trx_t, TrxFactory, TrxPoolLock> trx_pool_t;
template <typename Type, typename Factory, typename LockStrategy>
struct Pool {
typedef Type value_type;
// FIXME: Add an assertion to check alignment and offset is
// as we expect it. Also, sizeof(void*) can be 8, can we impove on this.
struct Element {
Pool* m_pool;
value_type m_type;
};
/** Constructor
@param size size of the memory block */
Pool(size_t size)
:
m_end(),
m_start(),
m_size(size),
m_last()
{
ut_a(size >= sizeof(Element));
m_lock_strategy.create();
ut_a(m_start == 0);
m_start = reinterpret_cast<Element*>(ut_zalloc_nokey(m_size));
m_last = m_start;
m_end = &m_start[m_size / sizeof(*m_start)];
/* Note: Initialise only a small subset, even though we have allocated all the memory. This is required only because PFS (MTR) results change if we instantiate too many mutexes up front. */



init(ut_min(size_t(16), size_t(m_end - m_start)));
ut_ad(m_pqueue.size() <= size_t(m_last - m_start));
}
/** Destructor */
~Pool()
{
m_lock_strategy.destroy();
for (Element* elem = m_start; elem != m_last; ++elem) {
ut_ad(elem->m_pool == this);
Factory::destroy(&elem->m_type);
}
ut_free(m_start);
m_end = m_last = m_start = 0;
m_size = 0;
}
/** Get an object from the pool.@retrun a free instance or NULL if exhausted. */

Type* get()
{
Element* elem;
m_lock_strategy.enter();
if (!m_pqueue.empty()) {
elem = m_pqueue.top();
m_pqueue.pop();
} else if (m_last < m_end) {
/* Initialise the remaining elements. */
init(m_end - m_last);
ut_ad(!m_pqueue.empty());
elem = m_pqueue.top();
m_pqueue.pop();
} else {
elem = NULL;
}
m_lock_strategy.exit();
return(elem != NULL ? &elem->m_type : 0);
}
/** Add the object to the pool.@param ptr object to free */

static void mem_free(value_type* ptr)
{
Element* elem;
byte* p = reinterpret_cast<byte*>(ptr + 1);
elem = reinterpret_cast<Element*>(p - sizeof(*elem));
elem->m_pool->put(elem);
}
protected:
// Disable copying
Pool(const Pool&);
Pool& operator=(const Pool&);
private:
/* We only need to compare on pointer address. */
typedef std::priority_queue<
Element*,
std::vector<Element*, ut_allocator<Element*> >,
std::greater<Element*> > pqueue_t;
/** Release the object to the free pool @param elem element to free */

void put(Element* elem)
{
m_lock_strategy.enter();
ut_ad(elem >= m_start && elem < m_last);
ut_ad(Factory::debug(&elem->m_type));
m_pqueue.push(elem);
m_lock_strategy.exit();
}
/** Initialise the elements.@param n_elems Number of elements to initialise */

void init(size_t n_elems)
{
ut_ad(size_t(m_end - m_last) >= n_elems);
for (size_t i = 0; i < n_elems; ++i, ++m_last) {
m_last->m_pool = this;
Factory::init(&m_last->m_type);
m_pqueue.push(m_last);
}
ut_ad(m_last <= m_end);
}
private:
/** Pointer to the last element */
Element* m_end;
/** Pointer to the first element */
Element* m_start;
/** Size of the block in bytes */
size_t m_size;
/** Upper limit of used space */
Element* m_last;
/** Priority queue ordered on the pointer addresse. */
pqueue_t m_pqueue;
/** Lock strategy to use */
LockStrategy m_lock_strategy;
};

TrxFactory is a class, defined as follows.
Its job is to construct and recycle trx_t objects.

/** For managing the life-cycle of the trx_t instance that we get
from the pool. */
struct TrxFactory {
/** Initializes a transaction object. It must be explicitly started with trx_start_if_not_started() before using it. The default isolation level is TRX_ISO_REPEATABLE_READ. @param trx Transaction instance to initialise */



static void init(trx_t* trx)
{
/* Explicitly call the constructor of the already allocated object. trx_t objects are allocated by ut_zalloc() in Pool::Pool() which would not call the constructors of the trx_t members. */

new(&trx->mod_tables) trx_mod_tables_t();
new(&trx->lock.rec_pool) lock_pool_t();
new(&trx->lock.table_pool) lock_pool_t();
new(&trx->lock.table_locks) lock_pool_t();
new(&trx->hit_list) hit_list_t();
trx_init(trx);
trx->state = TRX_STATE_NOT_STARTED;
trx->dict_operation_lock_mode = 0;
trx->xid = UT_NEW_NOKEY(xid_t());
trx->detailed_error = reinterpret_cast<char*>(
ut_zalloc_nokey(MAX_DETAILED_ERROR_LEN));
trx->lock.lock_heap = mem_heap_create_typed(
1024, MEM_HEAP_FOR_LOCK_HEAP);
lock_trx_lock_list_init(&trx->lock.trx_locks);
UT_LIST_INIT(
trx->trx_savepoints,
&trx_named_savept_t::trx_savepoints);
mutex_create(LATCH_ID_TRX, &trx->mutex);
mutex_create(LATCH_ID_TRX_UNDO, &trx->undo_mutex);
lock_trx_alloc_locks(trx);
}
/** Release resources held by the transaction object. @param trx the transaction for which to release resources */

static void destroy(trx_t* trx)
{
ut_ad(!trx->in_rw_trx_list);
ut_ad(!trx->in_mysql_trx_list);
ut_a(trx->lock.wait_lock == NULL);
ut_a(trx->lock.wait_thr == NULL);
ut_a(!trx->has_search_latch);
ut_a(trx->dict_operation_lock_mode == 0);
if (trx->lock.lock_heap != NULL) {
mem_heap_free(trx->lock.lock_heap);
trx->lock.lock_heap = NULL;
}
ut_a(UT_LIST_GET_LEN(trx->lock.trx_locks) == 0);
UT_DELETE(trx->xid);
ut_free(trx->detailed_error);
mutex_free(&trx->mutex);
mutex_free(&trx->undo_mutex);
trx->mod_tables.~trx_mod_tables_t();
ut_ad(trx->read_view == NULL);
if (!trx->lock.rec_pool.empty()) {
/* See lock_trx_alloc_locks() why we only free the first element. */

ut_free(trx->lock.rec_pool[0]);
}
if (!trx->lock.table_pool.empty()) {
/* See lock_trx_alloc_locks() why we only free the first element. */

ut_free(trx->lock.table_pool[0]);
}
trx->lock.rec_pool.~lock_pool_t();
trx->lock.table_pool.~lock_pool_t();
trx->lock.table_locks.~lock_pool_t();
trx->hit_list.~hit_list_t();
}
/** Enforce any invariants here, this is called before the transaction is added to the pool. @return true if all OK */


static bool debug(const trx_t* trx)
{
ut_a(trx->error_state == DB_SUCCESS);
ut_a(trx->magic_n == TRX_MAGIC_N);
ut_ad(!trx->read_only);
ut_ad(trx->state == TRX_STATE_NOT_STARTED
|| trx->state == TRX_STATE_FORCED_ROLLBACK);
ut_ad(trx->dict_operation == TRX_DICT_OP_NONE);
ut_ad(trx->mysql_thd == 0);
ut_ad(!trx->in_rw_trx_list);
ut_ad(!trx->in_mysql_trx_list);
ut_a(trx->lock.wait_thr == NULL);
ut_a(trx->lock.wait_lock == NULL);
ut_a(!trx->has_search_latch);
ut_a(trx->dict_operation_lock_mode == 0);
ut_a(UT_LIST_GET_LEN(trx->lock.trx_locks) == 0);
ut_ad(trx->autoinc_locks == NULL);
ut_ad(trx->lock.table_locks.empty());
ut_ad(!trx->abort);
ut_ad(trx->hit_list.empty());
ut_ad(trx->killed_by == 0);
return(true);
}
};

One more note on the new keyword. In init() above, what does new(&trx->mod_tables) trx_mod_tables_t(); do?
This form of new, placement new, constructs an object (or an array of objects) in memory that has already been allocated. There is no allocation failure to worry about, because placement new allocates nothing; the only thing it does is call the object's constructor. It is declared as follows:

void* operator new(std::size_t, void*) throw();
void  operator delete(void*, void*) throw();

Tip 1: the main use of placement new is to reuse one large, dynamically allocated block of memory over and over to construct objects (or arrays of objects) of different types.
Tip 2: objects or arrays built with placement new must be destroyed by calling their destructors explicitly; never use delete on them.

char* p  = new (std::nothrow) char[100];       // allocate a raw buffer once
long* q1 = new (p) long(100);                  // construct a long in it
int*  q2 = new (p) int[100 / sizeof(int)];     // later reuse the same buffer for an int array
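Following tip 2, here is a small self-contained example (plain C++, unrelated to the InnoDB code) of the explicit destructor call that placement-new objects require:

#include <new>
#include <cstdio>

struct Widget {
    Widget()  { std::puts("constructed"); }
    ~Widget() { std::puts("destroyed"); }
};

int main() {
    // Raw storage, allocated once and reusable (mirrors ut_zalloc + placement new in Pool).
    alignas(Widget) unsigned char buf[sizeof(Widget)];

    // Placement new: constructs a Widget inside buf without allocating anything.
    Widget* w = new (buf) Widget();

    // Destroy it by calling the destructor explicitly; never `delete w`,
    // because buf was not obtained from operator new.
    w->~Widget();
    return 0;
}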

For details see the article “C++中new关键字的三种用法” (the three uses of the new keyword in C++).

Summary of trx_pool_init()

/** Create the trx_t pool */
void
trx_pool_init()
{
trx_pools = UT_NEW_NOKEY(trx_pools_t(MAX_TRX_BLOCK_SIZE));
ut_a(trx_pools != 0);
}

The function creates the static variable trx_pools, the manager of the Pools of trx_t objects. The MAX_TRX_BLOCK_SIZE passed in the parentheses initialises the size used whenever a new Pool is created; it is expressed in bytes and equals 4 MB.
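As a rough back-of-the-envelope sketch of what one 4 MB block holds (the element size below is an assumed illustrative value; the real Element embeds a pool pointer plus a full trx_t, whose size depends on the build):

#include <cstddef>
#include <cstdio>

int main() {
    // Mirrors Pool::Pool(): m_end = &m_start[m_size / sizeof(*m_start)].
    const size_t block_size   = 4 * 1024 * 1024;  // MAX_TRX_BLOCK_SIZE: 4 MB per pool block
    const size_t element_size = 1024;             // assumed sizeof(Element) for illustration
    std::printf("trx_t slots per pool block: %zu\n", block_size / element_size);
    return 0;
}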

Function calls made when executing an UPDATE

Statement:

UPDATE trx SET data=data+1 WHERE pk=1;

  • Obtain the table object: open_table(thd, tables, ot_ctx);
  • This function first uses get_table() to obtain the TABLE object.
    When the UPDATE is executed, the function above runs first to obtain a TABLE object. It also calls check_trx_exists(THD* thd) to check whether the user thread running this UPDATE already owns a trx_t: if not (e.g. when the user first logs in), innobase_trx_allocate() is called to allocate one; if the thread already owns a trx_t, innobase_trx_init() is called to re-initialise it for reuse.
  • Next, table->init(thd, table_list); is executed to initialise the table object.
    While running, this function repeats the trx_t check above and calls check_trx_exists(THD* thd) again.

  • Then the mysql_update() function is executed; it in turn runs the following:
    • mysql_lock_tables() to acquire the table locks;
      • mysql_lock_tables() calls the following to perform the locking:
      • get_lock_data() to obtain a lock instance (MYSQL_LOCK *).

While running, this function again repeats the trx_t check above and calls check_trx_exists(THD* thd) once more.


  • Next, ha_innobase::info() is executed to fetch statistics to return to the MySQL interface layer.
    While running, this function repeats the trx_t check above and calls check_trx_exists(THD* thd) again.

  • Now the actual row update begins.
    Start from btr_cur_upd_lock_and_undo(): it checks the locking situation and then calls trx_undo_report_row_operation() directly.

trx_undo_report_row_operation()
This function writes the undo log for UPDATE, INSERT and DELETE operations, used for rollback and for consistent reads.

Values observed while debugging:
trx->id = 9988
trx->commit_lsn = 0
mtr->commit_lsn = 0
mtr->m_log->m_first_block->m_used = 37 (size of the log written so far)

commit()
After this function finishes, mtr->commit_lsn = 253065.
Presumably the redo log has been appended to as well.


  • Finally trans_commit_stmt(THD *thd) is executed.
    Its job is to commit a single-statement transaction: it checks whether the transaction associated with the given user thread thd is active and, if so, calls ha_commit_trans() to commit it. Let's look at this function next.

trans_commit_stmt(THD *thd)
After checking the transaction's lock information and whether we are inside a sub-statement, it ends up in TC_LOG_DUMMY::commit(), which does nothing itself and simply calls ha_commit_low().

ha_commit_low() then calls innobase_commit_low(), which checks whether the transaction has started and, if so, calls trx_commit_for_mysql(trx);

trx_commit_for_mysql(trx) then calls trx_commit(). After checking whether the transaction generated redo, trx_commit() calls trx_commit_low(), which performs similar checks (first whether full-text indexes were modified, then whether redo was generated) and finally calls trx_commit_in_memory(). Let's look at that function:

/****************************************************************/
/**Commits a transaction in memory. */
static
void
trx_commit_in_memory(
/*=================*/
trx_t* trx, /*!< in/out: transaction */
const mtr_t* mtr, /*!< in: mini-transaction of
trx_write_serialisation_history(), or NULL if
the transaction did not modify anything */
bool serialised)/*!< in: true if serialisation log was written */
{
trx->must_flush_log_later = false;
if (trx_is_autocommit_non_locking(trx)) {
ut_ad(trx->id == 0);
ut_ad(trx->read_only);
ut_a(!trx->is_recovered);
ut_ad(trx->rsegs.m_redo.rseg == NULL);
ut_ad(!trx->in_rw_trx_list);
/* Note: We are asserting without holding the lock mutex. But that is OK because this transaction is not waiting and cannot be rolled back and no new locks can (or should not) be added becuase it is flagged as a non-locking read-only transaction. */



ut_a(UT_LIST_GET_LEN(trx->lock.trx_locks) == 0);
/* This state change is not protected by any mutex, therefore there is an inherent race here around state transition during printouts. We ignore this race for the sake of efficiency. However, the trx_sys_t::mutex will protect the trx_t instance and it cannot be removed from the mysql_trx_list and freed without first acquiring the trx_sys_t::mutex. */





ut_ad(trx_state_eq(trx, TRX_STATE_ACTIVE));
if (trx->read_view != NULL) {
trx_sys->mvcc->view_close(trx->read_view, false);
}
MONITOR_INC(MONITOR_TRX_NL_RO_COMMIT);
/* AC-NL-RO transactions can't be rolled back asynchronously. */
ut_ad(!trx->abort);
ut_ad(!(trx->in_innodb
& (TRX_FORCE_ROLLBACK | TRX_FORCE_ROLLBACK_ASYNC)));
trx->state = TRX_STATE_NOT_STARTED;
} else {
if (trx->id > 0) {
/* For consistent snapshot, we need to remove current transaction from running transaction id list for mvcc before doing commit and releasing locks. */


trx_erase_lists(trx, serialised);
}
lock_trx_release_locks(trx);
/* Remove the transaction from the list of active transactions now that it no longer holds any user locks. */

ut_ad(trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY));
DEBUG_SYNC_C("after_trx_committed_in_memory");
if (trx->read_only || trx->rsegs.m_redo.rseg == NULL) {
MONITOR_INC(MONITOR_TRX_RO_COMMIT);
if (trx->read_view != NULL) {
trx_sys->mvcc->view_close(
trx->read_view, false);
}
} else {
ut_ad(trx->id > 0);
MONITOR_INC(MONITOR_TRX_RW_COMMIT);
}
}
if (trx->rsegs.m_redo.rseg != NULL) {
trx_rseg_t* rseg = trx->rsegs.m_redo.rseg;
mutex_enter(&rseg->mutex);
ut_ad(rseg->trx_ref_count > 0);
--rseg->trx_ref_count;
mutex_exit(&rseg->mutex);
}
if (mtr != NULL) {
if (trx->rsegs.m_redo.insert_undo != NULL) {
trx_undo_insert_cleanup(&trx->rsegs.m_redo, false);
}
if (trx->rsegs.m_noredo.insert_undo != NULL) {
trx_undo_insert_cleanup(&trx->rsegs.m_noredo, true);
}
/* NOTE that we could possibly make a group commit more efficient here: call os_thread_yield here to allow also other trxs to come to commit! */
/*-------------------------------------*/
/* Depending on the my.cnf options, we may now write the log buffer to the log files, making the transaction durable if the OS does not crash. We may also flush the log files to disk, making the transaction durable also at an OS crash or a power outage. The idea in InnoDB's group commit is that a group of transactions gather behind a trx doing a physical disk write transactions gather behind a trx doing a physical disk write one of those transactions does a write which commits the whole group. Note that this group commit will only bring benefit if there are > 2 users in the database. Then at least 2 users can gather behind one doing the physical log write to disk. If we are calling trx_commit() under prepare_commit_mutex, we will delay possible log write and flush to a separate function trx_commit_complete_for_mysql(), which is only called when the thread has released the mutex. This is to make the group commit algorithm to work. Otherwise, the prepare_commit mutex would serialize all commits and prevent a group of transactions from gathering. */

lsn_t lsn = mtr->commit_lsn();
if (lsn == 0) {
/* Nothing to be done. */
} else if (trx->flush_log_later) {
/* Do nothing yet */
trx->must_flush_log_later = true;
} else if (srv_flush_log_at_trx_commit == 0
|| thd_requested_durability(trx->mysql_thd)
== HA_IGNORE_DURABILITY) {
/* Do nothing */
} else {
}
trx->commit_lsn = lsn; // assign the lsn at which the mtr committed to the trx
/* Tell server some activity has happened, since the trx does changes something. Background utility threads like master thread, purge thread or page_cleaner thread might have some work to do. */

srv_active_wake_master_thread();
}
/* Free all savepoints, starting from the first. */
trx_named_savept_t* savep = UT_LIST_GET_FIRST(trx->trx_savepoints);
trx_roll_savepoints_free(trx, savep);
if (trx->fts_trx != NULL) {
trx_finalize_for_fts(trx, trx->undo_no != 0);
}
trx_mutex_enter(trx);
trx->dict_operation = TRX_DICT_OP_NONE;

/* Because we can rollback transactions asynchronously, we change the state at the last step. trx_t::abort cannot change once commit or rollback has started because we will have released the locks by the time we get here. */

if (trx->abort) {
trx->abort = false;
trx->state = TRX_STATE_FORCED_ROLLBACK;
} else {
trx->state = TRX_STATE_NOT_STARTED;
}
/* trx->in_mysql_trx_list would hold between trx_allocate_for_mysql() and trx_free_for_mysql(). It does not hold for recovered transactions or system transactions. */
assert_trx_is_free(trx);
trx_init(trx);
trx_mutex_exit(trx);
ut_a(trx->error_state == DB_SUCCESS);
}

How mtr and trx are connected

  • Before the final trx_commit_in_memory() the two barely interact; each does its own job. When the trx needs to write a log record (for example an undo record), it uses an mtr and appends the constructed undo content to mtr->m_log. Finally the trx calls mtr_commit(&mtr); note that mtr_commit() writes the log into the log buffer, not to disk. The actual disk write happens when innobase_commit() calls trx_commit_complete_for_mysql().

The code for the above usually looks like this:

mtr_t mtr;          // construct an mtr
mtr_start(&mtr);    // start the mtr (just initialises the mtr constructed above)
... build the log records and the data that must reach disk, e.g. trx_undo_create() building an undo log ...
mtr_commit(&mtr);   // commit the mtr
  • When trx_commit_in_memory() runs:
    it assigns the commit_lsn of the last mtr to the trx, which completes the commit. That last mtr is constructed in trx_commit() and committed in trx_commit_low().

A few questions about MTR

  • What is the pattern behind mtr creation?
    The experiment above mentioned that committing a transaction needs an mtr to make it durable. Judging from the documentation and from debugging, an mtr commit happens whenever something must be persisted. Tracing one UPDATE statement, three mtr commits are observed:
    1) when the undo segment is allocated for the transaction and the undo record is written into it;
    2) when the index and the record are updated;
    3) when the transaction is marked as committed.
  • What is the relationship between a transaction and the LSN?
    Debugging shows that they are only loosely related. The one connection is that when a transaction is marked committed in memory, the system assigns the LSN of the transaction's last committed mtr to the transaction, to be used for the final log flush. In other words, for a transaction the LSN only marks the position up to which the log must be flushed (at least) when the transaction commits. In InnoDB the transaction id is used to mark read-write (RW) transactions; a read-only (RO) transaction's id is always 0. The RW transaction id is what multi-versioning uses to judge data visibility.
    It is worth noting that the redo log does not record transaction ids directly; the redo log records only page modifications!
    According to the referenced blogs [1, 2], since undo logs are stored in rollback segments, the redo written for an undo record looks much like the redo written for an ordinary data page. Because the undo log is persisted this way, crash rollback first uses redo to restore the undo segment, and then uses the transaction state and the pre-commit data recorded in the undo segment to roll the transaction back. Commit, too, is implemented by marking the state in the undo log.
  • How is a trx_t assigned to a thread?
    Whenever a table is opened, ha_innobase::extra() runs and calls check_trx_exists(). check_trx_exists() checks whether the given thd is already associated with a transaction handle (trx_t); if not, one is created, and if it is, the handle is re-initialised for reuse (see the sketch after this list).
  • When is the ReadView allocated?
    As the earlier experiment noted, the MVCC ReadView is allocated only when the transaction executes its first SELECT. In the code this happens under row_search_mvcc() -> trx_assign_read_view() [row0sel.cc, around line 5033].
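Here is a minimal sketch of the check_trx_exists() behaviour just described. The types and helpers suffixed with _stub are simplified stand-ins invented for this sketch; the real function lives in ha_innodb.cc and uses innobase_trx_allocate()/innobase_trx_init():

#include <cstdio>

struct trx_t { bool started; };                  // stand-in for the real trx_t
struct THD   { trx_t* ha_data_trx; };            // stand-in: per-session slot kept by the handlerton

static trx_t* innobase_trx_allocate_stub(THD*)        { return new trx_t(); }
static void   innobase_trx_init_stub(THD*, trx_t* trx){ trx->started = false; }   // reset for reuse

// Sketch of check_trx_exists(): return the session's trx_t, creating or re-initialising it.
static trx_t* check_trx_exists_sketch(THD* thd) {
    trx_t*& trx = thd->ha_data_trx;
    if (trx == nullptr) {
        trx = innobase_trx_allocate_stub(thd);   // first use by this session
    } else {
        innobase_trx_init_stub(thd, trx);        // reuse the existing handle
    }
    return trx;
}

int main() {
    THD thd = { nullptr };
    trx_t* a = check_trx_exists_sketch(&thd);
    trx_t* b = check_trx_exists_sketch(&thd);
    std::printf("same handle reused: %s\n", (a == b) ? "yes" : "no");
    delete a;
    return 0;
}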

Related blog posts:
MySQL·引擎特性·InnoDB 事务系统 (MySQL engine features: the InnoDB transaction system)
InnoDB事务回滚中redo和undo的互动 (the interplay of redo and undo during InnoDB transaction rollback)

The UNDO log

The file row0undo.h contains the definition of the undo structure undo_node_t:

/** Undo node structure */
struct undo_node_t{
que_common_t common; /*!< node type: QUE_NODE_UNDO */
enum undo_exec state; /*!< node execution state */
trx_t* trx; /*!< trx for which undo is done */
roll_ptr_t roll_ptr;/*!< roll pointer to undo log record */
trx_undo_rec_t* undo_rec;/*!< undo log record */
undo_no_t undo_no;/*!< undo number of the record */
ulint rec_type;/*!< undo log record type: TRX_UNDO_INSERT_REC, ... */

trx_id_t new_trx_id; /*!< trx id to restore to clustered index record */

btr_pcur_t pcur; /*!< persistent cursor used in searching the clustered index record */

dict_table_t* table; /*!< table where undo is done */
ulint cmpl_info;/*!< compiler analysis of an update */
upd_t* update; /*!< update vector for a clustered index record */

dtuple_t* ref; /*!< row reference to the next row to handle */
dtuple_t* row; /*!< a copy (also fields copied to heap) of the row to handle */

row_ext_t* ext; /*!< NULL, or prefixes of the externally stored columns of the row */

dtuple_t* undo_row;/*!< NULL, or the row after undo */
row_ext_t* undo_ext;/*!< NULL, or prefixes of the externally stored columns of undo_row */

dict_index_t* index; /*!< the next index whose record should be handled */

mem_heap_t* heap; /*!< memory heap used as auxiliary storage for row; this must be emptied after undo is tried on a row */


};

Transaction flow analysis

Experiment 1: a transaction containing SELECT, UPDATE and INSERT

Execute the following SQL statements and trace them one by one:

CREATE TABLE trx(
pk INT NOT NULL,
DATA INT
) ENGINE = innodb;
SET AUTOCOMMIT = false;
-- start tracing from here
BEGIN;
SELECT * FROM trx where pk = 1;
UPDATE trx set data = data +1 where pk = 1;
INSERT INTO trx VALUES(6,1);
COMMIT;

BEGIN;

The BEGIN; statement never reaches the InnoDB layer; it is stopped at the handler layer, and the innobase layer does not allocate a trx_t for it. Its effects are:
1) commit the previous transaction if it has not been explicitly committed; 2) set the thread's state variables.

trans_begin(thd, lex‑>start_transaction_opt)

During the parse stage, the function executed for the BEGIN command is trans_begin(thd, lex->start_transaction_opt). Here is its code:

/* Begin a transaction. As a side effect, the previous unfinished transaction (or a transactional command such as FLUSH TABLES) is implicitly committed, and the table locks held by thd are released. */
//@note Beginning a transaction implicitly commits any current
//transaction and releases existing locks.
//@param thd Current thread
//@param flags Transaction flags
//@retval FALSE Success
//@retval TRUE Failure


bool trans_begin(THD *thd, uint flags)
{
int res= FALSE;
Transaction_state_tracker *tst= NULL;
/* This block sets thd's session_tracker variable; what that variable is for will be read later. { */
if (thd->variables.session_track_transaction_info > TX_TRACK_NONE)
tst= (Transaction_state_tracker *)
thd->session_tracker.get_tracker(TRANSACTION_INFO_TRACKER);
/*} */
// release the table locks owned by this thd
thd->locked_tables_list.unlock_locked_tables(thd);
/* Check whether thd is in a multi-statement transaction. If it is, the previous transaction, or a transactional command holding table locks (OPTION_TABLE_LOCK), has not been committed yet, so ha_commit_trans() is called to commit it first. { */


if (thd->in_multi_stmt_transaction_mode() ||
(thd->variables.option_bits & OPTION_TABLE_LOCK))
{
thd->variables.option_bits&= ~OPTION_TABLE_LOCK;
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
DBUG_PRINT("info", ("clearing SERVER_STATUS_IN_TRANS"));
res= MY_TEST(ha_commit_trans(thd, TRUE));
}
/* } */
// clear the OPTION_BEGIN flag / reset unsafe_rollback_flags;
// unsafe_rollback_flags indicate that the transaction may have modified non-transactional tables
thd->variables.option_bits&= ~OPTION_BEGIN;
thd->get_transaction()->reset_unsafe_rollback_flags(Transaction_ctx::SESSION);
// if res is non-zero, committing the previous transaction failed, so bail out with an error
if (res)
DBUG_RETURN(TRUE);
/* Release the transactional metadata locks of the committed transaction. */


thd->mdl_context.release_transactional_locks();
/* Decide from the flags whether the transaction starts as read-only (RO) or read-write (RW) { */
// if explicitly started with READ ONLY, mark the thread thd as read-only
if (flags & MYSQL_START_TRANS_OPT_READ_ONLY)
{
thd->tx_read_only= true;
if (tst)
tst->set_read_flags(thd, TX_READ_ONLY);
}
// if explicitly started with READ WRITE, mark the thread thd as read-write
else if (flags & MYSQL_START_TRANS_OPT_READ_WRITE)
{
/*Explicitly starting a RW transaction when the server is in read-only mode, is not allowed unless the user has SUPER priv.Implicitly starting a RW transaction is allowed for backward compatibility. */






if (check_readonly(thd, true))
DBUG_RETURN(true);
thd->tx_read_only= false;
/*This flags that tx_read_only was set explicitly, rather than just from the session's default.*/

if (tst)
tst->set_read_flags(thd, TX_READ_WRITE);
}
// set thd's flag bits to mark that a transaction has begun
thd->variables.option_bits|= OPTION_BEGIN;
thd->server_status|= SERVER_STATUS_IN_TRANS;
if (thd->tx_read_only)
thd->server_status|= SERVER_STATUS_IN_TRANS_READONLY;
DBUG_PRINT("info", ("setting SERVER_STATUS_IN_TRANS"));
if (tst)
tst->add_trx_state(thd, TX_EXPLICIT);
/* ha_start_consistent_snapshot() relies on OPTION_BEGIN flag set. */
/* if the transaction was started with START TRANSACTION WITH CONSISTENT SNAPSHOT */
if (flags & MYSQL_START_TRANS_OPT_WITH_CONS_SNAPSHOT)
{
if (tst)
tst->add_trx_state(thd, TX_WITH_SNAPSHOT);
// does the following call allocate the ReadView?
res= ha_start_consistent_snapshot(thd);
}
DBUG_RETURN(MY_TEST(res));
}

trans_commit_stmt(THD *thd)
At the MySQL layer this function runs at the end of every statement; the BEGIN; statement above also goes through it after trans_begin(thd, lex->start_transaction_opt) returns. Let's read it:

/** Commit a single-statement transaction. */
//@note Note that if the autocommit is on, then the following call
//inside InnoDB will commit or rollback the whole transaction
//(= the statement). The autocommit mechanism built into InnoDB
//is based on counting locks, but if the user has used LOCK
//TABLES then that mechanism does not know to do the commit.
//@param thd Current thread
//@retval FALSE Success
//@retval TRUE Failure

bool trans_commit_stmt(THD *thd)
{
DBUG_ENTER("trans_commit_stmt");
int res= FALSE;
/*We currently don't invoke commit/rollback at end of a sub-statement. In future, we perhaps should take a savepoint for each nested statement, and release the savepoint when statement has succeeded.*/





DBUG_ASSERT(! thd->in_sub_stmt);
/*Some code in MYSQL_BIN_LOG::commit and ha_commit_low() is not safefor attachable transactions.*/



DBUG_ASSERT(!thd->is_attachable_ro_transaction_active());
thd->get_transaction()->merge_unsafe_rollback_flags();
/* A single-statement transaction is committed by the if-branch below: it has no explicit COMMIT command, so it has to be committed here at the end of the statement { */
if (thd->get_transaction()->is_active(Transaction_ctx::STMT))
{
res= ha_commit_trans(thd, FALSE);
if (! thd->in_active_multi_stmt_transaction())
trans_reset_one_shot_chistics(thd);
}
else if (tc_log)
tc_log->commit(thd, false);
if (res == FALSE && !thd->in_active_multi_stmt_transaction())
if (thd->rpl_thd_ctx.session_gtids_ctx().
notify_after_transaction_commit(thd))
sql_print_warning("Failed to collect GTID to send in the response packet!");
/* In autocommit=1 mode the transaction should be marked as complete in P_S */
DBUG_ASSERT(thd->in_active_multi_stmt_transaction() ||
thd->m_transaction_psi == NULL);
thd->get_transaction()->reset(Transaction_ctx::STMT);
DBUG_RETURN(MY_TEST(res));
}

innobase_commit()
innobase_commit() is the entry point through which the MySQL layer asks the InnoDB layer to commit. Depending on the bool commit_trx argument it either commits the transaction or just marks the end of the SQL statement (recording trx_t->last_sql_stat_start, an undo_no); the exact logic is in the code below.

This function is called in the following situations:

  • In single-statement mode (autocommit = true): the default statement-finish code of the SQL_COM switch, trans_commit_stmt(THD* thd), reaches the function below indirectly through ha_commit_trans() -> commit() -> ha_commit_low().
  • In multi-statement mode: trans_commit(THD *thd) under case SQLCOM_COMMIT; calls
    ha_commit_trans(), which executes the function below.
/*****************************************************************/
/**Commits a transaction in an InnoDB database or marks an SQL statement ended.*/
//@return 0 or deadlock error if the transaction was aborted by another
//higher priority transaction.
static
int
innobase_commit(
/*============*/
handlerton* hton, /*!< in: InnoDB handlerton */
THD* thd, /*!< in: MySQL thread handle of the
user for whom the transaction should
be committed */
bool commit_trx) /*!< in: true - commit transaction
false - the current SQL statement
ended */
{
DBUG_ENTER("innobase_commit");
DBUG_ASSERT(hton == innodb_hton_ptr);
DBUG_PRINT("trans", ("ending transaction"));
trx_t* trx = check_trx_exists(thd);
TrxInInnoDB trx_in_innodb(trx);
if (trx_in_innodb.is_aborted()) {
innobase_rollback(hton, thd, commit_trx);
DBUG_RETURN(convert_error_code_to_mysql(
DB_FORCED_ABORT, 0, thd));
}
ut_ad(trx->dict_operation_lock_mode == 0);
ut_ad(trx->dict_operation == TRX_DICT_OP_NONE);
/* Transaction is deregistered only in a commit or a rollback. If it is deregistered we know there cannot be resources to be freed and we could return immediately. For the time being, we play safe and do the cleanup though there should be nothing to clean up. */

if (!trx_is_registered_for_2pc(trx) && trx_is_started(trx)) {
sql_print_error("Transaction not registered for MySQL 2PC,"
" but transaction is active");
}
bool read_only = trx->read_only || trx->id == 0;
/** The code below is where single-statement and multi-statement (BEGIN ... COMMIT) transactions diverge at commit. Roughly: 1) the if checks whether we were asked to commit (commit_trx, used for multi-statement transactions) or whether autocommit = 1; if either holds, the commit path is taken. 2) Otherwise the end of this SQL statement is merely marked with trx_mark_sql_stat_end(trx). { */






if (commit_trx
|| (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) {
/* We were instructed to commit the whole transaction, or this is an SQL statement end and autocommit is on */

/* We need current binlog position for mysqlbackup to work. */
if (!read_only) {
while (innobase_commit_concurrency > 0) {
mysql_mutex_lock(&commit_cond_m);
++commit_threads;
if (commit_threads
<= innobase_commit_concurrency) {
mysql_mutex_unlock(&commit_cond_m);
break;
}
--commit_threads;
mysql_cond_wait(&commit_cond, &commit_cond_m);
mysql_mutex_unlock(&commit_cond_m);
}
/* (The code that records the current binlog position for mysqlbackup is omitted here.) */
trx->flush_log_later = true;
}
/* Don't do write + flush right now. For group commit to work we want to do the flush later. */
innobase_commit_low(trx);
if (!read_only) {
trx->flush_log_later = false;
}
trx_deregister_from_2pc(trx);
/* Now do a write + flush of logs. */
if (!read_only) {
trx_commit_complete_for_mysql(trx);
}
} else {
/* We just mark the SQL statement ended and do not do a transaction commit */
/* If we had reserved the auto-inc lock for some table in this SQL statement we release it now */
if (!read_only) {
lock_unlock_table_autoinc(trx);
}
/* Store the current undo_no of the transaction so that we know where to roll back if we have to roll back the next SQL statement */
trx_mark_sql_stat_end(trx);
}
/* }*/
/* Reset the number AUTO-INC rows required */
trx->n_autoinc_rows = 0;
/* This is a statement level variable. */
trx->fts_next_doc_id = 0;
innobase_srv_conc_force_exit_innodb(trx);
DBUG_RETURN(0);
}

SELECT * FROM trx WHERE pk = 1;

The rough execution flow of this SELECT statement is:

  1. Open the table. The open happens first at the server layer, which then calls the handler-layer interface to open the table inside innobase;
  2. In addition, since this is the first time the transaction starts, innobase allocates a trx_t structure for the thd of this session;
  3. The server layer then calls the handler layer to lock the table being queried;
  4. Then, while row_search_mvcc() is running, it calls

 trx_start_if_not_started_low(trx_t* trx, bool read_write)

to start the transaction. This function does two things (see the sketch after this list):
- a. if the transaction is in TRX_STATE_NOT_STARTED, it starts the transaction;
- b. if the transaction is already in TRX_STATE_ACTIVE, it sets the transaction's read/write state according to the value of bool read_write;

  5. Next trx_assign_read_view() is called to give the trx_t a ReadView, used for MVCC;
  6. Finally the tables opened by the thread are closed.
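A condensed sketch of the two cases (a) and (b) of trx_start_if_not_started_low() described in step 4. All types and helpers here are simplified stand-ins, not the real InnoDB definitions:

#include <cstdio>

enum trx_state_t { TRX_STATE_NOT_STARTED, TRX_STATE_ACTIVE };
struct trx_stub_t { trx_state_t state = TRX_STATE_NOT_STARTED; bool read_write = false; };

static void trx_start_low_stub(trx_stub_t* trx, bool read_write) {
    trx->state = TRX_STATE_ACTIVE;
    trx->read_write = read_write;
}

// Sketch: start the transaction if needed, otherwise just honour the read_write hint.
static void trx_start_if_not_started_low_sketch(trx_stub_t* trx, bool read_write) {
    switch (trx->state) {
    case TRX_STATE_NOT_STARTED:
        trx_start_low_stub(trx, read_write);        // case (a): actually start it
        break;
    case TRX_STATE_ACTIVE:
        if (read_write && !trx->read_write) {
            trx->read_write = true;                 // case (b): adjust the read/write state
        }
        break;
    }
}

int main() {
    trx_stub_t trx;
    trx_start_if_not_started_low_sketch(&trx, false);  // a SELECT starts it read-only
    trx_start_if_not_started_low_sketch(&trx, true);   // a later write asks for read-write
    std::printf("active=%d rw=%d\n", trx.state == TRX_STATE_ACTIVE, trx.read_write);
    return 0;
}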

UPDATE trx set data = data +1 where pk = 1;

The rough execution flow of this UPDATE statement is:

  1. First the single-table update path try_single_table_update() is attempted and the table is opened, much like step 1 of the SELECT above, except that because the previous SELECT already opened the trx table, this open tries to find it in the Table_cache.
  2. Then the table lock is taken;
  3. Then row_search_mvcc() is called to perform the read, this time taking LOCK_IX on the table. In lock_table(), once it sees that LOCK_IX is requested, the current read-only transaction is converted into a read-write one by calling trx_set_rw_mode(trx); we will read that function below.
  4. After the transaction is promoted to RW, the clustered-index record update starts; trx_undo_report_row_operation() is called to allocate undo pages and write the change made to the row.
  5. Then the data row itself is updated;
  6. Once the update is done, redo is written, the table lock is released, and the tables associated with the thread are closed.

    trx_set_rw_mode(trx);

This function promotes an RO transaction to an RW transaction. Concretely it:

  1. allocates a rollback (undo) segment;
  2. obtains a transaction id;
  3. adds the transaction to the global set of read-write transactions.
/*************************************************************/
/** Promote an RO transaction to an RW transaction. An RO transaction is normally not given a transaction id, unless it needs to write to a temporary table; in that case it gets a transaction id and a rollback segment, but it must not be added to the global RW transaction list, because updates to temporary tables are invisible to other transactions! */



void
trx_set_rw_mode(
/*============*/
trx_t* trx) /*!< in/out: transaction that is RW */
{
ut_ad(trx->rsegs.m_redo.rseg == 0);
ut_ad(!trx->in_rw_trx_list);
ut_ad(!trx_is_autocommit_non_locking(trx));
if (srv_force_recovery >= SRV_FORCE_NO_TRX_UNDO) {
return;
}
/* Function is promoting existing trx from ro mode to rw mode.In this process it has acquired trx_sys->mutex as it plan to move trx from ro list to rw list. If in future, some other threadlooks at this trx object while it is being promoted then ensure that both threads are synced by acquring trx->mutex to avoid decision based on in-consistent view formed during promotion. */

// allocate the rollback segment
trx->rsegs.m_redo.rseg = trx_assign_rseg_low(
srv_rollback_segments,
srv_undo_tablespaces,
TRX_RSEG_TYPE_REDO);
ut_ad(trx->rsegs.m_redo.rseg != 0);
/* take trx_sys->mutex in order to obtain a transaction id, join the read-write transaction set and adjust the ReadView { */
mutex_enter(&trx_sys->mutex);
ut_ad(trx->id == 0);
trx->id = trx_sys_get_new_trx_id(); // obtain the transaction id
trx_sys->rw_trx_ids.push_back(trx->id); // add this transaction to trx_sys's set of read-write transaction ids
trx_sys->rw_trx_set.insert(TrxTrack(trx->id, trx));
/* record this transaction's own id in the ReadView it already holds, so it can see its own changes */
if (MVCC::is_view_active(trx->read_view)) {
MVCC::set_view_creator_trx_id(trx->read_view, trx->id);
}
#ifdef UNIV_DEBUG
if (trx->id > trx_sys->rw_max_trx_id) {
trx_sys->rw_max_trx_id = trx->id;
}
#endif /* UNIV_DEBUG */
if (!trx->read_only) {
UT_LIST_ADD_FIRST(trx_sys->rw_trx_list, trx);
ut_d(trx->in_rw_trx_list = true);
}
mutex_exit(&trx_sys->mutex);
/* }*/
}

trx_undo_assign_undo()

This function assigns undo log pages to the transaction. Its logic is:

  1. If the change is to a temporary table, only the undo log is allocated and no corresponding redo is written; otherwise redo is written as well;
  2. Then the undo log is allocated: either a cached one is reused or a new one is created;
  3. Finally it checks whether the change covered by this undo log is a data-dictionary operation.
/**********************************************************************/
/**Assigns an undo log for a transaction. A new undo log is created or a cached one is reused. @return DB_SUCCESS if undo log assign successful, possible error codes are: DB_TOO_MANY_CONCURRENT_TRXS DB_OUT_OF_FILE_SPACE DB_READ_ONLY DB_OUT_OF_MEMORY */





dberr_t
trx_undo_assign_undo(
/*=================*/
trx_t* trx, /*!< in: transaction */
trx_undo_ptr_t* undo_ptr, /*!< in: assign undo log from
referred rollback segment. */
ulint type) /*!< in: TRX_UNDO_INSERT or
TRX_UNDO_UPDATE */
{
trx_rseg_t* rseg;
trx_undo_t* undo;
mtr_t mtr;
dberr_t err = DB_SUCCESS;
ut_ad(trx);
/* An RO transaction's trx->rsegs.m_redo.rseg may be NULL, and yet it may request undo because it needs to modify a temporary table. But if the trx has been assigned no rollback segment at all and still asks for undo pages, something has gone wrong; this assertion checks for that. */


ut_ad(trx_is_rseg_assigned(trx));
rseg = undo_ptr->rseg;
ut_ad(mutex_own(&(trx->undo_mutex)));
/* Allocating undo pages must itself be written to the redo log, so a mini-transaction is started here { */
mtr_start(&mtr);
/* if the undo pointer passed in is m_noredo, this is a temporary-table change,
so set the mtr to MTR_LOG_NO_REDO and do not write redo log */
if (&trx->rsegs.m_noredo == undo_ptr) {
mtr.set_log_mode(MTR_LOG_NO_REDO);
} else {
ut_ad(&trx->rsegs.m_redo == undo_ptr);
}
/* likewise, if the transaction's rollback segment is a temporary (no-redo) one, this is a temporary-table change: set the mtr to MTR_LOG_NO_REDO and do not write redo log */


if (trx_sys_is_noredo_rseg_slot(rseg->id)) {
mtr.set_log_mode(MTR_LOG_NO_REDO);
ut_ad(undo_ptr == &trx->rsegs.m_noredo);
} else {
ut_ad(undo_ptr == &trx->rsegs.m_redo);
}
mutex_enter(&rseg->mutex);
/* now allocate the undo log { */
// first see whether there is a cached undo log that can be reused
undo = trx_undo_reuse_cached(trx, rseg, type, trx->id, trx->xid,
&mtr);
// if there is nothing to reuse, create a new undo log
if (undo == NULL) {
err = trx_undo_create(trx, rseg, type, trx->id, trx->xid,
&undo, &mtr);
if (err != DB_SUCCESS) {
goto func_exit;
}
}
if (type == TRX_UNDO_INSERT) {
UT_LIST_ADD_FIRST(rseg->insert_undo_list, undo);
ut_ad(undo_ptr->insert_undo == NULL);
undo_ptr->insert_undo = undo;
} else {
UT_LIST_ADD_FIRST(rseg->update_undo_list, undo);
ut_ad(undo_ptr->update_undo == NULL);
undo_ptr->update_undo = undo;
}
/* if this change modifies the data dictionary, it must be flagged in the undo log header! */
if (trx_get_dict_operation(trx) != TRX_DICT_OP_NONE) {
trx_undo_mark_as_dict_operation(trx, undo, &mtr);
}
func_exit:
mutex_exit(&(rseg->mutex));
mtr_commit(&mtr);
return(err);
}

INSERT INTO trx VALUES(6,1);

The rough execution flow of this INSERT statement is:

  1. Open the required table;
  2. Take the table lock;
  3. Call row_insert_for_mysql((byte*) record, m_prebuilt) to perform the actual data change;
  4. That function first calls dict_sys_get_new_row_id() to obtain the row id of the row to be inserted;
  5. The insert then proceeds; trx_undo_report_row_operation() is again called to write the change into the undo log;
  6. Then the actual insert is performed;
  7. Finally the tables owned by the thread are closed.

    COMMIT;

    Executing COMMIT calls trans_commit(thd) (#1) directly to commit the transaction and make it durable. The rough flow of COMMIT is:
  1. trans_commit(thd) is called and sets some state variables;
  2. It then calls ha_commit_trans(), which, besides the single-statement/multi-statement and 2PC checks, calls tc_log->commit(thd, all) with all = true to commit the transaction.
  3. tc_log->commit(thd, all) calls innobase_commit() to carry out the final commit at the InnoDB layer.
    In addition, the most important thing the innobase layer does at commit time is to set the state of the undo pages belonging to the transaction. That is done in trx_write_serialisation_history(trx, mtr); the same function also truncates the insert undo log and puts the update undo log on the purge queue. We look at its contents below (#2).

    trans_commit(THD* thd)

/** Note: this function is in the handler layer. It commits the transaction associated with the thread, making it durable. */


//@param thd Current thread
//@retval FALSE Success
//@retval TRUE Failure

bool trans_commit(THD *thd)
{
int res;
if (trans_check_state(thd))
DBUG_RETURN(TRUE);
// first clear thd's server status flags
thd->server_status&=
~(SERVER_STATUS_IN_TRANS | SERVER_STATUS_IN_TRANS_READONLY);
/* call ha_commit_trans() to commit the transaction */
res= ha_commit_trans(thd, TRUE);
if (res == FALSE)
if (thd->rpl_thd_ctx.session_gtids_ctx().
notify_after_transaction_commit(thd))
sql_print_warning("Failed to collect GTID to send in the response packet!");
/*When gtid mode is enabled, a transaction may cause binlog rotation, which inserts a record into the gtid system table (which is probably a transactional table). Thence, the flag SERVER_STATUS_IN_TRANS may be set again while calling ha_commit_trans(...) Consequently, we need to reset it back,much like we are doing before calling ha_commit_trans(...).We would really only need to do this when gtid_mode=on. However,checking gtid_mode requires holding a lock, which is costly. So we clear the bit unconditionally. This has no side effects since if gtid_mode=off the bit is already cleared.*/











/* clear the SERVER_STATUS_IN_TRANS and OPTION_BEGIN flags */

thd->server_status&= ~SERVER_STATUS_IN_TRANS;
thd->variables.option_bits&= ~OPTION_BEGIN;
thd->get_transaction()->reset_unsafe_rollback_flags(Transaction_ctx::SESSION);
thd->lex->start_transaction_opt= 0;
/* The transaction should be marked as complete in P_S. */
DBUG_ASSERT(thd->m_transaction_psi == NULL);
thd->tx_priority= 0;
trans_track_end_trx(thd);
DBUG_RETURN(MY_TEST(res));
}

#2

trx_write_serialisation_history(trx, mtr);

This function (to be revisited) accomplishes the following:

  1. If the transaction performed UPDATE operations, it obtains the transaction's serialisation number (trx_no, not trx_id);
  2. If it only performed INSERT operations, no serialisation number is obtained;
  3. trx_serialisation_number_get() [around line 71] is what actually generates the serialisation number, by calling trx_sys_get_new_trx_id(); besides that, it also adds the transaction's update undo pages to the purge_sys->purge_queue for the purge thread to clean up;
  4. Finally the rollback segment state is set (TRX_UNDO_CACHED, TRX_UNDO_TO_FREE or TRX_UNDO_TO_PURGE).

Note: the undo is added to the purge queue first, and the rollback segment state is set afterwards!

/****************************************************************/
/**Assign the transaction its history serialisation number and write the update UNDO log record to the assigned rollback segment.@return true if a serialisation log was written */



bool
trx_write_serialisation_history(
/*============================*/
trx_t* trx, /*!< in/out: transaction */
mtr_t* mtr) /*!< in/out: mini-transaction */
{
/* Change the undo log segments of this trx from TRX_UNDO_ACTIVE to their final state. Once these modifications reach the files, the transaction is committed and durable at the file level; its logical commit time is the lsn obtained below. */



/* The modified undo log headers have to be placed into the history list ordered by undo trx number so that purge can process them, which is why the rseg mutex is acquired below. */

bool own_redo_rseg_mutex = false;
bool own_noredo_rseg_mutex = false;
/* Get rollback segment mutex. */
if (trx->rsegs.m_redo.rseg != NULL && trx_is_redo_rseg_updated(trx)) {
mutex_enter(&trx->rsegs.m_redo.rseg->mutex);
own_redo_rseg_mutex = true;
}
mtr_t temp_mtr;
if (trx->rsegs.m_noredo.rseg != NULL
&& trx_is_noredo_rseg_updated(trx)) {
mutex_enter(&trx->rsegs.m_noredo.rseg->mutex);
own_noredo_rseg_mutex = true;
mtr_start(&temp_mtr);
temp_mtr.set_log_mode(MTR_LOG_NO_REDO);
}
/* If transaction involves insert then truncate undo logs. */
if (trx->rsegs.m_redo.insert_undo != NULL) {
trx_undo_set_state_at_finish(
trx->rsegs.m_redo.insert_undo, mtr);
}
if (trx->rsegs.m_noredo.insert_undo != NULL) {
trx_undo_set_state_at_finish(
trx->rsegs.m_noredo.insert_undo, &temp_mtr);
}
bool serialised = false;
/* If transaction involves update then add rollback segments to purge queue. */

if (trx->rsegs.m_redo.update_undo != NULL
|| trx->rsegs.m_noredo.update_undo != NULL) {
/* Assign the transaction serialisation number and add these rollback segments to purge trx-no sorted priority queue if this is the first UNDO log being written to assigned rollback segments. */



trx_undo_ptr_t* redo_rseg_undo_ptr =
trx->rsegs.m_redo.update_undo != NULL
? &trx->rsegs.m_redo : NULL;
trx_undo_ptr_t* noredo_rseg_undo_ptr =
trx->rsegs.m_noredo.update_undo != NULL
? &trx->rsegs.m_noredo : NULL;
/* Will set trx->no and will add rseg to purge queue. */

serialised = trx_serialisation_number_get(
trx, redo_rseg_undo_ptr, noredo_rseg_undo_ptr);

/* It is not necessary to obtain trx->undo_mutex here because only a single OS thread is allowed to do the transaction commit for this transaction. */
if (trx->rsegs.m_redo.update_undo != NULL) {
page_t* undo_hdr_page;
undo_hdr_page = trx_undo_set_state_at_finish(
trx->rsegs.m_redo.update_undo, mtr);
/* Delay update of rseg_history_len if we plan to add non-redo update_undo too. This is to avoid immediate invocation of purge as we need to club these 2 segments with same trx-no as single unit. */



bool update_rseg_len =
!(trx->rsegs.m_noredo.update_undo != NULL);
trx_undo_update_cleanup(
trx, &trx->rsegs.m_redo, undo_hdr_page,
update_rseg_len, (update_rseg_len ? 1 : 0),
mtr);
}
DBUG_EXECUTE_IF("ib_trx_crash_during_commit", DBUG_SUICIDE(););
if (trx->rsegs.m_noredo.update_undo != NULL) {
page_t* undo_hdr_page;
undo_hdr_page = trx_undo_set_state_at_finish(
trx->rsegs.m_noredo.update_undo, &temp_mtr);
ulint n_added_logs =
(redo_rseg_undo_ptr != NULL) ? 2 : 1;
trx_undo_update_cleanup(
trx, &trx->rsegs.m_noredo, undo_hdr_page,
true, n_added_logs, &temp_mtr);
}
}
if (own_redo_rseg_mutex) {
mutex_exit(&trx->rsegs.m_redo.rseg->mutex);
own_redo_rseg_mutex = false;
}
if (own_noredo_rseg_mutex) {
mutex_exit(&trx->rsegs.m_noredo.rseg->mutex);
own_noredo_rseg_mutex = false;
mtr_commit(&temp_mtr);
}
MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);
/* Update the latest MySQL binlog name and offset info in trx sys header if MySQL binlogging is on or the database server is a MySQL replication slave */


if (trx->mysql_log_file_name != NULL
&& trx->mysql_log_file_name[0] != '\0') {
trx_sys_update_mysql_binlog_offset(
trx->mysql_log_file_name,
trx->mysql_log_offset,
TRX_SYS_MYSQL_LOG_INFO, mtr);
trx->mysql_log_file_name = NULL;
}
return(serialised);
}

ReadView(MVCC) again

The ReadView is InnoDB's mechanism for consistent reads and multi-version concurrency control. A ReadView is essentially a snapshot: when a transaction is open and executes its first SELECT (INSERT, UPDATE and DELETE do not trigger ReadView creation; alternatively, START TRANSACTION WITH CONSISTENT SNAPSHOT allocates a ReadView right at the start), the system records the transactions (transaction ids) active at that moment, forming a ReadView. Changes made by the transactions in that ReadView are invisible to the transaction holding it. In addition, the purge thread holds a global ReadView of its own, used to purge undo pages that are no longer needed.

Implementation-wise, besides the ReadView class innobase also has an MVCC class, which wraps the operations on ReadViews; it is a utility class for ReadViews. Let's look at the two classes in turn, then at how a ReadView is created for a transaction and how the purge thread uses a ReadView to clean up undo.

ReadView

Put simply, a ReadView is a set of read-write transaction ids; the data changes made by the transactions in that set are invisible to the holder of the ReadView, which is how transaction isolation is achieved.

  1. ReadView has a member m_ids of type ids_t: the set of read-write transaction ids that were active in the system when this ReadView was taken;
  2. a transaction whose id is not below m_low_limit_id definitely has its changes invisible to this ReadView;
  3. a transaction whose id is below m_up_limit_id definitely has its changes visible to this ReadView;
  4. for an id between m_up_limit_id and m_low_limit_id, m_ids is searched: if the id is found, that transaction was still active when the ReadView was taken and its changes are invisible; otherwise it had already committed and its changes are visible.
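Before reading the full class below, the rules above can be distilled into the following self-contained sketch; the real implementation is ReadView::changes_visible(), shown in the listing that follows:

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

typedef std::uint64_t trx_id_t;   // stand-in for InnoDB's trx_id_t

// Distilled visibility check: the parameters mirror the ReadView fields described above.
static bool changes_visible_sketch(trx_id_t id,
                                   trx_id_t creator_id,
                                   trx_id_t up_limit_id,    // low water mark
                                   trx_id_t low_limit_id,   // high water mark
                                   const std::vector<trx_id_t>& active_ids) {
    if (id < up_limit_id || id == creator_id) return true;    // committed before the snapshot, or own change
    if (id >= low_limit_id) return false;                     // started after the snapshot
    // In between: visible only if it was NOT active when the snapshot was taken.
    return !std::binary_search(active_ids.begin(), active_ids.end(), id);
}

int main() {
    std::vector<trx_id_t> active = {90, 95};    // m_ids: ids active at snapshot time (example values)
    std::printf("%d %d %d\n",
                changes_visible_sketch(80, 100, 88, 101, active),   // 1: below the low water mark
                changes_visible_sketch(95, 100, 88, 101, active),   // 0: was active at snapshot time
                changes_visible_sketch(93, 100, 88, 101, active));  // 1: already committed by then
    return 0;
}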

When the ReadView is created
The creation time differs with the isolation level. Under READ COMMITTED (RC), every read request of the transaction creates a new ReadView. Under REPEATABLE READ (RR), the transaction creates its ReadView on the first read request and, to provide RR's consistent reads, that ReadView is kept and never changes afterwards. It is also worth mentioning that under RR, DML statements do not trigger ReadView creation. For example: after transaction A opens (BEGIN), transaction B opens, modifies the row with pk=1 and commits; if A only now runs a SELECT, it will read B's modified value. But if A had executed a SELECT immediately after opening, B's change would be invisible to A.
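A toy sketch of the timing difference just described, under the assumption (taken from the text) that the view is assigned on the first read and, under READ COMMITTED, discarded again at the end of each statement; all names here are stand-ins, the real entry point being row_search_mvcc() -> trx_assign_read_view():

#include <cstdio>

enum iso_level_t { ISO_READ_COMMITTED, ISO_REPEATABLE_READ };
struct read_view_stub_t { unsigned long long snapshot_no; };
struct trx_stub_t { iso_level_t isolation_level; read_view_stub_t* read_view; };

static unsigned long long g_snapshot_counter = 0;

// Rough stand-in for trx_assign_read_view(): take a snapshot only if none is open.
static void assign_read_view_sketch(trx_stub_t* trx) {
    if (trx->read_view == nullptr) {
        trx->read_view = new read_view_stub_t();
        trx->read_view->snapshot_no = ++g_snapshot_counter;
    }
}

// Rough stand-in for end-of-statement handling: RC discards the view, RR keeps it.
static void end_of_statement_sketch(trx_stub_t* trx) {
    if (trx->isolation_level == ISO_READ_COMMITTED && trx->read_view != nullptr) {
        delete trx->read_view;
        trx->read_view = nullptr;
    }
}

int main() {
    trx_stub_t rr = { ISO_REPEATABLE_READ, nullptr };
    assign_read_view_sketch(&rr);      // first read: snapshot taken
    end_of_statement_sketch(&rr);
    assign_read_view_sketch(&rr);      // second read: the same snapshot is reused under RR
    std::printf("RR sees snapshot %llu\n", rr.read_view->snapshot_no);

    trx_stub_t rc = { ISO_READ_COMMITTED, nullptr };
    assign_read_view_sketch(&rc);      // first read: snapshot taken
    end_of_statement_sketch(&rc);      // RC throws the view away at statement end
    assign_read_view_sketch(&rc);      // second read: a new, later snapshot
    std::printf("RC sees snapshot %llu\n", rc.read_view->snapshot_no);

    delete rr.read_view;
    delete rc.read_view;
    return 0;
}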

class ReadView {
/** The ids_t below is similar to std::vector; it stores transaction ids. */
class ids_t {
typedef trx_ids_t::value_type value_type; //value_type为trx_id_t
private:
// Prevent copying
ids_t(const ids_t&);
ids_t& operator=(const ids_t&);
private:
/** Memory for the array */
value_type* m_ptr;
/** Number of active elements in the array */
ulint m_size;
/** Size of m_ptr in elements */
ulint m_reserved;
friend class ReadView;
};
public:
ReadView();
~ReadView();
/** Check that the given trx id does not exceed the current maximum transaction id in the system
@param[in] id transaction id to check
@param[in] name table name */


static void check_trx_id_sanity(
trx_id_t id,
const table_name_t& name);
/************ NOTE: this function deserves special attention ************/
bool changes_visible(trx_id_t id, const table_name_t& name) const
// check whether the changes made by transaction `id` are visible to this ReadView
{
ut_ad(id > 0);
// first check whether the id is below m_up_limit_id or is the creator of this ReadView; if so, return true
if (id < m_up_limit_id || id == m_creator_trx_id) {
return(true);
}
check_trx_id_sanity(id, name);
// check against m_low_limit_id: if the id is not below it, return false (not visible). Otherwise look at
// the m_ids active list: if it is empty, then although this transaction started after the view's low
// water mark, it was no longer active when the snapshot was taken, so its changes are visible.
if (id >= m_low_limit_id) {
return(false);
} else if (m_ids.empty()) {
return(true);
}
const ids_t::value_type* p = m_ids.data();
// finally, if the m_ids active list is not empty, binary-search it: if the id is found, that transaction
// was active when this ReadView was taken, so return the negation of the search result.
return(!std::binary_search(p, p + m_ids.size(), id));
}
/**@param id transaction to check*/
//@return true if view sees transaction id

bool sees(trx_id_t id) const
{
return(id < m_up_limit_id);
}
/** Mark this ReadView closed by setting its creator id to TRX_ID_MAX */


void close()
{
ut_ad(m_creator_trx_id != TRX_ID_MAX);
m_creator_trx_id = TRX_ID_MAX;
}
/**@return true if the view is closed */

bool is_closed() const
{
return(m_closed);
}
/**Write the limits to the file.*/
//@param file file to write to

void print_limits(FILE* file) const
{
fprintf(file,
"Trx read view will not see trx with"
" id >= " TRX_ID_FMT ", sees < " TRX_ID_FMT "\n",
m_low_limit_id, m_up_limit_id);
}
#ifdef UNIV_DEBUG
/**/
//@param rhs view to compare with
//@return true if this view is less than or equal rhs */
bool le(const ReadView* rhs) const
{
return(m_low_limit_no <= rhs->m_low_limit_no);
}
#endif /* UNIV_DEBUG */
private:
/**Copy the transaction ids from the source vector */

inline void copy_trx_ids(const trx_ids_t& trx_ids);

/** Open a ReadView; transactions serialised before this point are all visible to it */
//@param id Creator transaction id */


inline void prepare(trx_id_t id);
/**
Complete the read view creation */
inline void complete();
/**Copy state from another view. Must call copy_complete() to finish.*/

//@param other view to copy from */

inline void copy_prepare(const ReadView& other);

/**Complete the copy, insert the creator transaction id into the m_trx_ids too and adjust the m_up_limit_id *, if required */


inline void copy_complete();
/**Set the creator transaction id, existing id must be 0 */

void creator_trx_id(trx_id_t id)
{
ut_ad(m_creator_trx_id == 0);
m_creator_trx_id = id;
}
friend class MVCC;
private:
/** The read should not see any transaction with trx id >= this value. In other words, this is the "high water mark". */

trx_id_t m_low_limit_id;
/** The read should see all trx ids which are strictly
smaller (<) than this value. In other words, this is the
low water mark". */
trx_id_t m_up_limit_id;
/** trx id of creating transaction, set to TRX_ID_MAX for free views. */

trx_id_t m_creator_trx_id;
/** Set of RW transactions that was active when this snapshot was taken */

ids_t m_ids;
/** The view does not need to see the undo logs for transactionswhose transaction number is strictly smaller (<) than this value:they can be removed in purge if not needed by other views */


trx_id_t m_low_limit_no;
/** AC-NL-RO transaction view that has been "closed". */
bool m_closed;
typedef UT_LIST_NODE_T(ReadView) node_t;
/** List of read views in trx_sys */
byte pad1[64 - sizeof(node_t)];
node_t m_view_list;
};

Consistent Read

ToDo list:
row/page/space format
row_search_mvcc();
struct row_prebuilt_t;

XA (2PC) and binlog in MySQL

MySQL's XA implementation comes in two forms: external XA and internal XA. The former is distributed transactions in the usual sense. The latter happens inside a single MySQL server: the server layer acts as the TM (transaction coordinator) and the server's storage engine instances act as RMs, yielding a kind of distributed transaction within MySQL, i.e. a cross-database transaction; for example, a transaction that touches two InnoDB databases on the same MySQL server (other engines do not support XA).

MySQL internal XA transactions and the binlog

The following is excerpted from: “MySQL binlog 组提交与 XA(两阶段提交)”.
XA splits a transaction commit into two phases, and this design solves the consistency problem between the binlog and the redo log; that is the third use of MySQL's internal XA.

To support replication for non-transactional engines as well, MySQL introduced the binlog at the server layer; it records the modifications made by every engine, so replication works for all of them. Around version 4.x MySQL abandoned redo-based replication in favour of binlog-based replication (per Ding Qi of Taobao).

But introducing the binlog creates a problem: consistency between the binlog and the redo log. Committing a transaction has to write both the redo log and the binlog, so how are the two kept in step? Which log decides whether the transaction has committed? How is "committed" determined, and how does crash recovery proceed?

MySQL solves this neatly with two-phase commit (the two-phase commit of internal XA):

Phase 1: InnoDB prepare. Hold prepare_commit_mutex and write/sync the redo log; set the rollback segment to the PREPARED state. The binlog does nothing in this phase.

Phase 2: two steps. 1> write/sync the binlog; 2> InnoDB commit (write the COMMIT mark, then release prepare_commit_mutex).

Whether the binlog was written is what decides whether the transaction committed successfully; the InnoDB commit mark is not the deciding flag. That is because crash recovery then proceeds as follows:

  1. During crash recovery, scan the last binlog file and extract the xids recorded in it;
  2. InnoDB keeps a list of transactions in the PREPARED state; compare their xids with the xids found in the binlog: if an xid is present in the binlog, commit that transaction, otherwise roll it back.

This keeps the transaction state in InnoDB and in the binlog consistent. If the crash happens while writing the InnoDB commit mark, recovery simply writes the commit mark again;

a crash during the prepare phase rolls the transaction back, and a crash during the write/sync-binlog phase rolls it back as well. This commit scheme is the implementation used before MySQL 5.6.
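A minimal sketch of the recovery decision described in steps 1 and 2 above; the containers and xid values are invented for illustration, while the real implementation scans the last binlog file and walks InnoDB's list of PREPARED transactions:

#include <cstdio>
#include <set>
#include <vector>

typedef unsigned long long xid_stub_t;   // stand-in for the XA xid

int main() {
    // xids recovered by scanning the last binlog file (assumed example values).
    std::set<xid_stub_t> binlog_xids = {101, 103};

    // Transactions found in the PREPARED state during InnoDB crash recovery.
    std::vector<xid_stub_t> prepared_trx = {101, 102, 103};

    for (xid_stub_t xid : prepared_trx) {
        if (binlog_xids.count(xid)) {
            std::printf("xid %llu: in binlog     -> commit\n", xid);    // binlog write succeeded
        } else {
            std::printf("xid %llu: not in binlog -> rollback\n", xid);  // crashed before binlog sync
        }
    }
    return 0;
}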

The UNDO part of a transaction

In innobase, undo is used not only for MVCC and rollback; it also plays a part in crash recovery and in marking transaction state. We will read the undo code from two angles:

  1. what a running transaction does with undo: how savepoints are built on top of undo, how multi-version reads work, and how the state of the associated undo segments is changed at commit;
  2. how the purge thread works, because in the final project the purge thread's behaviour must change when redo is applied on the replica, so we first need to understand how it works today.

How savepoints are recorded on top of undo

First look at the trx_t::undo_no member.
As the comment says, undo_no is a monotonically increasing sequence number; it equals the number of rows the transaction has modified or inserted.

/*------------------------------*/
UT_LIST_BASE_NODE_T(trx_named_savept_t)
trx_savepoints; /*!< savepoints set with SAVEPOINT ...,oldest first */


/*------------------------------*/
UndoMutex undo_mutex; // mutex protecting the fields below, except last_sql_stat_start
undo_no_t undo_no; /*!< next undo log record number to assign; since the undo log is private for a transaction, this is a simple ascending sequence with no gaps; thus it represents the number of modified/inserted rows in a transaction */

ulint undo_rseg_space;
/*!< space id of the rollback segment holding the latest undo log */
trx_savept_t last_sql_stat_start;
/*!< the undo_no value at the start of the previous statement; on error we roll back to this point */
trx_rsegs_t rsegs; /* the transaction's rollback segments */
undo_no_t roll_limit; /*!< least undo number to undo during a partial rollback; 0 otherwise */
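A minimal sketch of how a savepoint can be expressed purely as an undo_no, as the fields above suggest; the _stub types are invented for this sketch (in the real code, taking a savepoint records the current trx->undo_no, and a partial rollback undoes the records whose undo number is not below it):

#include <cstdio>

typedef unsigned long long undo_no_t;

struct trx_savept_stub_t { undo_no_t least_undo_no; };
struct trx_stub_t        { undo_no_t undo_no; };        // next undo record number to assign

// Taking a savepoint just remembers the current undo_no.
static trx_savept_stub_t savept_take_sketch(const trx_stub_t* trx) {
    trx_savept_stub_t s;
    s.least_undo_no = trx->undo_no;
    return s;
}

int main() {
    trx_stub_t trx = { 3 };                               // three rows already modified
    trx_savept_stub_t sp = savept_take_sketch(&trx);      // SAVEPOINT s1
    trx.undo_no = 7;                                      // four more rows modified afterwards

    // ROLLBACK TO s1: undo every record whose undo number is >= sp.least_undo_no,
    // i.e. records 3..6, leaving the first three changes in place.
    std::printf("undo records %llu..%llu would be rolled back\n",
                sp.least_undo_no, trx.undo_no - 1);
    return 0;
}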

trx in handler layer

The Ha_trx_info class
This class belongs to the MySQL layer; according to its comment it records the per-thread, per-storage-engine information for a transaction. The is_active(Transaction_ctx::STMT) check in trans_commit_stmt() above decides whether the transaction is active by looking at whether thd's Transaction_ctx member m_transaction->m_scope_info[]->m_ha_list is null: if it is null, the transaction/statement is not associated with any storage engine's Ha_trx_info and is therefore not active.

Here is the declaration of the Ha_trx_info class:

/*Either statement transaction or normal transaction - related thread-specific storage engine data.If a storage engine participates in a statement/transaction,an instance of this class is present in thd->m_transaction.m_scope_info[STMT|SESSION].ha_list. The addition this list is made by trans_register_ha().When it's time to commit or rollback, each element of ha_list is used to access storage engine's prepare()/commit()/rollback() methods, and also to evaluate if a full two phase commit is necessary.@sa General description of transaction handling in handler.cc.*/

class Ha_trx_info
{
public:
/**Register this storage engine in the given transaction context.*/
void register_ha(Ha_trx_info *ha_info, handlerton *ht_arg)
{
DBUG_ENTER("Ha_trx_info::register_ha");
DBUG_PRINT("enter", ("ht: 0x%llx (%s)",
(ulonglong) ht_arg,
ha_legacy_type_name(ht_arg->db_type)));
DBUG_ASSERT(m_flags == 0);
DBUG_ASSERT(m_ht == NULL);
DBUG_ASSERT(m_next == NULL);
m_ht= ht_arg;
m_flags= (int) TRX_READ_ONLY; /* Assume read-only at start. */
m_next= ha_info;
DBUG_VOID_RETURN;
}
/**Clear, prepare for reuse.*/


void reset()
{
DBUG_ENTER("Ha_trx_info::reset");
m_next= NULL;
m_ht= NULL;
m_flags= 0;
DBUG_VOID_RETURN;
}
Ha_trx_info()
{
reset();
}
void set_trx_read_write()
{
DBUG_ASSERT(is_started());
m_flags|= (int) TRX_READ_WRITE;
}
bool is_trx_read_write() const
{
DBUG_ASSERT(is_started());
return m_flags & (int) TRX_READ_WRITE;
}
bool is_started() const
{
return m_ht != NULL;
}
/**Mark this transaction read-write if the argument is read-write.
*/

void coalesce_trx_with(const Ha_trx_info *stmt_trx)
{
/*Must be called only after the transaction has been started. Can be called many times, e.g. when we have many read-write statements in a transaction.*/




DBUG_ASSERT(is_started());
if (stmt_trx->is_trx_read_write())
set_trx_read_write();
}
Ha_trx_info *next() const
{
DBUG_ASSERT(is_started());
return m_next;
}
handlerton *ht() const
{
DBUG_ASSERT(is_started());
return m_ht;
}
private:
enum { TRX_READ_ONLY= 0, TRX_READ_WRITE= 1 };
/**Auxiliary, used for ha_list management*/
Ha_trx_info *m_next;

/**Although a given Ha_trx_info instance is currently always used for the same storage engine, 'ht' is not-NULL only when the corresponding storage is a part of a transaction.*/




handlerton *m_ht;
/**Transaction flags related to this engine.Not-null only if this instance is a part of transaction.May assume a combination of enum values above.*/
uchar m_flags;
};

Notes on some constants

OPTION_NOT_AUTOCOMMIT

//When autocommit is off, a multi-statement transaction is implicitly started on the first
//statement after a previous transaction has been ended.

OPTION_BEGIN

//Regardless of the autocommit status, a multi-statement transaction can be explicitly started
//with the statements "START TRANSACTION", "BEGIN [WORK]", "[COMMIT | ROLLBACK] AND CHAIN", etc.
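
As a rough sketch of how these two bits are typically tested together (based on the option_bits convention; the helper name here is my own, not a quote from the server):

/* Sketch: a statement belongs to a multi-statement transaction when either
   autocommit is off (OPTION_NOT_AUTOCOMMIT) or an explicit START TRANSACTION /
   BEGIN has set OPTION_BEGIN on the session. */
static bool in_multi_stmt_transaction(const THD *thd)
{
    return (thd->variables.option_bits &
            (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) != 0;
}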

Notes on reading the redo log and its buffer code

Reading the redo log buffer

The handle of the redo log buffer is a variable of type struct log_t, which is defined as follows:

/** Redo log buffer */
struct log_t{
char pad1[CACHE_LINE_SIZE];
/*!< cache-line padding to avoid "false sharing" */
lsn_t lsn; /*!< log sequence number */
ulint buf_free; /*!< first free offset within the log
buffer in use */
#ifndef UNIV_HOTBACKUP
char pad2[CACHE_LINE_SIZE];/*!< cache-line padding */
LogSysMutex mutex; /*!< mutex protecting the log */
LogSysMutex write_mutex; /*!< mutex for writing the log files and for accessing log_group_t members */
char pad3[CACHE_LINE_SIZE];/*!< cache-line padding */
FlushOrderMutex log_flush_order_mutex;/*!< serializes flush-list access when dirty blocks are added; with this mutex, mtr_commit can release log_sys->mutex earlier while still guaranteeing that flush-list insertion happens in lsn order */

#endif /* !UNIV_HOTBACKUP */
byte* buf_ptr; /*!< start address of the whole redo buffer */
byte* buf; /*!< start address of the redo buffer currently in use; either the first or the second half of buf_ptr */
bool first_in_use; /*!< true: the first half is in use, false: the second half is in use */
ulint buf_size; /*!< size of the two buffer halves */
ulint max_buf_free; /*!< recommended minimum free space in the redo buffer; when free space drops below this, redo must be flushed to disk; default: 1/2 of buf_size */



bool check_flush_or_checkpoint;
/*!< this is set when there may be need to flush the log buffer, or preflush buffer pool pages, or make a checkpoint; this MUST be TRUE when lsn - last_checkpoint_lsn > max_checkpoint_age; this flag is peeked at by log_free_check(), which does not reserve the log mutex */


UT_LIST_BASE_NODE_T(log_group_t)
log_groups; /*!< list of redo log file groups */
#ifndef UNIV_HOTBACKUP
/** The fields involved in the log buffer flush @{ */
ulint buf_next_to_write;/*!< position in the redo buffer of the next byte to be written to disk */
volatile bool is_extending; /*!< true: the redo buffer is currently being extended */
lsn_t write_lsn; /*!< lsn already written to the file system (possibly not yet flushed to disk) */
lsn_t current_flush_lsn;/*!< lsn currently being flushed; it may not be on disk yet. Purpose: if thread A is flushing and another flush request B arrives, B checks whether the lsn it needs is below this value; if so, B just waits for A to finish instead of issuing another flush. */


lsn_t flushed_to_disk_lsn;
/*!< lsn already flushed to disk */
ulint n_pending_flushes;/*!< number of flushes in progress, protected by the log mutex */
os_event_t flush_event; /*!< this event is in the reset state when a flush is running; a thread should wait for this without owning the log mutex, but NOTE that to set this event, the thread MUST own the log mutex! */

ulint n_log_ios; /*!< number of log i/os initiated thus far */

ulint n_log_ios_old; /*!< number of log i/o's at the previous printout */

time_t last_printout_time;/*!< when log_print was last time called */

/* @} */
/** Fields involved in checkpoints @{ */
lsn_t log_group_capacity; /*!< capacity of the log group; if the checkpoint age exceeds this, it is a serious error because it is possible we will then overwrite log and spoil crash recovery */


lsn_t max_modified_age_async;
/*!< when this recommended value for lsn - buf_pool_get_oldest_modification() is exceeded, we start an asynchronous preflush of pool pages */

lsn_t max_modified_age_sync;
/*!< when this recommended value for lsn - buf_pool_get_oldest_modification() is exceeded, we start a synchronous preflush of pool pages */

lsn_t max_checkpoint_age_async;
/*!< when this checkpoint age is exceeded we start an asynchronous writing of a new checkpoint */

lsn_t max_checkpoint_age;
/*!< this is the maximum allowed value for lsn - last_checkpoint_lsn when a new query step is started */

ib_uint64_t next_checkpoint_no;
/*!< next checkpoint number */
lsn_t last_checkpoint_lsn;
/*!< latest checkpoint lsn */
lsn_t next_checkpoint_lsn;
/*!< next checkpoint lsn */
mtr_buf_t* append_on_checkpoint;
/*!< extra redo log records to write during a checkpoint, or NULL if none. The pointer is protected by log_sys->mutex, and the data must remain constant as long as this pointer is not NULL. */
ulint n_pending_checkpoint_writes;
/*!< number of currently pending
checkpoint writes */
rw_lock_t checkpoint_lock;/*!< this latch is x-locked when a
checkpoint write is running; a thread
should wait for this without owning
the log mutex */
#endif /* !UNIV_HOTBACKUP */
byte* checkpoint_buf_ptr;/* unaligned checkpoint header */
byte* checkpoint_buf; /*!< checkpoint header is read to this buffer */
/* @} */
};
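
The buf_ptr/buf/first_in_use trio implements a simple double buffer: new redo records are appended into one half while the other half is being written to disk. The following is a minimal sketch of that idea (my own simplification, not the actual log_buffer_switch() code), assuming buf_ptr points at a contiguous allocation of 2 * buf_size bytes:

/* Sketch: flip the redo log buffer between its two halves.
   Assumes buf_ptr is the start of a 2 * buf_size allocation and
   buf points at the half currently receiving new redo records. */
static void switch_log_buffer_half(log_t* log)
{
    if (log->first_in_use) {
        /* the second half becomes the active write target */
        log->buf = log->buf_ptr + log->buf_size;
        log->first_in_use = false;
    } else {
        /* the first half becomes the active write target again */
        log->buf = log->buf_ptr;
        log->first_in_use = true;
    }
    /* new records start from the beginning of the fresh half; the real
       code additionally carries over the last, still incomplete block */
    log->buf_free = 0;
}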

Functions related to writing into the redo buffer

The functions listed below only deal with writing into the redo buffer; they do not write redo to disk!
log_write_low()
Pseudocode of the function:

void
log_write_low(
/*==========*/
const byte* str, /*!< in: string */
ulint str_len) /*!< in: string length */
{
log_t* log = log_sys;
ulint len;
ulint data_len;
byte* log_block;
//the log mutex is already held at this point
part_loop:
if (the data still to be written fits in the free space of the current redo block) {
    write it in one go: set this round's write length to str_len
} else {
    write in pieces: set this round's write length to the free space left in the current redo block
}
copy that many bytes from str into the current redo block
update the block's LOG_BLOCK_HDR_DATA_LEN field
if (the current redo block is now full) {
    set the block's checkpoint number,
    and initialize the next block for use.
} else {
log->lsn += len;
}
advance log_sys->buf_free by len.
if this iteration did not consume all of str, goto part_loop to write the rest of str.
srv_stats.log_write_requests.inc();
}
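
The splitting decision above is pure arithmetic on the 512-byte redo blocks: every OS_FILE_LOG_BLOCK_SIZE block reserves a header and a trailer, so only the middle part holds record data. A small self-contained illustration of the computation (constants hard-coded here for the example, mirroring InnoDB's 512-byte blocks with a 12-byte header and 4-byte trailer; this is not the real log_write_low()):

#include <cstdio>

static const unsigned BLOCK_SIZE     = 512;
static const unsigned HDR_SIZE       = 12;
static const unsigned TRL_SIZE       = 4;
static const unsigned DATA_PER_BLOCK = BLOCK_SIZE - HDR_SIZE - TRL_SIZE; /* 496 */

int main()
{
    unsigned str_len       = 900;   /* bytes of redo records to append            */
    unsigned free_in_block = 300;   /* hypothetical space left in the block that
                                       is currently being filled                  */
    unsigned piece_no      = 0;

    while (str_len > 0) {
        /* write either the rest of the record or whatever fits in this block */
        unsigned len = (str_len <= free_in_block) ? str_len : free_in_block;
        printf("piece %u: write %u bytes\n", ++piece_no, len);

        str_len       -= len;
        free_in_block -= len;
        if (free_in_block == 0)     /* block full: move on to a fresh block */
            free_in_block = DATA_PER_BLOCK;
    }
    return 0;
}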

Functions related to writing the redo log files

Note: the functions below are about writing the redo log to disk.
log_write_up_to()
log_write_up_to() [log0log.cc, line 1209] writes the redo log to disk up to the given log entry (an lsn mark). Its code is as follows:

void
log_write_up_to(
lsn_t lsn,
bool flush_to_disk)
{
byte* write_buf;
lsn_t write_lsn;
if (recv_no_ibuf_operations) {
/* recovery is in progress: return immediately */
return;
}
loop:
//acquire the write mutex
log_write_mutex_enter();
//set limit_lsn to the lsn that has already been written (or flushed)
lsn_t limit_lsn = flush_to_disk
? log_sys->flushed_to_disk_lsn
: log_sys->write_lsn;
//if limit_lsn >= the requested lsn, the redo up to that lsn is already on disk: release the mutex and return
if (limit_lsn >= lsn) {
log_write_mutex_exit();
return;
}
/* limit_lsn < lsn: there really is data to put on disk, so do the write */
/* if a flush is already in progress */
if (flush_to_disk
&& (log_sys->n_pending_flushes > 0
|| !os_event_is_set(log_sys->flush_event))) {
/* check whether the running flush will already cover this lsn */
bool work_done = log_sys->current_flush_lsn >= lsn;
log_write_mutex_exit();
//either way, wait for the running flush to finish
os_event_wait(log_sys->flush_event);
//if the running flush covers this lsn, we are done
if (work_done) {
return;
} else {
goto loop;
}
}
//when can this case actually occur???
log_mutex_enter();
if (!flush_to_disk
&& log_sys->buf_free == log_sys->buf_next_to_write) {
/* Nothing to write and no flush to disk requested */
log_mutex_exit_all();
return;
}
//final case: no flush is running, so start writing up to the given lsn ourselves
log_group_t* group;
ulint start_offset;
ulint end_offset;
ulint area_start;
ulint area_end;
ulong write_ahead_size = srv_log_write_ahead_size;
ulint pad_size;
DBUG_PRINT("ib_log", ("write " LSN_PF " to " LSN_PF,
log_sys->write_lsn,
log_sys->lsn));
if (flush_to_disk) {
log_sys->n_pending_flushes++;
log_sys->current_flush_lsn = log_sys->lsn;
MONITOR_INC(MONITOR_PENDING_LOG_FLUSH);
os_event_reset(log_sys->flush_event);
if (log_sys->buf_free == log_sys->buf_next_to_write) {
/* Nothing to write, flush only */
log_mutex_exit_all();
log_write_flush_to_disk_low();
return;
}
}
start_offset = log_sys->buf_next_to_write;
end_offset = log_sys->buf_free;
area_start = ut_calc_align_down(start_offset, OS_FILE_LOG_BLOCK_SIZE);
area_end = ut_calc_align(end_offset, OS_FILE_LOG_BLOCK_SIZE);
ut_ad(area_end - area_start > 0);
log_block_set_flush_bit(log_sys->buf + area_start, TRUE);
log_block_set_checkpoint_no(
log_sys->buf + area_end - OS_FILE_LOG_BLOCK_SIZE,
log_sys->next_checkpoint_no);
write_lsn = log_sys->lsn;
write_buf = log_sys->buf;
log_buffer_switch();
group = UT_LIST_GET_FIRST(log_sys->log_groups);
log_group_set_fields(group, log_sys->write_lsn);
log_mutex_exit();
/* Calculate pad_size if needed. */
pad_size = 0;
if (write_ahead_size > OS_FILE_LOG_BLOCK_SIZE) {
lsn_t end_offset;
ulint end_offset_in_unit;
end_offset = log_group_calc_lsn_offset(
ut_uint64_align_up(write_lsn,
OS_FILE_LOG_BLOCK_SIZE),
group);
end_offset_in_unit = (ulint) (end_offset % write_ahead_size);
if (end_offset_in_unit > 0
&& (area_end - area_start) > end_offset_in_unit) {
/* The first block in the unit was initialized after the last writing. Needs to be written padded data once. */
pad_size = write_ahead_size - end_offset_in_unit;
if (area_end + pad_size > log_sys->buf_size) {
pad_size = log_sys->buf_size - area_end;
}
::memset(write_buf + area_end, 0, pad_size);
}
}
/* Do the write to the log files */
log_group_write_buf(
group, write_buf + area_start,
area_end - area_start + pad_size,
#ifdef UNIV_DEBUG
pad_size,
#endif /* UNIV_DEBUG */
ut_uint64_align_down(log_sys->write_lsn,
OS_FILE_LOG_BLOCK_SIZE),
start_offset - area_start);
srv_stats.log_padded.add(pad_size);
log_sys->write_lsn = write_lsn;
#ifndef _WIN32
if (srv_unix_file_flush_method == SRV_UNIX_O_DSYNC) {
/* O_SYNC means the OS did not buffer the log file at all: so we have also flushed to disk what we have written */

log_sys->flushed_to_disk_lsn = log_sys->write_lsn;
}
#endif /* !_WIN32 */
log_write_mutex_exit();
if (flush_to_disk) {
log_write_flush_to_disk_low();
}
}
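
The start_offset/end_offset window above is rounded out to whole 512-byte log blocks before the file write: area_start is the write position rounded down to a block boundary and area_end is the free position rounded up, so the write always covers complete blocks. A tiny sketch of that rounding (my own helpers, mimicking ut_calc_align_down()/ut_calc_align() for a power-of-two alignment):

#include <cstdio>

static unsigned long align_down(unsigned long n, unsigned long align)
{
    return n & ~(align - 1);            /* round down to a multiple of align */
}
static unsigned long align_up(unsigned long n, unsigned long align)
{
    return (n + align - 1) & ~(align - 1);  /* round up to a multiple of align */
}

int main()
{
    const unsigned long BLOCK = 512;    /* OS_FILE_LOG_BLOCK_SIZE              */
    unsigned long start_offset = 700;   /* buf_next_to_write (example value)   */
    unsigned long end_offset   = 1800;  /* buf_free (example value)            */

    unsigned long area_start = align_down(start_offset, BLOCK); /* 512  */
    unsigned long area_end   = align_up(end_offset, BLOCK);     /* 2048 */

    printf("write blocks [%lu, %lu): %lu bytes, starting %lu bytes into the first block\n",
           area_start, area_end, area_end - area_start, start_offset - area_start);
    return 0;
}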

mini-transaction (mtr)

A transaction uses many mtrs to deliver the redo log it produces into the redo log buffer. Concretely, at mtr_commit(mtr) time the contents of the mtr_t->m_impl->m_log log are copied into the redo log buffer. The call sequence is:

mtr_t::commit();
mtr_t::Command();
mtr_t::Command::execute();
mtr_t::Command::prepare_write();
mtr_t::Command::finish_write();

Note that the mini-transaction is only responsible for copying its redo log into the redo log buffer; making it durable (flushing redo to disk) is handled elsewhere (for example at transaction commit). Below we read the code related to an mtr committing its redo log, mainly prepare_write() and finish_write().

/** Prepare to write the mini-transaction log to the redo log buffer. @return number of bytes to write in finish_write() */

ulint
mtr_t::Command::prepare_write()
{
switch (m_impl->m_log_mode) {
case MTR_LOG_NONE:
/* this log mode writes no redo: return 0 directly */
return(0);
case MTR_LOG_ALL:
break;
}
ulint len = m_impl->m_log.size(); // size of this mtr's redo log
ulint n_recs = m_impl->m_n_log_recs;
if (len exceeds 1/2 of the redo log buffer size) {
log_buffer_extend((len + 1) * 2); // extend the buffer: the redo buffer is normally
// flushed only once it is more than half full, so if this
// mtr's redo log is larger than half the buffer it might
// not fit; extend it to avoid overflowing the buffer.
}
fil_space_t* space = m_impl->m_user_space;
if (space != NULL && is_system_or_undo_tablespace(space->id)) {
/* Omit MLOG_FILE_NAME for predefined tablespaces. */
space = NULL;
}
log_mutex_enter();
//note the logic below: what does MLOG_FILE_NAME mean?
if (fil_names_write_if_was_clean(space, m_impl->m_mtr)) {
/* This mini-transaction was the first one to modify this tablespace since the latest checkpoint, so some MLOG_FILE_NAME records were appended to m_log. */


ut_ad(m_impl->m_n_log_recs > n_recs);
mlog_catenate_ulint(
&m_impl->m_log, MLOG_MULTI_REC_END, MLOG_1BYTE);
len = m_impl->m_log.size();
} else {
/* This was not the first time of dirtying a
tablespace since the latest checkpoint. */
if (n_recs <= 1) {
ut_ad(n_recs == 1);
/* Flag the single log record as the
only record in this mini-transaction. */
*m_impl->m_log.front()->begin()
|= MLOG_SINGLE_REC_FLAG;
} else {
/* Because this mini-transaction comprises multiple log records, append MLOG_MULTI_REC_END at the end. */


mlog_catenate_ulint(
&m_impl->m_log, MLOG_MULTI_REC_END,
MLOG_1BYTE);
len++;
}
}
/* check that the bytes about to be written still fit within the checkpoint age margin */
log_margin_checkpoint_age(len);
return(len);
}
/** Append the redo log records to the redo log buffer @param[in] len number of bytes to write */

void
mtr_t::Command::finish_write(
ulint len)
{
if (m_impl->m_log.is_small()) {
const mtr_buf_t::block_t* front = m_impl->m_log.front();
//if the mtr log fits in a single 512-byte block, try the fast path:
//memcpy it directly into the redo log buffer
m_end_lsn = log_reserve_and_write_fast(
front->begin(), len, &m_start_lsn);
//m_end_lsn > 0 means the fast write succeeded
if (m_end_lsn > 0) {
return;
}
}
/* the mtr's redo log is larger than one block (or the fast path failed) */
m_start_lsn = log_reserve_and_open(len);
mtr_write_log_t write_log;
//call log_write_low() to copy the log into the redo buffer block by block
m_impl->m_log.for_each_block(write_log);
m_end_lsn = log_close();
}

The log file group (struct log_group_t)

A log file group is a set of files of identical size; these files are written circularly with the contents of the redo log buffer in order to persist it.

/** Log group consists of a number of log files, each of the same size; a log group is implemented as a space in the sense of the module fil0fil. Currently, this is only protected by log_sys->mutex. However, in the case of log_write_up_to(), we will access some members only with the protection of log_sys->write_mutex, which should affect nothing for now. */

struct log_group_t{
/** group id; 5.7 has only one log file group, so this is always 0 */
ulint id;
/** number of files in the group */
ulint n_files;
/** format of the redo log: e.g., LOG_HEADER_FORMAT_CURRENT */
ulint format;
/** size of a single file, in bytes */
lsn_t file_size;
/** file space which implements the log group */
ulint space_id;
/** corruption status */
log_group_state_t state;
/** lsn used to fix coordinates within the log group */
lsn_t lsn;
/** the byte offset of the above lsn */
lsn_t lsn_offset;
/** unaligned buffers */
byte** file_header_bufs_ptr;
/** buffers for each file header in the group */
byte** file_header_bufs;
/** used only in recovery: recovery scan succeeded up to this
lsn in this log group */
lsn_t scanned_lsn;
/** unaligned checkpoint header */
byte* checkpoint_buf_ptr;
/** buffer for writing a checkpoint header */
byte* checkpoint_buf;
/** node in the log group list (effectively empty, as there is only one group) */
UT_LIST_NODE_T(log_group_t) log_groups;
};
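
Because the files are written circularly, turning an lsn into a byte position in the group means subtracting the per-file headers and wrapping modulo the usable capacity. The sketch below is a deliberately simplified model of that mapping (my own illustration, not the real log_group_calc_lsn_offset(); it assumes the usual 2048-byte file header and that lsn 0 maps to data offset 0 of the first file):

#include <cstdio>
#include <cstdint>

/* Simplified model: n_files files of file_size bytes, each starting with a
   header of HDR bytes that never holds redo data; redo bytes are laid out
   circularly over the remaining capacity. */
int main()
{
    const uint64_t n_files   = 2;
    const uint64_t file_size = 48 * 1024 * 1024;   /* e.g. 48MB per ib_logfile  */
    const uint64_t HDR       = 2048;               /* 4 x 512-byte header blocks */
    const uint64_t capacity  = (file_size - HDR) * n_files;

    uint64_t lsn = 123456789;                      /* assume lsn 0 <-> offset 0 */

    uint64_t data_offset = lsn % capacity;                    /* wrap around the group  */
    uint64_t file_no     = data_offset / (file_size - HDR);   /* which file in the group */
    uint64_t in_file     = HDR + data_offset % (file_size - HDR); /* offset inside it   */

    printf("lsn %llu -> file %llu, byte offset %llu\n",
           (unsigned long long) lsn,
           (unsigned long long) file_no,
           (unsigned long long) in_file);
    return 0;
}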

The srv_master_thread() thread

The master thread [srv0srv.cc, line 2339] is InnoDB's main thread; let's read through the code to outline its overall flow:

/*********************************************************************/
/**The master thread controlling the server. @return a dummy parameter */

extern "C"
os_thread_ret_t
DECLARE_THREAD(srv_master_thread)(
/*==============================*/
void* arg MY_ATTRIBUTE((unused)))
/*!< in: a dummy parameter required by
os_thread_create */
{
//initialization
srv_slot_t* slot;
ulint old_activity_count = srv_get_activity_count();
ib_time_t last_print_time;
//register the current thread with the performance schema
pfs_register_thread(srv_master_thread_key);
//both of the following obtain a thread id: the first uses the process number directly,
//the second converts a struct variable into a ulint
srv_main_thread_process_no = os_proc_get_number();
srv_main_thread_id = os_thread_pf(os_thread_get_curr_id());
//reserve a slot; a slot is roughly one entry of the thread table (read later)
slot = srv_reserve_slot(SRV_MASTER);
//the loop below covers the master thread's start and shutdown, plus its normal working loop
loop:
if (srv_force_recovery >= SRV_FORCE_NO_BACKGROUND) {
goto suspend_thread;
}
/* the loop below is the normal working loop @{ */
while (srv_shutdown_state == SRV_SHUTDOWN_NONE) {
//the thread first sleeps for 1s
srv_master_sleep();
MONITOR_INC(MONITOR_MASTER_THREAD_SLEEP);
//who updates activity_count??
if (the server is currently active) {
//refresh the counter from srv_sys->activity_count
old_activity_count = srv_get_activity_count();
//run the active tasks
srv_master_do_active_tasks();
} else {
//run the idle tasks
srv_master_do_idle_tasks();
}
}
/* @} */
...
suspend_thread:
...
}

srv_master_do_active_tasks()
Let's look at srv_master_do_active_tasks() and srv_master_do_idle_tasks() in turn, starting with the first:

/*********************************************************************/
/**Perform the tasks that the master thread is supposed to do when the server is active. There are two types of tasks. The first category is of such tasks which are performed at each invocation of this function. We assume that this function is called roughly every second when the server is active. The second category is of such tasks which are performed at some interval e.g.: purge, dict_LRU cleanup etc. */

static
void
srv_master_do_active_tasks(void)
/*============================*/
{
ib_time_t cur_time = ut_time();
uintmax_t counter_time = ut_time_us(NULL);
//first increment this variable to count how many times this function has run; srv_main_active_loops is a global
++srv_main_active_loops;
MONITOR_INC(MONITOR_MASTER_ACTIVE_LOOPS);
/* if there are pending ALTER TABLE requests, perform the real drop-table work now */
srv_main_thread_op_info = "doing background drop tables";
row_drop_tables_for_mysql_in_background();
MONITOR_INC_TIME_IN_MICRO_SECS(
MONITOR_SRV_BACKGROUND_DROP_TABLE_MICROSECOND, counter_time);
if (srv_shutdown_state > 0) {
return;
}
/* check whether enough reusable space is left in the redo log files */
srv_main_thread_op_info = "checking free log space";
log_free_check();
/* merge the insert buffer; read in detail later */
srv_main_thread_op_info = "doing insert buffer merge";
counter_time = ut_time_us(NULL);
ibuf_merge_in_background(false);
MONITOR_INC_TIME_IN_MICRO_SECS(
MONITOR_SRV_IBUF_MERGE_MICROSECOND, counter_time);
/* flush the redo log to disk */
srv_main_thread_op_info = "flushing log";
srv_sync_log_buffer_in_background();
MONITOR_INC_TIME_IN_MICRO_SECS(
MONITOR_SRV_LOG_FLUSH_MICROSECOND, counter_time);
/* the calls above run on every invocation; the ones below run only at certain intervals based on cur_time */
//(two identical shutdown checks follow here; their purpose is unclear)
if (srv_shutdown_state > 0) {
return;
}
if (srv_shutdown_state > 0) {
return;
}
//every 47s: evict entries from the table (dict) cache once
if (cur_time % SRV_MASTER_DICT_LRU_INTERVAL == 0) {
srv_main_thread_op_info = "enforcing dict cache limit";
ulint n_evicted = srv_master_evict_from_table_cache(50);
if (n_evicted != 0) {
MONITOR_INC_VALUE(
MONITOR_SRV_DICT_LRU_EVICT_COUNT, n_evicted);
}
MONITOR_INC_TIME_IN_MICRO_SECS(
MONITOR_SRV_DICT_LRU_MICROSECOND, counter_time);
}
if (srv_shutdown_state > 0) {
return;
}
// every 7s: make a checkpoint
if (cur_time % SRV_MASTER_CHECKPOINT_INTERVAL == 0) {
srv_main_thread_op_info = "making checkpoint";
log_checkpoint(TRUE, FALSE);
MONITOR_INC_TIME_IN_MICRO_SECS(
MONITOR_SRV_CHECKPOINT_MICROSECOND, counter_time);
}
}

srv_master_do_idle_tasks()
Next, the srv_master_do_idle_tasks() function; its code is as follows:

/*********************************************************************/
/* This function returns when the master thread enters the shutdown state; while running, it does not care whether the master thread has meanwhile switched from idle back to active. */

static
void
srv_master_do_idle_tasks(void)
/*==========================*/
{
uintmax_t counter_time;
++srv_main_idle_loops;
MONITOR_INC(MONITOR_MASTER_IDLE_LOOPS);
/* do the real ALTER TABLE (background drop) work, same as in the active-tasks function above */
counter_time = ut_time_us(NULL);
srv_main_thread_op_info = "doing background drop tables";
row_drop_tables_for_mysql_in_background();
MONITOR_INC_TIME_IN_MICRO_SECS(
MONITOR_SRV_BACKGROUND_DROP_TABLE_MICROSECOND,
counter_time);
if (srv_shutdown_state > 0) {
return;
}
/* check whether enough space is left in the redo log files */
srv_main_thread_op_info = "checking free log space";
log_free_check();
/* merge the insert buffer */
counter_time = ut_time_us(NULL);
srv_main_thread_op_info = "doing insert buffer merge";
ibuf_merge_in_background(true);
MONITOR_INC_TIME_IN_MICRO_SECS(
MONITOR_SRV_IBUF_MERGE_MICROSECOND, counter_time);
if (srv_shutdown_state > 0) {
return;
}
//evict entries from the table (dict) cache once
srv_main_thread_op_info = "enforcing dict cache limit";
ulint n_evicted = srv_master_evict_from_table_cache(100);
if (n_evicted != 0) {
MONITOR_INC_VALUE(
MONITOR_SRV_DICT_LRU_EVICT_COUNT, n_evicted);
}
MONITOR_INC_TIME_IN_MICRO_SECS(
MONITOR_SRV_DICT_LRU_MICROSECOND, counter_time);
/* flush the redo log to disk */
srv_sync_log_buffer_in_background();
MONITOR_INC_TIME_IN_MICRO_SECS(
MONITOR_SRV_LOG_FLUSH_MICROSECOND, counter_time);
if (srv_shutdown_state > 0) {
return;
}
/* make a checkpoint */
srv_main_thread_op_info = "making checkpoint";
log_checkpoint(TRUE, FALSE);
MONITOR_INC_TIME_IN_MICRO_SECS(MONITOR_SRV_CHECKPOINT_MICROSECOND,
counter_time);
}

Differences in how the master thread works in the active and idle states

  • Similarity
    • The master thread performs the same kinds of work in both states.
  • Difference
    • In the active state, evicting from the table cache and making a checkpoint are not done on every iteration but at their own intervals (e.g. a checkpoint every 7s); in the idle state these tasks are performed on every iteration.

Redo-log-related work in the master thread

Three parts of the master thread's work are directly related to the redo log:

  • Checking redo log file space: log_free_check();
    • when less than 1/2 of the redo log buffer is free, flush the buffer contents up to log_sys->lsn to disk;
    • check whether a checkpoint is due and whether the redo log files still have enough space, and do whichever is lacking.
  • Flushing the redo log buffer: srv_sync_log_buffer_in_background()
    • flushes the redo log once per second; per the comments, this is so that when innodb_flush_log_at_trx_commit != 1, a crash loses no more than about 1s of transactions.
  • Making a checkpoint: log_checkpoint(TRUE, FALSE)
    • advances the checkpoint. Note that according to its comment this function does not flush the redo buffer, but the implementation may flush, and it records the checkpoint.

One use of log_sys->next_checkpoint_no is that, when making a checkpoint, its lowest bit decides whether the first or the second checkpoint block in the redo file header is written.

Details of writing the checkpoint to disk

Making a checkpoint writes two things: first the redo log itself, stored in log blocks, and second an update of the checkpoint block in the header of ib_logfile0. Let's look at both parts.

  • Functions that write the checkpoint block in the redo log file header:
    • log_write_checkpoint_info(), which calls the function below and controls whether the write is synchronous or asynchronous;
    • log_group_checkpoint(), which performs the actual write of the checkpoint block.
      log_group_checkpoint() writes the checkpoint information into the checkpoint block in the header of ib_logfile0; note that it only writes the header's checkpoint block.
/******************************************************//**Writes the checkpoint info to a log group header. */

static
void
log_group_checkpoint(
/*=================*/
log_group_t* group) /*!< in: log group */
{
lsn_t lsn_offset;
byte* buf;
buf = group->checkpoint_buf;
memset(buf, 0, OS_FILE_LOG_BLOCK_SIZE);
//write the checkpoint block fields LOG_CHECKPOINT_NO and LOG_CHECKPOINT_LSN
mach_write_to_8(buf + LOG_CHECKPOINT_NO, log_sys->next_checkpoint_no);
mach_write_to_8(buf + LOG_CHECKPOINT_LSN, log_sys->next_checkpoint_lsn);
//write the checkpoint block fields LOG_CHECKPOINT_OFFSET and LOG_CHECKPOINT_LOG_BUF_SIZE
//(see the slides for the details of LOG_CHECKPOINT_OFFSET)
lsn_offset = log_group_calc_lsn_offset(log_sys->next_checkpoint_lsn,
group);
mach_write_to_8(buf + LOG_CHECKPOINT_OFFSET, lsn_offset);
mach_write_to_8(buf + LOG_CHECKPOINT_LOG_BUF_SIZE, log_sys->buf_size);
//compute the block checksum
log_block_set_checksum(buf, log_block_calc_checksum_crc32(buf));
MONITOR_INC(MONITOR_PENDING_CHECKPOINT_WRITE);
log_sys->n_log_ios++;
MONITOR_INC(MONITOR_LOG_IO);
if (log_sys->n_pending_checkpoint_writes++ == 0) {
rw_lock_x_lock_gen(&log_sys->checkpoint_lock,
LOG_CHECKPOINT);
}
/* Note: the lowest bit of log_sys->next_checkpoint_no decides which of the two checkpoint blocks is written */
/* We send as the last parameter the group machine address added with 1, to distinguish between a normal log file write and a checkpoint field write */

fil_io(IORequestLogWrite, false,
page_id_t(group->space_id, 0),
univ_page_size,
(log_sys->next_checkpoint_no & 1)
? LOG_CHECKPOINT_2 : LOG_CHECKPOINT_1,
OS_FILE_LOG_BLOCK_SIZE,
buf, (byte*) group + 1);
ut_ad(((ulint) group & 0x1UL) == 0);
}

The srv_purge_coordinator_thread() thread

The purge thread [srv0srv.cc, line 2753] completes the final work of updates and deletes, including the final removal/modification of the data and the reclamation of undo pages. A few questions to answer:

  • How does the purge thread interact with the master thread?
  • How does the purge thread interact with the page_cleaner thread?
  • The details of how purge is carried out.

Let's first look at the code of srv_purge_coordinator_thread():

The buf_flush_page_cleaner_coordinator() thread

The page cleaner thread [buf0flu.cc, line 3094] flushes dirty pages to disk; let's read its code:

  • The "false sharing" problem
    "False sharing" arises because the CPU loads memory into the L1 cache in units of a cache line. For example, if a single cache line holds two variables written by two threads that run on different cores, false sharing occurs and hurts performance. The remedy is cache-line padding; see the sketch below.
    Reference: 剖析Disruptor:为什么会这么快?(二)神奇的缓存行填充 (an article on the Disruptor's cache-line padding)
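
A minimal sketch of the padding technique used by log_t's pad1/pad2/pad3 members (my own example; it assumes a 64-byte cache line, which is typical on x86-64 but hardware-dependent):

#include <cstddef>

// Assume a 64-byte cache line (adjust per platform).
static const std::size_t CACHE_LINE_SIZE = 64;

// Without the pad member, hot_counter_a and hot_counter_b could share one
// cache line; two threads writing them from different cores would keep
// invalidating each other's copy of that line ("false sharing").
struct padded_counters {
    unsigned long hot_counter_a;
    char          pad[CACHE_LINE_SIZE];   // push the next field onto its own cache line
    unsigned long hot_counter_b;
};

// log_t uses the same trick: pad1/pad2/pad3 keep lsn/buf_free, the log
// mutexes, and the flush-order mutex on separate cache lines.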