/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.spi.impl.operationservice.impl;

import com.hazelcast.core.Member;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.instance.HazelcastThreadGroup;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.OutOfMemoryErrorDispatcher;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.metrics.MetricsProvider;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Packet;
import com.hazelcast.spi.LiveOperations;
import com.hazelcast.spi.LiveOperationsTracker;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.PacketHandler;
import com.hazelcast.spi.impl.operationexecutor.OperationHostileThread;
import com.hazelcast.spi.impl.operationservice.impl.Invocation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationRegistry;
import com.hazelcast.spi.impl.servicemanager.ServiceManager;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.util.Clock;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;

class InvocationMonitor
implements PacketHandler,
MetricsProvider {
    private static final long ON_MEMBER_LEFT_DELAY_MILLIS = 1111L;
    private static final int HEARTBEAT_CALL_TIMEOUT_RATIO = 4;
    private static final long MAX_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(10L);
    private final NodeEngineImpl nodeEngine;
    private final InternalSerializationService serializationService;
    private final ServiceManager serviceManager;
    private final InvocationRegistry invocationRegistry;
    private final ILogger logger;
    private final ScheduledExecutorService scheduler;
    private final Address thisAddress;
    private final ConcurrentMap<Address, AtomicLong> lastHeartbeatPerMember = new ConcurrentHashMap<Address, AtomicLong>();
    @Probe(name="backupTimeouts", level=ProbeLevel.MANDATORY)
    private final SwCounter backupTimeoutsCount = SwCounter.newSwCounter();
    @Probe(name="normalTimeouts", level=ProbeLevel.MANDATORY)
    private final SwCounter normalTimeoutsCount = SwCounter.newSwCounter();
    @Probe
    private final SwCounter heartbeatPacketsReceived = SwCounter.newSwCounter();
    @Probe
    private final SwCounter heartbeatPacketsSend = SwCounter.newSwCounter();
    @Probe
    private final SwCounter delayedExecutionCount = SwCounter.newSwCounter();
    @Probe
    private final long backupTimeoutMillis;
    @Probe
    private final long invocationTimeoutMillis;
    @Probe
    private final long heartbeatBroadcastPeriodMillis;
    @Probe
    private final long invocationScanPeriodMillis = TimeUnit.SECONDS.toMillis(1L);

    InvocationMonitor(NodeEngineImpl nodeEngine, Address thisAddress, HazelcastThreadGroup threadGroup, HazelcastProperties properties, InvocationRegistry invocationRegistry, ILogger logger, InternalSerializationService serializationService, ServiceManager serviceManager) {
        this.nodeEngine = nodeEngine;
        this.thisAddress = thisAddress;
        this.serializationService = serializationService;
        this.serviceManager = serviceManager;
        this.invocationRegistry = invocationRegistry;
        this.logger = logger;
        this.backupTimeoutMillis = this.backupTimeoutMillis(properties);
        this.invocationTimeoutMillis = this.invocationTimeoutMillis(properties);
        this.heartbeatBroadcastPeriodMillis = this.heartbeatBroadcastPeriodMillis(properties);
        this.scheduler = this.newScheduler(threadGroup);
    }

    @Override
    public void provideMetrics(MetricsRegistry metricsRegistry) {
        metricsRegistry.scanAndRegister(this, "operation.invocations");
    }

    private ScheduledExecutorService newScheduler(final HazelcastThreadGroup threadGroup) {
        return new ScheduledThreadPoolExecutor(1, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                return new InvocationMonitorThread(r, threadGroup);
            }
        });
    }

    private long invocationTimeoutMillis(HazelcastProperties properties) {
        long heartbeatTimeoutMillis = properties.getMillis(GroupProperty.OPERATION_CALL_TIMEOUT_MILLIS);
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Operation invocation timeout is " + heartbeatTimeoutMillis + " ms");
        }
        return heartbeatTimeoutMillis;
    }

    private long backupTimeoutMillis(HazelcastProperties properties) {
        long backupTimeoutMillis = properties.getMillis(GroupProperty.OPERATION_BACKUP_TIMEOUT_MILLIS);
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Operation backup timeout is " + backupTimeoutMillis + " ms");
        }
        return backupTimeoutMillis;
    }

    private long heartbeatBroadcastPeriodMillis(HazelcastProperties properties) {
        int callTimeoutMs = properties.getInteger(GroupProperty.OPERATION_CALL_TIMEOUT_MILLIS);
        long periodMs = Math.max(TimeUnit.SECONDS.toMillis(1L), (long)(callTimeoutMs / 4));
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Operation heartbeat period is " + periodMs + " ms");
        }
        return periodMs;
    }

    void onMemberLeft(MemberImpl member) {
        this.scheduler.schedule(new OnMemberLeftTask(member), 1111L, TimeUnit.MILLISECONDS);
    }

    @Override
    public void handle(Packet packet) {
        this.scheduler.execute(new ProcessOperationHeartbeatsTask(packet));
    }

    public void start() {
        MonitorInvocationsTask monitorInvocationsTask = new MonitorInvocationsTask(this.invocationScanPeriodMillis);
        this.scheduler.scheduleAtFixedRate(monitorInvocationsTask, 0L, monitorInvocationsTask.periodMillis, TimeUnit.MILLISECONDS);
        BroadcastOperationHeartbeatsTask broadcastOperationHeartbeatsTask = new BroadcastOperationHeartbeatsTask(this.heartbeatBroadcastPeriodMillis);
        this.scheduler.scheduleAtFixedRate(broadcastOperationHeartbeatsTask, 0L, broadcastOperationHeartbeatsTask.periodMillis, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        this.scheduler.shutdown();
    }

    public void awaitTermination(long timeoutMillis) throws InterruptedException {
        this.scheduler.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    long getLastMemberHeartbeatMillis(Address memberAddress) {
        if (memberAddress == null) {
            return 0L;
        }
        AtomicLong heartbeat = (AtomicLong)this.lastHeartbeatPerMember.get(memberAddress);
        return heartbeat == null ? 0L : heartbeat.get();
    }

    private static final class InvocationMonitorThread
    extends Thread
    implements OperationHostileThread {
        private InvocationMonitorThread(Runnable task, HazelcastThreadGroup hzThreadGroup) {
            super(hzThreadGroup.getInternalThreadGroup(), task, hzThreadGroup.getThreadNamePrefix("InvocationMonitorThread"));
        }
    }

    private final class BroadcastOperationHeartbeatsTask
    extends FixedRateMonitorTask {
        private final LiveOperations liveOperations;

        private BroadcastOperationHeartbeatsTask(long periodMillis) {
            super(periodMillis);
            this.liveOperations = new LiveOperations(InvocationMonitor.this.thisAddress);
        }

        @Override
        public void run0() {
            LiveOperations result = this.populate();
            Set<Address> addresses = result.addresses();
            if (InvocationMonitor.this.logger.isFinestEnabled()) {
                InvocationMonitor.this.logger.finest("Broadcasting operation heartbeats to: " + addresses.size() + " members");
            }
            for (Address address : addresses) {
                this.sendHeartbeats(address, result.callIds(address));
            }
        }

        private LiveOperations populate() {
            this.liveOperations.clear();
            ClusterService clusterService = InvocationMonitor.this.nodeEngine.getClusterService();
            this.liveOperations.initMember(InvocationMonitor.this.thisAddress);
            for (Member member : clusterService.getMembers()) {
                this.liveOperations.initMember(member.getAddress());
            }
            for (LiveOperationsTracker tracker : InvocationMonitor.this.serviceManager.getServices(LiveOperationsTracker.class)) {
                tracker.populate(this.liveOperations);
            }
            return this.liveOperations;
        }

        private void sendHeartbeats(Address address, long[] callIds) {
            InvocationMonitor.this.heartbeatPacketsSend.inc();
            if (address.equals(InvocationMonitor.this.thisAddress)) {
                InvocationMonitor.this.scheduler.execute(new ProcessOperationHeartbeatsTask(callIds));
            } else {
                Packet packet = new Packet(InvocationMonitor.this.serializationService.toBytes(callIds)).setAllFlags(81);
                InvocationMonitor.this.nodeEngine.getNode().getConnectionManager().transmit(packet, address);
            }
        }
    }

    private final class ProcessOperationHeartbeatsTask
    extends MonitorTask {
        private Object callIds;

        private ProcessOperationHeartbeatsTask(Object callIds) {
            this.callIds = callIds;
        }

        @Override
        public void run0() {
            InvocationMonitor.this.heartbeatPacketsReceived.inc();
            long timeMillis = Clock.currentTimeMillis();
            this.updateMemberHeartbeat(timeMillis);
            for (long callId : (long[])InvocationMonitor.this.serializationService.toObject(this.callIds)) {
                this.updateHeartbeat(callId, timeMillis);
            }
        }

        private void updateMemberHeartbeat(long timeMillis) {
            Address address = this.callIds instanceof Packet ? ((Packet)this.callIds).getConn().getEndPoint() : InvocationMonitor.this.thisAddress;
            AtomicLong lastMemberHeartbeat = (AtomicLong)InvocationMonitor.this.lastHeartbeatPerMember.get(address);
            if (lastMemberHeartbeat == null) {
                lastMemberHeartbeat = new AtomicLong();
                InvocationMonitor.this.lastHeartbeatPerMember.put(address, lastMemberHeartbeat);
            }
            lastMemberHeartbeat.set(timeMillis);
        }

        private void updateHeartbeat(long callId, long timeMillis) {
            Invocation invocation = InvocationMonitor.this.invocationRegistry.get(callId);
            if (invocation == null) {
                return;
            }
            invocation.lastHeartbeatMillis = timeMillis;
        }
    }

    private final class OnMemberLeftTask
    extends MonitorTask {
        private final MemberImpl leftMember;

        private OnMemberLeftTask(MemberImpl leftMember) {
            this.leftMember = leftMember;
        }

        @Override
        public void run0() {
            InvocationMonitor.this.lastHeartbeatPerMember.remove(this.leftMember.getAddress());
            for (Invocation invocation : InvocationMonitor.this.invocationRegistry) {
                if (!this.hasMemberLeft(invocation)) continue;
                invocation.notifyError(new MemberLeftException(this.leftMember));
            }
        }

        private boolean hasMemberLeft(Invocation invocation) {
            MemberImpl targetMember = invocation.targetMember;
            if (targetMember == null) {
                Address invTarget = invocation.invTarget;
                return this.leftMember.getAddress().equals(invTarget);
            }
            return this.leftMember.getUuid().equals(targetMember.getUuid());
        }
    }

    private final class MonitorInvocationsTask
    extends FixedRateMonitorTask {
        private MonitorInvocationsTask(long periodMillis) {
            super(periodMillis);
        }

        @Override
        public void run0() {
            if (InvocationMonitor.this.logger.isFinestEnabled()) {
                InvocationMonitor.this.logger.finest("Scanning all invocations");
            }
            if (InvocationMonitor.this.invocationRegistry.size() == 0) {
                return;
            }
            int backupTimeouts = 0;
            int normalTimeouts = 0;
            int invocationCount = 0;
            Set<Map.Entry<Long, Invocation>> invocations = InvocationMonitor.this.invocationRegistry.entrySet();
            Iterator<Map.Entry<Long, Invocation>> iterator = invocations.iterator();
            while (iterator.hasNext()) {
                ++invocationCount;
                Map.Entry<Long, Invocation> entry = iterator.next();
                Long callId = entry.getKey();
                Invocation inv = entry.getValue();
                if (this.duplicate(inv, callId, iterator)) continue;
                try {
                    if (inv.detectAndHandleTimeout(InvocationMonitor.this.invocationTimeoutMillis)) {
                        ++normalTimeouts;
                        continue;
                    }
                    if (!inv.detectAndHandleBackupTimeout(InvocationMonitor.this.backupTimeoutMillis)) continue;
                    ++backupTimeouts;
                }
                catch (Throwable t) {
                    OutOfMemoryErrorDispatcher.inspectOutOfMemoryError(t);
                    InvocationMonitor.this.logger.severe("Failed to check invocation:" + inv, t);
                }
            }
            InvocationMonitor.this.backupTimeoutsCount.inc(backupTimeouts);
            InvocationMonitor.this.normalTimeoutsCount.inc(normalTimeouts);
            this.log(invocationCount, backupTimeouts, normalTimeouts);
        }

        private boolean duplicate(Invocation inv, long callId, Iterator iterator) {
            if (callId != inv.op.getCallId() && inv.future.isDone()) {
                iterator.remove();
                return true;
            }
            return false;
        }

        private void log(int invocationCount, int backupTimeouts, int invocationTimeouts) {
            Level logLevel = null;
            if (backupTimeouts > 0 || invocationTimeouts > 0) {
                logLevel = Level.INFO;
            } else if (InvocationMonitor.this.logger.isFineEnabled()) {
                logLevel = Level.FINE;
            }
            if (logLevel != null) {
                InvocationMonitor.this.logger.log(logLevel, "Invocations:" + invocationCount + " timeouts:" + invocationTimeouts + " backup-timeouts:" + backupTimeouts);
            }
        }
    }

    abstract class FixedRateMonitorTask
    implements Runnable {
        final long periodMillis;
        private long expectedNextMillis = System.currentTimeMillis();

        FixedRateMonitorTask(long periodMillis) {
            this.periodMillis = periodMillis;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                if (this.expectedNextMillis + MAX_DELAY_MILLIS < currentTimeMillis) {
                    InvocationMonitor.this.logger.warning(this.getClass().getSimpleName() + " delayed " + (currentTimeMillis - this.expectedNextMillis) + " ms");
                    InvocationMonitor.this.delayedExecutionCount.inc();
                }
                this.run0();
            }
            catch (Throwable t) {
                OutOfMemoryErrorDispatcher.inspectOutOfMemoryError(t);
                InvocationMonitor.this.logger.severe(t);
            }
            finally {
                this.expectedNextMillis = currentTimeMillis + this.periodMillis;
            }
        }

        protected abstract void run0();
    }

    private abstract class MonitorTask
    implements Runnable {
        private MonitorTask() {
        }

        @Override
        public void run() {
            try {
                this.run0();
            }
            catch (Throwable t) {
                OutOfMemoryErrorDispatcher.inspectOutOfMemoryError(t);
                InvocationMonitor.this.logger.severe(t);
            }
        }

        protected abstract void run0();
    }
}

