/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.cache.journal.EventJournalCacheEvent;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.impl.HazelcastClientProxy;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.Partition;
import com.hazelcast.internal.journal.EventJournalInitialSubscriberState;
import com.hazelcast.internal.journal.EventJournalReader;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.WatermarkGenerationParams;
import com.hazelcast.jet.core.WatermarkSourceUtil;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedPredicate;
import com.hazelcast.jet.impl.connector.SerializableClientConfig;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.map.journal.EventJournalMapEvent;
import com.hazelcast.nio.Address;
import com.hazelcast.projection.Projection;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.util.function.Predicate;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public final class StreamEventJournalP<E, T>
extends AbstractProcessor {
    private static final int MAX_FETCH_SIZE = 128;
    @Nonnull
    private final EventJournalReader<? extends E> eventJournalReader;
    @Nonnull
    private final Predicate<? super E> predicate;
    @Nonnull
    private final Projection<? super E, ? extends T> projection;
    @Nonnull
    private final JournalInitialPosition initialPos;
    @Nonnull
    private final int[] partitionIds;
    @Nonnull
    private final WatermarkSourceUtil<T> watermarkSourceUtil;
    private final boolean isRemoteReader;
    @Nonnull
    private final long[] emitOffsets;
    @Nonnull
    private final long[] readOffsets;
    private ICompletableFuture<ReadResultSet<T>>[] readFutures;
    @Nullable
    private ReadResultSet<T> resultSet;
    private int currentPartitionIndex = -1;
    private int resultSetPosition;
    private Traverser<Map.Entry<BroadcastKey<Integer>, long[]>> snapshotTraverser;
    private Traverser<Object> traverser = Traversers.empty();

    StreamEventJournalP(@Nonnull EventJournalReader<? extends E> eventJournalReader, @Nonnull List<Integer> assignedPartitions, @Nonnull DistributedPredicate<? super E> predicateFn, @Nonnull DistributedFunction<? super E, ? extends T> projectionFn, @Nonnull JournalInitialPosition initialPos, boolean isRemoteReader, @Nonnull WatermarkGenerationParams<? super T> wmGenParams) {
        this.eventJournalReader = eventJournalReader;
        this.predicate = predicateFn::test;
        this.projection = StreamEventJournalP.toProjection(projectionFn);
        this.initialPos = initialPos;
        this.isRemoteReader = isRemoteReader;
        this.partitionIds = assignedPartitions.stream().mapToInt(Integer::intValue).toArray();
        this.emitOffsets = new long[this.partitionIds.length];
        this.readOffsets = new long[this.partitionIds.length];
        this.watermarkSourceUtil = new WatermarkSourceUtil<T>(wmGenParams);
        this.watermarkSourceUtil.increasePartitionCount(1);
    }

    @Override
    protected void init(@Nonnull Processor.Context context) throws Exception {
        ICompletableFuture[] futures = new ICompletableFuture[this.partitionIds.length];
        Arrays.setAll(futures, i -> this.eventJournalReader.subscribeToEventJournal(this.partitionIds[i]));
        for (int i2 = 0; i2 < futures.length; ++i2) {
            this.emitOffsets[i2] = this.readOffsets[i2] = this.getSequence((EventJournalInitialSubscriberState)futures[i2].get());
        }
    }

    @Override
    public boolean complete() {
        if (this.readFutures == null) {
            this.initialRead();
        }
        if (!this.emitFromTraverser(this.traverser)) {
            return false;
        }
        do {
            this.tryGetNextResultSet();
            if (this.resultSet == null) break;
            this.emitResultSet();
        } while (this.resultSet == null);
        return false;
    }

    private void emitResultSet() {
        assert (this.resultSet != null) : "null resultSet";
        while (this.resultSetPosition < this.resultSet.size()) {
            Object event = this.resultSet.get(this.resultSetPosition);
            this.emitOffsets[this.currentPartitionIndex] = this.resultSet.getSequence(this.resultSetPosition) + 1L;
            ++this.resultSetPosition;
            if (event == null) continue;
            this.traverser = this.watermarkSourceUtil.handleEvent(event, 0);
            if (this.emitFromTraverser(this.traverser)) continue;
            return;
        }
        this.resultSetPosition = 0;
        this.resultSet = null;
    }

    @Override
    public boolean saveToSnapshot() {
        boolean done;
        if (!this.emitFromTraverser(this.traverser)) {
            return false;
        }
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseStream(IntStream.range(0, this.partitionIds.length).mapToObj(pIdx -> Util.entry(BroadcastKey.broadcastKey(this.partitionIds[pIdx]), new long[]{this.emitOffsets[pIdx], this.watermarkSourceUtil.getWatermark(0)})));
        }
        if (done = this.emitFromTraverserToSnapshot(this.snapshotTraverser)) {
            LoggingUtil.logFinest(this.getLogger(), "Saved snapshot. partitions=%s, offsets=%s", Arrays.toString(this.partitionIds), Arrays.toString(this.emitOffsets));
            this.snapshotTraverser = null;
        }
        return done;
    }

    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        int partitionId = (Integer)((BroadcastKey)key).key();
        int partitionIndex = com.hazelcast.jet.impl.util.Util.arrayIndexOf(partitionId, this.partitionIds);
        long offset = ((long[])value)[0];
        long wm = ((long[])value)[1];
        if (partitionIndex >= 0) {
            this.readOffsets[partitionIndex] = offset;
            this.emitOffsets[partitionIndex] = offset;
            this.watermarkSourceUtil.restoreWatermark(0, wm);
        }
    }

    @Override
    public boolean finishSnapshotRestore() {
        LoggingUtil.logFinest(this.getLogger(), "Restored snapshot. partitions=%s, offsets=%s", Arrays.toString(this.partitionIds), Arrays.toString(this.readOffsets));
        return true;
    }

    @Override
    public boolean isCooperative() {
        return false;
    }

    private void initialRead() {
        this.readFutures = new ICompletableFuture[this.partitionIds.length];
        for (int i = 0; i < this.readFutures.length; ++i) {
            this.readFutures[i] = this.readFromJournal(this.partitionIds[i], this.readOffsets[i]);
        }
    }

    private long getSequence(EventJournalInitialSubscriberState state) {
        return this.initialPos == JournalInitialPosition.START_FROM_CURRENT ? state.getNewestSequence() + 1L : state.getOldestSequence();
    }

    private void tryGetNextResultSet() {
        while (this.resultSet == null && ++this.currentPartitionIndex < this.partitionIds.length) {
            ICompletableFuture<ReadResultSet<T>> future = this.readFutures[this.currentPartitionIndex];
            if (!future.isDone()) continue;
            this.resultSet = this.toResultSet(future);
            int partitionId = this.partitionIds[this.currentPartitionIndex];
            if (this.resultSet != null) {
                assert (this.resultSet.size() > 0) : "empty resultSet";
                long prevSequence = this.readOffsets[this.currentPartitionIndex];
                long lostCount = this.resultSet.getNextSequenceToReadFrom() - (long)this.resultSet.readCount() - prevSequence;
                if (lostCount > 0L) {
                    this.getLogger().warning(lostCount + " events lost for partition " + partitionId + " due to journal overflow when reading from event journal. Increase journal size to avoid this error. nextSequenceToReadFrom=" + this.resultSet.getNextSequenceToReadFrom() + ", readCount=" + this.resultSet.readCount() + ", prevSeq=" + prevSequence);
                }
                this.readOffsets[this.currentPartitionIndex] = this.resultSet.getNextSequenceToReadFrom();
            }
            this.readFutures[this.currentPartitionIndex] = this.readFromJournal(partitionId, this.readOffsets[this.currentPartitionIndex]);
        }
        if (this.currentPartitionIndex == this.partitionIds.length) {
            this.currentPartitionIndex = -1;
            this.traverser = this.watermarkSourceUtil.handleNoEvent();
        }
    }

    private ReadResultSet<T> toResultSet(ICompletableFuture<ReadResultSet<T>> future) {
        try {
            return (ReadResultSet)future.get();
        }
        catch (ExecutionException e) {
            Throwable ex = ExceptionUtil.peel(e);
            if (ex instanceof HazelcastInstanceNotActiveException && !this.isRemoteReader) {
                return null;
            }
            throw ExceptionUtil.rethrow(ex);
        }
        catch (InterruptedException e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    private ICompletableFuture<ReadResultSet<T>> readFromJournal(int partition, long offset) {
        LoggingUtil.logFinest(this.getLogger(), "Reading from partition %d and offset %d", partition, offset);
        return this.eventJournalReader.readFromEventJournal(offset, 1, 128, partition, this.predicate, this.projection);
    }

    private static <E, T> Projection<E, T> toProjection(final Function<E, T> projectionFn) {
        return new Projection<E, T>(){

            public T transform(E input) {
                return projectionFn.apply(input);
            }
        };
    }

    public static <K, V, T> ProcessorMetaSupplier streamMapP(@Nonnull String mapName, @Nonnull DistributedPredicate<EventJournalMapEvent<K, V>> predicate, @Nonnull DistributedFunction<EventJournalMapEvent<K, V>, T> projection, @Nonnull JournalInitialPosition initialPos, WatermarkGenerationParams<? super T> wmGenParams) {
        return new ClusterMetaSupplier<EventJournalMapEvent<K, V>, T>(null, instance -> (EventJournalReader)instance.getMap(mapName), predicate, projection, initialPos, wmGenParams);
    }

    public static <K, V, T> ProcessorMetaSupplier streamRemoteMapP(@Nonnull String mapName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedPredicate<EventJournalMapEvent<K, V>> predicate, @Nonnull DistributedFunction<EventJournalMapEvent<K, V>, T> projection, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<T> wmGenParams) {
        return new ClusterMetaSupplier<EventJournalMapEvent<K, V>, T>(clientConfig, instance -> (EventJournalReader)instance.getMap(mapName), predicate, projection, initialPos, wmGenParams);
    }

    public static <K, V, T> ProcessorMetaSupplier streamCacheP(@Nonnull String cacheName, @Nonnull DistributedPredicate<EventJournalCacheEvent<K, V>> predicate, @Nonnull DistributedFunction<EventJournalCacheEvent<K, V>, T> projection, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<T> wmGenParams) {
        return new ClusterMetaSupplier<EventJournalCacheEvent<K, V>, T>(null, inst -> (EventJournalReader)inst.getCacheManager().getCache(cacheName), predicate, projection, initialPos, wmGenParams);
    }

    public static <K, V, T> ProcessorMetaSupplier streamRemoteCacheP(@Nonnull String cacheName, @Nonnull ClientConfig clientConfig, @Nonnull DistributedPredicate<EventJournalCacheEvent<K, V>> predicate, @Nonnull DistributedFunction<EventJournalCacheEvent<K, V>, T> projection, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<T> wmGenParams) {
        return new ClusterMetaSupplier<EventJournalCacheEvent<K, V>, T>(clientConfig, inst -> (EventJournalReader)inst.getCacheManager().getCache(cacheName), predicate, projection, initialPos, wmGenParams);
    }

    private static class ClusterProcessorSupplier<E, T>
    implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        @Nonnull
        private final List<Integer> ownedPartitions;
        @Nullable
        private final SerializableClientConfig serializableClientConfig;
        @Nonnull
        private final DistributedFunction<HazelcastInstance, EventJournalReader<E>> eventJournalReaderSupplier;
        @Nonnull
        private final DistributedPredicate<? super E> predicate;
        @Nonnull
        private final DistributedFunction<? super E, ? extends T> projection;
        @Nonnull
        private final JournalInitialPosition initialPos;
        @Nonnull
        private final WatermarkGenerationParams<? super T> wmGenParams;
        private transient HazelcastInstance client;
        private transient EventJournalReader<E> eventJournalReader;

        ClusterProcessorSupplier(@Nonnull List<Integer> ownedPartitions, @Nullable SerializableClientConfig serializableClientConfig, @Nonnull DistributedFunction<HazelcastInstance, EventJournalReader<E>> eventJournalReaderSupplier, @Nonnull DistributedPredicate<? super E> predicate, @Nonnull DistributedFunction<? super E, ? extends T> projection, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<? super T> wmGenParams) {
            this.ownedPartitions = ownedPartitions;
            this.serializableClientConfig = serializableClientConfig;
            this.eventJournalReaderSupplier = eventJournalReaderSupplier;
            this.predicate = predicate;
            this.projection = projection;
            this.initialPos = initialPos;
            this.wmGenParams = wmGenParams;
        }

        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            HazelcastInstance instance = context.jetInstance().getHazelcastInstance();
            if (this.serializableClientConfig != null) {
                instance = this.client = HazelcastClient.newHazelcastClient((ClientConfig)this.serializableClientConfig.asClientConfig());
            }
            this.eventJournalReader = (EventJournalReader)this.eventJournalReaderSupplier.apply(instance);
        }

        @Override
        public void close(Throwable error) {
            if (this.client != null) {
                this.client.shutdown();
            }
        }

        @Nonnull
        public List<Processor> get(int count) {
            return com.hazelcast.jet.impl.util.Util.processorToPartitions(count, this.ownedPartitions).values().stream().map(this::processorForPartitions).collect(Collectors.toList());
        }

        private Processor processorForPartitions(List<Integer> partitions) {
            return partitions.isEmpty() ? (Processor)Processors.noopP().get() : new StreamEventJournalP<E, T>(this.eventJournalReader, partitions, this.predicate, this.projection, this.initialPos, this.client != null, this.wmGenParams);
        }
    }

    private static class ClusterMetaSupplier<E, T>
    implements ProcessorMetaSupplier {
        static final long serialVersionUID = 1L;
        private final SerializableClientConfig serializableConfig;
        private final DistributedFunction<HazelcastInstance, EventJournalReader<E>> eventJournalReaderSupplier;
        private final DistributedPredicate<E> predicate;
        private final DistributedFunction<E, T> projection;
        private final JournalInitialPosition initialPos;
        private final WatermarkGenerationParams<? super T> wmGenParams;
        private transient int remotePartitionCount;
        private transient Map<Address, List<Integer>> addrToPartitions;

        ClusterMetaSupplier(@Nullable ClientConfig clientConfig, @Nonnull DistributedFunction<HazelcastInstance, EventJournalReader<E>> eventJournalReaderSupplier, @Nonnull DistributedPredicate<E> predicate, @Nonnull DistributedFunction<E, T> projection, @Nonnull JournalInitialPosition initialPos, @Nonnull WatermarkGenerationParams<? super T> wmGenParams) {
            this.serializableConfig = clientConfig == null ? null : new SerializableClientConfig(clientConfig);
            this.eventJournalReaderSupplier = eventJournalReaderSupplier;
            this.predicate = predicate;
            this.projection = projection;
            this.initialPos = initialPos;
            this.wmGenParams = wmGenParams;
        }

        @Override
        public int preferredLocalParallelism() {
            return this.serializableConfig != null ? 1 : 2;
        }

        @Override
        public void init(@Nonnull ProcessorMetaSupplier.Context context) {
            if (this.serializableConfig != null) {
                this.initRemote();
            } else {
                this.initLocal(context.jetInstance().getHazelcastInstance().getPartitionService().getPartitions());
            }
        }

        private void initRemote() {
            HazelcastInstance client = HazelcastClient.newHazelcastClient((ClientConfig)this.serializableConfig.asClientConfig());
            try {
                HazelcastClientProxy clientProxy = (HazelcastClientProxy)client;
                this.remotePartitionCount = clientProxy.client.getClientPartitionService().getPartitionCount();
            }
            finally {
                client.shutdown();
            }
        }

        private void initLocal(Set<Partition> partitions) {
            this.addrToPartitions = partitions.stream().collect(Collectors.groupingBy(p -> p.getOwner().getAddress(), Collectors.mapping(Partition::getPartitionId, Collectors.toList())));
        }

        @Override
        @Nonnull
        public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses) {
            if (this.addrToPartitions == null) {
                this.addrToPartitions = IntStream.range(0, this.remotePartitionCount).boxed().collect(Collectors.groupingBy(partition -> (Address)addresses.get(partition % addresses.size())));
            }
            return address -> new ClusterProcessorSupplier<E, T>(this.addrToPartitions.get(address), this.serializableConfig, this.eventJournalReaderSupplier, this.predicate, this.projection, this.initialPos, this.wmGenParams);
        }
    }
}

