/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast;

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.Broadcaster;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.broadcast.DynamicBroadcastConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TCPBroadcaster
implements Broadcaster {
    static final Logger LOG = LoggerFactory.getLogger(TCPBroadcaster.class);
    private static final int TIMEOUT = 100;
    private static final int MAX_BUFFER_SIZE = 64;
    private static final AtomicInteger NEXT_ID = new AtomicInteger();
    private static final Charset UTF8 = Charset.forName("UTF-8");
    private final int id = NEXT_ID.incrementAndGet();
    private final CopyOnWriteArrayList<Broadcaster.Listener> listeners = new CopyOnWriteArrayList();
    private final ConcurrentHashMap<String, Client> clients = new ConcurrentHashMap();
    private final ArrayBlockingQueue<ByteBuffer> sendBuffer = new ArrayBlockingQueue(128);
    private volatile DynamicBroadcastConfig broadcastConfig;
    private ServerSocket serverSocket;
    private Thread acceptThread;
    private Thread discoverThread;
    private Thread sendThread;
    private String ownListener;
    private String ownKeyUUID = UUID.randomUUID().toString();
    private byte[] ownKey = this.ownKeyUUID.getBytes(UTF8);
    private final AtomicBoolean stop = new AtomicBoolean(false);

    public TCPBroadcaster(String config) {
        LOG.info("Init " + config);
        this.init(config);
    }

    public void init(String config) {
        try {
            String[] parts = config.split(";");
            int startPort = 9800;
            int endPort = 9810;
            String key = "";
            String[] sendTo = new String[]{"sendTo"};
            for (String p : parts) {
                if (p.startsWith("ports ")) {
                    String[] ports = p.split(" ");
                    startPort = Integer.parseInt(ports[1]);
                    endPort = Integer.parseInt(ports[2]);
                    continue;
                }
                if (p.startsWith("key ")) {
                    key = p.split(" ")[1];
                    continue;
                }
                if (!p.startsWith("sendTo ")) continue;
                sendTo = p.split(" ");
            }
            sendTo[0] = null;
            MessageDigest messageDigest = MessageDigest.getInstance("SHA-256");
            if (key.length() > 0) {
                this.ownKey = messageDigest.digest(key.getBytes(UTF8));
            }
            IOException lastException = null;
            ServerSocket server = null;
            for (int port = startPort; port <= endPort; ++port) {
                if (server == null) {
                    try {
                        server = new ServerSocket(port);
                    }
                    catch (IOException e) {
                        LOG.debug("Cannot open port " + port);
                        lastException = e;
                    }
                }
                for (String send : sendTo) {
                    if (send == null || send.isEmpty()) continue;
                    try {
                        Client c = new Client(send, port, this.ownKey);
                        this.clients.put(send + ":" + port, c);
                    }
                    catch (IOException e) {
                        LOG.debug("Cannot connect to " + send + " " + port);
                    }
                }
            }
            if (server == null && lastException != null) {
                throw lastException;
            }
            server.setSoTimeout(100);
            this.serverSocket = server;
            LOG.info("Listening on port " + server.getLocalPort());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        this.acceptThread = new Thread(new Runnable(){

            @Override
            public void run() {
                TCPBroadcaster.this.accept();
            }
        }, "Oak TCPBroadcaster: accept #" + this.id);
        this.acceptThread.setDaemon(true);
        this.acceptThread.start();
        this.discoverThread = new Thread(new Runnable(){

            @Override
            public void run() {
                TCPBroadcaster.this.discover();
            }
        }, "Oak TCPBroadcaster: discover #" + this.id);
        this.discoverThread.setDaemon(true);
        this.discoverThread.start();
        this.sendThread = new Thread(new Runnable(){

            @Override
            public void run() {
                TCPBroadcaster.this.send();
            }
        }, "Oak TCPBroadcaster: send #" + this.id);
        this.sendThread.setDaemon(true);
        this.sendThread.start();
    }

    @Override
    public void setBroadcastConfig(DynamicBroadcastConfig broadcastConfig) {
        String address;
        this.broadcastConfig = broadcastConfig;
        HashMap<String, String> clientInfo = new HashMap<String, String>();
        clientInfo.put("broadcastId", this.ownKeyUUID);
        ServerSocket s = this.serverSocket;
        if (s != null && (address = TCPBroadcaster.getLocalAddress()) != null) {
            this.ownListener = address + ":" + s.getLocalPort();
            clientInfo.put("broadcastListener", this.ownListener);
        }
        broadcastConfig.connect(clientInfo);
    }

    static String getLocalAddress() {
        String bind = System.getProperty("oak.tcpBindAddress", null);
        try {
            InetAddress address = bind != null && !bind.isEmpty() ? InetAddress.getByName(bind) : InetAddress.getLocalHost();
            return address.getHostAddress();
        }
        catch (UnknownHostException e) {
            return "";
        }
    }

    void accept() {
        while (this.isRunning()) {
            try {
                final Socket socket = this.serverSocket.accept();
                Runnable r = new Runnable(){

                    @Override
                    public void run() {
                        try {
                            DataInputStream in = new DataInputStream(socket.getInputStream());
                            byte[] testKey = new byte[TCPBroadcaster.this.ownKey.length];
                            in.readFully(testKey);
                            if (ByteBuffer.wrap(testKey).compareTo(ByteBuffer.wrap(TCPBroadcaster.this.ownKey)) != 0) {
                                LOG.debug("Key mismatch");
                                socket.close();
                                return;
                            }
                            while (!socket.isClosed()) {
                                int len = in.readInt();
                                byte[] data = new byte[len];
                                in.readFully(data);
                                ByteBuffer buff = ByteBuffer.wrap(data);
                                int start = buff.position();
                                for (Broadcaster.Listener l : TCPBroadcaster.this.listeners) {
                                    ((Buffer)buff).position(start);
                                    l.receive(buff);
                                }
                            }
                        }
                        catch (IOException iOException) {
                            // empty catch block
                        }
                    }
                };
                Thread t = new Thread(r, "Oak TCPBroadcaster: listener");
                t.setDaemon(true);
                t.start();
            }
            catch (SocketTimeoutException socket) {
            }
            catch (IOException e) {
                if (!this.isRunning()) continue;
                LOG.warn("Receive failed", (Throwable)e);
            }
        }
        try {
            this.serverSocket.close();
        }
        catch (IOException e) {
            LOG.debug("Closed");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void discover() {
        while (this.isRunning()) {
            DynamicBroadcastConfig b = this.broadcastConfig;
            if (b != null) {
                this.readClients(b);
            }
            for (Client c : this.clients.values()) {
                c.tryConnect();
                if (this.isRunning()) continue;
                break;
            }
            AtomicBoolean atomicBoolean = this.stop;
            synchronized (atomicBoolean) {
                if (this.isRunning()) {
                    try {
                        this.stop.wait(2000L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
            }
        }
    }

    void readClients(DynamicBroadcastConfig b) {
        List<Map<String, String>> list = b.getClientInfo();
        for (Map<String, String> m : list) {
            int index;
            String clientKey;
            Client c;
            String listener = m.get("broadcastListener");
            String id = m.get("broadcastId");
            if (listener.equals(this.ownListener) || (c = this.clients.get(clientKey = listener + " " + id)) != null || (index = listener.lastIndexOf(58)) < 0) continue;
            String host = listener.substring(0, index);
            int port = Integer.parseInt(listener.substring(index + 1));
            try {
                byte[] key = id.getBytes(UTF8);
                c = new Client(host, port, key);
                this.clients.put(clientKey, c);
            }
            catch (UnknownHostException unknownHostException) {}
        }
    }

    void send() {
        while (this.isRunning()) {
            try {
                ByteBuffer buff = this.sendBuffer.poll(10L, TimeUnit.MILLISECONDS);
                if (buff == null || !this.isRunning()) continue;
                this.sendBuffer(buff);
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    @Override
    public void send(ByteBuffer buff) {
        ByteBuffer b = ByteBuffer.allocate(buff.remaining());
        b.put(buff);
        ((Buffer)b).flip();
        while (this.sendBuffer.size() > 64) {
            this.sendBuffer.poll();
        }
        try {
            this.sendBuffer.add(b);
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    private void sendBuffer(ByteBuffer buff) {
        int len = buff.limit();
        byte[] data = new byte[len];
        buff.get(data);
        for (Client c : this.clients.values()) {
            c.send(data);
            if (this.isRunning()) continue;
            break;
        }
    }

    @Override
    public void addListener(Broadcaster.Listener listener) {
        this.listeners.add(listener);
    }

    @Override
    public void removeListener(Broadcaster.Listener listener) {
        this.listeners.remove(listener);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        if (this.isRunning()) {
            LOG.debug("Stopping");
            AtomicBoolean atomicBoolean = this.stop;
            synchronized (atomicBoolean) {
                this.stop.set(true);
                this.stop.notifyAll();
            }
            try {
                this.serverSocket.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
            try {
                this.acceptThread.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            try {
                this.sendThread.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            try {
                this.discoverThread.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    public final boolean isRunning() {
        return !this.stop.get();
    }

    static class Client {
        final String host;
        final int port;
        final byte[] key;
        DataOutputStream out;

        Client(String host, int port, byte[] key) throws UnknownHostException {
            this.host = host;
            this.port = port;
            this.key = key;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void send(byte[] data) {
            DataOutputStream o = this.out;
            if (o != null) {
                DataOutputStream dataOutputStream = o;
                synchronized (dataOutputStream) {
                    try {
                        o.writeInt(data.length);
                        o.write(data);
                        o.flush();
                    }
                    catch (IOException e) {
                        LOG.debug("Writing failed, port " + this.port, (Throwable)e);
                        try {
                            o.close();
                        }
                        catch (IOException iOException) {
                            // empty catch block
                        }
                        this.out = null;
                    }
                }
            }
        }

        void tryConnect() {
            InetAddress address;
            DataOutputStream o = this.out;
            if (o != null || this.host == null) {
                return;
            }
            try {
                address = InetAddress.getByName(this.host);
            }
            catch (UnknownHostException e1) {
                return;
            }
            Socket socket = new Socket();
            try {
                socket.connect(new InetSocketAddress(address, this.port), 100);
                o = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
                o.write(this.key);
                o.flush();
                this.out = o;
                LOG.info("Connected to " + address + " port " + this.port + " k " + this.key[0]);
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }
}

