飙血推荐
  • HTML教程
  • MySQL教程
  • JavaScript基础教程
  • php入门教程
  • JavaScript正则表达式运用
  • Excel函数教程
  • UEditor使用文档
  • AngularJS教程
  • ThinkPHP5.0教程

探究Go-YCSB做数据库基准测试

时间:2021-12-12  作者:luozhiyun  

本篇文章开篇会介绍一下Go-YCSB是如何使用,然后按照惯例会分析一下它是如何做基准测试,看看它有什么优缺点。

转载请声明出处哦~,本篇文章发布于luozhiyun的博客: https://域名/archives/634

最近我们在做数据库的技术选型,要做选型的话难免需要对数据库进行一个基准测试,以便可以横向对比不同数据库性能。

YCSB,全称为“Yahoo!Cloud Serving Benchmark”,是雅虎开发的用来对云服务进行基础测试的工具,其内部涵盖了常见的NoSQL数据库产品,如Cassandra、MongoDB、HBase、Redis等等。

作为一名go开发人员,所以我们使用 pingcap 开发的Go YCSB来进行基准测试。

安装

首先要保证本地 Go 版本不低于 域名,然后下载编译:

git clone https://域名/pingcap/go-域名
cd go-ycsb
make

在 bin 文件夹里面就放着我们编译好的程序 go-ycsb。

我们先来看一下 workloads 文件夹,目录下有各种workload的模板,可以基于workload模板进行自定义修改。默认的6种测试场景如下:

  • workloada:读写均衡型,50%/50%,Reads/Writes
  • workloadb:读多写少型,95%/5%,Reads/Writes
  • workloadc:只读型,100%,Reads
  • workloadd:读最近写入记录型,95%/5%,Reads/insert
  • workloade:扫描小区间型,95%/5%,scan/insert
  • workloadf:读写入记录均衡型,50%/50%,Reads/insert
  • workload_template:参数列表模板。

所以我们可以依据不同的 workload 多维度的对系统进行测试。workload里面的操作主要包括:

  • Insert:插入一条新的记录
  • Update:更新一条记录的某一个或者所有 fields
  • Read:读取一条记录的某一个或者所有 fields
  • Scan:随机从一个 key 开始顺序扫描随机条记录

在测试的时候,我们还需要根据不同的业务场景来模拟测试,那么可以通过 requestdistribution 控制:

  • uniform:随机选择一个记录;
  • sequential:按顺序选择记录;
  • zipfian:根据 Zipfian 分布来选择记录。大致意思就是互联网常说的80/20原则,也就是20%的key,会占有80%的访问量;
  • latest:和 Zipfian 类似,但是倾向于访问新数据明显多于老数据;
  • hotspot:热点分布访问;
  • exponential:指数分布访问;

下面我们看一下workload里面可以填哪些参数:

# 目前只实现了这一种
workload=core
 
# 总记录数
recordcount=1000000
 
# 测试阶段被操作的记录数,如果设置了 threadcount,那么每个线程操作的记录数=operationcount/threadcount
operationcount=3000000
 
# 线程数
threadcount=500 
 
# 如果一个表里面已经有记录数了,那么load的数据的时候从这个记录数开始
insertstart=0
 
# 一行数据的字段数
fieldcount=10
 
# 每个字段大小
fieldlength=100
 
# 是否应该读取所有字段
readallfields=true
 
# 是否应该更新所有字段
writeallfields=false
 
# 字段长度分布
fieldlengthdistribution=constant
#fieldlengthdistribution=uniform
#fieldlengthdistribution=zipfian
 
# 读操作概率
readproportion=域名 
# 更新操作概率
updateproportion=域名
 
# 插入操作概率
insertproportion=0 

# 先读后写操作同一条记录概率
readmodifywriteproportion=0
 
# 范围操作的概率
scanproportion=0
 
# 范围操作,最大的可操作的记录数
maxscanlength=1000
 
# 用来选择扫描时访问的记录数量分布情况
scanlengthdistribution=uniform
#scanlengthdistribution=zipfian
 
# 记录应按顺序插入还是伪随机插入
insertorder=hashed
#insertorder=ordered
 
# 以什么方式模拟测试
requestdistribution=zipfian
#requestdistribution=uniform
#requestdistribution=latest
 
# 下面这两种方式时针对requestdistribution为hotspot的时候
# 构成热点集的数据项的百分比
hotspotdatafraction=0.2
 
# 访问热点集的数据操作百分比
hotspotopnfraction=0.8
 
# 操作数据的表名
table=usertable
 
