百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

Flink中处理维表关联技术实现路径

lipiwang 2025-06-15 17:24 4 浏览 0 评论

在 Flink 中处理维表关联大体氛围Table SQL Lookup Join 和 DataStream 算子函数,主要技术实现路径:

I. Flink SQL/Table API 中的 Lookup Join

Flink SQL/Table API 提供了 LOOKUP JOIN 语法,专门用于将流式数据与维表(通常是存储在外部系统中的批数据)进行关联。其核心在于通过异步或同步的方式查询维表,避免阻塞流处理。

1. 同步 Lookup Join

实现路径:

  • JDBC Lookup Function (MySQL, PostgreSQL等关系型数据库): Flink 内部提供了 JDBC connector,可以通过配置来连接关系型数据库。当流数据到来时,Flink 会向数据库发送查询请求,同步等待查询结果。
  • HBase Lookup Function: Flink 也提供了 HBase Connector,可以用于与 HBase 进行同步关联。
  • Redis Lookup Function: 虽然没有内置的 Redis Connector 作为 Lookup Source,但可以通过自定义 TableFunctionAsyncTableFunction 来实现同步或异步与 Redis 的关联。

优点:

  • 实现简单: 对于支持的数据库,配置相对简单,直接使用 SQL 语法即可。
  • 数据一致性高: 每次查询都是从最新维表数据获取,保证了数据的新鲜度(在不考虑数据库同步延迟的情况下)。

缺点:

  • 性能瓶颈: 同步查询会阻塞 Flink 算子,如果维表查询延迟较高,或者查询并发量大,会严重影响流处理的吞吐量和延迟。这在处理高吞吐量流数据时是一个显著的缺点。
  • 外部依赖: 强依赖外部数据库的性能和可用性。
  • 不适合高并发场景: 容易导致数据库连接池耗尽或数据库负载过高。

示例代码:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SyncLookupJoinExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 示例中使用单并行度

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 定义一个订单数据源 (这里使用 VALUES 生成一个模拟的流)
        // 实际应用中可以替换为 Kafka, Pulsar, Kinesis 等数据源
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE orders (\n" +
            "    order_id INT,\n" +
            "    product_id INT,\n" +
            "    amount DOUBLE,\n" +
            "    proctime AS PROCTIME() -- 处理时间字段\n" +
            ") WITH (\n" +
            "    'connector' = 'datagen',\n" + // 使用 datagen 连接器模拟数据
            "    'rows-per-second' = '1',\n" + // 每秒生成一条数据
            "    'fields.order_id.min' = '1',\n" +
            "    'fields.order_id.max' = '1000',\n" +
            "    'fields.product_id.min' = '101',\n" + // 模拟关联 Product ID
            "    'fields.product_id.max' = '202',\n" +
            "    'fields.amount.min' = '10.0',\n" +
            "    'fields.amount.max' = '1000.0'\n" +
            ")"
        );

        // 2. 定义一个维表数据源 (MySQL JDBC Lookup Table)
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE products_dim (\n" +
            "    product_id INT,\n" +
            "    product_name STRING,\n" +
            "    category STRING\n" +
            ") WITH (\n" +
            "    'connector' = 'jdbc',\n" +
            "    'url' = 'jdbc:mysql://localhost:3306/testdb',\n" + // 替换为你的 MySQL 地址和数据库名
            "    'table-name' = 'products',\n" +
            "    'username' = 'root',\n" +     // 替换为你的 MySQL 用户名
            "    'password' = 'your_password', \n" + // 替换为你的 MySQL 密码
            "    'lookup.cache.max-rows' = '1000',\n" + // JDBC Lookup 支持缓存
            "    'lookup.cache.ttl' = '10min',\n" +    // 缓存过期时间
            "    'lookup.max-retries' = '3' \n" +       // 查询失败重试次数
            ")"
        );

        // 3. 执行 Lookup Join 查询
        Table resultTable = tEnv.sqlQuery(
            "SELECT\n" +
            "    o.order_id,\n" +
            "    o.product_id,\n" +
            "    p.product_name,\n" +
            "    p.category,\n" +
            "    o.amount\n" +
            "FROM\n" +
            "    orders AS o\n" +
            "JOIN products_dim AS p ON o.product_id = p.product_id" // 简单的等值 JOIN 即可
        );

        // 4. 将结果打印到控制台 (或输出到其他 Sink)
        resultTable.execute().print();

        // Flink Job 会持续运行,直到手动停止
        // env.execute("Sync Lookup Join Example"); // 对于execute().print()不需要再execute()
    }
}

