/*
 * Decompiled with CFR 0.152.
 */
package com.vmware.transport.bus.store.model;

import com.vmware.transport.bus.EventBus;
import com.vmware.transport.bus.model.Message;
import com.vmware.transport.bus.store.model.BusStore;
import com.vmware.transport.bus.store.model.BusStoreInitializer;
import com.vmware.transport.bus.store.model.MutateStream;
import com.vmware.transport.bus.store.model.MutateStreamImpl;
import com.vmware.transport.bus.store.model.MutationRequestWrapper;
import com.vmware.transport.bus.store.model.StoreContent;
import com.vmware.transport.bus.store.model.StoreStateChange;
import com.vmware.transport.bus.store.model.StoreStateMutation;
import com.vmware.transport.bus.store.model.StoreStream;
import com.vmware.transport.bus.store.model.StoreStreamImpl;
import com.vmware.transport.core.util.Loggable;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Generated;
import org.apache.commons.lang3.ArrayUtils;

public class BusStoreImpl<K, T>
extends Loggable
implements BusStore<K, T> {
    private final UUID uuid;
    private final String storeType;
    private final EventBus eventBus;
    private final Map<K, T> cache;
    private final String cacheStreamChannelName;
    private final String cacheMutationChannelName;
    private final String cacheReadyChannelName;
    private final AtomicBoolean isCacheInitialized = new AtomicBoolean(false);
    private final AtomicLong storeVersion = new AtomicLong(0L);
    private Class<T> valueType;
    private Class<K> keyType;

    public BusStoreImpl(EventBus eventBus, String storeType) {
        this.eventBus = eventBus;
        this.uuid = UUID.randomUUID();
        this.storeType = storeType;
        this.cache = new ConcurrentHashMap<K, T>();
        this.cacheStreamChannelName = "stores__store-change-" + this.uuid + "-" + this.storeType;
        this.cacheMutationChannelName = "stores__store-mutation-" + this.uuid + "-" + this.storeType;
        this.cacheReadyChannelName = "stores__store-ready-" + this.uuid + "-" + this.storeType;
        this.infoMsg(String.format("Store: New Store [%s] was created with id %s, named %s", this.storeType, this.uuid, this.storeType));
    }

    @Override
    public boolean isInitialized() {
        return this.isCacheInitialized.get();
    }

    @Override
    public String getStoreType() {
        return this.storeType;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <State> void put(K id, T value, State state) {
        long version;
        if (id == null) {
            return;
        }
        Map<K, T> map = this.cache;
        synchronized (map) {
            this.cache.put(id, value);
            version = this.storeVersion.incrementAndGet();
        }
        this.sendChangeBroadcast(state, id, value, version, false);
        this.logDebugMessage(String.format("Store: [%s] added new object with id: %s", this.storeType, id));
    }

    @Override
    public T get(K id) {
        if (id == null) {
            return null;
        }
        return this.cache.get(id);
    }

    @Override
    public List<T> allValues() {
        return new ArrayList<T>(this.cache.values());
    }

    @Override
    public Map<K, T> allValuesAsMap() {
        return new HashMap<K, T>(this.cache);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public StoreContent<K, T> getStoreContent() {
        Map<K, T> map = this.cache;
        synchronized (map) {
            return new StoreContent<K, T>(this.getCurrentVersion(), this.allValuesAsMap());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <State> boolean remove(K id, State state) {
        T obj;
        if (id == null) {
            return false;
        }
        long version = 0L;
        Map<K, T> map = this.cache;
        synchronized (map) {
            obj = this.cache.remove(id);
            if (obj != null) {
                version = this.storeVersion.incrementAndGet();
            }
        }
        if (obj != null) {
            this.sendChangeBroadcast(state, id, obj, version, true);
            this.eventBus.getApi().complete(this.getObjectChannelName(id), this.storeType);
            this.logDebugMessage(String.format(" Store: [%s] Remove object with id %s", this.storeType, id.toString()));
            return true;
        }
        return false;
    }

    @Override
    public <V, MutationType> boolean mutate(V value, MutationType mutationType, Consumer<Object> successHandler, Consumer<Object> errorHandler) {
        StoreStateMutation<MutationType, V> mutation = new StoreStateMutation<MutationType, V>(mutationType, value);
        mutation.setSuccessHandler(successHandler);
        mutation.setErrorHandler(errorHandler);
        this.eventBus.sendRequestMessage(this.cacheMutationChannelName, mutation);
        this.logDebugMessage(String.format("Store: [%s] fired mutation operation", this.storeType));
        return true;
    }

    @Override
    public <MutationType, MutationRequestType> MutateStream<MutationRequestType> onMutationRequest(MutationType ... mutationType) {
        Observable stream = this.eventBus.getApi().getChannel(this.cacheMutationChannelName, this.getName()).map(msg -> (StoreStateMutation)msg.getPayload());
        Observable filterStream = stream.filter(mutation -> ArrayUtils.isEmpty((Object[])mutationType) || ArrayUtils.indexOf((Object[])mutationType, mutation.getType()) >= 0).map(mutation -> new MutationRequestWrapper(mutation.getValue(), mutation.getSuccessHandler(), mutation.getErrorHandler()));
        return new MutateStreamImpl(filterStream);
    }

    @Override
    public synchronized boolean populate(Map<K, T> items) {
        if (this.cache.isEmpty() && !this.isInitialized()) {
            for (Map.Entry<K, T> item : items.entrySet()) {
                this.cache.put(item.getKey(), item.getValue());
            }
            this.initialize();
            return true;
        }
        return false;
    }

    @Override
    public BusStoreInitializer<K, T> getBusStoreInitializer() {
        if (this.isCacheInitialized.get()) {
            return null;
        }
        return new BusStoreInitializer<K, T>(){

            @Override
            public BusStoreInitializer<K, T> add(K id, T value) {
                if (id != null) {
                    BusStoreImpl.this.cache.put(id, value);
                }
                return this;
            }

            @Override
            public void done() {
                BusStoreImpl.this.initialize();
            }
        };
    }

    @Override
    public <State> StoreStream<T> onChange(K id, State ... stateChangeType) {
        if (id == null) {
            return null;
        }
        Observable<Message> cacheStreamChannel = this.eventBus.getApi().getResponseChannel(this.getObjectChannelName(id), this.getName());
        Observable<Message> cacheErrorChannel = this.eventBus.getApi().getErrorChannel(this.getObjectChannelName(id), this.getName());
        Observable stream = Observable.merge(cacheStreamChannel, cacheErrorChannel).map(msg -> (StoreStateChange)msg.getPayload());
        return new StoreStreamImpl<T>(this.filterByChangeType(stream, stateChangeType));
    }

    @Override
    public <State> StoreStream<T> onAllChanges(State ... stateChangeType) {
        Observable<Message> cacheStreamChannel = this.eventBus.getApi().getResponseChannel(this.cacheStreamChannelName, this.getName());
        Observable<Message> cacheErrorChannel = this.eventBus.getApi().getErrorChannel(this.cacheStreamChannelName, this.getName());
        Observable stream = Observable.merge(cacheStreamChannel, cacheErrorChannel).map(msg -> (StoreStateChange)msg.getPayload());
        return new StoreStreamImpl<T>(this.filterByChangeType(stream, stateChangeType));
    }

    @Override
    public synchronized void whenReady(Consumer<Map<K, T>> readyFunction) {
        if (this.isCacheInitialized.get()) {
            this.logDebugMessage(String.format("Store: [%s] Ready! Contains %d values", this.storeType, this.cache.size()));
            try {
                readyFunction.accept(this.allValuesAsMap());
            }
            catch (Exception ex) {
                this.logErrorMessage("Error in whenReady handler.", ex.getMessage());
            }
        } else {
            this.eventBus.listenOnce(this.cacheReadyChannelName, (Consumer<Message>)((Consumer)message -> readyFunction.accept((Object)((Map)message.getPayload()))));
        }
    }

    @Override
    public synchronized void initialize() {
        if (!this.isCacheInitialized.getAndSet(true)) {
            this.infoMsg(String.format("Store: [%s] Initialized!", this.storeType));
            this.storeVersion.incrementAndGet();
            this.sendResponseMessage(this.cacheReadyChannelName, this.allValuesAsMap());
        }
    }

    @Override
    public synchronized void reset() {
        this.cache.clear();
        this.isCacheInitialized.set(false);
        this.infoMsg(String.format("Store: [%s] has been reset. All data wiped", this.storeType));
    }

    @Override
    public long getCurrentVersion() {
        return this.storeVersion.get();
    }

    private <State> Observable<StoreStateChange<?, T, ?>> filterByChangeType(Observable<StoreStateChange<?, T, ?>> stream, State ... stateChangeType) {
        return stream.filter(state -> ArrayUtils.isEmpty((Object[])stateChangeType) || ArrayUtils.indexOf((Object[])stateChangeType, state.getType()) >= 0);
    }

    private <C> void sendChangeBroadcast(C changeType, K id, T value, long storeVersion, boolean isDeleteChange) {
        StoreStateChange<C, T, K> stateChange = new StoreStateChange<C, T, K>(id, changeType, value, storeVersion, isDeleteChange);
        this.sendResponseMessage(this.cacheStreamChannelName, stateChange);
        this.sendResponseMessage(this.getObjectChannelName(stateChange.getObjectId()), stateChange);
    }

    private void sendResponseMessage(String channel, Object payload) {
        if (this.eventBus.getApi().getChannelRefCount(channel) > 0) {
            this.eventBus.sendResponseMessage(channel, payload);
        }
    }

    private String getObjectChannelName(K objectId) {
        return "store-" + this.uuid + "-object-" + objectId;
    }

    private void infoMsg(String msg) {
        this.logInfoMessage("\ud83d\uddc4", this.getName(), msg);
    }

    @Override
    @Generated
    public Class<T> getValueType() {
        return this.valueType;
    }

    @Override
    @Generated
    public void setValueType(Class<T> valueType) {
        this.valueType = valueType;
    }

    @Override
    @Generated
    public Class<K> getKeyType() {
        return this.keyType;
    }

    @Override
    @Generated
    public void setKeyType(Class<K> keyType) {
        this.keyType = keyType;
    }
}

