/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.state;

import io.streamthoughts.kafka.connect.filepulse.annotation.VisibleForTesting;
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
import io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link FileObjectStateBackingStore} that keeps all file-object states in memory.
 *
 * <p>States are held in an access-ordered {@link LinkedHashMap} used as an LRU cache and wrapped
 * with {@link Collections#synchronizedMap(Map)}. When the number of entries exceeds the configured
 * maximum capacity, the least-recently-accessed entry is evicted; a warning is logged if the
 * evicted object's status does not report {@code isDone()}.
 */
public class InMemoryFileObjectStateBackingStore implements FileObjectStateBackingStore {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryFileObjectStateBackingStore.class);

    /** Capacity used until {@link #configure(Map)} is called, and default value of the config property. */
    private static final int DEFAULT_MAX_SIZE_CAPACITY = 10000;

    /**
     * The synchronized LRU cache. Initialised eagerly so that an instance created through the
     * no-arg constructor is usable even before {@link #configure(Map)} has been invoked;
     * {@link #configure(Map)} replaces it with a cache of the configured capacity.
     */
    private volatile Map<String, FileObject> objects = newSynchronizedCache(DEFAULT_MAX_SIZE_CAPACITY);

    /** Optional listener notified after each put/remove; may be {@code null}. */
    private StateBackingStore.UpdateListener<FileObject> listener;

    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * Creates an empty store with the default capacity. Call {@link #configure(Map)} to apply a
     * user-defined capacity.
     */
    public InMemoryFileObjectStateBackingStore() {
    }

    /**
     * Creates a store configured with default settings and pre-populated with the given states.
     *
     * @param objects the states to copy into the store; must not be {@code null}.
     */
    @VisibleForTesting
    public InMemoryFileObjectStateBackingStore(Map<String, FileObject> objects) {
        this.configure(Collections.emptyMap());
        this.objects.putAll(objects);
    }

    /**
     * Configures this store and (re)creates the underlying cache with the configured capacity.
     * Any state previously held by this instance is discarded.
     *
     * @param configs the configuration properties.
     */
    @Override
    public void configure(Map<String, ?> configs) {
        FileObjectStateBackingStore.super.configure(configs);
        final int cacheMaxCapacity = new Config(configs).getCacheMaxCapacity();
        this.objects = newSynchronizedCache(cacheMaxCapacity);
    }

    /** Marks this store as started. */
    public void start() {
        this.started.compareAndSet(false, true);
    }

    /** Marks this store as stopped. */
    public void stop() {
        this.started.compareAndSet(true, false);
    }

    /**
     * @return {@code true} if {@link #start()} has been called and {@link #stop()} has not been
     *         called since.
     */
    public boolean isStarted() {
        return this.started.get();
    }

    /**
     * Returns a point-in-time, read-only copy of all states currently held by this store.
     *
     * <p>The copy is taken while holding the lock of the synchronized map, as required for
     * traversing a {@link Collections#synchronizedMap(Map)}. Returning a copy rather than a live
     * view means callers can iterate the snapshot safely while other threads keep updating the
     * store, and reading from the snapshot never alters the LRU access order of the store.
     *
     * @return a snapshot with offset {@code -1} containing the current states.
     */
    public StateSnapshot<FileObject> snapshot() {
        // Read the volatile field once so that we lock and copy the very same map instance.
        final Map<String, FileObject> current = this.objects;
        final Map<String, FileObject> copy;
        synchronized (current) {
            copy = new LinkedHashMap<>(current);
        }
        return new StateSnapshot<>(-1L, Collections.unmodifiableMap(copy));
    }

    /**
     * @param name the key of the state.
     * @return {@code true} if a state is stored under the given key.
     */
    public boolean contains(String name) {
        return this.objects.containsKey(name);
    }

    /**
     * Same as {@link #put(String, FileObject)}; the operation is performed synchronously.
     *
     * @param name   the key of the state.
     * @param object the state to store.
     */
    public void putAsync(String name, FileObject object) {
        this.put(name, object);
    }

    /**
     * Stores the given state under the given key, then notifies the update listener, if any.
     *
     * @param name   the key of the state.
     * @param object the state to store.
     */
    public void put(String name, FileObject object) {
        LOG.debug("Put object in store with key={}, object={}", name, object);
        this.objects.put(name, object);
        final StateBackingStore.UpdateListener<FileObject> current = this.listener;
        if (current != null) {
            current.onStateUpdate(name, object);
        }
    }

    /**
     * Removes the state stored under the given key, then notifies the update listener, if any.
     * The listener is notified even when no state was stored under that key.
     *
     * @param name the key of the state.
     */
    public void remove(String name) {
        LOG.debug("Remove object in store with key={}", name);
        this.objects.remove(name);
        final StateBackingStore.UpdateListener<FileObject> current = this.listener;
        if (current != null) {
            current.onStateRemove(name);
        }
    }

    /**
     * Same as {@link #remove(String)}; the operation is performed synchronously.
     *
     * @param name the key of the state.
     */
    public void removeAsync(String name) {
        this.remove(name);
    }

    /**
     * No-op: this implementation performs no work on refresh.
     *
     * @param timeout ignored.
     * @param unit    ignored.
     */
    public void refresh(long timeout, TimeUnit unit) {
    }

    /**
     * Sets the listener to be notified after each put/remove.
     *
     * @param listener the listener, or {@code null} to disable notifications.
     */
    public void setUpdateListener(StateBackingStore.UpdateListener<FileObject> listener) {
        this.listener = listener;
    }

    /**
     * @return the listener currently registered, or {@code null} if none.
     */
    @VisibleForTesting
    public StateBackingStore.UpdateListener<FileObject> getListener() {
        return this.listener;
    }

    /**
     * Builds the synchronized LRU cache used to hold the states.
     *
     * @param maxCapacity the maximum number of entries retained before the eldest one is evicted.
     * @return a new, empty, synchronized map.
     */
    private static Map<String, FileObject> newSynchronizedCache(int maxCapacity) {
        return Collections.synchronizedMap(
            InMemoryFileObjectStateBackingStore.<String, FileObject>createLRUCache(
                maxCapacity,
                InMemoryFileObjectStateBackingStore::warnIfEvictedBeforeDone
            )
        );
    }

    /**
     * Eviction callback: logs a warning when the evicted object's status does not report
     * {@code isDone()}.
     *
     * @param objectEntry the entry about to be evicted from the cache.
     */
    private static void warnIfEvictedBeforeDone(Map.Entry<String, FileObject> objectEntry) {
        final FileObject evicted = objectEntry.getValue();
        if (!evicted.status().isDone()) {
            LOG.warn("Evicting a file-object state '{}' from in-memory state with a non terminal status (i.e. 'CLEANED'). This may happen if you are processing more files than the max-capacity of the InMemoryFileObjectStateBackingStore before committing offsets for tasks successfully. Please consider increasing the value of 'tasks.file.status.storage.cache.max.size.capacity' through the connector's configuration.", evicted.metadata().stringURI());
        }
    }

    /**
     * Creates an access-ordered {@link LinkedHashMap} that evicts its eldest entry as soon as its
     * size exceeds {@code maxCacheSize}.
     *
     * @param maxCacheSize           the maximum number of entries to retain.
     * @param callbackOnRemoveEldest invoked with the eldest entry right before it is evicted.
     * @param <K>                    the key type.
     * @param <V>                    the value type.
     * @return a new, empty, non-synchronized LRU map.
     */
    private static <K, V> Map<K, V> createLRUCache(final int maxCacheSize, final Consumer<Map.Entry<K, V>> callbackOnRemoveEldest) {
        // Initial capacity of max+1 with a load factor above 1 so that the table is sized for the
        // maximum number of entries; 'true' selects access-order iteration (LRU).
        return new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {

            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                final boolean remove = this.size() > maxCacheSize;
                if (remove) {
                    callbackOnRemoveEldest.accept(eldest);
                }
                return remove;
            }
        };
    }

    /** Configuration of the {@link InMemoryFileObjectStateBackingStore}. */
    private static final class Config extends AbstractConfig {
        private static final String GROUP = "InMemoryFileObjectStateBackingStore";
        public static final String TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_CONFIG = "tasks.file.status.storage.cache.max.size.capacity";
        private static final String TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_DOC = "The max size capacity of the LRU in-memory cache (default: 10_000).";

        /**
         * @param originals the raw configuration properties.
         */
        public Config(Map<?, ?> originals) {
            // 'false' is passed as the third AbstractConfig constructor argument (doLog).
            super(Config.configDef(), originals, false);
        }

        /**
         * @return the configured maximum number of entries of the in-memory cache.
         */
        public int getCacheMaxCapacity() {
            return this.getInt(TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_CONFIG);
        }

        /**
         * @return the definition of the properties supported by this configuration.
         */
        private static ConfigDef configDef() {
            final int orderInGroup = 0;
            return new ConfigDef().define(
                TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_CONFIG,
                ConfigDef.Type.INT,
                DEFAULT_MAX_SIZE_CAPACITY,
                ConfigDef.Importance.LOW,
                TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_DOC,
                GROUP,
                orderInGroup,
                ConfigDef.Width.NONE,
                TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_CONFIG
            );
        }
    }
}

