/*
 * Decompiled with CFR 0.152.
 */
package io.joynr.dispatching;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import io.joynr.common.ExpiryDate;
import io.joynr.dispatching.ContentWithExpiryDate;
import io.joynr.dispatching.DirectoryListener;
import io.joynr.dispatching.MutableMessageFactory;
import io.joynr.dispatching.OneWayCallable;
import io.joynr.dispatching.ProviderDirectory;
import io.joynr.dispatching.RequestCaller;
import io.joynr.dispatching.RequestReplyManager;
import io.joynr.dispatching.rpc.ReplyCaller;
import io.joynr.dispatching.rpc.ReplyCallerDirectory;
import io.joynr.dispatching.rpc.RequestInterpreter;
import io.joynr.dispatching.rpc.SynchronizedReplyCaller;
import io.joynr.exceptions.JoynrCommunicationException;
import io.joynr.exceptions.JoynrIllegalStateException;
import io.joynr.exceptions.JoynrRequestInterruptedException;
import io.joynr.exceptions.JoynrShutdownException;
import io.joynr.messaging.MessagingQos;
import io.joynr.messaging.sender.MessageSender;
import io.joynr.provider.ProviderCallback;
import io.joynr.provider.ProviderContainer;
import io.joynr.runtime.ShutdownListener;
import io.joynr.runtime.ShutdownNotifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import joynr.MutableMessage;
import joynr.OneWayRequest;
import joynr.Reply;
import joynr.Request;
import joynr.types.DiscoveryEntryWithMetaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sends requests, one-way requests and synchronous requests to providers, and dispatches
 * incoming requests/replies to the locally registered request callers and reply callers.
 *
 * <p>Incoming requests (and one-way requests) for a provider participant id that is not yet
 * present in the {@link ProviderDirectory} are queued per participant id and replayed from
 * {@link #entryAdded(String, ProviderContainer)} once the provider is registered. Queued
 * requests are additionally removed by a scheduled cleanup task when their expiry date passes.
 *
 * <p>The class registers itself as a listener on the provider directory and for shutdown
 * notification in its constructor. It is accessed from several threads (callers of
 * {@link #sendSyncRequest}, the cleanup scheduler, directory callbacks, {@link #shutdown()}).
 */
@Singleton
public class RequestReplyManagerImpl
        implements RequestReplyManager, DirectoryListener<ProviderContainer>, ShutdownListener {
    private static final Logger logger = LoggerFactory.getLogger(RequestReplyManagerImpl.class);

    /**
     * Cleared by {@link #shutdown()}. Volatile because it is written by the shutdown thread and
     * read by threads blocked in {@link #sendSyncRequest} without a common lock.
     */
    private volatile boolean running = true;

    /** Threads currently blocked in {@link #sendSyncRequest}; interrupted on shutdown. */
    private final List<Thread> outstandingRequestThreads = Collections.synchronizedList(new ArrayList<Thread>());

    /** Requests waiting for their provider to be registered, keyed by provider participant id. */
    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<ContentWithExpiryDate<Request>>> requestQueue =
            new ConcurrentHashMap<String, ConcurrentLinkedQueue<ContentWithExpiryDate<Request>>>();

    /** One-way requests waiting for their provider to be registered, keyed by provider participant id. */
    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<OneWayCallable>> oneWayRequestQueue =
            new ConcurrentHashMap<String, ConcurrentLinkedQueue<OneWayCallable>>();

    /** Reply callbacks of the queued requests, keyed by the queued request itself. */
    private final ConcurrentHashMap<Request, ProviderCallback<Reply>> replyCallbacks =
            new ConcurrentHashMap<Request, ProviderCallback<Reply>>();

    private final ReplyCallerDirectory replyCallerDirectory;
    private final ProviderDirectory providerDirectory;
    private final RequestInterpreter requestInterpreter;
    private final MessageSender messageSender;
    private final MutableMessageFactory messageFactory;

    /**
     * Expiry cleanup tasks of the queued requests, keyed by provider participant id. Each value
     * is a synchronized list; iterate it only while holding the list's own monitor.
     */
    private final ConcurrentMap<String, List<ScheduledFuture<?>>> cleanupSchedulerFuturesMap;
    private final ScheduledExecutorService cleanupScheduler;

    /**
     * Stores the collaborators, then registers this instance as a listener on the provider
     * directory and with the shutdown notifier.
     */
    @Inject
    public RequestReplyManagerImpl(MutableMessageFactory messageFactory, ReplyCallerDirectory replyCallerDirectory, ProviderDirectory providerDirectory, MessageSender messageSender, RequestInterpreter requestInterpreter, @Named(value="joynr.scheduler.cleanup") ScheduledExecutorService cleanupScheduler, ShutdownNotifier shutdownNotifier) {
        this.messageFactory = messageFactory;
        this.replyCallerDirectory = replyCallerDirectory;
        this.providerDirectory = providerDirectory;
        this.messageSender = messageSender;
        this.requestInterpreter = requestInterpreter;
        this.cleanupScheduler = cleanupScheduler;
        this.cleanupSchedulerFuturesMap = new ConcurrentHashMap<String, List<ScheduledFuture<?>>>();
        providerDirectory.addListener(this);
        shutdownNotifier.registerForShutdown((ShutdownListener)this);
    }

    /**
     * Builds a request message addressed to the participant of {@code toDiscoveryEntry}, marks it
     * as local according to the discovery entry, and hands it to the message sender.
     */
    @Override
    public void sendRequest(String fromParticipantId, DiscoveryEntryWithMetaInfo toDiscoveryEntry, Request request, MessagingQos messagingQos) {
        logger.trace("SEND USING RequestReplySenderImpl with Id: {}", System.identityHashCode(this));
        MutableMessage message = this.messageFactory.createRequest(fromParticipantId, toDiscoveryEntry.getParticipantId(), request, messagingQos);
        message.setLocalMessage(toDiscoveryEntry.getIsLocal().booleanValue());
        logger.debug("REQUEST call proxy: method: {}, requestReplyId: {}, messageId: {}, proxy participantId: {}, provider participantId: {}, params: {}", new Object[]{request.getMethodName(), request.getRequestReplyId(), message.getId(), fromParticipantId, toDiscoveryEntry.getParticipantId(), request.getParams()});
        this.messageSender.sendMessage(message);
    }

    /**
     * Sends {@code request} and blocks the calling thread until the response container handed to
     * {@code synchronizedReplyCaller} is filled, this manager is shut down, or the round-trip
     * TTL is found to have elapsed on a wake-up.
     *
     * @return the first element of the response container
     * @throws IllegalStateException if the manager has already been shut down on entry
     * @throws JoynrRequestInterruptedException if the waiting thread is interrupted while running
     * @throws JoynrShutdownException if the waiting thread is interrupted after shutdown started
     * @throws JoynrIllegalStateException if waiting ended without any response in the container
     * @throws JoynrCommunicationException if the response is a {@link Throwable}
     */
    @Override
    public Object sendSyncRequest(String fromParticipantId, DiscoveryEntryWithMetaInfo toDiscoveryEntry, Request request, SynchronizedReplyCaller synchronizedReplyCaller, MessagingQos messagingQos) {
        if (!this.running) {
            throw new IllegalStateException("Request: " + request.getRequestReplyId() + " failed. SenderImpl ID: " + System.identityHashCode(this) + ": joynr is shutting down");
        }
        final ArrayList<Object> responsePayloadContainer = new ArrayList<Object>(1);
        // Presumably the reply caller fills this container and notifies on it when a reply or
        // error arrives -- verify against SynchronizedReplyCaller.
        synchronizedReplyCaller.setResponseContainer(responsePayloadContainer);
        this.sendRequest(fromParticipantId, toDiscoveryEntry, request, messagingQos);
        long entryTime = System.currentTimeMillis();

        // Remember the waiting thread so that shutdown() can interrupt it.
        Thread waitingThread = Thread.currentThread();
        this.outstandingRequestThreads.add(waitingThread);
        try {
            synchronized (responsePayloadContainer) {
                while (this.running && responsePayloadContainer.isEmpty() && entryTime + messagingQos.getRoundTripTtl_ms() > System.currentTimeMillis()) {
                    try {
                        // NOTE(review): this wait is unbounded, so the TTL condition above is only
                        // re-evaluated after a notify/interrupt. Presumably the reply caller is
                        // failed by its directory on expiry -- confirm before adding a timeout.
                        responsePayloadContainer.wait();
                    }
                    catch (InterruptedException e) {
                        if (this.running) {
                            throw new JoynrRequestInterruptedException("Request: " + request.getRequestReplyId() + " interrupted.");
                        }
                        throw new JoynrShutdownException("Request: " + request.getRequestReplyId() + " interrupted by shutdown");
                    }
                }
            }
        }
        finally {
            // Always deregister, also on the exception paths, so the thread reference is not
            // retained (and not interrupted later by shutdown()) after this call has ended.
            this.outstandingRequestThreads.remove(waitingThread);
        }
        if (responsePayloadContainer.isEmpty()) {
            throw new JoynrIllegalStateException("Request: " + request.getRequestReplyId() + " failed unexpectedly without response.");
        }
        Object response = responsePayloadContainer.get(0);
        if (response instanceof Throwable) {
            Throwable error = (Throwable)response;
            throw new JoynrCommunicationException("Request: " + request.getRequestReplyId() + " failed: " + error.getMessage(), error);
        }
        return response;
    }

    /**
     * Creates and sends one one-way request message per entry of {@code toDiscoveryEntries}.
     */
    @Override
    public void sendOneWayRequest(String fromParticipantId, Set<DiscoveryEntryWithMetaInfo> toDiscoveryEntries, OneWayRequest oneWayRequest, MessagingQos messagingQos) {
        for (DiscoveryEntryWithMetaInfo toDiscoveryEntry : toDiscoveryEntries) {
            MutableMessage message = this.messageFactory.createOneWayRequest(fromParticipantId, toDiscoveryEntry.getParticipantId(), oneWayRequest, messagingQos);
            logger.debug("Send OneWayRequest: method: {}, messageId: {}, proxy participantId: {}, provider participantId: {}", new Object[]{oneWayRequest.getMethodName(), message.getId(), fromParticipantId, toDiscoveryEntry.getParticipantId()});
            this.messageSender.sendMessage(message);
        }
    }

    /**
     * Replays everything that was queued for {@code participantId}: executes the queued requests
     * that have not expired, calls the queued one-way callables, and cancels the pending expiry
     * cleanup tasks for that participant.
     */
    @Override
    public void entryAdded(String participantId, ProviderContainer providerContainer) {
        ConcurrentLinkedQueue<ContentWithExpiryDate<Request>> queuedRequests = this.requestQueue.remove(participantId);
        if (queuedRequests != null) {
            for (ContentWithExpiryDate<Request> queuedRequest : queuedRequests) {
                Request request = (Request)queuedRequest.getContent();
                // Remove the callback for expired requests as well: their cleanup task is
                // cancelled below, so nothing else would ever remove the entry.
                ProviderCallback<Reply> replyCallback = this.replyCallbacks.remove(request);
                if (queuedRequest.isExpired()) {
                    continue;
                }
                this.handleRequest(replyCallback, providerContainer.getRequestCaller(), request);
            }
        }
        ConcurrentLinkedQueue<OneWayCallable> oneWayCallables = this.oneWayRequestQueue.remove(participantId);
        if (oneWayCallables != null) {
            for (OneWayCallable oneWayCallable : oneWayCallables) {
                oneWayCallable.call();
            }
        }
        cancelAll(this.cleanupSchedulerFuturesMap.remove(participantId));
    }

    /** No action is taken when a provider is removed. */
    @Override
    public void entryRemoved(String participantId) {
    }

    /**
     * Invokes the one-way request on the provider if it is registered; otherwise queues it until
     * {@link #entryAdded(String, ProviderContainer)} is called for that participant id.
     */
    @Override
    public void handleOneWayRequest(final String providerParticipantId, final OneWayRequest request, long expiryDate) {
        Callable<Void> requestHandler = new Callable<Void>(){

            @Override
            public Void call() {
                RequestReplyManagerImpl.this.requestInterpreter.invokeMethod(((ProviderContainer)RequestReplyManagerImpl.this.providerDirectory.get(providerParticipantId)).getRequestCaller(), request);
                return null;
            }
        };
        OneWayCallable oneWayCallable = new OneWayCallable(requestHandler, ExpiryDate.fromAbsolute(expiryDate), String.valueOf(request));
        if (this.providerDirectory.contains(providerParticipantId)) {
            oneWayCallable.call();
        } else {
            // NOTE(review): if the provider is registered between the check above and this add,
            // the callable stays queued until the next entryAdded for this participant id.
            queueFor(this.oneWayRequestQueue, providerParticipantId).add(oneWayCallable);
        }
    }

    /**
     * Executes the request on the provider if it is registered; otherwise queues it, together
     * with its reply callback, until the provider is added or the request expires.
     */
    @Override
    public void handleRequest(ProviderCallback<Reply> replyCallback, String providerParticipant, Request request, long expiryDate) {
        if (this.providerDirectory.contains(providerParticipant)) {
            this.handleRequest(replyCallback, ((ProviderContainer)this.providerDirectory.get(providerParticipant)).getRequestCaller(), request);
        } else {
            this.queueRequest(replyCallback, providerParticipant, request, ExpiryDate.fromAbsolute(expiryDate));
            logger.info("No requestCaller found for participantId: {} queuing request message.", providerParticipant);
        }
    }

    /** Hands the request to the request interpreter for execution on {@code requestCaller}. */
    private void handleRequest(ProviderCallback<Reply> replyCallback, RequestCaller requestCaller, Request request) {
        logger.trace("executing request {}", request.getRequestReplyId());
        this.requestInterpreter.execute(replyCallback, requestCaller, request);
    }

    /**
     * Removes the reply caller registered for the reply's request-reply id and passes the reply
     * to it; logs a warning if none is registered.
     */
    @Override
    public void handleReply(Reply reply) {
        ReplyCaller callBack = this.replyCallerDirectory.remove(reply.getRequestReplyId());
        if (callBack == null) {
            logger.warn("No reply caller found for id: {}", reply.getRequestReplyId());
            return;
        }
        callBack.messageCallBack(reply);
    }

    /**
     * Removes the reply caller registered for the request (if the request has a request-reply id
     * and a caller is registered) and reports {@code error} to it.
     */
    @Override
    public void handleError(Request request, Throwable error) {
        String requestReplyId = request.getRequestReplyId();
        if (requestReplyId == null) {
            return;
        }
        ReplyCaller replyCaller = this.replyCallerDirectory.remove(requestReplyId);
        if (replyCaller != null) {
            replyCaller.error(error);
        }
    }

    /**
     * Queues {@code request} for a provider that is not registered yet and schedules a cleanup
     * task that discards the request once {@code expiryDate} has passed.
     */
    private void queueRequest(ProviderCallback<Reply> replyCallback, final String providerParticipantId, Request request, ExpiryDate expiryDate) {
        final ContentWithExpiryDate<Request> requestItem = new ContentWithExpiryDate<Request>(request, expiryDate);
        // Register the callback before the request becomes visible in the queue, so that a
        // concurrent entryAdded() never dispatches the request with a missing callback.
        this.replyCallbacks.put(request, replyCallback);
        queueFor(this.requestQueue, providerParticipantId).add(requestItem);

        ScheduledFuture<?> cleanupSchedulerFuture = this.cleanupScheduler.schedule(new Runnable(){

            @Override
            public void run() {
                ConcurrentLinkedQueue<ContentWithExpiryDate<Request>> pending = RequestReplyManagerImpl.this.requestQueue.get(providerParticipantId);
                if (pending == null) {
                    // entryAdded() has already taken over the whole queue for this participant;
                    // it is responsible for the request and its callback.
                    return;
                }
                pending.remove(requestItem);
                Request expiredRequest = (Request)requestItem.getContent();
                RequestReplyManagerImpl.this.replyCallbacks.remove(expiredRequest);
                logger.warn("TTL DISCARD. providerParticipantId: {} request method: {} because it has expired.", providerParticipantId, expiredRequest.getMethodName());
            }
        }, expiryDate.getRelativeTtl(), TimeUnit.MILLISECONDS);

        List<ScheduledFuture<?>> futures = this.cleanupSchedulerFuturesMap.get(providerParticipantId);
        if (futures == null) {
            List<ScheduledFuture<?>> fresh = Collections.synchronizedList(new ArrayList<ScheduledFuture<?>>());
            // putIfAbsent keeps a list that another thread registered in the meantime.
            futures = this.cleanupSchedulerFuturesMap.putIfAbsent(providerParticipantId, fresh);
            if (futures == null) {
                futures = fresh;
            }
        }
        futures.add(cleanupSchedulerFuture);
    }

    /**
     * Returns the queue registered for {@code participantId}, atomically creating and
     * registering an empty one if none exists yet.
     */
    private static <T> ConcurrentLinkedQueue<T> queueFor(ConcurrentMap<String, ConcurrentLinkedQueue<T>> queues, String participantId) {
        ConcurrentLinkedQueue<T> queue = queues.get(participantId);
        if (queue == null) {
            ConcurrentLinkedQueue<T> fresh = new ConcurrentLinkedQueue<T>();
            queue = queues.putIfAbsent(participantId, fresh);
            if (queue == null) {
                queue = fresh;
            }
        }
        return queue;
    }

    /** Cancels every future in {@code futures} without interrupting running tasks; null-safe. */
    private static void cancelAll(List<ScheduledFuture<?>> futures) {
        if (futures == null) {
            return;
        }
        // The lists stored in cleanupSchedulerFuturesMap are synchronized wrappers, which
        // require holding the list's monitor while iterating.
        synchronized (futures) {
            for (ScheduledFuture<?> future : futures) {
                future.cancel(false);
            }
        }
    }

    /**
     * Cancels all pending expiry cleanup tasks, marks the manager as no longer running,
     * interrupts every thread still blocked in {@link #sendSyncRequest}, and deregisters this
     * instance from the provider directory.
     */
    @Override
    public void shutdown() {
        for (List<ScheduledFuture<?>> futures : this.cleanupSchedulerFuturesMap.values()) {
            cancelAll(futures);
        }
        this.running = false;
        synchronized (this.outstandingRequestThreads) {
            for (Thread thread : this.outstandingRequestThreads) {
                logger.debug("shutting down. Interrupting thread: {}", thread);
                thread.interrupt();
            }
        }
        this.providerDirectory.removeListener(this);
    }
}

