19 kafka消息队列

976-沈同学

发表文章数:64

热门标签

首页 » 大数据 » 正文

19 kafka消息队列

一、kafka介绍

1、消息队列基本介绍

消息:在应用系统之间,传递的数据叫做消息
队列:排队的模型 先进先出 类似于火车进隧道
消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在
19 kafka消息队列
19 kafka消息队列

2、常用的消息队列介绍

标注的消息队列实现
RabbitMQ: rabbit message queue
ActiveMQ:支持消息队列当中事务处理
RocketMQ: 阿里开源的消息队列 rocket
消息队列的模型:主要是基于pub/sub publish 、subscribe 发布与订阅模型
19 kafka消息队列
kafka:linkedin 公司开源提供的 吞吐量非常高,而且消息的处理速度非常快 大数据领域里面大部分都是使用kafka
kafka不是一个标准的消息队列的实现
消息队列模型:主要是基于push/poll 推送与拉取
19 kafka消息队列
19 kafka消息队列

3、消息队列的应用场景

消息队列在实际应用中包括如下四个场景:

应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;

异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间;

19 kafka消息队列

	限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况;

19 kafka消息队列

消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理;

具体场景:用户新上传了一批照片, 人脸识别系统需要对这个用户的所有照片进行聚类,聚类完成后由对账系统重新生成用户的
人脸索引(加快查询)。这三个子系统间由消息队列连接起来,前一个阶段的处理结果放入队列中,
后一个阶段从队列中获取消息继续处理。

19 kafka消息队列

该方法有如下优点:
避免了直接调用下一个系统导致当前系统失败;
每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,可以选择定时处理,也可以划分时间段按
不同处理速度处理;

4、消息队列的两种模式

点对点(point to point, queue):两个人之间互相通信,都是点对点这种模型
19 kafka消息队列

点对点模式特点:

1.每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中);
2.发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息;
3.接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息;

发布订阅(publish/subscribe,topic):群聊
19 kafka消息队列
发布/订阅模式特点:

1.每个消息可以有多个订阅者;
2.发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
3.为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行;

5、kafka的基本介绍

官网:http://kafka.apache.org/
kafka是最初由linkedin公司开发的,使用scala语言编写,kafka是一个分布式,分区的,多副本的,多订阅者的日志系统(分布式MQ系统),可以用于搜索日志,监控日志,访问日志等

kafka是一个分布式的消息队列系统

分布式就是由多个节点组成,一个节点就是一个服务器
在kafka当中节点叫做broker ,一个节点就是一个broker,一个broker就是一个服务器
hadoop当中节点  datanode
hbase当中当中 HMaster以及HRegionServer

kafka的好处

可靠性:分布式的,分区,复制和容错的。
可扩展性:kafka消息传递系统轻松缩放,无需停机。
耐用性:kafka使用分布式提交日志,这意味着消息会尽可能快速的保存在磁盘上,因此它是持久的。 
性能:kafka对于发布和定于消息都具有高吞吐量。即使存储了许多TB的消息,他也爆出稳定的性能。 
kafka非常快:保证零停机和零数据丢失。
*磁盘顺序读写*

分布式的发布与订阅系统

apache kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,并使能够将消息从一个端点传递到
另一个端点,kafka适合离线和在线消息消费。kafka消息保留在磁盘上,并在集群内复制以防止数据丢失。kafka构建在
zookeeper同步服务之上。它与apache和spark非常好的集成,应用于实时流式数据分析。

kafka的主要应用场景
指标分析

kafka   通常用于操作监控数据。这设计聚合来自分布式应用程序的统计信息,   以产生操作的数据集中反馈

日志聚合解决方法

kafka可用于跨组织从多个服务器收集日志,并使他们以标准的合适提供给多个服务器。

流式处理:实时处理 数据从出现到产生,在一秒钟以内能够处理完成
流式计算:程序一旦启动,就会一直运行下去,一旦有数据,就能够马上被处理掉

生产者生产数据到kafka里面去  ,然后通过一些实时处理的框架例如storm或者sparkStreaming或者flink等等
一些实时处理的框架去处理kafka里面的数据

6、kafka的架构介绍

分布式:肯定是多节点,多台服务器,组织到一起形成一个集群
19 kafka消息队列
生产者:producer 主要负责生产数据到 topic里面去
topic:虚拟的概念,某一类消息的主题,某一类消息都是存放在某一个topic当中
一个topic有多个partition:一个partition里面有多个segment段,每个segment默认1GB
一个segment: 一个.index文件 + 一个.log文件
.log:存放用户真实的产生的数据
.index 存放的是.log文件的索引数据
消费者:consumer 主要就是消费topic里面的数据
conusmer消费到哪一条数据需要进行记录:offset来进行记录 数据的偏移量 每条数据都有唯一的offset
.index文件:存放的索引文件,用于查找.log文件里面的数据

7、kafka架构内部细节剖析

19 kafka消息队列
19 kafka消息队列

kafka需要依赖zk保存一些节点信息 kakfa紧耦合zookeeper
kafka当中数据消费的时候,消费者都需要指定属于哪一个消费组
一个消费组里面,可以有多个消费者

消费组:同一时间,一个分区里面的数据,只能被一个消费组里面的一个线程进行消费
调大分区的个数:可以加快数据的消费的速度

任意时刻,一个分区里面的数据,只能被一个消费组里面的一个线程进行消费
kafka当中的数据消费出现延迟:加大消费者线程数量,加大分区的个数

8、kafka主要组件说明

1、kafka当中的producer说明

producer主要是用于生产消息,是kafka当中的消息生产者,生产的消息通过topic进行归类,保存到kafka的
broker里面去

2、kafka当中的topic说明

1、kafka将消息以topic为单位进行归类
2、topic特指kafka处理的消息源(feeds of messages)的不同分类。
3、topic是一种分类或者发布的一些列记录的名义上的名字。kafka主题始终是支持多用户订阅的;也就是说,一 个主题可以有零个,一个或者多个消费者订阅写入的数据。
4、在kafka集群中,可以有无数的主题。
5、生产者和消费者消费数据一般以主题为单位。更细粒度可以到分区级别。

3、kafka当中的partition说明

