sparkstreaming测试之四设置窗口大小接收数据-创新互联

测试思路:

创新互联坚持“要么做到,要么别承诺”的工作理念,服务领域包括:做网站、成都做网站、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的井冈山网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!

    首先,使用网络数据发送程序发送数据;

    然后,运行spark程序;

    观察效果。

说明:

    1. 这里也需要设置检查点目录

    2. 这里有四个参数:

      前两个分别是监听的端口和每隔多少毫秒接收一次数据;

      第三个参数是接收前多少毫秒的数据;(详细请参见window具体含义)

      第四个参数是每隔多少毫秒接收一次数据。

sparkStreaming

import org.apache.log4j.{LoggerLevel}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{SecondsStreamingContext}
import org.apache.spark.{SparkContextSparkConf}
import org.apache.spark.streaming.StreamingContext._

WindowWordCount {
  def main(args: Array[]){

    Logger.().setLevel(Level.)
    Logger.().setLevel(Level.)

    conf = SparkConf().setAppName().setMaster()
    sc = SparkContext(conf)

    ssc = StreamingContext(sc())
    ssc.checkpoint()

    val lines = ssc.socketTextStream(args(0),args(1).toInt,
      StorageLevel.MEMORY_ONLY_SER)
    words = lines.flatMap(_.split())

    //windows operator
    val wordCounts = words.map(x=>(x,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),
      Seconds(args(2).toInt),Seconds(args(3).toInt))

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


本文标题:sparkstreaming测试之四设置窗口大小接收数据-创新互联
本文链接:http://pwwzsj.com/article/ddpcpi.html