/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.wan.impl;

import com.hazelcast.cluster.impl.operations.AuthorizationOperation;
import com.hazelcast.instance.Node;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.ConnectionManager;
import com.hazelcast.nio.Packet;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.InvocationBuilder;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.impl.operationservice.InternalOperationService;
import com.hazelcast.util.AddressUtil;
import com.hazelcast.wan.ReplicationEventObject;
import com.hazelcast.wan.WanReplicationEndpoint;
import com.hazelcast.wan.WanReplicationEvent;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class WanNoDelayReplication
implements Runnable,
WanReplicationEndpoint {
    private static final int RETRY_CONNECTION_MAX = 10;
    private static final int RETRY_CONNECTION_SLEEP_MILLIS = 1000;
    private Node node;
    private ILogger logger;
    private String groupName;
    private String password;
    private final LinkedBlockingQueue<String> addressQueue = new LinkedBlockingQueue();
    private final LinkedList<WanReplicationEvent> failureQ = new LinkedList();
    private final BlockingQueue<WanReplicationEvent> eventQueue = new ArrayBlockingQueue<WanReplicationEvent>(100000);
    private volatile boolean running = true;

    List<WanReplicationEvent> getFailureQ() {
        return this.failureQ;
    }

    BlockingQueue<WanReplicationEvent> getEventQueue() {
        return this.eventQueue;
    }

    @Override
    public void init(Node node, String groupName, String password, String ... targets) {
        this.node = node;
        this.logger = node.getLogger(WanNoDelayReplication.class.getName());
        this.groupName = groupName;
        this.password = password;
        this.addressQueue.addAll(Arrays.asList(targets));
        node.nodeEngine.getExecutionService().execute("hz:wan", this);
    }

    @Override
    public void publishReplicationEvent(String serviceName, ReplicationEventObject eventObject) {
        WanReplicationEvent replicationEvent = new WanReplicationEvent(serviceName, eventObject);
        if (this.eventQueue.offer(replicationEvent)) {
            return;
        }
        this.eventQueue.poll();
        if (!this.eventQueue.offer(replicationEvent)) {
            this.logger.warning("Could not publish replication event: " + replicationEvent);
        }
    }

    @Override
    public void publishReplicationEventBackup(String serviceName, ReplicationEventObject eventObject) {
    }

    @Override
    public void publishReplicationEvent(WanReplicationEvent wanReplicationEvent) {
    }

    @Override
    public void shutdown() {
        this.running = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Override
    public void run() {
        Connection conn = null;
        while (this.running) {
            WanReplicationEvent event = null;
            try {
                WanReplicationEvent wanReplicationEvent = event = this.failureQ.size() > 0 ? this.failureQ.removeFirst() : this.eventQueue.take();
                if (conn == null && (conn = this.getConnection()) != null) {
                    conn = this.authorizeConnection(conn);
                }
                if (conn != null && conn.isAlive()) {
                    byte[] bytes = this.node.nodeEngine.getSerializationService().toBytes(event);
                    Packet packet = new Packet(bytes);
                    packet.setHeader(3);
                    conn.write(packet);
                    event = null;
                } else {
                    conn = null;
                }
                if (event == null) continue;
                this.failureQ.addFirst(event);
            }
            catch (InterruptedException e) {
                this.running = false;
                if (event == null) continue;
                this.failureQ.addFirst(event);
            }
            catch (Throwable e2) {
                if (this.logger != null) {
                    this.logger.warning(e2);
                }
                conn = null;
                if (event == null) continue;
                this.failureQ.addFirst(event);
                continue;
                {
                    catch (Throwable throwable) {
                        if (event != null) {
                            this.failureQ.addFirst(event);
                        }
                        throw throwable;
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    Connection getConnection() throws InterruptedException {
        int defaultPort = this.node.getConfig().getNetworkConfig().getPort();
        while (this.running) {
            String targetStr = this.addressQueue.take();
            try {
                AddressUtil.AddressHolder addressHolder = AddressUtil.getAddressHolder(targetStr, defaultPort);
                Address target = new Address(addressHolder.getAddress(), addressHolder.getPort());
                ConnectionManager connectionManager = this.node.getConnectionManager();
                Connection conn = connectionManager.getOrConnect(target);
                for (int i = 0; i < 10; ++i) {
                    if (conn != null) {
                        Connection connection = conn;
                        return connection;
                    }
                    Thread.sleep(1000L);
                    conn = connectionManager.getConnection(target);
                }
            }
            catch (Throwable e) {
                Thread.sleep(1000L);
            }
            finally {
                this.addressQueue.offer(targetStr);
            }
        }
        return null;
    }

    public boolean checkAuthorization(String groupName, String groupPassword, Address target) {
        AuthorizationOperation authorizationCall = new AuthorizationOperation(groupName, groupPassword);
        InternalOperationService operationService = this.node.nodeEngine.getOperationService();
        String serviceName = "hz:core:wanReplicationService";
        InvocationBuilder invocationBuilder = operationService.createInvocationBuilder(serviceName, (Operation)authorizationCall, target);
        InternalCompletableFuture future = invocationBuilder.setTryCount(1).invoke();
        try {
            return (Boolean)future.get();
        }
        catch (Exception ignored) {
            this.logger.finest(ignored);
            return false;
        }
    }

    private Connection authorizeConnection(Connection conn) {
        boolean authorized = this.checkAuthorization(this.groupName, this.password, conn.getEndPoint());
        if (!authorized) {
            conn.close();
            if (this.logger != null) {
                this.logger.severe("Invalid groupName or groupPassword! ");
            }
            return null;
        }
        return conn;
    }
}