如果你的 MySQL 响应慢,你会发现 Flink 的输出也会变慢,因为它是同步阻塞的

2. 异步 Lookup Join (推荐)

实现路径:

  • 自定义 AsyncTableFunction: 这是实现异步 Lookup Join 的核心和推荐方式。
  • 原理: AsyncTableFunction 允许用户编写异步查询逻辑。当流数据到来时,Flink 会并发地向外部维表发起查询请求,但不会阻塞当前处理线程。查询结果返回后,通过 ResultFuture 回调通知 Flink,将维表数据与流数据进行合并。
  • 适用场景: 适用于任何支持异步客户端的外部存储,例如: Redis: 使用 Jedis 或 Lettuce 等异步客户端。 HBase (Async Client): 使用 HBase 提供的异步客户端。 Cassandra: 使用 DataStax Java Driver。 ClickHouse (异步JDBC或HTTP API): 虽然是关系型数据库,但如果其驱动支持异步,或通过 HTTP API 异步访问,也可以实现。 HTTP API (微服务接口): 调用外部微服务接口获取维表数据。
  • 实现步骤: 实现 AsyncTableFunction 接口。eval 方法中发起异步查询,并传入 ResultFuture 当异步查询返回结果时,调用 ResultFuture.complete() 来通知 Flink。 注册自定义的 AsyncTableFunction 到 Table Environment。 在 SQL 中使用 LATERAL TABLE() 语法进行关联。

优点:

  • 高吞吐量和低延迟: 非阻塞查询,允许多个并发查询同时进行,极大地提高了吞吐量,降低了端到端延迟。
  • 资源利用率高: 充分利用 I/O 资源,避免了线程阻塞。
  • 灵活性强: 可以集成任何支持异步查询的外部存储或服务。
  • 背压能力: Flink 的异步 I/O 组件提供了内置的背压机制,可以根据下游处理能力自动调整并发度,防止外部系统过载。

缺点:

  • 实现复杂度相对较高: 需要编写异步查询逻辑,处理回调和异常。
  • 外部系统支持: 依赖外部系统提供的异步客户端或接口。

示例代码:

首先,我们需要创建一个自定义的 AsyncRedisLookupFunction。

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Collector;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.Map;

public class AsyncRedisLookupFunction extends AsyncTableFunction<RowData> {

    private final String redisHost;
    private final int redisPort;
    private final String redisPassword; // 如果有密码
    private transient JedisPool jedisPool; // transient 表示不参与序列化
    private transient ExecutorService executorService; // 用于Jedis同步操作的线程池,模拟异步

    public AsyncRedisLookupFunction(String redisHost, int redisPort, String redisPassword) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.redisPassword = redisPassword;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(100); // 最大连接数
        poolConfig.setMaxIdle(20);  // 最大空闲连接数
        poolConfig.setMinIdle(5);   // 最小空闲连接数
        poolConfig.setTestOnBorrow(true); // 借用连接时测试
        poolConfig.setTestOnReturn(true); // 归还连接时测试
        poolConfig.setTestWhileIdle(true); // 空闲时测试

        if (redisPassword != null && !redisPassword.isEmpty()) {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
        } else {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
        }

