package org.apache.storm.sql.redis;

import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.sql.runtime.DataSourcesProvider;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.IOutputSerializer;
import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
import org.apache.storm.sql.runtime.utils.SerdeUtils;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;
import redis.clients.util.JedisURIHelper;

/* loaded from: input_file:org/apache/storm/sql/redis/RedisDataSourcesProvider.class */
public class RedisDataSourcesProvider implements DataSourcesProvider {
    /** Port used by {@code constructStreams} when the URI reports no port ({@code -1}). */
    private static final int DEFAULT_REDIS_PORT = 6379;
    /** Fallback for the {@code redis.timeout} property; presumably milliseconds -- TODO confirm against the Jedis config classes. */
    private static final int DEFAULT_TIMEOUT = 2000;
    /** Required property naming the Redis data type; its upper-cased value is resolved to a {@code RedisDataType} constant. */
    private static final String PROPERTY_DATA_TYPE = "data.type";
    /** Optional property passed as the second argument of {@code RedisDataTypeDescription}. */
    private static final String PROPERTY_DATA_ADDITIONAL_KEY = "data.additional.key";
    /** Property holding the connection timeout, parsed as an integer; defaults to {@link #DEFAULT_TIMEOUT}. */
    private static final String PROPERTY_REDIS_TIMEOUT = "redis.timeout";
    /** Property selecting the cluster-based data source when it parses to {@code true}. */
    private static final String PROPERTY_USE_REDIS_CLUSTER = "use.redis.cluster";
    /** Default for {@link #PROPERTY_USE_REDIS_CLUSTER}: the non-cluster (pool) data source is used. */
    private static final String DEFAULT_USE_REDIS_CLUSTER = "false";

    /* loaded from: input_file:org/apache/storm/sql/redis/RedisDataSourcesProvider$AbstractRedisStreamsDataSource.class */
    /**
     * Common base for the Redis stream data sources defined in this file.
     *
     * <p>Holds the table properties, the field list and the output serializer, and builds the
     * consumer bolt from them; subclasses only decide which Redis configuration the bolt is
     * created with. Producing (reading from Redis as a spout) is not supported.
     */
    private static abstract class AbstractRedisStreamsDataSource implements ISqlStreamsDataSource, Serializable {
        private final Properties props;
        private final List<FieldInfo> fields;
        private final IOutputSerializer serializer;

        /**
         * Creates the bolt that stores tuples into Redis using the given mapper.
         *
         * @param redisStoreMapper mapper that extracts key and value from each tuple
         * @return the bolt returned from {@link #getConsumer()}
         */
        protected abstract IRichBolt newRedisBolt(RedisStoreMapper redisStoreMapper);

        AbstractRedisStreamsDataSource(Properties properties, List<FieldInfo> list, IOutputSerializer iOutputSerializer) {
            this.props = properties;
            this.fields = list;
            this.serializer = iOutputSerializer;
        }

        /**
         * Always fails: this data source cannot be used as a producer.
         *
         * @throws UnsupportedOperationException on every call
         */
        public IRichSpout getProducer() {
            throw new UnsupportedOperationException(getClass().getName() + " doesn't provide Producer");
        }

        /**
         * Builds the consumer bolt from a new {@code SqlRedisStoreMapper} configured with the
         * data type description, field list and serializer of this data source.
         *
         * @return the bolt created by {@link #newRedisBolt(RedisStoreMapper)}
         */
        public IRichBolt getConsumer() {
            return newRedisBolt(new SqlRedisStoreMapper(getDataTypeDesc(this.props), this.fields, this.serializer));
        }

        /**
         * Resolves the Redis data type description from the table properties.
         *
         * @param properties table properties; must define {@code data.type}
         * @return description built from {@code data.type} and the optional {@code data.additional.key}
         * @throws IllegalArgumentException if {@code data.type} is missing or does not name a
         *         {@code RedisDataType} constant
         */
        private RedisDataTypeDescription getDataTypeDesc(Properties properties) {
            // getProperty (rather than containsKey) also honours Properties defaults and yields
            // null for non-String values, so the check below covers every "no usable value" case.
            String dataTypeName = properties.getProperty(RedisDataSourcesProvider.PROPERTY_DATA_TYPE);
            Preconditions.checkArgument(dataTypeName != null, "Redis data source must contain data.type config");

            // Locale.ROOT keeps the enum lookup independent of the JVM default locale
            // (e.g. Turkish upper-casing of 'i' would otherwise break "list" / "string").
            RedisDataTypeDescription.RedisDataType dataType =
                    RedisDataTypeDescription.RedisDataType.valueOf(dataTypeName.toUpperCase(Locale.ROOT));
            String additionalKey = properties.getProperty(RedisDataSourcesProvider.PROPERTY_DATA_ADDITIONAL_KEY);
            return new RedisDataTypeDescription(dataType, additionalKey);
        }
    }