# 延迟测量结果展现形式,暂时没实现
measurementtype=histogram
 

测试

比如我们现在要测试 redis 的性能,先写一个 workload:

recordcount=1000000
operationcount=1000000
workload=core 
readallfields=true 
readmodifywriteproportion=1 
requestdistribution=uniform 
域名=127.0.0.1:6379 
threadcount=50

上面的这个 workload 表示在 load 的时候会插入100万条数据到库里面,操作的数据量也是100万,但是有50个线程,也就是每个线程实际操作2万行记录;

测试方式使用 readmodifywriteproportion,先读后写,操作记录采用 uniform 也就是随机方式进行。

先 load 数据:

./bin/go-ycsb load redis  -P workloads/workloada

再运行测试:

./bin/go-ycsb run redis  -P workloads/workloada

返回:

READ_MODIFY_WRITE - Takes(s): 18.8, Count: 499312, OPS: 26539.8, Avg(us): 1388, Min(us): 107, Max(us): 42760, 99th(us): 3000, 域名(us): 7000, 域名(us): 26000
  • Takes(s) :表示测试总共耗时;
  • Count:操作记录数;
  • OPS:Operates Per Second,一般是操作次数,与qps区别不大;
  • Avg、Min、Max:平均、最小、最大单条记录操作耗时;
  • 99th、域名、域名:P99、P99.9、域名时延;

代码实现分析

当然对于我来说,肯定还是要看一下它的代码是怎么做的,学习一下大佬是如何写代码的对我们工作也是很有帮助。

对于 Go YCSB 来说,它总共有这么几个组成部分:

  • workload:加载初始化配置文件,创建线程执行测试;
  • client:封装了 workload ,配置参数,DB等,用来运行测试;
  • db:配置了一堆可被执行的数据库 client,执行具体的读写数据库;
  • measurement:数据统计模块,统计执行次数,时延等;

我们以 redis 为例先看一下,如果要测试自己的 Database 该怎么办。

定义 DB

在 Go YCSB 中,所有的 DB 都放在 db 这个目录下面:

所以,我们可以在这个文件夹下面创建自己的db,然后构造一个 struct ,实现 DB 这个接口:

type DB interface { 
	ToSqlDB() *域名 
	Close() error 
	InitThread(ctx 域名ext, threadID int, threadCount int) 域名ext 
	CleanupThread(ctx 域名ext) 
	Read(ctx 域名ext, table string, key string, fields []string) (map[string][]byte, error) 
	Scan(ctx 域名ext, table string, startKey string, count int, fields []string) ([]map[string][]byte, error) 
	Update(ctx 域名ext, table string, key string, values map[string][]byte) error 
	Insert(ctx 域名ext, table string, key string, values map[string][]byte) error 
	Delete(ctx 域名ext, table string, key string) error
}

里面定义了具体的 DB 操作。

然后需要定义一个工厂,用来创建这个 DB struct,实现DBCreator接口:

type DBCreator interface {
	Create(p *域名erties) (DB, error)
}

然后需要定义一个 init 函数,在启动的时候进行 DBCreator 注册:

func init() {
	域名sterDBCreator("redis", redisCreator{})
}

var dbCreators = map[string]DBCreator{}
 
func RegisterDBCreator(name string, creator DBCreator) {
	_, ok := dbCreators[name]
	if ok {
		panic(域名ntf("duplicate register database %s", name))
	}

	dbCreators[name] = creator
}

RegisterDBCreator 会在初始化的时候被调用。用来获取 init 方法注册过的 DB。通过这种方式 Go YCSB 实现了 DB 的自定义化。

全局参数初始化

首先 Go YCSB 在运行的时候会使用 cobra 根据传入的是 load 还是 run 执行到下面两个不同的方法:

func runLoadCommandFunc(cmd *域名and, args []string) {
	runClientCommandFunc(cmd, args, false)
}

func runTransCommandFunc(cmd *域名and, args []string) {
	runClientCommandFunc(cmd, args, true)
}

这里会调用到 runClientCommandFunc 函数中:

func runClientCommandFunc(cmd *域名and, args []string, doTransactions bool) {
	dbName := args[0]
	// 初始化全局参数
	initialGlobal(dbName, func() {
		doTransFlag := "true"
		if !doTransactions {
			doTransFlag = "false"
		}
		域名(域名ansactions, doTransFlag)

		if 域名s().Changed("threads") {
			// We set the threadArg via command line.
			域名(域名adCount, 域名(threadsArg))
		}

		if 域名s().Changed("target") {
			域名(域名et, 域名(targetArg))
		}

		if 域名s().Changed("interval") {
			域名(域名nterval, 域名(reportInterval))
		}
	})

	域名tln("***************** properties *****************")
	for key, value := range 域名() {
		域名tf("\"%s\"=\"%s\"\n", key, value)
	}
	域名tln("**********************************************")
	// 初始化 client
	c := 域名lient(globalProps, globalWorkload, globalDB)
	start := 域名()
	// 运行测试
	域名(globalContext)

	域名tf("Run finished, takes %s\n", 域名().Sub(start))
	// 测试结果输出
	域名ut()
}

