离线日志的分析综合案例:
第一步:数据的采集 flume,爬虫,javaAPI等等
第二步:数据的统一的存储 hdfs
第三步:数据的清洗 主要目的:将半结构化的数据,转换成结构化的数据 MR来实现 MR比较灵活,可以灵活的处理我们的数据
第四步:数据的映射入库 hive可以将结构化的数据映射成为一张表
第五步:数据的分析 数据仓库DW层,数据的分析
第六步:将分析的结果,存入到hive的临时表当中
第七步:通过sqoop工具将我们的数据导出去到mysql当中
第八步:通过web报表展示工具,展示我们的统计结果
12 网站点击流日志数据分析系统
点击流数据:关注的是用户访问网站的轨迹,按照时间来进行先后区分
基本上所有的大型网站都有日志埋点
通过js的方式,可以获取到你再网站上面点击的所有的链接,按钮,商品,等等,包括你访问的url的链接等等
js埋点,谁来做???专业的前端来做的
埋点收集的数据,都发送到日志服务器 一条日志大概1Kb来算
数据全部在日志服务器
分析用户的点击数据,得到我们的点击流模型
pageView模型:重视的是每一个页面受到的访问情况,每访问一个页面,就算一条记录
visit模型:重视的是每一个session会话内的访问情况,这次会话内,哪个页面进来,哪个页面出去,进入时间,出去时间
一、网站流量模型分析:
分析的是我们网站流量的来源:
广告推广
自然搜索 百度搜索 google搜索
付费搜索 百度竞价排名
直接流量: 直接敲网站的网址
网站流量多维度的细分:
访问来源:从什么地方来访问的
访问媒介:访问的新老用户,目标页面等等
网站内容分析:
进入网站首页 ==》 商品分类页 ==》 商品详情页 ==》 订单确认页 ==》 付款页面
不怕你不买,就怕你不来
网站流量转化漏斗分析:
二、流量常见分析分类:
骨灰级指标
IP:一天之内访问我这个网站不重复IP的个数
一般来说一个IP可能对应多个人
pageView:每打开一个页面,就算一次 pv值
一共访问了多少次页面
unique page view:以用户的cookie来为依据,不同的用户对应不同的cookie。一个用户多次访问网站只算一次
去重之后的访问人数
基础级指标
访问次数:访客从进入网站到离开网站的一系列活动记为一次访问,也称会话(session),1次访问(会话)可能包含多个PV。
网站停留时间:访问者在网站上花费的时间。
页面停留时间:访问者在某个特定页面或某组网页上所花费的时间。
复合级指标
人均浏览页数:平均每个独立访客产生的PV。人均浏览页数=浏览次数/独立访客。体现网站对访客的吸引程度。
跳出率:指某一范围内单页访问次数或访问者与总访问次数的百分比。其中跳出指单页访问或访问者的次数,即在一次访问中访问者进入网站后只访问了一个页面就离开的数量。
退出率:指某一范围内退出的访问者与综合访问量的百分比。其中退出指访问者离开网站的次数,通常是基于某个范围的。
基础分析(PV,IP,UV)
趋势分析:根据选定的时段,提供网站流量数据,通过流量趋势变化形态,为您分析网站访客的访问规律、网站发展状况提供参考。
对比分析:根据选定的两个对比时段,提供网站流量在时间上的纵向对比报表,帮您发现网站发展状况、发展规律、流量变化率等。
当前在线:提供当前时刻站点上的访客量,以及最近15分钟流量、来源、受访、访客变化情况等,方便用户及时了解当前网站流量状况。
访问明细:提供最近7日的访客访问记录,可按每个PV或每次访问行为(访客的每次会话)显示,并可按照来源、搜索词等条件进行筛选。 通过访问明细,用户可以详细了解网站流量的累计过程,从而为用户快速找出流量变动原因提供最原始、最准确的依据。
流量来源分析
主要分析我们的流量从哪些渠道过来的
来源分类
搜索引擎:
搜索词:
最近7日的访客搜索记录
来路域名:
来路页面:
来源升降榜:
受访分析
访问域名 子域名
受访页面: A.html访问5000次
受访升降榜
热点图
用户视图
访问轨迹:从哪个页面跳转到哪个页面等等
访客分析
地区运营商
终端详情
新老访客
忠诚度
活跃度
转化路径分析
分析漏斗模型:
每一步相对于上一步的转化率
每一步相对于第一步的转化率
三、整体技术流程及架构
流量日志分析网站整体架构模块
1、数据采集模块
使用flume来进行采集
使用flume的tailDirSource可以按照正则匹配,收集我们某一个文件夹下面的多个不同类型的数据。
tailDirSource特点:
如果数据这一行数据正在写入,那么过一会儿重试采集,直到数据写入成功
a1.sources = r1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
channel memory channel
sink: hdfs sink 要控制文件的采集的策略,避免hdfs产生大量的小文件
时间长短 文件大小
数据采集过来的字段
1、访客ip地址: 58.215.204.118
2、访客用户信息: - -
3、请求时间:[18/Sep/2013:06:51:35 +0000]
4、请求方式:GET
5、请求的url:/wp-includes/js/jquery/jquery.js?ver=1.10.2
6、请求所用协议:HTTP/1.1
7、响应码:304
8、返回的数据流量:0
9、访客的来源url:http://blog.fens.me/nodejs-socketio-chat/
10、访客所用浏览器:Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0
数据的每个字段之间都是用空格隔开的
2、数据的清洗|(预处理)
使用mapreduce来实现
3、数据入库
将清洗之后的结构化数据全部load到hive表中
4、数据分析
开发数据统计分析的hql的语句
5、数据的展示
展示我们的结果数据
四、模块开发—-数据预处理
数据预处理
package cn.itcast.bigdata.weblog.pre;
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.HashSet;
import java.util.Set;
import cn.itcast.bigdata.weblog.utils.DateUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import cn.itcast.bigdata.weblog.mrbean.WebLogBean;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 处理原始日志,过滤出真实pv请求 转换时间格式 对缺失字段填充默认值 对记录标记valid和invalid
*
*/
public class WeblogPreProcess extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//Configuration conf = new Configuration();
Configuration conf = super.getConf();
Job job = Job.getInstance(conf);
/*String inputPath= "hdfs://node01:9000/weblog/"+DateUtil.getYestDate()+"/input";
String outputPath="hdfs://node01:9000/weblog/"+DateUtil.getYestDate()+"/weblogPreOut";
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:9000"), conf);
if (fileSystem.exists(new Path(outputPath))){
fileSystem.delete(new Path(outputPath),true);
}
fileSystem.close();
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
*/
FileInputFormat.addInputPath(job,new Path("file:///xxx//input"));
job.setInputFormatClass(TextInputFormat.class);
FileOutputFormat.setOutputPath(job,new Path("file:///xxx//weblogPreOut2"));
job.setOutputFormatClass(TextOutputFormat.class);
job.setJarByClass(WeblogPreProcess.class);
job.setMapperClass(WeblogPreProcessMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
boolean res = job.waitForCompletion(true);
return res?0:1;
}
static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
// 用来存储网站url分类数据
Set<String> pages = new HashSet<String>();
Text k = new Text();
NullWritable v = NullWritable.get();
/**
* 从外部配置文件中加载网站的有用url分类数据 存储到maptask的内存中,用来对日志数据进行过滤
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
pages.add("/about");
pages.add("/black-ip-list/");
pages.add("/cassandra-clustor/");
pages.add("/finance-rhive-repurchase/");
pages.add("/hadoop-family-roadmap/");
pages.add("/hadoop-hive-intro/");
pages.add("/hadoop-zookeeper-intro/");
pages.add("/hadoop-mahout-roadmap/");
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
WebLogBean webLogBean = WebLogParser.parser(line);
if (webLogBean != null) {
// 过滤js/图片/css等静态资源
WebLogParser.filtStaticResource(webLogBean, pages);
/* if (!webLogBean.isValid()) return; */
k.set(webLogBean.toString());
context.write(k, v);
}
}
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
int run = ToolRunner.run(configuration, new WeblogPreProcess(), args);
System.exit(run);
}
}
点击流模型pageviews表
package cn.itcast.bigdata.weblog.clickstream;
import cn.itcast.bigdata.weblog.mrbean.WebLogBean;
import cn.itcast.bigdata.weblog.utils.DateUtil;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
*
* 将清洗之后的日志梳理出点击流pageviews模型数据
*
* 输入数据是清洗过后的结果数据
*
* 区分出每一次会话,给每一次visit(session)增加了session-id(随机uuid)
* 梳理出每一次会话中所访问的每个页面(请求时间,url,停留时长,以及该页面在这次session中的序号)
* 保留referral_url,body_bytes_send,useragent
*
*
* @author
*
*/
public class ClickStreamPageView extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = super.getConf();
Job job = Job.getInstance(conf);
/*String inputPath="hdfs://node01:9000/weblog/"+DateUtil.getYestDate()+"/weblogPreOut";
String outputPath="hdfs://node01:9000/weblog/"+DateUtil.getYestDate()+"/pageViewOut";
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:9000"), conf);
if (fileSystem.exists(new Path(outputPath))){
fileSystem.delete(new Path(outputPath),true);
}
fileSystem.close();
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));*/
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///xxx//weblogPreOut2"));
TextOutputFormat.setOutputPath(job,new Path("file://xxx//pageViewOut2"));
job.setJarByClass(ClickStreamPageView.class);
job.setMapperClass(ClickStreamMapper.class);
job.setReducerClass(ClickStreamReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WebLogBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
boolean b = job.waitForCompletion(true);
return b?0:1;
}
static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {
Text k = new Text();
WebLogBean v = new WebLogBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("/001");
if (fields.length < 9) return;
//将切分出来的各字段set到weblogbean中
v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
//只有有效记录才进入后续处理
if (v.isValid()) {
//此处用ip地址来标识用户
k.set(v.getRemote_addr());
context.write(k, v);
}
}
}
static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {
Text v = new Text();
@Override
protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();
// 先将一个用户的所有访问记录中的时间拿出来排序
try {
for (WebLogBean bean : values) {
//为什么list集合当中不能直接添加循环出来的这个bean?
//这里通过属性拷贝,每次new 一个对象,避免了bean的属性值每次覆盖
WebLogBean webLogBean = new WebLogBean();
try {
BeanUtils.copyProperties(webLogBean, bean);
} catch(Exception e) {
e.printStackTrace();
}
beans.add(webLogBean);
}
//将bean按时间先后顺序排序
Collections.sort(beans, new Comparator<WebLogBean>() {
@Override
public int compare(WebLogBean o1, WebLogBean o2) {
try {
Date d1 = toDate(o1.getTime_local());
Date d2 = toDate(o2.getTime_local());
if (d1 == null || d2 == null)
return 0;
return d1.compareTo(d2);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
});
/**
* 以下逻辑为:从有序bean中分辨出各次visit,并对一次visit中所访问的page按顺序标号step
* 核心思想:
* 就是比较相邻两条记录中的时间差,如果时间差<30分钟,则该两条记录属于同一个session
* 否则,就属于不同的session
*
*/
int step = 1;
String session = UUID.randomUUID().toString();
for (int i = 0; i < beans.size(); i++) {
WebLogBean bean = beans.get(i);
// 如果仅有1条数据,则直接输出
if (1 == beans.size()) {
// 设置默认停留时长为60s
v.set(session+"/001"+key.toString()+"/001"+bean.getRemote_user() + "/001" + bean.getTime_local() + "/001" + bean.getRequest() + "/001" + step + "/001" + (60) + "/001" + bean.getHttp_referer() + "/001" + bean.getHttp_user_agent() + "/001" + bean.getBody_bytes_sent() + "/001"
+ bean.getStatus());
context.write(NullWritable.get(), v);
session = UUID.randomUUID().toString();
break;
}
// 如果不止1条数据,则将第一条跳过不输出,遍历第二条时再输出
if (i == 0) {
continue;
}
// 求近两次时间差
long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
// 如果本次-上次时间差<30分钟,则输出前一次的页面访问信息
if (timeDiff < 30 * 60 * 1000) {
v.set(session+"/001"+key.toString()+"/001"+beans.get(i - 1).getRemote_user() + "/001" + beans.get(i - 1).getTime_local() + "/001" + beans.get(i - 1).getRequest() + "/001" + step + "/001" + (timeDiff / 1000) + "/001" + beans.get(i - 1).getHttp_referer() + "/001"
+ beans.get(i - 1).getHttp_user_agent() + "/001" + beans.get(i - 1).getBody_bytes_sent() + "/001" + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
step++;
} else {
// 如果本次-上次时间差>30分钟,则输出前一次的页面访问信息且将step重置,以分隔为新的visit
v.set(session+"/001"+key.toString()+"/001"+beans.get(i - 1).getRemote_user() + "/001" + beans.get(i - 1).getTime_local() + "/001" + beans.get(i - 1).getRequest() + "/001" + (step) + "/001" + (60) + "/001" + beans.get(i - 1).getHttp_referer() + "/001"
+ beans.get(i - 1).getHttp_user_agent() + "/001" + beans.get(i - 1).getBody_bytes_sent() + "/001" + beans.get(i - 1).getStatus());
context.write(NullWritable.get(), v);
// 输出完上一条之后,重置step编号
step = 1;
session = UUID.randomUUID().toString();
}
// 如果此次遍历的是最后一条,则将本条直接输出
if (i == beans.size() - 1) {
// 设置默认停留市场为60s
v.set(session+"/001"+key.toString()+"/001"+bean.getRemote_user() + "/001" + bean.getTime_local() + "/001" + bean.getRequest() + "/001" + step + "/001" + (60) + "/001" + bean.getHttp_referer() + "/001" + bean.getHttp_user_agent() + "/001" + bean.getBody_bytes_sent() + "/001" + bean.getStatus());
context.write(NullWritable.get(), v);
}
}
} catch (ParseException e) {
e.printStackTrace();
}
}
private String toStr(Date date) {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
return df.format(date);
}
private Date toDate(String timeStr) throws ParseException {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
return df.parse(timeStr);
}
private long timeDiff(String time1, String time2) throws ParseException {
Date d1 = toDate(time1);
Date d2 = toDate(time2);
return d1.getTime() - d2.getTime();
}
private long timeDiff(Date time1, Date time2) throws ParseException {
return time1.getTime() - time2.getTime();
}
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new ClickStreamPageView(), args);
System.exit(run);
}
}
WebLogBean
package cn.itcast.bigdata.weblog.mrbean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
* 对接外部数据的层,表结构定义最好跟外部数据源保持一致
* 术语: 贴源表
* @author
*
*/
public class WebLogBean implements Writable {
private boolean valid = true;// 判断数据是否合法
private String remote_addr;// 记录客户端的ip地址
private String remote_user;// 记录客户端用户名称,忽略属性"-"
private String time_local;// 记录访问时间与时区
private String request;// 记录请求的url与http协议
private String status;// 记录请求状态;成功是200
private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
private String http_referer;// 用来记录从那个页面链接访问过来的
private String http_user_agent;// 记录客户浏览器的相关信息
public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
this.valid = valid;
this.remote_addr = remote_addr;
this.remote_user = remote_user;
this.time_local = time_local;
this.request = request;
this.status = status;
this.body_bytes_sent = body_bytes_sent;
this.http_referer = http_referer;
this.http_user_agent = http_user_agent;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getRemote_user() {
return remote_user;
}
public void setRemote_user(String remote_user) {
this.remote_user = remote_user;
}
public String getTime_local() {
return this.time_local;
}
public void setTime_local(String time_local) {
this.time_local = time_local;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getBody_bytes_sent() {
return body_bytes_sent;
}
public void setBody_bytes_sent(String body_bytes_sent) {
this.body_bytes_sent = body_bytes_sent;
}
public String getHttp_referer() {
return http_referer;
}
public void setHttp_referer(String http_referer) {
this.http_referer = http_referer;
}
public String getHttp_user_agent() {
return http_user_agent;
}
public void setHttp_user_agent(String http_user_agent) {
this.http_user_agent = http_user_agent;
}
public boolean isValid() {
return valid;
}
public void setValid(boolean valid) {
this.valid = valid;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("/001").append(this.getRemote_addr());
sb.append("/001").append(this.getRemote_user());
sb.append("/001").append(this.getTime_local());
sb.append("/001").append(this.getRequest());
sb.append("/001").append(this.getStatus());
sb.append("/001").append(this.getBody_bytes_sent());
sb.append("/001").append(this.getHttp_referer());
sb.append("/001").append(this.getHttp_user_agent());
return sb.toString();
}
@Override
public void readFields(DataInput in) throws IOException {
this.valid = in.readBoolean();
this.remote_addr = in.readUTF();
this.remote_user = in.readUTF();
this.time_local = in.readUTF();
this.request = in.readUTF();
this.status = in.readUTF();
this.body_bytes_sent = in.readUTF();
this.http_referer = in.readUTF();
this.http_user_agent = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeBoolean(this.valid);
out.writeUTF(null==remote_addr?"":remote_addr);
out.writeUTF(null==remote_user?"":remote_user);
out.writeUTF(null==time_local?"":time_local);
out.writeUTF(null==request?"":request);
out.writeUTF(null==status?"":status);
out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
out.writeUTF(null==http_referer?"":http_referer);
out.writeUTF(null==http_user_agent?"":http_user_agent);
}
}
点击流模型visit信息表
package cn.itcast.bigdata.weblog.clickstream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import cn.itcast.bigdata.weblog.utils.DateUtil;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import cn.itcast.bigdata.weblog.mrbean.PageViewsBean;
import cn.itcast.bigdata.weblog.mrbean.VisitBean;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 输入数据:pageviews模型结果数据
* 从pageviews模型结果数据中进一步梳理出visit模型
* sessionid start-time out-time start-page out-page pagecounts ......
*
* @author
*
*/
public class ClickStreamVisit extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = super.getConf();
Job job = Job.getInstance(conf);
/*String inputPath = "hdfs://node01:9000/weblog/"+ DateUtil.getYestDate() + "/pageViewOut";
String outPutPath="hdfs://node01:9000/weblog/"+ DateUtil.getYestDate() + "/clickStreamVisit";
FileSystem fileSystem = FileSystem.get(new URI("hdfs://node01:9000"),conf);
if (fileSystem.exists(new Path(outPutPath))){
fileSystem.delete(new Path(outPutPath),true);
}
fileSystem.close();
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outPutPath));
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);*/
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///xxx//pageViewOut2"));
TextOutputFormat.setOutputPath(job,new Path("file:///xxx//clickStreamVisit"));
job.setJarByClass(ClickStreamVisit.class);
job.setMapperClass(ClickStreamVisitMapper.class);
job.setReducerClass(ClickStreamVisitReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PageViewsBean.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(VisitBean.class);
boolean res = job.waitForCompletion(true);
return res?0:1;
}
// 以session作为key,发送数据到reducer
static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {
PageViewsBean pvBean = new PageViewsBean();
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("/001");
int step = Integer.parseInt(fields[5]);
//(String session, String remote_addr, String timestr, String request, int step, String staylong, String referal, String useragent, String bytes_send, String status)
//299d6b78-9571-4fa9-bcc2-f2567c46df3472.46.128.140-2013-09-18 07:58:50/hadoop-zookeeper-intro/160"https://www.google.com/""Mozilla/5.0"14722200
pvBean.set(fields[0], fields[1], fields[2], fields[3],fields[4], step, fields[6], fields[7], fields[8], fields[9]);
k.set(pvBean.getSession());
context.write(k, pvBean);
}
}
static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {
@Override
protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException {
// 将pvBeans按照step排序
ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();
for (PageViewsBean pvBean : pvBeans) {
PageViewsBean bean = new PageViewsBean();
try {
BeanUtils.copyProperties(bean, pvBean);
pvBeansList.add(bean);
} catch (Exception e) {
e.printStackTrace();
}
}
Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {
@Override
public int compare(PageViewsBean o1, PageViewsBean o2) {
return o1.getStep() > o2.getStep() ? 1 : -1;
}
});
// 取这次visit的首尾pageview记录,将数据放入VisitBean中
VisitBean visitBean = new VisitBean();
// 取visit的首记录
visitBean.setInPage(pvBeansList.get(0).getRequest());
visitBean.setInTime(pvBeansList.get(0).getTimestr());
// 取visit的尾记录
visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());
visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());
// visit访问的页面数
visitBean.setPageVisits(pvBeansList.size());
// 来访者的ip
visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());
// 本次visit的referal
visitBean.setReferal(pvBeansList.get(0).getReferal());
visitBean.setSession(session.toString());
context.write(NullWritable.get(), visitBean);
}
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(),new ClickStreamVisit(),args);
}
}
PageViewsBean
package cn.itcast.bigdata.weblog.mrbean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class PageViewsBean implements Writable {
private String session;
private String remote_addr;
private String timestr;
private String request;
private int step;
private String staylong;
private String referal;
private String useragent;
private String bytes_send;
private String status;
public void set(String session, String remote_addr, String useragent, String timestr, String request, int step, String staylong, String referal, String bytes_send, String status) {
this.session = session;
this.remote_addr = remote_addr;
this.useragent = useragent;
this.timestr = timestr;
this.request = request;
this.step = step;
this.staylong = staylong;
this.referal = referal;
this.bytes_send = bytes_send;
this.status = status;
}
public String getSession() {
return session;
}
public void setSession(String session) {
this.session = session;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getTimestr() {
return timestr;
}
public void setTimestr(String timestr) {
this.timestr = timestr;
}
public String getRequest() {
return request;
}
public void setRequest(String request) {
this.request = request;
}
public int getStep() {
return step;
}
public void setStep(int step) {
this.step = step;
}
public String getStaylong() {
return staylong;
}
public void setStaylong(String staylong) {
this.staylong = staylong;
}
public String getReferal() {
return referal;
}
public void setReferal(String referal) {
this.referal = referal;
}
public String getUseragent() {
return useragent;
}
public void setUseragent(String useragent) {
this.useragent = useragent;
}
public String getBytes_send() {
return bytes_send;
}
public void setBytes_send(String bytes_send) {
this.bytes_send = bytes_send;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
@Override
public void readFields(DataInput in) throws IOException {
this.session = in.readUTF();
this.remote_addr = in.readUTF();
this.timestr = in.readUTF();
this.request = in.readUTF();
this.step = in.readInt();
this.staylong = in.readUTF();
this.referal = in.readUTF();
this.useragent = in.readUTF();
this.bytes_send = in.readUTF();
this.status = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(session);
out.writeUTF(remote_addr);
out.writeUTF(timestr);
out.writeUTF(request);
out.writeInt(step);
out.writeUTF(staylong);
out.writeUTF(referal);
out.writeUTF(useragent);
out.writeUTF(bytes_send);
out.writeUTF(status);
}
}
VisitBean
package cn.itcast.bigdata.weblog.mrbean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class VisitBean implements Writable {
private String session;
private String remote_addr;
private String inTime;
private String outTime;
private String inPage;
private String outPage;
private String referal;
private int pageVisits;
public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) {
this.session = session;
this.remote_addr = remote_addr;
this.inTime = inTime;
this.outTime = outTime;
this.inPage = inPage;
this.outPage = outPage;
this.referal = referal;
this.pageVisits = pageVisits;
}
public String getSession() {
return session;
}
public void setSession(String session) {
this.session = session;
}
public String getRemote_addr() {
return remote_addr;
}
public void setRemote_addr(String remote_addr) {
this.remote_addr = remote_addr;
}
public String getInTime() {
return inTime;
}
public void setInTime(String inTime) {
this.inTime = inTime;
}
public String getOutTime() {
return outTime;
}
public void setOutTime(String outTime) {
this.outTime = outTime;
}
public String getInPage() {
return inPage;
}
public void setInPage(String inPage) {
this.inPage = inPage;
}
public String getOutPage() {
return outPage;
}
public void setOutPage(String outPage) {
this.outPage = outPage;
}
public String getReferal() {
return referal;
}
public void setReferal(String referal) {
this.referal = referal;
}
public int getPageVisits() {
return pageVisits;
}
public void setPageVisits(int pageVisits) {
this.pageVisits = pageVisits;
}
@Override
public void readFields(DataInput in) throws IOException {
this.session = in.readUTF();
this.remote_addr = in.readUTF();
this.inTime = in.readUTF();
this.outTime = in.readUTF();
this.inPage = in.readUTF();
this.outPage = in.readUTF();
this.referal = in.readUTF();
this.pageVisits = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(session);
out.writeUTF(remote_addr);
out.writeUTF(inTime);
out.writeUTF(outTime);
out.writeUTF(inPage);
out.writeUTF(outPage);
out.writeUTF(referal);
out.writeInt(pageVisits);
}
@Override
public String toString() {
return session + "/001" + remote_addr + "/001" + inTime + "/001" + outTime + "/001" + inPage + "/001" + outPage + "/001" + referal + "/001" + pageVisits;
}
}
拜师教育学员文章:作者:976-沈同学,
转载或复制请以 超链接形式 并注明出处 拜师资源博客。
原文地址:《12 网站点击流日志数据分析系统》 发布于2020-04-29
评论 抢沙发