本文主要介绍调查数据库(调查数据库建立项目),下面一起看看调查数据库(调查数据库建立项目)相关资讯。
在本文的开始,我们将介绍go-ycsb是如何使用的,然后我们将分析它是如何作为基准测试使用的,以查看其优缺点。
转载请声明出处~,本文发表于罗志云 的博客:,这样才能横向比较不同数据库的性能。
ycsb,全称是 雅虎!云服务基准和测试是雅虎开发的一个工具,用于构建云服务的测试,涵盖常见的nosql数据库产品,如cassandra、mongodb、hbase、redis等。
作为一名围棋开发者,我们使用pingcap开发的go ycsb对测试进行了基准测试。
要安装#,首先确保本地go版本不低于1.16,然后下载编译:
复制git clon-ycsb.gitcd go-ycsbmake在b in文件夹中有我们编译的程序go-ycsb。
让 让我们先看看工作负载文件夹。目录中有各种工作负载模板,可以基于工作负载模板进行定制和修改。默认的六种测试场景如下:
workloada:读写平衡型,50%/50%,reads/writesworkloadb:多读少写,95%/5%,reads/writesworkloadc:只读,100%,readsworkloadd:读取最近写入的记录,95%/5%,reads/insertworkloade。95%/5%,scan/ins测试。工作量中的操作主要包括:
插入:插入新记录更新:更新记录的一个或所有字段读取:读取记录的一个或所有字段。有字段扫描:从一个键中按顺序随机扫描随机记录。当我们在测试时,我们需要根据不同的业务场景模拟测试,因此我们可以通过r测试阶段操作的记录数。如果设置了threadcount,那么每个线程操作的记录数=操作数/threadcountoptioncount = 3000000 # thread count = 500 #如果一个表中已经有记录,那么在加载数据时,从这个记录号开始。insertstart=0 #一行数据中的字段数。fieldcount=10 #每个字段的大小。fieldlength=100 #是否应该加载所有字段?真的吗?是否应该更新所有字段?writeallfields=false #字段长度分布fieldlength distribution = constant # fieldlength distribution = uniform # fieldlength distribution = zipfian #读取操作概率readproportion=0.95 #更新操作概率update proportion = 0 #插入操作概率insertproportion=0 #先读取后写入相同的记录概率readmodifywriteproportion=0 #范围操作概率scanproportion=0 #范围操作,可操作记录的最大数量maxscanlength=1000 #用于选择扫描期间访问的记录数量的分布。iform # scanlength distribution = zipfian #记录应该顺序插入还是伪随机插入order = hashed # insert order = ordered #如何模拟测试requestdistribution = zipfian # request distribution = uniform # request distribution = latest #以下两种,当request distribution为热点时#, 构成热点集的数据项百分比hotspotdatafraction=0.2 #访问热点集的数据操作百分比hotspotopnfraction=0.8 #操作数据表的表名=usertable #延迟了测量结果的展现形式,测量类型= histogram测试 #暂时没有实现。 例如,如果我们现在想要测试 redis的性能,我们应该首先编写一个工作负载:
copyrecordcount = 1000000 operation count = 1000000 workload = core read all fields = true readmodifywriteproportion = 1 request distribution = uniform redis . addr = 127.0。0.1 : 6379 thread count = 50上面的工作负载表示加载时,将有100万条数据插入到库中,要操作的数据量也是100万,但有50个线程,即每个线程实际操作2万行记录;
测试模式采用readmodifywriteproportion,先读后写,操作记录采用uniform,即随机模式。
首先加载数据:
收到。/bin/go-ycsb load r测试:
收到。/bin/go-ycsb runr: 18.8,count: 499312、ops: 26539.8、avg(us): 1388、min(us): 107、max(us): 42760、99th(us): 3000、99.9th(us): 7000、99.99th(us): 26000 tak测试总耗时;计数:操作记录的数量;ops:每秒运算次数,一般是运算次数,和qps差别不大;avg、min、max:平均、最小、最大单次记录操作耗时;第99、99.9、99.99:p99、p99.9、p99.99延时;代码实现分析#当然,对我来说,肯定是要看它的代码是怎么做的,了解一下老板是怎么写代码的也很有帮助。
对于go ycsb,它总共有几个组件:
工作负载:加载初始化配置文件,创建一个线程执行测试;客户端:封装工作负载、配置参数、数据库等。,并用于运行测试;db:配置了一堆可执行的数据库客户端来读写特定的数据库;测量:数据统计模块,统计执行次数,时间延迟等。让 让我们以redis为例,看看如果我们想要测试 ;自己的数据库。
在go ycsb中定义db#,所有的db都放在db:
所以,我们可以在这个文件夹下创建自己的db,然后构造一个struct来实现db的接口:
copytype db接口{ tosqldb *sql。db close错误initthread(ctx上下文。context,threadid int,threadcount int)上下文。上下文cleanupthread(ctx上下文。上下文)读取(ctx上下文。context,table string,key string,fields[]string)(map[string][]字节,error)扫描(ctx上下文。context,table string,startkey string,count int,fields[]string)([]map[string][]byte,error) update(ctx context。上下文,表字符串,键字符串,值映射[字符串][]字节)错误插入(ctx上下文。上下文、表字符串、键字符串、值映射[字符串] []字节)错误删除(ctxcontext。context,tablestring,keystring) error}定义特定的db操作。
然后,您需要定义一个工厂来创建这个db结构并实现dbcreator接口:
复制类型dbcreator接口{create (p * properties。properties) (db,error)}然后您需要定义一个init函数在启动时注册dbcreator:
copyfunc init {ycsb。registerdbcreator( redis ,rediscreator { })} var db creators = map[string]dbcreator { } func registerdbcreator(name string,creator dbcreator) {_,ok : = db creators[name]if ok { panic(fmt . sprintf( 重复的注册数据库% s ,name))} dbcreators[name]= creator } register dbcreator在初始化时会被调用。用于获取由init方法注册的数据库。这样go ycsb实现了db的定制化。
全局参数初始化#首先,go ycsb会根据传入的是load还是run,使用cobra执行以下两种不同的方法:
copy func runloadcommandfunc(cmd * cobra.command,args[]string){ runclientcommandfunc(cmd,args,false)} func runtranscommandfunc(cmd * cobra . command,args[]string){ runclientcommandfunc(cmd,args,true)}这将被调用到runclientcommandfunc函数中。
copy func runclientcommandfunc(cmd * cobra . command,args [] string,do transactions bool){ dbname : = args[0]//初始化全局参数initialglobal (dbname,func{ dotranslag : = 。do transactions { do transflag = 虚假 }globalprops。设置(道具。dotransactions,dotransflag)如果cmd。标志。已更改( 线程和){//我们通过命令行设置threadarg . global props . set(prop。threadcount,strconv。itoa(threadsarg))}if cmd。标志。已更改( 目标 ){globalprops。设置(道具。目标,strconv。itoa(target arg))}如果cmd。标志。已更改( 音程 ){globalprops。设置(道具。loginterval,strconv。itoa(reportinterval))}})fmt。println( ****************属性* * * * * * * * * * * * )对于key,值: = range global props。map {fmt。printf( \ % s \ = \ % s \ \ n ,key,value)}fmt。println( ********************** * * * * * * * * * * * * * * * * * * )//初始化client : = client . new client(全局道具,全局工作量,globaldb)启动: = time . now//运行测试.run(global cont测试结果输出测量。output}参数主要在initialglobal中完成:
copy func initial global(dbname string,onproperties func) {...go func {http。listenandserve(addr,nil)}//初始化测量measurement。init measure(global props)iflen(tablename)= = 0 { tablename = global props . getstring(prop . tablename,prop。tablename default)}//get workloadcreatorworkloadname : = global props . getstring(prop . workload, 核心 )工作负荷创建者: = ycsb . get workload creator(工作负荷名称)//创建工作负荷var errorif全局工作负荷,err = workload creator.create(全局道具);呃!= nil {util。fatalf( 创建工作负荷% s失败,工作负荷名称,err)}/get dbdbcreator : = ycsb . get dbcreator(dbname)if dbcreator = = nil { util . fatalf( %s没有注册,dbname)}/create dbif globaldb,err = dbcreator . create(global props);呃!= nil { util . fatalf( createdb% s失败,dbname,err)} global db = client . db wrapper { global db } }这里最重要的是创建工作负载和db。工作负载将初始化配置文件中的大量信息。
运行测试 # runclientcommandfunc将调用客户端 执行测试的运行方法:
copyfunc (c *client)运行(ctx上下文。上下文){var wg sync。waitgroupthreadcount : = c . p . getint(prop。线程计数,1)wg。add(threadcount)measurectx,measure cancel : = context。with cancel(ctx)measure ch : = make(chan struct { },1)go func{ defer func{ measure ch-struct { } { }/这个很有意思,因为有时候我们在做数据库的时候需要初始化缓存中的数据//这样我们就可以 测试统计中不算初期,这里有热身时间。可以配置c . p . get bool(prop . do transactions,true){ dure cho 45-@ .com = c . p . getint 64(prop . warm uptime,0)选择{case-ctx。done: return case-time . after(time . dur)* time . second): } }//测量。:= time。newticker(时间。持续时间*时间。第二)defer t.stopfor {select {//输出统计信息case-t . c: measurement . outputcase-measure ctx . done: return } }//做一些初始化工作,比如mysql需要创建一个表if err: = c . err!= nil { fmt . printf( 初始化工作负载失败,err)return }//根据threadcount为i := 0创建多个线程操作数据库;我threadcounti { go func(threadid int){ defer wg . done//initialize worker : = new worker(c . p,threadid,threadcount,c.workload,c.db)ctx : = c . workload . init thread(ctx,threadid,thread count)ctx = c . db . init thread(ctx,threadid,threadcount)//开始运行测试w.run(ctx)//运行测试后,做清理工作c . dbwaitmeasure cancel-measure ch }这个分为两部分:第一部分是创建一个线程,这个线程会控制是否启动测试统计,然后每10秒输出一次统计信息;第二部分是根据设置的threadcount创建一个线程,并运行work测试。
新工人根据operationcount设置totalopcount,表示要执行的总次数,设置totalopcount/int64(threadcount)表示单线程操作的记录数。
复制func (w * worker)运行(ctxcontext。context){//分发线程操作,这样它们就不会 不要同时打db。如果w . targetopsperms 0.0 w . targetopsperms = 1.0 { time。睡眠(时间。持续时间(兰特。int 63n(w . targetopstickns))} start time : = time。now//循环直到操作数到达opsdonefor w . op count = = 0 | | w . opsdonew . op count { var err err count : = 1//这里是执行基准测试 if w . do transactions { if w . do batch { err = w . workload . dobatch transaction(ctx,w.batchsize,w . work db)ops count = w . batch size } else { err = w . workload . do transaction(ctx,w.workdb)}//这里是iswarupfinished{ w . ops done = int 64(ops count)w . throttle(ctx,开始时间)} select {case-ctx。done: return default : } }基准测试的具体实现就交给工作了。负载与负载 dotransaction方法来确定执行。
copy func(c * core)do transaction(ctx上下文。上下文,dbycsb。db)错误{ state : = ctx。值(statekey)。(* corestate)re: = stat测试场景,进入不同的测试分支// next方根据设置的readproportion、updateproportion、scanproportion等概率得到相应的操作类型-@ .com = operation type(c . operation chooser . next(r))。开关操作{ case read: return c . dotransactionread(ctx,db,state)case update : return c . dotransactionupdate(ctx,db,state)case insert : return c . dotransactioninsert(ctx,db,state)case scan: return c . dotransactionscan(ctx,db,state)default : return c . dotransactionread modify(ctx
这个算法很简单。在初始化operationchooser时,将readproportion、updateproportion、scanproportion等设置参数的值以数组的形式加到operationchooser的值上,然后随机选取一个0~1的小数,检查这个随机数落在哪个范围内。
copy func(d * discrete)next(r * rand。跑d)int 64 { sum : = float 64(0)for _,p : = range d . values { sum = p . weight }//random a decimal val : = r . float 64(。p : = range d . values { pw : = p . weight/sumif val pw { d . setlastvalue(p . value)return p . value } val-= pw } panic( 哎呀,不应该到这里。 )}在代码实现方面,通过将所有值相加得到sum,然后计算每个值的比例是否达到随机值。
最后,让 让我们看看dotransactionread是如何实现的:
copy func(c * core)dotransactionread(ctx上下文。上下文,db ycsb。db,state * corestate)error { r : = state . r//获取一个键值keynum : = c . next keynum(state)根据我们设置的requestdistribution,keyname: = c . build keyname(keynum)//读取字段varfields [] stringiff!c.readallfields {//如果没有读取所有字段,那么根据fieldchooser选择一个字段执行字段名: = state . field names[c . field chooser . next(r)]fields = append(fields,fieldname)} else {fields = state。field names }//调用db的read方法值,err : = db . read(ctx,c.table,keyname,fields) if err!= nil {return err}//检查数据完整性if c . data integrity { c . verify row(state,keyname,values)} return nil}这里会调用nextkeynum先获取键值,这里的key会根据我们设置的requestdistribution参数按照一定的规则获取。然后调用dbwrapper的read方法,在检查需要读取哪些字段后读取数据。
copyfunc (db dbwrapper) read(ctx上下文。context,table string,key string,fields[]string)(_ map[string][]byte,error){ start: = time . nowdefer func{//进行测试统计量(start, 阅读 ,err)} return db.db.read (ctx,表,键,字段)} db wrapper会封装一个层,使用defer wrapper。
但我在这里的问题是,在读取数据时,会根据传入的字段进行解析,这也会损失一些性能。不知道是否合理,比如redis的read方法:
copyfunc (r *redis) read(ctx上下文。context,table string,key string,fields[]string)(map[string][]byte,error){ data : = make(map[string][]byte,len(fields))res,err : = r . cli:按字段过滤返回数据,err} statistics #它将在每次操作后进行调整。采用测量法对测试数据进行统计。
复制funcmeasure (starttime。time,opstring,err error){//需要时间计算lan: = time . now。如果出错,则执行sub (start )!=零{测量值。测量(fmt。sprintf( %s错误 ,op),lan) return}测量。measure (op,lan)} statistics由于将有多个线程同时运行,因此有必要以线程安全的运行:
复制函数(h *直方图)测量(等待时间。持续时间){//这是us微秒n : = int 64(latency/time .微秒)原子。add 64(h . sum,n)原子。addint64(h.count,1)//这里转换成ms bound : = int(n/h . bound interval)//bound counts是并发映射。用于统计每个时间段内的操作次数(单位:ms)。h .受约束的县。upsert (bound,1,func (ok bool,existed value int 64,new value int 64)int 64 { if ok { return existing value new value } return new value })//设置{ old min : = atomic . load int 64(h . min)的最小延迟。if n = oldmin {break}if atomic。compareandswapint64(h.min,oldmin,n) {break}}//设置{ oldmax: = atomic的最大延迟。load int 64(h . max)if n = old max { break } if atomic。compareandswapint64 (h.max,oldmax,n) {break}}每个时间段的统计数据(h.max,n)}}undcounts是go ycsb自己实现的保证线程安全的concurrentmap,用于统计单位时间内的运算次数;
最大和最小延迟是通过cas操作的,也是为了确保线程安全。
统计完成后将调用getinfo计算耗时:
copy func(h *直方图)getinfomap[string]interface { } { min : = atomic。load int 64(h . min)max : = atomic。load int 64(h . max)sum : = atomic。load int 64(h . sum)count : = atomic。load int 64(h . count)bounds : = h . boundcounts . keyssort。ints(bounds)avg : = int 64(float 64(sum)/float 64(count))per 99 : = 0 per 999 : = 0 per 999 : = 0 pcount: = int 64(0)//计算p99,p99.9,p99.99//这其实是一个比例的统计。// bound将为_,bound cho 45-@ .com = range bounds { boundcount,_ : = h . boundcounts . get(bound)op count = boundcountper : = float 64(op count)/float 64(count)//下面是per99 = = 0 per = 0.99 { per99 =(bound 1)* 1000时99%的操作落在哪个时间间隔内}//计算经过的: = time . now。sub (h .开始时间)。seconds//计算单位时间内的运算次数qps : = float 64(count)/elapsedres : = make(map[string]interface { })res[elapsed]= elapsedres[count]= count res[qps]= qpsres[avg]= avgres[min]= minres[max]= maxres[per 99 th]= per 99 res[per 999 th]= per 999 res[per 999 th]= per 999 returnres }
总结#通过以上分析可以发现,go ycsb设计还是很精妙的,用很少的代码就可以扩展db;配置也相当灵活,可以根据不同的r测试环境,在测试可以随意调整读写概率,保证尽可能模拟在线环境。
然而,它也有许多缺点。一方面文档很不充分,基本只写了几个参数配置;另一方面,许多功能还没有实现,在线测试经常出现错误。你看代码,结果没有实现。三年前,作者在他的博客中说,应该实现测试结果导出功能,但结果尚未实现。我已经给作者tl@pingcap.com发了一封电子邮件,等待回复。
referenc-ycsb
github . com/brianfrankcooper/ycsb/wiki/running-a-workload
标签:
测试行动
了解更多调查数据库(调查数据库建立项目)相关内容请关注本站点。
电脑系统字体怎么调大小(如何将电脑系统字体变大)京东 装机模拟(京东装机模拟器在哪)windows打开img文件(电脑img文件怎么用)联系电脑快捷启动u盘(联想电脑u盘启动快捷键启动没反应)老式dvd怎么用(老式dvd怎么连接电视播放)锐龙r5 2600配什么主板最好(amd r5 2600配什么显卡比较好)如何装win10win11双系统(双系统win7win10安装教程)多多云手机iphone版(多多云ios最新版本)