/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobgraph;

import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InitializeOnMaster;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.runtime.executiongraph.SimpleInitializeOnMasterContext;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.InstantiationUtil;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests for {@link JobVertex} and {@link InputOutputFormatVertex}: how produced data sets are
 * wired to their consumers, and how input/output formats are configured and invoked when the
 * vertex is initialized or finalized on the master.
 */
class JobTaskVertexTest {
    /** Supplies the {@link SharedReference} handed to {@link TestInitializeOutputFormat}. */
    @RegisterExtension
    final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();

    /**
     * Connects two consumers to the producer through one shared data set id and a third consumer
     * without an explicit id, then checks that the producer reports two data sets whose consumers
     * appear in the order in which they were connected.
     */
    @Test
    void testMultipleConsumersVertices() {
        JobVertex producer = new JobVertex("producer");
        JobVertex consumer1 = new JobVertex("consumer1");
        JobVertex consumer2 = new JobVertex("consumer2");
        IntermediateDataSetID dataSetId = new IntermediateDataSetID();
        consumer1.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, dataSetId, false);
        consumer2.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING, dataSetId, false);
        JobVertex consumer3 = new JobVertex("consumer3");
        consumer3.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        List<IntermediateDataSet> producedDataSets = producer.getProducedDataSets();
        Assertions.assertThat(producedDataSets).hasSize(2);

        // First data set: the one shared by consumer1 and consumer2.
        IntermediateDataSet sharedDataSet = producedDataSets.get(0);
        Assertions.assertThat(sharedDataSet.getId()).isEqualTo(dataSetId);
        List<JobEdge> sharedConsumers = sharedDataSet.getConsumers();
        Assertions.assertThat(sharedConsumers).hasSize(2);
        Assertions.assertThat(sharedConsumers.get(0).getTarget().getID()).isEqualTo(consumer1.getID());
        Assertions.assertThat(sharedConsumers.get(1).getTarget().getID()).isEqualTo(consumer2.getID());

        // Second data set: the one created for consumer3 alone.
        List<JobEdge> exclusiveConsumers = producedDataSets.get(1).getConsumers();
        Assertions.assertThat(exclusiveConsumers).hasSize(1);
        Assertions.assertThat(exclusiveConsumers.get(0).getTarget().getID()).isEqualTo(consumer3.getID());
    }

    /**
     * Connects {@code target} directly to {@code source} and checks the input/output flags, the
     * data set and input counts, and that the edge links the two vertices in both directions.
     */
    @Test
    void testConnectDirectly() {
        JobVertex source = new JobVertex("source");
        JobVertex target = new JobVertex("target");
        target.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        Assertions.assertThat(source.isInputVertex()).isTrue();
        Assertions.assertThat(source.isOutputVertex()).isFalse();
        Assertions.assertThat(target.isInputVertex()).isFalse();
        Assertions.assertThat(target.isOutputVertex()).isTrue();

        Assertions.assertThat(source.getNumberOfProducedIntermediateDataSets()).isEqualTo(1);
        Assertions.assertThat(target.getNumberOfInputs()).isEqualTo(1);

        Assertions.assertThat(source.getProducedDataSets().get(0)).isEqualTo(target.getInputs().get(0).getSource());
        Assertions.assertThat(source.getProducedDataSets().get(0).getConsumers().get(0).getTarget()).isEqualTo(target);
    }

    /**
     * Writes a {@link TestingOutputFormat} into the vertex configuration and checks that
     * initializing and finalizing on the master reach the format (signalled by
     * {@link TestException}), both on the vertex and on a serialized copy, and that the thread's
     * context class loader is the same before and after the calls on the copy.
     */
    @Test
    void testOutputFormat() throws Exception {
        InputOutputFormatVertex vertex = new InputOutputFormatVertex("Name");
        OperatorID operatorID = new OperatorID();
        Configuration parameters = new Configuration();
        parameters.setString("test_key", "test_value");
        new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader())
                .addOutputFormat(operatorID, new TestingOutputFormat(parameters))
                .addParameters(operatorID, parameters)
                .write(new TaskConfig(vertex.getConfiguration()));

        // The class loader is a URLClassLoader and therefore Closeable; release it when done.
        try (TestClassLoader cl = new TestClassLoader()) {
            Assertions.assertThatThrownBy(() -> vertex.initializeOnMaster(new SimpleInitializeOnMasterContext(cl, vertex.getParallelism())))
                    .isInstanceOf(TestException.class);

            InputOutputFormatVertex copy = InstantiationUtil.clone(vertex);
            ClassLoader ctxCl = Thread.currentThread().getContextClassLoader();

            Assertions.assertThatThrownBy(() -> copy.initializeOnMaster(new SimpleInitializeOnMasterContext(cl, copy.getParallelism())))
                    .isInstanceOf(TestException.class);
            Assertions.assertThat(Thread.currentThread().getContextClassLoader())
                    .as("Previous classloader was not restored.")
                    .isEqualTo(ctxCl);

            Assertions.assertThatThrownBy(() -> copy.finalizeOnMaster(new SimpleInitializeOnMasterContext(cl, copy.getParallelism())))
                    .isInstanceOf(TestException.class);
            Assertions.assertThat(Thread.currentThread().getContextClassLoader())
                    .as("Previous classloader was not restored.")
                    .isEqualTo(ctxCl);
        }
    }

    /**
     * Writes a {@link TestInputFormat} into the vertex configuration, initializes the vertex on
     * the master and checks that the input split source yields exactly one {@link TestSplit}.
     */
    @Test
    void testInputFormat() throws Exception {
        InputOutputFormatVertex vertex = new InputOutputFormatVertex("Name");
        OperatorID operatorID = new OperatorID();
        Configuration parameters = new Configuration();
        parameters.setString("test_key", "test_value");
        new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader())
                .addInputFormat(operatorID, new TestInputFormat(parameters))
                .addParameters(operatorID, "test_key", "test_value")
                .write(new TaskConfig(vertex.getConfiguration()));

        // The class loader is a URLClassLoader and therefore Closeable; release it when done.
        try (TestClassLoader cl = new TestClassLoader()) {
            vertex.initializeOnMaster(new SimpleInitializeOnMasterContext(cl, vertex.getParallelism()));

            Object[] splits = vertex.getInputSplitSource().createInputSplits(77);
            Assertions.assertThat(splits).isNotNull();
            Assertions.assertThat(splits).hasSize(1);
            Assertions.assertThat(splits[0].getClass()).isEqualTo(TestSplit.class);
        }
    }

    /**
     * Checks that {@code initializeGlobal} receives the parallelism given to the master context
     * rather than the parallelism that was set on the vertex.
     */
    @Test
    void testOutputFormatUsesCorrectParallelism() throws Exception {
        InputOutputFormatVertex vertex = new InputOutputFormatVertex("Name");
        int initialParallelism = 1;
        vertex.setParallelism(initialParallelism);
        OperatorID operatorID = new OperatorID();
        SharedReference<AtomicInteger> globalParallelism = sharedObjects.add(new AtomicInteger());
        new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader())
                .addOutputFormat(operatorID, new TestInitializeOutputFormat(globalParallelism))
                .write(new TaskConfig(vertex.getConfiguration()));

        // Deliberately different from the vertex's own parallelism.
        int executionParallelism = initialParallelism + 3;
        try (TestClassLoader cl = new TestClassLoader()) {
            vertex.initializeOnMaster(new SimpleInitializeOnMasterContext(cl, executionParallelism));
            Assertions.assertThat(globalParallelism.get().get()).isEqualTo(executionParallelism);
        }
    }

    /**
     * Fails unless the calling thread's context class loader is a {@link TestClassLoader}.
     *
     * @throws IllegalStateException if a different kind of class loader is installed
     */
    private static void checkContextClassLoaderSwitched() {
        if (!(Thread.currentThread().getContextClassLoader() instanceof TestClassLoader)) {
            throw new IllegalStateException("Context ClassLoader was not correctly switched.");
        }
    }

    /**
     * Marker class loader without any URLs that delegates to the creating thread's context class
     * loader; the formats below use {@code instanceof} on it to detect the class loader switch.
     */
    private static class TestClassLoader
    extends URLClassLoader {
        public TestClassLoader() {
            super(new URL[0], Thread.currentThread().getContextClassLoader());
        }
    }

    /**
     * Output format that verifies it was configured exactly once under a {@link TestClassLoader}
     * and then throws {@link TestException} from both master hooks so callers can observe that
     * the hook was reached.
     */
    private static final class TestingOutputFormat
    extends DiscardingOutputFormat<Object>
    implements InitializeOnMaster,
    FinalizeOnMaster {
        private boolean isConfigured = false;
        private final Configuration expectedParameters;

        public TestingOutputFormat(Configuration expectedParameters) {
            this.expectedParameters = expectedParameters;
        }

        /** Always throws {@link TestException} once the configuration and class loader checks pass. */
        @Override
        public void initializeGlobal(int parallelism) throws IOException {
            if (!isConfigured) {
                throw new IllegalStateException("OutputFormat was not configured before initializeGlobal was called.");
            }
            checkContextClassLoaderSwitched();
            throw new TestException();
        }

        /** Always throws {@link TestException} once the configuration and class loader checks pass. */
        @Override
        public void finalizeGlobal(int parallelism) throws IOException {
            if (!isConfigured) {
                throw new IllegalStateException("OutputFormat was not configured before finalizeGlobal was called.");
            }
            checkContextClassLoaderSwitched();
            throw new TestException();
        }

        /** Asserts that every expected key carries the expected value, then marks the format configured. */
        @Override
        public void configure(Configuration parameters) {
            if (isConfigured) {
                throw new IllegalStateException("OutputFormat is already configured.");
            }
            checkContextClassLoaderSwitched();
            for (String key : expectedParameters.keySet()) {
                Assertions.assertThat(parameters.getString(key, null)).isEqualTo(expectedParameters.getString(key, null));
            }
            isConfigured = true;
        }
    }

    /**
     * Input format that verifies it was configured exactly once under a {@link TestClassLoader}
     * and produces a single {@link TestSplit}.
     */
    private static final class TestInputFormat
    extends GenericInputFormat<Object> {
        private boolean isConfigured = false;
        private final Configuration expectedParameters;

        public TestInputFormat(Configuration expectedParameters) {
            this.expectedParameters = expectedParameters;
        }

        @Override
        public boolean reachedEnd() {
            return false;
        }

        @Override
        public Object nextRecord(Object reuse) {
            return null;
        }

        /** Ignores {@code numSplits} and always returns one {@link TestSplit}. */
        @Override
        public GenericInputSplit[] createInputSplits(int numSplits) {
            if (!isConfigured) {
                throw new IllegalStateException("InputFormat was not configured before createInputSplits was called.");
            }
            return new GenericInputSplit[]{new TestSplit(0, 1)};
        }

        /** Asserts that every expected key carries the expected value, then marks the format configured. */
        @Override
        public void configure(Configuration parameters) {
            if (isConfigured) {
                throw new IllegalStateException("InputFormat is already configured.");
            }
            checkContextClassLoaderSwitched();
            for (String key : expectedParameters.keySet()) {
                Assertions.assertThat(parameters.getString(key, null)).isEqualTo(expectedParameters.getString(key, null));
            }
            isConfigured = true;
        }
    }

    /** Split subtype used to recognize splits created by {@link TestInputFormat}. */
    private static final class TestSplit
    extends GenericInputSplit {
        public TestSplit(int partitionNumber, int totalNumberOfPartitions) {
            super(partitionNumber, totalNumberOfPartitions);
        }
    }

    /** Exception type thrown by {@link TestingOutputFormat} so tests can tell it from other failures. */
    private static final class TestException
    extends IOException {
        private TestException() {
        }
    }

    /** Output format whose only behavior is recording the parallelism passed to {@code initializeGlobal}. */
    private static final class TestInitializeOutputFormat
    implements OutputFormat<Object>,
    InitializeOnMaster {
        private final SharedReference<AtomicInteger> globalParallelism;

        private TestInitializeOutputFormat(SharedReference<AtomicInteger> globalParallelism) {
            this.globalParallelism = globalParallelism;
        }

        @Override
        public void configure(Configuration parameters) {
        }

        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
        }

        @Override
        public void writeRecord(Object record) throws IOException {
        }

        @Override
        public void close() throws IOException {
        }

        /** Stores {@code parallelism} in the shared counter for the test to read back. */
        @Override
        public void initializeGlobal(int parallelism) throws IOException {
            globalParallelism.get().set(parallelism);
        }
    }
}

