专业编程基础技术教程

网站首页 > 基础教程 正文

Trino——实现Redis连接器支持Insert操作

ccvgpt 2024-08-18 14:34:54 基础教程 19 ℃

导读:Trino 是由 Presto SQL 更名而得,是一个分布式SQL查询引擎。Trino 目前提供了丰富的 Connectors 用于访问不同的数据源。本文接下来将讨论如何在现有的 redis connector 的基础上进行改造使其支持 insert 操作。

1、基础知识

本文只提供笔者对于 Redis Connect 支持 Insert 功能的大致实现思路,不做详细源码分析。朋友们需根据自身实际情况调整优化,以下代码均未在生产环境实践应用。

Trino——实现Redis连接器支持Insert操作

// 官方文档大致讲述了redis connect 的构成以及使用的方式,这里就不做细表,文档如下:
https://trino.io/docs/current/connector/redis.html
// Trino 的 github 地址,拉取后我们可以参考借鉴 JDBC 模块实现 insert 功能 
https://github.com/trinodb/trino

2、Insert 功能具体实现

2.1 首先重写ConnectorMetadata 的 beginInsert 和 finishInsert 方法

RedisMetadata 实现了 ConnectorMetadata。 ConnectorMetadata 接口中如果实现类不覆盖这两个方法,则连接器本身是不支持 insert 操作的。

default ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle) {
        throw new TrinoException(StandardErrorCode.NOT_SUPPORTED, "This connector does not support inserts");
    }
default Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics) {
        throw new TrinoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "ConnectorMetadata beginInsert() is implemented without finishInsert()");
    }

因此我们需在 RedisMetadata 中重写这两个方法,如下:

@Override
    public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns)
    {
        String catalogName = ((FullConnectorSession)session).getSession().getCatalog().get();
        RedisTableHandle redisTableHandle = ((RedisTableHandle)tableHandle);
        String schemaName = redisTableHandle.getSchemaName();
        String tableName = redisTableHandle.getTableName();
        ImmutableList.Builder<String> columnNames = ImmutableList.builder();
        ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
        List<RedisColumnHandle> columnHandles = columns.stream()
                .map(RedisColumnHandle.class::cast)
                .collect(toImmutableList());
        for (RedisColumnHandle column : columnHandles) {
            columnNames.add(column.getName());
            columnTypes.add(column.getType());
        }
        RedisOutputTableHandle handle = new RedisOutputTableHandle(
                catalogName,
                schemaName,
                tableName,
                columnNames.build(),
                columnTypes.build()
        );
        return handle;
    }

@Override
    public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session, ConnectorInsertTableHandle insertHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics) {
        return Optional.empty();
    }

2.2 构建一个 RedisOutputTableHandle

通过参考 trino-base-jdbc 插件,我们在 beginInsert 的主要目的是构建一个 RedisOutputTableHandle,其需实现 ConnectorOutputTableHandle, ConnectorInsertTableHandle 接口。如下:

public class RedisOutputTableHandle
        implements ConnectorOutputTableHandle, ConnectorInsertTableHandle
{
    private final String catalogName;
    private final String schemaName;
    private final String tableName;
    private final List<String> columnNames;
    private final List<Type> columnTypes;

    @JsonCreator
    public RedisOutputTableHandle(
            @JsonProperty("catalogName") @Nullable String catalogName,
            @JsonProperty("schemaName") @Nullable String schemaName,
            @JsonProperty("tableName") String tableName,
            @JsonProperty("columnNames") List<String> columnNames,
            @JsonProperty("columnTypes") List<Type> columnTypes) {
        this.catalogName = catalogName;
        this.schemaName = schemaName;
        this.tableName = requireNonNull(tableName, "tableName is null");
        requireNonNull(columnNames, "columnNames is null");
        requireNonNull(columnTypes, "columnTypes is null");
        this.columnNames = ImmutableList.copyOf(columnNames);
        this.columnTypes = ImmutableList.copyOf(columnTypes);
    }
// 此处省略 getter...
}

2.3 RedisHandleResolver 重写 getOutputTableHandleClass 和 getInsertTableHandleClass 两个方法

RedisHandleResolver 实现了 ConnectorHandleResolver 接口,ConnectorHandleResolver 的 getOutputTableHandleClass 和 getInsertTableHandleClass 默认抛出不支持操作符异常。

default Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass() {
        throw new UnsupportedOperationException();
    }
    default Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass() {
        throw new UnsupportedOperationException();
    }

因此我们需重写它们并返回 RedisOutputTableHandle.class

public class RedisHandleResolver implements ConnectorHandleResolver
{
    //... 省略其他代码
    @Override
    public Class<? extends ConnectorOutputTableHandle> getOutputTableHandleClass() {
        return RedisOutputTableHandle.class;
    }

    @Override
    public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass() {
        return RedisOutputTableHandle.class;
    }
}

