/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.util.Preconditions;

/**
 * Wraps a single {@link StreamOperator} and links it to the wrappers of its neighbouring
 * operators, so that a sequence of operators can be finished in order and traversed in either
 * direction.
 *
 * <p>Besides the operator itself, the wrapper holds an optional {@link ProcessingTimeService}, the
 * {@link MailboxExecutor} used to schedule and drain the operator's mails, and a flag telling
 * whether this wrapper is the head of the sequence.
 *
 * @param <OUT> output type of the wrapped operator
 * @param <OP> concrete type of the wrapped operator
 */
@Internal
public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
    private final OP wrapped;
    private final Optional<ProcessingTimeService> processingTimeService;
    private final MailboxExecutor mailboxExecutor;
    private final boolean isHead;
    private StreamOperatorWrapper<?, ?> previous;
    private StreamOperatorWrapper<?, ?> next;
    /** Set to {@code true} by {@link #close()} before the wrapped operator is closed. */
    private boolean closed;

    /**
     * Creates a wrapper around the given operator.
     *
     * @param wrapped the operator to wrap; must not be {@code null}
     * @param processingTimeService the operator's processing time service, or an empty optional;
     *     the optional itself must not be {@code null}
     * @param mailboxExecutor executor used to enqueue and drain mails; must not be {@code null}
     * @param isHead whether this wrapper is the head of the sequence
     */
    StreamOperatorWrapper(
            OP wrapped,
            Optional<ProcessingTimeService> processingTimeService,
            MailboxExecutor mailboxExecutor,
            boolean isHead) {
        // checkNotNull returns its argument with the same static type, so no cast is needed
        // (a cast to the raw StreamOperator type is not assignable to a field of type OP).
        this.wrapped = Preconditions.checkNotNull(wrapped);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor);
        this.isHead = isHead;
    }

    /**
     * Returns whether {@link #close()} has been called on this wrapper.
     *
     * @return {@code true} once {@link #close()} has been invoked
     */
    public boolean isClosed() {
        return closed;
    }

    /**
     * Signals the end of input to the wrapped operator if it is bounded.
     *
     * <p>A {@link BoundedOneInput} operator gets {@code endInput()} (the input id is ignored); a
     * {@link BoundedMultiInput} operator gets {@code endInput(inputId)}. Any other operator is left
     * untouched.
     *
     * @param inputId id of the input that ended
     * @throws Exception if the operator's {@code endInput} call fails
     */
    public void endOperatorInput(int inputId) throws Exception {
        if (wrapped instanceof BoundedOneInput) {
            ((BoundedOneInput) wrapped).endInput();
        } else if (wrapped instanceof BoundedMultiInput) {
            ((BoundedMultiInput) wrapped).endInput(inputId);
        }
    }

    /**
     * Forwards a checkpoint-complete notification to the wrapped operator, unless this wrapper has
     * already been closed.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if the operator's notification handler fails
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!closed) {
            wrapped.notifyCheckpointComplete(checkpointId);
        }
    }

    /**
     * Forwards a checkpoint-subsumed notification to the keyed state backend of the wrapped
     * operator, unless this wrapper has already been closed.
     *
     * <p>The backend is only looked up for operators extending {@link AbstractStreamOperator} or
     * {@link AbstractStreamOperatorV2}, and it is only notified if it implements {@link
     * InternalCheckpointListener}. In every other case the call is a no-op.
     *
     * @param checkpointId id of the subsumed checkpoint
     * @throws Exception if the backend's notification handler fails
     */
    public void notifyCheckpointSubsumed(long checkpointId) throws Exception {
        if (!closed) {
            KeyedStateBackend<?> keyedStateBackend = null;
            if (wrapped instanceof AbstractStreamOperator) {
                keyedStateBackend = ((AbstractStreamOperator<?>) wrapped).getKeyedStateBackend();
            } else if (wrapped instanceof AbstractStreamOperatorV2) {
                keyedStateBackend = ((AbstractStreamOperatorV2<?>) wrapped).getKeyedStateBackend();
            }
            // instanceof is false for null, so a missing backend is skipped here as well.
            if (keyedStateBackend instanceof InternalCheckpointListener) {
                ((InternalCheckpointListener) keyedStateBackend)
                        .notifyCheckpointSubsumed(checkpointId);
            }
        }
    }

    /**
     * Returns the wrapped operator.
     *
     * @return the operator passed to the constructor
     */
    public OP getStreamOperator() {
        return wrapped;
    }

    /**
     * Sets the wrapper that precedes this one (visited next when iterating in reverse).
     *
     * @param previous the preceding wrapper, or {@code null} if there is none
     */
    void setPrevious(StreamOperatorWrapper<?, ?> previous) {
        this.previous = previous;
    }

    /**
     * Sets the wrapper that follows this one (visited next when iterating forward, and finished
     * after this one by {@link #finish}).
     *
     * @param next the following wrapper, or {@code null} if there is none
     */
    void setNext(StreamOperatorWrapper<?, ?> next) {
        this.next = next;
    }

    /**
     * Finishes this operator and then, recursively, every following operator.
     *
     * <p>For a non-head wrapper in {@link StopMode#DRAIN} mode, input {@code 1} is ended first
     * through the action executor. Afterwards the processing time service is quiesced and the
     * operator is finished (see {@link #quiesceTimeServiceAndFinishOperator}), and finally the call
     * is passed on to the next wrapper, if any.
     *
     * @param actionExecutor executor through which operator actions are run
     * @param stopMode whether the operator is drained ({@link StopMode#DRAIN}) or not
     * @throws Exception if ending the input or finishing any operator fails
     */
    public void finish(StreamTaskActionExecutor actionExecutor, StopMode stopMode)
            throws Exception {
        if (!isHead && stopMode == StopMode.DRAIN) {
            actionExecutor.runThrowing(() -> endOperatorInput(1));
        }
        quiesceTimeServiceAndFinishOperator(actionExecutor, stopMode);
        if (next != null) {
            next.finish(actionExecutor, stopMode);
        }
    }

    /**
     * Marks this wrapper as closed and closes the wrapped operator.
     *
     * <p>The closed flag is set before the operator's {@code close()} is called, so it stays set
     * even if that call throws.
     *
     * @throws Exception if closing the wrapped operator fails
     */
    public void close() throws Exception {
        closed = true;
        wrapped.close();
    }

    /**
     * Quiesces the processing time service, finishes the operator via the mailbox and then blocks,
     * draining the mailbox, until a trailing "finished" mail has run.
     *
     * @param actionExecutor executor through which the operator's {@code finish()} is run
     * @param stopMode stop mode; with {@link StopMode#NO_DRAIN} the operator's {@code finish()} is
     *     skipped
     * @throws InterruptedException if the thread is interrupted while waiting on the future
     * @throws ExecutionException if any of the chained steps completed exceptionally
     */
    private void quiesceTimeServiceAndFinishOperator(
            StreamTaskActionExecutor actionExecutor, StopMode stopMode)
            throws InterruptedException, ExecutionException {
        // Chain: quiesce timers -> finish operator (as a mail) -> marker mail that completes the
        // future once it has been executed.
        CompletableFuture<Void> finishedFuture =
                quiesceProcessingTimeService()
                        .thenCompose(
                                unused -> deferFinishOperatorToMailbox(actionExecutor, stopMode))
                        .thenCompose(unused -> sendFinishedMail());

        while (!finishedFuture.isDone()) {
            // Run every mail that is currently available.
            while (mailboxExecutor.tryYield()) {
                // intentionally empty: tryYield() does the work
            }
            try {
                // Bounded wait instead of an immediate re-check, so that the loop does not spin
                // while there is no mail to run and the future is still pending.
                finishedFuture.get(1L, TimeUnit.MILLISECONDS);
            } catch (TimeoutException ignored) {
                // Not done yet: go back to draining the mailbox.
            }
        }
        // Surfaces a failure of any chained step as an ExecutionException.
        finishedFuture.get();
    }

    /**
     * Schedules the operator's {@code finish()} as a mail and returns a future tracking it.
     *
     * @param actionExecutor executor through which {@code finish()} is run
     * @param stopMode stop mode; for {@link StopMode#NO_DRAIN} nothing is scheduled
     * @return an already-completed future for {@link StopMode#NO_DRAIN}; otherwise a future that
     *     completes normally after {@code finish()} returned, or exceptionally with whatever it
     *     threw
     */
    private CompletableFuture<Void> deferFinishOperatorToMailbox(
            StreamTaskActionExecutor actionExecutor, StopMode stopMode) {
        if (stopMode == StopMode.NO_DRAIN) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> finishOperatorFuture = new CompletableFuture<>();
        mailboxExecutor.execute(
                () -> {
                    try {
                        finishOperator(actionExecutor);
                        finishOperatorFuture.complete(null);
                    } catch (Throwable t) {
                        // Route the failure into the future rather than out of the mail itself.
                        finishOperatorFuture.completeExceptionally(t);
                    }
                },
                "StreamOperatorWrapper#finishOperator for " + wrapped);
        return finishOperatorFuture;
    }

    /**
     * Quiesces the processing time service if one is present.
     *
     * @return the future returned by {@link ProcessingTimeService#quiesce()}, or an
     *     already-completed future when no service was supplied
     */
    private CompletableFuture<Void> quiesceProcessingTimeService() {
        return processingTimeService
                .map(ProcessingTimeService::quiesce)
                .orElse(CompletableFuture.completedFuture(null));
    }

    /**
     * Enqueues a mail whose only effect is to complete the returned future.
     *
     * @return a future that completes once the enqueued mail has been executed
     */
    private CompletableFuture<Void> sendFinishedMail() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        mailboxExecutor.execute(
                () -> future.complete(null),
                "StreamOperatorWrapper#sendFinishedMail for " + wrapped);
        return future;
    }

    /**
     * Runs the wrapped operator's {@code finish()} through the given action executor.
     *
     * @param actionExecutor executor through which {@code finish()} is run
     * @throws Exception if {@code finish()} fails
     */
    private void finishOperator(StreamTaskActionExecutor actionExecutor) throws Exception {
        actionExecutor.runThrowing(() -> wrapped.finish());
    }

    /**
     * Single-pass iterator over linked wrappers, following either the {@code next} links or, in
     * reverse mode, the {@code previous} links.
     *
     * <p>The class is its own {@link Iterable}: {@link #iterator()} returns {@code this}, so an
     * instance can be used in one for-each loop only.
     */
    static class ReadIterator
            implements Iterator<StreamOperatorWrapper<?, ?>>,
                    Iterable<StreamOperatorWrapper<?, ?>> {
        private final boolean reverse;
        private StreamOperatorWrapper<?, ?> current;

        /**
         * @param first wrapper to start from; {@code null} yields an empty iteration
         * @param reverse {@code true} to follow {@code previous} links, {@code false} to follow
         *     {@code next} links
         */
        ReadIterator(StreamOperatorWrapper<?, ?> first, boolean reverse) {
            this.current = first;
            this.reverse = reverse;
        }

        @Override
        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the current wrapper and advances in the configured direction.
         *
         * @throws NoSuchElementException if the iteration is exhausted
         */
        @Override
        public StreamOperatorWrapper<?, ?> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            StreamOperatorWrapper<?, ?> result = current;
            current = reverse ? current.previous : current.next;
            return result;
        }

        @Override
        @Nonnull
        public Iterator<StreamOperatorWrapper<?, ?>> iterator() {
            return this;
        }
    }
}