        // 创建一个线程池来模拟异步查询。
        // 实际生产中,更推荐使用 Lettuce 等真正的异步 Redis 客户端。
        this.executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors() * 2 // 通常设置为 CPU 核数的两倍
        );
    }

    @Override
    public void close() throws Exception {
        if (jedisPool != null) {
            jedisPool.close();
        }
        if (executorService != null) {
            executorService.shutdown();
        }
        super.close();
    }

    // eval 方法接收流中的 join key,并返回一个 CompletableFuture
    // Flink 将在 CompletableFuture 完成时收集结果
    public void eval(CompletableFuture<Collector<RowData>> resultFuture, Integer productId) {
        executorService.submit(() -> {
            try (Jedis jedis = jedisPool.getResource()) {
                String key = "product:" + productId;
                Map<String, String> productInfo = jedis.hgetAll(key);

                if (productInfo != null && !productInfo.isEmpty()) {
                    String productName = productInfo.get("name");
                    String category = productInfo.get("category");
                    // 返回一个 RowData,包含维表查询到的字段
                    // 顺序与定义 Table 时声明的字段顺序一致
                    resultFuture.complete(new SimpleCollector<>(
                            Collections.singletonList(GenericRowData.of(
                                    StringData.fromString(productName),
                                    StringData.fromString(category)
                            ))
                    ));
                } else {
                    // 如果未找到,返回空结果
                    resultFuture.complete(new SimpleCollector<>(Collections.emptyList()));
                }
            } catch (Exception e) {
                // 处理异常,可以返回空结果或抛出异常让 Flink 处理
                resultFuture.completeExceptionally(e);
            }
        });
    }

    // 辅助类,用于将 List<RowData> 收集到 Collector 中
    private static class SimpleCollector<T> implements Collector<T> {
        private final java.util.List<T> list;

        public SimpleCollector(java.util.List<T> list) {
            this.list = list;
        }

        @Override
        public void collect(T record) {
            list.add(record); // 实际上这里我们只收集一个结果
        }

        @Override
        public void close() {
            // No-op
        }
    }
}

现在,我们将在 Flink SQL 中注册并使用这个 AsyncRedisLookupFunction。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeComparators;
import org.apache.flink.table.types.utils.TypeConversions;

import java.util.Collections;

import static org.apache.flink.table.api.DataTypes.*;

public class AsyncLookupJoinExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 示例中使用单并行度,但异步 I/O 可以并行

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 定义一个订单数据源 (Datagen 生成模拟流)
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE orders (\n" +
            "    order_id INT,\n" +
            "    product_id INT,\n" +
            "    amount DOUBLE,\n" +
            "    proctime AS PROCTIME() -- 处理时间字段\n" +
            ") WITH (\n" +
            "    'connector' = 'datagen',\n" +
            "    'rows-per-second' = '10',\n" + // 提高生成速度,更明显地展示异步优势
            "    'fields.order_id.min' = '1',\n" +
            "    'fields.order_id.max' = '1000',\n" +
            "    'fields.product_id.min' = '101',\n" +
            "    'fields.product_id.max' = '202',\n" + // 确保 product_id 在 Redis 示例范围内
            "    'fields.amount.min' = '10.0',\n" +
            "    'fields.amount.max' = '1000.0'\n" +
            ")"
        );

        // 2. 注册自定义的 AsyncRedisLookupFunction
        // 构造函数参数: redisHost, redisPort, redisPassword
        tEnv.createTemporarySystemFunction(
            "AsyncRedisProducts",
            new AsyncRedisLookupFunction("localhost", 6379, "") // 替换为你的 Redis 地址、端口和密码
        );

        // 3. 执行 Lookup Join 查询
        // 使用 LATERAL TABLE(FunctionName(lookup_keys)) 语法
        // 注意:AsyncRedisProducts 函数的返回类型决定了 JOIN 后的字段名
        // AsyncRedisLookupFunction 返回的是 RowData,包含 (product_name, category)
        Table resultTable = tEnv.sqlQuery(
            "SELECT\n" +
            "    o.order_id,\n" +
            "    o.product_id,\n" +
            "    T.product_name,\n" + // T 是 LATERAL TABLE 的别名
            "    T.category,\n" +
            "    o.amount\n" +
            "FROM\n" +
            "    orders AS o,\n" + // 注意这里是逗号连接,而不是 JOIN
            "    LATERAL TABLE(AsyncRedisProducts(o.product_id)) AS T(product_name, category)" // 指定返回的字段名
        );

        // 4. 将结果打印到控制台
        resultTable.execute().print();
    }
}

查询 Redis 是异步进行的,不会阻塞主线程。