kafka当中,topic是消息的归类,一个topic可以有多个分区,每个分区保存部分topic的数据,所有的partition当中的数据全部合并起来,就是一个topic当中的所有的数据,
一个broker服务下,是否可以创建多个分区?
可以的,broker数与分区数没有关系; 在kafka中,每一个分区会有一个编号:编号从0开始
每一个分区的数据是有序的
说明-数据是有序 如何保证一个主题下的数据是有序的?(生产是什么样的顺序,那么消费的时候也是什么样的顺序)

19 kafka消息队列

topic的Partition数量在创建topic时配置。

Partition数量决定了每个Consumer group中并发消费者的最大数量。

Consumer group A 有两个消费者来读取4个partition中数据;
Consumer group B有四个消费者来读取4个 partition中的数据

19 kafka消息队列

partition的个数与线程的个数
partition个数  = 线程的个数  刚刚好,一个线程消费一个分区
partition个数 >  线程的个数  有线程需要去消费多个分区里面的数据
partition个数  < 线程的个数  有线程在闲置 

4、kafka当中partition的副本数说明
19 kafka消息队列
副本数(replication-factor):控制消息保存在几个broker(服务器)上,一般情况下等于broker的个数

kakfa当中副本的策略:使用isr这种策略来维护一个副本列表

isr  synchronize  replication :同步完成的副本列表
主分区:可以有多个副本 ,为了最大程度的同步完成数据,使用多个副本,每个副本都启动线程去复制主分区上面的数据
尽量的保证副本分区当中的数据与主分区当中的数据一致的
如果副本分区当中的数据与主分区当中的数据差别太大,将副本分区移除ISR列表
如果副本分区的心跳时间比较久远,也会将副本分区移除ISR列表

5、kafka当中的segment说明

一个partition当中由多个segment文件组成,每个segment文件,包含两部分,
一个是.log文件,另外一个是.index文件,
其中.log文件包含了我们发送的数据存储,
.index文件,记录的是我们.log文件的数据索引值,以便于我们加快数据的查询速度

索引文件与数据文件的关系
比如索引文件中3,497代表:数据文件中的第三个message,它的偏移地址为497。
再来看数据文件中,Message 368772表示:在全局partiton中是第368772个message。

注:segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,
稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,
但查找起来需要消耗更多的时间。

19 kafka消息队列

二、kafka集群环境搭建

1、初始化环境准备

安装jdk,安装zookeeper并保证zk服务正常启动

2、下载安装包并上传解压

通过以下地址进行下载安装包
node1执行以下命令,下载并解压

cd /export/softwares
wget http://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar –zxvf  kafka_2.11-1.0.0.tgz -C /export/servers/

3、node1服务器修改kafka配置文件

node1执行以下命令进入到kafka的配置文件目录,修改配置文件
node1执行以下命令创建数据文件存放目录

mkdir -p  /export/servers/kafka_2.11-1.0.0/logs 
cd /export/servers/kafka_2.11-1.0.0/config
vim server.properties
broker.id=0 #
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs #
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181 # 
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true #
host.name=node1 #

4、安装包分发到其他服务器上面去

node1执行以下命令,将node1服务器的kafka安装包发送到node2和node3服务器上面去

cd /export/servers/
scp -r kafka_2.11-1.0.0/ node02:PWD
scp -r kafka_2.11-1.0.0/ node03:PWD

5、node2与node3服务器修改配置文件

node2使用以下命令修改kafka配置文件(server.properties)

broker.id=1
host.name=node2

node3使用以下命令修改kafka配置文件(server.properties)

broker.id=2
host.name=node3

6、kafka集群启动与停止

注意事项:在kafka启动前,一定要让zookeeper启动起来。
node1执行以下命令将kafka进程启动在后台

cd /export/servers/kafka_2.11-1.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

node2执行以下命令将kafka进程启动在后台

cd /export/servers/kafka_2.11-1.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

node3执行以下命令将kafka进程启动在后台

cd /export/servers/kafka_2.11-1.0.0
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

三台机器也可以执行以下命令停止kafka集群

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-server-stop.sh

三、Kafka集群操作

1.基本命令

1、创建topic
创建一个名字为test的主题, 有三个分区,有两个副本
node1执行以下命令来创建topic

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper node1:2181,node2:2181,node3:2181

2、查看主题命令
查看kafka当中存在的主题
node1使用以下命令来查看kafka当中存在的topic主题

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh  --list --zookeeper node1:2181,node2:2181,node3:2181

3、生产者生产数据
模拟生产者来生产数据
node1服务器执行以下命令来模拟生产者进行生产数据

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test

4、消费者消费数据
node2服务器执行以下命令来模拟消费者进行消费数据

cd /export/servers/kafka_2.11-1.0.0
using the new consumer by passing [bootstrap-server] instead of [zookeeper]
bin/kafka-console-consumer.sh --from-beginning --topic test  --zookeeper node01:2181,node02:2181,node03:2181
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic test

5、运行describe topics命令

node1执行以下命令运行describe查看topic的相关信息

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --describe --zookeeper node1:2181 --topic test

6、增加topic分区数
任意kafka服务器执行以下命令可以增加topic分区数

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper node1:2181 --alter --topic test --partitions 4

19 kafka消息队列
7、增加配置
动态修改kakfa的配置
任意kafka服务器执行以下命令可以增加topic分区数

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper node1:2181 --alter --topic test --config flush.messages=1

8、删除配置
动态删除kafka集群配置

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-topics.sh --zookeeper node1:2181 --alter --topic test --delete-config flush.messages

9、删除topic
目前删除topic在默认情况下知识打上一个删除的标记,在重新启动kafka后才删除。如果需要立即删除,则需要在

server.properties中配置:
delete.topic.enable=true
然后执行以下命令进行删除topic
kafka-topics.sh --zookeeper node1:2181--delete --topic test

2.kafka的JavaAPI操作

1、创建maven工程并添加jar包

创建maven工程并添加以下依赖jar包的坐标到pom.xml

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.0.0</version>
    </dependency>

</dependencies>

<build>
    <plugins>
        <!-- java编译插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>

2、生产者代码

