简介

在RocksDB中,每次写入它都会先写WAL,然后再写入MemTable,这次我们就来分析这两个逻辑具体是如何实现的.
首先需要明确的是在RocksDB中,WAL的写入是单线程顺序串行写入的,而MemTable则是可以并发多线程写入的。

而在RocksDB 5.5中引进了一个选项enable_pipelined_write,这个选项的目的就是将WAL和MemTable的写入pipeline化,
也就是说当一个线程写完毕WAL之后,此时在WAL的write队列中等待的其他的write则会开始继续写入WAL, 而当前线程将会继续
写入MemTable.此时就将不同的Writer的写入WAL和写入MemTable并发执行了.

实现

我们这里只来分析pipeline的实现,核心函数就是DBImpl::PipelinedWriteImpl.

  • 每一个DB(DBImpl)都有一个write_thread_(class WriteThread).
  • 每次调用Write的时候会先写入WAL, 此时新建一个WriteThread::Writer对象,并将这个对象加入到一个Group中(调用JoinBatchGroup)

    1
    2
    3
    WriteThread::Writer w(write_options, my_batch, callback, log_ref,
    disable_memtable);
    write_thread_.JoinBatchGroup(&w);
  • 然后我们来看JoinBatchGroup,这个函数主要是用来讲所有的写入WAL加入到一个Group中.这里可以看到当当前的Writer
    对象是leader(比如第一个进入的对象)的时候将会直接返回,否则将会等待知道更新为对应的状态.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    void WriteThread::JoinBatchGroup(Writer* w) {
    ...................................
    bool linked_as_leader = LinkOne(w, &newest_writer_);
    if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
    }

    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

    if (!linked_as_leader) {
    /**
    * Wait util:
    * 1) An existing leader pick us as the new leader when it finishes
    * 2) An existing leader pick us as its follewer and
    * 2.1) finishes the memtable writes on our behalf
    * 2.2) Or tell us to finish the memtable writes in pralallel
    * 3) (pipelined write) An existing leader pick us as its follower and
    * finish book-keeping and WAL write for us, enqueue us as pending
    * memtable writer, and
    * 3.1) we become memtable writer group leader, or
    * 3.2) an existing memtable writer group leader tell us to finish memtable
    * writes in parallel.
    */
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
    STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
    &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
    }
    }
  • 然后我们来看LinkOne函数,这个函数主要用来讲当前的Writer对象加入到group中,这里可以看到由于
    写入是并发的因此对应的newest_writer_(保存最新的写入对象)需要原子操作来更新.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
    assert(newest_writer != nullptr);
    assert(w->state == STATE_INIT);
    Writer* writers = newest_writer->load(std::memory_order_relaxed);
    while (true) {
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
    return (writers == nullptr);
    }
    }
    }
  • 当从JoinBatchGroup返回之后,当当前的Writer对象为leader的话,则将会把此leader下的所有的write都
    链接到一个WriteGroup中(调用EnterAsBatchGroupLeader函数), 并开始写入WAL,这里要注意非leader的write将会直接
    进入memtable的写入,这是因为非leader的write都将会被当前它所从属的leader来打包(group)写入,后面我们会看到实现.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
    WriteGroup* write_group) {
    assert(leader->link_older == nullptr);
    assert(leader->batch != nullptr);
    assert(write_group != nullptr);
    ................................................
    Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

    // This is safe regardless of any db mutex status of the caller. Previous
    // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
    // (they emptied the list and then we added ourself as leader) or had to
    // explicitly wake us up (the list was non-empty when we added ourself,
    // so we have already received our MarkJoined).
    CreateMissingNewerLinks(newest_writer);

    // Tricky. Iteration start (leader) is exclusive and finish
    // (newest_writer) is inclusive. Iteration goes from old to new.
    Writer* w = leader;
    while (w != newest_writer) {
    w = w->link_newer;
    .........................................
    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
    }
    ..............................
    }
  • 这里注意到遍历是通过link_newer进行的,之所以这样做是相当于在写入WAL之前,对于当前leader的Write
    做一次snapshot(通过CreateMissingNewerLinks函数).

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    void WriteThread::CreateMissingNewerLinks(Writer* head) {
    while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
    assert(next == nullptr || next->link_newer == head);
    break;
    }
    next->link_newer = head;
    head = next;
    }
    }
  • 上述操作进行完毕之后,进入写WAL操作,最终会把这个write_group打包成一个writeBatch(通过MergeBatch函数)进行写入.