II. DataStream API 中的维表关联

在 DataStream API 中,没有直接的 LOOKUP JOIN 概念,但可以通过自定义函数和状态管理来实现维表关联。

1. 使用 RichFlatMapFunction/RichMapFunction + 缓存

实现路径:

  • 原理:open() 方法中初始化外部维表连接或加载部分维表数据到内存(如果维表数据量不大)。在 map()flatMap() 方法中,当流数据到来时,首先尝试从本地缓存中查询维表数据。如果缓存未命中,则向外部维表发起同步查询,并将结果更新到缓存中。
  • 缓存策略: LRU Cache: 维护一个固定大小的最近最少使用缓存。 TTL Cache: 给缓存中的数据设置过期时间,定期清理过期数据。
  • 适用场景: 维表数据量不大,可以完全或部分加载到内存中。 对数据新鲜度要求不是极高,允许一定的缓存延迟。 对实时性要求较高,但外部维表查询延迟较低。

优点:

  • 性能提升: 命中缓存可以避免与外部存储的交互,显著提升性能。
  • 降低外部系统压力: 减少对外部维表的查询次数。
  • 实现相对简单: 比异步 I/O 相对容易理解和实现。

缺点:

  • 数据新鲜度问题: 缓存可能导致数据不新鲜,特别是维表数据更新频繁时。需要实现合适的缓存失效和更新机制。
  • 内存消耗: 维表数据如果过大,可能导致内存溢出。
  • 并发瓶颈: 如果缓存未命中,仍需进行同步查询,在高并发下仍可能成为瓶颈。
  • 缺乏内置背压: 需要手动管理与外部系统的交互,没有像异步 I/O 那样内置的背压机制。

示例代码:

RichFlatMapFunction 实现

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RedisProductCachedFlatMapFunction extends RichFlatMapFunction<Order, EnrichedOrder> {

    private final String redisHost;
    private final int redisPort;
    private final String redisPassword;
    private transient JedisPool jedisPool;
    private transient Map<Integer, ProductInfo> cache; // LRU 缓存
    private final int cacheMaxSize;
    private final long cacheTTL; // 缓存过期时间,单位毫秒

    // 内部类,存储产品信息
    private static class ProductInfo {
        String productName;
        String category;
        long timestamp; // 记录缓存时间,用于TTL

        public ProductInfo(String productName, String category) {
            this.productName = productName;
            this.category = category;
            this.timestamp = System.currentTimeMillis();
        }
    }

    public RedisProductCachedFlatMapFunction(String redisHost, int redisPort, String redisPassword, int cacheMaxSize, long cacheTTL) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.redisPassword = redisPassword;
        this.cacheMaxSize = cacheMaxSize;
        this.cacheTTL = cacheTTL;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(50); // 连接数可以适当减少,因为有缓存
        poolConfig.setMaxIdle(10);

        if (redisPassword != null && !redisPassword.isEmpty()) {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
        } else {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
        }

        // 实现一个简单的 LRU 缓存,带 TTL
        this.cache = new LinkedHashMap<Integer, ProductInfo>(cacheMaxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, ProductInfo> eldest) {
                // 移除最老或过期的条目
                return size() > cacheMaxSize || (System.currentTimeMillis() - eldest.getValue().timestamp > cacheTTL);
            }
        };
    }

    @Override
    public void close() throws Exception {
        if (jedisPool != null) {
            jedisPool.close();
        }
        super.close();
    }

    @Override
    public void flatMap(Order order, Collector<EnrichedOrder> out) throws Exception {
        ProductInfo productInfo = cache.get(order.productId);
        long currentTime = System.currentTimeMillis();

        // 检查缓存是否存在且未过期
        if (productInfo != null && (currentTime - productInfo.timestamp) <= cacheTTL) {
            // 缓存命中
            out.collect(new EnrichedOrder(order, productInfo.productName, productInfo.category));
        } else {
            // 缓存未命中或已过期,从 Redis 查询
            try (Jedis jedis = jedisPool.getResource()) {
                String key = "product:" + order.productId;
                Map<String, String> redisResult = jedis.hgetAll(key);

                if (redisResult != null && !redisResult.isEmpty()) {
                    String productName = redisResult.get("name");
                    String category = redisResult.get("category");
                    productInfo = new ProductInfo(productName, category);
                    cache.put(order.productId, productInfo); // 更新缓存
                    out.collect(new EnrichedOrder(order, productName, category));
                } else {
                    // 未找到维表数据,选择丢弃或使用默认值
                    // System.err.println("Product ID " + order.productId + " not found in Redis.");
                    // out.collect(new EnrichedOrder(order, "UNKNOWN", "UNKNOWN")); // 示例:使用默认值
                }
            } catch (Exception e) {
                System.err.println("Error fetching product info for productId " + order.productId + ": " + e.getMessage());
                // out.collect(new EnrichedOrder(order, "ERROR_PRODUCT", "ERROR_CATEGORY")); // 错误时使用默认值
            }
        }
    }
}

