/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.lyo.trs.client.handlers;

import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.vocabulary.RDF;
import org.eclipse.lyo.core.trs.Base;
import org.eclipse.lyo.core.trs.ChangeEvent;
import org.eclipse.lyo.core.trs.ChangeLog;
import org.eclipse.lyo.core.trs.TrackedResourceSet;
import org.eclipse.lyo.trs.client.exceptions.RepresentationRetrievalException;
import org.eclipse.lyo.trs.client.exceptions.ServerRollBackException;
import org.eclipse.lyo.trs.client.handlers.IProviderEventHandler;
import org.eclipse.lyo.trs.client.handlers.IProviderHandler;
import org.eclipse.lyo.trs.client.model.BaseMember;
import org.eclipse.lyo.trs.client.model.ChangeEventMessageTR;
import org.eclipse.lyo.trs.client.util.ITrackedResourceClient;
import org.eclipse.lyo.trs.client.util.ProviderUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link IProviderHandler} that polls a single Tracked Resource Set (TRS) provider and
 * dispatches its base members and change events to an {@link IProviderEventHandler} on a
 * thread pool.
 *
 * <p>The handler remembers the URI of the last change event it dispatched. While that URI is
 * {@code null} the next cycle runs an "indexing stage" that also loads the base members.
 *
 * <p>NOTE(review): the {@code lastProcessedChangeEventUri} field is not synchronized; this class
 * appears to assume that {@link #update()} is never invoked concurrently for one instance —
 * confirm against the scheduler that drives it.
 */
public class ConcurrentTrsProviderHandler
implements IProviderHandler {
    private static final Logger log = LoggerFactory.getLogger(ConcurrentTrsProviderHandler.class);

    /** The {@code rdf:nil} URI, compared against "previous" links and the cutoff event. */
    private static final URI RDF_NIL = URI.create(RDF.nil.getURI());

    /** How long a cycle waits for the dispatched handler tasks before cancelling them. */
    private static final long HANDLER_TERMINATION_TIMEOUT_MS = 3000L;

    private final URI trsUriBase;
    private final ITrackedResourceClient trsClient;
    private final IProviderEventHandler handler;

    /** URI of the last change event handed to the handler; {@code null} forces an indexing stage. */
    private URI lastProcessedChangeEventUri;

    /**
     * Creates a handler for one TRS provider.
     *
     * @param trsUriBase URI of the Tracked Resource Set to poll
     * @param trsClient  client used to fetch the TRS, its bases, change logs and resources
     * @param handler    receiver of base members, change events and cycle notifications
     */
    public ConcurrentTrsProviderHandler(URI trsUriBase, ITrackedResourceClient trsClient, IProviderEventHandler handler) {
        this.trsUriBase = trsUriBase;
        this.trsClient = trsClient;
        this.handler = handler;
    }

    /**
     * Runs one polling cycle. If the cycle fails for any reason, the remembered sync point is
     * discarded and the handler is asked to rebase, so the next cycle starts from the base again.
     */
    @Override
    public void update() {
        try {
            pollAndProcessChanges();
        }
        catch (Exception e) {
            // Keep the cause in the log: without it a forced rebase is impossible to diagnose.
            log.warn("Force rebase", e);
            lastProcessedChangeEventUri = null;
            handler.rebase();
        }
    }

    /**
     * Walks the change log chain backwards from {@code currentChangeLog}, appending every visited
     * page to {@code changeLogs}, until the page containing the last processed change event is
     * found or the chain ends.
     *
     * @param currentChangeLog the most recent change log page; may be {@code null}, in which case
     *                         nothing is added and {@code false} is returned
     * @param changeLogs       output list that receives the visited pages, newest first
     * @return {@code true} if the last processed event was found, or if the end of the chain was
     *         reached while the last processed event is {@code rdf:nil}; {@code false} otherwise
     */
    public boolean fetchRemoteChangeLogs(ChangeLog currentChangeLog, List<ChangeLog> changeLogs) {
        ChangeLog page = currentChangeLog;
        while (page != null) {
            changeLogs.add(page);
            if (ProviderUtil.changeLogContainsEvent(lastProcessedChangeEventUri, page)) {
                return true;
            }
            URI previousChangeLog = page.getPrevious();
            if (previousChangeLog == null || RDF_NIL.equals(previousChangeLog)) {
                // End of the chain. That is only acceptable when the cutoff itself is rdf:nil.
                if (RDF_NIL.equals(lastProcessedChangeEventUri)) {
                    log.debug("First ChangeLog page reached");
                    return true;
                }
                log.error("Changelog read to the end without finding the cutoff event URI");
                return false;
            }
            page = trsClient.fetchRemoteChangeLog(previousChangeLog);
        }
        return false;
    }

    /**
     * Performs one polling cycle: fetches the TRS, loads the base on the first run, collects the
     * change logs since the last processed event, and dispatches all resulting work to the
     * handler on a cached thread pool before signalling the end of the cycle.
     */
    private void pollAndProcessChanges() {
        Date processingDateStart = new Date();
        log.info("started dealing with TRS Provider: {}", trsUriBase);
        TrackedResourceSet updatedTrs = trsClient.extractRemoteTrs(trsUriBase);

        boolean indexingStage = false;
        List<URI> baseMembers = new ArrayList<>();
        if (lastProcessedChangeEventUri == null) {
            log.debug("Indexing Stage.");
            log.debug("Requesting Base members from remote server");
            List<Base> bases = trsClient.updateBases(updatedTrs);
            log.debug("Base members retrieved !");
            if (bases.isEmpty()) {
                // Fail with an explicit message instead of an IndexOutOfBoundsException below.
                throw new IllegalStateException("No Base pages were returned for the TRS at: " + trsUriBase);
            }
            for (Base base : bases) {
                baseMembers.addAll(base.getMembers());
            }
            // The cutoff event is read from the first Base page only.
            lastProcessedChangeEventUri = bases.get(0).getCutoffEvent();
            indexingStage = true;
        }

        log.debug("Requesting changeLogs from Remote Server");
        List<ChangeLog> changeLogs = fetchUpdatedChangeLogs(updatedTrs);
        log.debug("change Logs Retrieved ! ");
        log.debug("Compressing the list of changes ! ");
        List<ChangeEvent> compressedChanges = ProviderUtil.optimizedChangesList(changeLogs, lastProcessedChangeEventUri);
        log.debug("Change list compressed ! ");
        log.debug("starting the processing of change events and base members creations");
        log.trace("Creating necessary sparql update queries");

        ExecutorService handlerExecutor = Executors.newCachedThreadPool();
        if (indexingStage) {
            log.debug("optimizing the list of base members against the change events to be processed.");
            baseMembers = ProviderUtil.baseChangeEventsOptimizationSafe(compressedChanges, baseMembers);
            log.debug("finished optimizing the list of base members against the change events to be processed !");
            log.debug("Indexing stage. Base members creations will be be added to the list of events to be processed.");
            for (URI baseMemberUri : baseMembers) {
                handlerExecutor.execute(() -> {
                    try {
                        Model graphToUpload = trsClient.fetchTRSRemoteResource(baseMemberUri);
                        handler.handleBaseMember(new BaseMember(baseMemberUri, graphToUpload));
                    }
                    catch (RepresentationRetrievalException e) {
                        // Best effort: one unreachable base member must not abort the cycle.
                        log.warn("Failed to retrieve {}", baseMemberUri, e);
                    }
                });
            }
        }
        for (ChangeEvent compressedChangeEvent : compressedChanges) {
            handlerExecutor.execute(() ->
                handler.handleChangeEvent(new ChangeEventMessageTR(compressedChangeEvent, null)));
            // The sync point advances as soon as the event is scheduled, not when it completes.
            lastProcessedChangeEventUri = compressedChangeEvent.getAbout();
        }

        awaitHandlerTasks(handlerExecutor);
        handler.finishCycle();

        log.info("finished dealing with TRS Provider: {}", trsUriBase);
        if (log.isDebugEnabled()) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            log.debug("start dealing at: {} . Finished dealing with provider at: {}",
                    sdf.format(processingDateStart), sdf.format(new Date()));
        }
    }

    /**
     * Stops accepting new tasks, waits up to {@link #HANDLER_TERMINATION_TIMEOUT_MS} for the
     * submitted ones and then cancels whatever is still running. If the waiting thread is
     * interrupted, outstanding tasks are cancelled and the interrupt flag is restored.
     */
    private void awaitHandlerTasks(ExecutorService handlerExecutor) {
        handlerExecutor.shutdown();
        if (handlerExecutor.isTerminated()) {
            return;
        }
        try {
            if (!handlerExecutor.awaitTermination(HANDLER_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                log.warn("Handler tasks for {} did not finish within {} ms; cancelling the remaining ones",
                        trsUriBase, HANDLER_TERMINATION_TIMEOUT_MS);
            }
            handlerExecutor.shutdownNow();
        }
        catch (InterruptedException e) {
            log.debug("Handler thread interrupted while awaiting executor termination", e);
            // Do not leave tasks running past the end of the cycle, and keep the interrupt visible.
            handlerExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Collects the change log pages from the newest one back to the page holding the last
     * processed event.
     *
     * @param updatedTrs the freshly fetched Tracked Resource Set
     * @return the visited change log pages, newest first
     * @throws ServerRollBackException if the last processed event cannot be found in the chain;
     *         the remembered sync point is cleared before throwing
     */
    private List<ChangeLog> fetchUpdatedChangeLogs(TrackedResourceSet updatedTrs) {
        List<ChangeLog> changeLogs = new ArrayList<>();
        boolean foundSyncEvent = fetchRemoteChangeLogs(updatedTrs.getChangeLog(), changeLogs);
        if (!foundSyncEvent) {
            lastProcessedChangeEventUri = null;
            throw new ServerRollBackException("The sync event can not be found. The sever provinding the trs at: " + trsUriBase + " seems to have been rollecd back to a previous state");
        }
        return changeLogs;
    }
}