package cn.itcast.kafka.demo1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 向test topic里面发送数据
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        //指定Kafka的服务器地址
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //消息确认机制
        props.put("acks", "all");
        //重试机制
        props.put("retries", 0);
        //批量发送的大小
        props.put("batch.size", 16384);
        //消息的延迟
        props.put("linger.ms", 1);
        //消息缓冲区大小
        props.put("buffer.memory", 33554432);
        //定义key和value的序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //第四种自定义分区需要添加,自定义类
        props.put("partitioner.class","cn.itcast.kafka.demo1.MyPartitioner");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        for (int i=0;i<100;i++){
        	//第一种:既没有指定key,也没有指定分区号,使用轮询的方式
            kafkaProducer.send(new ProducerRecord<>("test","这是第" + i + "条数据"));
            //第二种:指定数据key,使用key的hashCode码值来进行分区,一定要注意,key要变化
            kafkaProducer.send(new ProducerRecord<>("test", "mykey"+i,"这是第" + i + "条数据"));
            //第三种:指定分区号来进行分区
            kafkaProducer.send(new ProducerRecord<>("test", 1,"mykey"+i,"这是第" + i + "条数据"));
            //第四种:自定义分区策略,不需要指定分区号,如果指定了分区号还是会发送到指定的分区
            kafkaProducer.send(new ProducerRecord<>("test","mykey"+i,"这是第" + i + "条数据"));
        }
        kafkaProducer.close();

    }
}
package cn.itcast.kafka.demo1;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartitioner implements Partitioner {
    /**
     * 通过这个方法来自定义我们数据的分区规则
     * @param topic
     * @param key
     * @param keyBytes
     * @param value
     * @param valueBytes
     * @param cluster
     * @return 返回int值,这个值就是分区号
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 3;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

19 kafka消息队列

kafka的数据的分区策略:
kafka五个分区:由于某种原因 0,1,2三个分区里面的数据太多,3,4分区里面的数据太少。

ProducerRecord源码翻译:
如果指定了分区号,直接将数据发送到指定的分区里面去
如果没有指定分区号,数据带了发送的key,通过key取hashCode决定数据究竟发送到哪一个分区里面去
如果既没有指定分区号,也没有指定数据key,使用 round-robin fashion  轮询策略
如果使用key来作为分区的依据,key一定要是变化的,保证数据发送到不同的分区里面去

分区方式:

第一种:既没有指定key,也没有指定分区号,使用轮询的方式
第二种:指定数据key,使用key的hashCode码值来进行分区,一定要注意,key要变化
第三种:指定分区号来进行分区
第四种:自定义分区策略

3、消费者代码

3.1、自动提交offset

package cn.itcast.kafka.demo2;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        //指定Kafka的服务器地址
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //指定消费组的名字
        props.put("group.id", "testGroup");
        //允许程序自动提交offset 提交offset保存到了Kafka当中的一个topic中取
        props.put("enable.auto.commit", "true");
        //每隔多长时间提交一次offset的值
        /**
         * 157 hello offset 上一秒提交的offset
         *
         * 287 hello world
         * 295 abc test 900ms 宕机了怎么办?
         * 351 hello abc 1000ms
         *
         * 有可能造成重复消费的一些问题
         *
         */
        props.put("auto.commit.interval.ms", "1000");
        //定义key和value的序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //定义KafkaConsumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //订阅test这个topic,去消费这个topic里面的数据
        kafkaConsumer.subscribe(Arrays.asList("test"));
        //使用死循环拉取数据
        while(true){
            //所有拉取的数据都封装在了ConsumerRecords
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            //consumerRecord就是我们每一条数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                int partition = consumerRecord.partition(); //获取数据对应的分区号
                String value = consumerRecord.value(); //对应数据值
                long offset = consumerRecord.offset(); //对应数据的偏移量
                String key = consumerRecord.key(); //对应数据发送的key
                System.out.println("数据的key为:"+key+"数据的value为:"+value+"数据的offset为:"+offset+"数据的分区为:"+partition);
            }
        }

    }
}

19 kafka消息队列

3.2、手动提交offset

package cn.itcast.kafka.demo2;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CommitOffsetByHand {
    public static void main(String[] args) {
        Properties props = new Properties();
        //指定Kafka的服务器地址
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //指定消费组的名字
        props.put("group.id", "testGroup");
        //不允许程序自动提交offset,需要我们消费完成后手动提交
        props.put("enable.auto.commit", "false");

        //定义key和value的序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //定义KafkaConsumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //订阅test这个topic,去消费这个topic里面的数据
        kafkaConsumer.subscribe(Arrays.asList("test"));
        //使用死循环拉取数据
        while(true){
            //所有拉取的数据都封装在了ConsumerRecords
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            //consumerRecord就是我们每一条数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                int partition = consumerRecord.partition(); //获取数据对应的分区号
                String value = consumerRecord.value(); //对应数据值
                long offset = consumerRecord.offset(); //对应数据的偏移量
                String key = consumerRecord.key(); //对应数据发送的key
                System.out.println("数据的key为:"+key+"数据的value为:"+value+"数据的offset为:"+offset+"数据的分区为:"+partition);
            }
            //ConsumerRecords 里面的数据全部消费完了,提交offset
            //使用异步提交的方式不会阻塞程序的消费
            kafkaConsumer.commitAsync();
            // kafkaConsumer.commitSync(); 同步的进行提交,消费数据完成后进行提交offset,完成提交后,才能继续下一次消费
        }

    }
}

3.3、消费完每个分区之后手动提交offset

package cn.itcast.kafka.demo2;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class CommitPartition {
    public static void main(String[] args) {
        Properties props = new Properties();
        //指定Kafka的服务器地址
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //指定消费组的名字
        props.put("group.id", "testGroup");
        //不允许程序自动提交offset,需要我们消费完成后手动提交
        props.put("enable.auto.commit", "false");
        //定义key和value的序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //定义KafkaConsumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //订阅test这个topic,去消费这个topic里面的数据
        kafkaConsumer.subscribe(Arrays.asList("test"));
        while (true){
            //调用poll方法,获取所有的数据,包含了各分区里面的数据都有
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(3000);
            //获取到一个topic里面的所有分区
            Set<TopicPartition> partitions = consumerRecords.partitions();
            for (TopicPartition topicPartition : partitions) {
                //获取到一个分区里面的所有数据
                List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);
                //long lastOffset=0;
                for (ConsumerRecord<String, String> record : records) {
                    int partition = record.partition(); //获取数据对应的分区号
                    String value = record.value(); //对应数据值
                    long lastOffset = record.offset(); //对应数据的偏移量
                    String key = record.key(); //对应数据发送的key
                    System.out.println("数据的key为:"+key+"数据的value为:"+value+"数据的offset为:"+lastOffset+"数据的分区为:"+partition);
                }
                //提交partition的offset值
                //Map<TopicPartition, OffsetAndMetadata> offsets
                // 获取分区里面数据的最后一条数据的offset的值
                //1.集合取值
                long offset = records.get(records.size() - 1).offset();
                //2.lastOffset
                Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset));
                //处理完成一个分区里面的数据,然后提交offset
                kafkaConsumer.commitSync(topicPartitionOffsetAndMetadataMap);
            }
        }
    }
}