DataStream Job 代码

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class DataStreamCachedLookupExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // 可以设置多个并行度,每个并行度会维护自己的缓存

        // 1. 模拟订单数据流
        DataStream<Order> orderStream = env.addSource(new SourceFunction<Order>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();
            private long orderCounter = 0;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (isRunning) {
                    orderCounter++;
                    int productId = random.nextInt(202 - 101 + 1) + 101; // 101到202之间
                    double amount = 10.0 + random.nextDouble() * 990.0;
                    ctx.collect(new Order((int) orderCounter, productId, amount, System.currentTimeMillis()));
                    Thread.sleep(50); // 每50毫秒生成一个订单
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 2. 使用 RichFlatMapFunction 进行维表关联,带缓存
        DataStream<EnrichedOrder> enrichedStream = orderStream.flatMap(
                new RedisProductCachedFlatMapFunction(
                        "localhost", 6379, "", // 替换为你的Redis地址和密码
                        1000, // 缓存最大条目数
                        5 * 60 * 1000L // 缓存TTL 5分钟 (5 * 60 * 1000 毫秒)
                )
        );

        // 3. 打印结果
        enrichedStream.print();

        env.execute("DataStream Cached Lookup Join Example");
    }
}

观察控制台输出。你会注意到,当缓存生效时,处理速度会非常快。如果产品 ID 命中缓存,则不会访问 Redis。当新的产品 ID 出现或缓存过期时,才会触发对 Redis 的查询。

2. 使用 AsyncDataStream (异步 I/O)

实现路径:

  • 原理: 这是 DataStream API 中实现异步维表关联的推荐方式,与 Table API 的 AsyncTableFunction 原理类似。用户实现 AsyncFunction 接口,在 asyncInvoke() 方法中发起异步查询,并通过 ResultFuture 返回结果。
  • 适用场景: 同步 Lookup Join 的异步版本在 DataStream API 中的实现,适用于对性能、吞吐量和延迟有高要求的场景,并且外部系统支持异步客户端。

优点:

  • 与 Table API 的异步 Lookup Join 类似,具有相同的优点: 高吞吐量、低延迟、高资源利用率、灵活性强、内置背压机制。

缺点:

  • 与 Table API 的异步 Lookup Join 类似,具有相同的缺点: 实现复杂度相对较高,依赖外部系统提供的异步客户端或接口。

示例代码:

