/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.instance.state;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.functions.api.StateStoreContext;
import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;

public class PulsarMetadataStateStoreImpl
implements DefaultStateStore {
    private final MetadataStore store;
    private final String prefixPath;
    private final MetadataCache<Long> countersCache;
    private final String namespace;
    private final String tenant;
    private final String name;
    private final String fqsn;

    PulsarMetadataStateStoreImpl(MetadataStore store, String prefix, String tenant, String namespace, String name) {
        this.store = store;
        this.tenant = tenant;
        this.namespace = namespace;
        this.name = name;
        this.fqsn = tenant + "/" + namespace + "/" + name;
        this.prefixPath = prefix + "/" + this.fqsn + "/";
        this.countersCache = store.getMetadataCache(Long.class);
    }

    @Override
    public String tenant() {
        return this.tenant;
    }

    @Override
    public String namespace() {
        return this.namespace;
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public String fqsn() {
        return this.fqsn;
    }

    @Override
    public void init(StateStoreContext ctx) {
    }

    @Override
    public void close() {
    }

    @Override
    public void put(String key, ByteBuffer value) {
        this.putAsync(key, value).join();
    }

    @Override
    public CompletableFuture<Void> putAsync(String key, ByteBuffer value) {
        byte[] bytes = new byte[value.remaining()];
        value.get(bytes);
        return this.store.put(this.getPath(key), bytes, Optional.empty()).thenApply(__ -> null);
    }

    @Override
    public void delete(String key) {
        this.deleteAsync(key).join();
    }

    @Override
    public CompletableFuture<Void> deleteAsync(String key) {
        return this.store.delete(this.getPath(key), Optional.empty());
    }

    @Override
    public ByteBuffer get(String key) {
        return this.getAsync(key).join();
    }

    @Override
    public CompletableFuture<ByteBuffer> getAsync(String key) {
        return this.store.get(this.getPath(key)).thenApply(optRes -> optRes.map(x -> ByteBuffer.wrap(x.getValue())).orElse(null));
    }

    @Override
    public StateValue getStateValue(String key) {
        return this.getStateValueAsync(key).join();
    }

    @Override
    public CompletableFuture<StateValue> getStateValueAsync(String key) {
        return this.store.get(this.getPath(key)).thenApply(optRes -> optRes.map(x -> new StateValue(x.getValue(), x.getStat().getVersion(), null)).orElse(null));
    }

    @Override
    public void incrCounter(String key, long amount) {
        this.incrCounterAsync(key, amount).join();
    }

    @Override
    public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
        return this.countersCache.readModifyUpdateOrCreate(this.getPath(key), optValue -> optValue.orElse(0L) + amount).thenApply(__ -> null);
    }

    @Override
    public long getCounter(String key) {
        return this.getCounterAsync(key).join();
    }

    @Override
    public CompletableFuture<Long> getCounterAsync(String key) {
        return this.countersCache.get(this.getPath(key)).thenApply(optValue -> optValue.orElse(0L));
    }

    private String getPath(String key) {
        return this.prefixPath + key;
    }
}