3.4、指定分区数据进行消费

package cn.itcast.kafka.demo2;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

/**
 * 实现指定分区进行消费
 *
 */
public class ConsumerMypartition {
    public static void main(String[] args) {
        Properties props = new Properties();
        //指定Kafka的服务器地址
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //指定消费组的名字
        props.put("group.id", "testGroup2");
        //不允许程序自动提交offset,需要我们消费完成后手动提交
        props.put("enable.auto.commit", "false");
        //定义key和value的序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //定义KafkaConsumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //订阅test这个topic,去消费这个topic里面的数据
        //Collection<TopicPartition> partitions
        //创建一个集合,集合的泛型是topicPartition
        TopicPartition topicPartition0 = new TopicPartition("test", 0);
        TopicPartition topicPartition1 = new TopicPartition("test", 1);
        List<TopicPartition> topicPartitions = Arrays.asList(topicPartition0, topicPartition1);
        //通过assign方法来注册我们只消费某些分区里面的数据
        kafkaConsumer.assign(topicPartitions);
        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(3000);
            //获取所有的分区
            Set<TopicPartition> partitions = consumerRecords.partitions();
            for (TopicPartition topicPartition : partitions) {
                //获取到一个分区 里面的数据
                List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);
                for (ConsumerRecord<String, String> record : records) {
                    int partition = record.partition(); //获取数据对应的分区号
                    String value = record.value(); //对应数据值
                    long lastOffset = record.offset(); //对应数据的偏移量
                    String key = record.key(); //对应数据发送的key
                    System.out.println("数据的key为:"+key+"数据的value为:"+value+"数据的offset为:"+lastOffset+"数据的分区为:"+partition);
                }
                //需要提交这个分区的offset值
                long offset = records.get(records.size() - 1).offset();
                kafkaConsumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(offset)));
            }
        }
    }
}

3.5、重复消费与数据丢失

19 kafka消息队列
kafka的数据消费模型:

 exactly  once:消费且仅消费一次
 at  least  once:最少消费一次  出现数据重复消费的问题
 at  most  once : 至多消费一次  出现数据丢失的问题

数据重复消费或者数据丢失的原因造成:offset没有管理好
将offset的值给保存到redis里面去或者hbase里面去
默认的offset保存在哪里??

 可以保存到zk里面去,
 也可以保存到kafka自带的一个topic里面去  __consumer_offsets

3.6、consumer消费者消费数据流程

高阶API high level API

将offset的值,保存在zk当中了,早期的kafka版本,默认都是使用high level  api进行消费的

低阶API low level API

将offset的值,保存在kafka的一个默认的topic里面了

新的版本都是使用low level API进行消费,将数据的offset保存到一个topic里面去了

4、kafka Streams API开发

kafka新版本的一个流式计算的模块,主要用于流式计算,实时计算
案例:

1.使用kafka-stream API实现获取test这个topic里面的数据,然后写入到test2这个topic里面去,
2.并且将数据小写转换成为大写

19 kafka消息队列

第一步:创建一个topic

node1服务器使用以下命令来常见一个topic 名称为test2

cd /export/servers/kafka_2.11-1.0.0/
bin/kafka-topics.sh --create  --partitions 3 --replication-factor 2 --topic test2 --zookeeper node1:2181,node2:2181,node3:2181

第二步:开发StreamAPI

package cn.itcast.kafka.demo1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 向test topic里面发送数据
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        //指定Kafka的服务器地址
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //消息确认机制
        props.put("acks", "all");
        //重试机制
        props.put("retries", 0);
        //批量发送的大小
        props.put("batch.size", 16384);
        //消息的延迟
        props.put("linger.ms", 1);
        //消息缓冲区大小
        props.put("buffer.memory", 33554432);
        //定义key和value的序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        props.put("partitioner.class","cn.itcast.kafka.demo1.MyPartitioner");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        for (int i=0;i<100;i++){
//            kafkaProducer.send(new ProducerRecord<>("test","这是第" + i + "条数据"));
            kafkaProducer.send(new ProducerRecord<>("test", "mykey"+i,"hello" + i + "world"));
//            kafkaProducer.send(new ProducerRecord<>("test", 1,"mykey"+i,"这是第" + i + "条数据"));
            //自定义分区策略,不需要指定分区号,如果指定了分区号还是会发送到指定的分区
//            kafkaProducer.send(new ProducerRecord<>("test","mykey"+i,"这是第" + i + "条数据"));

        }
        kafkaProducer.close();
    }
}
package cn.itcast.kafka.demo3;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;

public class StreamAPI {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //通过KStreamBuilder来实现将我们数据进行流式处理
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}

19 kafka消息队列

第三步:生产数据

node1执行以下命令,向test这个topic当中生产数据

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic test

第四步:消费数据

node2执行一下命令消费test2这个topic当中的数据

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-consumer.sh --from-beginning  --topic test2 --zookeeper node1:2181,node2:2181,node3:2181

3.kafka的log存储以及查询机制

kafka中log日志目录及组成

kafka在我们指定的log.dir目录下,会创建一些文件夹;名字是【主题名字-分区名】所组成的文件夹。 
一个topic由多个partition组成的
一个partition里面有多个segment文件段
一个segment里面有两个文件,
.log文件:存放日志数据的文件
.index文件:索引文件
每当.log文件达到1GB的时候,就会产生一个新的segment

第一个segment段:
-rw-r--r-- 1 root root 10485760 Jul 26 11:48 00000000000000000000.index
-rw-r--r-- 1 root root     7775 Jul 26 15:54 00000000000000000000.log

第二个segment段:
-rw-r--r-- 1 root root 10485760 Jul 26 11:48 00000000000000789546.index
-rw-r--r-- 1 root root     7775 Jul 26 15:54 00000000000000789546.log

第三个segment段:
-rw-r--r-- 1 root root 10485760 Jul 26 11:48 00000000000000874569.index
-rw-r--r-- 1 root root     7775 Jul 26 15:54 00000000000000874569.log

