/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.util.Collection;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class CheckpointFailureManagerITCase
extends TestLogger {
    @ClassRule
    public static MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().build());

    @Test(timeout=20000L)
    public void testAsyncCheckpointFailureTriggerJobFailed() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.setStateBackend((StateBackend)new AsyncFailureStateBackend());
        env.addSource((SourceFunction)new StringGeneratingSourceFunction()).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)env.getStreamGraph());
        try {
            TestUtils.submitJobAndWaitForResult(cluster.getClusterClient(), jobGraph, ((Object)((Object)this)).getClass().getClassLoader());
        }
        catch (JobExecutionException jobException) {
            Optional throwable = ExceptionUtils.findThrowable((Throwable)jobException, FlinkRuntimeException.class);
            Assert.assertTrue((boolean)throwable.isPresent());
            Assert.assertEquals((Object)"Exceeded checkpoint tolerable failure threshold.", (Object)((FlinkRuntimeException)throwable.get()).getMessage());
        }
        Assert.assertEquals((long)1L, (long)StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
    }

    private static class AsyncFailureStateBackend
    extends MemoryStateBackend {
        private static final long serialVersionUID = 1L;
        private static final SnapshotStrategy<OperatorStateHandle, SnapshotResources> ASYNC_DECLINING_SNAPSHOT_STRATEGY = new SnapshotStrategy<OperatorStateHandle, SnapshotResources>(){

            public SnapshotResources syncPrepareResources(long checkpointId) throws Exception {
                return null;
            }

            public SnapshotStrategy.SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(SnapshotResources syncPartResource, long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) {
                return closeableRegistry -> {
                    throw new Exception("Expected async snapshot exception.");
                };
            }
        };

        private AsyncFailureStateBackend() {
        }

        public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) {
            return new DefaultOperatorStateBackendBuilder(env.getUserCodeClassLoader().asClassLoader(), env.getExecutionConfig(), true, stateHandles, cancelStreamRegistry){

                public DefaultOperatorStateBackend build() {
                    CloseableRegistry closeableRegistry = new CloseableRegistry();
                    return new DefaultOperatorStateBackend(this.executionConfig, closeableRegistry, new HashMap(), new HashMap(), new HashMap(), new HashMap(), new SnapshotStrategyRunner("Async Failure State Backend", ASYNC_DECLINING_SNAPSHOT_STRATEGY, closeableRegistry, SnapshotExecutionType.ASYNCHRONOUS));
                }
            }.build();
        }

        public AsyncFailureStateBackend configure(ReadableConfig config, ClassLoader classLoader) {
            return this;
        }
    }

    private static class StringGeneratingSourceFunction
    extends RichParallelSourceFunction<String>
    implements CheckpointedFunction {
        private static final long serialVersionUID = 1L;
        private static final ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor("emitted", Long.class);
        private final byte[] randomBytes = new byte[10];
        private ListState<Long> listState;
        private long emitted = 0L;
        private volatile boolean isRunning = true;
        public static final AtomicInteger INITIALIZE_TIMES = new AtomicInteger(0);

        private StringGeneratingSourceFunction() {
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.listState.clear();
            this.listState.add((Object)this.emitted);
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.listState = context.getOperatorStateStore().getListState(stateDescriptor);
            INITIALIZE_TIMES.addAndGet(1);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (this.isRunning) {
                ThreadLocalRandom.current().nextBytes(this.randomBytes);
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)new String(this.randomBytes));
                    ++this.emitted;
                }
                Thread.sleep(10L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }
}

