package org.joyqueue.broker.kafka.session;

import io.netty.channel.Channel;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.joyqueue.network.transport.ChannelTransport;
import org.joyqueue.network.transport.TransportAttribute;
import org.joyqueue.network.transport.TransportState;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.CommandCallback;
import org.joyqueue.network.transport.exception.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/broker/kafka/session/KafkaChannelTransport.class */
public class KafkaChannelTransport implements ChannelTransport {
    protected static final Logger logger = LoggerFactory.getLogger(KafkaChannelTransport.class);
    private ChannelTransport delegate;
    private ConcurrentLinkedQueue<Command> requestQueue = new ConcurrentLinkedQueue<>();
    private ConcurrentMap<Command, Command> responseMap = new ConcurrentHashMap();

    public KafkaChannelTransport(ChannelTransport channelTransport) {
        this.delegate = channelTransport;
    }

    public Channel getChannel() {
        return this.delegate.getChannel();
    }

    public Command sync(Command command) throws TransportException {
        return this.delegate.sync(command);
    }

    public Command sync(Command command, long j) throws TransportException {
        return this.delegate.sync(command, j);
    }

    public void async(Command command, CommandCallback commandCallback) throws TransportException {
        this.delegate.async(command, commandCallback);
    }

    public void async(Command command, long j, CommandCallback commandCallback) throws TransportException {
        this.delegate.async(command, j, commandCallback);
    }

    public CompletableFuture<?> async(Command command) throws TransportException {
        return this.delegate.async(command);
    }

    public CompletableFuture<?> async(Command command, long j) throws TransportException {
        return this.delegate.async(command, j);
    }

    public void oneway(Command command) throws TransportException {
        this.delegate.oneway(command);
    }

    public void oneway(Command command, long j) throws TransportException {
        this.delegate.oneway(command, j);
    }

    public synchronized void acknowledge(Command command, Command command2) throws TransportException {
        Command command3;
        this.responseMap.put(command, command2);
        while (true) {
            Command peek = this.requestQueue.peek();
            if (peek == null || (command3 = this.responseMap.get(peek)) == null) {
                return;
            }
            this.delegate.getChannel().eventLoop().execute(() -> {
                this.delegate.acknowledge(peek, command3);
            });
            this.responseMap.remove(peek);
            this.requestQueue.remove(peek);
        }
    }

    public void acknowledge(Command command, Command command2, CommandCallback commandCallback) throws TransportException {
        throw new UnsupportedOperationException();
    }

    public SocketAddress remoteAddress() {
        return this.delegate.remoteAddress();
    }

    public TransportAttribute attr() {
        return this.delegate.attr();
    }

    public void attr(TransportAttribute transportAttribute) {
        this.delegate.attr(transportAttribute);
    }

    public TransportState state() {
        return this.delegate.state();
    }

    public void stop() {
        this.delegate.stop();
    }

    public void acquire(Command command) {
        this.requestQueue.offer(command);
    }

    public String toString() {
        return this.delegate.toString();
    }
}
