/*
 * Decompiled with CFR 0.152.
 */
package com.github.ambry.messageformat;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.github.ambry.messageformat.MessageFormatException;
import com.github.ambry.store.Message;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.TransformationOutput;
import com.github.ambry.store.Transformer;
import com.github.ambry.utils.SystemTime;
import com.github.ambry.utils.Utils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link InputStream} that exposes only the messages of a source stream that survive sieving.
 *
 * <p>All work happens in the constructor: for every {@link MessageInfo} supplied, the matching number of bytes is
 * read from the source stream. Messages flagged as deleted or expired are read and dropped. Every other message is
 * buffered in memory and run through the configured {@link Transformer}s (if any). The surviving message streams
 * are concatenated, in order, into the stream served by {@link #read()} and {@link #read(byte[], int, int)}.
 */
public class MessageSievingInputStream
extends InputStream {
    private int sievedStreamSize;
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final InputStream sievedStream;
    private boolean hasInvalidMessages;
    private boolean hasDeprecatedMessages;
    private final List<MessageInfo> sievedMessageInfoList;
    private final List<Transformer> transformers;
    // Metrics are public fields in the original API; kept as-is for compatibility.
    public final Histogram singleMessageSieveTime;
    public final Histogram batchMessageSieveTime;
    public final Counter messageSievingCorruptMessagesDiscardedCount;
    public final Counter messageSievingDeprecatedMessagesDiscardedCount;
    public final Counter messageSievingDeletedMessagesDiscardedCount;
    public final Counter messageSievingExpiredMessagesDiscardedCount;

    /**
     * Reads every message described by {@code messageInfoList} from {@code inStream} and keeps the ones that are
     * neither deleted, expired, nor rejected by the transformers.
     *
     * @param inStream the source stream holding the messages back to back, in the order of {@code messageInfoList}
     * @param messageInfoList one entry per message contained in {@code inStream}
     * @param transformers the transformers applied, in order, to each message that is not deleted or expired;
     *                     may be {@code null} or empty, in which case such messages are kept unchanged
     * @param metricRegistry the registry in which the sieving histograms and counters are registered
     * @throws IOException if reading from {@code inStream} fails, or a transformer reports an exception that is
     *                     not a {@link MessageFormatException}
     */
    public MessageSievingInputStream(InputStream inStream, List<MessageInfo> messageInfoList, List<Transformer> transformers, MetricRegistry metricRegistry) throws IOException {
        this.transformers = transformers;
        this.singleMessageSieveTime = metricRegistry.histogram(MetricRegistry.name(MessageSievingInputStream.class, "SingleMessageSieveTime"));
        this.batchMessageSieveTime = metricRegistry.histogram(MetricRegistry.name(MessageSievingInputStream.class, "BatchMessageSieveTime"));
        this.messageSievingCorruptMessagesDiscardedCount = metricRegistry.counter(MetricRegistry.name(MessageSievingInputStream.class, "MessageSievingCorruptMessagesDiscardedCount"));
        this.messageSievingDeprecatedMessagesDiscardedCount = metricRegistry.counter(MetricRegistry.name(MessageSievingInputStream.class, "MessageSievingDeprecatedMessagesDiscardedCount"));
        this.messageSievingDeletedMessagesDiscardedCount = metricRegistry.counter(MetricRegistry.name(MessageSievingInputStream.class, "MessageSievingDeletedMessagesDiscardedCount"));
        this.messageSievingExpiredMessagesDiscardedCount = metricRegistry.counter(MetricRegistry.name(MessageSievingInputStream.class, "MessageSievingExpiredMessagesDiscardedCount"));
        this.sievedStreamSize = 0;
        this.hasInvalidMessages = false;
        this.hasDeprecatedMessages = false;
        this.sievedMessageInfoList = new ArrayList<MessageInfo>();
        if (messageInfoList.isEmpty()) {
            // Nothing to sieve: serve an empty stream.
            this.sievedStream = new ByteArrayInputStream(new byte[0]);
            return;
        }

        // Accumulate as long so the expected total is not silently truncated on every addition.
        long totalMessageListSize = 0;
        for (MessageInfo info : messageInfoList) {
            totalMessageListSize += info.getSize();
        }

        long bytesRead = 0;
        List<InputStream> msgStreamList = new ArrayList<InputStream>();
        long batchStartTime = SystemTime.getInstance().milliseconds();
        this.logger.trace("Starting to validate message stream ");
        for (MessageInfo msgInfo : messageInfoList) {
            int msgSize = (int) msgInfo.getSize();
            if (msgInfo.isDeleted()) {
                this.messageSievingDeletedMessagesDiscardedCount.inc();
                this.logger.trace("Skipping message with key {}, because it is deleted.", msgInfo.getStoreKey());
                // The bytes still have to be consumed so the next message starts at the right position.
                Utils.readBytesFromStream(inStream, msgSize);
            } else if (msgInfo.isExpired()) {
                this.messageSievingExpiredMessagesDiscardedCount.inc();
                this.logger.trace("Skipping message with key {}, because it has expired.", msgInfo.getStoreKey());
                Utils.readBytesFromStream(inStream, msgSize);
            } else {
                byte[] msgBytes = Utils.readBytesFromStream(inStream, msgSize);
                Message msg = new Message(msgInfo, new ByteArrayInputStream(msgBytes));
                this.logger.trace("Read stream for message info {}  into memory", msgInfo);
                this.validateAndTransform(msg, msgStreamList, bytesRead);
            }
            bytesRead += msgSize;
        }
        if (bytesRead != totalMessageListSize) {
            this.logger.error("Failed to read intended size from stream. Expected {}, actual {}", totalMessageListSize, bytesRead);
        }
        if (this.hasInvalidMessages) {
            this.logger.error("There are invalidated messages in this stream");
        }
        if (this.hasDeprecatedMessages) {
            this.logger.trace("There are deprecated messages in this stream");
        }
        this.batchMessageSieveTime.update(SystemTime.getInstance().milliseconds() - batchStartTime);

        // SequenceInputStream takes an Enumeration; adapt the list's iterator to it.
        final Iterator<InputStream> msgStreamIterator = msgStreamList.iterator();
        Enumeration<InputStream> inputStreamEnumeration = new Enumeration<InputStream>() {

            @Override
            public boolean hasMoreElements() {
                return msgStreamIterator.hasNext();
            }

            @Override
            public InputStream nextElement() {
                return msgStreamIterator.next();
            }
        };
        this.sievedStream = new SequenceInputStream(inputStreamEnumeration);
        this.sievedStreamSize = (int) this.sievedMessageInfoList.stream().mapToLong(MessageInfo::getSize).sum();
        this.logger.trace("Completed validation of message stream ");
    }

    /**
     * @return the sum of the sizes reported by the {@link MessageInfo}s of all retained messages, or 0 if the
     *         input message list was empty
     */
    public int getSize() {
        return this.sievedStreamSize;
    }

    /**
     * Reads the next byte of the sieved stream.
     */
    @Override
    public int read() throws IOException {
        return this.sievedStream.read();
    }

    /**
     * Reads up to {@code length} bytes of the sieved stream into {@code bytes} starting at {@code offset}.
     */
    @Override
    public int read(byte[] bytes, int offset, int length) throws IOException {
        return this.sievedStream.read(bytes, offset, length);
    }

    /**
     * @return {@code true} if at least one message was dropped because a transformer reported a
     *         {@link MessageFormatException}
     */
    public boolean hasInvalidMessages() {
        return this.hasInvalidMessages;
    }

    /**
     * @return {@code true} if at least one message was dropped because a transformer produced no replacement
     *         message for it
     */
    boolean hasDeprecatedMessages() {
        return this.hasDeprecatedMessages;
    }

    /**
     * @return the {@link MessageInfo}s of the retained messages, in stream order. This is the internal list, not
     *         a copy.
     */
    public List<MessageInfo> getValidMessageInfoList() {
        return this.sievedMessageInfoList;
    }

    /**
     * Runs {@code inMsg} through the configured transformers and, if it survives, appends its info and stream to
     * the sieved output.
     *
     * <p>Transformers are applied in order, each one receiving the message produced by the previous one. The chain
     * stops at the first transformer that reports an exception or returns no message.
     *
     * @param inMsg the message to validate and transform
     * @param msgStreamList the list collecting the streams of the retained messages
     * @param msgOffset the number of bytes consumed from the source stream before this message; used for logging only
     * @throws IOException if a transformer reports an exception that is not a {@link MessageFormatException}
     */
    private void validateAndTransform(Message inMsg, List<InputStream> msgStreamList, long msgOffset) throws IOException {
        if (this.transformers == null || this.transformers.isEmpty()) {
            // No transformation configured: keep the message as it is.
            this.sievedMessageInfoList.add(inMsg.getMessageInfo());
            msgStreamList.add(inMsg.getStream());
            return;
        }

        long sieveStartTime = SystemTime.getInstance().milliseconds();
        Message msg = inMsg;
        // Never null after the loop: the transformer list is non-empty, so at least one transform() call runs.
        TransformationOutput output = null;
        for (Transformer transformer : this.transformers) {
            output = transformer.transform(msg);
            if (output.getException() != null || output.getMsg() == null) {
                break;
            }
            msg = output.getMsg();
        }

        if (output.getException() != null) {
            if (!(output.getException() instanceof MessageFormatException)) {
                throw new IOException("Encountered exception during transformation", output.getException());
            }
            // The exception is passed as a trailing argument with no placeholder of its own.
            this.logger.error("Error validating/transforming the message at {} with messageInfo {} and hence skipping the message", msgOffset, inMsg.getMessageInfo(), output.getException());
            this.hasInvalidMessages = true;
            this.messageSievingCorruptMessagesDiscardedCount.inc();
        } else if (output.getMsg() == null) {
            this.logger.trace("Transformation is on, and the message with id {} does not have a replacement and was discarded.", inMsg.getMessageInfo().getStoreKey());
            this.hasDeprecatedMessages = true;
            this.messageSievingDeprecatedMessagesDiscardedCount.inc();
        } else {
            MessageInfo tfmMsgInfo = output.getMsg().getMessageInfo();
            this.sievedMessageInfoList.add(tfmMsgInfo);
            msgStreamList.add(output.getMsg().getStream());
            this.logger.trace("Original message length {}, transformed bytes read {}", inMsg.getMessageInfo().getSize(), tfmMsgInfo.getSize());
        }
        this.singleMessageSieveTime.update(SystemTime.getInstance().milliseconds() - sieveStartTime);
    }
}