2.4 创建 RedisPageSinkProvider 和 RedisPageSink

参考 trino-base-jdbc 插件,要实现输出需要提供 ConnectorPageSinkProvider 和 ConnectorPageSink。

RedisPageSinkProvider.class 实现如下:

public class RedisPageSinkProvider implements ConnectorPageSinkProvider {

    private final RedisJedisManager jedisManager;

    @Inject
    public RedisPageSinkProvider(RedisJedisManager jedisManager) {
        this.jedisManager = requireNonNull(jedisManager, "jedisManager is null");
    }

    @Override
    public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorOutputTableHandle tableHandle) {
        return new RedisPageSink(session, (RedisOutputTableHandle) tableHandle, jedisManager);
    }

    @Override
    public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorInsertTableHandle tableHandle) {
        return new RedisPageSink(session, (RedisOutputTableHandle) tableHandle, jedisManager);
    }
}

RedisPageSink 实现如下:

public class RedisPageSink implements ConnectorPageSink {
    private final List<Type> columnTypes;
    private final Jedis jedis;

    public RedisPageSink(ConnectorSession session, RedisOutputTableHandle handle, RedisJedisManager jedisManager, RedisSplitManager redisSplitManager) {
        columnTypes = handle.getColumnTypes();
        RedisConnectorConfig redisConnectorConfig = jedisManager.getRedisConnectorConfig();
        HostAddress hostAddress = new ArrayList<>(redisConnectorConfig.getNodes()).get(0);
        JedisPool jedisPool = jedisManager.getJedisPool(HostAddress.fromParts(hostAddress.getHostText(), hostAddress.getPort()));
        this.jedis = jedisPool.getResource();
        this.pipeline = jedis.pipelined();
    }

    /**
     * 数据处理
     * page可以理解为存储多行数据的一个组,也可理解为一张表
     * 表由多个 channel 组成,每个channel 包含一个 block
     * 每个 block 就可理解为是表中的一列,而列中的每一项则是占据了一个 position
     */
    @Override
    public CompletableFuture<?> appendPage(Page page) {
         // 扫描行,position 相同表示同一行
        for (int position = 0; position < page.getPositionCount(); position++) {
            // 获取第一行第一列的数据(redis_key)
            Block block = page.getBlock(0);
            Type keyType = columnTypes.get(0);
            Slice key = keyType.getSlice(block, position);
            // 获取第一行第二列的数据,(redis_value ,目前只支持 String 和 hash)
            Block block2 = page.getBlock(1);
            Type valueType = columnTypes.get(1);
            Slice value = valueType.getSlice(block2, position);
            // 使用通道技术提升插入效率
            pipeline.set(new String(key.getBytes()), new String(value.getBytes()));
        }
        return NOT_BLOCKED;
    }

    @Override
    public CompletableFuture<Collection<Slice>> finish() {
        pipeline.sync();
        jedis.close();
        return completedFuture(ImmutableList.of());
    }

    @Override
    public void abort() {
    }
}

2.5 在 RedisConnectorModule 中注册 RedisPageSinkProvider

public class RedisConnectorModule implements Module
{
   @Override
    public void configure(Binder binder)
    {
        .//.. 省略其他
        // 注册 RedisPageSinkProvider 单例
        binder.bind(RedisPageSinkProvider.class).in(Scopes.SINGLETON);
    }
}

2.6 RedisConnector 中重写 getPageSinkProvider 方法

提供获取 RedisPageSinkProvider 的方式。

public class RedisConnector implements Connector
{
    private final RedisPageSinkProvider redisPageSinkProvider;

    @Inject
    public RedisConnector(
            RedisMetadata metadata,
            RedisSplitManager splitManager,
            RedisRecordSetProvider recordSetProvider,
            RedisPageSinkProvider redisPageSinkProvider)
    {
        this.metadata = requireNonNull(metadata, "metadata is null");
        this.splitManager = requireNonNull(splitManager, "splitManager is null");
        this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
        // 增加 redisPageSinkProvider 属性
        this.redisPageSinkProvider = requireNonNull(redisPageSinkProvider, "recordSetProvider is null");
    }

    @Override
    public ConnectorPageSinkProvider getPageSinkProvider()
    {
        return redisPageSinkProvider;
    }
}

2.7 处理 Page

回到 RedisPageSink 类,在 appendPage 中编写具体的操作逻辑。此处核心逻辑是处理 Page 以及使用 jedis 操作 redis 。关于 Page、Block、Slice 等数据结构的理解可参考Presto核心数据结构:Slice Block Page 这篇文章。

最后

以上就是笔者 redis connect 实现 Insert 功能的大致实现思路,希望对各有有所帮助。由于篇幅有限就不做详细的源码解析,朋友们需根据自身实际情况调整优化。感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。

Tags:

最近发表
标签列表