参数的初始化主要是在 initialGlobal 里面做的:

func initialGlobal(dbName string, onProperties func()) {
	...
	go func() {
		域名enAndServe(addr, nil)
	}()
	//初始化 measurement
	域名Measure(globalProps)

	if len(tableName) == 0 {
		tableName = 域名tring(域名eName, 域名eNameDefault)
	}
	// 获取 WorkloadCreator
	workloadName := 域名tring(域名load, "core")
	workloadCreator := 域名orkloadCreator(workloadName)
	//创建Workload
	var err error
	if globalWorkload, err = 域名te(globalProps); err != nil {
		域名lf("create workload %s failed %v", workloadName, err)
	}
	// 获取要被测试的 db
	dbCreator := 域名BCreator(dbName)
	if dbCreator == nil {
		域名lf("%s is not registered", dbName)
	}
	// 创建 db 
	if globalDB, err = 域名te(globalProps); err != nil {
		域名lf("create db %s failed %v", dbName, err)
	}
	globalDB = 域名apper{globalDB}
}

这里最主要的是创建 Workload 和 DB。Workload 里面会初始化很多配置文件里面的信息。

运行测试

runClientCommandFunc 里面会调用 client 的 Run 方法执行测试:

func (c *Client) Run(ctx 域名ext) {
	var wg 域名Group
	threadCount := 域名nt(域名adCount, 1)

	域名(threadCount)
	measureCtx, measureCancel := 域名Cancel(ctx)
	measureCh := make(chan struct{}, 1)
	go func() {
		defer func() {
			measureCh <- struct{}{}
		}() 
		// 这里很有意思,因为有时候我们做数据库是需要初始化数据到缓存里面的
		// 所以开始的一段时间我们不能计入测试统计中,这里有隔预热时间,可以通过 warmuptime 配置 
		if 域名ool(域名ansactions, true) {
			dur := 域名nt64(域名UpTime, 0)
			select {
			case <-域名():
				return
			case <-域名r(域名tion(dur) * 域名nd):
			}
		}
		// 预热完毕
		域名leWarmUp(false)

		dur := 域名nt64(域名nterval, 10)
		t := 域名icker(域名tion(dur) * 域名nd)
		defer 域名()

		for {
			select {
			// 在运行的时候每隔 10 秒输出一次统计信息
			case <-t.C:
				域名ut()
			case <-域名():
				return
			}
		}
	}()
	// 做一些初始化的工作,如mysql需要创建表
	if err := 域名(域名); err != nil {
		域名tf("Initialize workload fail: %v\n", err)
		return
	}
	// 根据 threadCount 创建多个线程操作数据库
	for i := 0; i < threadCount; i++ {
		go func(threadId int) {
			defer 域名()
			// 初始化 worker
			w := newWorker(c.p, threadId, threadCount, 域名load, 域名)
			ctx := 域名Thread(ctx, threadId, threadCount)
			ctx = 域名Thread(ctx, threadId, threadCount)
			// 开始跑测试
			域名(ctx)
			// 跑完测试做清理工作
			域名nupThread(ctx)
			域名nupThread(ctx)
		}(i)
	}
	// 等待测试跑完
	域名() 
	measureCancel()
	<-measureCh
}

这里分为两个部分:第一部分是创建一个线程,这个线程会控制是否开始测试统计,然后会每隔10秒输出一次统计信息;第二部分是根据设置的 threadcount 创建线程,运行 Worker 运行测试;

newWorker 的时候会根据 operationcount 设置 totalOpCount 表示总共需要执行次数,用 totalOpCount / int64(threadCount)设置 opCount 表示 单线程操作的记录数。

