java实现kafka发送消息和接收消息(java无注解方式+springBoot注解方式)
条件:搭建好 kafka 环境
搭建 zookeeper+kafka 地址:https://www.cnblogs.com/weibanggang/p/12377055.html
1、java 无注解方式
加入 kafka 包:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency>
消费者代码
package com.wbg.springboot_kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class Consumer extends Thread {
KafkaConsumer</span><Integer,String><span style="color: rgba(0, 0, 0, 1)"> consumer; String topic; </span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Consumer(String topic){ Properties properties</span>=<span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,</span>"192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092"<span style="color: rgba(0, 0, 0, 1)">); properties.put(ConsumerConfig.CLIENT_ID_CONFIG,</span>"consumer"<span style="color: rgba(0, 0, 0, 1)">); properties.put(ConsumerConfig.GROUP_ID_CONFIG,</span>"consumer"<span style="color: rgba(0, 0, 0, 1)">); properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,</span>"30000"<span style="color: rgba(0, 0, 0, 1)">); properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,</span>"1000"); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">自动提交(批量确认)</span> properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.<span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">.getName()); </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">一个新的group的消费者去消费一个topic</span> properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">这个属性. 
它能够消费昨天发布的数据</span> consumer=<span style="color: rgba(0, 0, 255, 1)">new</span> KafkaConsumer<Integer, String><span>(properties); </span><span style="color: rgba(0, 0, 255, 1)">this</span>.topic =<span> topic; } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span> run() { consumer.subscribe(Collections.singleton(</span><span style="color: rgba(0, 0, 255, 1)">this</span><span>.topic)); </span><span style="color: rgba(0, 0, 255, 1)">while</span> (<span style="color: rgba(0, 0, 255, 1)">true</span><span>){ ConsumerRecords</span><Integer,String> consumerRecords = consumer.poll(Duration.ofSeconds(1<span>)); consumerRecords.forEach(record </span>-><span>{ System.out.println(record.key()</span>+"->"+record.value()+"->"+<span>record.offset()); }); } } </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span> main(String[] args) { </span><span style="color: rgba(0, 0, 255, 1)">new</span> Consumer("test_partition"<span>).start(); } }</span></pre>
生产者代码
package com.wbg.springboot_kafka;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.TimeUnit;public class Producer extends Thread {
KafkaProducer<Integer, String> producer;
String topic;</span><span style="color: rgba(0, 0, 255, 1)">public</span><span style="color: rgba(0, 0, 0, 1)"> Producer(String topic) { Properties properties </span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, </span>"192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092"<span style="color: rgba(0, 0, 0, 1)">); properties.put(ProducerConfig.CLIENT_ID_CONFIG, </span>"producer"<span style="color: rgba(0, 0, 0, 1)">); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.</span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">.getName()); producer </span>= <span style="color: rgba(0, 0, 255, 1)">new</span> KafkaProducer<Integer, String><span style="color: rgba(0, 0, 0, 1)">(properties); </span><span style="color: rgba(0, 0, 255, 1)">this</span>.topic =<span style="color: rgba(0, 0, 0, 1)"> topic; } @Override </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> run() { </span><span style="color: rgba(0, 0, 255, 1)">int</span> num = 0<span style="color: rgba(0, 0, 0, 1)">; </span><span style="color: rgba(0, 0, 255, 1)">while</span> (num < 20<span style="color: rgba(0, 0, 0, 1)">) { </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)"> { String msg </span>= "kafka msg " +<span style="color: rgba(0, 0, 0, 1)"> num; producer.send(</span><span style="color: rgba(0, 0, 255, 1)">new</span> ProducerRecord<>(topic, 3, msg), ((recordMetadata, e) -><span style="color: rgba(0, 0, 0, 1)"> { System.out.println(recordMetadata.offset() </span>+ "->" +<span style="color: 
rgba(0, 0, 0, 1)"> recordMetadata.partition()); })); TimeUnit.SECONDS.sleep(</span>2<span style="color: rgba(0, 0, 0, 1)">); } </span><span style="color: rgba(0, 0, 255, 1)">catch</span><span style="color: rgba(0, 0, 0, 1)"> (InterruptedException e) { e.printStackTrace(); } } } </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> main(String[] args) { </span><span style="color: rgba(0, 0, 255, 1)">new</span> Producer("test_partition"<span style="color: rgba(0, 0, 0, 1)">).start(); }
}
启动生产者
启动消费者
2、SpringBoot 注解方式
pom 依赖:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency>
application.properties 文件
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.bootstrap-servers=192.168.198.129:9092,192.168.198.130:9092,192.168.198.131:9092
spring.kafka.consumer.group-id=springboot-groupid
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
消费者代码
@Component public class KafkaMyConsumer {@KafkaListener(topics </span>= {"test"<span style="color: rgba(0, 0, 0, 1)">}) </span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> listener(ConsumerRecord record){ Optional msg </span>=<span style="color: rgba(0, 0, 0, 1)"> Optional.ofNullable(record.value()); </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)">(msg.isPresent()){ System.out.println(msg.get());; } }
}
生产者代码
@Component public class KafkaMyProducer { @Autowired private KafkaTemplate<Integer,String> kafkaTemplate;</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">void</span><span style="color: rgba(0, 0, 0, 1)"> send(){ kafkaTemplate.send(</span>"test",1,"msgData"<span style="color: rgba(0, 0, 0, 1)">); }
}
启动
@SpringBootApplication public class SpringbootKafkaApplication {</span><span style="color: rgba(0, 0, 255, 1)">public</span> <span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span> main(String[] args) <span style="color: rgba(0, 0, 255, 1)">throws</span><span style="color: rgba(0, 0, 0, 1)"> InterruptedException { ConfigurableApplicationContext context </span>= SpringApplication.run(SpringbootKafkaApplication.<span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">,args); KafkaMyProducer kafkaMyProducer </span>= context.getBean(KafkaMyProducer.<span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)">); </span><span style="color: rgba(0, 0, 255, 1)">for</span> (<span style="color: rgba(0, 0, 255, 1)">int</span> i = 0; i < 10; i++<span style="color: rgba(0, 0, 0, 1)">) { kafkaMyProducer.send(); TimeUnit.SECONDS.sleep(</span>3<span style="color: rgba(0, 0, 0, 1)">); } }
}