Preface
Hive offers a SQL-like query language, and in practice joins are probably its most heavily used feature, since most business queries involve related tables. This post covers join optimization, along with how to configure and use custom UDFs.
Hive Data Preparation
Code (Java): https://github.com/lishijia/java-data-demo/tree/master/hive-data-demo
```sql
create table lsj_order(
```
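The DDL snippet above is truncated; the full scripts are in the repo. As a minimal sketch, the four sample tables used in this post could be created as follows, with column names inferred from the later queries and all types assumed:

```sql
-- Sketch only: column names inferred from the queries in this post,
-- all types are assumptions; see the repo above for the real DDL.
create table lsj_order          (order_id string, user_id string, user_name string);
create table lsj_item           (order_id string, item_id string, product string);
create table lsj_item_delivery  (order_id string, item_id string, status string);
create table lsj_comment        (order_id string, status string);  -- columns assumed
```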
Hive Join Optimization
Generating One MapReduce Job
In a multi-table join, if every ON clause uses the same join column from each table, only one MR job is generated. Here, the three tables lsj_order, lsj_item, and lsj_item_delivery are all joined on the order_id field, so only one MR job is produced. For example:
```sql
select o.user_name, i.product, d.status
from lsj_order o
join lsj_item i on o.order_id = i.order_id
join lsj_item_delivery d on d.order_id = i.order_id;
```
Generating Two MapReduce Jobs
In a multi-table join, if one of the tables contributes at least two different columns to the JOIN ... ON clauses, at least two MR jobs are generated. Below, the three tables are joined on two different keys, both of which come from lsj_item: its order_id matches lsj_order, and its item_id matches lsj_item_delivery. The join proceeds like this: first lsj_order and lsj_item are joined on order_id, which is the first MR job; the result of that join is then joined with lsj_item_delivery on item_id, which is the second MR job. For example:
```sql
select o.user_name, i.product, d.status
from lsj_order o
join lsj_item i on o.order_id = i.order_id
join lsj_item_delivery d on d.item_id = i.item_id;
```
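Either claim is easy to verify with EXPLAIN, which prints the stage plan for a query; counting the MapReduce stages in its output shows one job for the first query and two for the second:

```sql
explain
select o.user_name, i.product, d.status
from lsj_order o
join lsj_item i on o.order_id = i.order_id
join lsj_item_delivery d on d.item_id = i.item_id;
```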
Parallel Execution
Hive can run several stages of a query at the same time. During execution, Hive converts a query into one or more stages; a given job may contain multiple stages that are not all dependent on one another, so the independent stages can be executed in parallel, which can shorten the overall job runtime and improve efficiency.
```
hive> set hive.exec.parallel=true;
```
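A related standard Hive knob (not covered above): the number of stages allowed to run concurrently is capped by hive.exec.parallel.thread.number, which defaults to 8:

```
hive> set hive.exec.parallel.thread.number=8;
```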
Condition-Based LEFT OUTER JOIN Optimization
```sql
select o.user_name, i.product
from lsj_order o
left outer join lsj_item i on o.order_id = i.order_id
where i.product = 'mac';
```
The execution order here is: the two tables are joined first, and only then does the WHERE condition filter the result. The join may therefore emit a large number of rows that are immediately filtered away, which wastes time. This can be optimized by moving the condition into the ON clause, so that non-matching records are filtered out during the join itself, for example:
```sql
select o.user_name, i.product
from lsj_order o
left outer join lsj_item i on (o.order_id = i.order_id and i.product = 'mac');
```
Note that for a LEFT OUTER JOIN the two forms are not strictly equivalent: with the predicate in the ON clause, lsj_order rows without a matching 'mac' item are still kept, with NULL item columns, whereas the WHERE form drops them entirely.

Join Ordering
When Hive performs a join, it converts the query into one or more MapReduce jobs; each job is called a Stage in Hive. When executing each stage, the last table in the join order should be the largest of the joined tables. Why? Because the data produced by the earlier part of the join is cached in the reducer's buffer, and when the next table is joined, the cached intermediate result is read straight from that buffer (here, the output of lsj_order join lsj_item, which is relatively small after filtering). Joining it this way against the large lsj_comment table (the comments table) is faster and can also avoid spilling the buffer to disk (extra disk I/O).
```sql
select o.user_name, i.product, c.status
from lsj_order o
join lsj_item i on o.order_id = i.order_id
join lsj_comment c on c.order_id = i.order_id;
```
This join statement generates one MR job. When choosing the join order, the size comparison should be lsj_item < lsj_comment. Tables lsj_order and lsj_item are joined on o.order_id = i.order_id, and the result (keyed by the order_id of that join) is cached in the reducer's buffer; when joining with lsj_comment, the cached keys are read from the buffer and matched against c.order_id from lsj_comment.
```sql
select /*+ STREAMTABLE(lsj_order) */ o.user_name, i.product, c.status
from lsj_order o
join lsj_item i on o.order_id = i.order_id
join lsj_comment c on c.order_id = i.order_id;
```
In the statement above, the STREAMTABLE hint marks lsj_order as the big (streamed) table, so lsj_item and lsj_comment are joined first, and the result is then joined with lsj_order.
Joining Two Big Tables (Optimizing Based on Business Knowledge)
```sql
select * from t_log t join t_user t1 on t.user_id = t1.user_id;
```
Example: both tables have tens of millions of rows.
Analysis: t_log holds tens of millions of records, but there may be only around 100,000 actual users behind them.
Optimization: first derive a small table from t_log (its distinct user ids), join that against t_user to get a small final user table, and only then perform the join. The idea is to split the work so that a small table joins a big table, rather than joining two big tables head-on:
```sql
select /*+ MAPJOIN(t12) */ * from t_log t11 join (
  select /*+ MAPJOIN(t) */ t1.* from (
    select distinct user_id from t_log
  ) t
  join t_user t1 on t.user_id = t1.user_id
) t12 on t11.user_id = t12.user_id;
```
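Hive can also convert such joins to map joins automatically (and in recent versions does so by default): when hive.auto.convert.join is enabled, any join whose small side is under hive.mapjoin.smalltable.filesize (default 25000000 bytes, i.e. 25 MB) becomes a map join, which makes manual MAPJOIN hints largely unnecessary:

```
hive> set hive.auto.convert.join=true;
hive> set hive.mapjoin.smalltable.filesize=25000000;
```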
Configuring and Using Hive UDFs
Java Hive UDF: Code & Usage
Code (Java): https://github.com/lishijia/java-data-demo/tree/master/hive-data-demo
POM dependency
```xml
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.2</version>
</dependency>
```
Java code
```java
package lishijia.data.hive;

import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * @program:
 * @description: UDF that upper-cases a string column
 * @author: lishijia
 * @create: 2018/10/13 11:06
 **/
public class UpperCassUDF extends UDF {
    public String evaluate(String input) {
        if (input == null) {
            return null;
        } else {
            return input.toUpperCase();
        }
    }
}
```
Build the jar with Maven, upload it to the server, and then put it into a temporary HDFS directory:
```
[root@master201 bin]# hadoop fs -mkdir /lishijia/hive/udf/
[root@hadoop100 hive]# hadoop fs -put hive-data-demo-1.0-SNAPSHOT.jar /lishijia/hive/udf/
```
Switch to the Hive CLI to use it:
```
hive> add jar hdfs:///lishijia/hive/udf/hive-data-demo-1.0-SNAPSHOT.jar;
hive> create temporary function toupper as 'lishijia.data.hive.UpperCassUDF';
hive> show functions;
```
A function named toupper now shows up in the list, and it can be used directly:
```
hive> select toupper(product) from lsj_item;
MAC
IPHONE
XIAOMI
HUAWEI
HUAWEI
```
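Temporary functions disappear when the session ends. On Hive 0.13 and later, a UDF can instead be registered permanently with CREATE FUNCTION ... USING JAR; a sketch, assuming the same jar path as above:

```
hive> create function toupper as 'lishijia.data.hive.UpperCassUDF'
      using jar 'hdfs:///lishijia/hive/udf/hive-data-demo-1.0-SNAPSHOT.jar';
```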
Python Hive UDF: Code & Usage
Code (Python): https://github.com/lishijia/py-data-demo/tree/master/hive
```python
#!/usr/bin/env python
# coding=utf-8
# python udf function for hive
import sys

for line in sys.stdin:
    tmp_line = line.upper()
    # the trailing comma suppresses print's extra newline; without it the
    # Hive CLI shows a blank line after every row
    print tmp_line,
```
Upload it to the server, then put it into the temporary HDFS directory:
```
[root@hadoop100 hive]# hadoop fs -put udf-hive.py /lishijia/hive/udf/
```
Switch to the Hive CLI to use it:
```
hive> add file hdfs:///lishijia/hive/udf/udf-hive.py;
hive> select transform(product) using 'python udf-hive.py' as product from lsj_item;
Total MapReduce CPU Time Spent: 1 seconds 50 msec
OK
MAC
IPHONE
XIAOMI
HUAWEI
HUAWEI
Time taken: 14.218 seconds, Fetched: 5 row(s)
```
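One detail worth knowing about how TRANSFORM feeds the script: input columns arrive on stdin as tab-separated text, one row per line, and output lines are split on tabs again. A sketch of a variant that upper-cases several columns at once, under that assumption:

```python
#!/usr/bin/env python
# coding=utf-8
# upper-case every tab-separated column of each input row
import sys

for line in sys.stdin:
    fields = line.rstrip('\n').split('\t')
    print '\t'.join(f.upper() for f in fields)
```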
Connecting to Hive from Java (Thrift Interface)
Start the hiveserver2 (Thrift) service
```
[root@hadoop100 bin]# cd /home/lishijia/Soft/apache-hive-1.2.2/bin
[root@hadoop100 bin]# ./hiveserver2
```
Verify hiveserver2 with a Beeline connection test. If you hit the error below, grant the permissions first (connecting to Hive through the beeline command):
```
[root@master201 bin]# beeline -u jdbc:hive2://master201:10000
Connecting to jdbc:hive2://master201:10000
Error: Failed to open new session: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.security.AccessControlException: Permission denied: user=anonymous, access=EXECUTE, inode="/tmp/hive":root:supergroup:drwx------(state=,code=0)
Beeline version 1.2.2 by Apache Hive
0: jdbc:hive2://master201:10000 (closed)> [root@master201 bin]#
[root@master201 bin]#
[root@master201 bin]#
[root@master201 bin]# hadoop fs -chmod -R 777 /tmp/
[root@master201 bin]# beeline -u jdbc:hive2://master201:10000
Connecting to jdbc:hive2://master201:10000
Connected to: Apache Hive (version 1.2.2)
Driver: Hive JDBC (version 1.2.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.2 by Apache Hive
0: jdbc:hive2://master201:10000> use lishijia;
No rows affected (1.329 seconds)
0: jdbc:hive2://master201:10000> show tables;
+--------------------+--+
| tab_name |
+--------------------+--+
| lsj_item |
| lsj_item_delivery |
| lsj_order |
| st_article |
+--------------------+--+
4 rows selected (0.696 seconds)
0: jdbc:hive2://master201:10000>
```

Java code example
Code (Java): https://github.com/lishijia/java-data-demo/tree/master/hive-data-demo
```java
package lishijia.data.hive;

import java.sql.*;

/**
 * @program:
 * @description: Query Hive through the JDBC interface it exposes (Thrift)
 * @author: lishijia
 * @create: 2018/9/25 16:37
 **/
public class ThirftHiveDemo {

    String driverClass = "org.apache.hive.jdbc.HiveDriver";
    String url = "jdbc:hive2://master201:10000/lishijia";

    public static void main(String[] args) {
        long l = System.currentTimeMillis();
        ThirftHiveDemo thirftHiveDemo = new ThirftHiveDemo();
        thirftHiveDemo.search();
        System.out.println(System.currentTimeMillis() - l);
    }

    private void search() {
        Connection conn = null;
        Statement stmt = null;
        try {
            conn = getConn();
            System.out.println(conn);
            stmt = conn.createStatement();
            // Hive table name
            String tableName = "lsj_order";
            String sql = "select * from " + tableName + " limit 10";
            System.out.println("Running:" + sql);
            ResultSet res = stmt.executeQuery(sql);
            System.out.println("select * query results:");
            while (res.next()) {
                System.out.println(res.getString(1) + "\t" + res.getString(2));
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            System.exit(1);
        } catch (SQLException e) {
            e.printStackTrace();
            System.exit(1);
        } finally {
            try {
                if (stmt != null) {
                    stmt.close();
                    stmt = null;
                }
                if (conn != null) {
                    conn.close();
                    conn = null;
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    private Connection getConn() throws ClassNotFoundException, SQLException {
        Class.forName(driverClass);
        Connection conn = DriverManager.getConnection(url);
        return conn;
    }
}
```
The query results are as follows:
```
C:\Users\Administrator\.m2\repository\stax\stax-api\1.0.1\stax-api-1.0.1.jar;C:\Users\Administrator\.m2\repository\jline\jline\2.12\jline-2.12.jar" lishijia.data.hive.ThirftHiveDemo
log4j:WARN No appenders could be found for logger (org.apache.hive.jdbc.Utils).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
org.apache.hive.jdbc.HiveConnection@215be6bb
Running:select * from st_order limit 10
select * query results:
order_id user_id
2539329 1
2398795 1
473747 1
2254736 1
431534 1
3367565 1
550135 1
3108588 1
2295261 1
5933
Process finished with exit code 0
```
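To build the client above, the Hive JDBC driver must be on the classpath. A sketch of the Maven dependency, assuming the same 1.2.2 version used elsewhere in this post (depending on the cluster, hadoop-common may also be required):

```xml
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.2.2</version>
</dependency>
```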
A Complete Example: Top 10% of Each User's Purchases in Hive
- Group by o.user_id, p.product_id in the innermost query yields user_id, product_id, count(1) product_cnt, i.e., how many times each user bought each product
- row_number() over(distribute by user_id sort by product_cnt desc) row_product ranks each user's products by purchase count, descending
- ceil((count(1) over(partition by user_id)) * 0.1) topNum computes how many products make up the top 10%, rounding up with ceil; e.g., if a user bought 25 products, the top 10% is 3 products
- where row_product <= topNum keeps the top 10% of products (taken in descending order of purchase count)
- concat_ws(',', collect_set(product_id)) flattens the product ids into a single comma-separated string (full query sketched below)
```sql
Select user_id, concat_ws(',',collect_set(product_id)) from (
```
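The query above is truncated; a minimal sketch reconstructed from the bullet points, in which the purchase-detail table p (named lsj_order_product here) and its join key are hypothetical:

```sql
-- Sketch only: the p table name and its join to lsj_order are assumptions;
-- the structure follows the bullet points above.
select user_id, concat_ws(',', collect_set(product_id)) as top_products
from (
    select user_id, product_id,
           row_number() over (distribute by user_id sort by product_cnt desc) as row_product,
           ceil((count(1) over (partition by user_id)) * 0.1) as topNum
    from (
        select o.user_id, p.product_id, count(1) as product_cnt
        from lsj_order o
        join lsj_order_product p on o.order_id = p.order_id  -- hypothetical detail table
        group by o.user_id, p.product_id
    ) t1
) t2
where row_product <= topNum
group by user_id;
```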
Summary
This post covered some caveats and optimization techniques for Hive joins, how to define custom functions and use them from Hive, and finally how to connect to Hive from Java over the Thrift interface to query table data.
Code (Java): https://github.com/lishijia/java-data-demo/tree/master/hive-data-demo
Code (Python): https://github.com/lishijia/py-data-demo/tree/master/hive