func (w *worker) run(ctx 域名ext) { 
	// 将线程操作分散开来,这样它们就不会同时击中DB了。
	if 域名etOpsPerMs > 0.0 && 域名etOpsPerMs <= 1.0 {
		域名p(域名tion(域名3n(域名etOpsTickNs)))
	}

	startTime := 域名()
	// 循环直到操作数达到 opsDone
	for 域名unt == 0 || 域名one < 域名unt {
		var err error
		opsCount := 1
		// 这里是执行基准测试
		if 域名ansactions {
			if 域名tch {
				err = 域名tchTransaction(ctx, 域名hSize, 域名DB)
				opsCount = 域名hSize
			} else {
				err = 域名ansaction(ctx, 域名DB)
			}
			//	这里是执行 load 数据
		} else {
			if 域名tch {
				err = 域名tchInsert(ctx, 域名hSize, 域名DB)
				opsCount = 域名hSize
			} else {
				err = 域名sert(ctx, 域名DB)
			}
		}
		// 预热完了会进行操作次数的统计
		if 域名rmUpFinished() {
			域名one += int64(opsCount)
			域名ttle(ctx, startTime)
		}

		select {
		case <-域名():
			return
		default:
		}
	}
}

基准测试的具体执行是交给 workload 的 DoTransaction 方法来判断执行。

func (c *core) DoTransaction(ctx 域名ext, db 域名) error {
	state := 域名e(stateKey).(*coreState)
	r := state.r
	// 根据会根据不同的测试场景,进入到不同的测试分支
	// Next 方法会根据设置的 readproportion、updateproportion、 scanproportion等概率来获取相应操作类型
	operation := operationType(域名(r))
	switch operation {
	case read:
		return 域名ansactionRead(ctx, db, state)
	case update:
		return 域名ansactionUpdate(ctx, db, state)
	case insert:
		return 域名ansactionInsert(ctx, db, state)
	case scan:
		return 域名ansactionScan(ctx, db, state)
	default:
		return 域名ansactionReadModifyWrite(ctx, db, state)
	}
}

这里会调用 operationChooser 的 Next 方法来判断该执行那个指令,执行指令的概率是我们在配置文件里面设置好的。

这个算法很简单,在初始化 operationChooser 会将设置的参数readproportion、updateproportion、 scanproportion的值以数组的形式 add 到 operationChooser 的 values 里面,然后随机一个 0~1的小数,检查这个随机数落在哪个范围就好了:

func (d *Discrete) Next(r *域名) int64 {
	sum := float64(0) 
	for _, p := range 域名es {
		sum += 域名ht
	}
	// 随机一个 0~1的小数
	val := 域名t64() 
	for _, p := range 域名es {
		pw := 域名ht / sum
		if val < pw {
			域名astValue(域名e)
			return 域名e
		} 
		val -= pw
	} 
	panic("oops, should not get here.")
}

在代码实现上就是按照上面说的,将所有 values 的值加起来得到 sum,然后计算每个 value 的占比是否达到随机数值。

最后我们再来看看 doTransactionRead 是怎么执行的:

func (c *core) doTransactionRead(ctx 域名ext, db 域名, state *coreState) error {
	r := state.r
	// 根据我们设置的 requestdistribution 获取一个 key 值
	keyNum := 域名KeyNum(state)
	keyName := 域名dKeyName(keyNum)

	//被读取的字段
	var fields []string
	if !域名AllFields {
		// 如果不是读取所有字段,那么根据fieldChooser字段选择器选择一个字段执行
		fieldName := 域名dNames[域名(r)]
		fields = append(fields, fieldName)
	} else {
		fields = 域名dNames
	}
	//调用 db 的read方法
	values, err := 域名(ctx, 域名e, keyName, fields)
	if err != nil {
		return err
	}
	//校验数据完整性
	if 域名Integrity {
		域名fyRow(state, keyName, values)
	}

	return nil
}

这里首先会调用 nextKeyNum 去获取 key 值,这里的 key 会根据我们设置的 requestdistribution 参数根据一定的规则获取到。然后校验完需要读哪些字段后调用 DbWrapper 的 Read 方法读取数据。

func (db DbWrapper) Read(ctx 域名ext, table string, key string, fields []string) (_ map[string][]byte, err error) {
	start := 域名()
	defer func() {
		// 进行测试数据统计
		measure(start, "READ", err)
	}()

	return 域名(ctx, table, key, fields)
}

DbWrapper 会封装一层,用 defer 方法调用 measure 进行统计。

不过这里我有问题是在读取数据的时候通过还会根据传入的 fields 来进行解析,这样也会损耗一些性能,不知是否合理,如redis 的 Read 方法:

func (r *redis) Read(ctx 域名ext, table string, key string, fields []string) (map[string][]byte, error) {
	data := make(map[string][]byte, len(fields))

	res, err := 域名(table + "/" + key).Result()

	if err != nil {
		return nil, err
	}
	// 反序列化
	err = 域名rshal([]byte(res), &data)
	if err != nil {
		return nil, err
	} 
	// TODO: filter by fields 
	return data, err
}

