/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.cache.ICache;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.impl.clientside.HazelcastClientProxy;
import com.hazelcast.client.proxy.ClientMapProxy;
import com.hazelcast.client.spi.ClientPartitionService;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IList;
import com.hazelcast.core.IMap;
import com.hazelcast.instance.HazelcastInstanceImpl;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.RestartableException;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.JetDataSerializerHook;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.function.BiConsumerEx;
import com.hazelcast.jet.function.BiFunctionEx;
import com.hazelcast.jet.function.BinaryOperatorEx;
import com.hazelcast.jet.function.ConsumerEx;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.impl.connector.WriteBufferedP;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.map.EntryBackupProcessor;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.spi.serialization.SerializationServiceAware;
import com.hazelcast.util.MapUtil;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public final class HazelcastWriters {
    private HazelcastWriters() {
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier mergeMapSupplier(@Nonnull String name, @Nullable ClientConfig clientConfig, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends V> toValueFn, @Nonnull BinaryOperatorEx<V> mergeFn) {
        Util.checkSerializable(toKeyFn, "toKeyFn");
        Util.checkSerializable(toValueFn, "toValueFn");
        Util.checkSerializable(mergeFn, "mergeFn");
        return HazelcastWriters.updateMapSupplier(name, clientConfig, toKeyFn, (? super V oldValue, ? super T item) -> {
            Object newValue = toValueFn.apply(item);
            if (oldValue == null) {
                return newValue;
            }
            return mergeFn.apply((Object)oldValue, (Object)newValue);
        });
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier updateMapSupplier(@Nonnull String mapName, @Nullable ClientConfig clientConfig, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V, ? super T, ? extends V> updateFn) {
        Util.checkSerializable(toKeyFn, "toKeyFn");
        Util.checkSerializable(updateFn, "updateFn");
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.preferLocalParallelismOne(new HazelcastWriterSupplier<UpdateMapContext, Object>(Util.asXmlString(clientConfig), instance -> procContext -> new UpdateMapContext((HazelcastInstance)instance, mapName, toKeyFn, updateFn, isLocal), UpdateMapContext::add, instance -> UpdateMapContext::flush, UpdateMapContext::finish));
    }

    private static <K, V> ICompletableFuture<Map<K, V>> submitToKeys(IMap<K, V> map, Set<Data> keys, EntryProcessor<K, V> entryProcessor) {
        if (map instanceof MapProxyImpl) {
            return ((MapProxyImpl)map).submitToKeys(keys, entryProcessor);
        }
        if (map instanceof ClientMapProxy) {
            return ((ClientMapProxy)map).submitToKeys(keys, entryProcessor);
        }
        throw new RuntimeException("Unexpected map class: " + map.getClass().getName());
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier updateMapSupplier(@Nonnull String name, @Nullable ClientConfig clientConfig, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn) {
        Util.checkSerializable(toKeyFn, "toKeyFn");
        Util.checkSerializable(toEntryProcessorFn, "toEntryProcessorFn");
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.preferLocalParallelismOne(new EntryProcessorWriterSupplier(name, Util.asXmlString(clientConfig), toKeyFn, toEntryProcessorFn, isLocal));
    }

    @Nonnull
    public static ProcessorMetaSupplier writeMapSupplier(@Nonnull String name, @Nullable ClientConfig clientConfig) {
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.preferLocalParallelismOne(new HazelcastWriterSupplier<ArrayMap, Map.Entry>(Util.asXmlString(clientConfig), instance -> procContext -> new ArrayMap(), ArrayMap::add, instance -> {
            IMap map = instance.getMap(name);
            return buffer -> {
                try {
                    map.putAll((Map)buffer);
                }
                catch (HazelcastInstanceNotActiveException e) {
                    throw HazelcastWriters.handleInstanceNotActive(e, isLocal);
                }
                buffer.clear();
            };
        }, ConsumerEx.noop()));
    }

    @Nonnull
    public static ProcessorMetaSupplier writeCacheSupplier(@Nonnull String name, @Nullable ClientConfig clientConfig) {
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.preferLocalParallelismOne(new HazelcastWriterSupplier<ArrayMap, Map.Entry>(Util.asXmlString(clientConfig), instance -> procContext -> new ArrayMap(), ArrayMap::add, CacheFlush.flushToCache(name, isLocal), ConsumerEx.noop()));
    }

    @Nonnull
    public static ProcessorMetaSupplier writeListSupplier(@Nonnull String name, @Nullable ClientConfig clientConfig) {
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.preferLocalParallelismOne(new HazelcastWriterSupplier<ArrayList, Object>(Util.asXmlString(clientConfig), instance -> procContext -> new ArrayList(), ArrayList::add, instance -> {
            IList list = instance.getList(name);
            return buffer -> {
                try {
                    list.addAll((Collection)buffer);
                }
                catch (HazelcastInstanceNotActiveException e) {
                    throw HazelcastWriters.handleInstanceNotActive(e, isLocal);
                }
                buffer.clear();
            };
        }, ConsumerEx.noop()));
    }

    private static RuntimeException handleInstanceNotActive(HazelcastInstanceNotActiveException e, boolean isLocal) {
        return isLocal ? new RestartableException(e) : e;
    }

    @SuppressFBWarnings(value={"SE_BAD_FIELD", "SE_NO_SERIALVERSIONID"}, justification="the class is never java-serialized")
    public static class ApplyFnEntryProcessor<K, V, T>
    implements EntryProcessor<K, V>,
    EntryBackupProcessor<K, V>,
    IdentifiedDataSerializable,
    SerializationServiceAware {
        private Map<Data, Object> keysToUpdate;
        private BiFunctionEx<? super V, ? super T, ? extends V> updateFn;
        private SerializationService serializationService;

        public ApplyFnEntryProcessor() {
        }

        ApplyFnEntryProcessor(Map<Data, Object> keysToUpdate, BiFunctionEx<? super V, ? super T, ? extends V> updateFn) {
            this.keysToUpdate = keysToUpdate;
            this.updateFn = updateFn;
        }

        public Object process(Map.Entry<K, V> entry) {
            Data keyData = this.serializationService.toData(entry.getKey());
            Object item = this.keysToUpdate.get(keyData);
            if (item == null && !this.keysToUpdate.containsKey(keyData)) {
                throw new JetException("The new item not found in the map - is equals/hashCode correctly implemented for the key? Key type: " + entry.getKey().getClass().getName());
            }
            if (item instanceof List) {
                for (Data o : (List)item) {
                    this.handle(entry, o);
                }
            } else {
                this.handle(entry, (Data)item);
            }
            return null;
        }

        private void handle(Map.Entry<K, V> entry, Data itemData) {
            Object item = this.serializationService.toObject((Object)itemData);
            V oldValue = entry.getValue();
            V newValue = this.updateFn.apply(oldValue, item);
            entry.setValue(newValue);
        }

        public EntryBackupProcessor<K, V> getBackupProcessor() {
            return this;
        }

        public void processBackup(Map.Entry<K, V> entry) {
            this.process(entry);
        }

        public void setSerializationService(SerializationService serializationService) {
            this.serializationService = serializationService;
        }

        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeInt(this.keysToUpdate.size());
            for (Map.Entry<Data, Object> en : this.keysToUpdate.entrySet()) {
                out.writeData(en.getKey());
                Object value = en.getValue();
                if (value instanceof Data) {
                    out.writeInt(1);
                    out.writeData((Data)value);
                    continue;
                }
                if (value instanceof List) {
                    List list = (List)value;
                    out.writeInt(list.size());
                    for (Data data : list) {
                        out.writeData(data);
                    }
                    continue;
                }
                assert (false) : "Unknown value type: " + value.getClass();
            }
            out.writeObject(this.updateFn);
        }

        public void readData(ObjectDataInput in) throws IOException {
            int keysToUpdateSize = in.readInt();
            this.keysToUpdate = MapUtil.createHashMap((int)keysToUpdateSize);
            for (int i = 0; i < keysToUpdateSize; ++i) {
                Object value;
                Data key = in.readData();
                int size = in.readInt();
                if (size == 1) {
                    value = in.readData();
                } else {
                    ArrayList<Data> list = new ArrayList<Data>(size);
                    for (int j = 0; j < size; ++j) {
                        list.add(in.readData());
                    }
                    value = list;
                }
                this.keysToUpdate.put(key, value);
            }
            this.updateFn = (BiFunctionEx)in.readObject();
        }

        public int getFactoryId() {
            return JetDataSerializerHook.FACTORY_ID;
        }

        public int getId() {
            return 3;
        }

        public static Object append(Object value, Data item) {
            ArrayList<Data> list;
            if (value instanceof List) {
                list = (ArrayList<Data>)value;
            } else {
                list = new ArrayList<Data>();
                list.add((Data)value);
            }
            list.add(item);
            return list;
        }
    }

    private static class HazelcastWriterSupplier<B, T>
    implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        private final String clientXml;
        private final FunctionEx<HazelcastInstance, ConsumerEx<B>> instanceToFlushBufferFn;
        private final FunctionEx<HazelcastInstance, FunctionEx<Processor.Context, B>> instanceToNewBufferFn;
        private final BiConsumerEx<B, T> addToBufferFn;
        private final ConsumerEx<B> disposeBufferFn;
        private transient FunctionEx<Processor.Context, B> newBufferFn;
        private transient ConsumerEx<B> flushBufferFn;
        private transient HazelcastInstance client;

        HazelcastWriterSupplier(String clientXml, FunctionEx<HazelcastInstance, FunctionEx<Processor.Context, B>> instanceToNewBufferFn, BiConsumerEx<B, T> addToBufferFn, FunctionEx<HazelcastInstance, ConsumerEx<B>> instanceToFlushBufferFn, ConsumerEx<B> disposeBufferFn) {
            this.clientXml = clientXml;
            this.instanceToFlushBufferFn = instanceToFlushBufferFn;
            this.instanceToNewBufferFn = instanceToNewBufferFn;
            this.addToBufferFn = addToBufferFn;
            this.disposeBufferFn = disposeBufferFn;
        }

        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            HazelcastInstance instance = this.isRemote() ? (this.client = HazelcastClient.newHazelcastClient((ClientConfig)Util.asClientConfig(this.clientXml))) : context.jetInstance().getHazelcastInstance();
            this.flushBufferFn = this.instanceToFlushBufferFn.apply(instance);
            this.newBufferFn = this.instanceToNewBufferFn.apply(instance);
        }

        @Override
        public void close(Throwable error) {
            if (this.client != null) {
                this.client.shutdown();
            }
        }

        private boolean isRemote() {
            return this.clientXml != null;
        }

        @Nonnull
        public List<Processor> get(int count) {
            return Stream.generate(() -> new WriteBufferedP<B, T>(this.newBufferFn, this.addToBufferFn, this.flushBufferFn, this.disposeBufferFn)).limit(count).collect(Collectors.toList());
        }
    }

    private static final class EntryProcessorWriterSupplier<T, K, V>
    implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        private final String name;
        private final String clientXml;
        private final FunctionEx<? super T, ? extends K> toKeyFn;
        private final FunctionEx<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn;
        private final boolean isLocal;
        private transient HazelcastInstance client;
        private transient HazelcastInstance instance;

        private EntryProcessorWriterSupplier(@Nonnull String name, @Nullable String clientXml, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn, boolean isLocal) {
            this.name = name;
            this.clientXml = clientXml;
            this.toKeyFn = toKeyFn;
            this.toEntryProcessorFn = toEntryProcessorFn;
            this.isLocal = isLocal;
        }

        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            this.instance = this.clientXml != null ? (this.client = HazelcastClient.newHazelcastClient((ClientConfig)Util.asClientConfig(this.clientXml))) : context.jetInstance().getHazelcastInstance();
        }

        @Override
        public void close(Throwable error) {
            if (this.client != null) {
                this.client.shutdown();
            }
        }

        @Nonnull
        public List<Processor> get(int count) {
            return Stream.generate(() -> new EntryProcessorWriter(this.instance, this.name, this.toKeyFn, this.toEntryProcessorFn, this.isLocal)).limit(count).collect(Collectors.toList());
        }
    }

    private static final class EntryProcessorWriter<T, K, V>
    extends AbstractProcessor {
        private static final int MAX_PARALLEL_ASYNC_OPS = 1000;
        private final AtomicInteger numConcurrentOps = new AtomicInteger();
        private final boolean isLocal;
        private final IMap<? super K, ? extends V> map;
        private final FunctionEx<? super T, ? extends K> toKeyFn;
        private final FunctionEx<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn;
        private final AtomicReference<Throwable> lastError = new AtomicReference();
        private final ExecutionCallback callback = Util.callbackOf(response -> this.numConcurrentOps.decrementAndGet(), exception -> {
            this.numConcurrentOps.decrementAndGet();
            if (exception != null) {
                this.lastError.compareAndSet((Throwable)null, (Throwable)exception);
            }
        });

        private EntryProcessorWriter(@Nonnull HazelcastInstance instance, @Nonnull String name, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn, boolean isLocal) {
            this.map = instance.getMap(name);
            this.toKeyFn = toKeyFn;
            this.toEntryProcessorFn = toEntryProcessorFn;
            this.isLocal = isLocal;
        }

        @Override
        public boolean isCooperative() {
            return false;
        }

        @Override
        public boolean tryProcess() {
            this.checkError();
            return true;
        }

        @Override
        protected boolean tryProcess(int ordinal, @Nonnull Object object) {
            this.checkError();
            if (!Util.tryIncrement(this.numConcurrentOps, 1, 1000)) {
                return false;
            }
            try {
                Object item = object;
                EntryProcessor<K, V> entryProcessor = this.toEntryProcessorFn.apply(item);
                K key = this.toKeyFn.apply(item);
                this.map.submitToKey(key, entryProcessor, this.callback);
                return true;
            }
            catch (HazelcastInstanceNotActiveException e) {
                throw HazelcastWriters.handleInstanceNotActive(e, this.isLocal);
            }
        }

        @Override
        public boolean complete() {
            return this.ensureAllWritten();
        }

        @Override
        public boolean saveToSnapshot() {
            return this.ensureAllWritten();
        }

        private boolean ensureAllWritten() {
            boolean allWritten = this.numConcurrentOps.get() == 0;
            this.checkError();
            return allWritten;
        }

        private void checkError() {
            Throwable t = this.lastError.get();
            if (t != null) {
                throw ExceptionUtil.sneakyThrow(t);
            }
        }
    }

    private static final class ArrayMap
    extends AbstractMap<Object, Object> {
        private final List<Map.Entry<Object, Object>> entries;
        private final ArraySet set = new ArraySet();

        ArrayMap() {
            this.entries = new ArrayList<Map.Entry<Object, Object>>();
        }

        @Override
        @Nonnull
        public Set<Map.Entry<Object, Object>> entrySet() {
            return this.set;
        }

        public void add(Map.Entry entry) {
            this.entries.add(entry);
        }

        @Override
        public String toString() {
            return this.entries.toString();
        }

        private class ArraySet
        extends AbstractSet<Map.Entry<Object, Object>> {
            private ArraySet() {
            }

            @Override
            @Nonnull
            public Iterator<Map.Entry<Object, Object>> iterator() {
                return ArrayMap.this.entries.iterator();
            }

            @Override
            public int size() {
                return ArrayMap.this.entries.size();
            }
        }
    }

    private static class CacheFlush {
        private CacheFlush() {
        }

        static FunctionEx<HazelcastInstance, ConsumerEx<ArrayMap>> flushToCache(String name, boolean isLocal) {
            return instance -> {
                ICache cache = instance.getCacheManager().getCache(name);
                return buffer -> {
                    try {
                        cache.putAll((Map)buffer);
                    }
                    catch (HazelcastInstanceNotActiveException e) {
                        throw HazelcastWriters.handleInstanceNotActive(e, isLocal);
                    }
                    buffer.clear();
                };
            };
        }
    }

    private static final class UpdateMapContext<K, V, T> {
        private static final int MAX_PARALLEL_ASYNC_OPS = 1000;
        private final FunctionEx<? super T, ? extends K> toKeyFn;
        private final BiFunctionEx<? super V, ? super T, ? extends V> updateFn;
        private final boolean isLocal;
        private final IPartitionService memberPartitionService;
        private final ClientPartitionService clientPartitionService;
        private final SerializationService serializationService;
        private final Semaphore concurrentAsyncOpsSemaphore = new Semaphore(1000);
        private final AtomicReference<Throwable> firstError = new AtomicReference();
        private final IMap<K, V> map;
        private final Map<Data, Object>[] tmpMaps;

        UpdateMapContext(HazelcastInstance instance, String mapName, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull BiFunctionEx<? super V, ? super T, ? extends V> updateFn, boolean isLocal) {
            int partitionCount;
            this.toKeyFn = toKeyFn;
            this.updateFn = updateFn;
            this.isLocal = isLocal;
            this.map = instance.getMap(mapName);
            if (isLocal) {
                HazelcastInstanceImpl castedInstance = (HazelcastInstanceImpl)instance;
                this.clientPartitionService = null;
                this.memberPartitionService = castedInstance.node.nodeEngine.getPartitionService();
                this.serializationService = castedInstance.getSerializationService();
                partitionCount = this.memberPartitionService.getPartitionCount();
            } else {
                HazelcastClientProxy clientProxy = (HazelcastClientProxy)instance;
                this.clientPartitionService = clientProxy.client.getClientPartitionService();
                this.memberPartitionService = null;
                this.serializationService = clientProxy.getSerializationService();
                partitionCount = this.clientPartitionService.getPartitionCount();
            }
            this.tmpMaps = new Map[partitionCount];
            for (int i = 0; i < partitionCount; ++i) {
                this.tmpMaps[i] = new HashMap<Data, Object>();
            }
        }

        void add(T item) {
            int partitionId;
            Data keyData;
            K key = this.toKeyFn.apply(item);
            if (this.isLocal) {
                keyData = this.serializationService.toData(key, ((MapProxyImpl)this.map).getPartitionStrategy());
                partitionId = this.memberPartitionService.getPartitionId(keyData);
            } else {
                keyData = this.serializationService.toData(key);
                partitionId = this.clientPartitionService.getPartitionId(keyData);
            }
            Data itemData = this.serializationService.toData(item);
            this.tmpMaps[partitionId].merge(keyData, itemData, (o, n) -> ApplyFnEntryProcessor.append(o, (Data)n));
        }

        void flush() {
            try {
                if (this.firstError.get() != null) {
                    if (this.firstError.get() instanceof HazelcastInstanceNotActiveException) {
                        throw HazelcastWriters.handleInstanceNotActive((HazelcastInstanceNotActiveException)this.firstError.get(), this.isLocal);
                    }
                    throw ExceptionUtil.sneakyThrow(this.firstError.get());
                }
                for (int partitionId = 0; partitionId < this.tmpMaps.length; ++partitionId) {
                    if (this.tmpMaps[partitionId].isEmpty()) continue;
                    ApplyFnEntryProcessor entryProcessor = new ApplyFnEntryProcessor(this.tmpMaps[partitionId], this.updateFn);
                    try {
                        this.concurrentAsyncOpsSemaphore.acquire();
                    }
                    catch (InterruptedException e) {
                        return;
                    }
                    HazelcastWriters.submitToKeys(this.map, this.tmpMaps[partitionId].keySet(), entryProcessor).andThen(Util.callbackOf(r -> this.concurrentAsyncOpsSemaphore.release(), t -> {
                        this.firstError.compareAndSet((Throwable)null, (Throwable)t);
                        this.concurrentAsyncOpsSemaphore.release();
                    }));
                    this.tmpMaps[partitionId] = new HashMap<Data, Object>();
                }
            }
            catch (HazelcastInstanceNotActiveException e) {
                throw HazelcastWriters.handleInstanceNotActive(e, this.isLocal);
            }
        }

        public void finish() {
            try {
                this.concurrentAsyncOpsSemaphore.acquire(1000);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }
}

