/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;

/**
 * Wraps a {@link StreamOperator} and links it to its neighbouring wrappers so that a chain of
 * operators can be closed one after another.
 *
 * <p>Closing a wrapper ends the operator's input (unless it is the head operator or the task is
 * stopping with a synchronous savepoint), quiesces the optional {@link ProcessingTimeService},
 * closes the operator from a mail submitted to the {@link MailboxExecutor}, and finally proceeds
 * to the {@code next} wrapper.
 *
 * @param <OUT> output type of the wrapped operator
 * @param <OP> concrete type of the wrapped operator
 */
@Internal
public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
    private final OP wrapped;
    private final Optional<ProcessingTimeService> processingTimeService;
    private final MailboxExecutor mailboxExecutor;
    private final boolean isHead;
    private StreamOperatorWrapper<?, ?> previous;
    private StreamOperatorWrapper<?, ?> next;
    // Set right before the wrapped operator's close() is invoked; see closeOperator().
    private boolean closed;

    /**
     * Creates a wrapper around the given operator.
     *
     * @param wrapped the operator to wrap; must not be {@code null}
     * @param processingTimeService the operator's time service, or empty if it has none; the
     *     {@code Optional} itself must not be {@code null}
     * @param mailboxExecutor executor used to submit and run the close-related mails; must not be
     *     {@code null}
     * @param isHead whether this operator is the head of its chain (its input is then not ended
     *     by {@link #close})
     */
    StreamOperatorWrapper(OP wrapped, Optional<ProcessingTimeService> processingTimeService, MailboxExecutor mailboxExecutor, boolean isHead) {
        this.wrapped = Preconditions.checkNotNull(wrapped);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.mailboxExecutor = Preconditions.checkNotNull(mailboxExecutor);
        this.isHead = isHead;
    }

    /**
     * Returns whether closing of the wrapped operator has been started, i.e. whether the close
     * mail scheduled by {@link #close} has begun to run.
     */
    public boolean isClosed() {
        return closed;
    }

    /**
     * Signals the end of an input to the wrapped operator.
     *
     * <p>A {@link BoundedOneInput} operator is notified without an input id, a
     * {@link BoundedMultiInput} operator is notified with {@code inputId}; any other operator is
     * left untouched.
     *
     * @param inputId id of the input that has ended (only used for multi-input operators)
     * @throws Exception if the operator's {@code endInput} callback fails
     */
    public void endOperatorInput(int inputId) throws Exception {
        if (wrapped instanceof BoundedOneInput) {
            ((BoundedOneInput) wrapped).endInput();
        } else if (wrapped instanceof BoundedMultiInput) {
            ((BoundedMultiInput) wrapped).endInput(inputId);
        }
    }

    /**
     * Forwards a checkpoint-complete notification to the wrapped operator, unless closing of the
     * operator has already started.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if the operator's callback fails
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!closed) {
            wrapped.notifyCheckpointComplete(checkpointId);
        }
    }

    /** Returns the wrapped operator. */
    public OP getStreamOperator() {
        return wrapped;
    }

    /** Sets the wrapper visited after this one when iterating in reverse order. */
    void setPrevious(StreamOperatorWrapper<?, ?> previous) {
        this.previous = previous;
    }

    /** Sets the wrapper that is closed (and iterated) right after this one. */
    void setNext(StreamOperatorWrapper<?, ?> next) {
        this.next = next;
    }

    /**
     * Closes the wrapped operator and then, recursively, every wrapper reachable via {@code next}.
     *
     * @param actionExecutor executor through which operator callbacks are run
     * @param isStoppingBySyncSavepoint if {@code true}, the operator's input is not ended before
     *     closing
     * @throws Exception if ending the input or closing any operator fails; wrappers after the
     *     failing one are not closed by this call
     */
    public void close(StreamTaskActionExecutor actionExecutor, boolean isStoppingBySyncSavepoint) throws Exception {
        // The head operator's input is not ended here; all other operators get input 1 ended.
        if (!isHead && !isStoppingBySyncSavepoint) {
            actionExecutor.runThrowing(() -> endOperatorInput(1));
        }
        quiesceTimeServiceAndCloseOperator(actionExecutor);
        if (next != null) {
            next.close(actionExecutor, isStoppingBySyncSavepoint);
        }
    }

    /**
     * Quiesces the time service, closes the operator from within a mail, and keeps yielding to the
     * mailbox until a final marker mail has run.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for completion
     * @throws ExecutionException if quiescing or closing failed; the failure is the cause
     */
    private void quiesceTimeServiceAndCloseOperator(StreamTaskActionExecutor actionExecutor) throws InterruptedException, ExecutionException {
        // Stage 1: wait for the time service (if any) to report that it is quiesced.
        // Stage 2: close the operator from a mail, so it runs after mails already enqueued.
        // Stage 3: enqueue a marker mail; once it has run, mails enqueued before it are done.
        CompletableFuture<Void> closedFuture =
                quiesceProcessingTimeService()
                        .thenCompose(unused -> deferCloseOperatorToMailbox(actionExecutor))
                        .thenCompose(unused -> sendClosedMail());

        while (!closedFuture.isDone()) {
            // Drain the mailbox for as long as tryYield() reports progress.
            while (mailboxExecutor.tryYield()) {
            }
            try {
                // Bounded wait instead of a busy spin while nothing can be yielded to but the
                // future (e.g. the quiesce stage) has not completed yet.
                closedFuture.get(1L, TimeUnit.MILLISECONDS);
            } catch (TimeoutException ignored) {
                // Expected: not finished yet, go back to processing mails.
            }
        }
        // Already done at this point; get() only surfaces a failure of any stage.
        closedFuture.get();
    }

    /**
     * Submits a mail that closes the operator.
     *
     * @return a future completed once the mail has run; completed exceptionally if closing threw
     */
    private CompletableFuture<Void> deferCloseOperatorToMailbox(StreamTaskActionExecutor actionExecutor) {
        CompletableFuture<Void> closeOperatorFuture = new CompletableFuture<>();
        // Failures are routed into the future rather than thrown from the mail, so that they are
        // reported by the caller waiting on the future.
        ThrowingRunnable<Exception> closeMail = () -> {
            try {
                closeOperator(actionExecutor);
                closeOperatorFuture.complete(null);
            } catch (Throwable t) {
                closeOperatorFuture.completeExceptionally(t);
            }
        };
        mailboxExecutor.execute(closeMail, "StreamOperatorWrapper#closeOperator for " + wrapped);
        return closeOperatorFuture;
    }

    /**
     * Asks the processing time service to quiesce.
     *
     * @return the future handed out by the time service, or an already completed future if this
     *     operator has no time service
     */
    private CompletableFuture<Void> quiesceProcessingTimeService() {
        return processingTimeService
                .map(ProcessingTimeService::quiesce)
                .orElseGet(() -> CompletableFuture.completedFuture(null));
    }

    /**
     * Submits a marker mail that does nothing but complete the returned future.
     *
     * @return a future completed when the marker mail has been executed
     */
    private CompletableFuture<Void> sendClosedMail() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        ThrowingRunnable<Exception> closedMail = () -> future.complete(null);
        mailboxExecutor.execute(closedMail, "StreamOperatorWrapper#sendClosedMail for " + wrapped);
        return future;
    }

    /**
     * Marks this wrapper as closed and closes the wrapped operator through the action executor.
     *
     * @throws Exception if the operator's {@code close()} fails; the wrapper stays marked as closed
     */
    private void closeOperator(StreamTaskActionExecutor actionExecutor) throws Exception {
        actionExecutor.runThrowing(() -> {
            // Flag first, so the wrapper counts as closed even when close() throws.
            closed = true;
            wrapped.close();
        });
    }

    /**
     * Walks a linked chain of wrappers, either forwards (via {@code next}) or backwards (via
     * {@code previous}).
     *
     * <p>The instance is its own {@link Iterator}: {@link #iterator()} returns {@code this}, so it
     * can be traversed only once.
     */
    static class ReadIterator
            implements Iterator<StreamOperatorWrapper<?, ?>>, Iterable<StreamOperatorWrapper<?, ?>> {
        private final boolean reverse;
        private StreamOperatorWrapper<?, ?> current;

        /**
         * @param first wrapper to start from; {@code null} yields an empty iteration
         * @param reverse {@code true} to follow {@code previous} links, {@code false} to follow
         *     {@code next} links
         */
        ReadIterator(StreamOperatorWrapper<?, ?> first, boolean reverse) {
            this.current = first;
            this.reverse = reverse;
        }

        @Override
        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the current wrapper and advances in the configured direction.
         *
         * @throws NoSuchElementException if the chain is exhausted
         */
        @Override
        public StreamOperatorWrapper<?, ?> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            StreamOperatorWrapper<?, ?> result = current;
            current = reverse ? current.previous : current.next;
            return result;
        }

        /** Returns this very object; no fresh iterator is created. */
        @Override
        @Nonnull
        public Iterator<StreamOperatorWrapper<?, ?>> iterator() {
            return this;
        }
    }
}