AsyncFunction 实现

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.configuration.Configuration;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RedisProductAsyncFunction extends RichAsyncFunction<Order, EnrichedOrder> {

    private final String redisHost;
    private final int redisPort;
    private final String redisPassword;
    private transient JedisPool jedisPool;
    private transient ExecutorService executorService; // 用于Jedis同步操作的线程池,模拟异步

    public RedisProductAsyncFunction(String redisHost, int redisPort, String redisPassword) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.redisPassword = redisPassword;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(100);
        poolConfig.setMaxIdle(20);
        poolConfig.setMinIdle(5);
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestOnReturn(true);
        poolConfig.setTestWhileIdle(true);

        if (redisPassword != null && !redisPassword.isEmpty()) {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
        } else {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
        }

        this.executorService = Executors.newFixedThreadPool(
                getRuntimeContext().getNumberOfParallelSubtasks() * 2 // 每个并行度至少2个线程
        );
    }

    @Override
    public void close() throws Exception {
        if (jedisPool != null) {
            jedisPool.close();
        }
        if (executorService != null) {
            executorService.shutdown();
        }
        super.close();
    }

    @Override
    public void asyncInvoke(Order order, ResultFuture<EnrichedOrder> resultFuture) throws Exception {
        executorService.submit(() -> {
            try (Jedis jedis = jedisPool.getResource()) {
                String key = "product:" + order.productId;
                Map<String, String> productInfo = jedis.hgetAll(key);

                if (productInfo != null && !productInfo.isEmpty()) {
                    String productName = productInfo.get("name");
                    String category = productInfo.get("category");
                    EnrichedOrder enrichedOrder = new EnrichedOrder(order, productName, category);
                    resultFuture.complete(Collections.singletonList(enrichedOrder));
                } else {
                    // 如果未找到维表数据,可以选择丢弃,或者用默认值填充
                    // 这里选择丢弃,也可以返回一个只包含订单信息的 EnrichedOrder
                    resultFuture.complete(Collections.emptyList());
                }
            } catch (Exception e) {
                // 处理异常,例如打印日志,并返回空列表或原始数据
                System.err.println("Error fetching product info for productId " + order.productId + ": " + e.getMessage());
                resultFuture.complete(Collections.emptyList()); // 发生异常时,丢弃该订单
            }
        });
    }

    // 可选:处理超时,当异步请求在指定时间内未完成时调用
    @Override
    public void timeout(Order input, ResultFuture<EnrichedOrder> resultFuture) throws Exception {
        System.err.println("Async lookup timeout for order: " + input.orderId);
        // 超时时可以选择返回原始数据,或丢弃
        resultFuture.complete(Collections.singletonList(new EnrichedOrder(input, "UNKNOWN", "UNKNOWN")));
    }
}

DataStream Job 代码

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class DataStreamAsyncLookupExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // 可以设置多个并行度,体验异步I/O的并发性

        // 1. 模拟订单数据流
        DataStream<Order> orderStream = env.addSource(new SourceFunction<Order>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();
            private long orderCounter = 0;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (isRunning) {
                    orderCounter++;
                    int productId = random.nextInt(202 - 101 + 1) + 101; // 101到202之间
                    double amount = 10.0 + random.nextDouble() * 990.0;
                    ctx.collect(new Order((int) orderCounter, productId, amount, System.currentTimeMillis()));
                    Thread.sleep(50); // 每50毫秒生成一个订单,模拟高吞吐
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 2. 使用 AsyncDataStream 进行异步维表关联
        // orderStream: 输入流
        // new RedisProductAsyncFunction(...): 异步函数实例
        // 100: 最大并发异步请求数 (maxConcurrentRequests)
        // 1000: 超时时间 (timeout)
        // TimeUnit.MILLISECONDS: 超时时间单位
        DataStream<EnrichedOrder> enrichedStream = AsyncDataStream.unorderedWait(
                orderStream,
                new RedisProductAsyncFunction("localhost", 6379, ""), // 替换为你的Redis地址和密码
                1000, // 超时时间,例如1000毫秒
                TimeUnit.MILLISECONDS,
                100 // 最大并发请求数,可以调整
        );
        // 也可以使用 orderedWait 保持事件顺序,但通常 unorderedWait 性能更高

        // 3. 打印结果
        enrichedStream.print();

        env.execute("DataStream Async Lookup Join Example");
    }
}

3. 使用 Managed State (Stateful Functions) + 定期更新

实现路径:

  • 原理: 将维表数据作为 Flink Managed State(如 ValueState, MapState)存储在 Flink 内部,通常与 Keyed State 结合使用。通过一个独立的流(或定时任务)定期从外部维表读取最新数据,并更新到 Flink 的 Managed State 中。当主数据流到来时,直接从本地状态中查询维表数据。
  • 更新机制: 全量同步: 定期拉取全量维表数据。 增量同步: 如果维表支持,可以通过 CDC (Change Data Capture) 或消息队列(如 Kafka)订阅维表的变化,将变化数据作为单独的流推送到 Flink,然后更新到 Managed State。
  • 适用场景: 维表数据量大,无法全部加载到每个 TaskManager 的内存中,但仍希望在 Flink 内部实现高性能查询。 对数据新鲜度有一定容忍度,允许维表数据与外部源存在短暂延迟。 需要利用 Flink 的状态管理能力(如状态快照、故障恢复)。

