Introduction to Kafka

Kafka is a high-throughput, distributed publish-subscribe messaging system with the following characteristics:

  • Message persistence through an O(1) on-disk data structure that maintains stable performance over long periods, even with terabytes of stored messages.
  • High throughput: even on very ordinary hardware, Kafka can handle millions of messages per second.
  • Messages can be partitioned across the Kafka server cluster and across clusters of consumer machines.
  • Supports parallel data loading into Hadoop.

    What a message queue provides

  • Decouples applications and enables parallel processing
  • Ordering guarantees
  • High throughput
  • High fault tolerance and high availability
  • Scalability
  • Peak-load smoothing
    Kafka集群.png

    How Kafka works

    A Kafka cluster consists of multiple instances; each node is called a Broker. Messages are grouped by Topic when they are stored.
    A Topic can be divided into multiple Partitions, and each Partition can have multiple replicas.
    Kafka原理图01.png

Within a Partition, messages are stored in order: new messages are appended to the log, and consumers pull them sequentially in FIFO order.
A Topic can have multiple partitions, and Kafka only guarantees ordering within a single partition, not across the Topic as a whole (i.e., across partitions); the sketch after the figure below illustrates why.

kafka原理图02.png
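How a record is routed to a partition explains the per-key ordering guarantee. The following is an illustrative sketch only, not Kafka's real partitioner (which hashes the serialized key with murmur2); String.hashCode() is used purely to show the idea that the same key always maps to the same partition:

public class PartitionSketch {
    // Records with the same non-null key always hash to the same partition,
    // so their relative order is preserved; records with different keys may
    // land in different partitions, so the Topic as a whole is not ordered.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        System.out.println(partitionFor("192.168.1.10", partitions));
        System.out.println(partitionFor("192.168.1.10", partitions)); // same partition as above
        System.out.println(partitionFor("192.168.1.77", partitions)); // possibly a different one
    }
}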

Consumer Group (CG): to speed up consumption, multiple consumers can be grouped together to consume a Topic in parallel. A Topic can be subscribed to by several CGs, and those CGs are peers of one another. A CG contains one or more consumers; within the same CG the consumers compete with each other, so a given message is consumed by only one consumer in that CG (a runnable sketch follows the figure below).

kafka原理图03.png
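A minimal runnable sketch of the CG behavior, assuming the broker addresses and the topicnewtest1 topic used later in this article: start this class twice with the same group id and the topic's partitions are split between the two instances; start it with a different group id and that instance receives its own full copy of the stream.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class GroupDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092");
        // Consumers that share a group.id form one consumer group and divide the partitions
        props.put("group.id", args.length > 0 ? args[0] : "cg-demo");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("topicnewtest1"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // Each message is delivered to exactly one consumer within the group
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}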

I. Kafka Cluster Deployment

Cluster plan

Name        Node (IP)              Description               Hostname
Broker01    192.168.43.22          Kafka node 01             hadoop03
Broker02    192.168.43.23          Kafka node 02             hadoop04
Broker03    192.168.43.24          Kafka node 03             hadoop05
Zookeeper   192.168.43.20/21/22    Zookeeper cluster nodes   hadoop01/hadoop02/hadoop03

1. Download and extract the Kafka installation package

[root@hadoop03 kafka_2.11-0.10.2.1]# ll
total 52
drwxr-xr-x. 3 hadoop hadoop 4096 Apr 22 2017 bin
drwxr-xr-x. 2 hadoop hadoop 4096 Apr 22 2017 config
drwxr-xr-x. 2 root root 152 Jan 20 18:57 kafka-logs
drwxr-xr-x. 2 hadoop hadoop 4096 Jan 20 18:43 libs
-rw-r--r--. 1 hadoop hadoop 28824 Apr 22 2017 LICENSE
drwxr-xr-x. 2 root root 4096 Jan 20 23:07 logs
-rw-r--r--. 1 hadoop hadoop 336 Apr 22 2017 NOTICE
drwxr-xr-x. 2 hadoop hadoop 47 Apr 22 2017 site-docs

2. Create a symbolic link

[root@hadoop03 kafka_2.11-0.10.2.1]# ln -s /home/hadoop/apps/kafka_2.11-0.10.2.1 /usr/local/kafka

3. Create the log directory

[root@hadoop03 kafka]# pwd
/usr/local/kafka
[root@hadoop03 kafka]# mkdir kafka-logs/

4. Configure the server startup settings

Edit the server.properties file in the /usr/local/kafka/config directory; the full contents are shown below:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# Each broker's id must be unique; give every broker a different id
broker.id=0

# Listening port
port=9092

# Host address of this broker
host.name=192.168.43.22

# Allow topics to be deleted
delete.topic.enable=true


# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# Data directory for message logs; the default is under /tmp and should be changed
log.dirs=/usr/local/kafka/kafka-logs

# Default number of partitions for newly created topics
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# How long to retain data, in hours; the default is 7 days (168 hours)
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# ZooKeeper connection string; separate multiple addresses with commas
zookeeper.connect=192.168.43.20:2181,192.168.43.21:2181,192.168.43.22:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

5. Copy the installation to the Broker02 and Broker03 nodes

scp -r /home/hadoop/apps/kafka_2.11-0.10.2.1 hadoop@hadoop04:/home/hadoop/apps/
scp -r /home/hadoop/apps/kafka_2.11-0.10.2.1 hadoop@hadoop05:/home/hadoop/apps/

6. Update the configuration on Broker02 and Broker03

Create the symbolic link on each of the two nodes:

[root@hadoop04 kafka_2.11-0.10.2.1]# ln -s /home/hadoop/apps/kafka_2.11-0.10.2.1 /usr/local/kafka

On Broker02, update server.properties:

broker.id=1
host.name=192.168.43.23

On Broker03, update server.properties:

broker.id=2
host.name=192.168.43.24

7. Start Broker01, Broker02, and Broker03

Start Kafka as a daemon (background process) on each broker:

[root@hadoop03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

II. Kafka in Practice

1. Create a topic

[root@hadoop03 bin]# pwd
/usr/local/kafka/bin
[root@hadoop03 bin]# ./kafka-topics.sh --create --zookeeper 192.168.43.20:2181 --replication-factor 2 --partitions 3 --topic topicnewtest1
Created topic "topicnewtest1".
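The same topic can also be created from Java. This is a sketch only: the AdminClient API used below was introduced in kafka-clients 0.11.0 and does not ship with the 0.10.2.1 release used in this article, so a newer client library on the classpath is an assumption; it mirrors the kafka-topics.sh call above (3 partitions, replication factor 2).

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2, matching the shell command above
            NewTopic topic = new NewTopic("topicnewtest1", 3, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get(); // block until created
        }
    }
}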

2. List topics

[root@hadoop03 bin]# ./kafka-topics.sh  --list --zookeeper 192.168.43.20:2181
topicnewtest1

3. Describe a topic

[root@hadoop03 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.43.20:2181 --topic topicnewtest1
Topic:topicnewtest1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: topicnewtest1 Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: topicnewtest1 Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: topicnewtest1 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
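The leader/replica/ISR information shown above can also be read from a client at runtime. A minimal sketch using partitionsFor(), which is part of the 0.10.x consumer API used elsewhere in this article:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

import java.util.Properties;

public class DescribeTopicSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
            // One PartitionInfo per partition: leader node, all replicas, and the in-sync replicas
            for (PartitionInfo p : consumer.partitionsFor("topicnewtest1")) {
                System.out.printf("partition=%d leader=%s replicas=%d isr=%d%n",
                        p.partition(), p.leader(), p.replicas().length, p.inSyncReplicas().length);
            }
        }
    }
}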

4. Delete a topic

[root@hadoop03 bin]# ./kafka-topics.sh --delete --zookeeper 192.168.43.20:2181 --topic topicnewtest1
Topic topicnewtest1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

5. Add partitions

[root@hadoop03 bin]# ./kafka-topics.sh --alter --zookeeper 192.168.43.20:2181 --topic topicnewtest1 --partitions 5
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@hadoop03 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.43.20:2181 --topic topicnewtest1
Topic:topicnewtest1 PartitionCount:5 ReplicationFactor:2 Configs:
Topic: topicnewtest1 Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: topicnewtest1 Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topicnewtest1 Partition: 2 Leader: 0 Replicas: 0,2 Isr: 0,2
Topic: topicnewtest1 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: topicnewtest1 Partition: 4 Leader: 2 Replicas: 2,0 Isr: 2,0

6. Use Kafka's built-in console producer and consumer scripts

Console producer:

[root@hadoop03 bin]# ./kafka-console-producer.sh --broker-list 192.168.43.22:9092,192.168.43.23:9092 --topic topicnewtest1

Console consumer:

[root@hadoop04 bin]# ./kafka-console-consumer.sh --zookeeper 192.168.43.20:2181 --from-beginning --topic topicnewtest1

Messages sent from the producer side appear on the consumer side.

7. Produce and consume messages from Java

  • Producer

package cn.chinahadoop.client;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * Kafka producer
 * @author Zhangyongliang
 */
public class ProducerClient {
    public static void main(String[] args) {
        Properties props = new Properties();
        // List of Kafka brokers
        props.put("bootstrap.servers", "192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092");
        // acks=1: the broker replies as soon as the leader has written the message to its local log,
        // without waiting for all followers to finish replicating it
        props.put("acks", "1");
        // String serializers for key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Use random numbers to simulate message generation
        Random rand = new Random();
        for (int i = 0; i < 20; i++) {
            // Generate a random IP address to use as the message key
            String ip = "192.168.1." + rand.nextInt(255);
            long runtime = new Date().getTime();
            // Assemble the message payload
            String msg = runtime + "---" + ip;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("send to kafka->key:" + ip + " value:" + msg);
            // Send the message to the topicnewtest1 topic
            producer.send(new ProducerRecord<String, String>("topicnewtest1", ip, msg));
        }
        producer.close();
    }
}
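Note that send() is asynchronous and returns a Future<RecordMetadata>. To learn which partition and offset a record landed on (or to see a delivery error), a Callback can be passed as a second argument. The snippet below is a hypothetical drop-in replacement for the producer.send(...) line in ProducerClient above, so it reuses that method's producer, ip, and msg variables; fully qualified class names are used to avoid extra imports.

producer.send(new ProducerRecord<String, String>("topicnewtest1", ip, msg),
        new org.apache.kafka.clients.producer.Callback() {
            public void onCompletion(org.apache.kafka.clients.producer.RecordMetadata metadata,
                                     Exception exception) {
                if (exception != null) {
                    // Delivery failed after the configured retries
                    exception.printStackTrace();
                } else {
                    System.out.println("sent to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            }
        });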
  • Consumer

package com.yongliang.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Kafka consumer
 * @author Zhangyongliang
 */
public class ConsumerClient {
    /**
     * Commit offsets manually
     */
    public static void manualCommintClient() {
        Properties props = new Properties();
        // List of Kafka brokers
        props.put("bootstrap.servers", "192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092");
        // Consumer group id
        props.put("group.id", "yongliang");
        // Commit offsets manually
        props.put("enable.auto.commit", "false");
        // earliest: start from the earliest offset; latest: start from the latest offset;
        // none: throw an exception if no previous offset exists for this consumer group. Default is latest.
        props.put("auto.offset.reset", "earliest");
        // String deserializers for key and value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topicnewtest1 topic; to consume several topics, separate them with commas
        consumer.subscribe(Arrays.asList("topicnewtest1"));
        // Process at least 10 messages before committing
        final int minBatchSize = 10;
        // Buffer for the fetched messages
        List<ConsumerRecord<String, String>> bufferList = new ArrayList<ConsumerRecord<String, String>>();
        while (true) {
            System.out.println("--------------start pull message---------------");
            long starttime = System.currentTimeMillis();
            // poll() takes a timeout in milliseconds: it waits while there are no messages to fetch,
            // and returns (possibly empty) once the timeout expires so the next round can start
            ConsumerRecords<String, String> records = consumer.poll(1000);
            long endtime = System.currentTimeMillis();
            long tm = (endtime - starttime) / 1000;
            System.out.println("--------------end pull message and times=" + tm + "s -------------");

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
                bufferList.add(record);
            }
            System.out.println("--------------buffer size->" + bufferList.size());
            // Once at least 10 messages have been buffered, process them
            if (bufferList.size() >= minBatchSize) {
                System.out.println("******start deal message******");
                try {
                    // Sleep for one second to simulate message processing
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.out.println("manual commint offset start...");
                // Commit the offsets after processing
                consumer.commitSync();
                // Clear the buffer and keep receiving
                bufferList.clear();
                System.out.println("manual commint offset end...");
            }
        }
    }

    /**
     * Commit offsets automatically
     */
    public static void autoCommintClient() {
        Properties props = new Properties();
        // List of Kafka brokers
        props.put("bootstrap.servers", "192.168.43.22:9092,192.168.43.23:9092,192.168.43.24:9092");
        props.put("group.id", "newConsumerGroup");
        // Commit offsets automatically
        props.put("enable.auto.commit", "true");
        // Auto-commit interval of 1000 milliseconds
        props.put("auto.commit.interval.ms", "1000");
        // earliest: start from the earliest offset; latest: start from the latest offset;
        // none: throw an exception if no previous offset exists for this consumer group. Default is latest.
        props.put("auto.offset.reset", "earliest");
        // String deserializers for key and value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to the topicnewtest1 topic; to consume several topics, separate them with commas
        consumer.subscribe(Arrays.asList("topicnewtest1"));
        while (true) {
            // poll() takes a timeout in milliseconds: it waits while there are no messages to fetch,
            // and returns (possibly empty) once the timeout expires so the next round can start
            ConsumerRecords<String, String> records = consumer.poll(1000);
            // Handle the fetched messages
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }

    public static void main(String[] args) {
        // Commit offsets automatically
        // autoCommintClient();
        // Commit offsets manually
        manualCommintClient();
    }
}
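One more note on the manual-commit example: commitSync() with no arguments commits the offsets of everything returned by the last poll(). The 0.10.x consumer also accepts an explicit per-partition offset map, which is useful when committing after each processed record. A minimal sketch (the helper name commitUpTo is hypothetical, not part of the Kafka API):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;

public class OffsetCommitSketch {
    // Commits everything up to and including the given record for its partition.
    static void commitUpTo(KafkaConsumer<String, String> consumer,
                           ConsumerRecord<String, String> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        // The committed offset is the offset of the NEXT record to read, hence +1
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));
    }
}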