This post explains what an accumulator is in Spark and how it is used, by reading through the source code of Spark's Accumulator implementation.
class Accumulator[T] private[spark] (
    // Not serialized: this value is kept only on the driver. Each worker sends its own
    // partial value back to the driver, where the values are merged.
    @transient private[spark] val initialValue: T,
    param: AccumulatorParam[T],
    name: Option[String],
    internal: Boolean)
  extends Accumulable[T, T](initialValue, param, name, internal) {
  def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
    this(initialValue, param, name, false)
  }
  def this(initialValue: T, param: AccumulatorParam[T]) = {
    this(initialValue, param, None, false)
  }
}
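class Accumulable[R, T] private[spark] (
    @transient initialValue: R,
    // `param` defines how values are accumulated. For example, two numbers accumulate as
    // 1 + 2 = 3, and two objects C1 and C2 accumulate into C3. The logic is user-defined.
    // When two partial results are merged, both must have the same type, as must the result.
    param: AccumulableParam[R, T],
    val name: Option[String],
    internal: Boolean)
  extends Serializable {
  private[spark] def this(
      @transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = {
    this(initialValue, param, None, internal)
  }
  def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
    this(initialValue, param, name, false)
  def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
    this(initialValue, param, None)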
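  // The Accumulators object holds a Map that stores every accumulator as a key/value pair.
  // The key is the accumulator's id, which the system generates automatically from a
  // counter that starts at 0.
  // Value on the driver, initialized from initialValue; it does not need to be serialized.
  @volatile @transient private var value_ : R = initialValue
  val zero = param.zero(initialValue)
  // Register this accumulator; it is stored in the Map held by Accumulators.
  Accumulators.register(this)
  // Accumulators come in two kinds: internal (built-in) ones and user-defined ones.
  private[spark] def isInternal: Boolean = internal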
  // For an AccumulatorParam, addAccumulator simply calls addInPlace,
  // and the addInPlace logic can be user-defined.
  def += (term: T) { value_ = param.addAccumulator(value_, term) }
  def add(term: T) { value_ = param.addAccumulator(value_, term) }
  def ++= (term: R) { value_ = param.addInPlace(value_, term) }
  def merge(term: R) { value_ = param.addInPlace(value_, term) }
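  // Read the accumulator's global value. The result has type R (the accumulated result
  // type). It can only be read on the driver; reading it inside a task fails:
  throw new UnsupportedOperationException("Can't read accumulator value in task")
  // Assigning the value inside a task is rejected in the same way:
  throw new UnsupportedOperationException("Can't assign accumulator value in task")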
  // Called during deserialization to set up the accumulator on the receiving side.
  // On each node the accumulator starts from the logical zero defined by the AccumulableParam.
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    val taskContext = TaskContext.get()
    taskContext.registerAccumulator(this)
  }
  override def toString: String = if (value_ == null) "null" else value_.toString
}
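The driver/task split described in the comments above can be tried out without Spark. The following is a minimal sketch, not Spark's code: `SketchCounter`, its `deserialized` flag, and `roundTrip` are hypothetical names, and the `deserialized` check is an assumption about how a copy that arrived through deserialization could be told apart from the driver's original. It only relies on standard Java serialization calling a private `readObject` hook.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for Accumulable[Long, Long]; none of these names are Spark's.
class SketchCounter(initialValue: Long) extends Serializable {
  // Driver-side running value; @transient keeps it out of the serialized form.
  @transient private var current: Long = initialValue
  private val zero: Long = 0L
  // Assumption: a flag like this marks copies created by deserialization.
  private var deserialized: Boolean = false

  def add(term: Long): Unit = { current += term }          // fold in one element
  def merge(partial: Long): Unit = { current += partial }  // fold in a partial result
  def localValue: Long = current

  def value: Long =
    if (!deserialized) current
    else throw new UnsupportedOperationException("Can't read accumulator value in task")

  // Java serialization hook: the deserialized copy starts again from zero.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    current = zero
    deserialized = true
  }
}

object SketchCounter {
  // Serialize and deserialize in memory, imitating the trip from the driver to a task.
  def roundTrip(c: SketchCounter): SketchCounter = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(c)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[SketchCounter] finally in.close()
  }
}
```

In this sketch a copy produced by `roundTrip` starts from zero regardless of the driver's current value, so only the increments made inside the "task" travel back and are merged.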
// For AccumulatorParam, addAccumulator simply calls addInPlace.
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}
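To make the difference between the two parameter traits concrete, here is a small sketch that compiles without Spark. The traits `SketchAccumulableParam` and `SketchAccumulatorParam` are hypothetical re-declarations built only from the three methods the code above calls (`addAccumulator`, `addInPlace`, `zero`); `DistinctWordsParam` and `CountsParam` are made-up examples of user-defined accumulation logic.

```scala
// Hypothetical re-declarations so the sketch compiles without Spark.
trait SketchAccumulableParam[R, T] extends Serializable {
  def addAccumulator(r: R, t: T): R  // fold one element of type T into a result of type R
  def addInPlace(r1: R, r2: R): R    // merge two partial results
  def zero(initialValue: R): R       // starting value for a fresh partial result
}

trait SketchAccumulatorParam[T] extends SketchAccumulableParam[T, T] {
  // When element and result types coincide, adding an element is just a merge.
  def addAccumulator(t1: T, t2: T): T = addInPlace(t1, t2)
}

// R != T: collect distinct Strings into a Set[String].
object DistinctWordsParam extends SketchAccumulableParam[Set[String], String] {
  def addAccumulator(r: Set[String], t: String): Set[String] = r + t
  def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2
  def zero(initialValue: Set[String]): Set[String] = Set.empty
}

// R == T: merge per-key counts; only addInPlace and zero have to be written.
object CountsParam extends SketchAccumulatorParam[Map[String, Long]] {
  def addInPlace(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }
  def zero(initialValue: Map[String, Long]): Map[String, Long] = Map.empty
}
```

The second object shows why the narrower trait is convenient: once `addInPlace` is defined, `addAccumulator` comes for free.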
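// Default "accumulation logic" for four commonly used types.
// There is no need to declare an AccumulatorParam; it is passed in as an implicit parameter.
// For example:
//   val accum = sc.accumulator(0L, "ErrorAccumulator")
// implicitly passes in the LongAccumulatorParam object.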
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
  def addInPlace(t1: Double, t2: Double): Double = t1 + t2
  def zero(initialValue: Double): Double = 0.0
}
implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
  def addInPlace(t1: Int, t2: Int): Int = t1 + t2
  def zero(initialValue: Int): Int = 0
}
implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
  def addInPlace(t1: Long, t2: Long): Long = t1 + t2
  def zero(initialValue: Long): Long = 0L
}
implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
  def addInPlace(t1: Float, t2: Float): Float = t1 + t2
  def zero(initialValue: Float): Float = 0f
}
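How the compiler picks one of these objects from the element type alone can be shown with a Spark-free sketch. Everything here is hypothetical: `SketchParam`, its two instances, and `sumAll` (a stand-in for a method like `sc.accumulator` that takes the param implicitly). The sketch also assumes the implicit objects sit in the companion object of the param trait, which is what lets Scala find them without an import.

```scala
// Hypothetical param trait, reduced to the two methods the implicit objects above define.
trait SketchParam[T] {
  def addInPlace(t1: T, t2: T): T
  def zero(initialValue: T): T
}

object SketchParam {
  // Assumption: instances live in the companion object, so they are in implicit scope.
  implicit object LongSketchParam extends SketchParam[Long] {
    def addInPlace(t1: Long, t2: Long): Long = t1 + t2
    def zero(initialValue: Long): Long = 0L
  }
  implicit object IntSketchParam extends SketchParam[Int] {
    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
    def zero(initialValue: Int): Int = 0
  }
}

object ImplicitSketch {
  // The caller never names a param; the compiler selects it from the type T.
  def sumAll[T](initialValue: T, terms: Seq[T])(implicit param: SketchParam[T]): T =
    terms.foldLeft(initialValue)((a, b) => param.addInPlace(a, b))
}
```

Calling `ImplicitSketch.sumAll(0L, Seq(1L, 2L, 3L))` resolves `LongSketchParam`, while passing `Int` arguments resolves `IntSketchParam`; a type with no instance in scope is rejected at compile time.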
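private[spark] object Accumulators extends Logging {
  // A Map that holds every accumulator object, keyed by id.
  val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
  private var lastId: Long = 0  // Each accumulator is identified by a number; the counter starts at 0.
  lastId += 1  // Each newly created accumulator advances the id counter by 1.
  def register(a: Accumulable[_, _]): Unit = synchronized {
    originals(a.id) = new WeakReference[Accumulable[_, _]](a)
  }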
  // Remove an accumulator by its numeric id.
  // Add values to the original accumulators with some given IDs
  def add(values: Map[Long, Any]): Unit = synchronized {
    // Since we are now storing weak references, we must check whether the underlying data
    case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
    throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
    logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
  }
}
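The registry logic above (ids from a counter, weak references in a map, merging updates by id) can be sketched in a self-contained form. This is an illustration under assumptions, not Spark's code: `SketchAcc`, `SketchRegistry`, and `newId` are hypothetical names, ids are assumed to be produced by incrementing the counter and returning it, and unknown ids are returned to the caller here instead of being logged.

```scala
import scala.collection.mutable
import scala.ref.WeakReference

// Hypothetical miniature accumulator: just an id and a running total.
final class SketchAcc(val id: Long) {
  var total: Long = 0L
  def merge(partial: Long): Unit = { total += partial }
}

object SketchRegistry {
  private val originals = mutable.Map[Long, WeakReference[SketchAcc]]()
  private var lastId: Long = 0L

  // Assumption: an id is handed out by incrementing the counter and returning it.
  def newId(): Long = synchronized { lastId += 1; lastId }

  // Weak references let an accumulator be garbage collected once nothing else uses it.
  def register(a: SketchAcc): Unit = synchronized {
    originals(a.id) = new WeakReference[SketchAcc](a)
  }

  def remove(id: Long): Unit = synchronized { originals.remove(id) }

  // Merge partial values by id; returns the ids that were ignored as unknown.
  def add(values: Map[Long, Long]): Seq[Long] = synchronized {
    val ignored = mutable.ArrayBuffer[Long]()
    for ((id, partial) <- values) {
      originals.get(id) match {
        case Some(ref) =>
          ref.get match {
            case Some(acc) => acc.merge(partial)
            case None =>
              throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
          }
        case None => ignored += id
      }
    }
    ignored.toSeq
  }
}
```

Because the map holds only weak references, the caller has to keep its own strong reference to an accumulator for as long as updates may still arrive for it.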
That concludes this walkthrough; I hope it helps.