
What Does "accumulator" Mean? Usage and Examples: Exploring the Source of Spark's Accumulator Implementation

Hello everyone. If you are not yet familiar with accumulators, don't worry: this article explains what an accumulator is and walks through the source code of Spark's Accumulator implementation. Let's get started.

class Accumulator[T] private[spark] (
    // This parameter is not serialized; it is maintained only on the driver. Each worker
    // reports its own accumulated value to the driver via heartbeat.
    @transient private[spark] val initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String],
    internal: Boolean)
  extends Accumulable[T, T](initialValue, param, name, internal) {

  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
    this(initialValue, param, name, false)
  }

  def this(initialValue: T, param: AccumulatorParam[T]) = {
    this(initialValue, param, None, false)
  }
}

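/**
 * For example, adding two numbers gives 1 + 2 = 3, and adding two objects C1 and C2 gives C3.
 * The accumulation logic itself can be implemented however you like.
 * The two values being merged must be of the same type, and the result must be of that type too.
 */
class Accumulable[R, T] private[spark] (
    initialValue: R,
    param: AccumulableParam[R, T],
    val name: Option[String],
    internal: Boolean)
  extends Serializable {

  private[spark] def this(
      @transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = {
    this(initialValue, param, None, internal)
  }

  def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
    this(initialValue, param, name, false)

  def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
    this(initialValue, param, None)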

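  /**
   * The Accumulators object holds a Map that stores every accumulator as a key/value pair.
   * The key is the accumulator's id, generated automatically from a counter that starts at 0.
   */

  // The initial value on the driver; it does not need to be serialized.
  @volatile @transient private var value_ : R = initialValue

  val zero = param.zero(initialValue)

  // Register this accumulator; it is stored in the Map held by the Accumulators object.
  Accumulators.register(this)

  /**
   * There are two kinds of accumulators: internal (built-in) ones and user-defined ones.
   */
  private[spark] def isInternal: Boolean = internal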

  /**
   * For a plain Accumulator, addAccumulator simply delegates to addInPlace.
   * The logic of addInPlace can be customized.
   */
  def += (term: T) { value_ = param.addAccumulator(value_, term) }

  def add(term: T) { value_ = param.addAccumulator(value_, term) }

  def ++= (term: R) { value_ = param.addInPlace(value_, term) }

  def merge(term: R) { value_ = param.addInPlace(value_, term) }
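The four methods above fall into two groups: `+=` and `add` fold a single element of type `T` into the result, while `++=` and `merge` combine two partial results of type `R`. The following is a minimal sketch of that split, independent of Spark. `ElemParam`, `IntSetParam` and `SketchAccumulable` are hypothetical names made up for this illustration; they only mimic the shape of `AccumulableParam` and `Accumulable`.

```scala
// Hypothetical stand-in for AccumulableParam[R, T]; not Spark's trait.
trait ElemParam[R, T] {
  def addAccumulator(r: R, t: T): R // fold one element into a partial result
  def addInPlace(r1: R, r2: R): R   // merge two partial results
  def zero(initialValue: R): R
}

// Here R = Set[Int] and T = Int, so the two operations really differ.
object IntSetParam extends ElemParam[Set[Int], Int] {
  def addAccumulator(r: Set[Int], t: Int): Set[Int] = r + t
  def addInPlace(r1: Set[Int], r2: Set[Int]): Set[Int] = r1 ++ r2
  def zero(initialValue: Set[Int]): Set[Int] = Set.empty[Int]
}

// Minimal holder exposing the same four operations as the listing above.
class SketchAccumulable[R, T](initialValue: R, param: ElemParam[R, T]) {
  private var value_ : R = initialValue
  def +=(term: T): Unit = { value_ = param.addAccumulator(value_, term) }
  def add(term: T): Unit = { value_ = param.addAccumulator(value_, term) }
  def ++=(term: R): Unit = { value_ = param.addInPlace(value_, term) }
  def merge(term: R): Unit = { value_ = param.addInPlace(value_, term) }
  def value: R = value_
}

object AccumulableDemo {
  def run(): Set[Int] = {
    val acc = new SketchAccumulable[Set[Int], Int](Set.empty[Int], IntSetParam)
    acc += 1          // element-level add, as a task would do
    acc.add(2)
    acc ++= Set(2, 3) // partial-result merge, as the driver would do
    acc.value
  }

  def main(args: Array[String]): Unit = println(run())
}
```

When `R` and `T` are the same type, as in `Accumulator[T]`, the two groups collapse into one operation.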

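  /**
   * Returns the accumulator's global value, which is of type R (not an AccumulableParam object).
   * The value can only be read or assigned on the driver; inside a task both operations fail:
   */
  throw new UnsupportedOperationException("Can't read accumulator value in task")

  throw new UnsupportedOperationException("Can't assign accumulator value in task")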

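  /**
   * Accumulator-related objects are instantiated during deserialization.
   * On each node the accumulator starts from the logical zero defined by its AccumulableParam.
   */
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    value_ = zero
    val taskContext = TaskContext.get()
    taskContext.registerAccumulator(this)
  }

  override def toString: String = if (value_ == null) "null" else value_.toString
}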
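The deserialization hook above is what makes a task-side copy start from zero instead of from the driver's value. Here is a rough sketch of that mechanism using plain Java serialization, with no Spark involved. `SketchCounter`, the `deserialized` flag and `roundTrip` are assumptions introduced for this illustration, and the `TaskContext` registration step is left out.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical, simplified stand-in for Accumulable; it omits TaskContext registration.
class SketchCounter(initialValue: Long) extends Serializable {
  // Driver-side value; @transient keeps it out of the serialized form.
  @transient private var value_ : Long = initialValue
  val zero: Long = 0L
  // Assumed flag: set once this instance has been created by deserialization.
  private var deserialized = false

  def +=(term: Long): Unit = { value_ += term }

  // Readable only on the original (driver-side) instance.
  def value: Long = {
    if (!deserialized) value_
    else throw new UnsupportedOperationException("Can't read accumulator value in task")
  }

  // The partial value held by this particular copy.
  def localValue: Long = value_

  // Invoked by Java serialization when a copy is rebuilt on the receiving side.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value_ = zero
    deserialized = true
  }
}

object SketchCounterDemo {
  // Serialize and deserialize in memory, standing in for shipping a closure to a task.
  def roundTrip(c: SketchCounter): SketchCounter = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(c)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[SketchCounter] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverSide = new SketchCounter(100L)
    val taskSide = roundTrip(driverSide)
    taskSide += 5L
    println(driverSide.value)
    println(taskSide.localValue)
  }
}
```

The driver's initial value of 100 never reaches the copy, which counts up from zero. This is why task-side partial values can later be merged into the driver's value without double counting.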

/**
 * In AccumulatorParam, addAccumulator simply calls addInPlace.
 */
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}
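Because `addAccumulator` falls back to `addInPlace` when the element type and the result type are the same, a custom accumulator for such a type only needs to define how two values merge and what its zero is. A minimal sketch, assuming a made-up `Stats` value type: the `Sketch...` traits below are hypothetical copies of the shape shown above, not Spark's own traits.

```scala
// Hypothetical stand-ins for AccumulableParam / AccumulatorParam; not Spark's traits.
trait SketchAccumulableParam[R, T] {
  def addAccumulator(r: R, t: T): R
  def addInPlace(r1: R, r2: R): R
  def zero(initialValue: R): R
}

trait SketchAccumulatorParam[T] extends SketchAccumulableParam[T, T] {
  // Same-type case: adding an element is just merging two values.
  def addAccumulator(t1: T, t2: T): T = addInPlace(t1, t2)
}

// Illustrative value type, so that C1 + C2 = C3 with all three of the same type.
final case class Stats(count: Long, sum: Double)

object StatsParam extends SketchAccumulatorParam[Stats] {
  def addInPlace(t1: Stats, t2: Stats): Stats =
    Stats(t1.count + t2.count, t1.sum + t2.sum)
  def zero(initialValue: Stats): Stats = Stats(0L, 0.0)
}

object StatsDemo {
  // Fold partial results starting from the logical zero.
  def mergeAll(parts: Seq[Stats]): Stats =
    parts.foldLeft(StatsParam.zero(Stats(0L, 0.0)))((acc, s) => StatsParam.addAccumulator(acc, s))

  def main(args: Array[String]): Unit =
    println(mergeAll(Seq(Stats(2L, 3.0), Stats(1L, 4.5))))
}
```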

object AccumulatorParam {

  /**
   * Default "accumulation logic" for four commonly used types.
   * There is no need to declare an AccumulatorParam explicitly; it is passed in implicitly.
   * For example, in
   *   val accum = sc.accumulator(0L, "ErrorAccumulator")
   * the LongAccumulatorParam object is passed in implicitly.
   */
  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
    def zero(initialValue: Double): Double = 0.0
  }

  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }

  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }

  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
    def addInPlace(t1: Float, t2: Float): Float = t1 + t2
    def zero(initialValue: Float): Float = 0f
  }
}
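The implicit objects above are why a call such as `sc.accumulator(0L, "ErrorAccumulator")` needs no explicit parameter object: the compiler picks the instance that matches the element type. The mechanism can be sketched without Spark as follows. `SketchParam` and `SketchAccumulator` are hypothetical names for this illustration, and only two of the four numeric instances are reproduced.

```scala
// Hypothetical stand-in for AccumulatorParam[T]; not Spark's trait.
trait SketchParam[T] {
  def addInPlace(t1: T, t2: T): T
  def zero(initialValue: T): T
}

object SketchParam {
  // Implicit instances in the companion object are found without any import.
  implicit object LongSketchParam extends SketchParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }

  implicit object IntSketchParam extends SketchParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }
}

// The param arrives through an implicit parameter list, as the comment above describes.
class SketchAccumulator[T](initialValue: T, val name: String)(implicit param: SketchParam[T]) {
  private var value_ : T = initialValue
  def +=(term: T): Unit = { value_ = param.addInPlace(value_, term) }
  def value: T = value_
}

object ImplicitDemo {
  def run(): Long = {
    // No param argument: the Long literal selects LongSketchParam.
    val accum = new SketchAccumulator(0L, "ErrorAccumulator")
    accum += 3L
    accum += 4L
    accum.value
  }

  def main(args: Array[String]): Unit = println(run())
}
```

For a type with no implicit instance in scope, the same call would not compile, which is the point at which a user-defined parameter object has to be supplied.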

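private[spark] object Accumulators extends Logging {
  // A Map that holds every accumulator object.
  val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()

  private var lastId: Long = 0 // Each accumulator is identified by a number; the counter starts at 0.

  def newId(): Long = synchronized {
    lastId += 1 // The id increases by 1 for every newly created accumulator.
    lastId
  }

  def register(a: Accumulable[_, _]): Unit = synchronized {
    originals(a.id) = new WeakReference[Accumulable[_, _]](a)
  }

  // Remove an accumulator by its numeric id.
  def remove(accId: Long) {
    synchronized {
      originals.remove(accId)
    }
  }

  // Add values to the original accumulators with some given IDs
  def add(values: Map[Long, Any]): Unit = synchronized {
    for ((id, value) <- values) {
      if (originals.contains(id)) {
        // Since we are now storing weak references, we must check whether the underlying data
        // is valid.
        originals(id).get match {
          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
          case None =>
            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
        }
      } else {
        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
      }
    }
  }
}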
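The registry above keeps only weak references, so an accumulator that is no longer used on the driver can be garbage collected, and updates for ids the registry does not know are ignored. A small sketch of the same idea, not tied to Spark: `SketchCell` and `SketchRegistry` are hypothetical names, and returning the unknown ids (instead of logging a warning) is a choice made here so that the behaviour is easy to observe.

```scala
import scala.collection.mutable
import scala.ref.WeakReference

// Hypothetical stand-in for Accumulable: an id plus a mutable running total.
class SketchCell(val id: Long) {
  var total: Long = 0L
}

object SketchRegistry {
  private val originals = mutable.Map[Long, WeakReference[SketchCell]]()
  private var lastId: Long = 0L

  def newId(): Long = synchronized {
    lastId += 1
    lastId
  }

  def register(cell: SketchCell): Unit = synchronized {
    originals(cell.id) = new WeakReference[SketchCell](cell)
  }

  // Apply per-id updates; returns the ids that had no registered cell.
  def add(values: Map[Long, Long]): List[Long] = synchronized {
    val unknown = mutable.ListBuffer[Long]()
    for ((id, delta) <- values) {
      originals.get(id) match {
        case Some(ref) =>
          // The weak reference may have been cleared by the garbage collector.
          ref.get match {
            case Some(cell) => cell.total += delta
            case None =>
              throw new IllegalAccessError("Attempted to access garbage collected cell.")
          }
        case None => unknown += id
      }
    }
    unknown.toList
  }
}

object RegistryDemo {
  def main(args: Array[String]): Unit = {
    val cell = new SketchCell(SketchRegistry.newId())
    SketchRegistry.register(cell)
    val unknown = SketchRegistry.add(Map(cell.id -> 5L, 999L -> 1L))
    println(cell.total)
    println(unknown)
  }
}
```

The caller keeps a strong reference to `cell` for as long as updates may arrive; the registry alone does not keep it alive.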

That wraps up this article; I hope it has been helpful.


Copyright © 2023