Java 实现 Kafka 发送消息和接收消息(Java 无注解方式 + Spring Boot 注解方式)

 

条件:搭建好 kafka 环境

搭建 zookeeper+kafka 地址:https://www.cnblogs.com/weibanggang/p/12377055.html

 

1、java 无注解方式

加入 kafka 包:

 <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
 </dependency>

 

 

 消费者代码

package com.wbg.springboot_kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer extends Thread {

KafkaConsumer</span>&lt;Integer,String&gt;<span style="color: rgba(0, 0, 0, 1)"> consumer;
String topic;

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Consumer(String topic){
    Properties properties</span>=<span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,</span>"192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092"<span style="color: rgba(0, 0, 0, 1)">);
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG,</span>"consumer"<span style="color: rgba(0, 0, 0, 1)">);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,</span>"consumer"<span style="color: rgba(0, 0, 0, 1)">);
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,</span>"30000"<span style="color: rgba(0, 0, 0, 1)">);
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,</span>"1000"); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">自动提交(批量确认)</span>
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.<span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">.getName());
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">一个新的group的消费者去消费一个topic</span>
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">这个属性. 它能够消费昨天发布的数据</span> consumer=<span style="color: rgba(0, 0, 255, 1)">new</span> KafkaConsumer&lt;Integer, String&gt;<span>(properties); </span><span style="color: rgba(0, 0, 255, 1)">this</span>.topic =<span> topic; } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span> run() { consumer.subscribe(Collections.singleton(</span><span style="color: rgba(0, 0, 255, 1)">this</span><span>.topic)); </span><span style="color: rgba(0, 0, 255, 1)">while</span> (<span style="color: rgba(0, 0, 255, 1)">true</span><span>){ ConsumerRecords</span>&lt;Integer,String&gt; consumerRecords = consumer.poll(Duration.ofSeconds(1<span>)); consumerRecords.forEach(record </span>-&gt;<span>{ System.out.println(record.key()</span>+"-&gt;"+record.value()+"-&gt;"+<span>record.offset()); }); } } </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span> main(String[] args) { </span><span style="color: rgba(0, 0, 255, 1)">new</span> Consumer("test_partition"<span>).start(); } }</span></pre>

生产者代码

package com.wbg.springboot_kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class Producer extends Thread {
KafkaProducer
<Integer, String> producer;
String topic;

</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Producer(String topic) {
    Properties properties </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, </span>"192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092"<span style="color: rgba(0, 0, 0, 1)">);
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, </span>"producer"<span style="color: rgba(0, 0, 0, 1)">);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">.getName());

    producer </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> KafkaProducer&lt;Integer, String&gt;<span style="color: rgba(0, 0, 0, 1)">(properties);
    </span><span style="color: rgba(0, 0, 255, 1)">this</span>.topic =<span style="color: rgba(0, 0, 0, 1)"> topic;
}

@Override
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() {
    </span><span style="color: rgba(0, 0, 255, 1)">int</span> num = 0<span style="color: rgba(0, 0, 0, 1)">;
    </span><span style="color: rgba(0, 0, 255, 1)">while</span> (num &lt; 20<span style="color: rgba(0, 0, 0, 1)">) {

        </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> {
            String msg </span>= "kafka msg " +<span style="color: rgba(0, 0, 0, 1)"> num;

            producer.send(</span><span style="color: rgba(0, 0, 255, 1)">new</span> ProducerRecord&lt;&gt;(topic, 3, msg), ((recordMetadata, e) -&gt;<span style="color: rgba(0, 0, 0, 1)"> {
                System.out.println(recordMetadata.offset() </span>+ "-&gt;" +<span style="color: rgba(0, 0, 0, 1)"> recordMetadata.partition());
            }));
            TimeUnit.SECONDS.sleep(</span>2<span style="color: rgba(0, 0, 0, 1)">);
        } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> main(String[] args) {
    </span><span style="color: rgba(0, 0, 255, 1)">new</span> Producer("test_partition"<span style="color: rgba(0, 0, 0, 1)">).start();
}

}

启动生产者

 

 

 启动消费者

 

 

 

2、SpringBoot 注解方式

pom 依赖:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.0.RELEASE</version>
        </dependency>

 application.properties 文件

# Producer serializers: Integer keys, String values.
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer

# Kafka broker addresses shared by producer and consumer.
spring.kafka.bootstrap-servers=192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092

# Consumer group, offset reset policy and automatic offset commit.
spring.kafka.consumer.group-id=springboot-groupid
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true

# Consumer deserializers: must mirror the producer serializers above.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

 

 消费者代码

@Component
public class KafkaMyConsumer {
@KafkaListener(topics </span>= {"test"<span style="color: rgba(0, 0, 0, 1)">})
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> listener(ConsumerRecord record){
    Optional msg </span>=<span style="color: rgba(0, 0, 0, 1)"> Optional.ofNullable(record.value());
    </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)">(msg.isPresent()){
        System.out.println(msg.get());;
    }
}

}

View Code

 

 

 生产者代码

@Component
public class KafkaMyProducer {
    @Autowired
    private KafkaTemplate<Integer,String> kafkaTemplate;
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> send(){
    kafkaTemplate.send(</span>"test",1,"msgData"<span style="color: rgba(0, 0, 0, 1)">);
}

}

View Code

 

 启动

 

 

@SpringBootApplication
public class SpringbootKafkaApplication {
</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span> main(String[] args) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> InterruptedException {
    ConfigurableApplicationContext context </span>= SpringApplication.run(SpringbootKafkaApplication.<span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">,args);
    KafkaMyProducer kafkaMyProducer </span>= context.getBean(KafkaMyProducer.<span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">);
    </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 0; i &lt; 10; i++<span style="color: rgba(0, 0, 0, 1)">) {
        kafkaMyProducer.send();
        TimeUnit.SECONDS.sleep(</span>3<span style="color: rgba(0, 0, 0, 1)">);
    }
}

}