/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.hooks;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/**
 * Collection of static helpers for calling {@link MasterTriggerRestoreHook}s
 * when a checkpoint is triggered or restored, and for wrapping hooks so that
 * they run with a given class loader as the thread's context class loader.
 *
 * <p>This class is not meant to be instantiated.
 */
public class MasterHooks {

    // ------------------------------------------------------------------------
    //  checkpoint triggering
    // ------------------------------------------------------------------------

    /**
     * Triggers all given master hooks, one after the other, and collects the
     * state they produced. Each hook is waited on (up to {@code timeout})
     * before the next one is triggered.
     *
     * @param hooks The hooks to trigger.
     * @param checkpointId The checkpoint ID passed to every hook.
     * @param timestamp The checkpoint timestamp passed to every hook.
     * @param executor The executor handed to the hooks for asynchronous work.
     * @param timeout The maximum time to wait for each hook's result.
     * @return The serialized states of all hooks that produced a non-null result.
     *         Hooks that returned no future or a null result contribute no entry.
     * @throws FlinkException If a hook fails, is interrupted, times out, fails to
     *         serialize its result, or produces a result without offering a serializer.
     */
    public static List<MasterState> triggerMasterHooks(Collection<MasterTriggerRestoreHook<?>> hooks, long checkpointId, long timestamp, Executor executor, Time timeout) throws FlinkException {
        final ArrayList<MasterState> states = new ArrayList<MasterState>(hooks.size());

        for (MasterTriggerRestoreHook<?> hook : hooks) {
            final MasterState state = triggerHook(hook, checkpointId, timestamp, executor, timeout);
            if (state != null) {
                states.add(state);
            }
        }

        // stateless hooks leave unused capacity behind
        states.trimToSize();
        return states;
    }

    /**
     * Triggers a single hook, waits for its result, and serializes that result.
     *
     * @return The serialized hook state, or {@code null} if the hook returned no
     *         future or the future completed with {@code null}.
     */
    private static <T> MasterState triggerHook(MasterTriggerRestoreHook<?> hook, long checkpointId, long timestamp, Executor executor, Time timeout) throws FlinkException {
        // The wildcard is captured into a method-level type variable so that the
        // hook's future, result, and serializer can be tied to the same type.
        @SuppressWarnings("unchecked")
        final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;

        final String id = typedHook.getIdentifier();
        final SimpleVersionedSerializer<T> serializer = typedHook.createCheckpointDataSerializer();

        // call the hook
        final Future<T> resultFuture;
        try {
            resultFuture = typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
        }
        catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
            throw new FlinkException("Error while triggering checkpoint master hook '" + id + '\'', t);
        }

        // a hook that returns no future has nothing to store
        if (resultFuture == null) {
            return null;
        }

        // wait for the result
        final T result;
        try {
            result = resultFuture.get(timeout.getSize(), timeout.getUnit());
        }
        catch (InterruptedException e) {
            // restore the interrupt flag for the caller before bailing out
            Thread.currentThread().interrupt();
            throw new FlinkException("Checkpoint master hook was interrupted", e);
        }
        catch (ExecutionException e) {
            throw new FlinkException("Checkpoint master hook '" + id + "' produced an exception", e.getCause());
        }
        catch (TimeoutException e) {
            throw new FlinkException("Checkpoint master hook '" + id + "' did not complete in time (" + timeout + ')', e);
        }

        if (result == null) {
            return null;
        }

        // a non-null result can only be stored if the hook supplies a serializer
        if (serializer == null) {
            throw new FlinkException("Checkpoint hook '" + id + "' is stateful but creates no serializer");
        }