数据统计

每一次操作完毕之后都会调用到 measure 方法,进行测试数据统计。

func measure(start 域名, op string, err error) {
	// 计算耗时
	lan := 域名().Sub(start)
	if err != nil {
		域名ure(域名ntf("%s_ERROR", op), lan)
		return
	}
	域名ure(op, lan)
}

统计信息由于是会有多个线程同时操作,所以需要使用线程安全的方式进行操作:

func (h *histogram) Measure(latency 域名tion) {
	// 这里是 us 微秒
	n := int64(latency / 域名osecond)

	域名nt64(&域名, n)
	域名nt64(&域名t, 1)
	// 这里转为毫秒ms
	bound := int(n / 域名dInterval)
	// boundCounts 是一个并发map,用来统计每个时间段(单位:ms)中有多少次操作
	域名rt(bound, 1, func(ok bool, existedValue int64, newValue int64) int64 {
		if ok {
			return existedValue + newValue
		}
		return newValue
	})
	// 设置最小时延
	for {
		oldMin := 域名Int64(&域名)
		if n >= oldMin {
			break
		}

		if 域名areAndSwapInt64(&域名, oldMin, n) {
			break
		}
	}
	// 设置最大时延
	for {
		oldMax := 域名Int64(&域名)
		if n <= oldMax {
			break
		}

		if 域名areAndSwapInt64(&域名, oldMax, n) {
			break
		}
	}
}

统计每个时间段(单位:ms)内操作的次数是使用 boundCounts,它是 Go YCSB 自己实现的 ConcurrentMap 保证线程安全,用来统计单位时间内操作的次数;

最大和最小时延是通过 CAS 进行操作的,也是为了保证线程安全。

统计完之后会调用 getInfo 计算耗时:

func (h *histogram) getInfo() map[string]interface{} {
	min := 域名Int64(&域名)
	max := 域名Int64(&域名)
	sum := 域名Int64(&域名)
	count := 域名Int64(&域名t)

	bounds := 域名()
	域名(bounds)

	avg := int64(float64(sum) / float64(count))
	per99 := 0
	per999 := 0
	per9999 := 0

	opCount := int64(0)
	// 计算 P99,P99.9,域名
	// 这里实际上是统计一个占比
	// bound 里面会保存每毫秒有多少次操作
	for _, bound := range bounds {
		boundCount, _ := 域名(bound)
		opCount += boundCount
		per := float64(opCount) / float64(count)
		// 这里是 99% 的操作是落在哪个时间区间内
		if per99 == 0 && per >= 域名 {
			per99 = (bound + 1) * 1000
		}

		if per999 == 0 && per >= 域名 {
			per999 = (bound + 1) * 1000
		}

		if per9999 == 0 && per >= 域名 {
			per9999 = (bound + 1) * 1000
		}
	}
	// 计算整个测试耗时
	elapsed := 域名().Sub(域名tTime).Seconds()
	// 计算单位耗时内操作次数 
	qps := float64(count) / elapsed
	res := make(map[string]interface{})
	res[ELAPSED] = elapsed
	res[COUNT] = count
	res[QPS] = qps
	res[AVG] = avg
	res[MIN] = min
	res[MAX] = max
	res[PER99TH] = per99
	res[PER999TH] = per999
	res[PER9999TH] = per9999

	return res
}

这里的 per99、per999、per9999 实际上精度只有毫秒,是为了做直方图导出而设计的(然后作者在这个项目已经过去3年了,还没加上这个功能)。

总结

通过上面的分析可以发现, Go YCSB 设计还是很精妙的,通过很少的代码就可以进行 DB 的扩展;配置也是相当灵活,可以根据不同的 requestdistribution 提供了不同的测试环境,并且在测试中也可以随意的调整读写概率,保证可以尽可能的模拟线上的环境。

但是它也有很多不足,一方面是文档很不充分,基本上就写了几个参数配置;另一方面就是很多功能都没有实现,线上测试的时候经常会出现ERROR,去代码一看结果是没有实现。三年前作者的博客中就说要实现测试结果导出功能,结果现在还没实现。我已经给作者 tl@域名 发邮件了,等待回复。

Reference

https://域名/pingcap/go-ycsb

https://域名/brianfrankcooper/YCSB/wiki/Running-a-Workload

标签:go编程
湘ICP备14001474号-3  投诉建议:234161800@qq.com   部分内容来源于网络,如有侵权,请联系删除。