/*
 * Decompiled with CFR 0.152.
 */
package com.orientechnologies.orient.client.remote;

import com.orientechnologies.common.exception.OException;
import com.orientechnologies.common.io.OIOException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.client.binary.OChannelBinaryAsynchClient;
import com.orientechnologies.orient.client.remote.OBinaryRequest;
import com.orientechnologies.orient.client.remote.OBinaryResponse;
import com.orientechnologies.orient.client.remote.ORemotePushHandler;
import com.orientechnologies.orient.client.remote.OStorageRemoteSession;
import com.orientechnologies.orient.client.remote.message.OBinaryPushRequest;
import com.orientechnologies.orient.client.remote.message.OSubscribeRequest;
import com.orientechnologies.orient.client.remote.message.OSubscribeResponse;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelBinary;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataInput;
import com.orientechnologies.orient.enterprise.channel.binary.OChannelDataOutput;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class OStorageRemotePushThread
extends Thread {
    private final ORemotePushHandler pushHandler;
    private final String host;
    private final int retryDelay;
    private final long requestTimeout;
    private OChannelBinary network;
    private final BlockingQueue<Object> blockingQueue = new SynchronousQueue<Object>();
    private volatile OBinaryRequest currentRequest;
    private volatile boolean shutDown;

    public OStorageRemotePushThread(ORemotePushHandler storage, String host, int retryDelay, long requestTimeout) {
        this.setDaemon(true);
        this.pushHandler = storage;
        this.host = host;
        this.network = storage.getNetwork(this.host);
        this.retryDelay = retryDelay;
        this.requestTimeout = requestTimeout;
    }

    public void handleException(Throwable throwable) {
        try {
            this.blockingQueue.put(throwable);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        block15: while (!Thread.interrupted() && !this.shutDown) {
            try {
                byte[] token;
                int currentSessionId;
                this.network.setWaitResponseTimeout();
                byte res = this.network.readByte();
                if (res == 0) {
                    currentSessionId = this.network.readInt();
                    token = this.network.readBytes();
                    byte messageId = this.network.readByte();
                    Object response = this.currentRequest.createResponse();
                    response.read((OChannelDataInput)this.network, null);
                    this.blockingQueue.put(response);
                    continue;
                }
                if (res == 1) {
                    currentSessionId = this.network.readInt();
                    token = this.network.readBytes();
                    byte messageId = this.network.readByte();
                    ((OChannelBinaryAsynchClient)this.network).handleStatus(res, currentSessionId, this::handleException);
                    continue;
                }
                byte push = this.network.readByte();
                OBinaryPushRequest request = this.pushHandler.createPush(push);
                request.read((OChannelDataInput)this.network);
                try {
                    Object response = request.execute(this.pushHandler);
                    if (response == null) continue;
                    OStorageRemotePushThread oStorageRemotePushThread = this;
                    synchronized (oStorageRemotePushThread) {
                        this.network.writeByte((byte)90);
                        this.network.writeInt(-1);
                        response.write((OChannelDataOutput)this.network);
                    }
                }
                catch (Exception e) {
                    OLogManager.instance().error((Object)this, "Error executing push request", (Throwable)e, new Object[0]);
                }
            }
            catch (OException | IOException e) {
                this.pushHandler.onPushDisconnect(this.network, (Exception)e);
                while (!OStorageRemotePushThread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(this.retryDelay);
                    }
                    catch (InterruptedException x) {
                        OStorageRemotePushThread.currentThread().interrupt();
                    }
                    if (OStorageRemotePushThread.currentThread().isInterrupted()) continue;
                    try {
                        OStorageRemotePushThread oStorageRemotePushThread = this;
                        synchronized (oStorageRemotePushThread) {
                            this.network = this.pushHandler.getNetwork(this.host);
                        }
                        this.pushHandler.onPushReconnect(this.host);
                        continue block15;
                    }
                    catch (OIOException oIOException) {
                    }
                }
            }
            catch (InterruptedException e) {
                this.pushHandler.onPushDisconnect(this.network, e);
                OStorageRemotePushThread.currentThread().interrupt();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <T extends OBinaryResponse> T subscribe(OBinaryRequest<T> request, OStorageRemoteSession session) {
        try {
            OStorageRemotePushThread oStorageRemotePushThread = this;
            synchronized (oStorageRemotePushThread) {
                this.currentRequest = new OSubscribeRequest(request);
                ((OChannelBinaryAsynchClient)this.network).beginRequest((byte)100, session);
                this.currentRequest.write((OChannelDataOutput)this.network, null);
                this.network.flush();
            }
            Object poll = this.blockingQueue.poll(this.requestTimeout, TimeUnit.MILLISECONDS);
            if (poll == null) {
                return null;
            }
            if (poll instanceof OSubscribeResponse) {
                return (T)((OSubscribeResponse)poll).getResponse();
            }
            if (poll instanceof RuntimeException) {
                throw (RuntimeException)poll;
            }
        }
        catch (IOException e) {
            OLogManager.instance().warn((Object)this, "Exception on subscribe", (Throwable)e, new Object[0]);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }

    public void shutdown() {
        this.shutDown = true;
        this.interrupt();
        this.pushHandler.returnSocket(this.network);
    }
}

