/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.storage;

import io.streamthoughts.kafka.connect.filepulse.storage.Callback;
import io.streamthoughts.kafka.connect.filepulse.storage.KafkaBasedLog;
import io.streamthoughts.kafka.connect.filepulse.storage.KafkaBasedLogFactory;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSerde;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StateBackingStore} that persists named states as records of a Kafka topic
 * accessed through a {@link KafkaBasedLog}.
 *
 * <p>Record keys have the form {@code keyPrefix + groupId + "." + stateName}; values are the
 * bytes produced by the configured {@link StateSerde}. A record with a {@code null} value
 * removes the state. Records consumed from the log are applied to an in-memory map by
 * {@link ConsumeCallback}; only records whose key carries this store's prefix and group id
 * are kept.
 *
 * @param <T> the type of the stored states.
 */
public class KafkaStateBackingStore<T> implements StateBackingStore<T> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStateBackingStore.class);

    /** Maximum time, in milliseconds, a synchronous put/remove waits for the log to be read to its end. */
    private static final long READ_TO_END_TIMEOUT_MS = 30000L;

    private final KafkaBasedLog<String, byte[]> configLog;

    /** Guards {@link #states}. */
    private final Object lock = new Object();
    private final String groupId;

    /** Offset following the last consumed record, or -1 when nothing has been consumed yet. */
    private final AtomicLong offset = new AtomicLong(-1L);
    private final Map<String, T> states = new HashMap<>();
    private final StateSerde<T> serde;
    private final String keyPrefix;
    private final boolean isProducerOnly;

    // volatile: read without the instance monitor by getState(), so that the consumer
    // callback never has to wait for the monitor that stop() holds while closing the log.
    private volatile States status = States.CREATED;

    // volatile: set by the caller of setUpdateListener() and read from the consumer callback.
    private volatile StateBackingStore.UpdateListener<T> updateListener;

    /**
     * Creates a new store; nothing is started until {@link #start()} is called.
     *
     * @param topic          the topic passed to the {@link KafkaBasedLogFactory}.
     * @param keyPrefix      the prefix prepended to every record key.
     * @param groupId        the group identifier embedded in record keys; records of other groups are ignored.
     * @param configs        the configuration passed to the {@link KafkaBasedLogFactory}.
     * @param serde          the serde used to convert states to and from record values.
     * @param isProducerOnly flag forwarded to {@link KafkaBasedLog} when the log is started.
     */
    public KafkaStateBackingStore(String topic, String keyPrefix, String groupId, Map<String, ?> configs, StateSerde<T> serde, boolean isProducerOnly) {
        KafkaBasedLogFactory factory = new KafkaBasedLogFactory(configs);
        this.configLog = factory.make(topic, new ConsumeCallback());
        this.groupId = groupId;
        this.serde = serde;
        this.keyPrefix = keyPrefix;
        this.isProducerOnly = isProducerOnly;
    }

    /**
     * @return the current lifecycle status of this store.
     */
    States getState() {
        return this.status;
    }

    private synchronized void setState(States status) {
        this.status = status;
    }

    /**
     * Starts the underlying log and marks this store as started.
     *
     * @throws IllegalStateException if the store is already started.
     */
    @Override
    public void start() {
        if (this.isStarted()) {
            throw new IllegalStateException("Cannot init again.");
        }
        LOG.info("Starting {}", this.getBackingStoreName());
        this.configLog.start(this.isProducerOnly);
        this.setState(States.STARTED);
        LOG.info("Started {}", this.getBackingStoreName());
    }

    /**
     * @return {@code true} if the current status is {@link States#STARTED}.
     */
    @Override
    public boolean isStarted() {
        return this.getState() == States.STARTED;
    }

    /**
     * Flushes and stops the underlying log, moving the status to
     * {@link States#PENDING_SHUTDOWN} and then to {@link States#SHUTDOWN}.
     */
    @Override
    public void stop() {
        synchronized (this) {
            this.setState(States.PENDING_SHUTDOWN);
            LOG.info("Closing {}", this.getBackingStoreName());
            this.configLog.flush();
            this.configLog.stop();
            LOG.info("Closed {}", this.getBackingStoreName());
            this.setState(States.SHUTDOWN);
        }
    }

    /**
     * Returns a point-in-time copy of the states currently held in memory.
     *
     * @return a snapshot holding the current offset and an unmodifiable copy of the states.
     */
    @Override
    public StateSnapshot<T> snapshot() {
        synchronized (this.lock) {
            // Copy under the lock: wrapping the live map would only create a view that keeps
            // changing (and may fail during iteration) while the consumer callback updates it.
            Map<String, T> copy = new HashMap<>(this.states);
            return new StateSnapshot<T>(this.offset.get(), Collections.unmodifiableMap(copy));
        }
    }

    /**
     * @param name the state name.
     * @return {@code true} if a state is currently held in memory for the given name.
     */
    @Override
    public boolean contains(String name) {
        synchronized (this.lock) {
            return this.states.containsKey(name);
        }
    }

    /**
     * Sends the given state to the log without waiting for it to be read back.
     *
     * @param name  the state name.
     * @param state the state to store.
     */
    @Override
    public void putAsync(String name, T state) {
        this.put(name, state, false);
    }

    /**
     * Sends the given state to the log, then waits up to {@value #READ_TO_END_TIMEOUT_MS} ms
     * for the log to be read to its end.
     *
     * @param name  the state name.
     * @param state the state to store.
     */
    @Override
    public void put(String name, T state) {
        this.put(name, state, true);
    }

    /**
     * Serializes and sends a state, optionally waiting for the log to be read to its end.
     *
     * @throws IllegalStateException if the store is shutting down or shut down.
     * @throws RuntimeException      if the wait is interrupted, fails or times out.
     */
    private void put(String name, T state, boolean sync) {
        this.checkStates();
        try {
            this.configLog.send(this.newRecordKey(this.groupId, name), this.serde.serialize(state));
            if (sync) {
                this.configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            }
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            restoreInterruptIfNeeded(e);
            LOG.error("Failed to write consumer state to Kafka: ", e);
            throw new RuntimeException("Error writing consumer state to Kafka", e);
        }
    }

    /**
     * Sends a removal record for the given name without waiting for it to be read back.
     *
     * @param name the state name.
     */
    @Override
    public void removeAsync(String name) {
        this.remove(name, false);
    }

    /**
     * Sends a record with a {@code null} value for the given name, optionally waiting for
     * the log to be read to its end.
     *
     * @throws IllegalStateException if the store is shutting down or shut down.
     * @throws RuntimeException      if the wait is interrupted, fails or times out.
     */
    private void remove(String name, boolean sync) {
        this.checkStates();
        LOG.debug("Removing consumer server state for name {}", name);
        try {
            this.configLog.send(this.newRecordKey(this.groupId, name), null);
            if (sync) {
                this.configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            }
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            restoreInterruptIfNeeded(e);
            LOG.error("Failed to remove state from Kafka: ", e);
            throw new RuntimeException("Error removing state from Kafka", e);
        }
    }

    /**
     * Sends a removal record for the given name, then waits up to
     * {@value #READ_TO_END_TIMEOUT_MS} ms for the log to be read to its end.
     *
     * @param name the state name.
     */
    @Override
    public void remove(String name) {
        this.remove(name, true);
    }

    /**
     * Waits for the log to be read to its end.
     *
     * @param timeout the maximum time to wait.
     * @param unit    the unit of {@code timeout}.
     * @throws TimeoutException      if the wait times out.
     * @throws IllegalStateException if the store is shutting down or shut down.
     * @throws RuntimeException      if the wait is interrupted or the read fails.
     */
    @Override
    public void refresh(long timeout, TimeUnit unit) throws TimeoutException {
        this.checkStates();
        try {
            this.configLog.readToEnd().get(timeout, unit);
        }
        catch (InterruptedException | ExecutionException e) {
            restoreInterruptIfNeeded(e);
            throw new RuntimeException("Error trying to read to end of configDef log", e);
        }
    }

    /**
     * Registers the listener notified when a state of this store's group is updated or removed.
     *
     * @param listener the listener; may be {@code null} to disable notifications.
     */
    @Override
    public void setUpdateListener(StateBackingStore.UpdateListener<T> listener) {
        this.updateListener = listener;
    }

    private String getBackingStoreName() {
        return this.getClass().getSimpleName();
    }

    /** Rejects operations once the store is shutting down or shut down. */
    private synchronized void checkStates() {
        States current = this.getState();
        if (current == States.SHUTDOWN || current == States.PENDING_SHUTDOWN) {
            throw new IllegalStateException("Bad state " + current.name());
        }
    }

    private String newRecordKey(String groupId, String stateName) {
        return this.keyPrefix + groupId + "." + stateName;
    }

    /** Re-asserts the interrupt flag when the caught failure is an {@link InterruptedException}. */
    private static void restoreInterruptIfNeeded(Throwable error) {
        if (error instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Callback applied to every record consumed from the log: it tracks the offset and
     * applies updates/removals of this store's group to the in-memory states.
     */
    public class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {

        /**
         * Applies a consumed record to the in-memory states.
         *
         * @param error  the consumption error, or {@code null} on success.
         * @param record the consumed record; only used when {@code error} is {@code null}.
         */
        @Override
        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            if (error != null) {
                LOG.error("Unexpected in consumer callback for KafkaStreamsStateBackingStore: ", error);
                return;
            }
            // The offset advances for every record, including the ones discarded below.
            KafkaStateBackingStore.this.offset.set(record.offset() + 1L);

            final String key = record.key();
            final String prefix = KafkaStateBackingStore.this.keyPrefix;
            if (key == null || !key.startsWith(prefix)) {
                LOG.warn("Discarding state update value with invalid key : {}", key);
                return;
            }

            final String groupAndState = key.substring(prefix.length());
            if (groupAndState.indexOf('.') < 0) {
                // No group/state separator: the key cannot have been built by newRecordKey().
                LOG.warn("Discarding state update value with invalid key : {}", key);
                return;
            }

            // Mirror newRecordKey(): match on "<groupId>." rather than splitting on the first
            // dot, so that a group id which itself contains dots is still recognised.
            final String groupQualifier = KafkaStateBackingStore.this.groupId + ".";
            if (!groupAndState.startsWith(groupQualifier)) {
                LOG.trace("Discarding state update value - not belong to group {} : {}", KafkaStateBackingStore.this.groupId, key);
                return;
            }
            final String stateName = groupAndState.substring(groupQualifier.length());

            final byte[] value = record.value();
            boolean removed = false;
            T newState = null;
            synchronized (KafkaStateBackingStore.this.lock) {
                if (value == null) {
                    LOG.debug("Removed state {} due to null configuration. This is usually intentional and does not indicate an issue.", stateName);
                    KafkaStateBackingStore.this.states.remove(stateName);
                    removed = true;
                } else {
                    try {
                        newState = KafkaStateBackingStore.this.serde.deserialize(value);
                    }
                    catch (Exception e) {
                        // An unreadable value is skipped; the previous state (if any) is kept.
                        LOG.error("Failed to read state : {}", stateName, e);
                        return;
                    }
                    LOG.debug("Updating state for name {} : {}", stateName, newState);
                    KafkaStateBackingStore.this.states.put(stateName, newState);
                }
            }

            // Read the listener once so a concurrent setUpdateListener(null) cannot cause an NPE.
            final StateBackingStore.UpdateListener<T> listener = KafkaStateBackingStore.this.updateListener;
            if (KafkaStateBackingStore.this.getState() == States.STARTED && listener != null) {
                // The listener is invoked outside the lock.
                if (removed) {
                    listener.onStateRemove(stateName);
                } else {
                    listener.onStateUpdate(stateName, newState);
                }
            }
        }
    }

    /** Lifecycle statuses of the store. */
    public enum States {
        CREATED,
        STARTED,
        PENDING_SHUTDOWN,
        SHUTDOWN
    }
}

