
A look at Storm's reportError


This article looks at how reportError works in Storm.

IErrorReporter


storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IErrorReporter.java

    public interface IErrorReporter {
        void reportError(Throwable error);
    }

The ISpoutOutputCollector, IOutputCollector, and IBasicOutputCollector interfaces all extend IErrorReporter.

ISpoutOutputCollector

storm-core/1.2.2/storm-core-1.2.2-sources.jar!/org/apache/storm/spout/ISpoutOutputCollector.java

    public interface ISpoutOutputCollector extends IErrorReporter {
        /**
         * Returns the task ids that received the tuples.
         */
        List<Integer> emit(String streamId, List<Object> tuple, Object messageId);

        void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);

        long getPendingCount();
    }

Implementations of ISpoutOutputCollector include SpoutOutputCollector and SpoutOutputCollectorImpl.

IOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/task/IOutputCollector.java

    public interface IOutputCollector extends IErrorReporter {
        /**
         * Returns the task ids that received the tuples.
         */
        List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple);

        void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);

        void ack(Tuple input);

        void fail(Tuple input);

        void resetTimeout(Tuple input);

        void flush();
    }

Implementations of IOutputCollector include OutputCollector and BoltOutputCollectorImpl.

IBasicOutputCollector

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IBasicOutputCollector.java

    public interface IBasicOutputCollector extends IErrorReporter {
        List<Integer> emit(String streamId, List<Object> tuple);

        void emitDirect(int taskId, String streamId, List<Object> tuple);

        void resetTimeout(Tuple tuple);
    }

IBasicOutputCollector is implemented by BasicOutputCollector.

reportError

SpoutOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }

BoltOutputCollectorImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java

    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }

As shown, the reportError methods of both SpoutOutputCollectorImpl and BoltOutputCollectorImpl call executor.getReportError().report(error).

ReportError.report

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/error/ReportError.java

    public class ReportError implements IReportError {

        private static final Logger LOG = LoggerFactory.getLogger(ReportError.class);

        private final Map<String, Object> topoConf;
        private final IStormClusterState stormClusterState;
        private final String stormId;
        private final String componentId;
        private final WorkerTopologyContext workerTopologyContext;

        private int maxPerInterval;
        private int errorIntervalSecs;
        private AtomicInteger intervalStartTime;
        private AtomicInteger intervalErrors;

        public ReportError(Map<String, Object> topoConf, IStormClusterState stormClusterState, String stormId, String componentId,
                           WorkerTopologyContext workerTopologyContext) {
            this.topoConf = topoConf;
            this.stormClusterState = stormClusterState;
            this.stormId = stormId;
            this.componentId = componentId;
            this.workerTopologyContext = workerTopologyContext;
            this.errorIntervalSecs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS));
            this.maxPerInterval = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL));
            this.intervalStartTime = new AtomicInteger(Time.currentTimeSecs());
            this.intervalErrors = new AtomicInteger(0);
        }

        @Override
        public void report(Throwable error) {
            LOG.error("Error", error);
            if (Time.deltaSecs(intervalStartTime.get()) > errorIntervalSecs) {
                intervalErrors.set(0);
                intervalStartTime.set(Time.currentTimeSecs());
            }
            if (intervalErrors.incrementAndGet() <= maxPerInterval) {
                try {
                    stormClusterState.reportError(stormId, componentId, Utils.hostname(),
                                                  workerTopologyContext.getThisWorkerPort().longValue(), error);
                } catch (UnknownHostException e) {
                    throw Utils.wrapInRuntime(e);
                }
            }
        }
    }

Here report first checks whether the interval needs to be reset, then checks whether the error count has exceeded the per-interval maximum. If it has not, report calls stormClusterState.reportError to write the error to storage, for example ZooKeeper.
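The throttling in report can be tried in isolation. The following is a minimal sketch, not Storm code: the class name ErrorThrottle, the method shouldReport, and the injectable clock are assumptions introduced here so the reset-then-count logic can be exercised without a cluster.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

/** Hypothetical stand-in for the throttling part of ReportError.report; not part of Storm. */
class ErrorThrottle {
    private final int maxPerInterval;
    private final int intervalSecs;
    private final IntSupplier clockSecs; // injectable clock (assumption, for testability)
    private final AtomicInteger intervalStart;
    private final AtomicInteger intervalErrors = new AtomicInteger(0);

    ErrorThrottle(int maxPerInterval, int intervalSecs, IntSupplier clockSecs) {
        this.maxPerInterval = maxPerInterval;
        this.intervalSecs = intervalSecs;
        this.clockSecs = clockSecs;
        this.intervalStart = new AtomicInteger(clockSecs.getAsInt());
    }

    /** Returns true when the error would be forwarded to cluster state. */
    boolean shouldReport() {
        int now = clockSecs.getAsInt();
        // Start a new interval once the current one has fully elapsed.
        if (now - intervalStart.get() > intervalSecs) {
            intervalErrors.set(0);
            intervalStart.set(now);
        }
        // Count this error; only the first maxPerInterval in an interval pass.
        return intervalErrors.incrementAndGet() <= maxPerInterval;
    }
}
```

With a limit of 2 per 10-second interval, the third call inside the same interval is dropped, and calls pass again once more than 10 seconds have elapsed since the interval started. Note that, as in the original, a dropped error is still logged; only the write to storage is skipped.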

StormClusterStateImpl.reportError

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java

    @Override
    public void reportError(String stormId, String componentId, String node, Long port, Throwable error) {
        String path = ClusterUtils.errorPath(stormId, componentId);
        String lastErrorPath = ClusterUtils.lastErrorPath(stormId, componentId);
        ErrorInfo errorInfo = new ErrorInfo(ClusterUtils.stringifyError(error), Time.currentTimeSecs());
        errorInfo.set_host(node);
        errorInfo.set_port(port.intValue());
        byte[] serData = Utils.serialize(errorInfo);
        stateStorage.mkdirs(path, defaultAcls);
        stateStorage.create_sequential(path + ClusterUtils.ZK_SEPERATOR + "e", serData, defaultAcls);
        stateStorage.set_data(lastErrorPath, serData, defaultAcls);
        List<String> childrens = stateStorage.get_children(path, false);

        Collections.sort(childrens, new Comparator<String>() {
            public int compare(String arg0, String arg1) {
                return Long.compare(Long.parseLong(arg0.substring(1)), Long.parseLong(arg1.substring(1)));
            }
        });

        while (childrens.size() > 10) {
            String znodePath = path + ClusterUtils.ZK_SEPERATOR + childrens.remove(0);
            try {
                stateStorage.delete_node(znodePath);
            } catch (Exception e) {
                if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                    // if the node is already deleted, do nothing
                    LOG.warn("Could not find the znode: {}", znodePath);
                } else {
                    throw e;
                }
            }
        }
    }

This uses ClusterUtils.errorPath(stormId, componentId) to get the directory to write to, and ClusterUtils.lastErrorPath(stormId, componentId) to get the path that holds the last error. Because ZooKeeper is not suited to storing large amounts of data, the method checks whether there are more than 10 children and deletes the excess nodes: it sorts the children in ascending order by the number in the node name (substring(1)), then deletes them one by one from the front of the list, oldest first.
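The pruning step can be sketched without ZooKeeper. This is an illustration under assumptions: ErrorNodePruner and selectForDeletion are hypothetical names, and the method only returns the names that would be deleted instead of calling delete_node.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical helper mirroring the sort-and-trim loop; it performs no ZooKeeper calls. */
class ErrorNodePruner {
    /** Returns the child names to delete, oldest first, so that at most keep remain. */
    static List<String> selectForDeletion(List<String> children, int keep) {
        List<String> sorted = new ArrayList<>(children);
        // Node names look like e0000000291: drop the leading 'e' and compare numerically.
        sorted.sort(Comparator.comparingLong((String name) -> Long.parseLong(name.substring(1))));
        List<String> toDelete = new ArrayList<>();
        while (sorted.size() > keep) {
            toDelete.add(sorted.remove(0));
        }
        return toDelete;
    }
}
```

Given eleven children named e0000000289 through e0000000299 in any order and keep set to 10, only e0000000289 is selected, which matches the behaviour of always retaining the ten highest sequence numbers.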

ClusterUtils.errorPath

storm-2.0.0/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java

    public static final String ZK_SEPERATOR = "/";

    public static final String ERRORS_ROOT = "errors";

    public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;

    public static String errorPath(String stormId, String componentId) {
        try {
            return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public static String lastErrorPath(String stormId, String componentId) {
        return errorPath(stormId, componentId) + "-last-error";
    }

    public static String errorStormRoot(String stormId) {
        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
    }

errorPath resolves to /errors/{stormId}/{componentId}. Under that directory, EPHEMERAL_SEQUENTIAL nodes whose names start with e are created; each error is first appended there, and older nodes are deleted once there are more than 10.

lastErrorPath resolves to /errors/{stormId}/{componentId}-last-error and stores the most recent error for that componentId.
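To make the two paths concrete, here is a small self-contained sketch that rebuilds them with plain JDK calls. The class name ErrorPaths is hypothetical, and any prefix added by the cluster's ZooKeeper root is deliberately left out.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Hypothetical re-creation of the path arithmetic; not the Storm ClusterUtils class. */
class ErrorPaths {
    static final String SEP = "/";
    static final String ERRORS_SUBTREE = SEP + "errors";

    /** Directory holding the sequential error nodes of one component. */
    static String errorPath(String stormId, String componentId) {
        try {
            // The component id is URL-encoded so that it is safe as a single path segment.
            return ERRORS_SUBTREE + SEP + stormId + SEP + URLEncoder.encode(componentId, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    /** Node holding only the most recent error of one component. */
    static String lastErrorPath(String stormId, String componentId) {
        return errorPath(stormId, componentId) + "-last-error";
    }
}
```

For the topology id reportErrorDemo-1-1540260375 and component print, this yields /errors/reportErrorDemo-1-1540260375/print and /errors/reportErrorDemo-1-1540260375/print-last-error. A component id containing a space, such as "my bolt", is encoded as my+bolt.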

Inspecting with zkCli

    [zk: localhost:2181(CONNECTED) 21] ls /storm/errors
    [DRPCStateQuery-1-1540185943, reportErrorDemo-1-1540260375]
    [zk: localhost:2181(CONNECTED) 22] ls /storm/errors/reportErrorDemo-1-1540260375
    [print, print-last-error]
    [zk: localhost:2181(CONNECTED) 23] ls /storm/errors/reportErrorDemo-1-1540260375/print
    [e0000000291, e0000000290, e0000000295, e0000000294, e0000000293, e0000000292, e0000000299, e0000000298, e0000000297, e0000000296]
    [zk: localhost:2181(CONNECTED) 24] ls /storm/errors/reportErrorDemo-1-1540260375/print/e0000000299
    []
    [zk: localhost:2181(CONNECTED) 25] ls /storm/errors/reportErrorDemo-1-1540260375/print-last-error
    []

storm-ui

    curl -i http://192.168.99.100:8080/api/v1/topology/reportErrorDemo-1-1540260375?sys=false

storm-ui calls the endpoint above to fetch the topology data. Each spout and bolt entry in the response includes a lastError field showing the most recent error.

StormApiResource

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java

    /**
     * /api/v1/topology -> topo.
     */
    @GET
    @Path("/topology/{id}")
    @AuthNimbusOp(value = "getTopology", needsTopoId = true)
    @Produces("application/json")
    public Response getTopology(@PathParam("id") String id,
                                @DefaultValue(":all-time") @QueryParam("window") String window,
                                @QueryParam("sys") boolean sys,
                                @QueryParam(callbackParameterName) String callback) throws TException {
        topologyPageRequestMeter.mark();
        try (NimbusClient nimbusClient = NimbusClient.getConfiguredClient(config)) {
            return UIHelpers.makeStandardResponse(
                    UIHelpers.getTopologySummary(
                            nimbusClient.getClient().getTopologyPageInfo(id, window, sys),
                            window, config,
                            servletRequest.getRemoteUser()
                    ),
                    callback
            );
        }
    }

This calls nimbusClient.getClient().getTopologyPageInfo(id, window, sys).

Nimbus.getTopologyPageInfo

storm-2.0.0/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java

    @Override
    public TopologyPageInfo getTopologyPageInfo(String topoId, String window, boolean includeSys)
        throws NotAliveException, AuthorizationException, TException {
        try {
            getTopologyPageInfoCalls.mark();
            CommonTopoInfo common = getCommonTopoInfo(topoId, "getTopologyPageInfo");
            String topoName = common.topoName;
            IStormClusterState state = stormClusterState;
            int launchTimeSecs = common.launchTimeSecs;
            Assignment assignment = common.assignment;
            Map<List<Integer>, Map<String, Object>> beats = common.beats;
            Map<Integer, String> taskToComp = common.taskToComponent;
            StormTopology topology = common.topology;
            Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
            StormBase base = common.base;
            if (base == null) {
                throw new WrappedNotAliveException(topoId);
            }
            Map<WorkerSlot, WorkerResources> workerToResources = getWorkerResourcesForTopology(topoId);
            List<WorkerSummary> workerSummaries = null;
            Map<List<Long>, List<Object>> exec2NodePort = new HashMap<>();
            if (assignment != null) {
                Map<List<Long>, NodeInfo> execToNodeInfo = assignment.get_executor_node_port();
                Map<String, String> nodeToHost = assignment.get_node_host();
                for (Entry<List<Long>, NodeInfo> entry : execToNodeInfo.entrySet()) {
                    NodeInfo ni = entry.getValue();
                    List<Object> nodePort = Arrays.asList(ni.get_node(), ni.get_port_iterator().next());
                    exec2NodePort.put(entry.getKey(), nodePort);
                }

                workerSummaries = StatsUtil.aggWorkerStats(topoId,
                                                           topoName,
                                                           taskToComp,
                                                           beats,
                                                           exec2NodePort,
                                                           nodeToHost,
                                                           workerToResources,
                                                           includeSys,
                                                           true); // this is the topology page, so we know the user is authorized
            }

            TopologyPageInfo topoPageInfo = StatsUtil.aggTopoExecsStats(topoId,
                                                                        exec2NodePort,
                                                                        taskToComp,
                                                                        beats,
                                                                        topology,
                                                                        window,
                                                                        includeSys,
                                                                        state);
            //......
            return topoPageInfo;
        } catch (Exception e) {
            LOG.warn("Get topo page info exception. (topology id='{}')", topoId, e);
            if (e instanceof TException) {
                throw (TException) e;
            }
            throw new RuntimeException(e);
        }
    }

This calls StatsUtil.aggTopoExecsStats to build the TopologyPageInfo.

StatsUtil.aggTopoExecsStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

    /**
     * aggregate topo executors stats.
     *
     * @param topologyId     topology id
     * @param exec2nodePort  executor -> host+port
     * @param task2component task -> component
     * @param beats          executor[start, end] -> executor heartbeat
     * @param topology       storm topology
     * @param window         the window to be aggregated
     * @param includeSys     whether to include system streams
     * @param clusterState   cluster state
     * @return TopologyPageInfo thrift structure
     */
    public static TopologyPageInfo aggTopoExecsStats(
        String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats,
        StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) {
        List<Map<String, Object>> beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology);
        Map<String, Object> topoStats = aggregateTopoStats(window, includeSys, beatList);
        return postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState);
    }

StatsUtil.aggTopoExecsStats ends by calling postAggregateTopoStats.

StatsUtil.postAggregateTopoStats

storm-2.0.0/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java

    private static TopologyPageInfo postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map<String, Object> accData,
                                                           String topologyId, IStormClusterState clusterState) {
        TopologyPageInfo ret = new TopologyPageInfo(topologyId);

        ret.set_num_tasks(task2comp.size());
        ret.set_num_workers(((Set) accData.get(WORKERS_SET)).size());
        ret.set_num_executors(exec2nodePort != null ? exec2nodePort.size() : 0);

        Map bolt2stats = ClientStatsUtil.getMapByKey(accData, BOLT_TO_STATS);
        Map<String, ComponentAggregateStats> aggBolt2stats = new HashMap<>();
        for (Object o : bolt2stats.entrySet()) {
            Map.Entry e = (Map.Entry) o;
            Map m = (Map) e.getValue();
            long executed = getByKeyOr0(m, EXECUTED).longValue();
            if (executed > 0) {
                double execLatencyTotal = getByKeyOr0(m, EXEC_LAT_TOTAL).doubleValue();
                m.put(EXEC_LATENCY, execLatencyTotal / executed);

                double procLatencyTotal = getByKeyOr0(m, PROC_LAT_TOTAL).doubleValue();
                m.put(PROC_LATENCY, procLatencyTotal / executed);
            }
            m.remove(EXEC_LAT_TOTAL);
            m.remove(PROC_LAT_TOTAL);
            String id = (String) e.getKey();
            m.put("last-error", getLastError(clusterState, topologyId, id));

            aggBolt2stats.put(id, thriftifyBoltAggStats(m));
        }

        //......

        return ret;
    }

    private static ErrorInfo getLastError(IStormClusterState stormClusterState, String stormId, String compId) {
        return stormClusterState.lastError(stormId, compId);
    }

Here last-error is added to each component's stats map via getLastError, and thriftifyBoltAggStats then converts the map into a thrift object.

getLastError calls stormClusterState.lastError(stormId, compId) to fetch the last error.

UIHelpers.getTopologySummary

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

    /**
     * getTopologySummary.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @param remoteUser remoteUser
     * @return getTopologySummary
     */
    public static Map<String, Object> getTopologySummary(TopologyPageInfo topologyPageInfo,
                                                         String window, Map<String, Object> config, String remoteUser) {
        Map<String, Object> result = new HashMap();
        Map<String, Object> topologyConf = (Map<String, Object>) JSONValue.parse(topologyPageInfo.get_topology_conf());
        long messageTimeout = (long) topologyConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
        Map<String, Object> unpackedTopologyPageInfo =
                unpackTopologyInfo(topologyPageInfo, window, config);
        result.putAll(unpackedTopologyPageInfo);
        result.put("user", remoteUser);
        result.put("window", window);
        result.put("windowHint", getWindowHint(window));
        result.put("msgTimeout", messageTimeout);
        result.put("configuration", topologyConf);
        result.put("visualizationTable", new ArrayList());
        result.put("schedulerDisplayResource", config.get(DaemonConfig.SCHEDULER_DISPLAY_RESOURCE));
        return result;
    }

After obtaining the TopologyPageInfo, UIHelpers.getTopologySummary unpacks it with unpackTopologyInfo.

UIHelpers.unpackTopologyInfo

storm-2.0.0/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java

    /**
     * unpackTopologyInfo.
     * @param topologyPageInfo topologyPageInfo
     * @param window window
     * @param config config
     * @return unpackTopologyInfo
     */
    private static Map<String, Object> unpackTopologyInfo(TopologyPageInfo topologyPageInfo, String window, Map<String, Object> config) {
        Map<String, Object> result = new HashMap();
        result.put("id", topologyPageInfo.get_id());
        //......

        Map<String, ComponentAggregateStats> spouts = topologyPageInfo.get_id_to_spout_agg_stats();
        List<Map> spoutStats = new ArrayList();

        for (Map.Entry<String, ComponentAggregateStats> spoutEntry : spouts.entrySet()) {
            spoutStats.add(getTopologySpoutAggStatsMap(spoutEntry.getValue(), spoutEntry.getKey()));
        }
        result.put("spouts", spoutStats);

        Map<String, ComponentAggregateStats> bolts = topologyPageInfo.get_id_to_bolt_agg_stats();
        List<Map> boltStats = new ArrayList();

        for (Map.Entry<String, ComponentAggregateStats> boltEntry : bolts.entrySet()) {
            boltStats.add(getTopologyBoltAggStatsMap(boltEntry.getValue(), boltEntry.getKey()));
        }
        result.put("bolts", boltStats);

        //......
        result.put("samplingPct", samplingPct);
        result.put("replicationCount", topologyPageInfo.get_replication_count());
        result.put("topologyVersion", topologyPageInfo.get_topology_version());
        result.put("stormVersion", topologyPageInfo.get_storm_version());
        return result;
    }

    /**
     * getTopologySpoutAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param spoutId spoutId
     * @return getTopologySpoutAggStatsMap
     */
    private static Map<String, Object> getTopologySpoutAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                   String spoutId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("spoutId", spoutId);
        result.put("encodedSpoutId", URLEncoder.encode(spoutId));
        SpoutAggregateStats spoutAggregateStats = componentAggregateStats.get_specific_stats().get_spout();
        result.put("completeLatency", spoutAggregateStats.get_complete_latency_ms());
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTopologyBoltAggStatsMap.
     * @param componentAggregateStats componentAggregateStats
     * @param boltId boltId
     * @return getTopologyBoltAggStatsMap
     */
    private static Map<String, Object> getTopologyBoltAggStatsMap(ComponentAggregateStats componentAggregateStats,
                                                                  String boltId) {
        Map<String, Object> result = new HashMap();
        CommonAggregateStats commonStats = componentAggregateStats.get_common_stats();
        result.putAll(getCommonAggStatsMap(commonStats));
        result.put("boltId", boltId);
        result.put("encodedBoltId", URLEncoder.encode(boltId));
        BoltAggregateStats boltAggregateStats = componentAggregateStats.get_specific_stats().get_bolt();
        result.put("capacity", StatsUtil.floatStr(boltAggregateStats.get_capacity()));
        result.put("executeLatency", StatsUtil.floatStr(boltAggregateStats.get_execute_latency_ms()));
        result.put("executed", boltAggregateStats.get_executed());
        result.put("processLatency", StatsUtil.floatStr(boltAggregateStats.get_process_latency_ms()));
        ErrorInfo lastError = componentAggregateStats.get_last_error();
        result.put("lastError", Objects.isNull(lastError) ? "" : getTruncatedErrorString(lastError.get_error()));
        return result;
    }

    /**
     * getTruncatedErrorString.
     * @param errorString errorString
     * @return getTruncatedErrorString
     */
    private static String getTruncatedErrorString(String errorString) {
        return errorString.substring(0, Math.min(errorString.length(), 200));
    }

Note that unpackTopologyInfo calls getTopologySpoutAggStatsMap for spouts and getTopologyBoltAggStatsMap for bolts. Both methods pass lastError through getTruncatedErrorString, which keeps at most the first 200 characters (substring(0, 200)).
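The effect of the truncation on what the UI shows can be checked with a few lines of plain Java. This is a sketch under assumptions: LastErrorText and forDisplay are hypothetical names, and the null check is applied to the string itself, whereas the original checks the ErrorInfo object before reading its error text.

```java
/** Hypothetical helper combining the null handling and the 200-character cut. */
class LastErrorText {
    static final int MAX_CHARS = 200;

    /** Returns an empty string for a missing error, otherwise at most MAX_CHARS characters. */
    static String forDisplay(String error) {
        if (error == null) {
            return "";
        }
        // Math.min guards against errors shorter than the limit.
        return error.substring(0, Math.min(error.length(), MAX_CHARS));
    }
}
```

A stack trace of several thousand characters is cut to its first 200, so the lastError field in the topology response usually carries only the exception type, the message, and the first frame or two. The full text remains in the worker log and in the stored ErrorInfo.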

Crash log

    2018-10-23 02:53:28.118 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Async loop died!
    java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
        at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
        at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
        at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
        ... 6 more
    2018-10-23 02:53:28.129 o.a.s.d.executor Thread-10-print-executor[7 7] [ERROR]
    java.lang.RuntimeException: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$fn__10795$fn__10808$fn__10861.invoke(executor.clj:861) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
        at org.apache.storm.tuple.TupleImpl.getInteger(TupleImpl.java:116) ~[storm-core-1.2.2.jar:1.2.2]
        at com.example.demo.error.ErrorPrintBolt.execute(ErrorPrintBolt.java:26) ~[stormjar.jar:?]
        at org.apache.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$fn__10795$tuple_action_fn__10797.invoke(executor.clj:739) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__10716.invoke(executor.clj:468) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.disruptor$clojure_handler$reify__10135.onEvent(disruptor.clj:41) ~[storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.2.jar:1.2.2]
        ... 6 more
    2018-10-23 02:53:28.175 o.a.s.util Thread-10-print-executor[7 7] [ERROR] Halting process: ("Worker died")
    java.lang.RuntimeException: ("Worker died")
        at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.2.jar:1.2.2]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
        at org.apache.storm.daemon.worker$fn__11404$fn__11405.invoke(worker.clj:792) [storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.daemon.executor$mk_executor_data$fn__10612$fn__10613.invoke(executor.clj:281) [storm-core-1.2.2.jar:1.2.2]
        at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:494) [storm-core-1.2.2.jar:1.2.2]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
    2018-10-23 02:53:28.176 o.a.s.d.worker Thread-41 [INFO] Shutting down worker reportErrorDemo-2-1540263136 f9856902-cfe9-45c7-b675-93a29d3d3d36 6700
    2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Terminating messaging context
    2018-10-23 02:53:28.177 o.a.s.d.worker Thread-41 [INFO] Shutting down executors
    2018-10-23 02:53:28.177 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[8 8]
    2018-10-23 02:53:28.182 o.a.s.util Thread-3-disruptor-executor[8 8]-send-queue [INFO] Async loop interrupted!
    2018-10-23 02:53:28.186 o.a.s.util Thread-4-spout-executor[8 8] [INFO] Async loop interrupted!
    2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[8 8]
    2018-10-23 02:53:28.188 o.a.s.d.executor Thread-41 [INFO] Shutting down executor spout:[12 12]
    2018-10-23 02:53:28.189 o.a.s.util Thread-5-disruptor-executor[12 12]-send-queue [INFO] Async loop interrupted!
    2018-10-23 02:53:28.190 o.a.s.util Thread-6-spout-executor[12 12] [INFO] Async loop interrupted!
    2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shut down executor spout:[12 12]
    2018-10-23 02:53:28.190 o.a.s.d.executor Thread-41 [INFO] Shutting down executor count:[2 2]
    2018-10-23 02:53:28.191 o.a.s.util Thread-7-disruptor-executor[2 2]-send-queue [INFO] Async loop interrupted!
    2018-10-23 02:53:28.193 o.a.s.util Thread-8-count-executor[2 2] [INFO] Async loop interrupted!
    2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shut down executor count:[2 2]
    2018-10-23 02:53:28.194 o.a.s.d.executor Thread-41 [INFO] Shutting down executor print:[7 7]
    2018-10-23 02:53:28.196 o.a.s.util Thread-9-disruptor-executor[7 7]-send-queue [INFO] Async loop interrupted!
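In the log, a ClassCastException thrown from ErrorPrintBolt.execute propagates out of the executor, and the worker is halted with "Worker died". The sketch below contrasts that with catching the exception and handing it to a reporter. It is a self-contained illustration, not the demo's code: MiniErrorReporter only mirrors the single-method shape of IErrorReporter, and RecordingReporter and SafePrintStep are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of the reporting contract (same shape as IErrorReporter). */
interface MiniErrorReporter {
    void reportError(Throwable error);
}

/** Records reported errors in memory instead of writing them to cluster state. */
class RecordingReporter implements MiniErrorReporter {
    final List<Throwable> reported = new ArrayList<>();

    @Override
    public void reportError(Throwable error) {
        reported.add(error);
    }
}

class SafePrintStep {
    /** Handles one field value; a value of the wrong type is reported rather than rethrown. */
    static boolean process(Object field, MiniErrorReporter reporter) {
        try {
            Integer value = (Integer) field; // the same kind of cast that failed in the log
            System.out.println("value=" + value);
            return true;
        } catch (ClassCastException e) {
            reporter.reportError(e);
            return false;
        }
    }
}
```

In a real bolt the reporter would be the collector passed to execute, since the collector interfaces extend IErrorReporter. Whether to swallow the exception after reporting it, fail the tuple, or let the worker restart is a design decision that depends on the topology.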
