Hive由Facebook开源基于Hadoop的一个数据仓库工具,用于解决海量结构化日志的数据统计,Hive可以将结构化的数据文件映射为一张表,并提供类SQL 查询功能【本质是将HQL转化成MapReduce程序】
Hive的执行原理
1)Hive处理的数据存储在HDFS
2)Hive分析数据底层的默认实现是MapReduce
3)执行程序运行在Yarn上
Hive的优点:
1)操作接口采用类SQL语法,提供快速开发的能力(简单、容易上手)
2)避免了去写MapReduce,减少开发人员的学习成本
3)Hive 优势在于处理大数据,对于处理小数据没有优势,因为Hive 的执行延迟比较高
4)Hive 支持用户自定义函数,用户可以根据自己的需求来实现自己的函数
Hive的缺点:
1)Hive 的执行延迟比较高,因此Hive 常用于数据分析,对实时性要求不高的场合
2)Hive 自动生成的MapReduce 作业,通常情况下不够智能化,调优比较困难,粒度较粗
3)迭代式算法无法表达,数据挖掘方面不擅长
Hive的架构原理
Hive的数据类型
tinyint
/samlint
/int
/bigint
/float
/double
boolean
string
timestamp
binary
struct
map
array
Hive的交互方式
①hive登陆;②HiveJDBC;③hive -e "cmd"
直接得到结果;④执行SQL文件bin/hive -f hive_file.sql
下载hive解压,配置
第一步:拷贝conf/hive-env.sh.template
为conf/hive-env.sh
并配置
HADOOP_HOME=/opt/module/hadoop
export HIVE_CONF_DIR=/opt/module/hive/conf
第二步:拷贝MySQL驱动到hive的lib
文件夹下
第三步:创建conf/hive-site.xml
文件配置MySQL
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://192.168.22.1:3306/hive?createDatabaseIfNotExist=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
</property>
<!-- hive连接账户密码 -->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<!-- 数据存储在HDFS的目录 -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<!-- 返回表头 -->
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<!-- 返回当前的库 -->
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
</configuration>
第四步:配置日志
拷贝 conf/hive-log4j2.properties.template
到conf/hive-log4j2.properties
# log目录
property.hive.log.dir = /opt/module/hive/log
第五步:初始化元数据
schematool -dbType mysql -initSchema
开启HiveJDBC服务
nohup bin/hiveserver2 > hiveserver2.log 2>&1 &
beeline连接
bin/beeline
# 进入之后连接
# !connect jdbc:hive2://centos162:10000
-- ########################################## 建库语句 ##########################################
create database db_emp;
create database if not exists db_emp;
create database db_emp location '/hive/resposistion/db_emp';
-- ########################################## 查询库语句 ##########################################
show databases;
desc database extended db_emp;
-- ####################################### 修改属性【添加自定义属性】########################################
alter database db_emp set dbproperties('createOwn'='kun');
-- ########################################## 删库语句 ##########################################
-- 删除语句【删除空数据库】
drop database db_emp;
-- 强制删除
drop database db_emp cascade;
-- ########################################## 切库语句 ##########################################
use db_emp;
管理表【内部表】:删除时元数据和HDFS存储数据一起删除
外部表:删除时只删除元数据
-- ########################################## 查询表结构 ##########################################
show create table student;
desc student;
desc extended student;
desc formatted student;
-- ########################################## 创建表 ##########################################
-- 直接创建表
create external table if not exists student(id int, name string);
-- 使用别的表的查询数据创建表,不允许使用外部表方式
create table if not exists student_new as select id, name from student;
-- 使用别的表结构创建表
create external table if not exists student_like like student;
-- 创建表使用各种分割
-- 数据格式
-- kangkang,lili_xiangxiang,lili:18_xiangxiang:19,fengtai_beijing
-- YueYue,lili_sisi,lili:18_sisi:19,daxiao_beijing
create table persion_info(
name string,
friends array<string>,
children map<string,int>,
address struct<stree:string, city:string>
)
row format delimited
fields terminated by ','
collection items terminated by '_'
map keys terminated by ':'
lines terminated by '\n';
-- ########################################## 修改表 ##########################################
-- 表类型转换
alter table student set tblproperties('EXTERNAL'='TRUE');
alter table student set tblproperties('EXTERNAL'='FALSE');
-- 重命名
alter table student_partition_old rename to student_partition_new;
-- 修改列
alter table student_partition change column id stu_id string;-- 增加列
-- 增加列
alter table student_partition add columns(sex int, age int);
-- 替换列【全量】
alter table student_partition replace columns(id string, name string, sex int);
-- 删除表
drop table student_partition;
-- ########################################## 创建分区表 ##########################################
-- 创建分区表
create table if not exists student_partition(id int, name string)
partitioned by (year string)
row format delimited
fields terminated by '\t';
-- 创建二级分区表
create table if not exists student_partition(id int, name string)
partitioned by (month string, year string)
row format delimited
fields terminated by '\t';
-- ########################################## 插入数据表 ##########################################
-- 插入分区表数据
load data local inpath '/opt/module/hive/input/student.txt' into table student_partition partition(year="2019");
-- 插入二级分区表数据
load data local inpath '/opt/module/hive/input/student.txt' into table student_partition_day partition(month="2019-01", day="01");
-- ########################################## 创建&删除&查询分区 ##########################################
-- 创建分区
alter table student_partition add partition(year="2020") partition(year="2018");
-- 删除分区
alter table student_partition drop partition(year="2020") , partition(year="2018");
-- 查看表分区
show partitions student_partition;
-- ########################################## 修复分区数据 ##########################################
-- 上传数据,完成之后因为没有元数据信息所有无法查询
hadoop fs -mkdir /user/hive/warehouse/student_partition/year=2018
hadoop fs -put input/student.txt /user/hive/warehouse/student_partition/year=2018
-- 方式一:自动修复
msck repair table student_partition;
-- 方式二:手动添加分区
alter table student_partition add partition(year="2017");
-- ########################################## 创建动态分区 ##########################################
-- 开启动态分区功能(默认true)
set hive.exec.dynamic.partition=true
-- 设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区)
set hive.exec.dynamic.partition.mode=nonstrict;
-- 在所有执行MR的节点上,最大一共可以创建多少个动态分区,默认1000。
set hive.exec.max.dynamic.partitions=1000;
-- 在每个执行MR的节点上,最大可以创建多少个动态分区,默认100。
set hive.exec.max.dynamic.partitions.pernode=100;
-- 整个MR Job中,最大可以创建多少个HDFS文件,默认100000。
set hive.exec.max.created.files=100000
-- 当有空分区生成时,是否抛出异常。一般不需要设置。
hive.error.on.empty.partition=false
-- 创建表
create table employee_partition(
empno int,
ename string,
job string,
mgr string,
hiredate string,
sal int,
comm int)
partitioned by (deptno int)
row format delimited fields terminated by '\t' ;
-- 插入数据,此时不用指定partition的字段值,动态获取
insert into employee_partition partition(deptno)
select empno, ename, job, mgr, hiredate, sal, comm, deptno from employee;
-- ########################################## Load装载数据 ##########################################
-- 数据在本地文件是上传到HDFS,数据在HDFS上是剪切
-- 追加
load data [local] inpath '/opt/module/hive/input/student.txt' into table student;
-- 清空,覆盖
load data [local] inpath '/opt/module/hive/input/student.txt' overwrite into table student;
-- ########################################## Insert插入数据 ##########################################
-- 插入数据
insert into table student_partition partition(year="2000") values("2", "tom", NULL);
-- 使用插入数据覆盖, 注意字段顺序
insert overwrite table student_partition partition(year="2000") select id, name, sex from student_bak;
-- ########################################## 查询并创建数据 ##########################################
create table student_new as select id, name from student;
-- ######################################## 创建表并指定数据文件夹#########################################
-- 创建表并指定存放数据的HDFS文件夹位置,然后上传文件到文件夹即可
create table studeng_new(id int, name string)
row format delimited fields terminated by '\t'
location '/user/hive/warehouse/student_new'
-- ########################################## Insert导出 ###########################################
insert overwrite [local] directory '/opt/module/hive/input/student.data'
row format delimited fields terminated by '\t'
select * from student;
-- ######################################## Export导出Import导入 ########################################
-- export导出到HDFS上
export table student to '/hive/export/student';
import table student from '/hive/export/student';
-- 必须是管理表才能截断数据
truncate table student;
SQL语句应该避免产生笛卡尔积,产生笛卡尔积的条件
Oracle、Hive支持满外连接(full join),Mysql不支持满外连接可使用左连接union右连接
-- order by[全局排序,只有一个Reduce]
select * from employee order by sal;
-- sort by[局部排序,多个Reduce,分区字段随机]
set mapreduce.job.reduces=2;
-- 分为两个区,每个区内有序
select deptno,sal from employee sort by sal;
-- distribute by[分区排序],先按照给定字段分区之后再排序
select * from employee distribute by deptno sort by sal;
set mapreduce.job.reduces=3;
select deptno,sal from employee distribute by deptno sort by sal;
--cluster by[当distribute by field sort by field asc相同时可替代]
select deptno from employee cluster by deptno;
-- 创建分桶表
create table employee_buck(empno int, ename string, deptno int)
clustered by(deptno) into 3 buckets
row format delimited fields terminated by '\t';
-- 导入数据
set hive.enforce.bucketmapjoin = true;
set mapreduce.job.reduces = -1;
insert into table employee_buck select empno, ename, deptno from employee;
-- 查询分桶数据
select * from employee_buck where deptno=10;
-- 分桶抽样查询
select * from employee_buck tablesample(bucket 1 out of 6 on deptno);
-- x out of y
-- x: 开始的桶号,基数为1
-- y: 桶数的因子或倍数,如果为因子则为抽取buckets/y个,如果是倍数则为(buckets/y) * buckets个
如果第一个参数是NULL则使用第二个参数值,参数个数为固定两个
select nvl(NULL, 100);
date_format时间格式只认-
-- 字符串转时间
select date_format('2020-09-22', 'yyyy-MM-dd');
-- 时间增加,数值可为负数为减
select date_add('2020-09-22', -1);
-- 时间减少,数值可为负数为增
select date_sub('2020-09-22', -1);
-- 两时间相减
select datediff('2020-09-22','2020-09-21');
-- case when
select
classno,
sum(case sex when "男" then 1 else 0 end) as male,
sum(case sex when "女" then 1 else 0 end) as female
from emp_sex group by classno;
-- if
select if(mgr = "", "领导", "员工") from employee;
-- 拼接
select concat("hello", "_", "world");
-- 拼接指定分隔符
select concat_ws("_", "hello", "world");
-- collect_set返回一个array
select deptno, collect_set(ename) from employee group by deptno;
explode
:将hive一列中复杂的array或者map结构拆分成多行
lateral view
:用于和split
,explode
形成侧写,它将一列数据拆成多行数据
select deptno, ename from emp_dept lateral view explode(enames) table_tmp as ename;
OVER():指定分析函数工作的据窗口大小,这个可能会随着行的变化而变化
OVER内可加参数
参数 | 说明 |
---|---|
CURRENT ROW | 当前行 |
n PRECEDING | 往前n行数据 |
n FOLLOWING | 往后n行数据 |
UNBOUNDED | 起点 |
UNBOUNDED PRECEDING | 表示从前面的起点 |
UNBOUNDED FOLLOWING | 表示到后面的终点 |
OVER左侧函数
参数 | 说明 |
---|---|
lag(col,n) | 往前第n行数据 |
lead(col,n) | 往后第n行数据 |
ntile(n) | 把有序分区中的行发到指定数据组,各个编号从1开始 |
rank() | 存在并列排序,1、1、3、4 |
dense_rank() | 存在并列排序,1、1、2、3 |
row_number() | 顺序计算,1、2、3、4 |
/*
* 数据
* name sale_date price
* Jack 2020-01-01 10
* Jack 2020-01-02 20
* Jack 2020-01-03 30
* Jack 2020-01-05 5
* Jane 2020-01-01 8
* Jane 2020-01-02 12
* Jane 2020-01-08 1
* Roes 2020-01-08 100
*/
-- 按照名称统计,消费时间排序,计算累加消费
select name, sale_date, sum(price) over(distribute by name sort by sale_date) from business;
select name, sale_date, sum(price) over(partition by name order by sale_date) from business;
/**
* 结果如下
* name sale_date sum_window_0
* Jack 2020-01-01 10
* Jack 2020-01-02 30
* Jack 2020-01-03 60
* Jack 2020-01-05 65
* Jane 2020-01-01 8
* Jane 2020-01-02 20
* Jane 2020-01-08 21
* Roes 2020-01-08 100
*/
-- 查询上次消费和本次消费的总计
select
name,
sale_date,
sum(price) over(distribute by name sort by sale_date rows between 1 PRECEDING and CURRENT ROW)
from business;
/**
* 结果如下
* Jack 2020-01-01 10
* Jack 2020-01-02 30
* Jack 2020-01-03 50
* Jack 2020-01-05 35
* Jane 2020-01-01 8
* Jane 2020-01-02 20
* Jane 2020-01-08 13
* Roes 2020-01-08 100
*/
-- 查询顾客上次购买时间
select name, sale_date, lag(sale_data, 1) over(distribute by name sort by sale_date) from business;
/**
* 结果如下
* Jack 2020-01-01 NULL
* Jack 2020-01-02 2020-01-01
* Jack 2020-01-03 2020-01-02
* Jack 2020-01-05 2020-01-03
* Jane 2020-01-01 NULL
* Jane 2020-01-02 2020-01-01
* Jane 2020-01-08 2020-01-02
* Roes 2020-01-08 NULL
*/
-- 查询消费日期25%为一组的数据
select name, sale_date, ntile(4) over(sort by sale_data) as percent from business order by percent;
/**
* 结果如下
* Jack 2020-01-01 1
* Jane 2020-01-01 1
* Jack 2020-01-02 2
* Jane 2020-01-02 2
* Jack 2020-01-05 3
* Jack 2020-01-03 3
* Jane 2020-01-08 4
* Roes 2020-01-08 4
*/
-- 消费者自身消费金额排序
select name, price, sale_date, rank() over(distribute by name sort by price desc) from business;
/**
* 结果如下
* Jack 30 2020-01-03 1
* Jack 20 2020-01-02 2
* Jack 10 2020-01-01 3
* Jack 5 2020-01-05 4
* Jane 12 2020-01-02 1
* Jane 8 2020-01-01 2
* Jane 1 2020-01-08 3
* Roes 100 2020-01-08 1
*/
-- 查看系统自带的函数
show functions;
-- 显示自带的函数的用法
desc function [funcName];
desc function extended [funcName];
当Hive提供的内置函数无法满足你的业务处理需要时,此时可以考虑使用用户自定义函数(UDF:user-defined function)
根据用户自定义函数分为以下三类:
第一步:上传jar到hive服务器
第二步:add jar /opt/module/hive/lib/xxx.jar;
第三步:添加类CREATE TEMPORARY FUNCTION [funcName]AS [className];
@Description(name = "lower_concat",
value = "_FUNC_(param_1,param_2) - Returns concat lower string ",
extended = "param can be one of:\n"
+ "1. primitive\n"
+ "2. null\n"
+ "Example:\n "
+ " > SELECT _FUNC_('AAA', 'BBB')")
public class LowerConcat extends GenericUDF {
private transient ObjectInspectorConverters.Converter[] converters;
/**
* 这个方法只调用一次,并且在evaluate()方法之前调用。该方法接受的参数是一个ObjectInspectors数组。
* 该方法检查接受正确的参数类型和参数个数。
*/
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
// 长度校验
if (arguments.length != 2) {
throw new UDFArgumentException("The function LOWER_CONCAT accepts 2 arguments.");
}
converters = new ObjectInspectorConverters.Converter[arguments.length];
// 参数类型校验
if (!(arguments[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE))) {
throw new UDFArgumentTypeException(0, "\"primitive\" expected at function LOWER_CONCAT, but \""
+ arguments[0].getTypeName() + "\" " + "is found");
}
converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
if (!(arguments[1].getCategory().equals(ObjectInspector.Category.PRIMITIVE))) {
throw new UDFArgumentTypeException(0, "\"primitive\" expected at function LOWER_CONCAT, but \""
+ arguments[1].getTypeName() + "\" " + "is found");
}
converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
/**
* 处理
*/
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
Text firstText = (Text) converters[0].convert(arguments[0].get());
Text secondText = (Text) converters[1].convert(arguments[1].get());
String firstStr = (firstText == null ? "" : firstText.toString());
String secondStr = (secondText == null ? "" : secondText.toString());
return new Text(firstStr.toLowerCase() + secondStr.toLowerCase());
}
@Override
public String getDisplayString(String[] children) {
return getStandardDisplayString(getFuncName(), children);
}
}
@Description(name = "multi",
value = "_FUNC_(params...) - Returns concat lower string ",
extended = "param can be number\n"
+ "Example:\n "
+ " > SELECT _FUNC_(1,2,3)")
public class Multi extends GenericUDTF {
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
// 1.定义输出数据的列名和类型
List<String> fieldNames = new ArrayList<>();
List<ObjectInspector> fieldOIS = new ArrayList<>();
//2.添加输出数据和列名和类型
fieldNames.add("multi");
fieldOIS.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIS);
}
@Override
public void process(Object[] args) throws HiveException {
List<Integer> inputArg = new ArrayList<>();
for (Object arg : args) {
inputArg.add(((IntWritable)arg).get());
}
Long result = 1L;
for (Integer aInt : inputArg) {
result = result * aInt;
}
forward(result);
}
@Override
public void close() throws HiveException {
}
}
前提条件:hadoop先编译安装好支持Snappy
-- 开启hive传输数据压缩功能
set hive.exec.compress.intermediate=true;
-- 开启map-reduce中map输出压缩功能
set mapreduce.map.output.compress=true;
-- 设置map-reduce中map输出数据的压缩方式
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- 开启hive最终输出数据压缩功能
set hive.exec.compress.output=true;
-- 开启map-reduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;
-- 设置map-reduce中map输出数据的压缩方式
set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
-- 设置map-reduce最终数据输出压缩为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
-- 测试一下输出结果是否压缩文件
insert overwrite local directory '/opt/module/hive/input/employee' select deptno, sal from employee distribute by deptno sort by sal;
Hive可使用的文件格式
创建不同文件格式的表
-- ######################################## TEXTFILE ########################################
create table employee_textfile(
empno int,
ename string
)
row format delimited fields terminated by '\t'
stored as textfile;
-- ######################################## ORC ########################################
create table employee_orc(
empno int,
ename string
)
row format delimited fields terminated by '\t'
stored as orc tblproperties ("orc.compress"="SNAPPY");
-- 默认压缩为ZLIB
insert into employee_orc select empno, ename from employee_textfile;
-- ######################################## PARQUET ########################################
create table employee_parquet(
empno int,
ename string
)
row format delimited fields terminated by '\t'
stored as parquet;
insert into employee_parquet select empno, ename from employee_textfile;
Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT * FROM employee;
在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。
修改hive-site.xml
<property>
<name>hive.fetch.task.conversion</name>
<!--
none : 全部走MR
minimal : select * ,limit,filter在一个表所属的分区表上操作不走MR
more : select,filter,limit,都是运行数据的fetch,不跑mr[默认值]
-->
<value>more</value>
</property>
大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
-- 开启本地模式默认为false
set hive.exec.mode.local.auto=true;
-- 数据量必须小于指定参数才使用本地模式,默认128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
-- job的map数必须小于参数才使用本地模式,默认4
set hive.exec.mode.local.auto.tasks.max=10;
查询时应该将数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率。实际新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。小表放在左边和右边已经没有明显区别。
-- 关闭mapjoin功能(默认是打开的)
set hive.auto.convert.join = false;
如果大表的空key太多,而且空key不参与join,则应该在join之前进行子查询过滤掉空key
select * from (select * from bigNullKeyTable where bigtable_id is not null) T left join bigTable on bigTable.id = T.bigtable_id;
如果大表的空key太多,但是空key参与join,则可以随机生成join用的key避免数据倾斜
select * from bigNullKeyTable left join bigTable on if(bigNullKeyTable.bigtable_id is null, concat('temp_join_key', rand()) ,bigNullKeyTable.bigtable_id ) = bigTable.id;
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将在Reduce阶段完成join,容易发生数据倾斜。可以用MapJoin把小 表全部加载到内存在map端进行join,避免reduce处理。
-- 设置自动选择MapJoin,默认为true
set hive.auto.convert.join = true;
-- 大表小表的阈值设置(默认25M下认为是小表)
set hive.mapjoin.smalltable.filesize=25000000;
Group By
同样的Key会分发给一个reduce,如果Group By
的字段数据倾斜较大(一样的集中在几个值上)压力会集中在个别Reduce上。并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。
-- 是否在Map端进行聚合,默认为True
set hive.map.aggr = true;
-- 在Map端进行聚合操作的条目数目,默认100000
set hive.groupby.mapaggr.checkinterval = 100000;
-- 有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true;
当选项设定为true, 生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。
数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT
操作需要用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT
使用先GROUP BY
再COUNT
的方式替换:
select count(distinct mgr) from employee;
set mapreduce.job.reduces=10;
select count(mgr) from (select mgr from employee group by mgr) T;
默认情况下笛卡尔积校验时关闭的,可以配置hive-site.xml
开启
<property>
<name>hive.strict.checks.cartesian.product</name>
<value>true</value>
</property>
当需要使用时可在当前会话开启使用开启笛卡尔积
-- 开启可使用笛卡尔积
set hive.strict.checks.cartesian.product=false;
列处理:在SELECT
中,只拿需要的列,少使用SELECT *
行处理:如果能过滤出很多列可先将表执行子查询再JOIN
select ename,deptno from (select * from employee where mgr is not null) emp join department on emp.deptno=department.id;
合理设置Map数
-- 决定每个map处理的最大的文件大小,单位为B,默认256M
mapreduce.input.fileinputformat.split.maxsize=256000000;
合理设置reduce数量
-- 每个Reduce处理的数据量默认是256MB
set hive.exec.reducers.bytes.per.reducer=256000000;
-- 每个任务最大的reduce数,默认为1009
set hive.exec.reducers.max=1009;
-- reduce个数设置
set mapreduce.job.reduces=2;
小文件进行合并
-- 默认已合并
set hive.input.format=org.apache.hadoop.hive.q1.io.CombineHive.InputFormat;
Hive会将一个查询转化成-一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit 阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。如果有更多的阶段可以并行执行,那么job可能就越快完成。系统资源比较空闲的时候才有优势
-- 打开任务并行执行,默认false
set hive.exec.parallel=true;
-- 同一个sq1允许最大并行度,默认8
set hive.exec.plarallel.thread.number=16;
<!--
hive.mapred.mode被废弃,已被 hive.strict.checks.* 设置替代
-->
<!-- ordey by 必须和limit连用,默认false -->
<property>
<name>hive.strict.checks.orderby.no.limit</name>
<value>false</value>
</property>
<!-- 分区表查询必须含有分区字段过滤,默认false -->
<property>
<name>hive.strict.checks.no.partition.filter</name>
<value>false</value>
</property>
<!-- 笛卡尔积检测,默认false -->
<property>
<name>hive.strict.checks.cartesian.product</name>
<value>false</value>
</property>
Hive自身也开启了推测执行
hive.mapred.reduce.tasks.speculative.execution
,默认true
使用extend分析执行计划,查看MR如何运行
explain select * from employee_partition where deptno=20;
explain extended select * from employee_partition where deptno=20;