Spark(第四节—1)Spark实现ngixn日志的pv、uv统计,Spark实现ip地址转为地点的统计

1300-黄同学

发表文章数:85

热门标签

首页 » 大数据 » 正文

Spark实现ngixn日志的pv、uv统计

首先创建maven项目,pom如下

<properties>
   <scala.version>2.11.8</scala.version>
   <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
   <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
       <version>${scala.version}</version>
   </dependency>
   <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.11</artifactId>
       <version>${spark.version}</version>
   </dependency>
   <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
       <version>2.7.5</version>
   </dependency>

   <dependency>
       <groupId>mysql</groupId>
       <artifactId>mysql-connector-java</artifactId>
       <version>5.1.38</version>
   </dependency>

</dependencies>
<build>
   <sourceDirectory>src/main/scala</sourceDirectory>
   <testSourceDirectory>src/test/scala</testSourceDirectory>
   <plugins>
       <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-compiler-plugin</artifactId>
           <version>3.0</version>
           <configuration>
               <source>1.8</source>
               <target>1.8</target>
               <encoding>UTF-8</encoding>
               <!--    <verbal>true</verbal>-->
           </configuration>
       </plugin>
       <plugin>
           <groupId>net.alchim31.maven</groupId>
           <artifactId>scala-maven-plugin</artifactId>
           <version>3.2.0</version>
           <executions>
               <execution>
                   <goals>
                       <goal>compile</goal>
                       <goal>testCompile</goal>
                   </goals>
                   <configuration>
                       <args>
                           <arg>-dependencyfile</arg>
                           <arg>${project.build.directory}/.scala_dependencies</arg>
                       </args>
                   </configuration>
               </execution>
           </executions>
       </plugin>
       <plugin>
           <groupId>org.apache.maven.plugins</groupId>
           <artifactId>maven-shade-plugin</artifactId>
           <version>3.1.1</version>
           <executions>
               <execution>
                   <phase>package</phase>
                   <goals>
                       <goal>shade</goal>
                   </goals>
                   <configuration>
                       <filters>
                           <filter>
                               <artifact>*:*</artifact>
                               <excludes>
                                   <exclude>META-INF/*.SF</exclude>
                                   <exclude>META-INF/*.DSA</exclude>
                                   <exclude>META-INF/*.RSA</exclude>
                               </excludes>
                           </filter>
                       </filters>
                       <transformers>
                           <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                               <mainClass></mainClass>
                           </transformer>
                       </transformers>
                   </configuration>
               </execution>
           </executions>
       </plugin>
   </plugins>
</build>

pv统计代码

import org.apache.spark.{SparkConf, SparkContext}

object PVCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PVCount-conf").setMaster("local")
    val sc = new SparkContext(conf)
    val fileRDD = sc.textFile("access.log")
    val pv = fileRDD.count()
    println(pv)
    sc.stop()
  }
}

uv统计代码

// 由于数据中没有cookie,用ip代替cookie来统计uv
import org.apache.spark.{SparkConf, SparkContext}

object UVCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UVCount-conf").setMaster("local")
    val sc = new SparkContext(conf)
    val fileRDD = sc.textFile("access.log")
    // 获取数据中的全部IP
    val ipArr = fileRDD.map(_.split(" ")(0))
    // 对IP进行去重并计数
    val uv = ipArr.distinct(1).count()
    println(uv)
    sc.stop()
  }
}

访问来源Refer的Top10统计代码

要获取refer:

  1. 筛掉没有refer的数据,在本数据集中,没有refer的数据就是按空格分割后数组长度小于等于10的。
  2. refer数据在将原数据按空格分割后的第11个,索引值为10
import org.apache.spark.{SparkConf, SparkContext}

object ReferTopN {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReferTopN-conf").setMaster("local")
    val sc = new SparkContext(conf)
    val fileRDD = sc.textFile("access.log")
    // 获取数据中的全部IP
    val referArr = fileRDD.map(_.split(" ")).filter(_.size > 10).map(_ (10))
    // 将每个refer计数1,使用reduceByKey进行按refer累加,并按累加的结果倒序排序
    val countRDD = referArr.map((_, 1)).reduceByKey(_ + _).sortBy(_._2, false)
    // 取出前10名
    val top10 = countRDD.take(10)
    println(top10.toBuffer)
    sc.stop()
  }
}

Spark实现ip地址转为地点的统计

pom文件沿用“Spark实现ngixn日志的pv、uv统计”项目的。

数据准备

ip日志信息
在ip日志信息中,我们只需要关心ip这一个维度就可以了,其他的不做介绍
Spark(第四节—1)Spark实现ngixn日志的pv、uv统计,Spark实现ip地址转为地点的统计
城市ip段字典
Spark(第四节—1)Spark实现ngixn日志的pv、uv统计,Spark实现ip地址转为地点的统计

思路

  1. 加载城市ip段字典,获取ip在Long类型下的起始数字、结束数字、经度、维度
  2. 加载日志数据,获取ip信息,然后转换为数字,和ip段在Long类型下的起始数字、结束数字比较(比较算法为二分法)
  3. 比较的时候采用二分法查找,找到对应的经度和维度
  4. 然后对经度和维度做单词计数
  5. 将统计结果存到MySQL中

创建mysql数据库表

CREATE DATABASE `spark`;

USE `spark`;

DROP TABLE IF EXISTS `iplocation`;

CREATE TABLE `iplocation` (
  `longitude` varchar(32) DEFAULT NULL,
  `latitude` varchar(32) DEFAULT NULL,
  `total_count` varchar(32) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

代码开发实现

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object IPAddressCount {

  // 将ip地址转为long类型数值
  def ip2Long(ip:String):Long={
    val fragments = ip.split("[.]")
    var Ip_Num = 0L
    for (i <- 0 until fragments.length){
      Ip_Num = fragments(i).toLong | Ip_Num << 8L
    }
    Ip_Num
  }

  //通过二分查找法,将ip转换为long的数字,通过二分查找法,查找ip落在哪一个经纬度范围内
  def binarySearch(longIp: Long, value: Array[(String, String, String, String)]): Int = {
    //定义二分查找的初始下标
    var start = 0;
    //定义二分查找的结束下标
    var end = value.length - 1
    //如果初始下标等于结束下标,查找结束,不等于则一直查找
    while (start <= end) {
      //定义中间下标,通过比较,确定初始与结束值是否与中间下标相等
      var middle = (start + end) / 2
      //如果ip大于初始值,小于结束值,直接返回,注意这里需要用到return
      if (longIp >= value(middle)._1.toLong && longIp <= value(middle)._2.toLong) {
        return middle
      }
      //如果ip小于中间值,ip位于左边
      if (longIp < value(middle)._1.toLong) {
        end = middle - 1
      }
      //如果ip大于中间值,ip位于右边
      if (longIp > value(middle)._2.toLong) {
        start = middle + 1
      }
    }
    //为了防止报错,直接返回一个数字
    -1
  }


  // 将结果保存到mysql的函数
  val data2MySql=(datas:Iterator[((String,String),Int)])=>{
    // 获取mysql连接
    val connect=DriverManager.getConnection("jdbc:mysql://192.168.2.6/spark",
      "root","12345")
    // 设置sql语句
    val preparedStatement=
      connect.prepareStatement("insert into iplocation(longitude,latitude,total_count) values (?,?,?)")
    datas.foreach(e => {
      // 插入经度数据
      preparedStatement.setString(1,e._1._1)
      // 插入维度数据
      preparedStatement.setString(2,e._1._2)
      // 插入经纬度次数数据
      preparedStatement.setInt(3,e._2)
      // 提交更改到mysql
      preparedStatement.execute()
    })
    // 关闭sql对象
    preparedStatement.close()
    // 关闭mysql连接
    connect.close()
  }


  def main(args: Array[String]): Unit = {
    // 获取sparkContext
    val sc=new SparkContext(new SparkConf().setAppName("ip-count").setMaster("local"))
    // 设置sparkContext日志级别
    sc.setLogLevel("WARN")
    // 城市ip段字典
    val ipText=sc.textFile("ip.txt")
    // 获取ip范围与经纬度数据,scala中的字段分隔符最好加上[]
    val ipDict=ipText.map(_.split("[|]")).map(x => (x(2),x(3),x(x.length-2),x(x.length-1)))
    // 将ip范围与经纬度字典数据作为spark集群的广播变量
    val bc=sc.broadcast(ipDict.collect())
    // ip日志信息
    val userText=sc.textFile("20090121000132.394251.http.format")
    // 获取用户上网记录的ip字段
    val userIPArr=userText.map(_.split("[|]")(1))
    // 将用户ip的RDD转为((经度,维度),1)的RDD
    val locationRDD=userIPArr.mapPartitions(iter => {
      iter.map(ip =>{
        val ipArray= bc.value
        val ipLong=ip2Long(ip)
        val resultIndex=binarySearch(ipLong,ipArray)
        // 返回((经度,维度),1)
        ((ipArray(resultIndex)._3,ipArray(resultIndex)._4),1)
      }
      )
    })
    //println(locationRDD.collect().toBuffer)
    // 统计每个经纬度的次数
    val locationCount=locationRDD.reduceByKey(_+_)
    // 将结果插入到mysql
    locationCount.foreachPartition(data2MySql)

    sc.stop()
  }
}
标签:

未经允许不得转载:作者:1300-黄同学, 转载或复制请以 超链接形式 并注明出处 拜师资源博客
原文地址:《Spark(第四节—1)Spark实现ngixn日志的pv、uv统计,Spark实现ip地址转为地点的统计》 发布于2021-02-18

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录