        try {
            final int version = serializer.getVersion();
            final byte[] bytes = serializer.serialize(result);
            return new MasterState(id, bytes, version);
        }
        catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
            throw new FlinkException("Failed to serialize state of master hook '" + id + '\'', t);
        }
    }

    // ------------------------------------------------------------------------
    //  checkpoint restoring
    // ------------------------------------------------------------------------

    /**
     * Calls the restore method on the given hooks, matching states to hooks by
     * the state's name.
     *
     * <p>All matched states are deserialized first; only afterwards are hooks
     * called. Hooks with a matching state are restored with that state, in the
     * iteration order of {@code states}. The remaining hooks are then restored
     * with {@code null}. If either argument is {@code null} or empty, no hook is
     * called at all.
     *
     * @param masterHooks The hooks to call, keyed by the name states are matched against.
     * @param states The states to restore; {@code null} entries are skipped.
     * @param checkpointId The checkpoint ID passed to every hook.
     * @param allowUnmatchedState If {@code true}, states without a matching hook are
     *        dropped (and logged); if {@code false}, they cause an exception.
     * @param log The logger used for progress messages.
     * @throws FlinkException If deserializing a state or restoring a hook fails.
     * @throws IllegalStateException If a state has no matching hook and
     *         {@code allowUnmatchedState} is {@code false}.
     */
    public static void restoreMasterHooks(Map<String, MasterTriggerRestoreHook<?>> masterHooks, Collection<MasterState> states, long checkpointId, boolean allowUnmatchedState, Logger log) throws FlinkException {
        // early out
        if (states == null || states.isEmpty() || masterHooks == null || masterHooks.isEmpty()) {
            log.info("No master state to restore");
            return;
        }

        log.info("Calling master restore hooks");

        // Work on a copy: hooks are removed as they are matched, so whatever
        // remains afterwards is the set of hooks without state.
        final LinkedHashMap<String, MasterTriggerRestoreHook<?>> allHooks =
                new LinkedHashMap<String, MasterTriggerRestoreHook<?>>(masterHooks);

        // Collect (hook, state) pairs first so that no hook is called before
        // every matched state has been deserialized successfully.
        final List<Tuple2<MasterTriggerRestoreHook<?>, Object>> hooksAndStates =
                new ArrayList<Tuple2<MasterTriggerRestoreHook<?>, Object>>();

        for (MasterState state : states) {
            if (state == null) {
                continue;
            }

            final String name = state.name();
            final MasterTriggerRestoreHook<?> hook = allHooks.remove(name);

            if (hook != null) {
                log.debug("Found state to restore for hook '{}'", name);

                final Object deserializedState = deserializeState(state, hook);
                hooksAndStates.add(new Tuple2<MasterTriggerRestoreHook<?>, Object>(hook, deserializedState));
            }
            else if (!allowUnmatchedState) {
                throw new IllegalStateException("Found state '" + state.name() + "' which is not resumed by any hook.");
            }
            else {
                log.info("Dropping unmatched state from '{}'", name);
            }
        }

        // restore the hooks that have state
        for (Tuple2<MasterTriggerRestoreHook<?>, Object> hookAndState : hooksAndStates) {
            restoreHook(hookAndState.f1, hookAndState.f0, checkpointId);
        }

        // notify the hooks that have no state
        for (MasterTriggerRestoreHook<?> hook : allHooks.values()) {
            restoreHook(null, hook, checkpointId);
        }
    }

    /**
     * Deserializes the given state with the serializer created by the given hook.
     *
     * @throws FlinkException If the hook creates no serializer or deserialization fails.
     */
    private static <T> T deserializeState(MasterState state, MasterTriggerRestoreHook<?> hook) throws FlinkException {
        @SuppressWarnings("unchecked")
        final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
        final String id = hook.getIdentifier();

        try {
            final SimpleVersionedSerializer<T> deserializer = typedHook.createCheckpointDataSerializer();
            if (deserializer == null) {
                throw new FlinkException("null serializer for state of hook " + hook.getIdentifier());
            }

            return deserializer.deserialize(state.version(), state.bytes());
        }
        catch (Throwable t) {
            // same policy as restoreHook(): JVM-fatal errors are propagated
            // as-is rather than being wrapped in a checked exception
            ExceptionUtils.rethrowIfFatalError(t);
            throw new FlinkException("Cannot deserialize state for master hook '" + id + '\'', t);
        }
    }

    /**
     * Calls {@code restoreCheckpoint} on the given hook with the given
     * (possibly {@code null}) state.
     *
     * @throws FlinkException If the hook throws. A {@code FlinkException} thrown by
     *         the hook is propagated unchanged; anything else non-fatal is wrapped.
     */
    private static <T> void restoreHook(Object state, MasterTriggerRestoreHook<?> hook, long checkpointId) throws FlinkException {
        @SuppressWarnings("unchecked")
        final T typedState = (T) state;

        @SuppressWarnings("unchecked")
        final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;

        try {
            typedHook.restoreCheckpoint(checkpointId, typedState);
        }
        catch (FlinkException e) {
            throw e;
        }
        catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalError(t);
            throw new FlinkException("Error while calling restoreCheckpoint on checkpoint hook '" + hook.getIdentifier() + '\'', t);
        }
    }

    // ------------------------------------------------------------------------
    //  hook management
    // ------------------------------------------------------------------------

    /**
     * Wraps a hook such that the given class loader is set as the thread's
     * context class loader for the duration of every call into the hook, and
     * for every command the hook submits to the executor it is given.
     *
     * @param hook The hook to wrap; must not be {@code null}.
     * @param userClassLoader The class loader to install; must not be {@code null}.
     * @return A hook that delegates to {@code hook} under {@code userClassLoader}.
     */
    public static <T> MasterTriggerRestoreHook<T> wrapHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
        return new WrappedMasterHook<T>(hook, userClassLoader);
    }

    /** This class is not meant to be instantiated. */
    private MasterHooks() {
    }

    // ------------------------------------------------------------------------
    //  class loader wrapping
    // ------------------------------------------------------------------------

    /**
     * A hook that delegates every call to another hook, temporarily replacing
     * the calling thread's context class loader with a fixed class loader and
     * restoring the previous one afterwards, also when the delegate throws.
     */
    private static class WrappedMasterHook<T>
    implements MasterTriggerRestoreHook<T> {
        private final MasterTriggerRestoreHook<T> hook;
        private final ClassLoader userClassLoader;

        WrappedMasterHook(MasterTriggerRestoreHook<T> hook, ClassLoader userClassLoader) {
            this.hook = Preconditions.checkNotNull(hook);
            this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
        }

        @Override
        public String getIdentifier() {
            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            thread.setContextClassLoader(this.userClassLoader);
            try {
                return this.hook.getIdentifier();
            }
            finally {
                thread.setContextClassLoader(originalClassLoader);
            }
        }

        @Override
        @Nullable
        public Future<T> triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception {
            // Commands the hook hands to the executor run on other threads, so
            // each of them gets the class loader installed individually as well.
            final Executor wrappedExecutor = new Executor() {

                @Override
                public void execute(Runnable command) {
                    executor.execute(new WrappedCommand(command));
                }
            };

            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            thread.setContextClassLoader(this.userClassLoader);
            try {
                return this.hook.triggerCheckpoint(checkpointId, timestamp, wrappedExecutor);
            }
            finally {
                thread.setContextClassLoader(originalClassLoader);
            }
        }

        @Override
        public void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception {
            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            thread.setContextClassLoader(this.userClassLoader);
            try {
                this.hook.restoreCheckpoint(checkpointId, checkpointData);
            }
            finally {
                thread.setContextClassLoader(originalClassLoader);
            }
        }

        @Override
        @Nullable
        public SimpleVersionedSerializer<T> createCheckpointDataSerializer() {
            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            thread.setContextClassLoader(this.userClassLoader);
            try {
                return this.hook.createCheckpointDataSerializer();
            }
            finally {
                thread.setContextClassLoader(originalClassLoader);
            }
        }

        /**
         * A runnable that runs another runnable with the enclosing hook's class
         * loader installed as the executing thread's context class loader.
         */
        private class WrappedCommand
        implements Runnable {
            private final Runnable command;

            WrappedCommand(Runnable command) {
                this.command = Preconditions.checkNotNull(command);
            }

            @Override
            public void run() {
                final Thread thread = Thread.currentThread();
                final ClassLoader originalClassLoader = thread.getContextClassLoader();
                thread.setContextClassLoader(WrappedMasterHook.this.userClassLoader);
                try {
                    this.command.run();
                }
                finally {
                    thread.setContextClassLoader(originalClassLoader);
                }
            }
        }
    }
}

