/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.kinesis.multilang;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.multilang.messages.CheckpointMessage;
import software.amazon.kinesis.multilang.messages.InitializeMessage;
import software.amazon.kinesis.multilang.messages.LeaseLostMessage;
import software.amazon.kinesis.multilang.messages.Message;
import software.amazon.kinesis.multilang.messages.ProcessRecordsMessage;
import software.amazon.kinesis.multilang.messages.ShardEndedMessage;
import software.amazon.kinesis.multilang.messages.ShutdownRequestedMessage;

class MessageWriter {
    private static final Logger log = LoggerFactory.getLogger(MessageWriter.class);
    private BufferedWriter writer;
    private volatile boolean open = true;
    private String shardId;
    private ObjectMapper objectMapper;
    private ExecutorService executorService;

    MessageWriter() {
    }

    private Future<Boolean> writeMessageToOutput(final String message) throws IOException {
        Callable<Boolean> writeMessageToOutputTask = new Callable<Boolean>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Boolean call() throws Exception {
                try {
                    BufferedWriter bufferedWriter = MessageWriter.this.writer;
                    synchronized (bufferedWriter) {
                        MessageWriter.this.writer.write(message, 0, message.length());
                        MessageWriter.this.writer.write(System.lineSeparator(), 0, System.lineSeparator().length());
                        MessageWriter.this.writer.flush();
                    }
                    log.info("Message size == {} bytes for shard {}", (Object)message.getBytes().length, (Object)MessageWriter.this.shardId);
                }
                catch (IOException e) {
                    MessageWriter.this.open = false;
                }
                return MessageWriter.this.open;
            }
        };
        if (this.open) {
            return this.executorService.submit(writeMessageToOutputTask);
        }
        String errorMessage = "Cannot write message " + message + " because writer is closed for shard " + this.shardId;
        log.info(errorMessage);
        throw new IllegalStateException(errorMessage);
    }

    private Future<Boolean> writeMessage(Message message) {
        log.info("Writing {} to child process for shard {}", (Object)message.getClass().getSimpleName(), (Object)this.shardId);
        try {
            String jsonText = this.objectMapper.writeValueAsString((Object)message);
            return this.writeMessageToOutput(jsonText);
        }
        catch (IOException e) {
            String errorMessage = String.format("Encountered I/O error while writing %s action to subprocess", message.getClass().getSimpleName());
            log.error(errorMessage, (Throwable)e);
            throw new RuntimeException(errorMessage, e);
        }
    }

    Future<Boolean> writeInitializeMessage(InitializationInput initializationInput) {
        return this.writeMessage(new InitializeMessage(initializationInput));
    }

    Future<Boolean> writeProcessRecordsMessage(ProcessRecordsInput processRecordsInput) {
        return this.writeMessage(new ProcessRecordsMessage(processRecordsInput));
    }

    Future<Boolean> writeLeaseLossMessage(LeaseLostInput leaseLostInput) {
        return this.writeMessage(new LeaseLostMessage());
    }

    Future<Boolean> writeShardEndedMessage(ShardEndedInput shardEndedInput) {
        return this.writeMessage(new ShardEndedMessage());
    }

    Future<Boolean> writeShutdownRequestedMessage() {
        return this.writeMessage(new ShutdownRequestedMessage());
    }

    Future<Boolean> writeCheckpointMessageWithError(String sequenceNumber, Long subSequenceNumber, Throwable throwable) {
        return this.writeMessage(new CheckpointMessage(sequenceNumber, subSequenceNumber, throwable));
    }

    void close() throws IOException {
        this.open = false;
        this.writer.close();
    }

    boolean isOpen() {
        return this.open;
    }

    MessageWriter initialize(OutputStream stream, String shardId, ObjectMapper objectMapper, ExecutorService executorService) {
        return this.initialize(new BufferedWriter(new OutputStreamWriter(stream)), shardId, objectMapper, executorService);
    }

    MessageWriter initialize(BufferedWriter writer, String shardId, ObjectMapper objectMapper, ExecutorService executorService) {
        this.writer = writer;
        this.shardId = shardId;
        this.objectMapper = objectMapper;
        this.executorService = executorService;
        return this;
    }
}

