Flink自定义sink实现从mysql同步数据到redis

yuyu888 于 2024-04-12 发布

pom.xml

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-redis_2.11</artifactId>
			<version>1.1.5</version>
		</dependency>

大多数情况都用这个包

核心实现

非常简单

        //创建一个jedis连接配置
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("127.0.0.1")
                .setPort(6379)
                .build();

        userStream.addSink(new RedisSink<UserVo>(config,new MyRedisMapper()));

完整代码

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 从数据表读取数据
        tableEnv.executeSql(
                "CREATE TABLE `user`("
                        + "  `id` INT NOT NULL,"
                        + "  `unique_id` STRING NOT NULL,"
                        + "  `username` STRING NOT NULL,"
                        + "  `c_username` STRING NOT NULL,"
                        + "  PRIMARY KEY (`id`) NOT ENFORCED "
                        + ") WITH ("
                        + " 'connector' = 'mysql-cdc', "
                        + " 'hostname' = '127.0.0.1',"
                        + " 'port' = '3306', "
                        + " 'username' = 'root', "
                        + "  'password' = '123456', "
                        + " 'database-name' = 'mytest', "
                        + " 'table-name' = 'user' "
                        + ")"
        );
        Table resultTable = tableEnv.from("user").select($("id"), $("unique_id"), $("username"), $("c_username")).filter($("id").isEqual(2));
        DataStream<Row> dataStream = tableEnv.toChangelogStream(resultTable);

        DataStream<UserVo> userStream = dataStream.map(new MapFunction<Row, UserVo>() {
            @Override
            public UserVo map(Row value) throws Exception {
                return new UserVo(
                        value.getField(1).toString(), 
                        value.getField(2).toString()
                );
            }
        });


        //创建一个jedis连接配置
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("127.0.0.1")
                .setPort(6379)
                .build();

        userStream.addSink(new RedisSink<UserVo>(config,new MyRedisMapper()));
        env.execute("Flink mysql to redis");


MyRedisMapper

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

@Slf4j
public class MyRedisMapper implements RedisMapper<UserVo> {

    @Override
    //getCommandDescription()方法主要是返回当前Redis操作命令的描述,
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET,"userMap");
    }

    @Override
    public String getKeyFromData(UserVo user) {
        return user.getUniqueId();
    }

    @Override
    public String getValueFromData(UserVo user) {
        return user.getUsername();

    }
}

UserVo

  @Data
  public class UserVo {
      public String uniqueId;
      public String username;

      public UserVo(String uniqueId, String username) {
          this.uniqueId = uniqueId;
          this.username = username;
      }
  }

局限性

我们点开 org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper

发现

public interface RedisMapper<T> extends Function, Serializable {

	/**
	 * Returns descriptor which defines data type.
	 *
	 * @return data type descriptor
	 */
	RedisCommandDescription getCommandDescription();

	/**
	 * Extracts key from data.
	 *
	 * @param data source data
	 * @return key
	 */
	String getKeyFromData(T data);

	/**
	 * Extracts value from data.
	 *
	 * @param data source data
	 * @return value
	 */
	String getValueFromData(T data);
}

发现他没有关于过期时间的设置,这个不能忍

如果我们想要实现 setex 就抓瞎了, 只能自己动手丰衣足食

自定义redisSink

RedisCustomSink

import com.myproject.api.logic.impl.flink.UserVo;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class RedisCustomSink extends RichSinkFunction<UserVo> {
    private transient Jedis jedis;
    private String redisHost;
    private int redisPort;
    private int expirySeconds;
    private String keyPrefix;

    public RedisCustomSink(String redisHost, int redisPort, int expirySeconds, String keyPrefix) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.expirySeconds = expirySeconds;
        this.keyPrefix = keyPrefix;

    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = new Jedis(redisHost, redisPort);
    }

    @Override
    public void close() throws Exception {
        jedis.close();
        super.close();
    }

    @Override
    public void invoke(UserVo vo, Context context) {
        jedis.setex(this.keyPrefix+vo.getUniqueId(), expirySeconds, vo.getUsername());
    }
}

完整代码:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 从数据表读取数据
        tableEnv.executeSql(
                "CREATE TABLE `user`("
                        + "  `id` INT NOT NULL,"
                        + "  `unique_id` STRING NOT NULL,"
                        + "  `username` STRING NOT NULL,"
                        + "  `c_username` STRING NOT NULL,"
                        + "  PRIMARY KEY (`id`) NOT ENFORCED "
                        + ") WITH ("
                        + " 'connector' = 'mysql-cdc', "
                        + " 'hostname' = '127.0.0.1',"
                        + " 'port' = '3306', "
                        + " 'username' = 'root', "
                        + "  'password' = '123456', "
                        + " 'database-name' = 'mytest', "
                        + " 'table-name' = 'user' "
                        + ")"
        );
        Table resultTable = tableEnv.from("user").select($("id"), $("unique_id"), $("username"), $("c_username")).filter($("id").isEqual(2));
        DataStream<Row> dataStream = tableEnv.toChangelogStream(resultTable);

        // 变更stream结构
        DataStream<UserVo> userStream = dataStream.map(new MapFunction<Row, UserVo>() {
            @Override
            public UserVo map(Row value) throws Exception {
                return new UserVo(
                        value.getField(1).toString(),
                        value.getField(2).toString()
                );
            }
        });

        userStream.addSink(new RedisCustomSink("localhost", 6379, 3600, "user_")); // 设置过期时间为3600秒
        env.execute("Flink mysql to redis");

自定义redis操作

上例子中, 我把invoke的操作逻辑写死了,如果这会我想实现一个hset,只能在写一个sink

    @Override
    public void invoke(UserVo vo, Context context) {
        jedis.setex(this.keyPrefix+vo.getUniqueId(), expirySeconds, vo.getUsername());
    }

如果我希望redis操作,作为参数传入sink 改怎么做呢?

这通常通过使用接口或者函数式接口来实现。我将创建一个接口 RedisOperation 作为一个函数式接口,然后修改 RedisCustomSink 来接受一个 RedisOperation 实例。这将允许你在添加 sink 时,定义如何与 Redis 进行交互。

首先,定义 RedisOperation 接口:

@FunctionalInterface
public interface RedisOperation {
    void execute(Jedis jedis, UserVo vo, String keyPrefix, int expirySeconds);
}

然后,修改 RedisCustomSink 来接受一个 RedisOperation

public class RedisCustomSink extends RichSinkFunction<UserVo> {
    private final transient RedisOperation redisOperation;
    private transient Jedis jedis;
    private final String redisHost;
    private final int redisPort;
    private final int expirySeconds;
    private final String keyPrefix;

    public RedisCustomSink(String redisHost, int redisPort, int expirySeconds, String keyPrefix, RedisOperation redisOperation) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.expirySeconds = expirySeconds;
        this.keyPrefix = keyPrefix;
        this.redisOperation = redisOperation;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.jedis = new Jedis(this.redisHost, this.redisPort);
        // this.jedis.auth("your password");
    }

    @Override
    public void invoke(UserVo vo, Context context) {
        this.redisOperation.execute(this.jedis, vo, this.keyPrefix, this.expirySeconds);
    }

    @Override
    public void close() throws Exception {
        if (this.jedis != null) {
            this.jedis.close();
        }
        super.close();
    }
}

最后,你可以在添加 sink 时,定义如何与 Redis 进行交互:

    userStream.addSink(new RedisCustomSink("localhost", 6379, 10, "user_", 
    (jedis, vo, keyPrefix, expirySeconds) -> {
        jedis.hset(keyPrefix + vo.getUniqueId(), "username", vo.getUsername());
    }));