/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Integration tests that run a small stateful source/sink pipeline on a {@link MiniCluster} and
 * check that the source's operator state is restored when the source is identified via
 * {@code setUidHash(...)}.
 *
 * <p>Both tests emit the integers {@code [0, maxNumber)} from a single-parallelism source into a
 * collecting sink and assert that the sink saw every number exactly once and in order, even though
 * the pipeline is interrupted (by a savepoint or by an artificial failure) part-way through.
 */
public class CheckpointRestoreWithUidHashITCase {
    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /** Counted down by the source once it has stopped emitting and waits for a checkpoint. */
    private SharedReference<CountDownLatch> startWaitingForCheckpointLatch;

    /** Every value received by {@link CollectSink}, in arrival order. */
    private SharedReference<List<Integer>> result;

    /** Registers a fresh latch and a fresh result list for each test. */
    @Before
    public void setup() {
        startWaitingForCheckpointLatch = sharedObjects.add(new CountDownLatch(1));
        result = sharedObjects.add(new ArrayList<>());
    }

    /**
     * Runs a first job whose source has an explicit uid, takes a savepoint while the source is
     * paused, then runs a second job whose source only carries the first source's generated
     * operator ID as uid hash and restores from that savepoint.
     */
    @Test
    public void testRestoreFromSavepointBySetUidHash() throws Exception {
        final int maxNumber = 100;

        try (MiniCluster miniCluster = new MiniCluster(createMiniClusterConfig())) {
            miniCluster.start();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            JobGraph firstJob =
                    createJobGraph(
                            env,
                            StatefulSourceBehavior.HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN,
                            maxNumber,
                            "test-uid",
                            null,
                            null);
            JobID jobId = miniCluster.submitJob(firstJob).get().getJobID();
            CommonTestUtils.waitForAllTaskRunning(miniCluster, jobId, false);

            // Only trigger the savepoint once the source has stopped emitting, so that no further
            // records are produced by the first job after the snapshot position.
            startWaitingForCheckpointLatch.get().await();
            // NOTE(review): the trailing 'true' is presumably MiniCluster's cancel-job flag, which
            // is what lets the HOLD_* source leave its wait loop - confirm for the Flink version.
            String savepointPath =
                    miniCluster
                            .triggerSavepoint(
                                    jobId, TMP_FOLDER.newFolder().getAbsolutePath(), true)
                            .get();

            // The source's IDs are taken from the last entry of the first (source) vertex.
            List<OperatorIDPair> operatorIds =
                    firstJob.getVerticesSortedTopologicallyFromSources().get(0).getOperatorIDs();
            OperatorIDPair sourceOperatorIds = operatorIds.get(operatorIds.size() - 1);

            JobGraph secondJob =
                    createJobGraph(
                            env,
                            StatefulSourceBehavior.PROCESS_ONLY,
                            maxNumber,
                            null,
                            sourceOperatorIds.getGeneratedOperatorID().toHexString(),
                            savepointPath);
            miniCluster.executeJobBlocking(secondJob);
        }

        MatcherAssert.assertThat(
                result.get(), Matchers.contains(IntStream.range(0, maxNumber).boxed().toArray()));
    }

