分享|The Rsync Algorithm
分享|The Rsync Algorithm

分享|The Rsync Algorithm

Created time
Oct 26, 2021 08:55 AM
Tags
算法
blog
rsync
Priority
Status
Date
Feb 25, 2022

写在之前

想必大家一定都进行过在服务器之间传输文件的操作,以我个人的使用经验为例,我们可以使用 scp 来在两个不同物理机传输文件,如果需要长期保持文件的一致,还可以配合 crontab 脚本来使用。
但经过学习和搜索之后,我发现了一种更优的方式,就是 rsync + crontab 来实现定时 增量 传输保持文件一致。rsync 功能很强大,网上的使用教程也很多,在这里不作赘述。
相比工具的使用,其背后的原理可能更值得我们花时间去深入了解和学习。我也是想借助这个分享,和大家一起学习 rsync 算法的起源、思想以及改进。
 

全量同步 & 增量同步

rsync 是一个类 Unix 系统下用于同步文件的高效工具(算法),它能同步两处计算机的文件和目录,并且利用恰当的比较算法来减少数据的传输。
在 rsync 诞生的年代,网络带宽不像如今这么大,而要跨越因特网同步两个大文件,是非常奢侈的行为。
我们举一个实际的例子 🌰...
假设有这样一种场景:
💡
在客户端,我们有一个记录着几十万条电话数据的文本文件,同时,服务端也存储这一个相同的文件作为备份。
需求:
💡
同步客户端的数据到服务端
这时候,我们很容易想到下面的做法:
客户端程序监测到数据改变,就把客户端文件全部传输到服务端,并覆盖掉服务端的数据
这种方式简单直接且粗暴,缺点也很明显,对于大文件来说,传输的效率很低。
那么,有没有更优的解决方案呢?
当我们在客户端仅仅只增加一条电话记录时,我们肯定只希望传输增加的数据到服务端,服务端自动更新这条记录,这样不仅节约了带宽和时间,同样也能达到客户端和服务端的同步的目的。
于是,第二种方案诞生:
客户端程序监测到数据改变,只把新增的数据传输到服务端,服务端接收到客户端新增的数据后,再把新增数据整合进已有的数据中

小结

通过一起探讨上面的例子,我们对 rsync 算法的诞生背景有了一个初步的了解。上述方案中,第一种方案属于全量同步,第二种方案属于增量同步。
rsync 算法就是解决上述增量同步问题的算法,即对于 fileSrcfileDst 两个文件,我们想通过某种算法,只传输他们不同的部分,而不是使用暴力地将文件全部传输并覆盖老版本的文件。

Rsync

问题

我们现在知道,rsync 想要解决的问题是,如果我们要同步的文件只传输不同的部分,那么就需要对两边的文件做 diff 对比。
但这两个文件在两个不同的机器上,无法做 diff,如果要对这两个文件直接做 diff,那么就需要把一个文件传输到另一台机器上做 diff。但这样一来,我们就传输了整个文件,这与我们只想传输不同部分的初衷相背。
我们现在知道有一种叫做「摘要」Digest 的东西,「摘要」其实就是对一个内容使用某种哈希算法,然后得到等长的一段数据,这段数据比原本的内容就要小得多了。
「摘要」一般用来验证数据的完整性,也可以验证原本的数据内容是否被篡改,因此,我们可以利用这一点来对文件块进行比较。
Okay,有了这一点共识之后,我们就可以继续探讨 Rsync 的基本算法了。

算法

