/*
 * Decompiled with CFR 0.152.
 */
package org.apache.james.rspamd.task;

import com.github.fge.lambdas.Throwing;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MessageIdManager;
import org.apache.james.mailbox.model.MessageResult;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.rspamd.client.RspamdClientConfiguration;
import org.apache.james.rspamd.client.RspamdHttpClient;
import org.apache.james.rspamd.task.GetMailboxMessagesService;
import org.apache.james.rspamd.task.RunningOptions;
import org.apache.james.task.Task;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
import org.apache.james.user.api.UsersRepository;
import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

public class FeedHamToRspamdTask
implements Task {
    public static final TaskType TASK_TYPE = TaskType.of((String)"FeedHamToRspamdTask");
    private final GetMailboxMessagesService messagesService;
    private final RspamdHttpClient rspamdHttpClient;
    private final RspamdClientConfiguration configuration;
    private final RunningOptions runningOptions;
    private final Context context;
    private final Clock clock;

    public FeedHamToRspamdTask(MailboxManager mailboxManager, UsersRepository usersRepository, MessageIdManager messageIdManager, MailboxSessionMapperFactory mapperFactory, RspamdHttpClient rspamdHttpClient, RunningOptions runningOptions, Clock clock, RspamdClientConfiguration configuration) {
        this.runningOptions = runningOptions;
        this.messagesService = new GetMailboxMessagesService(mailboxManager, usersRepository, mapperFactory, messageIdManager);
        this.rspamdHttpClient = rspamdHttpClient;
        this.context = new Context();
        this.clock = clock;
        this.configuration = configuration;
    }

    public Task.Result run() {
        Optional<Date> afterDate = this.runningOptions.getPeriodInSecond().map(periodInSecond -> Date.from(this.clock.instant().minusSeconds((long)periodInSecond)));
        return (Task.Result)this.messagesService.getHamMessagesOfAllUser(afterDate, this.runningOptions, this.context).transform(ReactorUtils.throttle().elements(this.runningOptions.getMessagesPerSecond()).per(Duration.ofSeconds(1L)).forOperation(userAndMessageResult -> this.reportHam((Pair<Username, MessageResult>)userAndMessageResult).timeout(this.runningOptions.getRspamdTimeout()).then(Mono.fromCallable(() -> {
            this.context.incrementReportedHamMessageCount(1);
            return Task.Result.COMPLETED;
        })).onErrorResume(error -> {
            LOGGER.error("Error when report ham message to Rspamd", error);
            this.context.incrementErrorCount();
            return Mono.just((Object)Task.Result.PARTIAL);
        }))).reduce(Task::combine).switchIfEmpty(Mono.just((Object)Task.Result.COMPLETED)).block();
    }

    public TaskType type() {
        return TASK_TYPE;
    }

    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
        return Optional.of(AdditionalInformation.from(this.context, this.runningOptions));
    }

    @VisibleForTesting
    public Context.Snapshot snapshot() {
        return this.context.snapshot();
    }

    public RunningOptions getRunningOptions() {
        return this.runningOptions;
    }

    private Mono<Void> reportHam(Pair<Username, MessageResult> userAndMessageResult) {
        if (this.configuration.usePerUserBayes()) {
            return this.rspamdHttpClient.reportAsHam((Publisher<ByteBuffer>)((Publisher)Throwing.supplier(() -> ((MessageResult)userAndMessageResult.getRight()).getFullContent().reactiveBytes()).get()), RspamdHttpClient.Options.forUser((Username)userAndMessageResult.getLeft()));
        }
        return this.rspamdHttpClient.reportAsHam((Publisher<ByteBuffer>)((Publisher)Throwing.supplier(() -> ((MessageResult)userAndMessageResult.getRight()).getFullContent().reactiveBytes()).get()));
    }

    public static class Context {
        private final AtomicLong hamMessageCount = new AtomicLong();
        private final AtomicLong reportedHamMessageCount = new AtomicLong();
        private final AtomicLong errorCount = new AtomicLong();

        public void incrementHamMessageCount() {
            this.hamMessageCount.incrementAndGet();
        }

        public void incrementReportedHamMessageCount(int count) {
            this.reportedHamMessageCount.addAndGet(count);
        }

        public void incrementErrorCount() {
            this.errorCount.incrementAndGet();
        }

        public Snapshot snapshot() {
            return Snapshot.builder().hamMessageCount(this.hamMessageCount.get()).reportedHamMessageCount(this.reportedHamMessageCount.get()).errorCount(this.errorCount.get()).build();
        }

        public static class Snapshot {
            private final long hamMessageCount;
            private final long reportedHamMessageCount;
            private final long errorCount;

            public static Builder builder() {
                return new Builder();
            }

            public Snapshot(long hamMessageCount, long reportedHamMessageCount, long errorCount) {
                this.hamMessageCount = hamMessageCount;
                this.reportedHamMessageCount = reportedHamMessageCount;
                this.errorCount = errorCount;
            }

            public long getHamMessageCount() {
                return this.hamMessageCount;
            }

            public long getReportedHamMessageCount() {
                return this.reportedHamMessageCount;
            }

            public long getErrorCount() {
                return this.errorCount;
            }

            public final boolean equals(Object o) {
                if (o instanceof Snapshot) {
                    Snapshot snapshot = (Snapshot)o;
                    return Objects.equals(this.hamMessageCount, snapshot.hamMessageCount) && Objects.equals(this.reportedHamMessageCount, snapshot.reportedHamMessageCount) && Objects.equals(this.errorCount, snapshot.errorCount);
                }
                return false;
            }

            public final int hashCode() {
                return Objects.hash(this.hamMessageCount, this.reportedHamMessageCount, this.errorCount);
            }

            public String toString() {
                return MoreObjects.toStringHelper((Object)this).add("hamMessageCount", this.hamMessageCount).add("reportedHamMessageCount", this.reportedHamMessageCount).add("errorCount", this.errorCount).toString();
            }

            static class Builder {
                private Optional<Long> hamMessageCount = Optional.empty();
                private Optional<Long> reportedHamMessageCount = Optional.empty();
                private Optional<Long> errorCount = Optional.empty();

                Builder() {
                }

                public Snapshot build() {
                    return new Snapshot(this.hamMessageCount.orElse(0L), this.reportedHamMessageCount.orElse(0L), this.errorCount.orElse(0L));
                }

                public Builder hamMessageCount(long hamMessageCount) {
                    this.hamMessageCount = Optional.of(hamMessageCount);
                    return this;
                }

                public Builder reportedHamMessageCount(long reportedHamMessageCount) {
                    this.reportedHamMessageCount = Optional.of(reportedHamMessageCount);
                    return this;
                }

                public Builder errorCount(long errorCount) {
                    this.errorCount = Optional.of(errorCount);
                    return this;
                }
            }
        }
    }

    public static class AdditionalInformation
    implements TaskExecutionDetails.AdditionalInformation {
        private final Instant timestamp;
        private final long hamMessageCount;
        private final long reportedHamMessageCount;
        private final long errorCount;
        private final RunningOptions runningOptions;

        private static AdditionalInformation from(Context context, RunningOptions runningOptions) {
            Context.Snapshot snapshot = context.snapshot();
            return new AdditionalInformation(Clock.systemUTC().instant(), snapshot.getHamMessageCount(), snapshot.getReportedHamMessageCount(), snapshot.getErrorCount(), runningOptions);
        }

        public AdditionalInformation(Instant timestamp, long hamMessageCount, long reportedHamMessageCount, long errorCount, RunningOptions runningOptions) {
            this.timestamp = timestamp;
            this.hamMessageCount = hamMessageCount;
            this.reportedHamMessageCount = reportedHamMessageCount;
            this.errorCount = errorCount;
            this.runningOptions = runningOptions;
        }

        public long getHamMessageCount() {
            return this.hamMessageCount;
        }

        public long getReportedHamMessageCount() {
            return this.reportedHamMessageCount;
        }

        public long getErrorCount() {
            return this.errorCount;
        }

        public RunningOptions getRunningOptions() {
            return this.runningOptions;
        }

        public Instant timestamp() {
            return this.timestamp;
        }

        public final boolean equals(Object o) {
            if (o instanceof AdditionalInformation) {
                AdditionalInformation that = (AdditionalInformation)o;
                return Objects.equals(this.hamMessageCount, that.hamMessageCount) && Objects.equals(this.reportedHamMessageCount, that.reportedHamMessageCount) && Objects.equals(this.errorCount, that.errorCount) && Objects.equals(this.timestamp, that.timestamp) && Objects.equals(this.runningOptions, that.runningOptions);
            }
            return false;
        }

        public final int hashCode() {
            return Objects.hash(this.timestamp, this.hamMessageCount, this.reportedHamMessageCount, this.errorCount, this.runningOptions);
        }
    }
}

