package com.github.ambry.messageformat;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.github.ambry.store.Message;
import com.github.ambry.store.MessageInfo;
import com.github.ambry.store.TransformationOutput;
import com.github.ambry.store.Transformer;
import com.github.ambry.utils.SystemTime;
import com.github.ambry.utils.Utils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/ambry/messageformat/MessageSievingInputStream.class */
/**
 * An {@link InputStream} that "sieves" a stream of back-to-back messages at construction time.
 *
 * <p>The constructor reads every message described by the supplied {@link MessageInfo} list from the
 * source stream into memory. Messages flagged as deleted or expired are dropped; every other message
 * is passed through the configured {@link Transformer}s (if any). Reads from this stream then return
 * the concatenation of the surviving (possibly transformed) messages.
 *
 * <p>Message sizes are narrowed to {@code int} throughout, so individual or total sizes beyond
 * {@link Integer#MAX_VALUE} are not represented correctly.
 */
public class MessageSievingInputStream extends InputStream {
    /** Sum of the sizes of the surviving messages; stays 0 when the input list is empty. */
    private int sievedStreamSize;
    /** Concatenation of the streams of all surviving messages. */
    private final InputStream sievedStream;
    /** Transformers applied in order to each live message; may be {@code null} or empty. */
    private final List<Transformer> transformers;
    /** Time (ms) spent validating/transforming a single message when transformers are configured. */
    public final Histogram singleMessageSieveTime;
    /** Time (ms) spent sieving the whole batch in the constructor. */
    public final Histogram batchMessageSieveTime;
    /** Number of messages dropped because a transformer reported a {@link MessageFormatException}. */
    public final Counter messageSievingCorruptMessagesDiscardedCount;
    /** Number of messages dropped because a transformer produced no replacement message. */
    public final Counter messageSievingDeprecatedMessagesDiscardedCount;
    /** Number of messages dropped because their {@link MessageInfo} is marked deleted. */
    public final Counter messageSievingDeletedMessagesDiscardedCount;
    /** Number of messages dropped because their {@link MessageInfo} is marked expired. */
    public final Counter messageSievingExpiredMessagesDiscardedCount;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private boolean hasInvalidMessages = false;
    private boolean hasDeprecatedMessages = false;
    /** Message infos of the surviving messages, in the order their streams were appended. */
    private final List<MessageInfo> sievedMessageInfoList = new ArrayList<>();

    /**
     * Reads all messages from {@code inputStream} and keeps only those that pass the sieve.
     *
     * @param inputStream     source stream containing the messages back to back, in list order
     * @param messageInfoList descriptors (including size) of the messages in {@code inputStream}
     * @param transformers    transformers to apply to each live message; {@code null} or empty to
     *                        keep live messages unchanged
     * @param metricRegistry  registry in which the sieving histograms and counters are registered
     * @throws IOException if reading from {@code inputStream} fails, or if a transformer reports an
     *                     exception that is not a {@link MessageFormatException}
     */
    public MessageSievingInputStream(InputStream inputStream, List<MessageInfo> messageInfoList, List<Transformer> transformers, MetricRegistry metricRegistry) throws IOException {
        this.transformers = transformers;
        singleMessageSieveTime = metricRegistry.histogram(MetricRegistry.name(MessageSievingInputStream.class, "SingleMessageSieveTime"));
        batchMessageSieveTime = metricRegistry.histogram(MetricRegistry.name(MessageSievingInputStream.class, "BatchMessageSieveTime"));
        messageSievingCorruptMessagesDiscardedCount = metricRegistry.counter(MetricRegistry.name(MessageSievingInputStream.class, "MessageSievingCorruptMessagesDiscardedCount"));
        messageSievingDeprecatedMessagesDiscardedCount = metricRegistry.counter(MetricRegistry.name(MessageSievingInputStream.class, "MessageSievingDeprecatedMessagesDiscardedCount"));
        messageSievingDeletedMessagesDiscardedCount = metricRegistry.counter(MetricRegistry.name(MessageSievingInputStream.class, "MessageSievingDeletedMessagesDiscardedCount"));
        messageSievingExpiredMessagesDiscardedCount = metricRegistry.counter(MetricRegistry.name(MessageSievingInputStream.class, "MessageSievingExpiredMessagesDiscardedCount"));
        sievedStreamSize = 0;
        if (messageInfoList.isEmpty()) {
            // Nothing to sieve: expose an empty stream and leave the size at 0.
            sievedStream = new ByteArrayInputStream(new byte[0]);
            return;
        }

        // Compound assignment narrows the long size to int, matching the per-message narrowing below.
        int expectedBytes = 0;
        for (MessageInfo info : messageInfoList) {
            expectedBytes += info.getSize();
        }

        int bytesRead = 0;
        List<InputStream> sievedStreams = new ArrayList<>();
        long startTimeMs = SystemTime.getInstance().milliseconds();
        logger.trace("Starting to validate message stream ");
        for (MessageInfo info : messageInfoList) {
            int size = (int) info.getSize();
            if (info.isDeleted()) {
                messageSievingDeletedMessagesDiscardedCount.inc();
                logger.trace("Skipping message with key {}, because it is deleted.", info.getStoreKey());
                // The bytes still have to be consumed so the next message starts at the right offset.
                Utils.readBytesFromStream(inputStream, size);
            } else if (info.isExpired()) {
                messageSievingExpiredMessagesDiscardedCount.inc();
                logger.trace("Skipping message with key {}, because it has expired.", info.getStoreKey());
                Utils.readBytesFromStream(inputStream, size);
            } else {
                Message message = new Message(info, new ByteArrayInputStream(Utils.readBytesFromStream(inputStream, size)));
                logger.trace("Read stream for message info {}  into memory", info);
                // bytesRead is the offset of this message within the source stream (used for logging).
                validateAndTransform(message, sievedStreams, bytesRead);
            }
            bytesRead += size;
        }
        if (bytesRead != expectedBytes) {
            logger.error("Failed to read intended size from stream. Expected {}, actual {}", expectedBytes, bytesRead);
        }
        if (hasInvalidMessages) {
            logger.error("There are invalidated messages in this stream");
        }
        if (hasDeprecatedMessages) {
            logger.trace("There are deprecated messages in this stream");
        }
        batchMessageSieveTime.update(SystemTime.getInstance().milliseconds() - startTimeMs);

        sievedStream = new SequenceInputStream(Collections.enumeration(sievedStreams));
        sievedStreamSize = (int) sievedMessageInfoList.stream().mapToLong(MessageInfo::getSize).sum();
        logger.trace("Completed validation of message stream ");
    }