第四个segment段:
-rw-r--r-- 1 root root 10485760 Jul 26 11:48 00000000000000984561.index
-rw-r--r-- 1 root root     7775 Jul 26 15:54 00000000000000984561.log



下一个segment的文件的名字,是上一个segment文件最后一条数据的offset值
查找
654789  offset  在哪个segment文件段里面,文件里里面第多少条数据
折半查找  二分查找 来查找数据的offset究竟在哪一个segment段里面去

如果确定了数据的offset在第一个segment里面,怎么继续快速的找到是哪一行数据
.index文件里面存放了一些数据索引值,不会将.log文件里面每一条数据都进行索引,每过一段就索引一次
减少索引文件的大小
索引文件是比较稀疏的,没有将所有的数据都建立索引值 避免索引文件太大

offset
157894  在第358行
257894  第 514行
354678  第612行
514789  第 1200行
714895  第1500行

还是使用折半查找在segment中寻找数据

kafka的log的寻址机制,背下来
1、
第一步:使用折半查找,找数据属于哪一个segment段
第二步:通过.index文件来查找数据究竟对应哪一条数据

segment段的命名规则:
下一个segment起始的数据name值,是上一个segment文件最后一条数据的offset值
19 kafka消息队列
kafka Message的物理结构及介绍
19 kafka消息队列

4.kafka当中的数据不丢失机制

kafka当中如何保证数据不丢失:
1、生产者如何保证数据不丢失 使用ack来确认
2、消费者如何保证数据不丢失 使用offset来记录
3、broker如何保证数据不丢失 副本机制
19 kafka消息队列

生产者生产数据:同步的发送以及异步发送

同步:发送一批数据给kafka后,等待kafka返回结果
1、生产者等待10s,如果broker没有给出ack相应,就认为失败。
2、生产者重试3次,如果还没有响应,就报错

异步:发送一批数据给kafka,只是提供一个回调函数。
1、先将数据保存在生产者端的buffer中。buffer大小是2万条 
2、满足数据阈值或者数量阈值其中的一个条件就可以发送数据。
3、发送一批数据的大小是500条

producer的buffer缓冲区可以装2W条数据,如果数据一直没有发送出去,
如果buffer满了,我们可以设置,设置生产者阻塞,或者设置清空buffer

Kafka集群:

broker如何保证数据不丢失:使用副本的机制,来同步主分区当中的数据

消费者:优先选择主分区当中的数据进行消费,主分区当中的数据是最完整的

如何记录消费到了哪一条避免重复消费或者数据丢失???
通过offset来进行记录,可以将offset保存到redis或者hbase里面去等等,下次消费的时候就将offset取出来,
去进行消费

5.kafka 压力测试

https://blog.csdn.net/laofashi2015/article/details/81111466

数据的存储,都是存储在磁盘里面了:磁盘文件为什么能够做到速度这么快?
实现每秒过万条数据可以轻松处理?

第一个原因:使用pageCache 页缓存技术
第二个原因:顺序的读写磁盘,顺序的读写磁盘的速度比操作内存更快

19 kafka消息队列

6.kafka的配置文件说明

Server.properties配置文件说明

#broker的全局唯一编号,不能重复
broker.id=0

#用来监听链接的端口,producer或consumer将在此端口建立连接
port=9092

#处理网络请求的线程数量
num.network.threads=3

#用来处理磁盘IO的线程数量
num.io.threads=8

#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小
socket.request.max.bytes=104857600

#kafka运行日志存放的路径
log.dirs=/export/data/kafka/

#topic在当前broker上的分片个数
num.partitions=2

#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

#segment文件保留的最长时间,超时将被删除
log.retention.hours=1

#滚动生成新的segment文件的最大时间
log.roll.hours=1

#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824

#周期性检查文件大小的时间
log.retention.check.interval.ms=300000

#日志清理是否打开
log.cleaner.enable=true

#broker需要使用zookeeper保存meta数据
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

#zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000

#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000

#消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000

#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true

#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
host.name=kafka01

advertised.host.name=192.168.140.128

日志数据的处理

#日志清理是否打开
log.cleaner.enable=true

#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=8
#日志数据保存的时间  168小时  7天
log.retention.hours=168

kafka当中已经消费掉的数据,没有存在的必要,可以对其进行删除,默认是168小时之后就将过期的segment进行删除掉

控制内存当中的数据多长时间刷新一次到磁盘,或者多少条数据刷新一次到磁盘

#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000

#消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000

producer生产者配置文件说明

#指定kafka节点列表,用于获取metadata,不必全部指定
metadata.broker.list=node1:9092,node2:9092,node3:9092
# 指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
#partitioner.class=kafka.producer.DefaultPartitioner
# 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
compression.codec=none
# 指定序列化处理类
serializer.class=kafka.serializer.DefaultEncoder
# 如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。
#compressed.topics=

# 设置发送数据是否需要服务端的反馈,有三个值0,1,-1
# 0: producer不会等待broker发送ack 
# 1: 当leader接收到消息之后发送ack 
# -1: 当所有的follower都同步消息成功后发送ack. 
request.required.acks=0 

# 在向producer发送ack之前,broker允许等待的最大时间 ,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功) 
request.timeout.ms=10000

# 同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量,
也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
producer.type=sync

# 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms
# 此值和batch.num.messages协同工作.
queue.buffering.max.ms = 5000

# 在async模式下,producer端允许buffer的最大消息量
# 无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
# 此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000
queue.buffering.max.messages=20000

# 如果是异步,指定每次批量发送数据量,默认为200
batch.num.messages=500

# 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后 
# 阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息) 
# 此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间 
# -1: 无阻塞超时限制,消息不会被抛弃 
# 0:立即清空队列,消息被抛弃 
queue.enqueue.timeout.ms=-1


# 当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数 
# 因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失) 
# 有可能导致broker接收到重复的消息,默认值为3.
message.send.max.retries=3

# producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况 
# 因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新 
# (比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000 
topic.metadata.refresh.interval.ms=60000

consumer消费者配置详细说明

# zookeeper连接服务器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
# zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
zookeeper.session.timeout.ms=5000
#当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
zookeeper.connection.timeout.ms=10000
# 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
zookeeper.sync.time.ms=2000
#指定消费 
group.id=itcast
# 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息 
# 注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true
auto.commit.enable=true
# 自动更新时间。默认60 * 1000
auto.commit.interval.ms=1000
# 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察
conusmer.id=xxx 
# 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
client.id=xxxx
# 最大取多少块缓存到消费者(默认10)
queued.max.message.chunks=50
# 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新  的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, 此值用于控制,注册节点的重试次数. 
rebalance.max.retries=5

