




下載本文檔
版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
概要依賴管理基本套路輸入源轉(zhuǎn)換操作輸出操作持久化操作依賴管理依賴<dependency><groupId>
.apache.spark</groupId><artifactId>spark-streaming_2.10</artifactId><version>1.6.2</version></dependency>Source相關(guān)依賴部分source相關(guān)依賴現(xiàn)在已經(jīng)單獨(dú)打包,需要單獨(dú)引入基本套路//1、參數(shù)處理if
(args.length
<
2)
{System.err.println("Usage:
NetworkWordCount
<hostname>
<port>")System.exit(1)}//2、初始化StreamingContextval
sparkConf
=
new
SparkConf().setAppName("NetworkWordCount")val
ssc
=
new
StreamingContext(sparkConf,
Seconds(1))//3、從source獲取數(shù)據(jù)創(chuàng)建DStreamval
lines
=
ssc.socketTextStream(args(0),args(1).toInt,
StorageLevel.MEMORY_AND_DISK_SER)//4、對(duì)DStream進(jìn)行val
words
=
lines.flatMap(_.split("
"))val
wordCounts=
words.map(x
=>
(x,
1)).reduceByKey(_+
_)//5、處理計(jì)算結(jié)果
wordCounts.print()//6、啟動(dòng)Spark
Streamingssc.start()ssc.awaitTermination()輸入源Dstream輸入源---input
DStreamSpark內(nèi)置了兩類Source:Source分類舉例說(shuō)明Basic
sourcesfile
systems,
socketconnections,
and
AkkaactorsStreamingContext
直接就可以創(chuàng)建,無(wú)需引入額外的依賴Advanced
sourcesKafka,
Flume,Kinesis,,
etc需要引入相關(guān)依賴,并且需要通過(guò)相關(guān)的工具類來(lái)創(chuàng)建val
lines
=
ssc.socketTextStream(args(0),args(1).toInt,
StorageLevel.MEMORY_AND_DISK_SER)import
.apache.spark.streaming.kafka._val
kafkaStream
=
KafkaUtils.createStream(streamingContext,
[ZK
quorum],
[consumer
group
id],
[per-topic
number
of
Kafka
partitions
to
consume])Dstream輸入源---Receiverinput
Dstream都會(huì)關(guān)聯(lián)一個(gè)Receiver(除了FileInputDStream)Receiver以任務(wù)的形式運(yùn)行在應(yīng)用的執(zhí)行器進(jìn)程中,從輸入源收集數(shù)據(jù)并保存為RDD。Receiver收集到輸入數(shù)據(jù)后會(huì)把數(shù)據(jù)
到另一個(gè)執(zhí)行器進(jìn)程來(lái)保障容錯(cuò)性(默認(rèn)行為)Receiver會(huì)消耗額外的cpu資源,所以要注意分配
的cpu
cores(receiver是一個(gè)單獨(dú)的task,會(huì)消耗cpu)local模式下不要“l(fā)ocal”or
“l(fā)ocal[1]”(需要指定多個(gè),只有一個(gè)核會(huì)卡?。┓植际竭\(yùn)行時(shí),分配的cores >
receivers的數(shù)量StreamingContext
會(huì)周期性地運(yùn)行Spark
作業(yè)來(lái)處理這些數(shù)據(jù)(每接受一批次數(shù)據(jù),就會(huì)提交作業(yè)運(yùn)行處理),把數(shù)據(jù)與之前時(shí)間區(qū)間中的RDD進(jìn)行整合(如果是時(shí)間窗口,需要與其它RDD做運(yùn)算整合)內(nèi)置的input
Dstream:Basic
Sources內(nèi)置input
Dstream–
/apache/spark/tree/v1.6.2/external(高級(jí))文件流val
logData
=
ssc.textFileStream(logDirectory)Spark
支持從任意Hadoop
兼容的文件系統(tǒng)中
數(shù)據(jù),
Spark
Streaming
也就支持從任意Hadoop
兼容的文件系統(tǒng)
中的文件創(chuàng)建數(shù)據(jù)流(InputFormat參數(shù)化)ssc.fileStream[LongWritable,
IntWritable,SequenceFileInputFormat[LongWritable,
IntWritable]](inputDirectory).map
{case
(x,
y)
=>
(x.get(),
y.get())}文件必須原子化創(chuàng)建(比如把文件移入Spark
的
,而不是一條條往已有文件寫數(shù)據(jù))Akka
actor流(spark
底層使用akka通信)內(nèi)置的input
Dstream:Advanced
SourcesApache
Kafkadef
main(args:
Array[String])
{if
(args.length
<
4)
{System.err.println("Usage:KafkaWordCount
<zkQuorum><group>
<topics>
<numThreads>")System.exit(1)}#使用Array接受args參數(shù)val
Array(zkQuorum,
group,
topics,
numThreads)
=
argsval
sparkConf
=
new
SparkConf().setAppName("KafkaWordCount")val
ssc
=
new
StreamingContext(sparkConf,
Seconds(2))#指定hdfs
,作用:容錯(cuò)ssc.checkpoint("checkpoint")val
topicMap
=
topics.split(",").map((_,
numThreads.toInt)).toMapval
lines
=
KafkaUtils.createStream(ssc,
zkQuorum,
group,
topicMap).map(_._2)val
words
=
lines.flatMap(_.split("
"))val
wordCounts
=
words.map(x
=>
(x,
1L)).reduceByKeyAndWindow(_
+
_,_
-
_,Minutes(10),
Seconds(2),
2)wordCounts.print()ssc.start()ssc.awaitTermination()}/apache/spark/examples/st
/apache/spark/tree/v1.6.2/examples/src/main/scala/
reamingDstream輸入源:multiple
input
DStreammultiple
input
streams(same
type
and
same
slideduration)//相同的類型,相同的滑動(dòng)窗口ssc.union(Seq(stream1,stream2,…))
//合并多個(gè)streamstream.union(otherStream)//兩個(gè)stream進(jìn)行合并Dstream輸入源:Custom
ReceiverCustom
input
Dstream–
.apache.spark.streaming.receiver.Receiver(只需要擴(kuò)展Receiver)–無(wú)狀態(tài)轉(zhuǎn)換操作和Sparkcore的語(yǔ)義?一致無(wú)狀態(tài)轉(zhuǎn)化操作就是把簡(jiǎn)單的RDD
轉(zhuǎn)化操作應(yīng)用到每個(gè)批次上,也就是轉(zhuǎn)化DStream中的每一個(gè)RDD(對(duì)Dstream的操作會(huì)
到每個(gè)批次的RDD上)無(wú)狀態(tài)轉(zhuǎn)換操作不會(huì)跨多個(gè)batch的RDD去執(zhí)行(每個(gè)批次的RDD結(jié)果不能累加)有狀態(tài)轉(zhuǎn)換操作1-updateStateByKey有時(shí) 需要在DStream
中跨所有批次狀態(tài)(例如用戶的會(huì)話)。針對(duì)這種情況,updateStateByKey()
為 提供了對(duì)一個(gè)狀態(tài)變量的 ,用于鍵值對(duì)形式的Dstream使用updateStateByKey需要完成兩步工作:定義狀態(tài):可以是任意數(shù)據(jù)類型定義狀態(tài)更新函數(shù)-updateFuncupdate(events,
oldState)events:是在當(dāng)前批次中收到的事件的列表(可能為空)。–oldState:是一個(gè)可選的狀態(tài)對(duì)象,存放在Option
內(nèi);如果一個(gè)鍵沒(méi)有之前的狀態(tài),這個(gè)值可以空缺。newState:由函數(shù)返回,也以O(shè)ption
形式存在;
可以返回一個(gè)空的Option
來(lái)表示想要?jiǎng)h除該狀態(tài)。注意:有狀態(tài)轉(zhuǎn)化操作需要在你的StreamingContext
中打開(kāi)檢查點(diǎn)機(jī)制來(lái)確保容錯(cuò)性–
ssc.checkpoint("hdfs://...")有狀態(tài)轉(zhuǎn)換操作2-window基于窗口的操作會(huì)在一個(gè)比StreamingContext
的批次間隔更長(zhǎng)的時(shí)間范圍內(nèi),通過(guò)整合多個(gè)批次的結(jié)果,計(jì)算出整個(gè)窗口的結(jié)果所有基于窗口的操作都需要兩個(gè)參數(shù),分別為windowDuration以及slideDuration,兩者都必須是StreamContext
的批次間隔的整數(shù)倍valaccessLogsWindow
=
accessLogsDStream.window(Seconds(30),
Seconds(10))val
windowCounts
=
accessLogsWindow.count()batchDuration(每個(gè)批次的長(zhǎng)度)val
ssc
=
new
StreamingContext(sparkConf,
Seconds(10))windowDuration(每次移動(dòng),窗口框住的長(zhǎng)度(幾個(gè)批次))長(zhǎng)控制每次計(jì)算最近的多少個(gè)批次的數(shù)據(jù)(windowDuration/batchDuration)slideDuration(每次移動(dòng)的距離(
幾個(gè)批次))默認(rèn)值與batchDuration相等(默認(rèn)滑動(dòng)一個(gè)batch)控制多長(zhǎng)時(shí)間計(jì)算一次有狀態(tài)轉(zhuǎn)換操作2-window操作代碼片段val
ssc
=
new
StreamingContext(sparkConf,
Seconds(10))…val
accessLogsWindow=
accessLogsDStream.window(Seconds(30),
Seconds(20))val
windowCounts
=
accessLogsWindow.count()..窗口時(shí)長(zhǎng)為3個(gè)批次,滑動(dòng)步長(zhǎng)為2個(gè)批次;每隔2個(gè)批次就對(duì)前3
個(gè)批次的數(shù)據(jù)進(jìn)行一次計(jì)算有狀態(tài)轉(zhuǎn)換操作2-window操作—普通規(guī)約與增量規(guī)約增量規(guī)約只考慮新進(jìn)入窗口的數(shù)據(jù)和離開(kāi)窗口的數(shù)據(jù),讓Spark增量計(jì)算歸約結(jié)果。這種特殊形式需要提供歸約函數(shù)的一個(gè)逆函數(shù),比如+對(duì)應(yīng)的逆函數(shù)為-有狀態(tài)轉(zhuǎn)換操作2-window操作—理解增量規(guī)約DStream輸出常見(jiàn)輸出操作print每個(gè)批次中抓取DStream
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年保健按摩師(按摩保健操練習(xí))職業(yè)技能鑒定試卷
- 2025年評(píng)茶員(二級(jí))茶葉投資分析與風(fēng)險(xiǎn)評(píng)估考試試卷
- 2025年電子商務(wù)師(中級(jí))考試試卷:電商數(shù)據(jù)分析方法與應(yīng)用試題解析
- 2025年德語(yǔ)TestDaF閱讀真題試卷(德語(yǔ)考試)攻略
- 2025年小學(xué)英語(yǔ)畢業(yè)考試模擬卷(英語(yǔ)綜合實(shí)踐口語(yǔ)與寫作)
- 2025年電子商務(wù)師(高級(jí))考試試卷:電商數(shù)據(jù)分析與用戶畫像
- 軟件業(yè)軟件開(kāi)發(fā)流程優(yōu)化與管理方法研究
- 農(nóng)村合作社與農(nóng)戶土地使用權(quán)流轉(zhuǎn)協(xié)議
- 線上直播帶貨平臺(tái)合作協(xié)議
- 2025年大學(xué)英語(yǔ)四級(jí)考試模擬試卷:翻譯能力提升與真題分析
- 房屋建筑與市政工程重大事故安全隱患判定標(biāo)準(zhǔn)解讀課件
- DB43-T 1267-2023 機(jī)動(dòng)車檢驗(yàn)機(jī)構(gòu)建設(shè)和運(yùn)行管理規(guī)范
- 公司稅務(wù)注銷協(xié)議書
- 2025年人力資源管理專業(yè)期末考試卷及答案
- 防溺水安全家長(zhǎng)會(huì)課件
- 第四單元:促銷問(wèn)題(方案選擇問(wèn)題)專項(xiàng)練習(xí)(學(xué)生版+解析)-2024-2025學(xué)年六年級(jí)數(shù)學(xué)上冊(cè)培優(yōu)精練(北師大版)
- 放射科實(shí)習(xí)生入科教育
- 國(guó)家開(kāi)放大學(xué)國(guó)開(kāi)電大《幼兒園課程基礎(chǔ)》形考任務(wù)1~4答案
- 2025至2030中國(guó)翡翠市場(chǎng)經(jīng)營(yíng)績(jī)效與投資狀況研究報(bào)告
- 神經(jīng)可塑性在教育中的應(yīng)用探索-全面剖析
- 2025年安全生產(chǎn)月主題培訓(xùn)課件
評(píng)論
0/150
提交評(píng)論