    /* loaded from: input_file:org/apache/storm/sql/redis/RedisDataSourcesProvider$RedisClusterStreamsDataSource.class */
    /**
     * Data source whose consumer bolt is created with a {@link JedisClusterConfig}.
     */
    private static class RedisClusterStreamsDataSource extends AbstractRedisStreamsDataSource {
        /** Cluster configuration handed to every bolt this data source creates. */
        private final JedisClusterConfig clusterConfig;

        RedisClusterStreamsDataSource(JedisClusterConfig clusterConfig, Properties tableProps, List<FieldInfo> fieldInfos, IOutputSerializer outputSerializer) {
            super(tableProps, fieldInfos, outputSerializer);
            this.clusterConfig = clusterConfig;
        }

        /**
         * Creates a {@link RedisStoreBolt} from the cluster configuration and the given mapper.
         */
        @Override // org.apache.storm.sql.redis.RedisDataSourcesProvider.AbstractRedisStreamsDataSource
        protected IRichBolt newRedisBolt(RedisStoreMapper storeMapper) {
            return new RedisStoreBolt(clusterConfig, storeMapper);
        }
    }

    /* loaded from: input_file:org/apache/storm/sql/redis/RedisDataSourcesProvider$RedisStreamsDataSource.class */
    /**
     * Data source whose consumer bolt is created with a {@link JedisPoolConfig}.
     */
    private static class RedisStreamsDataSource extends AbstractRedisStreamsDataSource {
        /** Pool configuration handed to every bolt this data source creates. */
        private final JedisPoolConfig poolConfig;

        RedisStreamsDataSource(JedisPoolConfig poolConfig, Properties tableProps, List<FieldInfo> fieldInfos, IOutputSerializer outputSerializer) {
            super(tableProps, fieldInfos, outputSerializer);
            this.poolConfig = poolConfig;
        }

        /**
         * Creates a {@link RedisStoreBolt} from the pool configuration and the given mapper.
         */
        @Override // org.apache.storm.sql.redis.RedisDataSourcesProvider.AbstractRedisStreamsDataSource
        protected IRichBolt newRedisBolt(RedisStoreMapper storeMapper) {
            return new RedisStoreBolt(poolConfig, storeMapper);
        }
    }

    /* loaded from: input_file:org/apache/storm/sql/redis/RedisDataSourcesProvider$SqlRedisStoreMapper.class */
    /**
     * Store mapper that turns a two-element tuple into a Redis key and value.
     *
     * <p>The key is the string form of tuple element 0; the value is tuple element 1
     * (a {@link Values}) serialized with the configured {@link IOutputSerializer} and decoded
     * as UTF-8 text. The field list must contain a primary-key field, which is checked at
     * construction time.
     */
    private static class SqlRedisStoreMapper implements RedisStoreMapper {
        private final RedisDataTypeDescription dataTypeDescription;
        // Only validated and stored here; the key itself is read from tuple position 0.
        private final FieldInfo primaryKeyField;
        private final IOutputSerializer outputSerializer;

        /**
         * @param redisDataTypeDescription data type description returned by {@link #getDataTypeDescription()}
         * @param list table fields; at least one must be marked primary
         * @param iOutputSerializer serializer used to render the value part of each tuple
         * @throws IllegalArgumentException if no field in {@code list} is primary
         */
        private SqlRedisStoreMapper(RedisDataTypeDescription redisDataTypeDescription, List<FieldInfo> list, IOutputSerializer iOutputSerializer) {
            this.dataTypeDescription = redisDataTypeDescription;
            this.outputSerializer = iOutputSerializer;
            FieldInfo primaryKey = findPrimaryKeyField(list);
            Preconditions.checkArgument(primaryKey != null, "Primary key must be presented to field list");
            this.primaryKeyField = primaryKey;
        }

