Hive Join Optimization, Custom UDFs (Python + Java), and Connecting to Hive from Java (Thrift Service)

Preface

Hive provides an SQL-like query language, and join is probably the most frequently used construct in day-to-day work, since most business queries involve joining related tables. This post covers join optimization, as well as how to configure and use custom UDFs.

Hive Data Preparation

代码(Java): https://github.com/lishijia/java-data-demo/tree/master/hive-data-demo

create table lsj_order(
order_id string,
amount float,
user_name string
)
row format delimited fields terminated by ',' lines terminated by '\n';

create table lsj_item(
order_id string,
price float,
product string,
item_id string
)
row format delimited fields terminated by ',' lines terminated by '\n';

create table lsj_item_delivery(
order_id string,
item_id string,
status string
)
row format delimited fields terminated by ',' lines terminated by '\n';

hive> load data local inpath '/home/lishijia/Documents/hive/order' overwrite into table lsj_order;
hive> load data local inpath '/home/lishijia/Documents/hive/item' overwrite into table lsj_item;
hive> load data local inpath '/home/lishijia/Documents/hive/delivery' overwrite into table lsj_item_delivery;
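
For reference, the files loaded above are plain comma-delimited text with one record per line. The lines below are illustrative only (they are not the actual contents of the author's files); they simply show the column order each table expects:

/home/lishijia/Documents/hive/order  (order_id,amount,user_name):
1001,199.0,lishijia
1002,599.0,zhangsan

/home/lishijia/Documents/hive/item  (order_id,price,product,item_id):
1001,199.0,mac,2001
1002,599.0,iphone,2002

/home/lishijia/Documents/hive/delivery  (order_id,item_id,status):
1001,2001,delivered
1002,2002,shipped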

Hive Join Optimization

  • Generating a single MapReduce job

    In a multi-table join, if every table joins on the same column in all of the ON clauses, only one MR job is generated. Here the three tables lsj_order, lsj_item, and lsj_item_delivery are all joined on the order_id column, so only one MR job is produced. For example:

    select o.user_name, i.product, d.status from lsj_order o join lsj_item i on o.order_id = i.order_id join lsj_item_delivery d on d.order_id = i.order_id;
  • Generating two MapReduce jobs

    In a multi-table join, if any table contributes at least two different join columns (i.e., at least two columns of the same table appear in the JOIN clauses), then at least two MR jobs are generated. Here the three tables are joined through two columns, order_id and item_id, both of which come from lsj_item. The join proceeds as follows: first lsj_order and lsj_item are joined on o.order_id = i.order_id, which is the first MR job; the result of that join is then joined with lsj_item_delivery, which is the second MR job. For example:

    select o.user_name, i.product, d.status from lsj_order o join lsj_item i on o.order_id = i.order_id join lsj_item_delivery d on d.item_id = i.item_id;
  • Parallel execution

    This runs multiple stages of a Hive query at the same time. During execution, Hive turns a query into one or more stages; a given job may contain several stages that do not strictly depend on each other, so those stages can be executed in parallel. This can shorten the overall job run time and improve execution efficiency (session settings related to this are sketched in the last bullet of this list):

    hive> set hive.exec.parallel=true;
  • Condition-based LEFT OUTER JOIN optimization

    select o.user_name, i.product from lsj_order o 
    left outer join lsj_item i on o.order_id = i.order_id
    where i.product = 'mac'

    The execution order is: the two tables are joined first, and only then is the WHERE condition applied. The join may therefore produce a large intermediate result that then has to be filtered, which is slow. This can be optimized by moving the condition into the ON clause, so that records that do not satisfy it are filtered out during the join itself, for example:

    select o.user_name, i.product from lsj_order o 
    left outer join lsj_item i on (o.order_id = i.order_id and i.product = 'mac')
  • Join order

    When Hive executes a join, it is translated into one or more MapReduce jobs; each job is called a Stage in Hive. When writing the join, the last table in the join order should be the largest of the tables involved. Why? Because the data produced by the previous join stage is cached in the reducer's buffer, and the next stage reads that cached intermediate result directly from the buffer (here the result of lsj_order join lsj_item, which is relatively small after filtering). Joining that small cached result against the large lsj_comment (ratings) table is faster and may also avoid spilling the buffer to disk (disk I/O).

    select o.user_name, i.product from lsj_order o join lsj_item i on o.order_id = i.order_id join lsj_comment c on c.order_id = i.order_id;

    This join statement generates a single MR job. When choosing the join order, the assumption about data volume is lsj_item < lsj_comment. Tables lsj_order and lsj_item are joined on o.order_id = i.order_id; the result (keyed by order_id) is cached in the reducer's buffer, and when lsj_comment is joined, the key (order_id) is read from the buffer and matched against c.order_id of lsj_comment.

    select /*+ STREAMTABLE(lsj_order) */ o.user_name, i.product from lsj_order o join lsj_item i on o.order_id = i.order_id join lsj_comment c on c.order_id = i.order_id;

    In the statement above, the STREAMTABLE hint marks lsj_order as the large (streamed) table: lsj_item and lsj_comment are joined first, and the result is then joined with lsj_order.

  • Joining two large tables (optimize based on business knowledge)

    select * from t_log t join t_user t1 on t.user_id=t1.user_id

    Example: both tables contain tens of millions of rows.

    Analysis: t_log has tens of millions of records, but there may only be about 100,000 distinct users in it.

    Optimization: first derive a small table from t_log, then join it with t_user to obtain a small, final user table, and only then join against t_log (the idea is to split the work so that a small table is joined with a big table, instead of joining two big tables directly). A session-level alternative to the MAPJOIN hints used below is sketched in the last bullet of this list.

    select /*+ MAPJOIN(t12) */ * from t_log t11 join (
      select /*+ MAPJOIN(t) */ t1.* from (
        select distinct user_id from t_log
      ) t
      join t_user t1 on t.user_id = t1.user_id
    ) t12 on t11.user_id = t12.user_id
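
  • Related session settings (sketch)

    The commands below are a minimal Hive CLI sketch of settings related to the optimizations above. The property names are standard Hive configuration keys, but defaults and behavior vary by Hive version, so treat the values as illustrative rather than prescriptive.

    Check how many MapReduce stages a join compiles into:

    hive> explain select o.user_name, i.product, d.status from lsj_order o join lsj_item i on o.order_id = i.order_id join lsj_item_delivery d on d.order_id = i.order_id;

    Let Hive convert joins whose small side is below a size threshold into map joins automatically, instead of relying on /*+ MAPJOIN(...) */ hints:

    hive> set hive.auto.convert.join=true;
    hive> set hive.mapjoin.smalltable.filesize=25000000;

    Cap how many independent stages may run at once when hive.exec.parallel is enabled:

    hive> set hive.exec.parallel.thread.number=8;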

Hive UDF Configuration and Usage

  • Java Hive UDF code & usage

    代码(Java): https://github.com/lishijia/java-data-demo/tree/master/hive-data-demo

    Maven dependency (pom.xml)

    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.2</version>
    </dependency>

    Java code

    package lishijia.data.hive;

    import org.apache.hadoop.hive.ql.exec.UDF;

    /**
     * A simple UDF that upper-cases its string input.
     * @author: lishijia
     * @create: 2018/10/13 11:06
     **/
    public class UpperCassUDF extends UDF {
        public String evaluate(String input) {
            if (input == null) {
                return null;
            } else {
                return input.toUpperCase();
            }
        }
    }

    Build the jar with Maven, upload it to the server, and then put it into a temporary HDFS directory:

    [root@master201 bin]# hadoop fs -mkdir /lishijia/hive/udf/
    [root@hadoop100 hive]# hadoop fs -put hive-data-demo-1.0-SNAPSHOT.jar /lishijia/hive/udf/

    Switch to the Hive CLI and use it (a sketch of registering the UDF as a permanent function follows at the end of this section):

    hive> add jar hdfs:////lishijia/hive/udf/hive-data-demo-1.0-SNAPSHOT.jar;
    hive> create temporary function toupper as 'lishijia.data.hive.UpperCassUDF';
    hive> show functions;
    The function named toupper is now listed, and it can then be used directly as below:
    hive> select toupper(product) from lsj_item;
    MAC
    IPHONE
    XIAOMI
    HUAWEI
    HUAWEI
  • Python Hive UDF code & usage

    代码(Python): https://github.com/lishijia/py-data-demo/tree/master/hive

    #!/usr/bin/env python
    # coding=utf-8
    # python udf function for hive
    import sys

    for line in sys.stdin:
        tmp_line = line.upper()
        # the trailing comma keeps print from appending another newline;
        # without it an extra blank line shows up for each row in the hive CLI
        print tmp_line ,

    Upload the script to the server, then put it into a temporary HDFS directory:

    [root@hadoop100 hive]# hadoop fs -put udf-hive.py /lishijia/hive/udf/

    Switch to the Hive CLI and use it:

    hive> add file hdfs:////lishijia/hive/udf/udf-hive.py;
    hive> select transform(product) using 'python udf-hive.py' as product from lsj_item;
    Total MapReduce CPU Time Spent: 1 seconds 50 msec
    OK
    MAC
    IPHONE
    XIAOMI
    HUAWEI
    HUAWEI
    Time taken: 14.218 seconds, Fetched: 5 row(s)
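
  • Registering the Java UDF as a permanent function (sketch)

    Both examples above only register the UDF for the current session (a temporary function / an added file). As a minimal sketch, Hive 0.13+ also supports permanent functions backed by a jar on HDFS; the database name lishijia and the jar path below are reused from this article and may need adjusting:

    hive> create function lishijia.toupper as 'lishijia.data.hive.UpperCassUDF' using jar 'hdfs:////lishijia/hive/udf/hive-data-demo-1.0-SNAPSHOT.jar';
    hive> drop function lishijia.toupper;

    The first statement makes the function visible across sessions; the second removes it again.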

Connecting to Hive from Java (Thrift Interface)

  • Start the hiveserver2 (Thrift) service

    [root@hadoop100 bin]# cd /home/lishijia/Soft/apache-hive-1.2.2/bin
    [root@hadoop100 bin]# ./hiveserver2
  • Verify hiveserver2 with a beeline connection test; if the error below appears, grant the required permissions (the connection is made as the anonymous user, which cannot access /tmp/hive). Connect to Hive through the Beeline command:

    [root@master201 bin]# beeline -u jdbc:hive2://master201:10000          
    Connecting to jdbc:hive2://master201:10000
    Error: Failed to open new session: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.security.AccessControlException: Permission denied: user=anonymous, access=EXECUTE, inode="/tmp/hive":root:supergroup:drwx------(state=,code=0)
    Beeline version 1.2.2 by Apache Hive
    0: jdbc:hive2://master201:10000 (closed)> [root@master201 bin]#
    [root@master201 bin]#
    [root@master201 bin]#
    [root@master201 bin]# hadoop fs -chmod -R 777 /tmp/
    [root@master201 bin]# beeline -u jdbc:hive2://master201:10000
    Connecting to jdbc:hive2://master201:10000
    Connected to: Apache Hive (version 1.2.2)
    Driver: Hive JDBC (version 1.2.2)
    Transaction isolation: TRANSACTION_REPEATABLE_READ
    Beeline version 1.2.2 by Apache Hive
    0: jdbc:hive2://master201:10000> use lishijia;
    No rows affected (1.329 seconds)
    0: jdbc:hive2://master201:10000> show tables;
    +--------------------+--+
    | tab_name |
    +--------------------+--+
    | lsj_item |
    | lsj_item_delivery |
    | lsj_order |
    | st_article |
    +--------------------+--+
    4 rows selected (0.696 seconds)
    0: jdbc:hive2://master201:10000>
  • Java code example

    代码(Java): https://github.com/lishijia/java-data-demo/tree/master/hive-data-demo

    package lishijia.data.hive;

    import java.sql.*;

    /**
     * Connect over JDBC to the Thrift interface exposed by Hive (hiveserver2).
     * @author: lishijia
     * @create: 2018/9/25 16:37
     **/
    public class ThirftHiveDemo {

        String driverClass = "org.apache.hive.jdbc.HiveDriver";
        String url = "jdbc:hive2://master201:10000/lishijia";

        public static void main(String[] args) {
            long l = System.currentTimeMillis();
            ThirftHiveDemo thirftHiveDemo = new ThirftHiveDemo();
            thirftHiveDemo.search();
            // print the elapsed time in milliseconds
            System.out.println(System.currentTimeMillis() - l);
        }

        private void search() {
            Connection conn = null;
            Statement stmt = null;
            try {
                conn = getConn();
                System.out.println(conn);
                stmt = conn.createStatement();
                // Hive table name
                String tableName = "lsj_order";
                String sql = "select * from " + tableName + " limit 10";
                System.out.println("Running:" + sql);
                ResultSet res = stmt.executeQuery(sql);
                System.out.println("select * query results:");
                while (res.next()) {
                    System.out.println(res.getString(1) + "\t" + res.getString(2));
                }
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                System.exit(1);
            } catch (SQLException e) {
                e.printStackTrace();
                System.exit(1);
            } finally {
                try {
                    if (stmt != null) {
                        stmt.close();
                        stmt = null;
                    }
                    if (conn != null) {
                        conn.close();
                        conn = null;
                    }
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

        private Connection getConn() throws ClassNotFoundException, SQLException {
            Class.forName(driverClass);
            Connection conn = DriverManager.getConnection(url);
            return conn;
        }
    }

    The query output is as follows:

    C:\Users\Administrator\.m2\repository\stax\stax-api\1.0.1\stax-api-1.0.1.jar;C:\Users\Administrator\.m2\repository\jline\jline\2.12\jline-2.12.jar" lishijia.data.hive.ThirftHiveDemo
    log4j:WARN No appenders could be found for logger (org.apache.hive.jdbc.Utils).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    org.apache.hive.jdbc.HiveConnection@215be6bb
    Running:select * from st_order limit 10
    select * query results:
    order_id user_id
    2539329 1
    2398795 1
    473747 1
    2254736 1
    431534 1
    3367565 1
    550135 1
    3108588 1
    2295261 1
    5933

    Process finished with exit code 0

Worked Example: Top 10% of Each User's Purchased Products in Hive

  • The innermost query groups by o.user_id, p.product_id and yields user_id, product_id, count(1) product_cnt, i.e., how many times the user bought each product.
  • row_number() over(distribute by user_id sort by product_cnt desc) row_product ranks each user's products by purchase count, descending.
  • ceil((count(1) over(partition by user_id))*0.1) topNum computes how many products make up the top 10%, rounding up with ceil; e.g., if a user bought 25 distinct products, the top 10% is 3 products (the sketch after the full query below shows how to inspect row_product and topNum on their own).
  • where row_product <= topNum keeps only the top 10% of products (by purchase count, descending).
  • concat_ws(',', collect_set(product_id)) flattens the selected product_ids into a single comma-separated string per user.

select user_id, concat_ws(',', collect_set(product_id)) from (
  select user_id, product_id,
         row_number() over(distribute by user_id sort by product_cnt desc) row_product,
         ceil((count(1) over(partition by user_id))*0.1) topNum from (
    select user_id, product_id, count(1) product_cnt
    from orders o
    join order_products_prior p
    on o.order_id = p.order_id
    where o.user_id = 1
    group by o.user_id, p.product_id
  ) s
) ss
where row_product <= topNum
group by user_id;
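
A sketch for stepping through the query above: running the middle layer on its own makes it easy to inspect each product's rank (row_product) and the computed cut-off (topNum) before the final filter is applied. It uses the same (hypothetical) orders and order_products_prior tables as the full query:

select user_id, product_id, product_cnt,
       row_number() over(distribute by user_id sort by product_cnt desc) row_product,
       ceil((count(1) over(partition by user_id))*0.1) topNum
from (
  select user_id, product_id, count(1) product_cnt
  from orders o
  join order_products_prior p on o.order_id = p.order_id
  where o.user_id = 1
  group by o.user_id, p.product_id
) s;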

Summary

This post covered some caveats and optimization techniques for Hive joins, and how to define custom functions and use them from Hive. Finally, it showed how to connect to Hive (through its Thrift interface) from Java code and query table data.

代码(Java): https://github.com/lishijia/java-data-demo/tree/master/hive-data-demo

代码(Python): https://github.com/lishijia/py-data-demo/tree/master/hive

