/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.SubtaskAccess;
import org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.jetbrains.annotations.NotNull;

/**
 * Test helper that acts as a {@link SubtaskAccess.SubtaskAccessFactory} and records every
 * {@link OperatorEvent} that is sent through the subtask accesses / gateways it hands out.
 *
 * <p>Instances are created through the static {@code createFor...} factories, which decide
 * (a) whether the created subtask accesses report themselves as already running, and
 * (b) which future is returned as the result of every event-sending action.
 *
 * <p>Recorded events are kept in a plain {@link ArrayList}; no synchronization is performed by
 * this class.
 */
public class EventReceivingTasks implements SubtaskAccess.SubtaskAccessFactory {

    /** All events recorded so far, in the order in which their send actions were invoked. */
    final ArrayList<EventWithSubtask> events = new ArrayList<>();

    /** The future handed back by every send action created by this instance. */
    private final CompletableFuture<Acknowledge> eventSendingResult;

    /** Whether subtask accesses created by this instance start with a completed "running" future. */
    private final boolean createdTasksAreRunning;

    /**
     * Creates an instance whose subtask accesses have NOT switched to running yet and whose send
     * actions return an already completed {@link Acknowledge} future.
     */
    public static EventReceivingTasks createForNotYetRunningTasks() {
        return new EventReceivingTasks(false, CompletableFuture.completedFuture(Acknowledge.get()));
    }

    /**
     * Creates an instance whose subtask accesses are already switched to running and whose send
     * actions return an already completed {@link Acknowledge} future.
     */
    public static EventReceivingTasks createForRunningTasks() {
        return new EventReceivingTasks(true, CompletableFuture.completedFuture(Acknowledge.get()));
    }

    /**
     * Creates an instance whose subtask accesses are already switched to running and whose send
     * actions return a future that is exceptionally completed with the given exception.
     *
     * @param rpcException the exception the send result future is completed with
     */
    public static EventReceivingTasks createForRunningTasksFailingRpcs(Throwable rpcException) {
        return new EventReceivingTasks(true, FutureUtils.completedExceptionally(rpcException));
    }

    /**
     * Creates an instance whose subtask accesses are already switched to running and whose send
     * actions return exactly the given future, so the caller controls when and how it completes.
     *
     * @param result the future returned by every send action
     */
    public static EventReceivingTasks createForRunningTasksWithRpcResult(
            CompletableFuture<Acknowledge> result) {
        return new EventReceivingTasks(true, result);
    }

    private EventReceivingTasks(
            boolean createdTasksAreRunning, CompletableFuture<Acknowledge> eventSendingResult) {
        this.createdTasksAreRunning = createdTasksAreRunning;
        this.eventSendingResult = eventSendingResult;
    }

    /** Returns how many events have been recorded so far. */
    public int getNumberOfSentEvents() {
        return events.size();
    }

    /**
     * Returns the recorded events.
     *
     * <p>Note: this is the live backing list, not a copy; later sends show up in it.
     */
    public List<EventWithSubtask> getAllSentEvents() {
        return events;
    }

    /**
     * Returns the events recorded for the given subtask, in recording order.
     *
     * @param subtaskIndex index of the subtask whose events are requested
     * @return a new list containing only the events of that subtask
     */
    public List<OperatorEvent> getSentEventsForSubtask(int subtaskIndex) {
        // Work on a snapshot so the stream does not traverse the live list.
        List<EventWithSubtask> snapshot = new ArrayList<>(events);
        return snapshot.stream()
                .filter(evt -> evt.subtask == subtaskIndex)
                .map(evt -> evt.event)
                .collect(Collectors.toList());
    }

    /**
     * Returns a singleton collection holding a freshly created access for attempt {@code 0} of the
     * given subtask.
     *
     * @param subtaskIndex index of the subtask
     */
    public Collection<SubtaskAccess> getAccessesForSubtask(int subtaskIndex) {
        return Collections.singleton(getAccessForAttempt(subtaskIndex, 0));
    }

    /**
     * Creates a new {@link TestSubtaskAccess} for the given subtask and attempt.
     *
     * <p>Every call creates a new access object (with a newly generated execution attempt id);
     * accesses are not cached.
     *
     * @param subtaskIndex index of the subtask
     * @param attemptNumber attempt number of the execution
     */
    public SubtaskAccess getAccessForAttempt(int subtaskIndex, int attemptNumber) {
        return new TestSubtaskAccess(subtaskIndex, attemptNumber, createdTasksAreRunning);
    }

    /**
     * Creates a {@link SubtaskGatewayImpl} on top of a fresh access for the given subtask/attempt,
     * using an executor that performs no main-thread check and a new
     * {@link IncompleteFuturesTracker}.
     *
     * @param subtaskIndex index of the subtask
     * @param attemptNumber attempt number of the execution
     */
    public OperatorCoordinator.SubtaskGateway createGatewayForSubtask(
            int subtaskIndex, int attemptNumber) {
        SubtaskAccess access = getAccessForAttempt(subtaskIndex, attemptNumber);
        return new SubtaskGatewayImpl(
                access,
                new NoMainThreadCheckComponentMainThreadExecutor(),
                new IncompleteFuturesTracker());
    }