我们把同步源文件名叫作 file_src ,同步目的文件叫作 file_dst
  1. 分块 checksum 算法
    1. 我们把 file_dst 的文件平均分成若干小块,比如每块固定 512 字节,然后对每一块计算两个 checksum。checksum 本质上也是一个 hash 代码。
      • 一个叫 rolling checksum,是 32 位的弱 checksum,最开始使用的是 Mark Adler 发明的 adler-32 算法。
      • 另一个是 128 的强 checksum 算法,最开始用 md4,后来用 md5 算法。
      为什么要设置弱 hash 和强 hash 呢?
      两个目的:
    2. 弱 hash 用于快速鉴别文件块的不同
    3. 强 hash 用于保证两文件块是相同的
  1. 传输算法
    1. 同步的目标端会把 file_dst 的 checksum 列表传给同步源,这个列表包含了三个东西,rolling checksum(32 bits)md5 checksum(128 bits)文件块编号
      在同步源端的机器在拿到这个列表后,会对 file_src 做同样的 checksum,然后和 file_src 的 checksum 做对比,找到改变了的文件块。
      但是,同步源端并不是简单的计算 checksum 并一一对照,为了保证效率和准确性,同步源端还有一个 checksum 查找算法。
      同步源的算法主要解决了如下两个问题:
      • 如果我在 file_src 这边的文件中间加了一个字符,这样后面的文件块都会移一个字符,就完全和 file_dst 不一样了,但理论上来说,我们只需要传一个字符就可以了。
      • 如果这个checksum列表特别长,而我的两边的相同的文件块可能并不是一样的顺序,那就需要查找,线性的查找的速度又特别慢,因此我们还需要解决查找速度的问题。
  1. checksum 查找算法
    1. 同步源端拿到 file_dst 的 checksum 后,会把这个数据存储到一个哈希表中,并用 rolling checksum 做 hash,这样就可以获得 O(1) 时间复杂度的查找性能。
      但是我们知道,hash 难免会发生碰撞,对于碰撞的情况,我们可以把碰撞的做成一个链表,发现碰撞的时候,再便利碰撞的链表即可。
  1. 对比算法
    1. 对比算法是最关键的算法,但是整体流程还是比较清晰:
    2. file_src 的第一个文件块(假设是 512 字节),取出来以后做 rolling checksum 计算,并把计算结果放到 hash 表中查询
    3. 如果查到了,说明 file_dst 中有潜在相同的文件块,但因为 rolling checksum 太弱了,很可能发生碰撞,于是还要计算 md5 的 128 bits 的checksum。这样一来,碰撞概率为 2^-(32+128)=2^-160 的概率发生碰撞,这样的概率非常小可以忽略。 如果 rolling checksum 和 md5 checksum 都相同,说明 file_dst 中有相同的块,我们需要记下这一块在 file_dst 下的文件编号。
    4. 如果 file_src 的 rolling checksum 没有在哈希表中找到,就证明这块中有不同的信息,也就不用再计算 md5 了。 总之,只要 rolling checksum 或者 md5 其中有一个在 file_dst 的 checksum 哈希表中找不到匹配项,就会触发算法对 file_src 的 rolling 动作。 于是,算法会往后移动一个字节,取 file_src 中字节 2-253 的文件块做 checksum,然后再回到第 1 步。
    5. 经过这样多轮的 rolling 循环检查,我们就能找到 file_src 相邻两次匹配中的那些文本字符,这些就是我们要往同步目标端传的文件内容了。
 

Rolling Checksum

在学习 rsync 时,我一直不太理解什么是 rolling checksum,后面了解了之后,发现它的原理和算法相对简单。
我们应该要拆开来看:
  • checksum 好理解,中文译作 校验和,顾名思义,用于校验目的地一组数据的和。本质上是一套哈希代码,但并不是任何的哈希计算函数都能够很好地用于 checksum。
  • rolling 其实也好理解,滚动的意思。rolling 是最大的精髓,比如,我们往后前进 1 个字符的时候,不需要全部重新计算 checksum。也就是说,我们计算 checksum([0, 512])时,不需要重新计算从 1 到 513 的 checksum,而是在 [0, 512] 的 checksum 基础上直接算出来。
我举一个例子,有一串数字 123456789 ,假设 5 个长度作为一个块,那么第一个块就是 12345 ,可以表示为:
1 * 10^4 + 2 * 10^3 + 3 * 10^2 + 4 * 10^1 + 5 * 10^0 = 12345
如果我们前进 1 步,也就是要得到 23456 ,我们不必计算:
2 * 10^4 + 3 * 10^3 + 4 * 10^2 + 5 * 10^1 + 6 * 10^0
而是直接计算:
(12345 - 1 * 10^4) * 10 + 6 * 10 ^0
也就是说,我们把 12345 的第一位去掉,再加上最右边一位,就能得到前进一步后的结果,这就是 rolling checksum 的算法。
 

实现

我们根据前面所学的内容,来进行一个简单的 Python 版本实现。
设定块大小为 512 字节 → 4096 bit
block_size = 4096
计算 checksum:
def signature(f):
    while True:
        block_data = f.read(block_size)
        if not block_data:
            break
        yield (zlib.adler32(block_data), hashlib.md5(block_data).digest())		