# 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存
fetch.min.bytes=6553600

# 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest
auto.offset.reset=smallest
# 指定序列化处理类
derializer.class=kafka.serializer.DefaultDecoder

四、flume整合kafka

数据采集工具:flume,sqoop
19 kafka消息队列
flume三个组件:

source
channel
sink:kafkaSink
使用flume监控一个文件夹,一旦文件夹下面有了数据,就将数据发送到kafka里面去
source  TailDirSource

第一步:flume下载地址

http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.14.0.tar.gz

第二步:上传解压flume

第三步:配置flume.conf

#为我们的source channel  sink起名
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#指定我们的source收集到的数据发送到哪个管道
a1.sources.r1.channels = c1
#指定我们的source数据收集策略
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/servers/flumedata
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.ignorePattern = ^(.)*//.tmp$
a1.sources.r1.inputCharset = GBK
#指定我们的channel为memory,即表示所有的数据都装进memory当中
a1.channels.c1.type = memory
#指定我们的sink为kafka  sink,并指定我们的sink从哪个channel当中读取数据
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

第四步:启动flume

bin/flume-ng agent --conf conf --conf-file conf/flume.conf --name a1 -Dflume.root.logger=INFO,console

第五步:消费Kafka内数据

bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic test

五、CAP理论以及kafka当中的CAP机制

CAP理论:三口锅,但是只有两个锅盖,用于有一口锅盖不住

Consistency:一致性
Availability:可用性
Partition tolerance:分区容错性

在分布式系统当中,这三个指标顶多只能满足其中的两个
所有的分布式系统,都遵循这个CAP定律

hadoop  hbase  redis集群  es  kafka  

19 kafka消息队列
一致性:研究的是多个节点当中的数据是否一样

强一致性:一旦更新之后,其他的节点可以马上感知得到
弱一致性:一旦更新之后,其他的节点,不用更新
最终一致性:一旦更新之后,最终所有的节点都会更新

可用性:研究的就是服务器是否会给响应
分区容错:研究的就是多个分区实现数据的备份机制

kafka满足的是CAP当中的CA:一致性和可用性
不满足 分区容错性 kafka当中使用ISR尽量的避免分区容错性
19 kafka消息队列

ISR列表维护的依据

replica.lag.time.max.ms=10000     副本分区与主分区心跳时间延迟
replica.lag.max.messages=4000    副本分区与主分区消息同步最大差

订单系统:每当有人下单之后,就会打印log4j的日志

六、kafka监控及运维

1、kafka-eagle概述

为了简化开发者和服务工程师维护Kafka集群的工作有一个监控管理工具,叫做 Kafka-eagle。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具,

2、环境和安装

2.1、环境要求

需要安装jdk,启动zk以及kafka的服务

2.2、安装步骤

2.2.1、下载源码包

kafka-eagle官网:

http://download.kafka-eagle.org/

我们可以从官网上面直接下载最细的安装包即可kafka-eagle-bin-1.3.2.tar.gz这个版本即可

2.2.2、解压

这里我们选择将kafak-eagle安装在第三台
直接将kafka-eagle安装包上传到node03服务器的/export/softwares路径下,然后进行解压
node3服务器执行一下命令进行解压

cd /export/softwares/
tar -zxf kafka-eagle-bin-1.3.2.tar.gz -C /export/servers/
cd /export/servers/kafka-eagle-bin-1.3.2
tar -zxf kafka-eagle-web-1.3.2-bin.tar.gz

2.2.3、准备数据库

kafka-eagle需要使用一个数据库来保存一些元数据信息,我们这里直接使用msyql数据库来保存即可,在node3服务器执行以下命令创建一个mysql数据库即可

进入mysql客户端
mysql -uroot -p
create database eagle;

2.2.4、修改kafak-eagle配置文件

node3执行以下命令修改kafak-eagle配置文件
cd /export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2/conf
vim system-config.properties

kafka.eagle.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=node1:2181,node2:2181,node3:2181
cluster2.zk.list=node1:2181,node2:2181,node3:2181

kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node3:3306/eagle
kafka.eagle.username=root
kafka.eagle.password=123456

2.2.5、配置环境变量

kafka-eagle必须配置环境变量,node03服务器执行以下命令来进行配置环境变量
vim /etc/profile

export KE_HOME=/export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2
export PATH=:KE_HOME/bin:PATH
让修改立即生效,执行  
source /etc/profile

2.2.6、启动kafka-eagle

node3执行以下界面启动kafka-eagle

cd /export/servers/kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2/bin
chmod u+x ke.sh
./ke.sh start

2.2.7、主界面

访问kafka-eagle

http://node3:8048/ke/account/signin?/ke/
用户名:admin
密码:123456

七、实时看板案例

1、项目需求梳理

根据订单mq,快速计算双11当天的订单量、销售金额
19 kafka消息队列

2、项目架构模型

支付系统+kafka+ redis
1、支付系统发送mq到kafka集群中,编写程序消费kafka的数据并计算实时的订单数量、订单数量
2、将计算的实时结果保存在redis中
3、外部程序访问redis的数据实时展示结果
19 kafka消息队列

3、订单数据模型

19 kafka消息队列
订单编号、订单时间、支付编号、支付时间、商品编号、商家名称、商品价格、优惠价格、支付金额
19 kafka消息队列

4、指标需求

平台运维角度统计指标

平台总销售额度 
 redisRowKey设计  itcast:order:total:price:date
平台今天下单人数
redisRowKey设计  itcast:order:total:user:date
平台商品销售数量
redisRowKey设计  itcast:order:total:num:date

商品销售角度统计指标

每个商品的总销售额
Redis的rowKey设计itcast:order:productId:price:date
每个商品的购买人数
Redis的rowKey设计itcast:order:productId:user:date
每个商品的销售数量
Redis的rowKey设计itcast:order:productId:num:date

店铺销售角度统计指标

每个店铺的总销售额
Redis的rowKey设计itcast:order:shopId:price:date
每个店铺的购买人数
Redis的rowKey设计itcast:order:shopId:user:date
每个店铺的销售数量
Redis的rowKey设计itcast:order:shopId:num:date