    /**
     * Creates the action that "sends" an event: when called, it records the event together with
     * the subtask index and returns the configured send result future.
     *
     * @param event the (already deserialized) event
     * @param subtask index of the target subtask
     */
    Callable<CompletableFuture<Acknowledge>> createSendAction(OperatorEvent event, int subtask) {
        return () -> {
            events.add(new EventWithSubtask(event, subtask));
            return eventSendingResult;
        };
    }

    /**
     * A {@link ComponentMainThreadExecutor} that delegates everything to a
     * {@link ScheduledExecutorServiceAdapter} around a {@link DirectScheduledExecutorService} and
     * whose {@link #assertRunningInMainThread()} is a no-op.
     */
    private static class NoMainThreadCheckComponentMainThreadExecutor
            implements ComponentMainThreadExecutor {

        private final ScheduledExecutor scheduledExecutor =
                new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService());

        private NoMainThreadCheckComponentMainThreadExecutor() {
        }

        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return scheduledExecutor.schedule(command, delay, unit);
        }

        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            return scheduledExecutor.schedule(callable, delay, unit);
        }

        public ScheduledFuture<?> scheduleAtFixedRate(
                Runnable command, long initialDelay, long period, TimeUnit unit) {
            return scheduledExecutor.scheduleAtFixedRate(command, initialDelay, period, unit);
        }

        public ScheduledFuture<?> scheduleWithFixedDelay(
                Runnable command, long initialDelay, long delay, TimeUnit unit) {
            // Delegate to the fixed-delay variant; previously this wrongly used scheduleAtFixedRate.
            return scheduledExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }

        /** Intentionally empty: this executor never checks the calling thread. */
        public void assertRunningInMainThread() {
        }

        public void execute(@NotNull Runnable command) {
            scheduledExecutor.execute(command);
        }
    }

    /**
     * {@link SubtaskAccess} stub bound to the enclosing {@link EventReceivingTasks}: send actions
     * record into the enclosing instance, and failover requests are collected in a local list.
     */
    final class TestSubtaskAccess implements SubtaskAccess {

        private final ExecutionAttemptID executionAttemptId;

        /** Completed once {@link #switchToRunning()} has been called. */
        private final CompletableFuture<?> running;

        private final int subtaskIndex;

        /** Causes passed to {@link #triggerTaskFailover(Throwable)}, in call order. */
        private final List<Throwable> taskFailoverReasons = new ArrayList<>();

        private TestSubtaskAccess(int subtaskIndex, int attemptNumber, boolean isRunning) {
            this.subtaskIndex = subtaskIndex;
            this.executionAttemptId =
                    ExecutionGraphTestUtils.createExecutionAttemptId(
                            new JobVertexID(), subtaskIndex, attemptNumber);
            this.running = new CompletableFuture<>();
            if (isRunning) {
                switchToRunning();
            }
        }

        /**
         * Deserializes the event eagerly (with this class's class loader) and returns a send
         * action that records it for this access's subtask index.
         *
         * @param event the serialized event
         * @throws AssertionError if the event cannot be deserialized
         */
        public Callable<CompletableFuture<Acknowledge>> createEventSendAction(
                SerializedValue<OperatorEvent> event) {
            final OperatorEvent deserializedEvent;
            try {
                deserializedEvent = event.deserializeValue(getClass().getClassLoader());
            } catch (IOException | ClassNotFoundException e) {
                // AssertionError(Object) installs a Throwable argument as the cause.
                throw new AssertionError(e);
            }
            return EventReceivingTasks.this.createSendAction(deserializedEvent, subtaskIndex);
        }

        public int getSubtaskIndex() {
            return subtaskIndex;
        }

        public ExecutionAttemptID currentAttempt() {
            return executionAttemptId;
        }

        public String subtaskName() {
            return "test_task-" + subtaskIndex + " #: " + executionAttemptId;
        }

        /** Returns the future that completes when {@link #switchToRunning()} is called. */
        public CompletableFuture<?> hasSwitchedToRunning() {
            return running;
        }

        /** Always reports {@code true}; this stub never leaves the running state. */
        public boolean isStillRunning() {
            return true;
        }

        /** Completes the "running" future; calling it again has no further effect. */
        void switchToRunning() {
            running.complete(null);
        }

        /** Records the cause instead of triggering any real failover. */
        public void triggerTaskFailover(Throwable cause) {
            taskFailoverReasons.add(cause);
        }

        /** Returns the live list of recorded failover causes. */
        public List<Throwable> getTaskFailoverReasons() {
            return taskFailoverReasons;
        }
    }

    /** Immutable pair of a recorded event and the index of the subtask it was sent to. */
    static final class EventWithSubtask {

        public final OperatorEvent event;
        public final int subtask;

        public EventWithSubtask(OperatorEvent event, int subtask) {
            this.event = event;
            this.subtask = subtask;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            EventWithSubtask that = (EventWithSubtask) o;
            // Null-safe comparison, consistent with hashCode() which also tolerates a null event.
            return subtask == that.subtask && Objects.equals(event, that.event);
        }

        @Override
        public int hashCode() {
            return Objects.hash(event, subtask);
        }

        @Override
        public String toString() {
            return event + " => subtask " + subtask;
        }
    }
}