    /**
     * Runs a checkpointed job whose source carries a freshly created uid hash and throws once
     * after a checkpoint completed; the restart strategy then restarts the job, which must
     * continue from the checkpointed position.
     */
    @Test
    public void testRestoreCheckpointAfterFailoverWithUidHashSet() throws Exception {
        final int maxNumber = 100;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 500L));
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);

        JobGraph jobGraph =
                createJobGraph(
                        env,
                        StatefulSourceBehavior.FAIL_AFTER_CHECKPOINT_ON_FIRST_RUN,
                        maxNumber,
                        null,
                        new OperatorID().toHexString(),
                        null);

        try (MiniCluster miniCluster = new MiniCluster(createMiniClusterConfig())) {
            miniCluster.start();
            miniCluster.executeJobBlocking(jobGraph);
        }

        MatcherAssert.assertThat(
                result.get(), Matchers.contains(IntStream.range(0, maxNumber).boxed().toArray()));
    }

    /** Builds a configuration for a cluster with one task manager offering a single slot. */
    private MiniClusterConfiguration createMiniClusterConfig() {
        Configuration config = new Configuration();
        config.setString(RestOptions.BIND_PORT, "18081-19000");
        return new MiniClusterConfiguration.Builder()
                .setNumTaskManagers(1)
                .setNumSlotsPerTaskManager(1)
                .setConfiguration(config)
                .build();
    }

    /**
     * Builds the {@code StatefulSource -> CollectSink} job graph (both with parallelism 1).
     *
     * @param env environment the pipeline is defined on
     * @param behavior how the source behaves on its first attempt
     * @param maxNumber exclusive upper bound of the emitted numbers
     * @param uid uid to set on the source, or {@code null} to set none
     * @param uidHash uid hash to set on the source, or {@code null} to set none
     * @param savepointPath savepoint to restore from, or {@code null} to start without one
     * @return the job graph generated from {@code env}
     */
    private JobGraph createJobGraph(
            StreamExecutionEnvironment env,
            StatefulSourceBehavior behavior,
            int maxNumber,
            @Nullable String uid,
            @Nullable String uidHash,
            @Nullable String savepointPath) {
        DataStreamSource<Integer> source =
                env.addSource(new StatefulSource(behavior, maxNumber, startWaitingForCheckpointLatch))
                        .setParallelism(1);

        // uid(...) / setUidHash(...) configure the operator in place and hand back the same
        // operator for chaining, so the returned reference does not need to be kept.
        if (uid != null) {
            source.uid(uid);
        }
        if (uidHash != null) {
            source.setUidHash(uidHash);
        }

        source.addSink(new CollectSink(result)).setParallelism(1);

        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        if (savepointPath != null) {
            jobGraph.setSavepointRestoreSettings(
                    SavepointRestoreSettings.forPath(savepointPath, false));
        }
        return jobGraph;
    }

    /** Sink that appends every received value to the shared result list. */
    private static class CollectSink implements SinkFunction<Integer> {
        private final SharedReference<List<Integer>> result;

        public CollectSink(SharedReference<List<Integer>> result) {
            this.result = result;
        }

        @Override
        public void invoke(Integer value, SinkFunction.Context context) throws Exception {
            result.get().add(value);
        }
    }

    /**
     * Source that emits consecutive integers and keeps the next number to emit in operator list
     * state.
     *
     * <p>It always emits up to {@code maxNumber / 3} first. Depending on the configured
     * {@link StatefulSourceBehavior} and only on attempt number 0, it then pauses until a
     * checkpoint taken during the pause has completed and afterwards either fails or idles until
     * cancelled; otherwise it emits the remaining numbers up to {@code maxNumber}.
     */
    private static class StatefulSource extends RichSourceFunction<Integer>
            implements CheckpointedFunction, CheckpointListener {
        private final StatefulSourceBehavior behavior;
        private final int maxNumber;
        private final SharedReference<CountDownLatch> startWaitingForCheckpointLatch;

        private ListState<Integer> nextNumberState;
        private int nextNumber;

        private volatile boolean isCanceled;
        /** Set once the source has stopped emitting and waits for a checkpoint. */
        private volatile boolean isWaiting;
        /** ID of the first checkpoint snapshotted while waiting; {@code 0} until then. */
        private volatile long firstCheckpointIdAfterWaiting;
        /** Set once a checkpoint at or after {@link #firstCheckpointIdAfterWaiting} completed. */
        private volatile boolean checkpointCompletedAfterWaiting;

        public StatefulSource(
                StatefulSourceBehavior behavior,
                int maxNumber,
                SharedReference<CountDownLatch> startWaitingForCheckpointLatch) {
            this.behavior = behavior;
            this.maxNumber = maxNumber;
            this.startWaitingForCheckpointLatch = startWaitingForCheckpointLatch;
        }

        /** Obtains the {@code "next"} list state and, if it has an entry, resumes from it. */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            nextNumberState =
                    context.getOperatorStateStore()
                            .getListState(new ListStateDescriptor<>("next", Integer.class));

            Iterable<Integer> restored = nextNumberState.get();
            if (restored.iterator().hasNext()) {
                nextNumber = restored.iterator().next();
            }
        }

        @Override
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            emitRecordsTill(maxNumber / 3, ctx);

            boolean firstAttempt = getRuntimeContext().getAttemptNumber() == 0;
            if (!(behavior.waitForCheckpointOnFirstRun && firstAttempt)) {
                emitRecordsTill(maxNumber, ctx);
                return;
            }

            // Announce the pause, then poll until a checkpoint taken during it has completed.
            isWaiting = true;
            startWaitingForCheckpointLatch.get().countDown();
            while (!checkpointCompletedAfterWaiting) {
                Thread.sleep(200L);
            }

            if (behavior == StatefulSourceBehavior.FAIL_AFTER_CHECKPOINT_ON_FIRST_RUN) {
                throw new RuntimeException("Artificial Exception");
            }
            if (behavior == StatefulSourceBehavior.HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN) {
                // Emit nothing more; just stay alive until the job is cancelled.
                while (!isCanceled) {
                    Thread.sleep(200L);
                }
            }
        }

        /**
         * Emits numbers from the current position up to (excluding) {@code endExclusive}, or until
         * cancelled. Emission and the position update happen together under the checkpoint lock.
         */
        private void emitRecordsTill(int endExclusive, SourceFunction.SourceContext<Integer> ctx) {
            while (!isCanceled && nextNumber < endExclusive) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(nextNumber);
                    ++nextNumber;
                }
            }
        }

        /** Stores the next number to emit and records the first checkpoint seen while waiting. */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            nextNumberState.update(Collections.singletonList(nextNumber));
            if (isWaiting && firstCheckpointIdAfterWaiting <= 0L) {
                firstCheckpointIdAfterWaiting = context.getCheckpointId();
            }
        }

        /** Releases the pause in {@link #run} once a checkpoint taken while waiting completes. */
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (firstCheckpointIdAfterWaiting > 0L
                    && checkpointId >= firstCheckpointIdAfterWaiting) {
                checkpointCompletedAfterWaiting = true;
            }
        }

        @Override
        public void cancel() {
            isCanceled = true;
        }
    }

    /** What {@link StatefulSource} does on its first attempt after emitting the first third. */
    private enum StatefulSourceBehavior {
        /** Emit all numbers without pausing. */
        PROCESS_ONLY(false),
        /** Pause for a checkpoint, then idle until cancelled. */
        HOLD_AFTER_CHECKPOINT_ON_FIRST_RUN(true),
        /** Pause for a checkpoint, then throw an exception. */
        FAIL_AFTER_CHECKPOINT_ON_FIRST_RUN(true);

        private final boolean waitForCheckpointOnFirstRun;

        StatefulSourceBehavior(boolean waitForCheckpointOnFirstRun) {
            this.waitForCheckpointOnFirstRun = waitForCheckpointOnFirstRun;
        }
    }
}

