/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.sql.redis;

import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.sql.runtime.DataSourcesProvider;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.IOutputSerializer;
import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
import org.apache.storm.sql.runtime.utils.SerdeUtils;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.tuple.ITuple;
import org.apache.storm.tuple.Values;
import redis.clients.jedis.util.JedisURIHelper;

public class RedisDataSourcesProvider
implements DataSourcesProvider {
    private static final int DEFAULT_REDIS_PORT = 6379;
    private static final int DEFAULT_TIMEOUT = 2000;
    private static final String PROPERTY_DATA_TYPE = "data.type";
    private static final String PROPERTY_DATA_ADDITIONAL_KEY = "data.additional.key";
    private static final String PROPERTY_REDIS_TIMEOUT = "redis.timeout";
    private static final String PROPERTY_USE_REDIS_CLUSTER = "use.redis.cluster";
    private static final String DEFAULT_USE_REDIS_CLUSTER = "false";

    /**
     * Returns the URI scheme this provider is registered under.
     *
     * @return the literal string {@code "redis"}
     */
    public String scheme() {
        final String redisScheme = "redis";
        return redisScheme;
    }

    /**
     * Builds a Redis-backed streams data source from a Redis URI and the table properties.
     *
     * <p>Host, port, database index and password are extracted from {@code uri}. The timeout is
     * read from the {@code redis.timeout} property and cluster mode from the
     * {@code use.redis.cluster} property; both fall back to the defaults declared on this class.
     *
     * @param uri               Redis URI; must pass {@code JedisURIHelper.isValid}
     * @param inputFormatClass  not used by this method
     * @param outputFormatClass serializer class name handed to {@code SerdeUtils.getSerializer}
     * @param props             table properties; also passed on to the serializer and the data source
     * @param fields            table fields; their names are passed to the serializer
     * @return a cluster data source when {@code use.redis.cluster} parses as {@code true},
     *         otherwise a pool-based data source
     * @throws IllegalArgumentException if {@code uri} is not a valid Redis URI
     * @throws NumberFormatException    if {@code redis.timeout} is not an integer
     */
    public ISqlStreamsDataSource constructStreams(URI uri, String inputFormatClass, String outputFormatClass, Properties props, List<FieldInfo> fields) {
        Preconditions.checkArgument(JedisURIHelper.isValid(uri), "URI is not valid for Redis: " + uri);

        String host = uri.getHost();
        // URI.getPort() returns -1 when the URI carries no explicit port.
        int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_REDIS_PORT;
        int dbIdx = JedisURIHelper.getDBIndex(uri);
        String password = JedisURIHelper.getPassword(uri);

        int timeout = parseTimeout(props);
        boolean clusterMode = Boolean.parseBoolean(props.getProperty(PROPERTY_USE_REDIS_CLUSTER, DEFAULT_USE_REDIS_CLUSTER));

        List<String> fieldNames = FieldInfoUtils.getFieldNames(fields);
        IOutputSerializer serializer = SerdeUtils.getSerializer(outputFormatClass, props, fieldNames);

        if (clusterMode) {
            // Only the node address and timeout are handed to the cluster builder here;
            // the password and database index parsed above are not applied in this branch.
            JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder()
                    .setNodes(Collections.singleton(new InetSocketAddress(host, port)))
                    .setTimeout(timeout)
                    .build();
            return new RedisClusterStreamsDataSource(clusterConfig, props, fields, serializer);
        }

        JedisPoolConfig poolConfig = new JedisPoolConfig(host, port, timeout, password, dbIdx);
        return new RedisStreamsDataSource(poolConfig, props, fields, serializer);
    }

    /**
     * Reads the {@code redis.timeout} property, defaulting to {@code DEFAULT_TIMEOUT}.
     *
     * @throws NumberFormatException naming the property when its value is not an integer
     */
    private static int parseTimeout(Properties props) {
        String timeoutText = props.getProperty(PROPERTY_REDIS_TIMEOUT, String.valueOf(DEFAULT_TIMEOUT));
        try {
            // Surrounding whitespace is tolerated so " 2000 " in a properties file still parses.
            return Integer.parseInt(timeoutText.trim());
        } catch (NumberFormatException e) {
            // Same exception type as before, but the message now identifies the offending property.
            NumberFormatException detailed = new NumberFormatException(
                    "Property '" + PROPERTY_REDIS_TIMEOUT + "' must be an integer but was: " + timeoutText);
            detailed.initCause(e);
            throw detailed;
        }
    }

    /**
     * Data source variant whose store bolt is created from a {@link JedisClusterConfig}.
     */
    private static class RedisClusterStreamsDataSource extends AbstractRedisStreamsDataSource {
        private final JedisClusterConfig config;

        /**
         * @param clusterConfig    cluster configuration passed to every bolt this source creates
         * @param tableProps       table properties, forwarded to the superclass
         * @param tableFields      table fields, forwarded to the superclass
         * @param outputSerializer serializer, forwarded to the superclass
         */
        RedisClusterStreamsDataSource(JedisClusterConfig clusterConfig, Properties tableProps, List<FieldInfo> tableFields, IOutputSerializer outputSerializer) {
            super(tableProps, tableFields, outputSerializer);
            config = clusterConfig;
        }

        /**
         * Creates a {@link RedisStoreBolt} from the stored cluster configuration and the given mapper.
         */
        @Override
        protected IRichBolt newRedisBolt(RedisStoreMapper mapper) {
            IRichBolt storeBolt = new RedisStoreBolt(config, mapper);
            return storeBolt;
        }
    }

    /**
     * Data source variant whose store bolt is created from a {@link JedisPoolConfig}.
     */
    private static class RedisStreamsDataSource extends AbstractRedisStreamsDataSource {
        private final JedisPoolConfig config;

        /**
         * @param poolConfig       pool configuration passed to every bolt this source creates
         * @param tableProps       table properties, forwarded to the superclass
         * @param tableFields      table fields, forwarded to the superclass
         * @param outputSerializer serializer, forwarded to the superclass
         */
        RedisStreamsDataSource(JedisPoolConfig poolConfig, Properties tableProps, List<FieldInfo> tableFields, IOutputSerializer outputSerializer) {
            super(tableProps, tableFields, outputSerializer);
            config = poolConfig;
        }

        /**
         * Creates a {@link RedisStoreBolt} from the stored pool configuration and the given mapper.
         */
        @Override
        protected IRichBolt newRedisBolt(RedisStoreMapper mapper) {
            IRichBolt storeBolt = new RedisStoreBolt(config, mapper);
            return storeBolt;
        }
    }

    /**
     * {@link RedisStoreMapper} used by the SQL Redis data source.
     *
     * <p>The Redis key is taken from tuple position 0 and the Redis value is produced by
     * serializing the {@link Values} found at tuple position 1 with the configured
     * {@link IOutputSerializer}.
     */
    private static class SqlRedisStoreMapper
    implements RedisStoreMapper {
        private final RedisDataTypeDescription dataTypeDescription;
        // Validated at construction time; not otherwise read inside this class.
        private final FieldInfo primaryKeyField;
        private final IOutputSerializer outputSerializer;

        /**
         * @param dataTypeDescription Redis data type description returned by {@link #getDataTypeDescription()}
         * @param fields              table fields; at least one must be flagged as primary
         * @param outputSerializer    serializer used to turn the tuple's values into the stored string
         * @throws IllegalArgumentException if no field in {@code fields} is primary
         */
        private SqlRedisStoreMapper(RedisDataTypeDescription dataTypeDescription, List<FieldInfo> fields, IOutputSerializer outputSerializer) {
            this.dataTypeDescription = dataTypeDescription;
            this.outputSerializer = outputSerializer;
            FieldInfo pkField = this.findPrimaryKeyField(fields);
            Preconditions.checkArgument(pkField != null, "Primary key must be presented to field list");
            this.primaryKeyField = pkField;
        }

        /**
         * Returns the first field flagged as primary, or {@code null} when there is none.
         */
        private FieldInfo findPrimaryKeyField(List<FieldInfo> fields) {
            for (FieldInfo field : fields) {
                if (field.isPrimary()) {
                    return field;
                }
            }
            return null;
        }

        /**
         * Returns the data type description supplied at construction.
         */
        public RedisDataTypeDescription getDataTypeDescription() {
            return this.dataTypeDescription;
        }

        /**
         * Returns the string form of the value at tuple position 0.
         *
         * @throws NullPointerException if that value is {@code null}
         */
        public String getKeyFromTuple(ITuple tuple) {
            Object key = tuple.getValue(0);
            if (key == null) {
                throw new NullPointerException("key field is null");
            }
            return String.valueOf(key);
        }

        /**
         * Serializes the {@link Values} at tuple position 1 and returns the bytes as a string.
         */
        public String getValueFromTuple(ITuple tuple) {
            Values values = (Values) tuple.getValue(1);
            // NOTE(review): array() exposes the buffer's whole backing array, ignoring
            // position/limit; this assumes the serializer returns a tightly wrapped buffer.
            byte[] array = this.outputSerializer.write(values, null).array();
            // Decode with an explicit charset instead of the platform default, so the stored
            // value does not depend on JVM settings. Assumes the serializer emits UTF-8 text;
            // confirm for custom serializers.
            return new String(array, StandardCharsets.UTF_8);
        }
    }

    /**
     * Base class for the Redis streams data sources.
     *
     * <p>It only supplies a consumer (a Redis store bolt); {@link #getProducer()} always throws.
     * Subclasses decide how the bolt is constructed via {@link #newRedisBolt(RedisStoreMapper)}.
     */
    private static abstract class AbstractRedisStreamsDataSource
    implements ISqlStreamsDataSource,
    Serializable {
        private final Properties props;
        private final List<FieldInfo> fields;
        private final IOutputSerializer serializer;

        /**
         * Creates the bolt that stores tuples using the given mapper.
         *
         * @param var1 mapper that derives the Redis key, value and data type for each tuple
         * @return the bolt to use as this data source's consumer
         */
        protected abstract IRichBolt newRedisBolt(RedisStoreMapper var1);

        /**
         * @param props      table properties; must provide {@code data.type} by the time
         *                   {@link #getConsumer()} is called
         * @param fields     table fields, handed to the store mapper
         * @param serializer output serializer, handed to the store mapper
         */
        AbstractRedisStreamsDataSource(Properties props, List<FieldInfo> fields, IOutputSerializer serializer) {
            this.props = props;
            this.fields = fields;
            this.serializer = serializer;
        }

        /**
         * Not supported by this data source.
         *
         * @throws UnsupportedOperationException always
         */
        public IRichSpout getProducer() {
            throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer");
        }

        /**
         * Builds a store mapper from the configured properties, fields and serializer and
         * wraps it in the bolt produced by {@link #newRedisBolt(RedisStoreMapper)}.
         *
         * @throws IllegalArgumentException if {@code data.type} is missing or does not name a
         *                                  Redis data type, or if no field is primary
         */
        public IRichBolt getConsumer() {
            RedisDataTypeDescription dataTypeDescription = this.getDataTypeDesc(this.props);
            SqlRedisStoreMapper storeMapper = new SqlRedisStoreMapper(dataTypeDescription, this.fields, this.serializer);
            return this.newRedisBolt(storeMapper);
        }

        /**
         * Resolves the Redis data type description from the {@code data.type} and
         * {@code data.additional.key} properties.
         */
        private RedisDataTypeDescription getDataTypeDesc(Properties props) {
            // getProperty (rather than containsKey + getProperty) honours Properties defaults and
            // turns a non-String value into the same IllegalArgumentException instead of an NPE.
            String rawDataType = props.getProperty(RedisDataSourcesProvider.PROPERTY_DATA_TYPE);
            Preconditions.checkArgument(rawDataType != null, "Redis data source must contain data.type config");
            // Locale.ROOT keeps the upper-casing locale-independent; under e.g. a Turkish default
            // locale "string" would otherwise become "STRİNG" and fail the enum lookup.
            RedisDataTypeDescription.RedisDataType dataType =
                    RedisDataTypeDescription.RedisDataType.valueOf(rawDataType.toUpperCase(Locale.ROOT));
            String additionalKey = props.getProperty(RedisDataSourcesProvider.PROPERTY_DATA_ADDITIONAL_KEY);
            return new RedisDataTypeDescription(dataType, additionalKey);
        }
    }
}

