当多线程并发写入rocksdb,所有的写入操作会构造一个write_batch,但是内部写入wal文件还是串行操作,保证写入操作的顺序性,写memtable可以并发写入,对于同时写入的多个write_batch,rocksdb内部会合并成一个大的batch,提供写入性能。 同时外部写入的时候,可能会发生memtable写满切换以及wal文件写满切换操作,这个时候外部写入操作需要暂停等待切换操作完成(这个也是rocksdb毛刺的原因之一),这些工作都是由write_thread完成,同时还负责保证写入操作的顺序性。
初始是init状态,只有leader才有权限发起写操作,STATE_PARALLEL_MEMTABLE_WRITER专门给并发写memtable用的。
内部会维护write group,把多个batch合并一起写入。如果设置了enable_pipelined_write,那么写wal和写memtable分开,可以提高写吞吐。batch如果是follower,那么只能等待leader唤醒。
struct Writer {
WriteBatch* batch;
bool sync;
bool no_slowdown;
bool disable_wal;
bool disable_memtable;
uint64_t log_used; // log number that this batch was inserted into
uint64_t log_ref; // log number that memtable insert should reference
WriteCallback* callback;
bool made_waitable; // records lazy construction of mutex and cv
std::atomic<uint8_t> state; // write under StateMutex() or pre-link
WriteGroup* write_group;
SequenceNumber sequence; // the sequence number to use for the first key
Status status; // status of memtable inserter
Status callback_status; // status returned by callback->Callback()
std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
Writer* link_older; // prev指针
Writer* link_newer; // next指针
};
struct WriteGroup {
Writer* leader = nullptr;
Writer* last_writer = nullptr;
SequenceNumber last_sequence;
// before running goes to zero, status needs leader->StateMutex()
Status status;
std::atomic<size_t> running;
size_t size = 0;
};
class WriteThread {
private:
// 并发写memtable
const bool allow_concurrent_memtable_write_;
// 分开写memtable和wal
const bool enable_pipelined_write_;
// Points to the newest pending writer. Only
// leader can remove elements, adding can
// be done lock-free by anybody.
std::atomic<Writer*> newest_writer_;
// Points to the newest pending memtable writer.
// Used only when pipelined write is enabled.
std::atomic<Writer*> newest_memtable_writer_;
};
write_group单次批量写入队列,leader指向第一个写入的batch,last_write指向本批次最后一个写入的batch
rocksdb里面的sst文件是存储在磁盘中的,每次从磁盘读数据有不小的代价,需要使用cache缓存热点数据,减少磁盘io。rocksdb里面使用是lru cache,关于lru策略,这里就不讲。参考wiki 和一般的lru策略不同的时候,它还支持不同的优先级,优先淘汰低优先级数据。
节点
struct LRUHandle {
void* value;
void (*deleter)(const Slice&, void* value); //释放内存相关函数
LRUHandle* next_hash; //hash表中同一个桶内指针
LRUHandle* next; //双向链表指针
LRUHandle* prev; //双向链表指针
size_t charge; //节点大小
size_t key_length;
uint32_t refs; //引用计数,初始化是1
char flags;
//三种情况,in_cache,is_high_pri,in_high_pro_pool
//in_cache表示是否在cache中
//is_high_pri是否高优先级
//in_high_pro_pool是否在高优先级区域
uint32_t hash; //key的hash值,减少字符串比较次数
char key_data[1];
入口函数:
std::shared_ptr<Cache> NewLRUCache(size_t capacity,
int num_shard_bits = -1,
bool strict_capacity_limit = false,
double high_pri_pool_ratio = 0.0);
函数有4个参数:
1: 第一个参数是cache表示容量
2: 第二个参数是cache分成2^num_shard_bits shard
3: 第三个参数表示是否严格限制容量,比如说是否可以超过一点cache容量
4: 第四个参数是高优先级区域的cache占比, 根据这个比例把cache分成2部分,一部分高优先级区,一部分低优先级区。
lru cache由2部分组成,一个双向链表,一个hash表。hash表提供近似o(1)时间复杂度的查找,双向链表用于维护lru策略。 dummy节点是表头,橘黄色部分是高优先级区域,蓝色部分是低优先级区域。还有一个指针指向低优先级区域头部。
从hash里面找到对应的key, 然后从双向链表里面删除删除对应的节点,引用计数+1
get使用结束以后需要调用release接口,引用计数-1,然后判断引用计数是否=0
1: =0就会删除这个节点, 释放相关内存,size减去这个节点占用大小。
2: =1并且flag中in_cache=true, 如果size小于容量,就把节点插入双向链表的头部,否则就释放节点。
3: >1表示这个key还有其他人在用,那么就什么都不操作。
根据优先级不同插入链表对应的优先级区域
1:先判断当前cache的size是否超过capacity,如果超过了,就从链表的尾部开始淘汰数据直至size < capacity。
2:如果链表淘汰完了还是不满足条件(节点如果正在被他人使用不在链表中,但是在hash桶中,算size的时候还是要算上的),那就会根据是否设置strict_capacity_limit来决定是返回失败还是返回正常,但是都不会插入cache中。
3:插入的key如果存在,老节点引用计数-1,老节点引用计数=0就直接删除释放内存, >0表示老节点正在被其他人使用,就不用做什么操作,等其他人调用release接口的时候在释放老节点
根据优先级不同插入链表对应的优先级区域
从hash桶中删除节点,引用计数-1,然后判断引用计数是否=0 1: =0, 更新size, 从链表中删除节点, 释放节点内存 2: >0, 什么都不操作
高优先级区域和低优先级区域在hash桶中没有区别,在双写链表有区别,中间有一个指针区分。淘汰的时候优先淘汰低优先级区域的节点。高优先级区域满了,就把高优先级区域最后一个节点变成低优先级,并不淘汰。