        /**
         * Returns the first field flagged as primary, or {@code null} if there is none.
         */
        private FieldInfo findPrimaryKeyField(List<FieldInfo> list) {
            for (FieldInfo field : list) {
                if (field.isPrimary()) {
                    return field;
                }
            }
            return null;
        }

        public RedisDataTypeDescription getDataTypeDescription() {
            return this.dataTypeDescription;
        }

        /**
         * Returns the string form of tuple element 0.
         *
         * @throws NullPointerException if that element is {@code null}
         */
        public String getKeyFromTuple(ITuple iTuple) {
            Object key = iTuple.getValue(0);
            if (key == null) {
                throw new NullPointerException("key field is null");
            }
            return String.valueOf(key);
        }

        /**
         * Serializes tuple element 1 and returns the bytes decoded as UTF-8.
         *
         * <p>The charset is explicit so the stored value does not depend on the JVM's default
         * charset. NOTE(review): assumes the configured serializer emits UTF-8 text -- confirm
         * for any custom {@link IOutputSerializer}.
         */
        public String getValueFromTuple(ITuple iTuple) {
            Values row = (Values) iTuple.getValue(1);
            // null is passed as the ByteBuffer argument; the serializer's returned buffer is used.
            byte[] serialized = this.outputSerializer.write(row, (ByteBuffer) null).array();
            return new String(serialized, StandardCharsets.UTF_8);
        }
    }

    /**
     * Returns the scheme name reported by this provider.
     *
     * @return the literal string {@code "redis"}
     */
    public String scheme() {
        return "redis";
    }

    /**
     * Builds a Redis streams data source from a connection URI and table properties.
     *
     * <p>Host, port, DB index and password are taken from the URI; the timeout and the
     * cluster switch come from {@code redis.timeout} and {@code use.redis.cluster}. In cluster
     * mode only host, port and timeout are passed to the cluster configuration; the password
     * and DB index are used by the non-cluster configuration only.
     *
     * @param uri Redis URI; must pass {@code JedisURIHelper.isValid}
     * @param str not used by this implementation
     * @param str2 passed to {@code SerdeUtils.getSerializer} to select the output serializer
     * @param properties table properties ({@code data.type}, {@code redis.timeout}, ...)
     * @param list table fields
     * @return a cluster or single-node data source, depending on {@code use.redis.cluster}
     * @throws IllegalArgumentException if the URI is not valid for Redis
     * @throws NumberFormatException if {@code redis.timeout} is not an integer
     */
    public ISqlStreamsDataSource constructStreams(URI uri, String str, String str2, Properties properties, List<FieldInfo> list) {
        Preconditions.checkArgument(JedisURIHelper.isValid(uri), "URI is not valid for Redis: " + uri);

        // Connection details carried by the URI.
        final String redisHost = uri.getHost();
        final int redisPort = uri.getPort() == -1 ? DEFAULT_REDIS_PORT : uri.getPort();
        final int dbIndex = JedisURIHelper.getDBIndex(uri);
        final String password = JedisURIHelper.getPassword(uri);

        // Settings carried by the table properties.
        final String timeoutText = properties.getProperty(PROPERTY_REDIS_TIMEOUT, String.valueOf(DEFAULT_TIMEOUT));
        final int timeout = Integer.parseInt(timeoutText);
        final String clusterText = properties.getProperty(PROPERTY_USE_REDIS_CLUSTER, DEFAULT_USE_REDIS_CLUSTER);
        final boolean useCluster = Boolean.parseBoolean(clusterText);

        final IOutputSerializer outputSerializer =
                SerdeUtils.getSerializer(str2, properties, FieldInfoUtils.getFieldNames(list));

        if (!useCluster) {
            JedisPoolConfig poolConfig = new JedisPoolConfig(redisHost, redisPort, timeout, password, dbIndex);
            return new RedisStreamsDataSource(poolConfig, properties, list, outputSerializer);
        }

        InetSocketAddress node = new InetSocketAddress(redisHost, redisPort);
        JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder()
                .setNodes(Collections.singleton(node))
                .setTimeout(timeout)
                .build();
        return new RedisClusterStreamsDataSource(clusterConfig, properties, list, outputSerializer);
    }
}
