/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.multilang;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.multilang.DrainChildSTDERRTask;
import software.amazon.kinesis.multilang.MessageReader;
import software.amazon.kinesis.multilang.MessageWriter;
import software.amazon.kinesis.multilang.MultiLangProtocol;
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
import software.amazon.kinesis.processor.ShardRecordProcessor;

/**
 * {@link ShardRecordProcessor} that starts a child process through the supplied
 * {@link ProcessBuilder} and forwards every lifecycle event to it via a
 * {@link MultiLangProtocol} built on the child's standard streams.
 *
 * <p>Any failure while talking to the child ends in {@link #stopProcessing(String, Throwable)},
 * which attempts to shut the child down and then calls {@link #exit()}.
 */
public class MultiLangShardRecordProcessor
implements ShardRecordProcessor {
    private static final Logger log = LoggerFactory.getLogger(MultiLangShardRecordProcessor.class);
    /** Exit status passed to {@link System#exit(int)} by {@link #exit()}. */
    private static final int EXIT_VALUE = 1;
    /** Set to {@code true} only after the child process acknowledged initialization. */
    private volatile boolean initialized;
    private String shardId;
    /** Handle of the submitted STDERR drain task; {@code null} until {@link #initialize} submits it. */
    private Future<?> stderrReadTask;
    private final MessageWriter messageWriter;
    private final MessageReader messageReader;
    private final DrainChildSTDERRTask readSTDERRTask;
    private final ProcessBuilder processBuilder;
    /** The child process; {@code null} until {@link #startProcess()} succeeds. */
    private Process process;
    private final ExecutorService executorService;
    private ProcessState state;
    private final ObjectMapper objectMapper;
    /** Created in {@link #initialize}; {@code null} before that. */
    private MultiLangProtocol protocol;
    private final MultiLangDaemonConfiguration configuration;

    /**
     * Starts the child process, wires its STDIN/STDOUT/STDERR to the writer, reader and
     * STDERR drain task, and performs the protocol-level initialize handshake.
     *
     * <p>Any {@link Throwable} raised on the way (including an unsuccessful handshake) is
     * handed to {@link #stopProcessing(String, Throwable)}.
     *
     * @param initializationInput supplies the shard id and is passed on to the protocol
     */
    public void initialize(InitializationInput initializationInput) {
        try {
            this.shardId = initializationInput.shardId();
            try {
                this.process = this.startProcess();
            }
            catch (IOException e) {
                // Re-wrap so the log makes clear the failure happened while launching the executable.
                throw new IOException("Failed to start client executable", e);
            }
            this.messageWriter.initialize(this.process.getOutputStream(), this.shardId, this.objectMapper, this.executorService);
            this.messageReader.initialize(this.process.getInputStream(), this.shardId, this.objectMapper, this.executorService);
            this.readSTDERRTask.initialize(this.process.getErrorStream(), this.shardId, "Reading STDERR for " + this.shardId);
            this.stderrReadTask = this.executorService.submit(this.readSTDERRTask);
            this.protocol = new MultiLangProtocol(this.messageReader, this.messageWriter, initializationInput, this.configuration);
            if (!this.protocol.initialize()) {
                throw new RuntimeException("Failed to initialize child process");
            }
            this.initialized = true;
        }
        catch (Throwable t) {
            this.stopProcessing("Encountered an error while trying to initialize record processor", t);
        }
    }

    /**
     * Forwards a batch of records to the child process.
     *
     * <p>An unsuccessful protocol call, or any {@link Throwable}, is handed to
     * {@link #stopProcessing(String, Throwable)}.
     *
     * @param processRecordsInput the batch passed on to the protocol
     */
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        try {
            if (!this.protocol.processRecords(processRecordsInput)) {
                throw new RuntimeException("Child process failed to process records");
            }
        }
        catch (Throwable t) {
            this.stopProcessing("Encountered an error while trying to process records", t);
        }
    }

    /**
     * Notifies the child that the lease was lost and then shuts it down.
     *
     * @param leaseLostInput passed on to the protocol
     */
    public void leaseLost(LeaseLostInput leaseLostInput) {
        this.shutdown(p -> p.leaseLost(leaseLostInput));
    }

    /**
     * Notifies the child that the shard ended and then shuts it down.
     *
     * @param shardEndedInput passed on to the protocol
     */
    public void shardEnded(ShardEndedInput shardEndedInput) {
        this.shutdown(p -> p.shardEnded(shardEndedInput));
    }

    /**
     * Asks the child to handle a shutdown request using the supplied checkpointer.
     * Does nothing beyond logging when this processor never finished initializing.
     * A {@code false} result from the protocol is logged but does not stop processing.
     *
     * @param shutdownRequestedInput supplies the checkpointer handed to the protocol
     */
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        log.info("Shutdown is requested.");
        if (!this.initialized) {
            log.info("Record processor was not initialized so no need to initiate a final checkpoint.");
            return;
        }
        log.info("Requesting a checkpoint on shutdown notification.");
        if (!this.protocol.shutdownRequested(shutdownRequestedInput.checkpointer())) {
            log.error("Child process failed to complete shutdown notification.");
        }
    }

    /**
     * Common shutdown path for {@link #leaseLost} and {@link #shardEnded}.
     *
     * <p>If the processor was never initialized it is only marked as shut down. Otherwise,
     * while still active, the given protocol call is made and the child shutdown sequence
     * runs; a second call after shutdown only logs a warning.
     *
     * @param protocolInvocation the protocol call to make before tearing the child down;
     *                           must return {@code true} on success
     */
    void shutdown(Function<MultiLangProtocol, Boolean> protocolInvocation) {
        if (!this.initialized) {
            log.info("Record processor was not initialized and will not have a child process, so not invoking child process shutdown.");
            this.state = ProcessState.SHUTDOWN;
            return;
        }
        try {
            if (this.state == ProcessState.ACTIVE) {
                if (!protocolInvocation.apply(this.protocol)) {
                    throw new RuntimeException("Child process failed to shutdown");
                }
                this.childProcessShutdownSequence();
            } else {
                log.warn("Shutdown was called but this processor is already shutdown. Not doing anything.");
            }
        }
        catch (Throwable t) {
            // Exactly one of the two messages applies; stopProcessing must run only once,
            // otherwise the failure is logged twice and exit() is invoked twice.
            if (this.state == ProcessState.ACTIVE) {
                this.stopProcessing("Encountered an error while trying to shutdown child process", t);
            } else {
                this.stopProcessing("Encountered an error during shutdown, but it appears the processor has already been shutdown", t);
            }
        }
    }

    /**
     * Creates a processor with freshly constructed writer, reader and STDERR drain task.
     *
     * @param processBuilder  used to start the child process
     * @param executorService passed to the reader/writer and used to run the STDERR drain task
     * @param objectMapper    passed to the reader/writer
     * @param configuration   passed to the {@link MultiLangProtocol}
     */
    MultiLangShardRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper, MultiLangDaemonConfiguration configuration) {
        this(processBuilder, executorService, objectMapper, new MessageWriter(), new MessageReader(), new DrainChildSTDERRTask(), configuration);
    }

    /**
     * Creates a processor from explicit collaborators. The processor starts in the
     * {@code ACTIVE} state; no process is started until {@link #initialize} is called.
     *
     * @param processBuilder  used to start the child process
     * @param executorService passed to the reader/writer and used to run the STDERR drain task
     * @param objectMapper    passed to the reader/writer
     * @param messageWriter   writes to the child's STDIN
     * @param messageReader   reads from the child's STDOUT
     * @param readSTDERRTask  task that reads the child's STDERR
     * @param configuration   passed to the {@link MultiLangProtocol}
     */
    MultiLangShardRecordProcessor(ProcessBuilder processBuilder, ExecutorService executorService, ObjectMapper objectMapper, MessageWriter messageWriter, MessageReader messageReader, DrainChildSTDERRTask readSTDERRTask, MultiLangDaemonConfiguration configuration) {
        this.executorService = executorService;
        this.processBuilder = processBuilder;
        this.objectMapper = objectMapper;
        this.messageWriter = messageWriter;
        this.messageReader = messageReader;
        this.readSTDERRTask = readSTDERRTask;
        this.configuration = configuration;
        this.state = ProcessState.ACTIVE;
    }

    /**
     * Tears the child process down in order: close the writer, wait for the STDOUT and
     * STDERR drains, close both input streams, then wait for the process to exit.
     * Failures of individual steps are logged and the sequence continues; the state is
     * set to {@code SHUTDOWN} at the end.
     */
    private void childProcessShutdownSequence() {
        try {
            if (this.messageWriter.isOpen()) {
                this.messageWriter.close();
            }
        }
        catch (IOException e) {
            log.error("Encountered exception while trying to close output stream.", e);
        }
        this.safelyWaitOnFuture(this.messageReader.drainSTDOUT(), "draining STDOUT");
        this.safelyWaitOnFuture(this.stderrReadTask, "draining STDERR");
        this.safelyCloseInputStream(this.process.getErrorStream(), "STDERR");
        this.safelyCloseInputStream(this.process.getInputStream(), "STDOUT");
        boolean interrupted = false;
        try {
            log.info("Child process exited with value: {}", this.process.waitFor());
        }
        catch (InterruptedException e) {
            log.error("Interrupted before process finished exiting. Attempting to kill process.");
            this.process.destroy();
            interrupted = true;
        }
        this.state = ProcessState.SHUTDOWN;
        if (interrupted) {
            // Restore the flag (after all teardown work) so callers can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes the given stream, logging (not propagating) any {@link IOException}.
     *
     * @param inputStream the stream to close
     * @param name        label used in the error message
     */
    private void safelyCloseInputStream(InputStream inputStream, String name) {
        try {
            inputStream.close();
        }
        catch (IOException e) {
            log.error("Encountered exception while trying to close {} stream.", name, e);
        }
    }

    /**
     * Blocks until the future completes, logging (not propagating) interruption or
     * execution failure.
     *
     * @param future                the future to wait on
     * @param whatThisFutureIsDoing label used in the error message
     */
    private void safelyWaitOnFuture(Future<?> future, String whatThisFutureIsDoing) {
        try {
            future.get();
        }
        catch (InterruptedException | ExecutionException e) {
            // NOTE(review): the interrupt flag is not restored here; restoring it would make
            // the remaining blocking shutdown steps fail immediately. Confirm this is intended.
            log.error("Encountered error while {} for shard {}", whatThisFutureIsDoing, this.shardId, e);
        }
    }

    /**
     * Logs the failure, runs the child shutdown sequence unless the state is already
     * {@code SHUTDOWN}, and finally calls {@link #exit()}. Errors raised while shutting
     * down are logged and do not prevent the call to {@link #exit()}.
     *
     * @param message description of what was being attempted
     * @param reason  the failure that triggered the stop
     */
    private void stopProcessing(String message, Throwable reason) {
        try {
            log.error(message, reason);
            if (this.state != ProcessState.SHUTDOWN) {
                this.childProcessShutdownSequence();
            }
        }
        catch (Throwable t) {
            log.error("Encountered error while trying to shutdown", t);
        }
        this.exit();
    }

    /**
     * Terminates the JVM with status {@link #EXIT_VALUE}. Package-private and overridable;
     * presumably so tests can substitute a non-terminating implementation.
     */
    void exit() {
        System.exit(EXIT_VALUE);
    }

    /**
     * Starts the child process using the configured {@link ProcessBuilder}.
     *
     * @return the started process
     * @throws IOException if the process cannot be started
     */
    Process startProcess() throws IOException {
        return this.processBuilder.start();
    }

    /** Lifecycle of the child process as tracked by this processor. */
    private static enum ProcessState {
        ACTIVE,
        SHUTDOWN;

    }
}

