/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.connector.AbstractHazelcastConnectorSupplier;
import com.hazelcast.jet.impl.connector.HazelcastWriters;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ImdgUtil;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;

/**
 * Processor that drains incoming items into a buffer and writes them to an
 * {@link IMap} with an asynchronous put-all operation.
 *
 * <p>At most one put-all is in flight at a time: {@link #pendingOp} is set
 * before the operation is submitted and cleared by the completion callback.
 * While an operation is pending, {@link #process} leaves the inbox untouched.
 *
 * <p>The first failure reported by an async operation is remembered and
 * rethrown from the next call that invokes {@code checkFailure()}.
 *
 * @param <K> key type of the target map
 * @param <V> value type of the target map
 */
public final class WriteMapP<K, V> implements Processor {
    /**
     * Size argument handed to the buffer's constructor.
     * NOTE(review): presumably the capacity/limit of the buffer - confirm
     * against {@code HazelcastWriters.ArrayMap}.
     */
    private static final int BUFFER_SIZE = 1024;

    /** {@code true} while an async put-all has been submitted and has not completed yet. */
    private final AtomicBoolean pendingOp = new AtomicBoolean();
    /** First failure reported by the completion callback, or {@code null} if none so far. */
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();
    private final HazelcastInstance instance;
    private final String mapName;
    private final boolean isLocal;
    private final HazelcastWriters.ArrayMap<K, V> buffer = new HazelcastWriters.ArrayMap<>(BUFFER_SIZE);

    /**
     * Completion callback of the async put-all. Records the first failure,
     * empties the buffer and only then clears {@link #pendingOp}.
     * The order matters: {@link #process} touches the buffer only after it
     * has flipped {@code pendingOp} from false to true, so the buffer must
     * already be cleared (and the failure recorded) when the flag is released.
     * NOTE(review): the atomics suggest this runs on a thread other than the
     * processor's - confirm.
     */
    private final BiConsumer<Object, Throwable> callback = (response, error) -> {
        if (error != null) {
            // Keep only the first failure; later ones are dropped.
            firstFailure.compareAndSet(null, error);
        }
        buffer.clear();
        pendingOp.set(false);
    };

    /** Target map; obtained in {@link #init}. */
    private IMap<K, V> map;

    private WriteMapP(HazelcastInstance instance, String mapName) {
        this.instance = instance;
        this.mapName = mapName;
        this.isLocal = ImdgUtil.isMemberInstance(instance);
    }

    /** Looks up the target map by name on the configured instance. */
    @Override
    public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
        map = instance.getMap(mapName);
    }

    /**
     * Rethrows a recorded async failure, if any.
     *
     * @return always {@code true}
     */
    @Override
    public boolean tryProcess() {
        checkFailure();
        return true;
    }

    /**
     * Rethrows a recorded async failure, if any. Otherwise, when no put-all
     * is pending, drains the whole inbox into the buffer and submits it as a
     * new async put-all. When an operation is still pending the inbox is
     * left as is.
     */
    @Override
    public void process(int ordinal, @Nonnull Inbox inbox) {
        checkFailure();
        if (!pendingOp.compareAndSet(false, true)) {
            // Previous put-all still in flight; do not consume the inbox.
            return;
        }
        inbox.drain(buffer::add);
        ImdgUtil.mapPutAllAsync(map, buffer).whenComplete(callback);
    }

    /**
     * Watermarks are ignored by this processor.
     *
     * @return always {@code true}
     */
    @Override
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return true;
    }

    /**
     * @return {@code true} when no put-all is pending
     */
    @Override
    public boolean saveToSnapshot() {
        return ensureAllSuccessfullyWritten();
    }

    /**
     * @return {@code true} when no put-all is pending
     */
    @Override
    public boolean complete() {
        return ensureAllSuccessfullyWritten();
    }

    /**
     * Throws the first recorded async failure, if there is one. A
     * {@link HazelcastInstanceNotActiveException} is first passed through
     * {@code HazelcastWriters.handleInstanceNotActive} together with the
     * {@code isLocal} flag, and the result of that call is thrown instead.
     */
    private void checkFailure() {
        Throwable failure = firstFailure.get();
        if (failure == null) {
            return;
        }
        if (failure instanceof HazelcastInstanceNotActiveException) {
            failure = HazelcastWriters.handleInstanceNotActive(
                    (HazelcastInstanceNotActiveException) failure, isLocal);
        }
        throw ExceptionUtil.sneakyThrow(failure);
    }

    /**
     * Reports whether the last submitted put-all has completed and rethrows
     * a recorded failure.
     *
     * @return {@code true} when no put-all is pending
     */
    private boolean ensureAllSuccessfullyWritten() {
        // Read the flag BEFORE checking for a failure: the callback records the
        // failure before it clears the flag, so once the flag is seen as cleared
        // the failure of that operation (if any) is visible to checkFailure().
        boolean nothingPending = !pendingOp.get();
        checkFailure();
        return nothingPending;
    }

    /**
     * Supplier that creates {@link WriteMapP} processors for a given map name.
     *
     * @param <K> key type of the target map
     * @param <V> value type of the target map
     */
    public static class Supplier<K, V> extends AbstractHazelcastConnectorSupplier {
        private static final long serialVersionUID = 1L;
        private final String mapName;

        /**
         * @param clientXml passed unchanged to the superclass constructor
         * @param mapName   name of the map the created processors write to
         */
        public Supplier(String clientXml, String mapName) {
            super(clientXml);
            this.mapName = mapName;
        }

        @Override
        protected Processor createProcessor(HazelcastInstance instance) {
            return new WriteMapP<>(instance, mapName);
        }
    }
}

