/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.redis.sink;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

/**
 * Sink writer that buffers rows and writes them to Redis in batches.
 *
 * <p>For every incoming row a key and a value are derived from the configured
 * {@link RedisParameters} and appended, together with the row's {@link RowKind},
 * to three parallel buffers. The buffers are flushed through the
 * {@link RedisClient} once {@code batchSize} rows have accumulated, on
 * {@link #prepareCommit()} and on {@link #close()}.
 *
 * <p>NOTE(review): only {@code flush()} is synchronized; {@link #write(SeaTunnelRow)}
 * mutates the same buffers without holding the lock. Presumably the framework
 * drives a writer from a single thread - confirm before relying on it.
 */
public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
        implements SupportMultiTableSinkWriter<Void> {
    private static final String REDIS_GROUP_DELIMITER = ":";
    private static final String LEFT_PLACEHOLDER_MARKER = "{";
    private static final String RIGHT_PLACEHOLDER_MARKER = "}";
    private final SeaTunnelRowType seaTunnelRowType;
    private final RedisParameters redisParameters;
    private final SerializationSchema serializationSchema;
    private final RedisClient redisClient;
    private final int batchSize;
    // The three buffers are parallel: index i of each one describes the same row.
    private final List<RowKind> rowKinds;
    private final List<String> keyBuffer;
    private final List<String> valueBuffer;

    /**
     * Creates a writer for rows of the given type.
     *
     * @param seaTunnelRowType row type; its field names are used to resolve configured fields
     * @param redisParameters connector configuration; also used to build the Redis client
     */
    public RedisSinkWriter(SeaTunnelRowType seaTunnelRowType, RedisParameters redisParameters) {
        this.seaTunnelRowType = seaTunnelRowType;
        this.redisParameters = redisParameters;
        this.serializationSchema = new JsonSerializationSchema(seaTunnelRowType);
        this.redisClient = redisParameters.buildRedisClient();
        this.batchSize = redisParameters.getBatchSize();
        this.rowKinds = new ArrayList<>(this.batchSize);
        this.keyBuffer = new ArrayList<>(this.batchSize);
        this.valueBuffer = new ArrayList<>(this.batchSize);
    }

    /**
     * Buffers one row and flushes when the buffer has reached {@code batchSize}.
     *
     * @param element the row to write
     * @throws IOException declared by the writer contract
     */
    @Override
    public void write(SeaTunnelRow element) throws IOException {
        List<String> fields = Arrays.asList(this.seaTunnelRowType.getFieldNames());
        // Derive key and value BEFORE touching any buffer: if either derivation
        // throws, the three parallel buffers must not end up with different lengths.
        String key = this.getKey(element, fields);
        String value = this.getValue(element, fields);
        this.rowKinds.add(element.getRowKind());
        this.keyBuffer.add(key);
        this.valueBuffer.add(value);
        if (this.keyBuffer.size() >= this.batchSize) {
            this.flush();
        }
    }

    /**
     * Returns the string form of the row's value for {@code fieldName} when that name is one of
     * {@code fields}; otherwise returns {@code fallback}. A {@code null} field value yields "".
     */
    private static String resolveField(
            SeaTunnelRow element, List<String> fields, String fieldName, String fallback) {
        int index = fields.indexOf(fieldName);
        if (index < 0) {
            return fallback;
        }
        Object fieldValue = element.getField(index);
        return fieldValue == null ? "" : fieldValue.toString();
    }

    /** Serializes the whole row with the JSON schema and decodes the bytes as UTF-8. */
    private String serializeToJson(SeaTunnelRow element) {
        // Explicit charset: the no-charset String constructor uses the platform default.
        return new String(this.serializationSchema.serialize(element), StandardCharsets.UTF_8);
    }

    /** Builds the Redis key, using the placeholder template form when custom keys are enabled. */
    private String getKey(SeaTunnelRow element, List<String> fields) {
        String key = this.redisParameters.getKeyField();
        Boolean supportCustomKey = this.redisParameters.getSupportCustomKey();
        if (Boolean.TRUE.equals(supportCustomKey)) {
            return this.getCustomKey(element, fields, key);
        }
        return RedisSinkWriter.getNormalKey(element, fields, key);
    }

    /** Uses the value of {@code keyField} when it names a row field, else {@code keyField} itself. */
    private static String getNormalKey(SeaTunnelRow element, List<String> fields, String keyField) {
        return resolveField(element, fields, keyField, keyField);
    }

    /**
     * Expands a ':'-delimited key template. A segment of the form <code>{name}</code> is replaced
     * by the value of row field {@code name} when such a field exists; every other segment is
     * copied literally. Segments are re-joined with ':'.
     */
    private String getCustomKey(SeaTunnelRow element, List<String> fields, String keyField) {
        String[] keyFieldSegments = keyField.split(REDIS_GROUP_DELIMITER);
        StringBuilder key = new StringBuilder();
        for (int i = 0; i < keyFieldSegments.length; ++i) {
            if (i > 0) {
                key.append(REDIS_GROUP_DELIMITER);
            }
            String segment = keyFieldSegments[i];
            boolean isPlaceholder =
                    segment.startsWith(LEFT_PLACEHOLDER_MARKER)
                            && segment.endsWith(RIGHT_PLACEHOLDER_MARKER);
            if (isPlaceholder) {
                // Strip the surrounding braces; unknown names fall back to the raw segment.
                String realKeyField = segment.substring(1, segment.length() - 1);
                key.append(resolveField(element, fields, realKeyField, segment));
            } else {
                key.append(segment);
            }
        }
        return key.toString();
    }

    /**
     * Builds the value to store for the row. Falls back to the JSON serialization of the whole
     * row when the type-specific handler returns {@code null}.
     */
    private String getValue(SeaTunnelRow element, List<String> fields) {
        RedisDataType redisDataType = this.redisParameters.getRedisDataType();
        String value =
                redisDataType == RedisDataType.HASH
                        ? this.handleHashType(element, fields)
                        : this.handleOtherTypes(element, fields);
        if (value == null) {
            value = this.serializeToJson(element);
        }
        return value;
    }

    /**
     * Builds the HASH payload: a JSON object holding a single hash-key/hash-value entry.
     *
     * @return the JSON string, or {@code null} when no hash key field is configured
     */
    private String handleHashType(SeaTunnelRow element, List<String> fields) {
        String hashKeyField = this.redisParameters.getHashKeyField();
        String hashValueField = this.redisParameters.getHashValueField();
        if (StringUtils.isEmpty(hashKeyField)) {
            return null;
        }
        String hashKey = resolveField(element, fields, hashKeyField, hashKeyField);
        String hashValue;
        if (StringUtils.isEmpty(hashValueField)) {
            // No value field configured: store the whole row as JSON.
            hashValue = this.serializeToJson(element);
        } else {
            hashValue = resolveField(element, fields, hashValueField, hashValueField);
        }
        HashMap<String, String> kvMap = new HashMap<>();
        kvMap.put(hashKey, hashValue);
        return JsonUtils.toJsonString(kvMap);
    }

    /**
     * Builds the value for non-HASH types from the configured value field.
     *
     * @return the resolved value, or {@code null} when no value field is configured
     */
    private String handleOtherTypes(SeaTunnelRow element, List<String> fields) {
        String valueField = this.redisParameters.getValueField();
        if (StringUtils.isEmpty(valueField)) {
            return null;
        }
        return resolveField(element, fields, valueField, valueField);
    }

    /** Empties all three parallel buffers. */
    private void clearBuffer() {
        this.rowKinds.clear();
        this.keyBuffer.clear();
        this.valueBuffer.clear();
    }

    /**
     * Hands the buffered rows to the client method matching the configured data type.
     *
     * @throws RedisConnectorException when the configured data type has no batch writer
     */
    private void doBatchWrite() {
        RedisDataType redisDataType = this.redisParameters.getRedisDataType();
        if (redisDataType == RedisDataType.KEY || redisDataType == RedisDataType.STRING) {
            this.redisClient.batchWriteString(
                    this.rowKinds, this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        if (redisDataType == RedisDataType.LIST) {
            this.redisClient.batchWriteList(
                    this.rowKinds, this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        if (redisDataType == RedisDataType.SET) {
            this.redisClient.batchWriteSet(
                    this.rowKinds, this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        if (redisDataType == RedisDataType.HASH) {
            this.redisClient.batchWriteHash(
                    this.rowKinds, this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        if (redisDataType == RedisDataType.ZSET) {
            this.redisClient.batchWriteZset(
                    this.rowKinds, this.keyBuffer, this.valueBuffer, this.redisParameters.getExpire());
            return;
        }
        throw new RedisConnectorException(
                CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                "UnSupport redisDataType,only support string,list,hash,set,zset");
    }

    /**
     * Flushes any rows still buffered.
     *
     * @throws IOException declared by the writer contract
     */
    @Override
    public void close() throws IOException {
        this.flush();
    }

    /**
     * Flushes buffered rows ahead of a commit.
     *
     * @return always {@link Optional#empty()}; this writer produces no commit info
     */
    @Override
    public Optional<Void> prepareCommit() {
        this.flush();
        return Optional.empty();
    }

    /**
     * Writes the buffered rows, if any, and then clears the buffers. The buffers are cleared
     * only after the batch write returns normally.
     */
    private synchronized void flush() {
        if (!this.keyBuffer.isEmpty()) {
            this.doBatchWrite();
            this.clearBuffer();
        }
    }
}

