/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.spi.impl.merge;

import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.cluster.Versions;
import com.hazelcast.internal.config.ConfigValidator;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Disposable;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationFactory;
import com.hazelcast.spi.OperationService;
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.MutableLong;
import com.hazelcast.util.function.BiConsumer;
import com.hazelcast.version.Version;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public abstract class AbstractMergeRunnable<Store, MergingItem>
implements Runnable,
Disposable {
    private static final long TIMEOUT_FACTOR = 500L;
    private static final long MINIMAL_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(5L);
    private final Semaphore semaphore = new Semaphore(0);
    private final ILogger logger;
    private final String serviceName;
    private final ClusterService clusterService;
    private final InternalSerializationService serializationService;
    private final OperationService operationService;
    private final IPartitionService partitionService;
    private final Collection<Store> backupStores;
    private final Map<String, Collection<Store>> collectedStores;
    private final Map<String, Collection<Store>> collectedStoresWithLegacyPolicies;

    protected AbstractMergeRunnable(String serviceName, Map<String, Collection<Store>> collectedStores, Map<String, Collection<Store>> collectedStoresWithLegacyPolicies, Collection<Store> backupStores, NodeEngine nodeEngine) {
        this.serviceName = serviceName;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.partitionService = nodeEngine.getPartitionService();
        this.clusterService = nodeEngine.getClusterService();
        this.operationService = nodeEngine.getOperationService();
        this.serializationService = (InternalSerializationService)nodeEngine.getSerializationService();
        this.backupStores = backupStores;
        this.collectedStores = collectedStores;
        this.collectedStoresWithLegacyPolicies = collectedStoresWithLegacyPolicies;
    }

    @Override
    public final void run() {
        int mergedCount = 0;
        mergedCount += this.mergeWithSplitBrainMergePolicy();
        this.waitMergeEnd(mergedCount += this.mergeWithLegacyMergePolicy());
    }

    private int mergeWithSplitBrainMergePolicy() {
        int mergedCount = 0;
        for (Map.Entry<String, Collection<Store>> entry : this.collectedStores.entrySet()) {
            String dataStructureName = entry.getKey();
            Collection<Store> recordStores = entry.getValue();
            SplitBrainMergePolicy mergePolicy = (SplitBrainMergePolicy)this.getMergePolicy(dataStructureName);
            if (!this.isClusterVersion310OrLater(dataStructureName, mergePolicy)) continue;
            int batchSize = this.getBatchSize(dataStructureName);
            MergingItemBiConsumer consumer = new MergingItemBiConsumer(dataStructureName, mergePolicy, batchSize);
            for (Store recordStore : recordStores) {
                this.consumeStore(recordStore, consumer);
            }
            consumer.consumeRemaining();
            mergedCount += consumer.mergedCount;
        }
        return mergedCount;
    }

    private boolean isClusterVersion310OrLater(String dataStructureName, SplitBrainMergePolicy policy) {
        Version v310 = Versions.V3_10;
        Version currentVersion = this.clusterService.getClusterVersion();
        if (currentVersion.isGreaterOrEqual(v310)) {
            return true;
        }
        String msg = "Cannot merge '%s' with merge policy '%s'. Cluster version should be %s or later but found %s";
        this.logger.info(String.format(msg, dataStructureName, policy, v310, currentVersion));
        return false;
    }

    private int mergeWithLegacyMergePolicy() {
        LegacyOperationBiConsumer consumer = new LegacyOperationBiConsumer();
        for (Map.Entry<String, Collection<Store>> entry : this.collectedStoresWithLegacyPolicies.entrySet()) {
            String dataStructureName = entry.getKey();
            if (!ConfigValidator.checkMergePolicySupportsInMemoryFormat(dataStructureName, this.getMergePolicy(dataStructureName).getClass().getName(), this.getInMemoryFormat(dataStructureName), this.clusterService.getClusterVersion(), false, this.logger)) continue;
            Collection<Store> recordStores = entry.getValue();
            for (Store recordStore : recordStores) {
                this.consumeStoreLegacy(recordStore, consumer);
            }
        }
        return consumer.mergedCount;
    }

    private void waitMergeEnd(int mergedCount) {
        try {
            long timeoutMillis = Math.max((long)mergedCount * 500L, MINIMAL_TIMEOUT_MILLIS);
            if (!this.semaphore.tryAcquire(mergedCount, timeoutMillis, TimeUnit.MILLISECONDS)) {
                this.logger.warning("Split-brain healing didn't finish within the timeout...");
            }
        }
        catch (InterruptedException e) {
            this.logger.finest("Interrupted while waiting for split-brain healing...");
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public final void dispose() {
        for (Collection<Store> stores : this.collectedStores.values()) {
            this.destroyStores(stores);
        }
        for (Collection<Store> stores : this.collectedStoresWithLegacyPolicies.values()) {
            this.destroyStores(stores);
        }
        this.destroyStores(this.backupStores);
    }

    protected InternalSerializationService getSerializationService() {
        return this.serializationService;
    }

    protected Data toData(Object object) {
        return this.serializationService.toData(object);
    }

    protected abstract void destroyStores(Collection<Store> var1);

    protected abstract void consumeStore(Store var1, BiConsumer<Integer, MergingItem> var2);

    protected abstract void consumeStoreLegacy(Store var1, BiConsumer<Integer, Operation> var2);

    protected abstract int getBatchSize(String var1);

    protected abstract Object getMergePolicy(String var1);

    protected abstract InMemoryFormat getInMemoryFormat(String var1);

    protected abstract OperationFactory createMergeOperationFactory(String var1, SplitBrainMergePolicy var2, int[] var3, List<MergingItem>[] var4);

    private class LegacyOperationBiConsumer
    implements BiConsumer<Integer, Operation> {
        private final ExecutionCallback<Object> mergeCallback = new ExecutionCallback<Object>(){

            @Override
            public void onResponse(Object response) {
                AbstractMergeRunnable.this.semaphore.release(1);
            }

            @Override
            public void onFailure(Throwable t) {
                AbstractMergeRunnable.this.logger.warning("Error while running merge operation: " + t.getMessage());
                AbstractMergeRunnable.this.semaphore.release(1);
            }
        };
        private int mergedCount;

        private LegacyOperationBiConsumer() {
        }

        @Override
        public void accept(Integer partitionId, Operation operation) {
            try {
                AbstractMergeRunnable.this.operationService.invokeOnPartition(AbstractMergeRunnable.this.serviceName, operation, partitionId).andThen(this.mergeCallback);
            }
            catch (Throwable t) {
                throw ExceptionUtil.rethrow(t);
            }
            ++this.mergedCount;
        }
    }

    private class MergingItemBiConsumer
    implements BiConsumer<Integer, MergingItem> {
        private final int batchSize;
        private final int partitionCount;
        private final String dataStructureName;
        private final Address[] addresses;
        private final MutableLong[] counterPerMember;
        private final SplitBrainMergePolicy mergePolicy;
        private final List<MergingItem>[] mergingItemsPerPartition;
        private final Map<Address, List<Integer>> memberPartitionsMap;
        private int mergedCount;

        MergingItemBiConsumer(String dataStructureName, SplitBrainMergePolicy mergePolicy, int batchSize) {
            this.dataStructureName = dataStructureName;
            this.batchSize = batchSize;
            this.mergePolicy = mergePolicy;
            this.memberPartitionsMap = AbstractMergeRunnable.this.partitionService.getMemberPartitionsMap();
            this.partitionCount = AbstractMergeRunnable.this.partitionService.getPartitionCount();
            this.addresses = new Address[this.partitionCount];
            this.counterPerMember = new MutableLong[this.partitionCount];
            this.mergingItemsPerPartition = new List[this.partitionCount];
            this.init();
        }

        private void init() {
            for (Map.Entry<Address, List<Integer>> addressListEntry : this.memberPartitionsMap.entrySet()) {
                MutableLong counter = new MutableLong();
                Address address = addressListEntry.getKey();
                for (int partitionId : addressListEntry.getValue()) {
                    this.counterPerMember[partitionId] = counter;
                    this.addresses[partitionId] = address;
                }
            }
        }

        @Override
        public void accept(Integer partitionId, MergingItem mergingItem) {
            List entries = this.mergingItemsPerPartition[partitionId];
            if (entries == null) {
                entries = new LinkedList();
                this.mergingItemsPerPartition[partitionId.intValue()] = entries;
            }
            entries.add(mergingItem);
            ++this.mergedCount;
            long currentSize = ++this.counterPerMember[partitionId.intValue()].value;
            if (currentSize % (long)this.batchSize == 0L) {
                List<Integer> partitions = this.memberPartitionsMap.get(this.addresses[partitionId]);
                this.sendBatch(this.dataStructureName, partitions, this.mergingItemsPerPartition, this.mergePolicy);
            }
        }

        private void consumeRemaining() {
            for (Map.Entry<Address, List<Integer>> entry : this.memberPartitionsMap.entrySet()) {
                this.sendBatch(this.dataStructureName, entry.getValue(), this.mergingItemsPerPartition, this.mergePolicy);
            }
        }

        private void sendBatch(String dataStructureName, List<Integer> memberPartitions, List<MergingItem>[] entriesPerPartition, SplitBrainMergePolicy mergePolicy) {
            int size = memberPartitions.size();
            int[] partitions = new int[size];
            int index = 0;
            for (Integer partitionId : memberPartitions) {
                if (entriesPerPartition[partitionId] == null) continue;
                partitions[index++] = partitionId;
            }
            if (index == 0) {
                return;
            }
            if (index < size) {
                partitions = Arrays.copyOf(partitions, index);
                size = index;
            }
            List[] entries = new List[size];
            index = 0;
            int totalSize = 0;
            for (int partitionId : partitions) {
                int batchSize = entriesPerPartition[partitionId].size();
                entries[index++] = entriesPerPartition[partitionId];
                totalSize += batchSize;
                entriesPerPartition[partitionId] = null;
            }
            if (totalSize == 0) {
                return;
            }
            this.sendMergingData(dataStructureName, mergePolicy, partitions, entries, totalSize);
        }

        private void sendMergingData(String dataStructureName, SplitBrainMergePolicy mergePolicy, int[] partitions, List<MergingItem>[] entries, int totalSize) {
            try {
                OperationFactory factory = AbstractMergeRunnable.this.createMergeOperationFactory(dataStructureName, mergePolicy, partitions, entries);
                AbstractMergeRunnable.this.operationService.invokeOnPartitions(AbstractMergeRunnable.this.serviceName, factory, partitions);
            }
            catch (Throwable t) {
                AbstractMergeRunnable.this.logger.warning("Error while running merge operation: " + t.getMessage());
                throw ExceptionUtil.rethrow(t);
            }
            finally {
                AbstractMergeRunnable.this.semaphore.release(totalSize);
            }
        }
    }
}