1
2
3
4
5
   if (w.ShouldWriteToWAL()) {
...............................
w.status = WriteToWAL(wal_write_group, log_writer, log_used,
need_log_sync, need_log_dir_sync, current_sequence);
}
  • 当当前的leader将它自己与它的follow写入之后,此时它将需要写入memtable,那么此时之前还阻塞的Writer,分为两种情况
    第一种是已经被当前的leader打包写入到WAL,这些writer(包括leader自己)需要将他们链接到memtable writer list.还有一种情况,那就是还没有写入WAL的,此时这类writer则需要选择一个leader然后继续写入WAL.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
    Status status) {
    Writer* leader = write_group.leader;
    Writer* last_writer = write_group.last_writer;
    assert(leader->link_older == nullptr);
    .....................................

    if (enable_pipelined_write_) {
    // Notify writers don't write to memtable to exit.
    ......................................
    // Link the ramaining of the group to memtable writer list.
    if (write_group.size > 0) {
    if (LinkGroup(write_group, &newest_memtable_writer_)) {
    // The leader can now be different from current writer.
    SetState(write_group.leader, STATE_MEMTABLE_WRITER_LEADER);
    }
    }
    // Reset newest_writer_ and wake up the next leader.
    Writer* newest_writer = last_writer;
    if (!newest_writer_.compare_exchange_strong(newest_writer, nullptr)) {
    Writer* next_leader = newest_writer;
    while (next_leader->link_older != last_writer) {
    next_leader = next_leader->link_older;
    assert(next_leader != nullptr);
    }
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_GROUP_LEADER);
    }
    AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER |
    STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
    &eabgl_ctx);
    } else {
    .....................................
    }
    }
  • 接下来我们来看写入memtable的操作,这里逻辑类似写入WAL,如果是leader的话,则依旧会创建一个group(WriteGroup),然后遍历需要写入memtable的writer,将他们都加入到group中(EnterAsMemTableWriter),然后则设置并发执行的大小,以及设置对应状态(LaunchParallelMemTableWriters).这里注意每次setstate就将会唤醒之前阻塞的Writer.

    1
    2
    3
    4
    5
    6
    7
    void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) {
    assert(write_group != nullptr);
    write_group->running.store(write_group->size);
    for (auto w : *write_group) {
    SetState(w, STATE_PARALLEL_MEMTABLE_WRITER);
    }
    }
  • 这里要注意,在构造memtable的group的时候,我们不需要创建link_newer,因为之前在写入WAL的时候,我们已经构造好link_newer,那么此时我们使用构造好的group也就是表示这个group中包含的都是已经写入到WAL的操作.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    void WriteThread::EnterAsMemTableWriter(Writer* leader,
    WriteGroup* write_group) {
    ....................................

    if (!allow_concurrent_memtable_write_ || !leader->batch->HasMerge()) {
    ....................................................
    }

    write_group->last_writer = last_writer;
    write_group->last_sequence =
    last_writer->sequence + WriteBatchInternal::Count(last_writer->batch) - 1;
    }
  • 最后开始执行写入MemTable的操作,之前在写入WAL的时候被阻塞的所有Writer此时都会进入下面这个逻辑,此时也就意味着
    并发写入MemTable.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
      if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
    .........................
    w.status = WriteBatchInternal::InsertInto(
    &w, w.sequence, &column_family_memtables, &flush_scheduler_,
    write_options.ignore_missing_column_families, 0 /*log_number*/, this,
    true /*concurrent_memtable_writes*/);
    if (write_thread_.CompleteParallelMemTableWriter(&w)) {
    MemTableInsertStatusCheck(w.status);
    versions_->SetLastSequence(w.write_group->last_sequence);
    write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
    }
    }
  • 最后当当前group的所有Writer都写入MemTable之后,则将会调用ExitAsMemTableWriter来进行收尾工作.如果有新的memtable
    writer list需要处理,那么则唤醒对应的Writer,然后设置已经处理完毕的Writer的状态.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    void WriteThread::ExitAsMemTableWriter(Writer* /*self*/,
    WriteGroup& write_group) {
    Writer* leader = write_group.leader;
    Writer* last_writer = write_group.last_writer;

    Writer* newest_writer = last_writer;
    if (!newest_memtable_writer_.compare_exchange_strong(newest_writer,
    nullptr)) {
    CreateMissingNewerLinks(newest_writer);
    Writer* next_leader = last_writer->link_newer;
    assert(next_leader != nullptr);
    next_leader->link_older = nullptr;
    SetState(next_leader, STATE_MEMTABLE_WRITER_LEADER);
    }
    Writer* w = leader;
    while (true) {
    if (!write_group.status.ok()) {
    w->status = write_group.status;
    }
    Writer* next = w->link_newer;
    if (w != leader) {
    SetState(w, STATE_COMPLETED);
    }
    if (w == last_writer) {
    break;
    }
    w = next;
    }
    // Note that leader has to exit last, since it owns the write group.
    SetState(leader, STATE_COMPLETED);
    }

总结

我们可以看到在RocksDB中,WAL的写入始终是串行写入,而MemTable可以多线程并发写入,也就是说在系统压力到一定阶段的时候,
写入WAL肯定会成为瓶颈.