博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce 编程模板编写【分析网站基本指标UV】程序
阅读量:4317 次
发布时间:2019-06-06

本文共 7200 字,大约阅读时间需要 24 分钟。

1.网站基本指标的几个概念

PV: page view 浏览量

页面的浏览次数,用户每打开一次页面就记录一次。

UV:unique visitor 独立访客数

一天内访问某站点的人数(以cookie为例) 但是如果用户把浏览器cookie给删了之后再次访问会影响记录。

VV: visit view 访客的访问次数

记录所有访客一天内访问了多少次网站,访客完成访问直到浏览器关闭算一次。

IP:独立ip数

指一天内使用不同ip地址的用户访问网站的数量。

2.编写MapReduce编程模板

Driver

package mapreduce;​import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;​public class MRDriver extends Configured implements Tool {​    public int run(String[] args) throws Exception {        //创建job        Job job = Job.getInstance(this.getConf(),"mr-demo");        job.setJarByClass(MRDriver.class);​        //input 默认从hdfs读取数据 将每一行转换成key-value        Path inPath = new Path(args[0]);        FileInputFormat.setInputPaths(job,inPath);​        //map 一行调用一次Map方法  对每一行数据进行分割        job.setMapperClass(null);        job.setMapOutputKeyClass(null);        job.setMapOutputValueClass(null);​        //shuffle        job.setPartitionerClass(null);//分组        job.setGroupingComparatorClass(null);//分区        job.setSortComparatorClass(null);//排序​        //reduce 每有一条key value调用一次reduce方法        job.setReducerClass(null);        job.setOutputKeyClass(null);        job.setOutputValueClass(null);​        //output        Path outPath = new Path(args[1]);        //this.getConf()来自父类 内容为空可以自己set配置信息        FileSystem fileSystem = FileSystem.get(this.getConf());        //如果目录已经存在则删除        if(fileSystem.exists(outPath)){            //if path is a directory and set to true            fileSystem.delete(outPath,true);        }        FileOutputFormat.setOutputPath(job, outPath);        //submit        boolean isSuccess = job.waitForCompletion(true);        return isSuccess ? 0:1;    }​    public static void main(String[] args) {        Configuration configuration = new Configuration();        try {            int status = ToolRunner.run(configuration, new MRDriver(), args);            System.exit(status);        } catch (Exception e) {            e.printStackTrace();        }    }}​

Mapper

public class MRModelMapper extends Mapper
{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { /** * 实现自己的业务逻辑 */ }}

Reduce

public class MRModelReducer extends Reducer
{​ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { /** * 根据业务需求自己实现 */ }}

3. 统计每个城市的UV数

分析需求:

UV:unique view 唯一访问数,一个用户记一次

map:

key: CityId (城市id) 数据类型: Text

value: guid (用户id) 数据类型:Text

 

shuffle:

key: CityId

value: {guid guid guid..}

 

reduce:

key: CityId

value: 访问数 即shuffle输出value的集合大小

 

output:

key : CityId

value : 访问数

 

MRDriver.java mapreduce执行过程

 
package mapreduce;​import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;​public class MRDriver extends Configured implements Tool {​    public int run(String[] args) throws Exception {        //创建job        Job job = Job.getInstance(this.getConf(),"mr-demo");        job.setJarByClass(MRDriver.class);​        //input 默认从hdfs读取数据 将每一行转换成key-value        Path inPath = new Path(args[0]);        FileInputFormat.setInputPaths(job,inPath);​        //map 一行调用一次Map方法  对每一行数据进行分割        job.setMapperClass(MRMapper.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(Text.class);​       /* //shuffle        job.setPartitionerClass(null);//分组        job.setGroupingComparatorClass(null);//分区        job.setSortComparatorClass();//排序*/        //reduce        job.setReducerClass(MRReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);​        //output        Path outPath = new Path(args[1]);        FileSystem fileSystem = FileSystem.get(this.getConf());        if(fileSystem.exists(outPath)){            //if path is a directory and set to true            fileSystem.delete(outPath,true);        }        FileOutputFormat.setOutputPath(job, outPath);                //submit        boolean isSuccess = job.waitForCompletion(true);        return isSuccess ? 0:1;    }​    public static void main(String[] args) {        Configuration configuration = new Configuration();        try {            int status = ToolRunner.run(configuration, new MRDriver(), args);            System.exit(status);        } catch (Exception e) {            e.printStackTrace();        }    }}

MRMapper.java

package mapreduce;​import java.io.IOException;​import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;​public class MRMapper extends Mapper
{ private Text mapOutKey = new Text(); private Text mapOutKey1 = new Text(); //一行调用一次Map方法 对每一行数据进行分割 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获得每行的值 String str = value.toString(); //按空格得到每个item String[] items = str.split("\t"); if (items[24]!=null) { this.mapOutKey.set(items[24]); if (items[5]!=null) { this.mapOutKey1.set(items[5]); } } context.write(mapOutKey, mapOutKey1); } }

MPReducer.java

package mapreduce;​import java.io.IOException;import java.util.HashSet;​import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;​public class MRReducer extends Reducer
{​ //每有一个key value数据 就执行一次reduce方法 @Override protected void reduce(Text key, Iterable
texts, Reducer
.Context context) throws IOException, InterruptedException { HashSet
set = new HashSet
(); for (Text text : texts) { set.add(text.toString()); } context.write(key,new IntWritable(set.size())); } }

4.MapReduce执行wordcount过程理解

input:默认从HDFS读取数据

Path inPath = new Path(args[0]); FileInputFormat.setInputPaths(job,inPath);

将每一行数据转换为key-value(分割),这一步由MapReduce框架自动完成。

输出行的偏移量和行的内容

 

 

mapper: 分词输出

数据过滤,数据补全,字段格式化

输入:input的输出

将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对。

一行调用一次map方法。

统计word中的map:

shuffle: 分区,分组,排序

输出:

<Bye,1>

<Hello,1>

<World,1,1>

得到map输出的<key,value>对,Mapper会将他们按照key进行排序,得到mapper的最终输出结果。

Reduce :每一条Keyvalue调用一次reduce方法

将相同Key的List<value>,进行相加求和

output:将reduce输出写入hdfs

转载于:https://www.cnblogs.com/whcwkw1314/p/8971760.html

你可能感兴趣的文章
HNOI2016
查看>>
BZOJ2648: SJY摆棋子&&2716: [Violet 3]天使玩偶
查看>>
JVM介绍
查看>>
结构体,联合体,内存分配
查看>>
JVM垃圾收集器介绍
查看>>
[No0000136]6个重要的.NET概念:栈,堆,值类型,引用类型,装箱,拆箱
查看>>
【转】MapReduce源码分析总结
查看>>
原生JDK网络编程- NIO之Reactor模式
查看>>
table01
查看>>
简洁侧边wordpress博客模板
查看>>
linux及安全第四周总结——20135227黄晓妍
查看>>
搞出来,PHP mysql JQuery 二级联动
查看>>
AviSynth入门与应用指南
查看>>
ubuntu14.04安装GoldenDict
查看>>
重装系统时启动失败,引导信息有错误,修复磁盘的主引导记录MBR方法
查看>>
字符数组 字符指针
查看>>
Jedis的使用
查看>>
文献笔记(一)
查看>>
Linux(CentOS6.5)下修改Nginx初始化配置
查看>>
windows 重写调试输出
查看>>