Kafka + Spark Streaming + Hive Data Ingestion Integration in Practice (Part 2)

Preface

In many Hadoop-based scenarios, business data needs both real-time and offline analysis. At the time, we were building data analytics for a large group: the real-time side covered statistics such as sales, park entries, and current in-park headcount, while the offline side covered things like user-profile analysis. To unify the two, we wanted real-time and offline computation to share a single ingestion point, from which the data would flow into the real-time analysis system and the offline analysis system respectively. Since this is transactional data, we use Kafka as the collection layer: the backend sends events directly to Kafka as the data-collection middle tier, and different consumers then either analyze the data in real time or persist it (to HDFS) for offline analysis. This post covers loading the data into Hive, preparing it for the offline analysis that follows.

Environment Setup

Kafka + Spark Streaming + Hive integration: the backend pushes messages into Kafka, Spark Streaming consumes them as the middle layer, and the data flows into Hive, ready for the subsequent offline user analysis.

Creating Hive Partitioned Tables

The group operates scenic areas all over the country, and analysis is generally done per scenic area and per year, so the tables are partitioned by scenic area code and year. The DDL for the two tables follows, with a small verification sketch after them.

  • Sales data

    create table s_order(
    scenic_name string,
    channel_name string,
    channel_code string,
    order_no string,
    place_time string,
    settle_price string,
    settle_amount string,
    certificate_no string,
    mobile_no string,
    buy_number string
    )partitioned by (scenic_code string,year_of_place string);
  • Park-entry consumption data

    create table s_consume(
    scenic_name string,
    channel_name string,
    order_no string,
    consume_no string,
    consume_time string,
    consume_number string,
    settle_price string,
    settle_amount string
    )partitioned by (scenic_code string,year_of_place string);
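
Once the streaming job described later has written some data, the partition layout can be checked with a small Spark job (or from spark-shell). This is only a minimal verification sketch; it assumes the tables live in a database named lishijia, the database used by the streaming job below.

    import org.apache.spark.sql.SparkSession

    // Minimal sketch: list the partitions produced by dynamic partitioning
    // and count rows per scenic area and year. Assumes database `lishijia`.
    object CheckOrderPartitions {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("CheckOrderPartitions")
          .enableHiveSupport()
          .getOrCreate()

        spark.sql("show partitions lishijia.s_order").show(false)
        spark.sql(
          """select scenic_code, year_of_place, count(*) as cnt
            |from lishijia.s_order
            |group by scenic_code, year_of_place""".stripMargin).show(false)

        spark.stop()
      }
    }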

Kafka Topic Creation & Simulated Message Sending

  • First, a Kafka producer simulator was written to emulate writing user-behavior event data to Kafka in real time. The data is in JSON format; an example follows.

    Field descriptions:

    scenic_code: scenic area code
    scenic_name: scenic area name
    channel_name: sales channel name
    channel_code: sales channel code
    order_no: order number
    place_time: order placement time
    settle_price: settlement unit price
    settle_amount: settlement total amount
    certificate_no: ID card number of the purchasing user
    mobile_no: mobile number of the purchasing user
    buy_number: purchase quantity

    {"scenic_code":"s0001", "scenic_name":"景区s0001", "channel_name":"销售渠道c001", "channel_code":"c001","order_no":"s000100000001", "place_time":"2018-12-05 12:23:12", "settle_price":"110.00", "settle_amount":"110.00", "certificate_no":"430502xxxx06176212", "mobile_no":"136xxxx6224", "buy_number":"1", }
  • Create the Kafka topic for orders

    ./kafka-topics.sh --create --zookeeper master201:2181,slave202:2181,slave203:2181/kafka --replication-factor 3 --partitions 5 --topic orderTopicV1
  • Prepare the sender program that simulates the backend pushing messages into Kafka

    Program entry point

    package lishijia.streaming.scenic.kafka;

    import com.alibaba.fastjson.JSONObject;

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Random;

    /**
     * Simulates sending order messages to Kafka.
     */
    public class KafkaApp {

        public static void main(String args[]) throws InterruptedException {

            // Note: MM is the month; mm would be minutes
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            KafkaSenderProcessor processor = new KafkaSenderProcessor();
            KafkaConfiguration configuration = new KafkaConfiguration();
            configuration.setHost("master201:9092,slave202:9092,slave203:9092");
            processor.setKafkaConfiguration(configuration);
            processor.init();
            Random s = new Random();
            for (int i = 0; i < 10000; i++) {
                Orders order = new Orders();
                int code = (s.nextInt(9) + 1);
                order.setScenic_code("s000" + code);
                order.setScenic_name("景区s000" + code);
                order.setChannel_name("销售渠道c00" + code);
                order.setChannel_code("c00" + code);
                order.setOrder_no(System.currentTimeMillis() + "");
                order.setPlace_time(format.format(new Date()));
                order.setSettle_price("110.00");
                order.setSettle_amount("110.00");
                order.setCertificate_no("430502xxxx06176212");
                order.setMobile_no("136xxxx6224");
                order.setBuy_number("1");
                // Send the message
                processor.send(JSONObject.toJSONString(order), "orderTopicV1");
                System.out.println("send mess i = " + i);
                Thread.sleep(1000);
            }
            System.out.println("done");
        }
    }

    Kafka producer sender processor

    package lishijia.streaming.scenic.kafka;

    import org.apache.kafka.clients.producer.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Properties;
    import java.util.concurrent.Future;

    public class KafkaSenderProcessor {

    private Logger logger = LoggerFactory.getLogger(KafkaSenderProcessor.class);

    private KafkaConfiguration kafkaConfiguration;

    private Producer<String, String> kafkaProducer;

    public boolean send(String value, String topic) {

    try {
    ProducerRecord<String, String> msg = new ProducerRecord<String, String>(topic, null, value);
    Future<RecordMetadata> f = kafkaProducer.send(msg);
    RecordMetadata resp = f.get();
    logger.info(" send message topic: " + topic + ", offset : " + resp.offset());
    } catch (Exception e) {
    e.printStackTrace();
    logger.error(e.getMessage(), e);
    return false;
    }

    return true;
    }

    public void init() {
    Properties props = new Properties();
    long begin = System.currentTimeMillis();
    props.put("key.serializer", kafkaConfiguration.getKeySerializer());
    props.put("value.serializer", kafkaConfiguration.getValueSerializer());
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getFullIp());
    logger.info(" kafka sender processor init ... ");
    kafkaProducer = new KafkaProducer<String, String>(props);
    logger.info(" kafka sender processor init complete, time: " + (System.currentTimeMillis() - begin));
    }


    public void setKafkaConfiguration(KafkaConfiguration kafkaConfiguration) {
    this.kafkaConfiguration = kafkaConfiguration;
    }

    }

    Kafka configuration entity

    package lishijia.streaming.scenic.kafka;

    public class KafkaConfiguration {

    private String host;

    private String keySerializer = "org.apache.kafka.common.serialization.StringSerializer";

    private String valueSerializer = "org.apache.kafka.common.serialization.StringSerializer";

    private String keyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

    private String valueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

    public String getHost() {

    return host;
    }

    public void setHost(String host) {
    this.host = host;
    }

    public String getKeySerializer() {
    return keySerializer;
    }

    public void setKeySerializer(String keySerializer) {
    this.keySerializer = keySerializer;
    }

    public String getValueSerializer() {
    return valueSerializer;
    }

    public void setValueSerializer(String valueSerializer) {
    this.valueSerializer = valueSerializer;
    }

    public String getFullIp() {
    return host;
    }

    public String getKeyDeserializer() {
    return keyDeserializer;
    }

    public void setKeyDeserializer(String keyDeserializer) {
    this.keyDeserializer = keyDeserializer;
    }

    public String getValueDeserializer() {
    return valueDeserializer;
    }

    public void setValueDeserializer(String valueDeserializer) {
    this.valueDeserializer = valueDeserializer;
    }
    }

    Message entity

    package lishijia.streaming.scenic.kafka;

    import java.io.Serializable;

    public class Orders implements Serializable {

    private String scenic_code;
    private String scenic_name;

    private String channel_name;
    private String channel_code;

    private String order_no;
    private String place_time;

    private String settle_price;
    private String settle_amount;

    private String certificate_no;
    private String mobile_no;

    private String buy_number;

    public String getScenic_code() {
    return scenic_code;
    }

    public void setScenic_code(String scenic_code) {
    this.scenic_code = scenic_code;
    }

    public String getScenic_name() {
    return scenic_name;
    }

    public void setScenic_name(String scenic_name) {
    this.scenic_name = scenic_name;
    }

    public String getChannel_name() {
    return channel_name;
    }

    public void setChannel_name(String channel_name) {
    this.channel_name = channel_name;
    }

    public String getChannel_code() {
    return channel_code;
    }

    public void setChannel_code(String channel_code) {
    this.channel_code = channel_code;
    }

    public String getOrder_no() {
    return order_no;
    }

    public void setOrder_no(String order_no) {
    this.order_no = order_no;
    }

    public String getPlace_time() {
    return place_time;
    }

    public void setPlace_time(String place_time) {
    this.place_time = place_time;
    }

    public String getSettle_price() {
    return settle_price;
    }

    public void setSettle_price(String settle_price) {
    this.settle_price = settle_price;
    }

    public String getSettle_amount() {
    return settle_amount;
    }

    public void setSettle_amount(String settle_amount) {
    this.settle_amount = settle_amount;
    }

    public String getCertificate_no() {
    return certificate_no;
    }

    public void setCertificate_no(String certificate_no) {
    this.certificate_no = certificate_no;
    }

    public String getMobile_no() {
    return mobile_no;
    }

    public void setMobile_no(String mobile_no) {
    this.mobile_no = mobile_no;
    }

    public String getBuy_number() {
    return buy_number;
    }

    public void setBuy_number(String buy_number) {
    this.buy_number = buy_number;
    }
    }

Spark Streaming Consumes the Messages and Appends Them into Hive

  • Offline data is loaded into Hive by consuming the stream with Spark Streaming

    Note that this processRdd(rdd) approach cannot guarantee exactly-once processing; a micro-batch may be re-executed, so duplicate records are possible (see the deduplication sketch after the code below).
    package lishijia.streaming.scenic.offline

    import com.alibaba.fastjson.JSON
    import kafka.serializer.StringDecoder
    import lishijia.streaming.scenic.kafka.Orders
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    import org.apache.spark.streaming.kafka.KafkaManagerV1
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * Consumes Kafka messages with Spark Streaming and loads them into Hive.
     */
    object KafkaToStreamingToHive {

      // Field order matches the Hive table lishijia.s_order: the data columns come
      // first and the partition columns (scenic_code, place_year) come last,
      // because insertInto matches columns by position.
      case class Order(scenic_name: String, channel_name: String, channel_code: String,
                       order_no: String, place_time: String, settle_price: String,
                       settle_amount: String, certificate_no: String, mobile_no: String,
                       buy_number: String, scenic_code: String, place_year: String)

      def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

        val Array(brokers, topics, consumer) =
          Array("master201:9092,slave202:9092,slave203:9092",
            "orderTopicV1",
            "offline_consume")

        // local[2] is for local testing; remove it (or rely on --master) when submitting to YARN
        val conf = new SparkConf().setAppName("KafkaToStreamingToHive").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2))

        val topicSet = topics.split(",").toSet

        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
          "group.id" -> consumer)

        val km = new KafkaManagerV1(kafkaParams)
        val message = km.createDirectStream[
          String,
          String,
          StringDecoder,
          StringDecoder](ssc, kafkaParams, topicSet)

        message.foreachRDD(rdd => {
          // Process the batch first
          processRdd(rdd)
          // Then commit the offsets to ZooKeeper
          km.updateZKOffsets(rdd)
        })

        ssc.start()
        ssc.awaitTermination()
      }

      def processRdd(rdd: RDD[(String, String)]): Unit = {
        val orders = rdd.map(_._2)
        val df = rdd2DF(orders)
        // SaveMode.Append writes new files and appends them to the table
        df.write.mode(SaveMode.Append).insertInto("lishijia.s_order")
      }

      /**
       * @param rdd JSON order strings consumed from Kafka
       * @return DataFrame whose column order matches lishijia.s_order
       */
      def rdd2DF(rdd: RDD[String]): DataFrame = {
        val spark = SparkSession
          .builder()
          .appName("KafkaDataToHive")
          .config("hive.exec.dynamic.partition", "true")
          .config("hive.exec.dynamic.partition.mode", "nonstrict")
          .enableHiveSupport().getOrCreate()
        import spark.implicits._
        /**
         * {"scenic_code":"s0001", "scenic_name":"景区s0001", "channel_name":"销售渠道c001",
         * "channel_code":"c001", "order_no":"s000100000001", "place_time":"2018-12-05 12:23:12",
         * "settle_price":"110.00", "settle_amount":"110.00", "certificate_no":"430502xxxx06176212",
         * "mobile_no":"136xxxx6224", "buy_number":"1"}
         */
        rdd.map { x =>
          val order = JSON.parseObject(x, classOf[Orders])
          // place_time is "yyyy-MM-dd HH:mm:ss", so the year is the first four characters
          val place_year = order.getPlace_time.substring(0, 4)
          Order(order.getScenic_name, order.getChannel_name, order.getChannel_code,
            order.getOrder_no, order.getPlace_time, order.getSettle_price, order.getSettle_amount,
            order.getCertificate_no, order.getMobile_no, order.getBuy_number,
            order.getScenic_code, place_year)
        }.toDF()
      }
    }
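
    Because the pipeline above is at-least-once (a batch can be reprocessed before its offsets are committed), the offline jobs that read s_order may want to deduplicate by order_no first. A minimal sketch of one way to do that is below; it assumes order_no is unique per order and simply keeps one row per order number using a window function.

    import org.apache.spark.sql.SparkSession

    // Minimal sketch: expose a deduplicated view of lishijia.s_order for
    // offline analysis, keeping one row per order_no.
    object DedupOrders {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("DedupOrders")
          .enableHiveSupport()
          .getOrCreate()

        val deduped = spark.sql(
          """select * from (
            |  select t.*,
            |         row_number() over (partition by order_no order by place_time) as rn
            |  from lishijia.s_order t
            |) x
            |where x.rn = 1""".stripMargin)
          .drop("rn")

        // Downstream analysis can read this view instead of the raw table.
        deduped.createOrReplaceTempView("s_order_dedup")
        spark.sql("select count(*) from s_order_dedup").show()

        spark.stop()
      }
    }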

    KafkaManagerV1: a rewritten createDirectStream wrapper for consuming from Kafka

    package org.apache.spark.streaming.kafka

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.Decoder
    import org.apache.spark.SparkException
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka.KafkaCluster.{LeaderOffset}

    import scala.reflect.ClassTag


    /**
    *
    * @param kafkaParams
    */
    class KafkaManagerV1(val kafkaParams: Map[String, String]) extends Serializable {

    private val kc = new KafkaCluster(kafkaParams)

    /**
    * Create the direct stream
    *
    * @param ssc
    * @param kafkaParams
    * @param topics
    * @tparam K
    * @tparam V
    * @tparam KD
    * @tparam VD
    * @return
    */
    def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
    ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(K, V)] = {
    val groupId = kafkaParams.get("group.id").get
    // Before reading offsets from ZooKeeper, update them according to the actual consumption state
    setOrUpdateOffsets(topics, groupId)

    // Start consuming messages from the offsets stored in ZooKeeper
    val messages = {
    val partitionsE = kc.getPartitions(topics)
    if (partitionsE.isLeft)
    throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
    val partitions = partitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetsE.isLeft)
    throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
    val consumerOffsets = consumerOffsetsE.right.get
    KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
    }
    messages
    }

    /**
    * Before creating the stream, update the consumer offsets according to the actual consumption state
    *
    * @param topics
    * @param groupId
    */
    private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
    topics.foreach(topic => {
    var hasConsumed = true
    val partitionsE = kc.getPartitions(Set(topic))
    if (partitionsE.isLeft)
    throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
    val partitions = partitionsE.right.get
    val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
    if (consumerOffsetsE.isLeft) hasConsumed = false
    if (hasConsumed) {
    // This group has consumed the topic before
    /**
    * If the streaming job throws kafka.common.OffsetOutOfRangeException,
    * the offsets saved in ZK are stale: Kafka's retention policy has already
    * deleted the log segments that contained them. In that case, compare the
    * consumerOffsets in ZK with earliestLeaderOffsets; if consumerOffsets is
    * smaller than earliestLeaderOffsets, it is out of date and should be
    * reset to earliestLeaderOffsets.
    */
    val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
    if (earliestLeaderOffsetsE.isLeft)
    throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
    val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
    val consumerOffsets = consumerOffsetsE.right.get

    // Only some partitions may have stale consumerOffsets, so reset only those partitions to earliestLeaderOffsets
    var offsets: Map[TopicAndPartition, Long] = Map()
    consumerOffsets.foreach({ case (tp, n) =>
    val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
    if (n < earliestLeaderOffset) {
    println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
    " offsets已经过时,更新为" + earliestLeaderOffset)
    offsets += (tp -> earliestLeaderOffset)
    }
    })
    if (!offsets.isEmpty) {
    kc.setConsumerOffsets(groupId, offsets)
    }
    } else {
    // This group has never consumed the topic
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
    if (reset == Some("smallest")) {
    val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
    if (leaderOffsetsE.isLeft)
    throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
    leaderOffsets = leaderOffsetsE.right.get
    } else {
    val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
    if (leaderOffsetsE.isLeft)
    throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
    leaderOffsets = leaderOffsetsE.right.get
    }
    val offsets = leaderOffsets.map {
    case (tp, offset) => (tp, offset.offset)
    }
    kc.setConsumerOffsets(groupId, offsets)
    }
    })
    }

    /**
    * Update the consumer offsets stored in ZooKeeper
    *
    * @param rdd
    */
    def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
    val groupId = kafkaParams.get("group.id").get
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    for (offsets <- offsetsList) {
    val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
    val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
    if (o.isLeft) {
    println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
    }
    }
    }
    }
  • Running locally

    Comment out the Hive-related part of the code; the job then just consumes and prints locally (a minimal sketch of that variant follows).
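
    A minimal sketch of what that local variant might look like: processRdd in the KafkaToStreamingToHive object above is swapped for a version that only prints a few records per micro-batch instead of writing to Hive (processRddLocal is a hypothetical name).

    // Hypothetical local-debug replacement for processRdd: print instead of
    // writing to Hive, so no Hive/HDFS setup is needed on the developer machine.
    def processRddLocal(rdd: RDD[(String, String)]): Unit = {
      // Print up to 10 JSON payloads from this micro-batch
      rdd.map(_._2).take(10).foreach(println)
      println(s"batch size = ${rdd.count()}")
    }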

  • Submitting to the YARN cluster

    Build the jar locally:

    mvn assembly:assembly

    The submit script is as follows:

    cd /home/lishijia/Soft/spark-2.0.2-bin-hadoop2.7       
    ./bin/spark-submit \
    --class lishijia.streaming.scenic.offline.KafkaToStreamingToHive \
    --master yarn-cluster \
    --files $HIVE_HOME/conf/hive-site.xml \
    /home/lishijia/Soft/spark-2.0.2-bin-hadoop2.7/demo/scala-spark-demo-mvn-1.0-SNAPSHOT-jar-with-dependencies.jar
  • Verification

    The driver distributes tasks to different machines for execution.

    Logs on slave202

    Logs on slave203; they show that most of the processing was dispatched to slave202.

    Hive query results

    Because the data is appended into Hive, many small files are generated; a scheduled job is still needed to compact them (see the compaction sketch below).
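
    A minimal compaction sketch, assuming a periodically scheduled Spark job that rewrites one partition at a time; the partition values (s0001, 2018) and the staging table name are only examples.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    // Minimal sketch: merge one partition's many small files by staging the
    // partition in a temporary table and then overwriting just that partition.
    object CompactOrderPartition {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("CompactOrderPartition")
          .config("hive.exec.dynamic.partition", "true")
          .config("hive.exec.dynamic.partition.mode", "nonstrict")
          .enableHiveSupport()
          .getOrCreate()

        val (scenicCode, year) = ("s0001", "2018") // example partition to compact

        // 1) Read the partition and coalesce it down to a single file in a staging table
        //    (Spark rejects overwriting a table that is read in the same plan).
        spark.table("lishijia.s_order")
          .where(s"scenic_code = '$scenicCode' and year_of_place = '$year'")
          .coalesce(1)
          .write.mode(SaveMode.Overwrite)
          .saveAsTable("lishijia.s_order_compact_tmp")

        // 2) Overwrite only that partition of s_order from the staged copy; with
        //    dynamic partitioning, INSERT OVERWRITE touches only the partitions
        //    present in the data being written.
        spark.table("lishijia.s_order_compact_tmp")
          .write.mode(SaveMode.Overwrite)
          .insertInto("lishijia.s_order")

        spark.sql("drop table if exists lishijia.s_order_compact_tmp")
        spark.stop()
      }
    }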

Problems Encountered

  • The table was originally created with LINES TERMINATED BY '\n' as the row delimiter, which did not work well with the append-style writes.

  • Guaranteeing the column order of the table: the DataFrame column order must match the table, with the partition columns last, because insertInto is positional.

  • Wrong Hive configuration when submitting the job: change localhost to the corresponding IP or host name.

    That is, update the hive-site.xml configuration to use the IP.

  • Error when using Hive dynamic partitioning

    Solution:

    set hive.exec.dynamic.partition.mode=nonstrict;
    18/12/13 21:53:38 INFO metastore.HiveMetaStore: 0: get_table : db=lishijia tbl=s_order
    18/12/13 21:53:38 INFO HiveMetaStore.audit: ugi=root ip=unknown-ip-addr cmd=get_table : db=lishijia tbl=s_order
    18/12/13 21:53:38 INFO common.FileUtils: Creating directory if it doesn't exist: hdfs://master201:9000/user/hive/warehouse/lishijia.db/s_order/.hive-staging_hive_2018-12-13_21-53-38_162_2961647512288825303-1
    18/12/13 21:53:38 ERROR scheduler.JobScheduler: Error running job streaming job 1544709218000 ms.0
    org.apache.spark.SparkException: Dynamic partition strict mode requires at least one static partition column. To turn this off set hive.exec.dynamic.partition.mode=nonstrict at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:191)

Summary

The steps above cover the whole flow from the business system producing messages, sending them to Kafka, and consuming them with Spark Streaming to load them into Hive. This was one of the approaches we prototyped at the time. It is still intrusive to the business code, since the application has to publish messages to Kafka at certain key points, but the exercise made the pros and cons of the approach clear. Later we switched to capturing the business system's MySQL binlog and shipping it to Kafka via Flume, which removed the intrusion into the business code.

Code: https://github.com/lishijia/scala-spark-demo-mvn/tree/master/src/main/scala/lishijia/spark/demo/streaming

References

https://blog.csdn.net/duan_zhihua/article/details/52006054?locationNum=11
https://blog.csdn.net/ligt0610/article/details/47311771
https://www.jianshu.com/p/2369a020e604
http://www.zhangrenhua.com/2016/08/02/hadoop-spark-streaming%E6%95%B0%E6%8D%AE%E6%97%A0%E4%B8%A2%E5%A4%B1%E8%AF%BB%E5%8F%96kafka%EF%BC%8C%E4%BB%A5%E5%8F%8A%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90/
