/*
 * Decompiled with CFR 0.152.
 */
package com.daml.ledger.rxjava.components;

import com.daml.ledger.javaapi.data.CompletionStreamResponse;
import com.daml.ledger.javaapi.data.GetActiveContractsResponse;
import com.daml.ledger.javaapi.data.LedgerOffset;
import com.daml.ledger.javaapi.data.SubmitCommandsRequest;
import com.daml.ledger.javaapi.data.Transaction;
import com.daml.ledger.javaapi.data.TransactionFilter;
import com.daml.ledger.javaapi.data.WorkflowEvent;
import com.daml.ledger.rxjava.CommandSubmissionClient;
import com.daml.ledger.rxjava.LedgerClient;
import com.daml.ledger.rxjava.TransactionsClient;
import com.daml.ledger.rxjava.components.LedgerViewFlowable;
import com.daml.ledger.rxjava.components.helpers.CommandsAndPendingSet;
import com.daml.ledger.rxjava.components.helpers.CreatedContract;
import com.daml.ledger.rxjava.components.helpers.Pair;
import com.daml.ledger.rxjava.util.FlowableLogger;
import com.google.rpc.Code;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.Observer;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.ReplaySubject;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Bot {
    private static final Logger logger = LoggerFactory.getLogger(Bot.class);

    static Flowable<LedgerViewFlowable.CompletionFailure> failuresCommandIds(Set<String> parties, Flowable<CompletionStreamResponse> completionStreamResponseFlowable) {
        return completionStreamResponseFlowable.concatMapIterable(CompletionStreamResponse::getCompletions).filter(s -> s.getStatus().getCode() != Code.OK.getNumber()).map(c -> new LedgerViewFlowable.CompletionFailure(c.getCommandId(), c.getStatus()));
    }

    public static <R> Disposable wire(String applicationId, LedgerClient ledgerClient, TransactionFilter transactionFilter, java.util.function.Function<LedgerViewFlowable.LedgerView<R>, Flowable<CommandsAndPendingSet>> bot, java.util.function.Function<CreatedContract, R> transform) {
        return Bot.wire(applicationId, ledgerClient, transactionFilter, bot, transform, Schedulers.io());
    }

    public static <R> Disposable wire(String applicationId, LedgerClient ledgerClient, TransactionFilter transactionFilter, java.util.function.Function<LedgerViewFlowable.LedgerView<R>, Flowable<CommandsAndPendingSet>> bot, java.util.function.Function<CreatedContract, R> transform, Scheduler scheduler) {
        logger.info("Bot wiring started for parties {}", (Object)transactionFilter.getParties());
        TransactionsClient transactionsClient = ledgerClient.getTransactionsClient();
        Flowable acs = Flowable.empty();
        Single acsLedgerViewSingle = LedgerViewFlowable.ledgerViewAndOffsetFromACS((Flowable<GetActiveContractsResponse>)acs, transform).observeOn(scheduler);
        Single ledgerViewAndOffsetSingle = acsLedgerViewSingle.flatMap(acsLedgerViewAndOffset -> {
            LedgerViewFlowable.LedgerView acsLedgerView = (LedgerViewFlowable.LedgerView)acsLedgerViewAndOffset.getFirst();
            LedgerOffset acsOffset = (LedgerOffset)acsLedgerViewAndOffset.getSecond();
            logger.debug("LedgerView accumulated from acs completed. Offset: {} LedgerView: {}", (Object)acsOffset, (Object)acsOffset);
            return ledgerClient.getTransactionsClient().getLedgerEnd().flatMap(ledgerEnd -> {
                logger.debug("LedgerEnd: {}", ledgerEnd);
                Flowable<Transaction> transactions = FlowableLogger.log(transactionsClient.getTransactions(acsOffset, (LedgerOffset)ledgerEnd, transactionFilter, true), "initTransactions");
                Single ledgerViewSingle = LedgerViewFlowable.ledgerViewFromFlowable(acsLedgerView, (Flowable<WorkflowEvent>)transactions.map(t -> t), transform);
                return ledgerViewSingle.map(ledgerView -> new Pair<LedgerViewFlowable.LedgerView, LedgerOffset>((LedgerViewFlowable.LedgerView)ledgerView, (LedgerOffset)ledgerEnd));
            });
        });
        Single mainFlow = ledgerViewAndOffsetSingle.doOnSuccess(ledgerViewAndOffset -> {
            @NonNull LedgerViewFlowable.LedgerView initialLedgerView = (LedgerViewFlowable.LedgerView)ledgerViewAndOffset.getFirst();
            @NonNull LedgerOffset ledgerOffset = (LedgerOffset)ledgerViewAndOffset.getSecond();
            logger.debug("LedgerView accumulated from acs and transactions completed. Offset: {} LedgerView: {}", (Object)ledgerOffset, (Object)initialLedgerView);
            Flowable transactions = FlowableLogger.log(transactionsClient.getTransactions(ledgerOffset, transactionFilter, true), "transactions").observeOn(scheduler);
            Flowable completionFailures = FlowableLogger.log(Bot.failuresCommandIds(transactionFilter.getParties(), ledgerClient.getCommandCompletionClient().completionStream(applicationId, (LedgerOffset)LedgerOffset.LedgerEnd.getInstance(), transactionFilter.getParties())), "completionFailures").observeOn(scheduler);
            ReplaySubject submissionFailuresSubject = ReplaySubject.create();
            ReplaySubject commandsAndPendingSetSubject = ReplaySubject.create();
            Flowable submissionFailures = FlowableLogger.log(submissionFailuresSubject.toFlowable(BackpressureStrategy.BUFFER), "submissionsFailures").observeOn(scheduler);
            Flowable commandsAndPendingsSet = FlowableLogger.log(commandsAndPendingSetSubject.toFlowable(BackpressureStrategy.BUFFER), "commandsAndPendingSet").observeOn(scheduler);
            Flowable ledgerViews = LedgerViewFlowable.of(initialLedgerView, (Flowable<LedgerViewFlowable.SubmissionFailure>)submissionFailures, (Flowable<LedgerViewFlowable.CompletionFailure>)completionFailures, (Flowable<WorkflowEvent>)transactions.map(t -> t), (Flowable<CommandsAndPendingSet>)commandsAndPendingsSet, transform);
            Flowable botResult = ledgerViews.concatMap(ledgerView -> {
                Flowable result;
                try {
                    Flowable commandsToSend = (Flowable)bot.apply((LedgerViewFlowable.LedgerView)ledgerView);
                    result = Flowable.concat((Publisher)commandsToSend, (Publisher)Flowable.just((Object)CommandsAndPendingSet.empty));
                }
                catch (Throwable t) {
                    logger.error("Error during execution of bot.", t);
                    result = Flowable.error((Throwable)t);
                }
                return FlowableLogger.log(result, "bot.execution");
            }).share();
            Flowable commands = botResult.filter(command -> !command.getSubmitCommandsRequest().getCommandId().isEmpty()).map(CommandsAndPendingSet::getSubmitCommandsRequest);
            CommandSubmissionClient commandSubmissionClient = ledgerClient.getCommandSubmissionClient();
            commands.concatMapMaybe(Bot.commandsFailuresFromSubmissions(commandSubmissionClient)).toObservable().subscribe((Observer)submissionFailuresSubject);
            botResult.toObservable().subscribe((Observer)commandsAndPendingSetSubject);
            logger.info("Bot wiring complete for parties {}", (Object)transactionFilter.getParties());
        });
        return mainFlow.toFlowable().observeOn(scheduler).publish().connect();
    }

    public static Disposable wireSimple(String appId, LedgerClient ledgerClient, TransactionFilter transactionFilter, java.util.function.Function<LedgerViewFlowable.LedgerView<CreatedContract>, Flowable<CommandsAndPendingSet>> bot) {
        return Bot.wire(appId, ledgerClient, transactionFilter, bot, r -> r);
    }

    static Flowable<WorkflowEvent> activeContractSetAndNewTransactions(LedgerClient ledgerClient, TransactionFilter filter) {
        CompletableFuture offsetFuture = new CompletableFuture();
        BiConsumer<LedgerOffset, String> setOffset = (ledgerOffset, name) -> {
            if (!offsetFuture.isDone()) {
                logger.debug(name + ".completeOffsetFuture: " + ledgerOffset);
                offsetFuture.complete(ledgerOffset);
            }
        };
        Flowable activeContracts = FlowableLogger.log(ledgerClient.getActiveContractSetClient().getActiveContracts(filter, true), "acs").doOnNext(r -> r.getOffset().ifPresent(off -> setOffset.accept((LedgerOffset)new LedgerOffset.Absolute(off), "acs.next"))).doOnComplete(() -> setOffset.accept((LedgerOffset)LedgerOffset.LedgerBegin.getInstance(), "acs.complete"));
        Flowable transactions = Single.fromFuture(offsetFuture).doOnSuccess(o -> logger.debug("offset.success: " + o)).doOnError(t -> logger.error("offset.error: " + t)).flatMapPublisher(offset -> FlowableLogger.log(ledgerClient.getTransactionsClient().getTransactions((LedgerOffset)offset, filter, true), "transactions"));
        return Flowable.concat((Publisher)activeContracts, (Publisher)transactions);
    }

    private static Function<@NonNull SubmitCommandsRequest, MaybeSource<? extends LedgerViewFlowable.SubmissionFailure>> commandsFailuresFromSubmissions(CommandSubmissionClient commandSubmissionClient) {
        return cs -> {
            logger.debug("Submitting: {}", cs);
            return FlowableLogger.log(commandSubmissionClient.submit(cs.getWorkflowId(), cs.getApplicationId(), cs.getCommandId(), cs.getParty(), cs.getMinLedgerTimeAbsolute(), cs.getMinLedgerTimeRelative(), cs.getDeduplicationTime(), cs.getCommands()).flatMapMaybe(s -> Maybe.empty()).doOnError(t -> logger.error("Error submitting commands {} for party {}: {}", new Object[]{cs.getCommandId(), cs.getParty(), t.getMessage()})).onErrorReturn(t -> new LedgerViewFlowable.SubmissionFailure(cs.getCommandId(), (Throwable)t)), "commandSubmissions");
        };
    }
}