优点:

  • 极高查询性能: 维表数据在 Flink 内部,查询速度极快,避免了网络 I/O 延迟。
  • 故障恢复: 维表数据作为 Managed State,可以随着 Flink 的状态快照和故障恢复机制得到保护。
  • 独立更新: 维表更新与主数据流处理分离,不会阻塞主数据流。
  • 降低外部系统压力: 维表查询不再直接访问外部系统,而是访问 Flink 内部状态。

缺点:

  • 数据新鲜度: 维表数据与外部源存在更新延迟,取决于更新频率和机制。
  • 实现复杂度高: 需要管理状态的更新逻辑,特别是增量更新。
  • 状态管理开销: 维表数据如果非常大,会增加 Flink State 的存储和快照开销。
  • 扩容和再平衡: 如果维表数据是 Keyed State,当 Flink 集群扩容或任务再平衡时,状态迁移可能会带来开销。

总结与选择建议:

实现路径

适用场景

优点

缺点

SQL/Table API - 同步 Lookup Join

简单验证、低吞吐量、对性能要求不高。

实现简单,易于上手。

性能瓶颈,不适合高并发,强依赖外部数据库性能。

SQL/Table API - 异步 Lookup Join

推荐,对性能、吞吐量和延迟有高要求,外部系统支持异步客户端。

高吞吐量,低延迟,高资源利用率,内置背压。

实现复杂度相对高,依赖外部系统异步客户端。

DataStream - 缓存 (RichFunction)

维表数据量小,对数据新鲜度有一定容忍,且外部维表查询延迟较低。

命中缓存性能高,降低外部系统压力,实现相对简单。

数据新鲜度问题,内存消耗,无内置背压。

DataStream - AsyncDataStream

对性能、吞吐量和延迟有高要求,外部系统支持异步客户端。

同 Table API 异步 Lookup Join 的优点。

同 Table API 异步 Lookup Join 的缺点。

DataStream - Managed State (定期更新)

维表数据量大,但希望高性能查询,对数据新鲜度有一定容忍,需要 Flink 状态管理。

极高查询性能,故障恢复,独立更新,降低外部系统压力。

数据新鲜度有延迟,实现复杂度高,状态管理开销,扩容开销。

选择建议:

  • 首选异步 I/O: 对于大多数生产环境下的维表关联场景,无论是使用 Flink SQL/Table API 的 LOOKUP JOIN 还是 DataStream API 的 AsyncDataStream,都强烈推荐使用异步 I/O。它能最大程度地提高吞吐量,降低延迟,并有效利用资源。
  • 考虑缓存: 如果维表数据量不大,且对数据新鲜度有一定容忍,可以考虑在 RichFunction 中结合缓存策略。
  • 大型维表或高一致性要求: 对于极其庞大或需要极高查询性能和故障恢复能力的维表,可以考虑将维表数据作为 Flink Managed State 来管理,通过增量或全量同步机制定期更新。
  • SQL/Table API vs. DataStream API: SQL/Table API: 如果业务逻辑可以通过 SQL 或 Table API 表达,且需要快速开发和部署,优先考虑。Lookup Join 语法简洁。 DataStream API: 如果业务逻辑复杂,需要更精细的控制,或者需要与 Flink 的其他高级特性(如事件时间处理、状态编程)深度结合,DataStream API 会提供更大的灵活性。

在实际应用中,通常需要根据维表的数据量、更新频率、数据新鲜度要求、以及对性能、延迟和实现复杂度的权衡来选择最合适的实现路径。

相关推荐

软件测试|MySQL CROSS JOIN:交叉连接的详细解析

简介在MySQL数据库中,CROSSJOIN是一种用于生成两个或多个表的笛卡尔积的连接方法。CROSSJOIN不需要任何连接条件,它将左表的每一行与右表的每一行进行组合,从而生成一个包含所...

