/*
 * Decompiled with CFR 0.152.
 */
package org.joyqueue.broker.kafka.session;

import io.netty.channel.Channel;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.joyqueue.network.transport.ChannelTransport;
import org.joyqueue.network.transport.TransportAttribute;
import org.joyqueue.network.transport.TransportState;
import org.joyqueue.network.transport.command.Command;
import org.joyqueue.network.transport.command.CommandCallback;
import org.joyqueue.network.transport.exception.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ChannelTransport} decorator that forwards every operation to a wrapped
 * transport, except that acknowledgements are released in the order in which
 * their requests were registered through {@link #acquire(Command)}.
 *
 * <p>A response handed to {@link #acknowledge(Command, Command)} is parked until
 * its request reaches the head of the pending-request queue; only then is it
 * passed on to the delegate (on the channel's event loop).
 *
 * <p>NOTE(review): presumably this exists because Kafka clients expect responses
 * in request order on a connection - confirm against the protocol handler.
 * Requests and responses are matched through map lookups, so the matching relies
 * on {@code Command}'s {@code equals}/{@code hashCode} - verify those are suitable.
 */
public class KafkaChannelTransport implements ChannelTransport {
    protected static final Logger logger = LoggerFactory.getLogger(KafkaChannelTransport.class);

    /** Transport that performs the actual work. */
    private final ChannelTransport delegate;

    /** Requests in registration order; the head is the next one allowed to be acknowledged. */
    private final ConcurrentLinkedQueue<Command> requestQueue = new ConcurrentLinkedQueue<>();

    /** Responses that are ready but whose request is not (or was not yet) at the head of the queue. */
    private final ConcurrentMap<Command, Command> responseMap = new ConcurrentHashMap<>();

    /**
     * Creates a transport that wraps {@code delegate}.
     *
     * @param delegate the transport every call is forwarded to
     */
    public KafkaChannelTransport(ChannelTransport delegate) {
        this.delegate = delegate;
    }

    /** Returns the delegate's channel. */
    @Override
    public Channel getChannel() {
        return this.delegate.getChannel();
    }

    /** Forwards to the delegate's {@code sync(command)}. */
    @Override
    public Command sync(Command command) throws TransportException {
        return this.delegate.sync(command);
    }

    /** Forwards to the delegate's {@code sync(command, timeout)}. */
    @Override
    public Command sync(Command command, long timeout) throws TransportException {
        return this.delegate.sync(command, timeout);
    }

    /** Forwards to the delegate's {@code async(command, callback)}. */
    @Override
    public void async(Command command, CommandCallback callback) throws TransportException {
        this.delegate.async(command, callback);
    }

    /** Forwards to the delegate's {@code async(command, timeout, callback)}. */
    @Override
    public void async(Command command, long timeout, CommandCallback callback) throws TransportException {
        this.delegate.async(command, timeout, callback);
    }

    /** Forwards to the delegate's {@code async(command)}. */
    @Override
    public CompletableFuture<?> async(Command command) throws TransportException {
        return this.delegate.async(command);
    }

    /** Forwards to the delegate's {@code async(command, timeout)}. */
    @Override
    public CompletableFuture<?> async(Command command, long timeout) throws TransportException {
        return this.delegate.async(command, timeout);
    }

    /** Forwards to the delegate's {@code oneway(command)}. */
    @Override
    public void oneway(Command command) throws TransportException {
        this.delegate.oneway(command);
    }

    /** Forwards to the delegate's {@code oneway(command, timeout)}. */
    @Override
    public void oneway(Command command, long timeout) throws TransportException {
        this.delegate.oneway(command, timeout);
    }

    /**
     * Records {@code response} for {@code request} and then releases, in queue
     * order, every pending request whose response is already available.
     *
     * <p>Draining stops at the first queued request that has no recorded response,
     * so a response for a later request waits until all earlier ones are answered.
     * Each released pair is handed to the delegate's {@code acknowledge} on the
     * channel's event loop rather than on the calling thread.
     *
     * @param request  the request being answered
     * @param response the response for {@code request}
     * @throws TransportException declared by the interface; the delegate call itself
     *                            runs asynchronously on the event loop
     */
    @Override
    public synchronized void acknowledge(Command request, Command response) throws TransportException {
        this.responseMap.put(request, response);

        while (true) {
            // Fresh final locals per iteration so the lambda below may capture them.
            final Command headRequest = this.requestQueue.peek();
            if (headRequest == null) {
                break;
            }
            final Command headResponse = this.responseMap.get(headRequest);
            if (headResponse == null) {
                // Oldest pending request is not answered yet; later responses must wait.
                break;
            }

            this.delegate.getChannel().eventLoop()
                    .execute(() -> this.delegate.acknowledge(headRequest, headResponse));

            this.responseMap.remove(headRequest);
            // Elements are only removed inside this synchronized method and acquire()
            // only appends, so the element just peeked is still the head.
            this.requestQueue.poll();
        }
    }

    /**
     * Not supported by this transport.
     *
     * @throws UnsupportedOperationException always
     */
    public void acknowledge(Command request, Command response, CommandCallback callback) throws TransportException {
        throw new UnsupportedOperationException("acknowledge with callback is not supported");
    }

    /** Returns the delegate's remote address. */
    @Override
    public SocketAddress remoteAddress() {
        return this.delegate.remoteAddress();
    }

    /** Returns the delegate's transport attribute. */
    @Override
    public TransportAttribute attr() {
        return this.delegate.attr();
    }

    /** Sets the transport attribute on the delegate. */
    @Override
    public void attr(TransportAttribute attribute) {
        this.delegate.attr(attribute);
    }

    /** Returns the delegate's state. */
    @Override
    public TransportState state() {
        return this.delegate.state();
    }

    /** Stops the delegate. */
    @Override
    public void stop() {
        this.delegate.stop();
    }

    /**
     * Registers {@code request} at the tail of the pending-request queue, fixing its
     * position in the order in which acknowledgements are released.
     *
     * @param request the request that will later be acknowledged
     */
    public void acquire(Command request) {
        this.requestQueue.offer(request);
    }

    /** Returns the delegate's string representation. */
    @Override
    public String toString() {
        return this.delegate.toString();
    }
}

