跳转至

Bitcask代码阅读

背景

看过 go夜读 的分享,也读了 bitcask 的paper

56aU0S.png

代码实现

db对象

有读写锁,配置文件,文件锁(保证只有一个进程在操作db)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type Bitcask struct {
    mu         sync.RWMutex
    flock      *flock.Flock
    config     *config.Config
    options    []Option
    path       string                // 指代db路径
    curr       data.Datafile         // 当前数据页
    datafiles  map[int]data.Datafile // 数据页的索引 id -> dataFile
    trie       art.Tree              // 存的所有的key -> <fileId, offset, size>, 以radix tree的方式来存
    ttlIndex   art.Tree              // key -> expireTime
    indexer    index.Indexer         // 读写index文件,trie的持久化操作
    ttlIndexer index.Indexer         // 读写ttlindex文件,ttlIndex的持久化操作

    metadata  *metadata.MetaData // 元数据信息
    isMerging bool               // 是否在merge
}

Put操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Put stores the key and value in the database.
func (b *Bitcask) Put(key, value []byte) error {
    // 1. 校验
    if len(key) == 0 {
        return ErrEmptyKey
    }
    if b.config.MaxKeySize > 0 && uint32(len(key)) > b.config.MaxKeySize {
        return ErrKeyTooLarge
    }
    if b.config.MaxValueSize > 0 && uint64(len(value)) > b.config.MaxValueSize {
        return ErrValueTooLarge
    }

    // 2. 上锁
    b.mu.Lock()
    defer b.mu.Unlock()

    // 3. 写入datafile
    offset, n, err := b.put(key, value)
    if err != nil {
        return err
    }

    // 4. 落盘
    if b.config.Sync {
        if err := b.curr.Sync(); err != nil {
            return err
        }
    }

    // in case of successful `put`, IndexUpToDate will be always be false
    b.metadata.IndexUpToDate = false

    // 5. 更新meta信息
    if oldItem, found := b.trie.Search(key); found {
        b.metadata.ReclaimableSpace += oldItem.(internal.Item).Size
    }

    // 6. 更新 key -> {fileId, offset, size} 索引信息
    item := internal.Item{FileID: b.curr.FileID(), Offset: offset, Size: n}
    b.trie.Insert(key, item)

    return nil
}

Get操作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Get fetches value for a key
func (b *Bitcask) Get(key []byte) ([]byte, error) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    e, err := b.get(key)
    if err != nil {
        return nil, err
    }
    return e.Value, nil
}


// get retrieves the value of the given key
func (b *Bitcask) get(key []byte) (internal.Entry, error) {
    var df data.Datafile

    // 1. 寻找key对应的index信息
    value, found := b.trie.Search(key)
    if !found {
        return internal.Entry{}, ErrKeyNotFound
    }
    // 2. 判断过期
    if b.isExpired(key) {
        return internal.Entry{}, ErrKeyExpired
    }

    // 3. 找到对应的文件页
    item := value.(internal.Item)
    if item.FileID == b.curr.FileID() {
        df = b.curr
    } else {
        df = b.datafiles[item.FileID]
    }

    // 4. 根据offset和size读取对象
    e, err := df.ReadAt(item.Offset, item.Size)
    if err != nil {
        return internal.Entry{}, err
    }

    // 5. 校验编码是否发生问题
    checksum := crc32.ChecksumIEEE(e.Value)
    if checksum != e.Checksum {
        return internal.Entry{}, ErrChecksumFailed
    }

    return e, nil
}

Merge操作

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// 根据内存中的indexr的所有key,重新开一个新的db,然后写进去
// Merge merges all datafiles in the database. Old keys are squashed
// and deleted keys removes. Duplicate key/value pairs are also removed.
// Call this function periodically to reclaim disk space.
func (b *Bitcask) Merge() error {
    // 1. 判断是否在 IsMerging阶段(感觉用atomic更好些)
    b.mu.Lock()
    if b.isMerging {
        b.mu.Unlock()
        return ErrMergeInProgress
    }
    b.isMerging = true
    b.mu.Unlock()
    defer func() {
        b.isMerging = false
    }()
    b.mu.RLock()
    err := b.closeCurrentFile()
    if err != nil {
        b.mu.RUnlock()
        return err
    }
    // 2. 读取所有的dataFileIds,从小到大排序
    filesToMerge := make([]int, 0, len(b.datafiles))
    for k := range b.datafiles {
        filesToMerge = append(filesToMerge, k)
    }
    err = b.openNewWritableFile()
    if err != nil {
        b.mu.RUnlock()
        return err
    }
    b.mu.RUnlock()
    sort.Ints(filesToMerge)

    // 3. 开临时目录 merge,执行完会删除该目录
    // Temporary merged database path
    temp, err := ioutil.TempDir(b.path, "merge")
    if err != nil {
        return err
    }
    defer os.RemoveAll(temp)

    // 4. 新开一个db
    // Create a merged database
    mdb, err := Open(temp, withConfig(b.config))
    if err != nil {
        return err
    }

    // 5. 根据内存中的 trie(也就是索引),重新将数据从老db读取,并写入到新db中
    // Rewrite all key/value pairs into merged database
    // Doing this automatically strips deleted keys and
    // old key/value pairs
    err = b.Fold(func(key []byte) error {
        item, _ := b.trie.Search(key)
        // if key was updated after start of merge operation, nothing to do
        if item.(internal.Item).FileID > filesToMerge[len(filesToMerge)-1] {
            return nil
        }
        e, err := b.get(key)
        if err != nil {
            return err
        }

        if e.Expiry != nil {
            if err := mdb.PutWithTTL(key, e.Value, time.Until(*e.Expiry)); err != nil {
                return err
            }
        } else {
            if err := mdb.Put(key, e.Value); err != nil {
                return err
            }
        }

        return nil
    })
    if err != nil {
        return err
    }
    if err = mdb.Close(); err != nil {
        return err
    }
    // no reads and writes till we reopen
    b.mu.Lock()
    defer b.mu.Unlock()
    if err = b.close(); err != nil {
        return err
    }

    // 6. 删除现有的文件
    // Remove data files
    files, err := ioutil.ReadDir(b.path)
    if err != nil {
        return err
    }
    for _, file := range files {
        if file.IsDir() || file.Name() == lockfile {
            continue
        }
        ids, err := internal.ParseIds([]string{file.Name()})
        if err != nil {
            return err
        }
        // if datafile was created after start of merge, skip
        if len(ids) > 0 && ids[0] > filesToMerge[len(filesToMerge)-1] {
            continue
        }
        err = os.RemoveAll(path.Join(b.path, file.Name()))
        if err != nil {
            return err
        }
    }

    // 7. 将新文件更名
    // Rename all merged data files
    files, err = ioutil.ReadDir(mdb.path)
    if err != nil {
        return err
    }
    for _, file := range files {
        // see #225
        if file.Name() == lockfile {
            continue
        }
        err := os.Rename(
            path.Join([]string{mdb.path, file.Name()}...),
            path.Join([]string{b.path, file.Name()}...),
        )
        if err != nil {
            return err
        }
    }
    b.metadata.ReclaimableSpace = 0

    // 8. 根据文件内容,重新load到内存数据
    // And finally reopen the database
    return b.reopen()
}

参考

评论