/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.fs4.mplex;

import com.yahoo.concurrent.SystemTimer;
import com.yahoo.fs4.BasicPacket;
import com.yahoo.fs4.ChannelTimeoutException;
import com.yahoo.fs4.EolPacket;
import com.yahoo.fs4.ErrorPacket;
import com.yahoo.fs4.Packet;
import com.yahoo.fs4.PongPacket;
import com.yahoo.fs4.mplex.Backend;
import com.yahoo.fs4.mplex.InvalidChannelException;
import com.yahoo.search.Query;
import com.yahoo.search.dispatch.ResponseMonitor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
 * One logical channel multiplexed over a {@link Backend}.
 *
 * <p>Packets handed to the channel through {@link #addPacket(BasicPacket)} are buffered in an
 * internal queue and consumed through {@link #nextPacket(long)} or
 * {@link #receivePackets(long, int)}. Outgoing packets are forwarded to the backend together
 * with this channel's id.
 *
 * <p>A channel is "valid" while it holds a response queue. {@link #close()} drops the queue, after
 * which the send/receive methods throw {@link InvalidChannelException}.
 */
public class FS4Channel {
    private static final Logger log = Logger.getLogger(FS4Channel.class.getName());

    private Integer channelId;
    private Backend backend;

    /** Buffered responses; {@code null} once the channel has been closed (or was never set up). */
    private volatile BlockingQueue<BasicPacket> responseQueue;
    private Query query;
    private boolean isPingChannel = false;
    private ResponseMonitor<FS4Channel> monitor;

    /**
     * Creates a channel with no backend, no id and no response queue.
     * Such a channel reports {@code isValid() == false}.
     */
    protected FS4Channel() {
    }

    /**
     * Creates a valid channel with an empty response queue.
     *
     * @param backend   the backend used for sending packets and for deregistration on close
     * @param channelId the id of this channel
     */
    protected FS4Channel(Backend backend, Integer channelId) {
        this.channelId = channelId;
        this.backend = backend;
        this.responseQueue = new LinkedBlockingQueue<BasicPacket>();
    }

    /**
     * Creates a ping channel: a channel with id 0 which skips the channel id check when
     * receiving and deregisters through {@code Backend.removePingChannel()} on close.
     *
     * @param backend the backend the ping channel belongs to
     * @return the new ping channel
     */
    public static FS4Channel createPingChannel(Backend backend) {
        FS4Channel pingChannel = new FS4Channel(backend, 0);
        pingChannel.isPingChannel = true;
        return pingChannel;
    }

    /** Associates a query with this channel. */
    public void setQuery(Query query) {
        this.query = query;
    }

    /** Returns the query associated with this channel, or {@code null} if none is set or the channel is closed. */
    public Query getQuery() {
        return this.query;
    }

    /** Returns the id of this channel. */
    public Integer getChannelId() {
        return this.channelId;
    }

    /**
     * Closes this channel: invalidates it, forgets the query, deregisters it from the backend
     * and discards any packets still buffered.
     */
    public void close() {
        // Take a local reference before clearing the field so the queue can still be emptied
        // after the channel has been marked invalid.
        BlockingQueue<BasicPacket> q = this.responseQueue;
        this.responseQueue = null;
        this.query = null;
        if (this.isPingChannel) {
            this.backend.removePingChannel();
        } else {
            this.backend.removeChannel(this.channelId);
        }
        if (q != null) {
            q.clear();
        }
    }

    /**
     * Sends a packet on this channel through the backend.
     *
     * @param packet the packet to send
     * @return whatever {@code Backend.sendPacket} returns for this packet and channel id
     * @throws InvalidChannelException if the channel is no longer valid
     * @throws IOException             if the backend fails to send
     */
    public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException {
        this.ensureValid();
        return this.backend.sendPacket(packet, this.channelId);
    }

    /**
     * Collects packets from this channel until a terminating packet arrives, enough packets have
     * been collected, or the timeout expires.
     *
     * <p>On a non-ping channel, packets carrying a different channel id are logged and dropped.
     *
     * @param timeout     the maximum total time to wait, in milliseconds
     * @param packetCount return as soon as this many packets are collected; a negative value
     *                    means "keep reading until a terminating packet"
     * @return the collected packets, in the order they were received
     * @throws InvalidChannelException if the channel is invalid when the method is entered
     * @throws ChannelTimeoutException if the timeout expires first, if the channel becomes
     *                                 invalid while waiting, or if the waiting thread is
     *                                 interrupted (the interrupt flag is restored)
     */
    public BasicPacket[] receivePackets(long timeout, int packetCount) throws InvalidChannelException, ChannelTimeoutException {
        this.ensureValid();
        List<BasicPacket> packets = new ArrayList<BasicPacket>(12);
        long startTime = SystemTimer.INSTANCE.milliTime();
        long timeLeft = timeout;
        try {
            while (timeLeft >= 0L) {
                BasicPacket p = this.nextPacket(timeLeft);
                if (p == null) {
                    throw new ChannelTimeoutException("Timed out");
                }
                // NOTE(review): the cast assumes every packet on a non-ping channel is a Packet.
                if (this.isPingChannel || ((Packet)p).getChannel() == this.getChannelId().intValue()) {
                    packets.add(p);
                    if (FS4Channel.isLastPacket(p) || FS4Channel.hasEnoughPackets(packetCount, packets)) {
                        return packets.toArray(new BasicPacket[0]);
                    }
                } else {
                    log.warning("Ignoring received " + p + ", when expecting channel " + this.getChannelId());
                }
                // Recompute the remaining time on every iteration, including when a packet was
                // dropped, so a stream of foreign packets cannot stretch the wait past the timeout.
                timeLeft = timeout - (SystemTimer.INSTANCE.milliTime() - startTime);
            }
        }
        catch (InvalidChannelException e) {
            // The channel was closed while waiting; reported to the caller as a timeout below.
            log.info("FS4Channel was invalid. timeLeft=" + timeLeft + ", timeout=" + timeout);
        }
        catch (InterruptedException e) {
            log.info("FS4Channel was interrupted. timeLeft=" + timeLeft + ", timeout=" + timeout);
            Thread.currentThread().interrupt();
        }
        throw new ChannelTimeoutException();
    }

    /**
     * Returns whether the requested number of packets has been collected.
     * A negative {@code packetCount} never counts as satisfied.
     */
    private static boolean hasEnoughPackets(int packetCount, List<BasicPacket> packets) {
        if (packetCount < 0) {
            return false;
        }
        return packets.size() >= packetCount;
    }

    /** Returns whether the packet terminates a response: an error, end-of-list or pong packet. */
    private static boolean isLastPacket(BasicPacket packet) {
        return packet instanceof ErrorPacket
                || packet instanceof EolPacket
                || packet instanceof PongPacket;
    }

    /**
     * Retrieves the next buffered packet, waiting up to the given time for one to arrive.
     *
     * @param timeout the maximum time to wait, in milliseconds
     * @return the next packet, or {@code null} if none arrived within the timeout
     * @throws InterruptedException    if interrupted while waiting
     * @throws InvalidChannelException if the channel is no longer valid
     */
    public BasicPacket nextPacket(long timeout) throws InterruptedException, InvalidChannelException {
        return this.ensureValidQ().poll(timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Adds a packet to the response queue and then notifies the response monitor, if one is set.
     *
     * @param packet the packet to enqueue
     * @throws InterruptedException    if interrupted while enqueueing
     * @throws InvalidChannelException if the channel is no longer valid
     */
    protected void addPacket(BasicPacket packet) throws InterruptedException, InvalidChannelException {
        this.ensureValidQ().put(packet);
        if (this.monitor != null) {
            this.monitor.responseAvailable(this);
        }
    }

    /** Returns whether this channel still has a response queue, i.e. has not been closed. */
    public boolean isValid() {
        return this.responseQueue != null;
    }

    /** Throws {@link InvalidChannelException} unless this channel is valid. */
    private void ensureValid() throws InvalidChannelException {
        if (this.isValid()) {
            return;
        }
        throw new InvalidChannelException("Channel is no longer valid");
    }

    /**
     * Returns the response queue, read once from the volatile field so the returned reference
     * stays non-null even if the channel is closed right afterwards.
     *
     * @throws InvalidChannelException if the channel has no response queue
     */
    private BlockingQueue<BasicPacket> ensureValidQ() throws InvalidChannelException {
        BlockingQueue<BasicPacket> q = this.responseQueue;
        if (q != null) {
            return q;
        }
        throw new InvalidChannelException("Channel is no longer valid");
    }

    @Override
    public String toString() {
        return "fs4 channel " + this.channelId + (this.isValid() ? " [valid]" : " [invalid]");
    }

    /** Sets the monitor to be notified each time a packet is added to this channel. */
    public void setResponseMonitor(ResponseMonitor<FS4Channel> monitor) {
        this.monitor = monitor;
    }
}