19 kafka消息队列

5、kafka 当中的topic创建,以及模拟消息生产程序

1、创建我们的topic

bin/kafka-topics.sh  --create --replication-factor 2 --topic itcast_order --zookeeper node1:2181,node2:2181,node3:2181 --partitions 5

2、创建maven项目并导入必须依赖的jar包

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.41</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>


</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>cn.itcast.realboard.LogOperate</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
         <plugin>
                 <artifactId> maven-assembly-plugin </artifactId>
                 <configuration>
                      <descriptorRefs>
                           <descriptorRef>jar-with-dependencies</descriptorRef>
                      </descriptorRefs>
                      <archive>
                           <manifest>
                                <mainClass>cn.itcast.realboard.LogOperate</mainClass>
                           </manifest>
                      </archive>
                 </configuration>
                 <executions>
                      <execution>
                           <id>make-assembly</id>
                           <phase>package</phase>
                           <goals>
                                <goal>single</goal>
                           </goals>
                      </execution>
                 </executions>
            </plugin>
    </plugins>
</build>

6、代码实现

消息生产代码实现

第一步:创建我们的订单实体类

package cn.itcast.realboard;

import com.alibaba.fastjson.JSONObject;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.UUID;

public class PaymentInfo {
    private static final long serialVersionUID = -7958315778386204397L;
    private String orderId;//订单编号
    private Date createOrderTime;//订单创建时间
    private String paymentId;//支付编号
    private Date paymentTime;//支付时间
    private String productId;//商品编号
    private String productName;//商品名称
    private long productPrice;//商品价格
    private long promotionPrice;//促销价格
    private String shopId;//商铺编号
    private String shopName;//商铺名称
    private String shopMobile;//商品电话
    private long payPrice;//订单支付价格
    private int num;//订单数量
    /**
     * <Province>19</Province>
     * <City>1657</City>
     * <County>4076</County>
     */
    private String province; //省
    private String city; //市
    private String county;//县
    //102,144,114
    private String catagorys;
    public String getProvince() {
        return province;
    }
    public void setProvince(String province) {
        this.province = province;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getCounty() {
        return county;
    }

    public void setCounty(String county) {
        this.county = county;
    }

    public String getCatagorys() {
        return catagorys;
    }

    public void setCatagorys(String catagorys) {
        this.catagorys = catagorys;
    }

    public PaymentInfo() {
    }

    public PaymentInfo(String orderId, Date createOrderTime, String paymentId, Date paymentTime, String productId, String productName, long productPrice, long promotionPrice, String shopId, String shopName, String shopMobile, long payPrice, int num) {
        this.orderId = orderId;
        this.createOrderTime = createOrderTime;
        this.paymentId = paymentId;
        this.paymentTime = paymentTime;
        this.productId = productId;
        this.productName = productName;
        this.productPrice = productPrice;
        this.promotionPrice = promotionPrice;
        this.shopId = shopId;
        this.shopName = shopName;
        this.shopMobile = shopMobile;
        this.payPrice = payPrice;
        this.num = num;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Date getCreateOrderTime() {
        return createOrderTime;
    }

    public void setCreateOrderTime(Date createOrderTime) {
        this.createOrderTime = createOrderTime;
    }

    public String getPaymentId() {
        return paymentId;
    }

    public void setPaymentId(String paymentId) {
        this.paymentId = paymentId;
    }

    public Date getPaymentTime() {
        return paymentTime;
    }

    public void setPaymentTime(Date paymentTime) {
        this.paymentTime = paymentTime;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public long getProductPrice() {
        return productPrice;
    }

    public void setProductPrice(long productPrice) {
        this.productPrice = productPrice;
    }

    public long getPromotionPrice() {
        return promotionPrice;
    }

    public void setPromotionPrice(long promotionPrice) {
        this.promotionPrice = promotionPrice;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public String getShopName() {
        return shopName;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getShopMobile() {
        return shopMobile;
    }

    public void setShopMobile(String shopMobile) {
        this.shopMobile = shopMobile;
    }

    public long getPayPrice() {
        return payPrice;
    }

    public void setPayPrice(long payPrice) {
        this.payPrice = payPrice;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return "PaymentInfo{" +
                "orderId='" + orderId + '/'' +
                ", createOrderTime=" + createOrderTime +
                ", paymentId='" + paymentId + '/'' +
                ", paymentTime=" + paymentTime +
                ", productId='" + productId + '/'' +
                ", productName='" + productName + '/'' +
                ", productPrice=" + productPrice +
                ", promotionPrice=" + promotionPrice +
                ", shopId='" + shopId + '/'' +
                ", shopName='" + shopName + '/'' +
                ", shopMobile='" + shopMobile + '/'' +
                ", payPrice=" + payPrice +
                ", num=" + num +
                '}';
    }

    public String random() throws ParseException {
        this.orderId = UUID.randomUUID().toString().replaceAll("-", "");
        this.paymentId = UUID.randomUUID().toString().replaceAll("-", "");
        this.productPrice = new Random().nextInt(1000);
        this.promotionPrice = new Random().nextInt(500);
        this.payPrice = new Random().nextInt(480);
        this.shopId = new Random().nextInt(200000)+"";

        this.catagorys = new Random().nextInt(10000)+","+new Random().nextInt(10000)+","+new Random().nextInt(10000);
        this.province = new Random().nextInt(23)+"";
        this.city = new Random().nextInt(265)+"";
        this.county = new Random().nextInt(1489)+"";

        String date = "2015-11-11 12:22:12";
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            this.createOrderTime = simpleDateFormat.parse(date);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        JSONObject obj = new JSONObject();
        String jsonString = obj.toJSONString(this);
        return jsonString;
        //  return new Gson().toJson(this);
    }

}

第二步:定义log4j.properties配置文件

在项目的src/main/resources路径下创建log4j.properties并进行配置

### 设置###
log4j.rootLogger = debug,stdout,D,E

### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

### 输出DEBUG 级别以上的日志到=E://logs/error.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
#log4j.appender.D.File = E://logs/log.log
log4j.appender.D.File = /export/servers/orderLogs/orderinfo.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG 
log4j.appender.D.layout = org.apache.log4j.PatternLayout
#log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
log4j.appender.D.layout.ConversionPattern = %m%n

### 输出ERROR 级别以上的日志到=E://logs/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File = /export/servers/orderLogs/ordererror.log
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR 
log4j.appender.E.layout = org.apache.log4j.PatternLayout
#log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
log4j.appender.E.layout.ConversionPattern =  %m%n

第三步:开发日志生产代码

package cn.itcast.realboard;

import org.apache.log4j.Logger;

import java.text.ParseException;

public class LogOperate {
    private static Logger printLogger = Logger.getLogger("printLogger");

    public static void main(String[] args) throws ParseException, InterruptedException {
        PaymentInfo paymentInfo = new PaymentInfo();
        while (true){
            String random = paymentInfo.random();
            System.out.println(random);
            printLogger.info(random);
            Thread.sleep(1000);
        }
    }
}

第四步:将程序打包并上传服务器运行

将我们的程序进行打包,并上传到node3服务器进行运行,产生日志处理
19 kafka消息队列

第五步:运行jar包

node3执行以下命令运行Java程序

java -jar day12_kafka-1.0-SNAPSHOT-jar-with-dependencies.jar

第六步:开发flume配置文件,实现收集数据到kafka

node3执行以下命令,开发flume配置文件

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/conf
vim file_kafka.conf
#为我们的source channel  sink起名
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#指定我们的source收集到的数据发送到哪个管道
a1.sources.r1.channels = c1
#指定我们的source数据收集策略
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/servers/orderLogs/orderinfo.log

#指定我们的channel为memory,即表示所有的数据都装进memory当中
a1.channels.c1.type = memory
#指定我们的sink为kafka  sink,并指定我们的sink从哪个channel当中读取数据
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = itcast_order
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

启动flume

bin/flume-ng agent -c conf -f conf/file_kafka.conf -n a1

第七步:kafka启动console控制台,消费数据以验证数据进入kafka

node1执行以下命令进入kafka控制台进行消费,消费kafka当中的数据以验证数据计入kafka

cd /export/servers/kafka_2.11-1.0.0
bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic itcast_order --from-beginning 

消息消费代码实现

定义redis工具类

定义redis工具类

package cn.itcast.realboard;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * 主要用于获取jedis的客户端连接
 *
 */
public class JedisUtils {
    private static JedisPool jedisPool;

    public static JedisPool getJedisPool(){
        if (null==jedisPool){
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxTotal(20);
            jedisPoolConfig.setMaxIdle(10);
            jedisPoolConfig.setMaxIdle(5);
            jedisPoolConfig.setMaxWaitMillis(3000);
            jedisPool = new JedisPool(jedisPoolConfig,"node1",6379);
        }
        return jedisPool;
    }

    public static void main(String[] args) {
        JedisPool jedisPool = getJedisPool();
        Jedis resource = jedisPool.getResource();
        resource.set("setkey","setvalue");
        resource.close();
    }

}

开发kafka消费代码

package cn.itcast.realboard;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.util.*;

public class MyKafkaConsumer {
    /**
     * 消费itcast_order这个topic里面的数据
     * @param args
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        //指定Kafka的服务器地址
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //指定消费组的名字
        props.put("group.id", "testGroup");
        //允许程序自动提交offset 提交offset保存到了Kafka当中的一个topic中取
        props.put("enable.auto.commit", "false");
        //每隔多长时间提交一次offset的值
        /**
         * 157 hello offset 上一秒提交的offset
         *
         * 287 hello world
         * 295 abc test 900ms 宕机了怎么办?
         * 351 hello abc 1000ms
         *
         * 有可能造成重复消费的一些问题
         *
         */
        //props.put("auto.commit.interval.ms", "1000");
        //定义key和value的序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        kafkaConsumer.subscribe(Arrays.asList("itcast_order"));
        while (true){
            //获取topic中所有数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(3000);
            Set<TopicPartition> partitions = consumerRecords.partitions();
            for (TopicPartition topicPartition : partitions) {
                List<ConsumerRecord<String, String>> records = consumerRecords.records(topicPartition);
                for (ConsumerRecord<String, String> record : records) {
                    //处理业务逻辑
                    JedisPool jedisPool = JedisUtils.getJedisPool();
                    Jedis jedis = jedisPool.getResource(); //获取jedis客户端

                    String value = record.value();//获取json格式字符串
                    //将json格式字符串转换成为对象
                    PaymentInfo paymentInfo = JSONObject.parseObject(value, PaymentInfo.class);
                    long payPrice = paymentInfo.getPayPrice();

                    //redis中的key一般都是约定俗成的
                    //求取平台销售总额度
                    jedis.incrBy("itcast:order:total:price:date",payPrice);
                    //平台今天下单人数
                    jedis.incr("itcast:order:total:user:date");
                    //平台商品销售数量 简单认为一个订单就一个商品
                    jedis.incr("itcast:order:total:num:date");

                    //每个商品的总销售额
                    jedis.incrBy("itcast:order:"+paymentInfo.getProductId()+":price:date",payPrice);
                    //每个商品的购买人数
                    jedis.incr("itcast:order:"+paymentInfo.getProductId()+":user:date");
                    //每个商品的销售数量
                    jedis.incr("itcast:order:"+paymentInfo.getProductId()+":num:date");

                    //每个店铺的总销售额
                    jedis.incrBy("itcast:order:"+paymentInfo.getShopId()+":price:date",payPrice);
                    //每个店铺的购买人数
                    jedis.incr("itcast:order:"+paymentInfo.getShopId()+":user:date");
                    //每个店铺的销售数量
                    jedis.incr("itcast:order:"+paymentInfo.getShopId()+":num:date");
                    jedis.close();
                }
                //每个分区完成之后提交一次offset值
                long offset = records.get(records.size() - 1).offset();
                Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap = Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset));
                kafkaConsumer.commitSync(topicPartitionOffsetAndMetadataMap);
            }
        }

    }
}

成功写入redis数据库模拟实时数据读取
19 kafka消息队列

标签:

拜师教育学员文章:作者:976-沈同学, 转载或复制请以 超链接形式 并注明出处 拜师资源博客
原文地址:《19 kafka消息队列》 发布于2020-06-23

分享到:
赞(0) 打赏

评论 抢沙发

评论前必须登录!

  注册



长按图片转发给朋友

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏

Vieu3.3主题
专业打造轻量级个人企业风格博客主题!专注于前端开发,全站响应式布局自适应模板。

登录

忘记密码 ?

您也可以使用第三方帐号快捷登录

Q Q 登 录
微 博 登 录