实现一个 file_src 端的用于对比的哈希表
class RsyncLookupTable(object):

    def __init__(self, checksums):
        self.dict = {}
        for block_number, c in enumerate(checksums):
            weak, strong = c
            if weak not in self.dict:
                self.dict[weak] = dict()
            self.dict[weak][strong] = block_number

    def __getitem__(self, block_data):
        weak = zlib.adler32(block_data)
        subdict = self.dict.get(weak)
        if subdict:
            strong = hashlib.md5(block_data).digest()
            return subdict.get(strong)
        return None
用于查找不同数据块的函数
def delta(sigs, f):
    table = RsyncLookupTable(sigs)
    block_data = f.read(block_size)
    while block_data:
        block_number = table[block_data]
        if block_number:
            yield (block_number * block_size, len(block_data))
            block_data = f.read(block_size)
        else:
            yield block_data[0]
            block_data = block_data[1:] + f.read(1)
用于增量更新 file_dst 的函数
def patch(outputf, deltas, old_file):
    for x in deltas:
        if type(x) == str:
            outputf.write(x)
        else:
            offset, length = x
            old_file.seek(offset)
            outputf.write(old_file.read(length))
 

完整代码

from typing import Union
import zlib
import hashlib


block_size = 4096


def signature(f):
    while True:
        block_data = f.read(block_size)
        if isinstance(block_data, str):
            block_data = block_data.encode()
        if not block_data:
            break
        yield (zlib.adler32(block_data), hashlib.md5(block_data).digest())


class RsyncLookupTable(object):
    def __init__(self, checksums):
        self.dict = {}
        for block_number, c in enumerate(checksums):
            weak, strong = c
            if weak not in self.dict:
                self.dict[weak] = dict()
            self.dict[weak][strong] = block_number

    def __getitem__(self, block_data: Union[str, bytes]):
        if isinstance(block_data, str):
            weak = zlib.adler32(block_data.encode())
        else:
            weak = zlib.adler32(block_data)
        subdict = self.dict.get(weak)
        if subdict:
            strong = hashlib.md5(block_data).digest()
            return subdict.get(strong)
        return None


def delta(sigs, f):
    table = RsyncLookupTable(sigs)
    block_data = f.read(block_size)
    while block_data:
        block_number = table[block_data]
        if block_number:
            yield (block_number * block_size, len(block_data))
            block_data = f.read(block_size)
        else:
            yield block_data[0]
            block_data = block_data[1:] + f.read(1)


def patch(outputf, deltas, old_file):
    for x in deltas:
        if type(x) == str:
            outputf.write(x)
        else:
            offset, length = x
            old_file.seek(offset)
            outputf.write(old_file.read(length))


if __name__ == "__main__":
    with open("file_dst.txt", "r") as f_dst:
        sigs = signature(f_dst)
        with open("file_src.txt", "r") as f_src:
            deltas = delta(sigs, f_src)
            with open("file_dst.txt", "w") as outputf:
                patch(outputf, deltas, f_src)
 

总结 & 延伸思考

我们先从实际工作和生活中的例子切入,在工具层面先知道了 scp,然后知道了增量更新的方案 rsync。
然后更进一步,从工具背后的算法和原理入手,层层拆解,知道了 rsync 背后的逻辑,然后再用一门语言 Python 来实现 rsync 算法,完成了从理论到上手实践的整个流程。
rsync 是很久之前被提出来的算法了,本次分享的内容是基于最初的几个版本来展开的,相信过去这么长的时间,应该有一些我们能够改进的地方,这里留下一道会后思考,请大家思考一下可以从哪些方面着手进行优化。
在学习过程中,还有一点我认为值得和大家再分享一下:
在对比 file_srcfile_dst 两者的不同时,我们运用了 hash 表,通过分别判断弱 checksum 和强 checksum 是否在已有哈希表中存在来判断是否属于相同数据块。
这个算法和思想与布隆过滤器有异曲同工之妙,布隆过滤器有两个特点:判断存在的,不一定存在;判断不存在的,一定不存在。所以,布隆过滤器是有误差的。
在 rsync 的对比算法中,同样也可能存在误差,因此,用于对比的哈希表不仅使用了弱 checksum,还增加了强 checksum 作为第二道保险改进了对比的算法。
感兴趣的同学也可以在后面自行了解一下布隆过滤器。

参考

原作者的报告:
报告 pdf 版本: