百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

10分钟入门响应式:Springboot整合kafka实现reactive

lipiwang 2025-06-15 17:23 13 浏览 0 评论

Springboot引入Reactor已经有一段时间了,笔者潜伏在各种技术群里暗中观察发现,好像scala圈子的同仁们,似乎对响应式更热衷一点。也许是因为他们对fp理解的更深吧,所以领悟起来障碍性更少一些的原因吧。尽管webflux对于数据库的支持,还不那么完善,也不妨我们试上一试。

首先请允许我引用全部的反应式宣言作为开篇,接下来会介绍webflux整合kafka做一个demo。


反应式宣言


在不同领域中深耕的组织都在不约而同地尝试发现相似的软件构建模式。希望这些系统会更健壮、更具回弹性 、更灵活,也能更好地满足现代化的需求。

近年来,应用程序的需求已经发生了戏剧性的更改,模式变化也随之而来。仅在几年前, 一个大型应用程序通常拥有数十台服务器、 秒级的响应时间、 数小时的维护时间以及GB级的数据。而今,应用程序被部署到了形态各异的载体上, 从移动设备到运行着数以千计的多核心处理器的云端集群。用户期望着毫秒级的响应时间,以及服务100%正常运行(随时可用)。而数据则以PB计量。昨日的软件架构已经根本无法满足今天的需求。

我们相信大家需要一套贯通整个系统的架构设计方案, 而设计中必需要关注的各个角度也已被理清, 我们需要系统具备以下特质:即时响应性(Responsive)、回弹性(Resilient)、弹性(Elastic)以及消息驱动(Message Driven)。我们称这样的系统为反应式系统(Reactive System)。

反应式系统更加灵活、松耦合和 可伸缩。这使得它们的开发和调整更加容易。它们对系统的失败 也更加的包容, 而当失败确实发生时, 它们的应对方案会是得体处理而非混乱无序。反应式系统具有高度的即时响应性, 为用户提供了高效的互动反馈。

反应式系统的特质:

即时响应性: :只要有可能, 系统就会及时地做出响应。即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动。

回弹性:系统在出现失败时依然保持即时响应性。这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。回弹性是通过复制、 遏制、 隔离以及委托来实现的。失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。(因此)组件的客户端不再承担组件失败的处理。

弹性: 系统在不断变化的工作负载之下依然保持即时响应性。反应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。通过提供相关的实时性能指标, 反应式系统能支持预测式以及反应式的伸缩算法。这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。

消息驱动:反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。这一边界还提供了将失败作为消息委托出去的手段。使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。

大型系统由多个较小型的系统所构成, 因此整体效用取决于它们的构成部分的反应式属性。 这意味着, 反应式系统应用着一些设计原则,使这些属性能在所有级别的规模上生效,而且可组合。世界上各类最大型的系统所依赖的架构都基于这些属性,而且每天都在服务于数十亿人的需求。现在,是时候在系统设计一开始就有意识地应用这些设计原则了, 而不是每次都去重新发现它们。


Springboot Webflux

引入springboot官网的一张图来解释Spring webflux和spring mvc的区别:

Spring MVC is built on the Servlet API and uses a synchronous blocking I/O architecture whth a one-request-per-thread model.

Spring MVC 构建在 Servlet API 之上,使用的是同步阻塞式 I/O 模型,什么是同步阻塞式 I/O 模型呢?就是说,每一个请求对应一个线程去处理。


Spring WebFlux is a non-blocking web framework built from the ground up to take advantage of multi-core, next-generation processors and handle massive numbers of concurrent connections.

Spring WebFlux 是一个异步非阻塞式的 Web 框架,它能够充分利用多核 CPU 的硬件资源去处理大量的并发请求。


我们不难看出,持久层上Webflux针对部分NoSQL有一定的支持,而对应传统的关系型数据库就不那么友善了,这也许就是目前大部分javaer还是做着Crud Boy吧,限制了他们响应式的梦想 ... ...

当然非阻塞IO并不是银弹,如果你想用它来提升应用的访问效率,那么还是放弃吧,引用下面一段话,作为回答

Reactive and non-blocking generally do not make applications run faster.

WebFlux 并不能使接口的响应时间缩短,它仅仅能够提升吞吐量和伸缩性

所以如果你的应用是 IO密集型,还是很建议你试一试的。好了国际惯例TICSMTC...


Talk Is Cheap, Show Me The Code

我们本次应用的流程大体如下:创建一个路由用于生产数据,写入kafka里,然后再由注册的kafka消费者,消费该数据


引入依赖

这次demo使用了gradle代替了maven

Bash
implementation group: 'io.projectreactor.kafka', name: 'reactor-kafka', version: '1.3.4'
implementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.7.4'
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-webflux', version: '2.5.2'


构建实体

该实体,用于在kafka中传输

Bash
package wang.datahub.dto;

public class Warehouse {
   private Long id;
   private String name;
   private String label;
   private String lon;
   private String lat;

   public Long getId() {
       return id;
  }

   public void setId(Long id) {
       this.id = id;
  }

   public String getName() {
       return name;
  }

   public void setName(String name) {
       this.name = name;
  }

   public String getLabel() {
       return label;
  }

   public void setLabel(String label) {
       this.label = label;
  }

   public String getLon() {
       return lon;
  }

   public void setLon(String lon) {
       this.lon = lon;
  }

   public String getLat() {
       return lat;
  }

   public void setLat(String lat) {
       this.lat = lat;
  }

   @Override
   public String toString() {
       return "Warehouse{" +
               "id=" + id +
               ", name='" + name + '\'' +
               ", label='" + label + '\'' +
               ", lon='" + lon + '\'' +
               ", lat='" + lat + '\'' +
               '}';
  }
}


构建service

用于mock数据,并将对象发送至kafka

package wang.datahub.service;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.SenderResult;
import wang.datahub.dto.Warehouse;

import java.util.Random;

@Service
public class WarehouseService {

   public static final String[] WAREHOUSE_NAME = new String[]{"天津仓库","北京仓库","上海仓库","广州仓库","深圳仓库"};
   public static final String[] WAREHOUSE_LEVEL = new String[]{"A级","B级","C级","D级","E级"};


   public Warehouse mock(long id) {
       Random random = new Random();
       try {
           Thread.sleep(random.nextInt(2000));
      } catch (InterruptedException e) {
      }

       Warehouse warehouse = new Warehouse();
       warehouse.setId(id);
       warehouse.setName(WAREHOUSE_NAME[random.nextInt(WAREHOUSE_NAME.length)]);
       warehouse.setLabel(WAREHOUSE_LEVEL[random.nextInt(WAREHOUSE_LEVEL.length)]);
       warehouse.setLon(random.nextDouble()+"");
       warehouse.setLat(random.nextDouble()+"");
       return warehouse;
  }


   @Autowired
   private ReactiveKafkaProducerTemplate template;

   public static final String WAREHOUSE_TOPIC = "warehouse";

   public Mono<Boolean> add(Warehouse warehouse) {
       Mono<SenderResult<Void>> resultMono = template.send(WAREHOUSE_TOPIC, warehouse.getId(), warehouse);
       return resultMono.flatMap(rs -> {
           if(rs.exception() != null) {
               System.out.println("send kafka error" + rs.exception());
               return Mono.just(false);
          }
           return Mono.just(true);
      });
  }


}


构建handler 并 注册 route

构建handler

package wang.datahub.handler;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import wang.datahub.dto.User;
import wang.datahub.dto.Warehouse;
import wang.datahub.service.WarehouseService;

@Component
public class WarehouseHandler {
   @Autowired
   WarehouseService warehouseService;
   private long i = 1;
   public Mono<ServerResponse> addWarehouse(ServerRequest request) {
       //mock 数据
       Warehouse warehouse = warehouseService.mock(i++);
       Mono<Boolean> tag = warehouseService.add(warehouse);
       return ServerResponse.ok().body(tag,Boolean.class);
  }
}

注册route,用于访问

package wang.datahub.route;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import wang.datahub.handler.UserHandler;
import wang.datahub.handler.WarehouseHandler;

@Configuration
public class Routes {

   @Autowired
   UserHandler userHandler;

   @Autowired
   WarehouseHandler warehouseHandler;

   @Bean
   public RouterFunction<ServerResponse> routersFunction(){
       return RouterFunctions
              .route(RequestPredicates.GET("/api/warehouse"),warehouseHandler::addWarehouse);
  }
}


构建kafka消费者

package wang.datahub.kafka.consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;
import reactor.kafka.receiver.ReceiverOptions;
import wang.datahub.dto.Warehouse;
import wang.datahub.service.WarehouseService;

import javax.annotation.PostConstruct;
import java.util.Collections;

@Service
public class WarehouseConsumer {
   @Autowired
   private KafkaProperties properties;

   @PostConstruct
   public void consumer() {
       ReceiverOptions<Long, Warehouse> options = ReceiverOptions.create(properties.getConsumer().buildProperties());
       options = options.subscription(Collections.singleton(WarehouseService.WAREHOUSE_TOPIC));
       new ReactiveKafkaConsumerTemplate(options)
              .receiveAutoAck()
              .subscribe(record -> {
                   System.out.println("Warehouse Record:" + record);
              });
  }
}


配置springboot的kafka信息

spring:
kafka:
  producer:
    bootstrap-servers: 172.18.70.184:9092
    key-serializer: org.apache.kafka.common.serialization.LongSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  consumer:
    bootstrap-servers: 172.18.70.184:9092
    key-serializer: org.apache.kafka.common.serialization.LongSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    group-id: warehouse-consumers


构建完整应用


加载kafka配置

package wang.datahub;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import reactor.kafka.sender.SenderOptions;

@Configuration
public class KafkaConfig {
   @Autowired
   private KafkaProperties properties;

   @Bean
   public ReactiveKafkaProducerTemplate reactiveKafkaProducerTemplate() {
       SenderOptions options = SenderOptions.create(properties.getProducer().buildProperties());
       ReactiveKafkaProducerTemplate template = new ReactiveKafkaProducerTemplate(options);
       return template;
  }
}


启动应用

package wang.datahub;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.reactive.config.EnableWebFlux;

@SpringBootApplication
@EnableWebFlux
public class WebfluxApplication {

   public static void main(String[] args) {
       SpringApplication.run(WebfluxApplication.class, args);
  }
}



kafka no zookeeper

启动kafka还需要额外的zk配置,是不是让你很不爽呢?2.8开始,kafka已经开始准备着手去掉外部zk了,尽管现在还不推荐上生产环境,至少是一个好的开始,下面我们简单的看下,如何抛弃zk

 root@DESKTOP-2J030JA  /mnt/e/devlop/envs/kafka_2.13-2.8.0  bin/kafka-storage.sh random-uuid
dKcraOLdTH6PCYuGizZ1nw
 root@DESKTOP-2J030JA  /mnt/e/devlop/envs/kafka_2.13-2.8.0  bin/kafka-storage.sh format -t dKcraOLdTH6PCYuGizZ1nw -c config/kraft/server.properties
Formatting /tmp/kraft-combined-logs
 root@DESKTOP-2J030JA  /mnt/e/devlop/envs/kafka_2.13-2.8.0  bin/kafka-server-start.sh config/kraft/server.properties

如果本文对你有帮助,别忘记给我个3连 ,点赞,转发,评论,

咱们下期见!答案获取方式:已赞 已评 已关~

学习更多JAVA知识与技巧,关注与私信博主(03)






















原文出处:
https://mp.weixin.qq.com/s/XAO2GbuizlGPGWiWZaJJ-g

相关推荐

微软Office Open XML中的数字签名漏洞

MicrosoftOffice是最广泛使用的办公文档应用程序之一。对于重要文件,如合同和发票,可以对其内容进行签名,以确保其真实性和完整性。自2019年以来,安全研究人员发现了针对PDF和ODF等其...

Javaweb知识 day12 XML(javaweb中xml作用)

一、XML:1.1概念:ExtensibleMarkupLanguage可扩展标记语言*可扩展:标签都是自定义的。<user><student>1.2功能:...

易筋洗髓功——内外同修方可致远(易筋洗髓功口诀)

达摩祖师所传易筋、洗髓两经,一分为二,二实为一,无非以方便法门接引众生,而归于慈悲清净之心地。修炼《易筋经》是为强身健体,修炼《洗髓经》是为修心养性,此二者相辅相成,内外兼修,缺一不可。这是一套传统中...

《增演易筋洗髓内功图说》17卷(1930年(清)周述官撰 4

《增演易筋洗髓内功图说》17卷(1930年(清)周述官撰 5

道家洗髓功修炼要义,洗髓功如何做到丹田聚气?

不管是道家洗髓功,还是洗髓经,其修炼的关键点就在于得气、行气、聚气...那么,作为洗髓功修炼者,具体该怎么做呢?在实际修炼中,就洗髓功的修炼方法来讲,我们可以简单的归纳为修炼三部曲,其具体表现如下:一...

「清风聊练功」师门传我易筋经:聊聊我的学习经历和正身图感受

一个人的眼界认识,是随着是自身的知识积累和水平不断成长的。开篇为什么要说这么一句呢?是从我的学习经历上感受明显的这句话:一处不到一处迷。我们学传统武术,内功功法,也是从小白到明白一步步走的,走的越远,...

内功外练功介绍(练内功 外功)

这里介绍我练习的两套动功心得体会。是老道长的八部金刚功、长寿功和增演易筋洗髓经。八部金刚功外练奇经八脉,练出健康强壮的好身体还是可以的,长寿功也是内练功法。这部功法很好的预防效果。这个大家都认同的。说...

孔德易筋洗髓大全注解(下)(孔德易筋经教学视频)

...

《增演易筋洗髓内功图说》17卷(1930年(清)周述官撰 1

少林空悟老师珍藏

国术典籍:《增演易筋洗髓内功图说》【2024年8月编校】

《增演易筋洗髓内功图说》系养生气功著作,全书共十八卷。清周述官编撰于光绪二十一年(1895年)。清光绪十九年(1893年),僧人静一空悟将少林功法传授于周述官,并将《增益易筋洗髓内功图说》十二卷(按,...

小说:自媒体小白的修道之路-洗髓(自媒体小白运营技巧)

谁应了谁的劫,谁又变成了谁的执念。当沧海遗忘了桑田,这世间又多了一个不回家的人!异域空间中,知生缓缓起身,目光扫了一下小帝后,又转身看向画板上的那朵白色蒲公英,自言道:“白瑛,这一世我们莫要再辜负了!...

这才是少林洗髓经真相:它是静功和导引术与八段锦暗合

不少朋友误解易筋经和洗髓经,将其简单归为强力呼吸的吐纳功以及为了提升房中的关窍功。事实上易筋经和洗髓经是两部功法:易筋经主要为炼体,包含以膜论为核心的十二月怕打筋膜法,以及辅助的呼吸、导引功法;洗髓经...

孔德易筋洗髓大全注解(上)(孔德易筋经洗髓经视频)

...

洗髓经传承与心得(二)(《洗髓经》)

...

取消回复欢迎 发表评论: