/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.impl.proxy.ClientMapProxy;
import com.hazelcast.client.impl.spi.ClientPartitionService;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.partition.IPartitionService;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.internal.serialization.SerializationServiceAware;
import com.hazelcast.internal.util.MapUtil;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.JetDataSerializerHook;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.connector.AbstractHazelcastConnectorSupplier;
import com.hazelcast.jet.impl.connector.AsyncHazelcastWriterP;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.IMap;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.impl.InternalCompletableFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public final class UpdateMapP<T, K, V, R>
extends AsyncHazelcastWriterP {
    private static final int PENDING_ITEM_COUNT_LIMIT = 1024;
    private final String mapName;
    private final FunctionEx<? super T, ? extends K> toKeyFn;
    private final BiFunctionEx<? super V, ? super T, ? extends V> updateFn;
    private final Consumer<T> addToBuffer = this::addToBuffer;
    private final BiFunction<Object, Object, Object> remappingFunction = (o, n) -> ApplyFnEntryProcessor.append(o, (Data)n);
    private IPartitionService memberPartitionService;
    private ClientPartitionService clientPartitionService;
    private SerializationService serializationService;
    private IMap<K, V> map;
    private Map<Data, Object>[] tmpMaps;
    private int[] tmpCounts;
    private int pendingItemCount;
    private int currentPartitionId;

    UpdateMapP(HazelcastInstance instance, int maxParallelAsyncOps, String mapName, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V, ? super T, ? extends V> updateFn) {
        super(instance, maxParallelAsyncOps);
        this.mapName = mapName;
        this.toKeyFn = toKeyFn;
        this.updateFn = updateFn;
    }

    @Override
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        int partitionCount;
        this.map = this.instance().getMap(this.mapName);
        if (this.isLocal()) {
            HazelcastInstanceImpl castInstance = (HazelcastInstanceImpl)this.instance();
            this.clientPartitionService = null;
            this.memberPartitionService = castInstance.node.nodeEngine.getPartitionService();
            this.serializationService = castInstance.getSerializationService();
            partitionCount = this.memberPartitionService.getPartitionCount();
        } else {
            HazelcastClientProxy clientProxy = (HazelcastClientProxy)this.instance();
            this.clientPartitionService = clientProxy.client.getClientPartitionService();
            this.memberPartitionService = null;
            this.serializationService = clientProxy.getSerializationService();
            partitionCount = this.clientPartitionService.getPartitionCount();
        }
        this.tmpMaps = new Map[partitionCount];
        this.tmpCounts = new int[partitionCount];
        for (int i = 0; i < partitionCount; ++i) {
            this.tmpMaps[i] = new HashMap<Data, Object>();
        }
    }

    @Override
    protected void processInternal(Inbox inbox) {
        if (this.pendingItemCount < 1024) {
            this.pendingItemCount += inbox.size();
            inbox.drain(this.addToBuffer);
        }
        this.submitPending();
    }

    @Override
    protected boolean flushInternal() {
        return this.submitPending();
    }

    private boolean submitPending() {
        if (this.pendingItemCount == 0) {
            return true;
        }
        for (int i = 0; i < this.tmpMaps.length; ++i) {
            if (!this.tmpMaps[this.currentPartitionId].isEmpty()) {
                if (!this.tryAcquirePermit()) {
                    return false;
                }
                Map<Data, Object> buffer = this.tmpMaps[this.currentPartitionId];
                ApplyFnEntryProcessor entryProcessor = new ApplyFnEntryProcessor(buffer, this.updateFn);
                this.setCallback(UpdateMapP.submitToKeys(this.map, buffer.keySet(), entryProcessor));
                this.pendingItemCount -= this.tmpCounts[this.currentPartitionId];
                this.tmpCounts[this.currentPartitionId] = 0;
                this.tmpMaps[this.currentPartitionId] = new HashMap<Data, Object>();
            }
            this.currentPartitionId = UpdateMapP.incrCircular(this.currentPartitionId, this.tmpMaps.length);
        }
        if (this.currentPartitionId == this.tmpMaps.length) {
            this.currentPartitionId = 0;
        }
        assert (this.pendingItemCount == 0) : "pending item count should be 0, but was " + this.pendingItemCount;
        return true;
    }

    private void addToBuffer(T item) {
        int partitionId;
        Object keyData;
        K key = this.toKeyFn.apply(item);
        if (this.isLocal()) {
            keyData = this.serializationService.toData(key, ((MapProxyImpl)this.map).getPartitionStrategy());
            partitionId = this.memberPartitionService.getPartitionId((Data)keyData);
        } else {
            keyData = this.serializationService.toData(key);
            partitionId = this.clientPartitionService.getPartitionId((Data)keyData);
        }
        Object itemData = this.serializationService.toData(item);
        this.tmpMaps[partitionId].merge((Data)keyData, itemData, this.remappingFunction);
        int n = partitionId;
        this.tmpCounts[n] = this.tmpCounts[n] + 1;
    }

    private static <K, V, R> InternalCompletableFuture<Map<K, V>> submitToKeys(IMap<K, V> map, Set<Data> keys, EntryProcessor<K, V, R> entryProcessor) {
        if (map instanceof MapProxyImpl) {
            return ((MapProxyImpl)map).submitToKeys(keys, entryProcessor);
        }
        if (map instanceof ClientMapProxy) {
            return ((ClientMapProxy)map).submitToKeys(keys, entryProcessor);
        }
        throw new RuntimeException("Unexpected map class: " + map.getClass().getName());
    }

    @CheckReturnValue
    private static int incrCircular(int v, int limit) {
        if (++v == limit) {
            v = 0;
        }
        return v;
    }

    @SuppressFBWarnings(value={"SE_BAD_FIELD", "SE_NO_SERIALVERSIONID"}, justification="the class is never java-serialized")
    public static class ApplyFnEntryProcessor<K, V, T, R>
    implements EntryProcessor<K, V, R>,
    IdentifiedDataSerializable,
    SerializationServiceAware {
        private Map<Data, Object> keysToUpdate;
        private BiFunctionEx<? super V, ? super T, ? extends V> updateFn;
        private SerializationService serializationService;

        public ApplyFnEntryProcessor() {
        }

        ApplyFnEntryProcessor(Map<Data, Object> keysToUpdate, BiFunctionEx<? super V, ? super T, ? extends V> updateFn) {
            this.keysToUpdate = keysToUpdate;
            this.updateFn = updateFn;
        }

        @Override
        public R process(Map.Entry<K, V> entry) {
            Object keyData = this.serializationService.toData(entry.getKey());
            Object item = this.keysToUpdate.get(keyData);
            if (item == null && !this.keysToUpdate.containsKey(keyData)) {
                throw new JetException("A key not found in the map - is equals/hashCode correctly implemented for the key? Key type: " + entry.getKey().getClass().getName());
            }
            if (item instanceof List) {
                List castList = (List)item;
                for (Data o : castList) {
                    this.handle(entry, o);
                }
            } else {
                this.handle(entry, (Data)item);
            }
            return null;
        }

        private void handle(Map.Entry<K, V> entry, Data itemData) {
            Object item = this.serializationService.toObject(itemData);
            V oldValue = entry.getValue();
            V newValue = this.updateFn.apply(oldValue, item);
            entry.setValue(newValue);
        }

        @Override
        public void setSerializationService(SerializationService serializationService) {
            this.serializationService = serializationService;
        }

        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeInt(this.keysToUpdate.size());
            for (Map.Entry<Data, Object> en : this.keysToUpdate.entrySet()) {
                IOUtil.writeData(out, en.getKey());
                Object value = en.getValue();
                if (value instanceof Data) {
                    out.writeInt(1);
                    IOUtil.writeData(out, (Data)value);
                    continue;
                }
                if (value instanceof List) {
                    List list = (List)value;
                    out.writeInt(list.size());
                    for (Data data : list) {
                        IOUtil.writeData(out, data);
                    }
                    continue;
                }
                assert (false) : "Unknown value type: " + value.getClass();
            }
            out.writeObject(this.updateFn);
        }

        @Override
        public void readData(ObjectDataInput in) throws IOException {
            int keysToUpdateSize = in.readInt();
            this.keysToUpdate = MapUtil.createHashMap(keysToUpdateSize);
            for (int i = 0; i < keysToUpdateSize; ++i) {
                Object value;
                Data key = IOUtil.readData(in);
                int size = in.readInt();
                if (size == 1) {
                    value = IOUtil.readData(in);
                } else {
                    ArrayList<Data> list = new ArrayList<Data>(size);
                    for (int j = 0; j < size; ++j) {
                        list.add(IOUtil.readData(in));
                    }
                    value = list;
                }
                this.keysToUpdate.put(key, value);
            }
            this.updateFn = (BiFunctionEx)in.readObject();
        }

        @Override
        public int getFactoryId() {
            return JetDataSerializerHook.FACTORY_ID;
        }

        @Override
        public int getClassId() {
            return 3;
        }

        static Object append(Object value, Data item) {
            ArrayList<Data> list;
            if (value instanceof List) {
                list = (ArrayList<Data>)value;
            } else {
                list = new ArrayList<Data>();
                list.add((Data)value);
            }
            list.add(item);
            return list;
        }
    }

    static class Supplier<T, K, V>
    extends AbstractHazelcastConnectorSupplier {
        static final long serialVersionUID = 1L;
        private String name;
        private final FunctionEx<? super T, ? extends K> toKeyFn;
        private final BiFunctionEx<? super V, ? super T, ? extends V> updateFn;

        Supplier(@Nullable String clientXml, @Nonnull String name, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V, ? super T, ? extends V> updateFn) {
            super(clientXml);
            this.name = name;
            this.toKeyFn = toKeyFn;
            this.updateFn = updateFn;
        }

        @Override
        protected Processor createProcessor(HazelcastInstance instance) {
            return new UpdateMapP(instance, 1000, this.name, this.toKeyFn, this.updateFn);
        }
    }
}

