/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processor.util.listen.handler.socket;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandler;
import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;

public class StandardSocketChannelHandler<E extends Event<SocketChannel>>
extends SocketChannelHandler<E> {
    private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096);

    public StandardSocketChannelHandler(SelectionKey key, AsyncChannelDispatcher dispatcher, Charset charset, EventFactory<E> eventFactory, BlockingQueue<E> events, ComponentLog logger) {
        super(key, dispatcher, charset, eventFactory, events, logger);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        boolean eof = false;
        SocketChannel socketChannel = null;
        try {
            int bytesRead;
            socketChannel = (SocketChannel)this.key.channel();
            SocketChannelAttachment attachment = (SocketChannelAttachment)this.key.attachment();
            ByteBuffer socketBuffer = attachment.getByteBuffer();
            while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
                socketBuffer.flip();
                socketBuffer.mark();
                this.processBuffer(socketChannel, socketBuffer);
                socketBuffer.reset();
                socketBuffer.compact();
                this.logger.debug("bytes read {}", new Object[]{bytesRead});
            }
            if (bytesRead < 0) {
                eof = true;
                this.logger.debug("Reached EOF, closing connection");
            } else {
                this.logger.debug("No more data available, returning for selection");
            }
        }
        catch (InterruptedException | ClosedByInterruptException e) {
            this.logger.debug("read loop interrupted, closing connection");
            eof = true;
        }
        catch (ClosedChannelException e) {
            this.logger.error("Error reading from channel due to channel being closed", (Throwable)e);
            eof = true;
        }
        catch (IOException e) {
            this.logger.error("Error reading from channel due to {}", new Object[]{e.getMessage()}, (Throwable)e);
            eof = true;
        }
        finally {
            if (eof) {
                IOUtils.closeQuietly((Closeable)socketChannel);
                ((AsyncChannelDispatcher)this.dispatcher).completeConnection(this.key);
            } else {
                ((AsyncChannelDispatcher)this.dispatcher).addBackForSelection(this.key);
            }
        }
    }

    protected void processBuffer(SocketChannel socketChannel, ByteBuffer socketBuffer) throws InterruptedException, IOException {
        int total = socketBuffer.remaining();
        InetAddress sender = socketChannel.socket().getInetAddress();
        this.currBytes.reset();
        for (int i = 0; i < total; ++i) {
            byte currByte = socketBuffer.get();
            if (currByte == this.getDelimiter()) {
                if (this.currBytes.size() <= 0) continue;
                SocketChannelResponder response = new SocketChannelResponder(socketChannel);
                Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
                Object event = this.eventFactory.create(this.currBytes.toByteArray(), metadata, response);
                this.events.offer(event);
                this.currBytes.reset();
                socketBuffer.mark();
                continue;
            }
            this.currBytes.write(currByte);
        }
    }

    @Override
    public byte getDelimiter() {
        return 10;
    }
}