「MySQL笔记」left join-on-and 与 left join-on-where 的区别

1.摘要关于这两种写法的重要知识点摘要如下:left-join时,即使有相同的查询条件,二者的查询结果集也不同,原因是优先级导致的,on的优先级比where高on-and是进行韦恩运算连接...

MySQL中的JOIN——联合查询的基本语法

MySQL中的JOIN指令用来将两个或多个表中的数据进行联合查询,根据连接条件来匹配记录,从而得到需要的结果集。在MySQL中,常见的JOIN类型包括INNERJOIN、LEFTJOIN和RIGH...

MySQL 中的 CROSS JOIN:强大的连接工具

CROSSJOIN在MySQL里是一种挺特别的连接操作,它能弄出连接表的笛卡尔积。这就是说,要是表A有m行,表B有n行,那ACROSSJOINB的结果就会有m*n...

大厂必问:MySQL 三表 JOIN 操作的解析与性能优化,效率又如何?

大厂必问:MySQL三表JOIN操作的解析与性能优化策略,效率又如何?点击关注,开启技术之旅!大家好,这里是互联网技术学堂,无论你是一名程序员、设计师、还是对技术充满好奇心的普通人,都欢迎你加入...

面试题:MySQL 的 JOIN 查询优化(mysql查询优化方法)

MySQL的JOIN查询优化是提升数据库性能的关键环节。以下是综合多个技术文档的核心优化策略,按优先级和实现难度分类:一、索引优化:性能提升的基础为连接字段建立索引确保参与JOIN的列(通常...

Flink中处理维表关联技术实现路径

在Flink中处理维表关联大体氛围TableSQLLookupJoin和DataStream算子函数,主要技术实现路径:I.FlinkSQL/TableAPI中的Lookup...

深入剖析Zookeeper原理(一)整体设计

1.ZK集群架构设计与特性1.ZK集群架构设计:ZK主要分为三种角色:Leader(领导者):一个Zookeeper集群同一时间只会有一个实际工作的Leader,它会发起并维护与各Follwer及...

多种负载均衡算法及其Java代码实现

首先给大家介绍下什么是负载均衡负载均衡建立在现有网络结构之上,它提供了一种廉价有效透明的方法扩展网络设备和服务器的带宽、增加吞吐量、加强网络数据处理能力、提高网络的灵活性和可用性。负载均衡,英...

一分钟了解SpringCloud中的ribbon到底是什么,原理是啥?

1.概念ribbon是一款客户端负载均衡器,用于微服务之间的负载均衡。首先,什么是客户端负载均衡?如图,ribbon可以通过注册中心获取服务列表,然后自己执行自己的负载均衡策略来决定要访问哪个微服务,...

Step by Step之腾讯云短信-验证码实践

在商城小程序和前端上线用了一阵子之后,用户提出了体验提升的需求,如忘记密码、绑定用户、快捷注册等,作为业界最佳实践的短信验证码登录、重置密码和注册等功能开发也就提上日程了,本文就以重置密码为例,将验证...

10分钟入门响应式:Springboot整合kafka实现reactive

Springboot引入Reactor已经有一段时间了,笔者潜伏在各种技术群里暗中观察发现,好像scala圈子的同仁们,似乎对响应式更热衷一点。也许是因为他们对fp理解的更深吧,所以领悟起来障碍性更少...

使用java随机生成有个性的用户名,LOL地名+水浒传,合计2808个

*随机生成用户名*取水浒传108好汉名字*取LOL地名26个,组合而成*一共可以生成2808个不同特色的用户名如果你在上网的时候,用户名难取的话,这里有很多可选择的用户名,现提供100个...

深入理解Math.random()的概率分布特性

直接上源码/***Returnsa{@codedouble}valuewithapositivesign,*返回一个带符号的double类型的数字,说人话就是返回一个非负...

编程英文 - 创建/生成/构建 (create/generate/build)

在软件开发中,create、generate和build这三个词经常被用到,它们都与"创造"或"产生"某些东西有关,但在具体使用场景和含义上有所不同。基本含义creat...

取消回复欢迎 发表评论: