/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.state;

import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
import io.streamthoughts.kafka.connect.filepulse.state.FileObjectSerde;
import io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStoreConfig;
import io.streamthoughts.kafka.connect.filepulse.storage.KafkaStateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSerde;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link FileObjectStateBackingStore} that delegates every operation to a
 * {@link KafkaStateBackingStore} holding {@link FileObject} states.
 *
 * <p>{@link #configure(Map)} must be called before any other method: it is the
 * only place where the underlying store is created.
 */
public class KafkaFileObjectStateBackingStore
implements FileObjectStateBackingStore {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFileObjectStateBackingStore.class);
    private static final String KEY_PREFIX = "connect-file-pulse";
    private KafkaStateBackingStore<FileObject> store;

    /**
     * Builds the underlying Kafka-backed store from the given properties and
     * then attempts to create the storage topic with a compacting cleanup policy.
     *
     * @param props the configuration properties used to build a
     *              {@link KafkaFileObjectStateBackingStoreConfig}.
     */
    @Override
    public void configure(Map<String, ?> props) {
        KafkaFileObjectStateBackingStoreConfig config = new KafkaFileObjectStateBackingStoreConfig(props);
        this.store = new KafkaStateBackingStore<>(
                config.getTaskStorageTopic(),
                KEY_PREFIX,
                config.getTaskStorageName(),
                config.getTaskStorageConfigs(),
                new FileObjectSerde(),
                config.getTaskStorageConsumerEnabled());
        // The admin client is only needed for the topic creation attempt, so it is
        // closed as soon as that attempt completes.
        try (AdminClient client = AdminClient.create(config.getTaskStorageConfigs())) {
            Map<String, String> topicConfig = Map.of("cleanup.policy", "compact");
            NewTopic newTopic = new NewTopic(
                    config.getTaskStorageTopic(),
                    config.getTopicPartitions(),
                    config.getReplicationFactor()).configs(topicConfig);
            this.createTopic(client, newTopic);
        }
    }

    /**
     * Attempts to create the given topic and waits for the request to complete.
     *
     * <p>Failures are logged and never propagated: an already existing topic is
     * reported at debug level, any other failure at warn level. If the calling
     * thread is interrupted while waiting, its interrupt status is restored so
     * that callers can still observe the interruption.
     *
     * @param adminClient the client used to send the creation request.
     * @param topic       the topic to create.
     */
    private void createTopic(AdminClient adminClient, NewTopic topic) {
        try {
            LOG.info("Attempt to create new topic '{}'", topic);
            CreateTopicsResult result = adminClient.createTopics(List.of(topic));
            KafkaFuture<Void> future = result.all();
            future.get();
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof TopicExistsException) {
                LOG.debug("Failed to created topic '{}'. Topic already exists.", topic);
            } else {
                // The trailing exception has no placeholder, so SLF4J logs it with its stack trace.
                LOG.warn("Failed to create topic '{}'", topic, e);
            }
        }
        catch (InterruptedException e) {
            // Catching InterruptedException clears the interrupt status; set it again
            // rather than silently swallowing the interruption.
            Thread.currentThread().interrupt();
            LOG.warn("Failed to create topic '{}'", topic, e);
        }
    }

    /** Starts the underlying store. */
    public void start() {
        this.store.start();
    }

    /** Stops the underlying store. */
    public void stop() {
        this.store.stop();
    }

    /**
     * @return the value reported by the underlying store's {@code isStarted()}.
     */
    public boolean isStarted() {
        return this.store.isStarted();
    }

    /**
     * @return the snapshot returned by the underlying store.
     */
    public StateSnapshot<FileObject> snapshot() {
        return this.store.snapshot();
    }

    /**
     * @param name the state name to look up.
     * @return the value reported by the underlying store's {@code contains(name)}.
     */
    public boolean contains(String name) {
        return this.store.contains(name);
    }

    /**
     * Delegates to the underlying store's {@code putAsync}.
     *
     * @param name  the state name.
     * @param state the state to store.
     */
    public void putAsync(String name, FileObject state) {
        this.store.putAsync(name, state);
    }

    /**
     * Delegates to the underlying store's {@code put}.
     *
     * @param name  the state name.
     * @param state the state to store.
     */
    public void put(String name, FileObject state) {
        this.store.put(name, state);
    }

    /**
     * Delegates to the underlying store's {@code remove}.
     *
     * @param name the state name to remove.
     */
    public void remove(String name) {
        this.store.remove(name);
    }

    /**
     * Delegates to the underlying store's {@code removeAsync}.
     *
     * @param name the state name to remove.
     */
    public void removeAsync(String name) {
        this.store.removeAsync(name);
    }

    /**
     * Delegates to the underlying store's {@code refresh}.
     *
     * @param timeout the timeout value passed to the underlying store.
     * @param unit    the unit of {@code timeout}.
     * @throws TimeoutException if the underlying store throws it.
     */
    public void refresh(long timeout, TimeUnit unit) throws TimeoutException {
        this.store.refresh(timeout, unit);
    }

    /**
     * Registers the given listener on the underlying store.
     *
     * @param listener the listener to register.
     */
    public void setUpdateListener(StateBackingStore.UpdateListener<FileObject> listener) {
        this.store.setUpdateListener(listener);
    }
}

