前言
基于Hadoop平台的很多应用场景中,我们业务数据一般是需要进行实实时或离线分析,当时给某个大集团下面做数据分析,业务包含了实时数据统计(销售,入园,在园人数等),离线分析包含用户特性分析等场景。为了统一离线和实时计算,我们都希望把实时计算和离线计算的数据源统一起来一起输入,然后将它们分别流向实时分析系统与离线分析系统本别进行分析处理。这里我们使用的是Kafka作为数据接收(由于是交易数据),由后台直接把数据发送到Kafka作为收据收集中间层,然后通过不同的消费端来处理这些数据或实时分析或入库存储(HDFS)做离线分析。这里我们先介绍入库Hive,为后续做离线分析做数据准备。
环境准备
Kafka + Spark Streaming + Hive整合。通过后台打入到Kafka的消息,然后通过Spark Streaming消费作为中间层,把数据流向Hive中,为后续的用户离线分析计算做准备
Hive创建分区表
某集团下的景区遍布全国,这里通过景区编码与年份作为分区,这里做的数据分析一般都是按照景区年份来做分析,so按照这两个维度来进行创建分区表
销售数据
1
2
3
4
5
6
7
8
9
10
11
12create table s_order(
scenic_name string,
channel_name string,
channel_code string,
order_no string,
place_time string,
settle_price string,
settle_amount string,
certificate_no string,
mobile_no string,
buy_number string
)partitioned by (scenic_code string,year_of_place string);入园消费数据
1
2
3
4
5
6
7
8
9
10create table s_consume(
scenic_name string,
channel_name string,
order_no string,
consume_no string,
consume_time string,
consume_number string,
settle_price string,
settle_amount string
)partitioned by (scenic_code string,year_of_place string);
Kafka配置Topic创建&模拟发送消息
首先,写了一个Kafka Producer模拟程序,用来模拟向Kafka实时写入用户行为的事件数据,数据是JSON格式,示例如下
字段解释:
scenic_code,景区编号
scenic_name,景区名称
channel_name,销售渠道
channel_code,渠道编码
order_no,订单号
place_time,下单时间
settle_price,结算单价
settle_amount,计算总价
certificate_no, 下单用户身份证号码
mobile_no,下单用户手机号
buy_number, 购买数量
1
{"scenic_code":"s0001", "scenic_name":"景区s0001", "channel_name":"销售渠道c001", "channel_code":"c001","order_no":"s000100000001", "place_time":"2018-12-05 12:23:12", "settle_price":"110.00", "settle_amount":"110.00", "certificate_no":"430502xxxx06176212", "mobile_no":"136xxxx6224", "buy_number":"1", }
创建Kafka对应Order的Topic
1
./kafka-topics.sh --create --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --replication-factor 3 --partitions 5 --topic orderTopicV1
准备发送消息程序,模拟后台往Kafka中发送消息
程序入口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45package lishijia.streaming.scenic.kafka;
import com.alibaba.fastjson.JSONObject;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
/**
* 模拟发送消息
*/
public class KafkaApp {
public static void main(String args[]) throws InterruptedException {
SimpleDateFormat format = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss");
KafkaSenderProcessor processor = new KafkaSenderProcessor();
KafkaConfiguration configuration = new KafkaConfiguration();
configuration.setHost("master201:9092,slave202:9092,slave203:9092");
processor.setKafkaConfiguration(configuration);
processor.init();
Random s = new Random();
for(int i=0;i<10000;i++){
Orders order = new Orders();
int code = (s.nextInt(9)+1);
order.setScenic_code("s000" + code);
order.setScenic_name("景区s000" + code);
order.setChannel_name("销售渠道c00" + code);
order.setChannel_code("c00" + code);
order.setOrder_no(System.currentTimeMillis()+"");
order.setPlace_time(format.format(new Date()));
order.setSettle_price("110.00");
order.setSettle_amount("110.00");
order.setCertificate_no("430502xxxx06176212");
order.setMobile_no("136xxxx6224");
//发送消息
processor.send(JSONObject.toJSONString(order), "orderTopicV1");
System.out.println("send mess i = " + i);
Thread.sleep(1000);
}
System.out.println("done");
}
}Kafka发送消息Producer处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50package lishijia.streaming.scenic.kafka;
import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaSenderProcessor {
private Logger logger = LoggerFactory.getLogger(KafkaSenderProcessor.class);
private KafkaConfiguration kafkaConfiguration;
private Producer<String, String> kafkaProducer;
public boolean send(String value, String topic) {
try {
ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, null, value);
Future<RecordMetadata> f = kafkaProducer.send(msg);
RecordMetadata resp = f.get();
logger.info(" send message topic: " + topic + ", offset : " + resp.offset());
} catch (Exception e) {
e.printStackTrace();
logger.error(e.getMessage(), e);
return false;
}
return true;
}
public void init() {
Properties props = new Properties();
long begin = System.currentTimeMillis();
props.put("key.serializer", kafkaConfiguration.getKeySerializer());
props.put("value.serializer", kafkaConfiguration.getValueSerializer());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getFullIp());
logger.info(" kafka sender processor init ... ");
kafkaProducer = new KafkaProducer<String, String>(props);
logger.info(" kafka sender processor init complete, time: " + (System.currentTimeMillis() - begin));
}
public void setKafkaConfiguration(KafkaConfiguration kafkaConfiguration) {
this.kafkaConfiguration = kafkaConfiguration;
}
}Kafka配置实体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59package lishijia.streaming.scenic.kafka;
public class KafkaConfiguration {
private String host;
private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";
private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
private String keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
private String valueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getKeySerializer() {
return keySerializer;
}
public void setKeySerializer(String keySerializer) {
this.keySerializer = keySerializer;
}
public String getValueSerializer() {
return valueSerializer;
}
public void setValueSerializer(String valueSerializer) {
this.valueSerializer = valueSerializer;
}
public String getFullIp() {
return host;
}
public String getKeyDeserializer() {
return keyDeserializer;
}
public void setKeyDeserializer(String keyDeserializer) {
this.keyDeserializer = keyDeserializer;
}
public String getValueDeserializer() {
return valueDeserializer;
}
public void setValueDeserializer(String valueDeserializer) {
this.valueDeserializer = valueDeserializer;
}
}消息实体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111package lishijia.streaming.scenic.kafka;
import java.io.Serializable;
public class Order implements Serializable{
private String scenic_code;
private String scenic_name;
private String channel_name;
private String channel_code;
private String order_no;
private String place_time;
private String settle_price;
private String settle_amount;
private String certificate_no;
private String mobile_no;
private String buy_number;
public String getScenic_code() {
return scenic_code;
}
public void setScenic_code(String scenic_code) {
this.scenic_code = scenic_code;
}
public String getScenic_name() {
return scenic_name;
}
public void setScenic_name(String scenic_name) {
this.scenic_name = scenic_name;
}
public String getChannel_name() {
return channel_name;
}
public void setChannel_name(String channel_name) {
this.channel_name = channel_name;
}
public String getChannel_code() {
return channel_code;
}
public void setChannel_code(String channel_code) {
this.channel_code = channel_code;
}
public String getOrder_no() {
return order_no;
}
public void setOrder_no(String order_no) {
this.order_no = order_no;
}
public String getPlace_time() {
return place_time;
}
public void setPlace_time(String place_time) {
this.place_time = place_time;
}
public String getSettle_price() {
return settle_price;
}
public void setSettle_price(String settle_price) {
this.settle_price = settle_price;
}
public String getSettle_amount() {
return settle_amount;
}
public void setSettle_amount(String settle_amount) {
this.settle_amount = settle_amount;
}
public String getCertificate_no() {
return certificate_no;
}
public void setCertificate_no(String certificate_no) {
this.certificate_no = certificate_no;
}
public String getMobile_no() {
return mobile_no;
}
public void setMobile_no(String mobile_no) {
this.mobile_no = mobile_no;
}
public String getBuy_number() {
return buy_number;
}
public void setBuy_number(String buy_number) {
this.buy_number = buy_number;
}
}
Spark Streaming消费消息追加写入到Hive中
离线数据通过Streaming消费的方式入库到hive中
这个业务处理方式processRdd(rdd),目前没办法做到仅且一次(有可能会存在重复执行的可能)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91package lishijia.streaming.scenic.offline
import com.alibaba.fastjson.JSON
import kafka.serializer.StringDecoder
import lishijia.streaming.scenic.kafka.Orders
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.streaming.kafka.KafkaManagerV1
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Streaming对接Kafka消息,然后入库到Hive中
*/
object KafkaToStreamingToHive {
case class Order(scenic_code:String,scenic_name:String,channel_name:String,channel_code:String,order_no:String
,place_time:String,settle_price:String,settle_amount:String,certificate_no:String,mobile_no:String
,buy_number:String,place_year:String)
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
val Array(brokers, topics, consumer) =
Array("master201:9092,slave202:9092,slave203:9092",
"orderTopicV1",
"offline_consume")
val conf = new SparkConf().setAppName("KafkaToStreamingToHive").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
"group.id" -> consumer)
val km = new KafkaManagerV1(kafkaParams)
val message = km.createDirectStream[
String,
String,
StringDecoder,
StringDecoder](ssc, kafkaParams, topicSet)
message.foreachRDD(rdd => {
// 先处理消息
processRdd(rdd)
// 再更新offsets
km.updateZKOffsets(rdd)
})
ssc.start()
ssc.awaitTermination()
}
def processRdd(rdd: RDD[(String, String)]): Unit = {
val orders = rdd.map(_._2)
val df = rdd2DF(orders)
//通过mode来指定输出文件的是append。创建新文件来追加文件
df.write.mode(SaveMode.Append).insertInto("lishijia.s_order")
}
/**
* @param rdd
* @return
*/
def rdd2DF(rdd:RDD[String]): DataFrame = {
val spark = SparkSession
.builder()
.appName("KafkaDataToHive")
.config("hive.exec.dynamic.parition", "true")
.config("hive.exec.dynamic.parition.mode", "nonstrict")
.enableHiveSupport().getOrCreate()
import spark.implicits._
/**
* {"scenic_code":"s0001", "scenic_name":"景区s0001", "channel_name":"销售渠道c001", "
* channel_code":"c001","order_no":"s000100000001", "place_time":"2018-12-05 12:23:12", "
* settle_price":"110.00", "settle_amount":"110.00", "certificate_no":"430502xxxx06176212",
* "mobile_no":"136xxxx6224", "buy_number":"1", }
*/
rdd.map{x=>
val order = JSON.parseObject(x, classOf[Orders])
import java.text.SimpleDateFormat
val aDate = new SimpleDateFormat("yyyy-MM-dd")
val place_year = aDate.parse(order.getPlace_time).getYear.toString
Order(order.getScenic_name, order.getChannel_name, order.getChannel_code,
order.getOrder_no, order.getPlace_time, order.getSettle_price, order.getSettle_amount, order.getCertificate_no,
order.getMobile_no, order.getBuy_number, order.getScenic_code, place_year)
}.toDF()
}
}Kafka重写createDirectStream对接消费方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140package org.apache.spark.streaming.kafka
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.{LeaderOffset}
import scala.reflect.ClassTag
/**
*
* @param kafkaParams
*/
class KafkaManagerV1(val kafkaParams: Map[String, String]) extends Serializable {
private val kc = new KafkaCluster(kafkaParams)
/**
* 创建数据流
*
* @param ssc
* @param kafkaParams
* @param topics
* @tparam K
* @tparam V
* @tparam KD
* @tparam VD
* @return
*/
def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(K, V)] = {
val groupId = kafkaParams.get("group.id").get
// 在zookeeper上读取offsets前先根据实际情况更新offsets
setOrUpdateOffsets(topics, groupId)
//从zookeeper上读取offset开始消费message
val messages = {
val partitionsE = kc.getPartitions(topics)
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft)
throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
val consumerOffsets = consumerOffsetsE.right.get
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
messages
}
/**
* 创建数据流前,根据实际消费情况更新消费offsets
*
* @param topics
* @param groupId
*/
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
topics.foreach(topic => {
var hasConsumed = true
val partitionsE = kc.getPartitions(Set(topic))
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft) hasConsumed = false
if (hasConsumed) {
// 消费过
/**
* 如果streaming程序执行的时候出现kafka.common.OffsetOutOfRangeException,
* 说明zk上保存的offsets已经过时了,即kafka的定时清理策略已经将包含该offsets的文件删除。
* 针对这种情况,只要判断一下zk上的consumerOffsets和earliestLeaderOffsets的大小,
* 如果consumerOffsets比earliestLeaderOffsets还小的话,说明consumerOffsets已过时,
* 这时把consumerOffsets更新为earliestLeaderOffsets
*/
val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (earliestLeaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
val consumerOffsets = consumerOffsetsE.right.get
// 可能只是存在部分分区consumerOffsets过时,所以只更新过时分区的consumerOffsets为earliestLeaderOffsets
var offsets: Map[TopicAndPartition, Long] = Map()
consumerOffsets.foreach({ case (tp, n) =>
val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
if (n < earliestLeaderOffset) {
println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
" offsets已经过时,更新为" + earliestLeaderOffset)
offsets += (tp -> earliestLeaderOffset)
}
})
if (!offsets.isEmpty) {
kc.setConsumerOffsets(groupId, offsets)
}
} else {
// 没有消费过
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
if (reset == Some("smallest")) {
val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
} else {
val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
}
val offsets = leaderOffsets.map {
case (tp, offset) => (tp, offset.offset)
}
kc.setConsumerOffsets(groupId, offsets)
}
})
}
/**
* 更新zookeeper上的消费offsets
*
* @param rdd
*/
def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
val groupId = kafkaParams.get("group.id").get
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (offsets <- offsetsList) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
}本地运行
注释掉hive部分代码,即可。本地消费打印
提交yarn集群运行
本地打包
1
mvn assembly:assembly
执行脚本如下:
1
2
3
4
5
6cd /home/lishijia/Soft/spark-2.0.2-bin-hadoop2.7
./bin/spark-submit \
--class lishijia.streaming.scenic.offline.KafkaToStreamingToHive \
--master yarn-cluster \
--files $HIVE_HOME/conf/hive-site.xml \
/home/lishijia/Soft/spark-2.0.2-bin-hadoop2.7/demo/scala-spark-demo-mvn-1.0-SNAPSHOT-jar-with-dependencies.jar验证信息
由Driver分发到不同的机器执行
slave202机器日志
slave203机器日志,由此可以看到大部分的处理分发到了slave202上去执行了
hive查询结果
由于是通过append的方式追加到hive中,所以生成了很多小文件。此处还需要通过定时任务去把这些小文件合并
遇到的问题
创建表的时候使用了terminated \n标识每一行数据,但是通过append追加的方式
创建表的顺序保证
提交任务hive配置错误,由localhost改为对应的ip或者host名称
把hive-site.xml配置修改为ip
Hive动态设置分区报错
解决方案:
1
set hive.exec.dynamic.partition.mode=nonstrict;
1
2
3
4
518/12/13 21:53:38 INFO metastore.HiveMetaStore: 0: get_table : db=lishijia tbl=s_order
18/12/13 21:53:38 INFO HiveMetaStore.audit: ugi=root ip=unknown-ip-addr cmd=get_table : db=lishijia tbl=s_order
18/12/13 21:53:38 INFO common.FileUtils: Creating directory if it doesn't exist: hdfs://master201:9000/user/hive/warehouse/lishijia.db/s_order/.hive-staging_hive_2018-12-13_21-53-38_162_2961647512288825303-1
18/12/13 21:53:38 ERROR scheduler.JobScheduler: Error running job streaming job 1544709218000 ms.0
org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:191)
总结
通过以上步骤,从业务生成消息到发送到Kafka中,并且通过Spark Streaming消费处理入库到hive的整个过程,这个是当时我们技术预演的一个方案,由于对业务代码还是存在侵入性,需要业务在一些关键点上把消息发送到Kafka中。也相当于了解到对应技术方案的优缺点。而之后我们直接通过Flume抓取业务系统Mysql的binlog日志直接到Kafka来解决对业务侵入的问题。
参考
https://blog.csdn.net/duan_zhihua/article/details/52006054?locationNum=11
https://blog.csdn.net/ligt0610/article/details/47311771
https://www.jianshu.com/p/2369a020e604
http://www.zhangrenhua.com/2016/08/02/hadoop-spark-streaming%E6%95%B0%E6%8D%AE%E6%97%A0%E4%B8%A2%E5%A4%B1%E8%AF%BB%E5%8F%96kafka%EF%BC%8C%E4%BB%A5%E5%8F%8A%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/