MySQL has many kinds of locks; this post looks at how its read-write lock is implemented. Version 5.7 added the SX lock, so let's start with the simpler pre-SX version.
First, the structure (simplified):
struct rw_lock_t
{
volatile lint lock_word; // the counter
volatile ulint waiters; // == 1 if some thread is waiting for the lock, either S or X
volatile bool recursive; // whether the lock is recursive
volatile os_thread_id_t writer_thread; // thread id of the X-lock holder
os_event_t event; // event that S/X waiters sleep on
os_event_t wait_ex_event; // event the queued next-writer sleeps on
/** The mutex protecting rw_lock_t */
mutable ib_mutex_t mutex; // protects the fields; could be replaced by atomics
};
lock_word is initialized to X_LOCK_DECR. Each S lock decrements lock_word by 1; each X lock decrements it by X_LOCK_DECR.
The meaning of each lock_word range:
lock_word == X_LOCK_DECR: Unlocked.
0 < lock_word < X_LOCK_DECR:
Read locked, no waiting writers. (X_LOCK_DECR - lock_word) is the number of readers that hold the lock.
lock_word == 0: Write locked
-X_LOCK_DECR < lock_word < 0:
Read locked, with a waiting writer. (-lock_word) is the number of readers that hold the lock.
lock_word <= -X_LOCK_DECR:
Recursively write locked. lock_word has been decremented by X_LOCK_DECR once for each lock, so the number of locks is: ((-lock_word) / X_LOCK_DECR) + 1
When lock_word <= -X_LOCK_DECR, we also know that lock_word % X_LOCK_DECR == 0: other values of lock_word are invalid.
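As a sanity check, the ranges above can be turned into a small decoder. This is an illustrative sketch, not InnoDB code; the `rw_state` and `s_lock_count` helpers are mine (InnoDB defines X_LOCK_DECR as 0x20000000):

```cpp
#include <string>

static const long X_LOCK_DECR = 0x20000000;  // InnoDB's value

// Decode the rw-lock state from lock_word (pre-5.7 scheme, no SX lock).
std::string rw_state(long lock_word) {
  if (lock_word == X_LOCK_DECR) return "unlocked";
  if (lock_word > 0) return "S locked";                   // X_LOCK_DECR - lock_word readers
  if (lock_word == 0) return "X locked";
  if (lock_word > -X_LOCK_DECR) return "S locked, writer waiting"; // -lock_word readers
  return "recursively X locked";                          // lock_word <= -X_LOCK_DECR
}

// Number of S locks held, valid only in a read-locked state.
long s_lock_count(long lock_word) {
  return lock_word > 0 ? X_LOCK_DECR - lock_word : -lock_word;
}
```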
In MySQL the S lock is the read lock, and every S-lock path goes through rw_lock_s_lock_func; a simplified implementation follows.
It first calls rw_lock_s_lock_low: if lock_word > 0, it decrements it by 1 and the lock is acquired.
Otherwise it falls into rw_lock_s_lock_spin and waits (this is what keeps writers from starving). The spin path first checks whether lock_word <= 0 and, if so, yields the CPU,
then retries rw_lock_s_lock_low; on success it returns, otherwise it reserves a cell and waits on lock->event.
When woken up, it goes through the same loop again.
bool
rw_lock_lock_word_decr(
/*===================*/
rw_lock_t* lock, /*!< in/out: rw-lock */
ulint amount, /*!< in: amount to decrement */
lint threshold) /*!< in: threshold of judgement */
{
bool success = false;
mutex_enter(&(lock->mutex));
if (lock->lock_word > threshold) {
lock->lock_word -= amount;
success = true;
}
mutex_exit(&(lock->mutex));
return(success);
}
/** @return TRUE if success */
bool rw_lock_s_lock_low(
rw_lock_t* lock, /*!< in: pointer to rw-lock */
ulint pass, /*!< in: pass value */
const char* file_name,/*!< in: file name where lock requested */
ulint line) /*!< in: line where requested */
{
if (!rw_lock_lock_word_decr(lock, 1, 0)) {
/* Locking did not succeed */
return(false);
}
return(true); /* locking succeeded */
}
void rw_lock_s_lock_func(
rw_lock_t* lock, /*!< in: pointer to rw-lock */
ulint pass, /*!< in: pass value; != 0, if the lock will
be passed to another thread to unlock */
const char* file_name,/*!< in: file name where lock requested */
ulint line) /*!< in: line where requested */
{
if (!rw_lock_s_lock_low(lock, pass, file_name, line)) {
/* Did not succeed, try spin wait */
rw_lock_s_lock_spin(lock, pass, file_name, line);
}
}
void rw_lock_s_lock_spin(
rw_lock_t* lock, /*!< in: pointer to rw-lock */
ulint pass, /*!< in: pass value; != 0, if the lock
will be passed to another thread to unlock */
const char* file_name, /*!< in: file name where lock requested */
ulint line) /*!< in: line where requested */
{
ulint i = 0; /* spin round count */
sync_array_t* sync_arr;
ulint spin_count = 0;
uint64_t count_os_wait = 0;
lock_loop:
/* Spin waiting for the writer field to become free */
os_rmb;
while (i < srv_n_spin_wait_rounds && lock->lock_word <= 0) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
}
i++;
}
if (i >= srv_n_spin_wait_rounds) {
os_thread_yield();
}
++spin_count;
/* We try once again to obtain the lock */
if (rw_lock_s_lock_low(lock, pass, file_name, line)) {
return; /* Success */
} else {
if (i < srv_n_spin_wait_rounds) {
goto lock_loop;
}
++count_os_wait;
sync_cell_t* cell;
sync_arr = sync_array_get_and_reserve_cell(
lock, RW_LOCK_S, file_name, line, &cell);
/* Set waiters before checking lock_word to ensure wake-up
signal is sent. This may lead to some unnecessary signals. */
rw_lock_set_waiter_flag(lock);
if (rw_lock_s_lock_low(lock, pass, file_name, line)) {
sync_array_free_cell(sync_arr, cell);
return; /* Success */
}
sync_array_wait_event(sync_arr, cell);
i = 0;
goto lock_loop;
}
}
Now the S-lock unlock path, which is simple: increment lock_word by 1. If lock_word becomes 0, no other thread holds an S lock
and a writer is queued, so wake it up.
void rw_lock_s_unlock_func(
rw_lock_t* lock) /*!< in/out: rw-lock */
{
/* Increment lock_word to indicate 1 less reader */
lint lock_word = rw_lock_lock_word_incr(lock, 1);
if (lock_word == 0) {
/* wait_ex waiter exists. It may not be asleep, but we signal
anyway. We do not wake other waiters, because they can't
exist without wait_ex waiter and wait_ex waiter goes first.*/
os_event_set(lock->wait_ex_event);
}
}
lint rw_lock_lock_word_incr(
rw_lock_t* lock, /*!< in/out: rw-lock */
ulint amount) /*!< in: amount of increment */
{
lint local_lock_word;
mutex_enter(&(lock->mutex));
lock->lock_word += amount;
local_lock_word = lock->lock_word;
mutex_exit(&(lock->mutex));
return(local_lock_word);
}
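The decrement/increment pair above can be exercised with a toy model, using a plain std::mutex in place of lock->mutex. This is a sketch with names of my own, not InnoDB's code:

```cpp
#include <mutex>

static const long X_LOCK_DECR = 0x20000000;  // InnoDB's value

struct rw_lock_sim {
  long lock_word = X_LOCK_DECR;
  std::mutex m;
};

// Mirrors rw_lock_lock_word_decr: decrement only while above threshold.
bool lock_word_decr(rw_lock_sim& l, long amount, long threshold) {
  std::lock_guard<std::mutex> g(l.m);
  if (l.lock_word > threshold) { l.lock_word -= amount; return true; }
  return false;
}

// Mirrors rw_lock_lock_word_incr: unconditional increment, returns new value.
long lock_word_incr(rw_lock_sim& l, long amount) {
  std::lock_guard<std::mutex> g(l.m);
  return l.lock_word += amount;
}
```

Walking a reader and a writer through this model shows why lock_word going negative means "readers still draining while a writer is reserved".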
In MySQL the X lock is the write lock; it bottoms out in rw_lock_x_lock_func.
It first calls rw_lock_x_lock_low, which returns true on success and false otherwise.
On failure it spins, then reserves a cell and waits to be signalled on lock->event.
void rw_lock_x_lock_func(
rw_lock_t* lock, /*!< in: pointer to rw-lock */
ulint pass, /*!< in: pass value; != 0, if the lock will
be passed to another thread to unlock */
const char* file_name,/*!< in: file name where lock requested */
ulint line) /*!< in: line where requested */
{
ulint i = 0;
sync_array_t* sync_arr;
ulint spin_count = 0;
uint64_t count_os_wait = 0;
lock_loop:
if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
/* Locking succeeded */
return;
} else {
/* Spin waiting for the lock_word to become free */
os_rmb;
while (i < srv_n_spin_wait_rounds
&& lock->lock_word <= 0) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(
0, srv_spin_wait_delay));
}
i++;
}
spin_count += i;
if (i >= srv_n_spin_wait_rounds) {
os_thread_yield();
} else {
goto lock_loop;
}
}
sync_cell_t* cell;
sync_arr = sync_array_get_and_reserve_cell(
lock, RW_LOCK_X, file_name, line, &cell);
/* Waiters must be set before checking lock_word, to ensure signal
is sent. This could lead to a few unnecessary wake-up signals. */
rw_lock_set_waiter_flag(lock);
if (rw_lock_x_lock_low(lock, pass, file_name, line)) {
sync_array_free_cell(sync_arr, cell);
/* Locking succeeded */
return;
}
sync_array_wait_event(sync_arr, cell);
i = 0;
goto lock_loop;
}
First, look at rw_lock_x_lock_wait_func.
If lock_word >= threshold, it returns immediately.
Otherwise it reserves a cell and waits on lock->wait_ex_event.
void
rw_lock_x_lock_wait_func(
/*=====================*/
rw_lock_t* lock, /*!< in: pointer to rw-lock */
lint threshold,/*!< in: threshold to wait for */
const char* file_name,/*!< in: file name where lock requested */
ulint line) /*!< in: line where requested */
{
ulint i = 0;
ulint n_spins = 0;
sync_array_t* sync_arr;
uint64_t count_os_wait = 0;
os_rmb;
while (lock->lock_word < threshold) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
}
if (i < srv_n_spin_wait_rounds) {
i++;
os_rmb;
continue;
}
/* If there is still a reader, then go to sleep.*/
++n_spins;
sync_cell_t* cell;
sync_arr = sync_array_get_and_reserve_cell(
lock, RW_LOCK_X_WAIT, file_name, line, &cell);
i = 0;
/* Check lock_word to ensure wake-up isn't missed.*/
if (lock->lock_word < threshold) {
sync_array_wait_event(sync_arr, cell);
/* It is possible to wake when lock_word < 0.
We must pass the while-loop check to proceed.*/
} else {
sync_array_free_cell(sync_arr, cell);
break;
}
}
}
Now the actual rw_lock_x_lock_low.
It first inspects lock_word: if lock_word > 0, it subtracts X_LOCK_DECR and then calls rw_lock_x_lock_wait_func to wait for lock_word >= 0; once that holds, the lock is acquired. This is the case where only readers, and no writer, held the lock beforehand.
If lock_word <= 0, some other thread already grabbed the write lock.
If that holder is not the current thread, the attempt fails and returns false.
Otherwise the same thread is write-locking again and, the lock being recursive, it subtracts X_LOCK_DECR once more.
bool rw_lock_x_lock_low(
/*===============*/
rw_lock_t* lock, /*!< in: pointer to rw-lock */
ulint pass, /*!< in: pass value; != 0, if the lock will
be passed to another thread to unlock */
const char* file_name,/*!< in: file name where lock requested */
ulint line) /*!< in: line where requested */
{
if (rw_lock_lock_word_decr(lock, X_LOCK_DECR, 0)) {
/* Decrement occurred: we are writer or next-writer. */
rw_lock_set_writer_id_and_recursion_flag(lock, !pass);
rw_lock_x_lock_wait(lock, pass, 0, file_name, line);
} else {
os_thread_id_t thread_id = os_thread_get_curr_id();
/* Decrement failed: relock or failed lock */
if (!pass && lock->recursive
&& os_thread_eq(lock->writer_thread, thread_id)) {
/* Relock */
lock->lock_word -= X_LOCK_DECR;
} else {
/* Another thread locked before us */
return(false);
}
}
return(true);
}
Finally, the X-lock unlock path.
If lock_word == 0 this is the last caller in a possible recursive chain, so the recursive flag is cleared. Then X_LOCK_DECR is added back; if the result is X_LOCK_DECR the lock is now free, and if there are waiters (S or X) they get signalled.
void rw_lock_x_unlock_func(
rw_lock_t* lock) /*!< in/out: rw-lock */
{
if (lock->lock_word == 0) {
/* Last caller in a possible recursive chain. */
lock->recursive = FALSE;
}
if (rw_lock_lock_word_incr(lock, X_LOCK_DECR) == X_LOCK_DECR) {
/* There is 1 x-lock */
/* atomic increment is needed, because it is last */
if (lock->waiters) {
rw_lock_reset_waiter_flag(lock);
os_event_set(lock->event);
}
}
}
MySQL 5.7 adds a shared-exclusive (SX) lock. First, the compatibility matrix (o = compatible, x = conflict):
| S|SX| X|
--+--+--+--+
S | o| o| x|
--+--+--+--+
SX| o| x| x|
--+--+--+--+
X | x| x| x|
--+--+--+--+
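The matrix can be read off as a tiny predicate. This is illustrative only; InnoDB encodes compatibility implicitly in lock_word arithmetic rather than as a table:

```cpp
enum lock_mode { LOCK_S, LOCK_SX, LOCK_X };

// The compatibility matrix above as a predicate: can a `req` lock be
// granted while a `held` lock is in place?
bool rw_compatible(lock_mode held, lock_mode req) {
  if (held == LOCK_X || req == LOCK_X) return false;    // X conflicts with everything
  if (held == LOCK_SX && req == LOCK_SX) return false;  // SX conflicts with SX
  return true;                                          // S/S, S/SX, SX/S all coexist
}
```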
The S and X paths are unchanged. SX conflicts with SX and X but is compatible with S: after an SX lock is taken, reads proceed but writes are blocked.
For background, see the kernel monthly article.
The meaning of each lock_word range in the new scheme:
lock_word == X_LOCK_DECR: Unlocked.
X_LOCK_HALF_DECR < lock_word < X_LOCK_DECR:
S locked, no waiting writers.(X_LOCK_DECR - lock_word) is the number of S locks.
lock_word == X_LOCK_HALF_DECR: SX locked, no waiting writers.
0 < lock_word < X_LOCK_HALF_DECR:
SX locked AND S locked, no waiting writers.(X_LOCK_HALF_DECR - lock_word) is the number of S locks.
lock_word == 0: X locked, no waiting writers.
-X_LOCK_HALF_DECR < lock_word < 0:
S locked, with a waiting writer.(-lock_word) is the number of S locks.
lock_word == -X_LOCK_HALF_DECR: X locked and SX locked, no waiting writers.
-X_LOCK_DECR < lock_word < -X_LOCK_HALF_DECR:
S locked, with a waiting writer which has SX lock. -(lock_word + X_LOCK_HALF_DECR) is the number of S locks.
lock_word == -X_LOCK_DECR: X locked with recursive X lock (2 X locks).
-(X_LOCK_DECR + X_LOCK_HALF_DECR) < lock_word < -X_LOCK_DECR:
X locked. The number of the X locks is: 2 - (lock_word + X_LOCK_DECR)
lock_word == -(X_LOCK_DECR + X_LOCK_HALF_DECR):
X locked with recursive X lock (2 X locks) and SX locked.
lock_word < -(X_LOCK_DECR + X_LOCK_HALF_DECR):
X locked and SX locked.The number of the X locks is:2 - (lock_word + X_LOCK_DECR + X_LOCK_HALF_DECR)
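The new ranges can again be checked with a small decoder. A sketch with hypothetical names; X_LOCK_DECR is 0x20000000 in InnoDB and X_LOCK_HALF_DECR is half of that:

```cpp
static const long X_LOCK_DECR = 0x20000000;
static const long X_LOCK_HALF_DECR = X_LOCK_DECR / 2;

// Decode the 5.7 lock_word scheme: how many S locks are held, and
// whether an SX / X lock is present (waiting writers not modeled).
struct rw_state_57 { long s; bool sx; bool x; };

rw_state_57 decode(long w) {
  rw_state_57 r{0, false, false};
  if (w == X_LOCK_DECR) return r;                                       // unlocked
  if (w > X_LOCK_HALF_DECR) { r.s = X_LOCK_DECR - w; return r; }        // S only
  if (w == X_LOCK_HALF_DECR) { r.sx = true; return r; }                 // SX only
  if (w > 0) { r.sx = true; r.s = X_LOCK_HALF_DECR - w; return r; }     // SX + S
  if (w == 0) { r.x = true; return r; }                                 // X
  if (w > -X_LOCK_HALF_DECR) { r.s = -w; return r; }                    // S, writer waiting
  if (w == -X_LOCK_HALF_DECR) { r.x = true; r.sx = true; return r; }    // X + SX
  if (w > -X_LOCK_DECR) {                                               // S + waiting SX-holder
    r.sx = true; r.s = -(w + X_LOCK_HALF_DECR); return r;
  }
  r.x = true;                                   // recursive X (possibly + SX)
  return r;
}
```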
#### Read lock
The read-lock logic is unchanged: when lock_word > 0 an S lock can be taken, decrementing lock_word by 1 on success.
Unlock increments lock_word by 1; the one new twist is that if lock_word becomes -X_LOCK_HALF_DECR, wait_ex_event is signalled.
#### Write lock
When locking fails, the writer now has to wait for lock_word > X_LOCK_HALF_DECR, since SX and X conflict.
rw_lock_x_lock_low reserves the lock only when lock_word > X_LOCK_HALF_DECR, then waits for lock_word to climb back to 0 or above, at which point the lock is acquired.
#### SX lock
The locking logic is much like X: if rw_lock_sx_lock_low succeeds it returns directly, otherwise it waits on lock->event until lock_word > X_LOCK_HALF_DECR.
void rw_lock_sx_lock_func(
rw_lock_t *lock, /*!< in: pointer to rw-lock */
ulint pass, /*!< in: pass value; != 0, if the lock will
be passed to another thread to unlock */
const char *file_name, /*!< in: file name where lock requested */
ulint line) /*!< in: line where requested */
{
ulint i = 0;
sync_array_t *sync_arr;
uint64_t count_os_wait = 0;
lock_loop:
if (rw_lock_sx_lock_low(lock, pass, file_name, line)) {
/* Locking succeeded */
return;
} else {
/* Spin waiting for the lock_word to become free */
os_rmb;
while (i < srv_n_spin_wait_rounds && lock->lock_word <= X_LOCK_HALF_DECR) {
if (srv_spin_wait_delay) {
ut_delay(ut_rnd_interval(0, srv_spin_wait_delay));
}
i++;
}
if (i >= srv_n_spin_wait_rounds) {
std::this_thread::yield();
} else {
goto lock_loop;
}
}
sync_cell_t *cell;
sync_arr =
sync_array_get_and_reserve_cell(lock, RW_LOCK_SX, file_name, line, &cell);
/* Waiters must be set before checking lock_word, to ensure signal
is sent. This could lead to a few unnecessary wake-up signals. */
rw_lock_set_waiter_flag(lock);
if (rw_lock_sx_lock_low(lock, pass, file_name, line)) {
sync_array_free_cell(sync_arr, cell);
/* Locking succeeded */
return;
}
++count_os_wait;
sync_array_wait_event(sync_arr, cell);
i = 0;
goto lock_loop;
}
rw_lock_sx_lock_low first checks whether lock_word > X_LOCK_HALF_DECR; if so, it subtracts X_LOCK_HALF_DECR.
Otherwise it checks whether the current holder is this thread. If not, another thread holds an SX or X lock and it returns false; if it is this thread, the relock succeeds, subtracting X_LOCK_HALF_DECR only on the first SX acquisition (tracked by sx_recursive).
bool rw_lock_sx_lock_low(
rw_lock_t *lock, /*!< in: pointer to rw-lock */
ulint pass, /*!< in: pass value; != 0, if the lock will
be passed to another thread to unlock */
const char *file_name, /*!< in: file name where lock requested */
ulint line) /*!< in: line where requested */
{
if (rw_lock_lock_word_decr(lock, X_LOCK_HALF_DECR, X_LOCK_HALF_DECR)) {
/* Decrement occurred: we are the SX lock owner. */
rw_lock_set_writer_id_and_recursion_flag(lock, !pass);
lock->sx_recursive = 1;
} else {
/* Decrement failed: It already has an X or SX lock by this
thread or another thread. If it is this thread, relock,
else fail. */
if (!pass && lock->recursive.load(std::memory_order_acquire) &&
lock->writer_thread.load(std::memory_order_relaxed) ==
std::this_thread::get_id()) {
/* This thread owns an X or SX lock */
if (lock->sx_recursive++ == 0) {
lock->lock_word -= X_LOCK_HALF_DECR;
}
} else {
/* Another thread locked before us */
return false;
}
}
return true;
}
#### SX unlock
When sx_recursive drops to 0, every SX lock has been released, so X_LOCK_HALF_DECR is added back to lock_word. If the result is above X_LOCK_HALF_DECR and waiters == 1, some thread is waiting for the lock, so signal it.
static inline void rw_lock_sx_unlock_func(
rw_lock_t *lock) /*!< in/out: rw-lock */
{
--lock->sx_recursive;
if (lock->sx_recursive == 0) {
/* Last caller in a possible recursive chain. */
if (lock->lock_word > 0) {
if (rw_lock_lock_word_incr(lock, X_LOCK_HALF_DECR) <= X_LOCK_HALF_DECR) {
ut_error;
}
/* Lock is now free. May have to signal read/write
waiters. We do not need to signal wait_ex waiters,
since they cannot exist when there is an sx-lock
holder. */
if (lock->waiters) {
rw_lock_reset_waiter_flag(lock);
os_event_set(lock->event);
sync_array_object_signalled();
}
} else {
/* still has x-lock */
ut_ad(lock->lock_word == -X_LOCK_HALF_DECR ||
lock->lock_word <= -(X_LOCK_DECR + X_LOCK_HALF_DECR));
lock->lock_word += X_LOCK_HALF_DECR;
}
}
}
http://mysql.taobao.org/monthly/2020/04/02/
MySQL has a lot of parameters, and the InnoDB storage engine has its own independent set on top. This post analyzes how parameters are parsed. Code version: 8.0.13.
my_long_options defines parameters that are global, never modified at runtime, and initialized once at server startup. Everything in my_long_options could equally live in sys_vars.cc.
sys_vars.cc defines the parameters that can be modified dynamically. They come in every type and can be global or session scoped. Each one is a subclass of sys_var, and every instance adds itself to the all_sys_vars list in its constructor.
Before looking at a real parameter, let's go over the main member variables of the sys_var class.
class sys_var {
public:
sys_var *next; // next pointer, used when walking the all_sys_vars list
LEX_CSTRING name; // parameter name
protected:
int flags; // flags, e.g. global variable vs. session variable
int m_parse_flag; // PARSE_EARLY: parsed early; PARSE_NORMAL: parsed normally
my_option option; // min, max, and default values
ptrdiff_t offset; // offset from global_system_variables; where the value actually lives
on_check_function on_check; // check hook
on_update_function on_update; // update hook
};
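The constructor-time self-registration into all_sys_vars can be sketched in a few lines. This is a simplified model with hypothetical names, not the real sys_var code, which threads the chain through helper structures:

```cpp
#include <string>

// Every variable's constructor links itself into a global chain.
struct sys_var_sim {
  static sys_var_sim* all_sys_vars;   // head of the chain
  sys_var_sim* next;
  std::string name;
  explicit sys_var_sim(const std::string& n) : next(all_sys_vars), name(n) {
    all_sys_vars = this;              // registration happens at construction
  }
};
sys_var_sim* sys_var_sim::all_sys_vars = nullptr;

// Count registered variables by walking the chain.
int count_vars() {
  int n = 0;
  for (sys_var_sim* v = sys_var_sim::all_sys_vars; v; v = v->next) ++n;
  return n;
}
```

Because registration happens in constructors of namespace-scope objects, simply defining a `Sys_var_*` in sys_vars.cc is enough to make it visible to the parser.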
Here is an example: basedir. Its definition:
static Sys_var_charptr Sys_basedir(
"basedir", // parameter name, matching the config file
"Path to installation directory. All paths are "
"usually resolved relative to this", // help text
READ_ONLY NON_PERSIST GLOBAL_VAR(mysql_home_ptr), // flags, offset, size
CMD_LINE(REQUIRED_ARG, 'b'), IN_FS_CHARSET, DEFAULT(0)); // argument check, charset, default value
So its flags mark it as read-only, non-persistent, and global.
The common mutex implementation inside InnoDB is PolicyMutex&lt;TTASEventMutex&gt;.
struct os_event {
void set();
int64_t reset();
void wait_low();
void broadcast();
private:
bool m_set;
int64_t signal_count;
os_cond_t cond_var;
EventMutex mutex;
};
wait: first call reset(), then pass the returned reset_sig_count into wait_low().
signal: call set().
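The reset/wait_low protocol exists to avoid a lost wakeup between checking a condition and going to sleep: if set() fires after reset() but before wait_low(), the changed signal_count makes wait_low return immediately. A minimal sketch with std::condition_variable (my own simplified model, not InnoDB's os_event code):

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>

struct os_event_sim {
  bool m_set = false;
  int64_t signal_count = 1;
  std::mutex mutex;
  std::condition_variable cond_var;

  // Returns the current signal count; the caller passes it to wait_low().
  int64_t reset() {
    std::lock_guard<std::mutex> g(mutex);
    m_set = false;
    return signal_count;
  }
  void set() {
    std::lock_guard<std::mutex> g(mutex);
    if (!m_set) { m_set = true; ++signal_count; }
    cond_var.notify_all();
  }
  // Sleeps only while no set() has happened since the matching reset().
  void wait_low(int64_t reset_sig_count) {
    std::unique_lock<std::mutex> g(mutex);
    cond_var.wait(g, [&] { return m_set || signal_count != reset_sig_count; });
  }
};
```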
First, the main structure of PolicyMutex:
template <typename MutexImpl>
struct PolicyMutex {
private:
MutexImpl m_impl;
public:
void enter();
void exit();
void init();
};
init() handles initialization, enter() locks, and exit() unlocks; all of them delegate to m_impl.
Next, the main structure of TTASEventMutex:
struct TTASEventMutex {
public:
void init();
void exit();
void enter();
bool try_lock();
private:
std::atomic_bool m_lock_word;
std::atomic_bool m_waiters;
os_event_t m_event;
MutexPolicy m_policy;
};
exit() releases m_lock_word and, if m_waiters is set, signals m_event to wake any sleeper.
enter() acquires the lock: it returns on success, otherwise it keeps waiting. Internally it calls spin_and_try_lock, which loops forever over these steps:
2.1. try up to max_spins CAS operations on m_lock_word; return on success
2.2. failing that, call yield to give up the CPU
2.3. call wait, which:
2.3.1. calls sync_array_get_and_reserve_cell to take a cell from the wait array, and sets m_waiters to true
2.3.2. retries the CAS on m_lock_word 4 more times; returns on success
2.3.3. calls sync_array_wait_event to sleep until the event is signalled
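The enter() steps above can be condensed into a sketch. To stay self-contained, the slow path here just keeps yielding instead of sleeping on an event via the sync array:

```cpp
#include <atomic>
#include <thread>

struct ttas_mutex_sim {
  std::atomic<bool> m_lock_word{false};

  bool try_lock() {                          // a single CAS attempt
    bool expected = false;
    return m_lock_word.compare_exchange_strong(expected, true);
  }
  void enter(unsigned max_spins = 30) {
    for (;;) {
      for (unsigned i = 0; i < max_spins; ++i)  // step 2.1: spin with CAS
        if (try_lock()) return;
      std::this_thread::yield();                // step 2.2: give up the CPU
      // Step 2.3 in InnoDB: reserve a sync-array cell, set m_waiters,
      // retry the CAS a few times, then sleep on m_event until exit()
      // signals it. Here we simply loop back and spin again.
    }
  }
  void exit() { m_lock_word.store(false); }
};
```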
struct sync_array_t {
ulint n_reserved; // number of cells currently in use
ulint n_cells; // allocated size of the array
sync_cell_t *cells; // the array itself
ulint next_free_slot; // next never-used cell, beyond the free list
ulint first_free_slot; // head of the free list; a cell's line field is reused as the next pointer
};
sync_array_init initializes sync_wait_array, a two-dimensional array whose first dimension has size 1 and second dimension size 100k.
sync_array_reserve_cell takes a free cell from sync_wait_array; in the extreme case where every cell is occupied it returns nullptr.
sync_array_free_cell puts a cell back on the free list.
sync_array_wait_event waits until the event is signalled.
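The free-list trick, reusing a cell field as the next pointer, can be modeled like this. A simplified sketch, not the real sync0arr code:

```cpp
#include <cstddef>
#include <vector>

static const size_t ULINT_UNDEFINED_SIM = (size_t)-1;  // "no free cell" marker

struct sync_cell_sim { size_t line = 0; bool in_use = false; };

struct sync_array_sim {
  std::vector<sync_cell_sim> cells;
  size_t n_reserved = 0;
  size_t next_free_slot = 0;                       // next never-used slot
  size_t first_free_slot = ULINT_UNDEFINED_SIM;    // head of the free list

  explicit sync_array_sim(size_t n) : cells(n) {}

  // Reserve a cell: pop the free list first, then take a fresh slot.
  sync_cell_sim* reserve() {
    size_t slot;
    if (first_free_slot != ULINT_UNDEFINED_SIM) {
      slot = first_free_slot;
      first_free_slot = cells[slot].line;          // line doubles as "next"
    } else if (next_free_slot < cells.size()) {
      slot = next_free_slot++;
    } else {
      return nullptr;                              // every cell is in use
    }
    ++n_reserved;
    cells[slot].in_use = true;
    return &cells[slot];
  }
  // Free a cell: push it on the free list, storing "next" in line.
  void free_cell(sync_cell_sim* c) {
    c->in_use = false;
    c->line = first_free_slot;
    first_free_slot = (size_t)(c - &cells[0]);
    --n_reserved;
  }
};
```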
struct GenericPolicy {
latch_id_t m_id;
/** Number of spins trying to acquire the latch. */
uint32_t m_spins;
/** Number of waits trying to acquire the latch */
uint32_t m_waits;
/** Number of times it was called */
uint32_t m_calls;
};
Every lock acquisition updates these counters, so GenericPolicy shows how contended a given latch is.
We recently got paged twice for the same production problem. Logging into the machine, every thread was burning CPU and the process would not respond to any command, much like a deadlock we had debugged before.
Running pstack against the process showed that it was indeed deadlocked.
Thread 16 (Thread 0x7fd90920e700 (LWP 20244)):
#0 0x00007fda6e33d222 in pthread_spin_lock () from /lib64/libpthread.so.0
#1 0x00000000008aa7b5 in lock (this=0x2dc4024) at /home/xiaoju/bigdata-storage/fusion.r2/src/spin_lock.h:16
#2 lock_guard (__m=..., this=<synthetic pointer>) at /opt/gcc-5.4/include/c++/5.4.0/mutex:386
#3 ReplicationController::role (this=0x2dc4000) at /home/xiaoju/bigdata-storage/fusion.r2/src/replication.cpp:939
#4 0x000000000081179f in get_self_status () at /home/xiaoju/bigdata-storage/fusion.r2/src/replication.h:666
#5 cmd_proc (req=req@entry=0x6beab6b40) at /home/xiaoju/bigdata-storage/fusion.r2/src/cmds.cpp:191
#6 0x00000000008cbe4f in work_process (work=0x3246000) at /home/xiaoju/bigdata-storage/fusion.r2/src/resp_server.cpp:1588
#7 0x0000000000ca2cc4 in event_persist_closure (ev=<optimized out>, base=0x31618c0) at event.c:1629
#8 event_process_active_single_queue (base=base@entry=0x31618c0, activeq=0x5666cd20, max_to_process=max_to_process@entry=2147483647, endtime=endtime@entry=0x0) at event.c:1688
#9 0x0000000000ca366f in event_process_active (base=0x31618c0) at event.c:1789
#10 event_base_loop (base=0x31618c0, flags=flags@entry=0) at event.c:2012
#11 0x0000000000ca38b7 in event_base_dispatch (event_base=<optimized out>) at event.c:1823
#12 0x00000000008d109a in worker_loop (data=0x3246000) at /home/xiaoju/bigdata-storage/fusion.r2/src/resp_server.cpp:1671
#13 0x0000000000c85245 in g_thread_proxy (data=0xbb68de0) at gthread.c:778
#14 0x00007fda6e338dc5 in start_thread () from /lib64/libpthread.so.0
#15 0x00007fda6d73e21d in clone () from /lib64/libc.so.6
Thread 15 (Thread 0x7fd902e0d700 (LWP 20245)):
#0 0x00007fda6d73e7f3 in epoll_wait () from /lib64/libc.so.6
#1 0x0000000000cac6f4 in epoll_dispatch (base=0x32ac000, tv=<optimized out>) at epoll.c:465
#2 0x0000000000ca3495 in event_base_loop (base=0x32ac000, flags=flags@entry=0) at event.c:1998
#3 0x0000000000ca38b7 in event_base_dispatch (event_base=<optimized out>) at event.c:1823
#4 0x00000000008d109a in worker_loop (data=0x329a000) at /home/xiaoju/bigdata-storage/fusion.r2/src/resp_server.cpp:1671
#5 0x0000000000c85245 in g_thread_proxy (data=0xbb68e30) at gthread.c:778
#6 0x00007fda6e338dc5 in start_thread () from /lib64/libpthread.so.0
#7 0x00007fda6d73e21d in clone () from /lib64/libc.so.6
Thread 14 (Thread 0x7fd8fca0c700 (LWP 20246)):
#0 0x00007fda6e33c6d5 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x0000000000cad515 in evthread_posix_cond_wait (cond_=0x2a7e570, lock_=0x2a7e210, tv=<optimized out>) at evthread_pthread.c:158
#2 0x0000000000c9fde6 in event_del_nolock_ (ev=ev@entry=0x38b3680, blocking=blocking@entry=2) at event.c:2896
#3 0x0000000000ca00b2 in event_del_ (ev=0x38b3680, blocking=2) at event.c:2783
#4 0x0000000000ca012e in event_del (ev=0x38b3680) at event.c:2792
#5 event_free (ev=0x38b3680) at event.c:2233
#6 0x00000000008b0724 in ReplicationController::Promote (this=0x2dc4000) at /home/xiaoju/bigdata-storage/fusion.r2/src/replication.cpp:1039
#7 0x00000000008b4841 in slaveof_impl (ip=<optimized out>, port=<optimized out>) at /home/xiaoju/bigdata-storage/fusion.r2/src/replication.cpp:3002
#8 0x0000000000815d14 in slaveof_cmd (req=0x4a993140) at /home/xiaoju/bigdata-storage/fusion.r2/src/cmds.cpp:5871
#9 0x00000000008904a6 in FusionCommand::process (this=this@entry=0x2dc9758, req=0x4a993140) at /home/xiaoju/bigdata-storage/fusion.r2/src/fusion_command.cpp:19
#10 0x000000000081182f in cmd_proc (req=req@entry=0x4a993140) at /home/xiaoju/bigdata-storage/fusion.r2/src/cmds.cpp:251
#11 0x00000000008cbe4f in work_process (work=0x3288000) at /home/xiaoju/bigdata-storage/fusion.r2/src/resp_server.cpp:1588
#12 0x0000000000ca2cc4 in event_persist_closure (ev=<optimized out>, base=0x32ac2c0) at event.c:1629
#13 event_process_active_single_queue (base=base@entry=0x32ac2c0, activeq=0x5666ce10, max_to_process=max_to_process@entry=2147483647, endtime=endtime@entry=0x0) at event.c:1688
#14 0x0000000000ca366f in event_process_active (base=0x32ac2c0) at event.c:1789
#15 event_base_loop (base=0x32ac2c0, flags=flags@entry=0) at event.c:2012
#16 0x0000000000ca38b7 in event_base_dispatch (event_base=<optimized out>) at event.c:1823
#17 0x00000000008d109a in worker_loop (data=0x3288000) at /home/xiaoju/bigdata-storage/fusion.r2/src/resp_server.cpp:1671
#18 0x0000000000c85245 in g_thread_proxy (data=0xbb68e80) at gthread.c:778
#19 0x00007fda6e338dc5 in start_thread () from /lib64/libpthread.so.0
#20 0x00007fda6d73e21d in clone () from /lib64/libc.so.6
event_process_active_single_queue calls back into user code, which blocks on a lock in our business logic; another business thread had taken that lock first and then called event_free, which itself blocks waiting on the event notification, completing the deadlock. When resources are contended like this, try to keep the competing work on a single thread.
The pstack output for this one was not saved in time, so here is a code walkthrough instead.
When RocksDB starts a manual compaction, it first takes the global mutex and then calls CompactRange to check whether the requested range conflicts with any other running compaction. If there is a conflict, it waits:
Status DBImpl::RunManualCompaction(
ColumnFamilyData* cfd, int input_level, int output_level,
const CompactRangeOptions& compact_range_options, const Slice* begin,
const Slice* end, bool exclusive, bool disallow_trivial_move,
uint64_t max_file_num_to_ignore) {
//只展示关键代码
InstrumentedMutexLock l(&mutex_);
while (!manual.done) {
assert(HasPendingManualCompaction());
manual_conflict = false;
Compaction* compaction = nullptr;
if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
scheduled ||
(((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
((compaction = manual.cfd->CompactRange(
*manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
manual.output_level, compact_range_options, manual.begin,
manual.end, &manual.manual_end, &manual_conflict,
max_file_num_to_ignore)) == nullptr &&
manual_conflict))) {
// exclusive manual compactions should not see a conflict during
// CompactRange
assert(!exclusive || !manual_conflict);
// Running either this or some other manual compaction
bg_cv_.Wait();
}
}
}
Our code defines a custom Filter class that calls the write API during filtering, writing data back into the db, and writes need to take that same lock.
So a compaction thread that was already running blocked on the global mutex while writing data from inside its filter.
Meanwhile the new manual compaction held the global mutex but was waiting for the other compactions to finish, blocked on the condition-variable signal.
The fix was to move the writes in the custom Filter into an asynchronous queue handled by another thread instead of doing them inline.
We recently ran into a workload whose average value is very large, around 50K, which makes write amplification severe.
blob_db borrows the idea of WiscKey: separating keys from values can effectively reduce write amplification.
The LSM tree stores only the key and the value's address, so background compactions read and write much less data.
RocksDB adds a value type, kTypeBlobIndex, marking that the value is really an address in blob_db.
On write, if the value's size exceeds the configured threshold, the value is written to blob_db, and the offset plus file id are written into the LSM tree as the value.
Otherwise it is a normal RocksDB write. If a maximum db size is set and disk usage exceeds the limit, the oldest blob files are deleted to make room.
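The write-path decision can be sketched as follows. The names are illustrative, not the real BlobDB API:

```cpp
#include <cstdint>
#include <string>

struct blob_index_sim { uint64_t file_id, offset, size; };

struct write_result {
  bool is_blob;           // true -> the LSM tree stores a kTypeBlobIndex entry
  blob_index_sim index;   // only meaningful when is_blob is true
};

// Route a value: large ones append to the blob file, and the LSM tree keeps
// only (file id, offset, size); small ones go into the LSM tree as usual.
write_result put_value(const std::string& value, uint64_t min_blob_size,
                       uint64_t blob_file_id, uint64_t& blob_file_tail) {
  if (value.size() >= min_blob_size) {
    blob_index_sim idx{blob_file_id, blob_file_tail, value.size()};
    blob_file_tail += value.size();      // append to the current blob file
    return {true, idx};
  }
  return {false, {0, 0, 0}};             // inline in the LSM tree
}
```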
Blob file format:
header: | magic | version | cf_id | flag | expiration_start | expiration_end |
footer: | magic | count | expiration_start | expiration_end | crc |
Like sst files, a blob file is never modified once fully written; it can only be deleted.
Each blob file has a size limit; once it is exceeded, a new blob file is opened for writing, the same way the wal rolls over.
Blob files have no level hierarchy the way RocksDB's sst files do.
Reads mostly follow the normal db path, with a new is_blob_index option added to GetImplOptions.
min_blob_size in BlobDBOptions controls which values live in blob_db; values smaller than min_blob_size are stored in the LSM tree as before.
The type of the returned value is checked: if it is kTypeBlobIndex, the real value has to be fetched from blob_db, which as you can see costs one extra read.
The value fetched from the LSM tree is decoded first to find the blob_db file and offset, and then the real data is read.
Each blob file corresponds to some set of sst files, or to some set of memtables.
A blob file is eligible for gc deletion only when it has no associated sst files and its seq is greater than flush_seq.
A background thread periodically deletes such unused blob files.
When flushing a memtable, the value type is checked; if it is kTypeBlobIndex, the earliest blob file the new sst depends on is updated.
After the flush completes, a blob callback records the mapping between the new sst file and its blob files.
After a compaction, the callback likewise removes the old sst-to-blob mappings and adds mappings for the newly produced sst files.
Recently we hit two production issues; writing them down for the record.
A business team reported that queries sometimes returned nothing; replaying their request a few times, I could indeed reproduce the empty result.
First, the overall architecture: a vip layer on top for load balancing and health checks, a proxy layer in the middle, and the actual storage nodes at the bottom.
Debugging:
Figure 1:
After the three-way handshake succeeds, the client on port 36972 sends a request and the server on port 3028 replies, but the reply's seq does not continue from the handshake.
In the handshake, port 3028 sent seq 4072672142, yet the data packet it returns carries seq 4022679883, so packets were clearly lost in between. (It is also possible tcpdump itself dropped them while capturing.)
Figure 2:
The red box shows the classic 200ms TCP retransmission, confirming the packet loss seen in figure 1. The ports in figures 1 and 2 do not match because the vip layer sits in between.
Finally, the cause:
The mss is negotiated as the minimum between client and server. Two machines with mtu 1600 negotiate an mss of 1540; an mtu-1500 machine talking to an mtu-1600 one negotiates 1440.
So when two mtu-1600 machines exchange large packets, the vip machine has to fragment them; the fragments carry no layer-4 header, cannot be forwarded properly, and the packets are lost.
You could argue this is a vip shortcoming: during the TCP three-way handshake it does not pass its own mtu on to the two endpoints.
One day, while a teammate was upgrading a service, one machine suddenly started dumping core while the others stayed normal. Stranger still, the cores came from a different service, one with no logical relationship to the service being upgraded.
Debugging: