Spark Streaming 实战 日志分析数据清洗+统计访问量

2022年6月12日14:47:43

1 项目需求

1)需求

  • 统计今天到目前为止的访问量
  • 统计今天到目前为止从搜索引擎过来的课程的访问量

2)开发环境与技术选型

  • IDEA+maven
  • flume+kafka+HBase

3)安装配置 HBase

  1. 下载、解压、配置环境变量
  2. 配置文件

conf/hbase-env.sh

修改JAVA_HOMEexport HBASE_MANAGES_ZK=false

conf/hbase-site.xml

<configuration><property><name>hbase.rootdir</name><value>hdfs://localhost:8020/hbase</value></property><property><name>hbase.cluster.distributed</name><value>true</value></property><property><name>hbase.zookeeper.quorum</name><value>localhost:2181</value></property></configuration>

conf/regionservers

localhost

4)HBase 建表

//1 启动hbase
start-hbase.sh
// 2 启动shell
hbaseshell
// 3 建表create'course_clickcount','info'create'course_search_clickcount','info'
// 4 查看数据表
list
// 5 查看数据表信息
describe'course_clickcount'
// 6 查看表数据
scan'course_clickcount'

5)代码地址

2 模拟日志生成

1)使用python开发日志生成器模拟产生日志,每分钟产生一次日志信息

generate_log.py

#coding=UTF-8import randomimport time

url_paths=["class/112.html","class/128.html","class/145.html","class/130.html","class/146.html","class/131.html","learn/821","course/list"
]

ip_slices=[132,156,124,10,29,167,143,187,30,46,55,63,72,87,98,168]

http_referers=["https://www.baidu.com/s?wd={query}","https://www.sogou.com/web?query={query}","https://cn.bing.com/search?q={query}","https://www.so.com/s?q={query}"
]

search_keyword=["spark sql实战","hadoop 基础","storm实战","spark streaming实战"
]

status_code=["200","404","500"]defsample_status_code():return random.sample(status_code,1)[0]defsample_referer():if random.uniform(0,1)>0.2:return"-"
    refer_str=random.sample(http_referers,1)
    query_str=random.sample(search_keyword,1)return refer_str[0].format(query=query_str[0])defsample_url():return random.sample(url_paths,1)[0]defsample_ip():
    slice=random.sample(ip_slices,4)return".".join([str(item)for itemin slice])defgenerate_log(count=10):
    time_str=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())

    f=open("/Users/Mac/testdata/streaming_access.log","w+")while count >=1:
        query_log="{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{refer}".format(url=sample_url(),ip=sample_ip(),refer=sample_referer(),status_code=sample_status_code(),local_time=time_str)
        print(query_log)
        f.write(query_log+"\n")
        count=count-1if __name__ =='__main__':# 每一分钟生成一次日志信息whileTrue:
        generate_log()
        time.sleep(60)

3 flume收集日志并对接kafka

1)编写flume配置文件,streaming_project2.conf

exec-memory-kafka.sources= exec-source
exec-memory-kafka.sinks=kafka-sink
exec-memory-kafka.channels= memory-channel

exec-memory-kafka.sources.exec-source.type=exec
exec-memory-kafka.sources.exec-source.command=tail-F /Users/Mac/testdata/streaming_access.log
exec-memory-kafka.sources.exec-source.shell= /bin/sh-c

exec-memory-kafka.memory-channel.type=memory

exec-memory-kafka.sinks.kafka-sink.type= org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList=localhost:9092
exec-memory-kafka.sinks.kafka-sink.topic=test_topic
exec-memory-kafka.sinks.kafka-sink.batchSize=5
exec-memory-kafka.sinks.kafka-sink.requireedAcks=1

exec-memory-kafka.sources.exec-source.channels=memory-channel
exec-memory-kafka.sinks.kafka-sink.channel= memory-channel

4 业务开发

4.1 消费kafka数据、数据清洗与统计

1)实体类

ClickLog.scala

package com.lihaogn.sparkProject.domain/**
  * 清洗后的日志格式
  *
  *@param ip
  *@param time
  *@param courseId
  *@param statusCode 日志访问状态码
  *@param referer
  */caseclassClickLog(ip: String, time: String, courseId: Int, statusCode: Int, referer: String)

CourseClickCount.scala

package com.lihaogn.sparkProject.domain/**
  * 课程点击次数实体类
  *
  *@param day_course  对应HBase中的rowkey
  *@param click_count 访问次数
  */caseclassCourseClickCount(day_course: String, click_count: Long)

CourseSearchClickCount.scala

package com.lihaogn.sparkProject.domain/**
  * 从搜索引擎过来的课程点击数实体类
  *@param day_search_course
  *@param click_count
  */caseclassCourseSearchClickCount(day_search_course: String, click_count: Long)

2)工具类

DateUtils.scala

package com.lihaogn.sparkProject.utilsimport java.util.Dateimport org.apache.commons.lang3.time.FastDateFormat/**
  * 日期时间工具类
  */objectDateUtils {val OLD_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")def getTime(time: String) = {
    OLD_FORMAT.parse(time).getTime
  }def parseToMinute(time: String) = {
    TARGET_FORMAT.format(new Date(getTime(time)))
  }def main(args: Array[String]): Unit = {
    println(parseToMinute("2018-9-6 13:58:01"))
  }
}

添加依赖

<!-- cloudera repo--><repositories><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos</url></repository></repositories><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase.version}</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase.version}</version></dependency><!-- hadoop --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency>

HBaseUtils.java

package com.lihaogn.spark.project.utils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.client.HBaseAdmin;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;/**
 * HBase操作工具类,Java工具类建议采用单例模式封装
 */publicclassHBaseUtils {

    HBaseAdmin admin =null;
    Configuration configuration =null;/**
     * 私有构造方法
     */privateHBaseUtils() {

        configuration =new Configuration();
        configuration.set("hbase.zookeeper.quorum","localhost:2181");
        configuration.set("hbase.rootdir","hdfs://localhost:8020/hbase");try {
            admin =new HBaseAdmin(configuration);
        }catch (IOException e) {
            e.printStackTrace();
        }
    }privatestatic HBaseUtils instance =null;publicstaticsynchronized HBaseUtilsgetInstance() {if (null == instance) {
            instance =new HBaseUtils();
        }return instance;
    }/**
     * 根据表名获取到HTable实例
     *
     * @param tableName
     * @return
     */public HTablegetTable(String tableName) {
        HTable table =null;try {
            table =new HTable(configuration, tableName);
        }catch (IOException e) {
            e.printStackTrace();
        }return table;
    }/**
     * 添加一条记录到表中
     *
     * @param tableName
     * @param rowkey
     * @param cf
     * @param column
     * @param value
     */publicvoidput(String tableName, String rowkey, String cf, String column, String value) {
        HTable table = getTable(tableName);

        Put put =new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));try {
            table.put(put);
        }catch (IOException e) {
            e.printStackTrace();
        }
    }publicstaticvoidmain(String[] args) {//        HTable table = HBaseUtils.getInstance().getTable("course_clickcount");//        System.out.println(table.getName().getNameAsString());

        String tableName ="course_clickcount";
        String rowkey ="20180906_1";
        String cf ="info";
        String column ="click_count";
        String value ="2";

        HBaseUtils.getInstance().put(tableName, rowkey, cf, column, value);
    }
}

3)数据库操作

CourseClickCountDAO.scala

package com.lihaogn.sparkProject.daoimport com.lihaogn.spark.project.utils.HBaseUtilsimport com.lihaogn.sparkProject.domain.CourseClickCountimport org.apache.hadoop.hbase.client.Getimport org.apache.hadoop.hbase.util.Bytesimport scala.collection.mutable.ListBuffer/**
  * 数据访问层,课程点击数
  */objectCourseClickCountDAO {val tableName ="course_clickcount"val cf ="info"val qualifer ="click_count"/**
    * 保存数据到HBase
    *
    *@param list
    */def save(list: ListBuffer[CourseClickCount]): Unit = {val table = HBaseUtils.getInstance().getTable(tableName)for (ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count)
    }
  }/**
    * 根据rowkey查询值
    *@param day_course
    *@return
    */def count(day_course:String):Long= {val table = HBaseUtils.getInstance().getTable(tableName)val get =new Get(Bytes.toBytes(day_course))val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)if (value ==null) {0L
    }else
      Bytes.toLong(value)
  }def main(args: Array[String]): Unit = {val list=new ListBuffer[CourseClickCount]
    list.append(CourseClickCount("20180906_8",8))
    list.append(CourseClickCount("20180906_4",3))
    list.append(CourseClickCount("20180906_2",2))

    save(list)

    println(count("20180906_8")+":"+count("20180906_4")+":"+count("20180906_2"))
  }

}

CourseSearchClickCountDAO.scala