    /**
     * @return the sum of the sizes reported by the {@link MessageInfo}s of the surviving messages,
     *         narrowed to {@code int}; 0 if the stream was built from an empty message list
     */
    public int getSize() {
        return sievedStreamSize;
    }

    /** Reads the next byte of the sieved stream. */
    @Override
    public int read() throws IOException {
        return sievedStream.read();
    }

    /** Reads up to {@code i2} bytes of the sieved stream into {@code bArr} starting at offset {@code i}. */
    @Override
    public int read(byte[] bArr, int i, int i2) throws IOException {
        return sievedStream.read(bArr, i, i2);
    }

    /**
     * @return {@code true} if at least one message was dropped because a transformer reported a
     *         {@link MessageFormatException}
     */
    public boolean hasInvalidMessages() {
        return hasInvalidMessages;
    }

    /**
     * @return {@code true} if at least one message was dropped because a transformer produced no
     *         replacement message
     */
    boolean hasDeprecatedMessages() {
        return hasDeprecatedMessages;
    }

    /**
     * @return the {@link MessageInfo}s of the messages that survived sieving; this is the internal
     *         list, not a copy
     */
    public List<MessageInfo> getValidMessageInfoList() {
        return sievedMessageInfoList;
    }

    /**
     * Runs {@code message} through the configured transformers and, if it survives, records its
     * message info and appends its stream to {@code sievedStreams}.
     *
     * <p>With no transformers the message is kept unchanged. Otherwise transformers are applied in
     * order, each receiving the previous one's output, stopping at the first that reports an
     * exception or returns no message.
     *
     * @param message       the message read from the source stream
     * @param sievedStreams accumulator for the streams of surviving messages
     * @param position      offset of the message in the source stream; used only for logging
     * @throws IOException if a transformer reports an exception other than {@link MessageFormatException}
     */
    private void validateAndTransform(Message message, List<InputStream> sievedStreams, int position) throws IOException {
        if (transformers == null || transformers.isEmpty()) {
            sievedMessageInfoList.add(message.getMessageInfo());
            sievedStreams.add(message.getStream());
            return;
        }
        long startTimeMs = SystemTime.getInstance().milliseconds();
        Message current = message;
        // Always assigned by the loop below because the transformer list is non-empty here.
        TransformationOutput output = null;
        for (Transformer transformer : transformers) {
            output = transformer.transform(current);
            if (output.getException() != null || output.getMsg() == null) {
                break;
            }
            current = output.getMsg();
        }
        if (output.getException() != null) {
            if (!(output.getException() instanceof MessageFormatException)) {
                throw new IOException("Encountered exception during transformation", output.getException());
            }
            // The exception is the trailing argument with no placeholder, so SLF4J logs it as the throwable.
            logger.error("Error validating/transforming the message at {} with messageInfo {} and hence skipping the message", position, message.getMessageInfo(), output.getException());
            hasInvalidMessages = true;
            messageSievingCorruptMessagesDiscardedCount.inc();
        } else if (output.getMsg() == null) {
            logger.trace("Transformation is on, and the message with id {} does not have a replacement and was discarded.", message.getMessageInfo().getStoreKey());
            hasDeprecatedMessages = true;
            messageSievingDeprecatedMessagesDiscardedCount.inc();
        } else {
            Message transformed = output.getMsg();
            MessageInfo transformedInfo = transformed.getMessageInfo();
            sievedMessageInfoList.add(transformedInfo);
            sievedStreams.add(transformed.getStream());
            logger.trace("Original message length {}, transformed bytes read {}", message.getMessageInfo().getSize(), transformedInfo.getSize());
        }
        singleMessageSieveTime.update(SystemTime.getInstance().milliseconds() - startTimeMs);
    }
}
