/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.connector;

import com.hazelcast.connector.ArrayMap;
import com.hazelcast.connector.Hz3Util;
import com.hazelcast.connector.map.AsyncMap;
import com.hazelcast.connector.map.Hz3MapAdapter;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

final class WriteMapP<T, K, V>
implements Processor {
    private final AtomicBoolean pendingOp = new AtomicBoolean();
    private final AtomicReference<Throwable> firstFailure = new AtomicReference();
    private final String mapName;
    @Nonnull
    private final FunctionEx<? super T, ? extends K> toKeyFn;
    @Nonnull
    private final FunctionEx<? super T, ? extends V> toValueFn;
    private final ArrayMap<Object, Object> buffer = new ArrayMap(1024);
    private final BiConsumer<Object, Throwable> callback = (r, t) -> {
        if (t != null) {
            this.firstFailure.compareAndSet((Throwable)null, (Throwable)t);
        }
        this.buffer.clear();
        this.pendingOp.set(false);
    };
    private Hz3MapAdapter hz3MapAdapter;
    private AsyncMap<Object, Object> map;
    private Consumer<T> addToBuffer;

    private WriteMapP(@Nonnull Hz3MapAdapter hz3MapAdapter, @Nonnull String mapName, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends V> toValueFn) {
        this.hz3MapAdapter = hz3MapAdapter;
        this.mapName = mapName;
        this.toKeyFn = toKeyFn;
        this.toValueFn = toValueFn;
    }

    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        this.map = this.hz3MapAdapter.getMap(this.mapName);
        InternalSerializationService serializationService = ((HazelcastInstanceImpl)context.hazelcastInstance()).node.getCompatibilitySerializationService();
        this.addToBuffer = item -> {
            Data key = serializationService.toData(this.key(item));
            Data value = serializationService.toData(this.value(item));
            Object hz3Key = this.hz3MapAdapter.toHz3Data(key.toByteArray());
            Object hz3Value = this.hz3MapAdapter.toHz3Data(value.toByteArray());
            this.buffer.add(new AbstractMap.SimpleEntry<Object, Object>(hz3Key, hz3Value));
        };
    }

    private K key(T item) {
        return (K)this.toKeyFn.apply(item);
    }

    private V value(T item) {
        return (V)this.toValueFn.apply(item);
    }

    public boolean tryProcess() {
        this.checkFailure();
        return true;
    }

    public void process(int ordinal, @Nonnull Inbox inbox) {
        this.checkFailure();
        if (this.pendingOp.compareAndSet(false, true)) {
            inbox.drain(this.addToBuffer);
            this.map.putAllAsync(this.buffer).whenComplete(this.callback);
        }
    }

    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return true;
    }

    public boolean saveToSnapshot() {
        return this.ensureAllSuccessfullyWritten();
    }

    public boolean complete() {
        return this.ensureAllSuccessfullyWritten();
    }

    private void checkFailure() {
        Throwable failure = this.firstFailure.get();
        if (failure != null) {
            throw ExceptionUtil.sneakyThrow((Throwable)failure);
        }
    }

    private boolean ensureAllSuccessfullyWritten() {
        try {
            boolean bl = !this.pendingOp.get();
            return bl;
        }
        finally {
            this.checkFailure();
        }
    }

    public static class Supplier<T, K, V>
    implements ProcessorSupplier {
        private static final long serialVersionUID = 1L;
        private static final int MAX_PARALLELISM = 16;
        private final String clientXml;
        private final String mapName;
        private final FunctionEx<? super T, ? extends K> toKeyFn;
        private final FunctionEx<? super T, ? extends V> toValueFn;
        private int maxParallelAsyncOps;
        private transient Hz3MapAdapter hz3MapAdapter;

        Supplier(@Nonnull String clientXml, @Nonnull String mapName, @Nonnull FunctionEx<? super T, ? extends K> toKeyFn, @Nonnull FunctionEx<? super T, ? extends V> toValueFn) {
            this.clientXml = clientXml;
            this.mapName = mapName;
            this.toKeyFn = toKeyFn;
            this.toValueFn = toValueFn;
        }

        public void init(@Nonnull ProcessorSupplier.Context context) throws Exception {
            this.hz3MapAdapter = Hz3Util.createMapAdapter(this.clientXml);
            this.maxParallelAsyncOps = Integer.max(1, 16 / context.localParallelism());
        }

        @Nonnull
        public Collection<? extends Processor> get(int count) {
            return Stream.generate(() -> new WriteMapP(Hz3Util.createMapAdapter(this.clientXml), this.mapName, this.toKeyFn, this.toValueFn)).limit(count).collect(Collectors.toList());
        }

        public void close(@Nullable Throwable error) throws Exception {
            this.hz3MapAdapter.shutdown();
        }
    }
}