package com.lihaogn.sparkProject.daoimport com.lihaogn.spark.project.utils.HBaseUtilsimport com.lihaogn.sparkProject.domain.{CourseClickCount, CourseSearchClickCount}import org.apache.hadoop.hbase.client.Getimport org.apache.hadoop.hbase.util.Bytesimport scala.collection.mutable.ListBuffer/**
  * 数据访问层,从搜索引擎过来的课程点击数
  */objectCourseSearchClickCountDAO {val tableName ="course_search_clickcount"val cf ="info"val qualifer ="click_count"/**
    * 保存数据到HBase
    *
    *@param list
    */def save(list: ListBuffer[CourseSearchClickCount]): Unit = {val table = HBaseUtils.getInstance().getTable(tableName)for (ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count)
    }
  }/**
    * 根据rowkey查询值
    *
    *@param day_search_course
    *@return
    */def count(day_search_course: String): Long = {val table = HBaseUtils.getInstance().getTable(tableName)val get =new Get(Bytes.toBytes(day_search_course))val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)if (value ==null) {0L
    }else
      Bytes.toLong(value)
  }def main(args: Array[String]): Unit = {val list =new ListBuffer[CourseSearchClickCount]
    list.append(CourseSearchClickCount("20180906_www.baidu.com_8",8))
    list.append(CourseSearchClickCount("20180906_www.baidu.com_4",3))

    save(list)

    println(count("20180906_www.baidu.com_8") +":" + count("20180906_www.baidu.com_4"))
  }

}

4)主类

SparkStreamingApp.scala

packagecom.lihaogn.sparkProject.main

importcom.lihaogn.sparkProject.dao.{CourseClickCountDAO, CourseSearchClickCountDAO}
importcom.lihaogn.sparkProject.domain.{ClickLog, CourseClickCount, CourseSearchClickCount}
importcom.lihaogn.sparkProject.utils.DateUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.collection.mutable.ListBuffer/**
  * 使用spark streaming分析日志
  */
object SparkStreamingApp {

  def main(args: Array[String]): Unit = {

    if (args.length !=4) {
      System.err.println("usage: KafKaReceiverWC <zkQuorum> <group> <topics> <numThreads>")
    }

    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("SparkStreamingApp").setMaster("local[5]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // spark streaming 对接 kafka
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // 步骤一:测试数据接收
    messages.map(_._2).count().print()

    // 步骤二:数据清洗
    val logs = messages.map(_._2)
    val cleanData = logs.map(line => {
      val infos = line.split("\t")

      val url = infos(2).split(" ")(1)
      var courseId =0

      // 获取课程标号
      if (url.startsWith("/class")) {
        val courseHtml = url.split("/")(2)
        courseId = courseHtml.substring(0, courseHtml.lastIndexOf(".")).toInt
      }

      ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId !=0)

    cleanData.print()

    // 步骤三:统计今天到现在为止的课程访问量
    cleanData.map(x=>{
      (x.time.substring(0,8)+"_"+x.courseId,1)
    }).reduceByKey(_+_).foreachRDD(rdd=>{
      rdd.foreachPartition(partitionRecords=>{
        val list=new ListBuffer[CourseClickCount]

        partitionRecords.foreach(pair=>{
          list.append(CourseClickCount(pair._1,pair._2))
        })
        // 写入数据库
        CourseClickCountDAO.save(list)

      })
    })

    // 步骤四:统计从搜索引擎过来的从今天开始到现在的课程的访问量
    cleanData.map(x=>{
      val referer=x.referer.replaceAll("//","/")
      val splits=referer.split("/")
      var host=""
      if(splits.length>2) {
        host=splits(1)
      }

      (host,x.courseId,x.time)
    }).filter(_._1!="").map(x=>{
      (x._3.substring(0,8)+"_"+x._1+"_"+x._2,1)
    }).reduceByKey(_+_).foreachRDD(rdd=>{
      rdd.foreachPartition(partitionRecords=>{
        val list =new ListBuffer[CourseSearchClickCount]

        partitionRecords.foreach(pair=>{
          list.append(CourseSearchClickCount(pair._1,pair._2))
        })
        // 写入数据库
        CourseSearchClickCountDAO.save(list)

      })
    })

    ssc.start()

    ssc.awaitTermination()
  }
}

5 运行测试

1)启动 zookeeper

zkServer.shstart

2)启动 HDFS

start-dfs.shstart-yarn.sh

3)启动 kafka

kafka-server-start.sh-daemon$KAFKA_HOME/config/server.properties $

4)启动 flume

flume-ng agent \
--conf$FLUME_HOME/conf \--conf-file$FLUME_HOME/conf/streaming_project2.conf \--name exec-memory-kafka \-Dflume.root.logger=INFO,console

5)运行日志生成器

python3 generate_log.py

6)运行spark程序

spark-submit \
--classcom.lihaogn.sparkProject.main.SparkStreamingApp \--master local[5] \--nameSparkStreamingApp \--jars /Users/Mac/software/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar,$(echo /Users/Mac/app/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr' '',') \/Users/Mac/my-lib/Kafka-train-1.0.jar \
localhost:2181 test test_topic 1

7)结果
Spark Streaming 实战 日志分析数据清洗+统计访问量

  • 作者:豪华手抓饼
  • 原文链接:https://blog.csdn.net/lihaogn/article/details/82461601
    更新时间:2022年6月12日14:47:43 ,共 11868 字。