/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.slack.consume;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.slack.api.methods.SlackApiException;
import com.slack.api.methods.request.conversations.ConversationsHistoryRequest;
import com.slack.api.methods.request.conversations.ConversationsRepliesRequest;
import com.slack.api.methods.response.conversations.ConversationsHistoryResponse;
import com.slack.api.methods.response.conversations.ConversationsRepliesResponse;
import com.slack.api.model.Message;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.slack.consume.ConsumeSlackClient;
import org.apache.nifi.processors.slack.consume.PartialThreadException;
import org.apache.nifi.processors.slack.consume.SlackTimestamp;
import org.apache.nifi.processors.slack.consume.UsernameLookup;
import org.apache.nifi.processors.slack.util.SlackResponseUtil;

/**
 * Consumes messages, and the replies to those messages, from a single Slack channel and writes
 * them to FlowFiles as JSON arrays. Progress through the channel is tracked in cluster-scoped
 * processor state under keys that are prefixed with the channel ID.
 */
public class ConsumeChannel {
    // Transit URI reported in the provenance RECEIVE event for every FlowFile that is produced.
    private static final String CONVERSATION_HISTORY_URL = "https://slack.com/api/conversations.history";
    // Value stored under the "action" state key while a scan for new replies is in progress.
    private static final String CHECK_FOR_REPLIES = "check for replies";
    // Values of the "direction" state key. When no direction is stored, BACKWARD is assumed;
    // FORWARD is stored once a backward pass reports no more messages.
    private static final String BACKWARD = "backward";
    private static final String FORWARD = "forward";
    // Matches a user mention such as <@U12345>; group 1 is the user ID handed to the username lookup.
    private static final Pattern MENTION_PATTERN = Pattern.compile("<@(U.*?)>");
    // Presumably the channel-level back-off, in milliseconds, applied by yield() - TODO confirm.
    private static final long YIELD_MILLIS = 3000L;
    private final ConsumeSlackClient client;
    private final String channelId;
    private final String channelName;
    // Page size requested from conversations.history when consuming the latest messages.
    private final int batchSize;
    // Minimum time between the end of one reply scan and the start of the next.
    private final long replyMonitorFrequencyMillis;
    // How far back from "now" a thread may have started and still be scanned for new replies.
    private final long replyMonitorWindowMillis;
    private final boolean resolveUsernames;
    private final boolean includeMessageBlocks;
    private final UsernameLookup usernameLookup;
    private final Relationship successRelationship;
    private final ComponentLog logger;
    private final ObjectMapper objectMapper;
    private final StateKeys stateKeys;
    // Epoch millis until which isYielded() reports true; 0 means "not yielded".
    private volatile long yieldExpiration;
    // Epoch millis at which the last reply scan finished; initialised to construction time.
    private volatile long lastReplyMonitorPollEnd = System.currentTimeMillis();
    // Earliest epoch millis at which consume() may talk to Slack again; pushed forward by yieldOnException().
    private final AtomicLong nextRequestTime = new AtomicLong(0L);

    /**
     * Creates a channel consumer from the settings collected by the given builder.
     *
     * @param builder the builder holding every configurable setting; its values are copied as-is
     */
    private ConsumeChannel(Builder builder) {
        // Collaborators
        client = builder.client;
        usernameLookup = builder.usernameLookup;
        objectMapper = builder.objectMapper;
        logger = builder.logger;
        successRelationship = builder.successRelationship;

        // Channel identity and the state keys derived from it
        channelId = builder.channelId;
        channelName = builder.channelName;
        stateKeys = new StateKeys(builder.channelId);

        // Tuning and output options
        batchSize = builder.batchSize;
        replyMonitorFrequencyMillis = builder.replyMonitorFrequencyMillis;
        replyMonitorWindowMillis = builder.replyMonitorWindowMillis;
        resolveUsernames = builder.resolveUsernames;
        includeMessageBlocks = builder.includeMessageBlocks;
    }

    /**
     * @return the ID of the Slack channel this instance consumes from
     */
    public String getChannelId() {
        return channelId;
    }

    /**
     * Verifies that the configured client can read this channel by requesting a single message
     * from its history.
     *
     * <p>The verification fails when the request throws, when Slack reports a non-OK response, or
     * when username resolution is enabled and the retrieved message ends up without a username.
     * A channel that returns no messages is reported as successful, since the request itself was
     * accepted.</p>
     *
     * @return the outcome of the verification step, never {@code null}
     */
    public ConfigVerificationResult verify() {
        final ConversationsHistoryRequest request = ConversationsHistoryRequest.builder()
            .channel(this.channelId)
            .limit(Integer.valueOf(1))
            .build();

        final ConversationsHistoryResponse response;
        try {
            response = this.client.fetchConversationsHistory(request);
        } catch (Exception e) {
            return this.verificationResult(ConfigVerificationResult.Outcome.FAILED, "Failed to obtain a message due to: " + e);
        }

        if (!response.isOk()) {
            final String errorMessage = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
            return this.verificationResult(ConfigVerificationResult.Outcome.FAILED, "Failed to obtain a message due to: " + errorMessage);
        }

        // Check for an empty result BEFORE touching the first element; an empty channel is a
        // valid, successful outcome and must not raise IndexOutOfBoundsException.
        final List<Message> messages = response.getMessages();
        if (messages == null || messages.isEmpty()) {
            return this.verificationResult(ConfigVerificationResult.Outcome.SUCCESSFUL, "Successfully requested messages for channel but got no messages");
        }

        final Message firstMessage = messages.get(0);
        this.enrichMessage(firstMessage);
        final String username = firstMessage.getUsername();
        if (this.resolveUsernames && username == null) {
            return this.verificationResult(ConfigVerificationResult.Outcome.FAILED, "Successfully retrieved a message but failed to resolve the username");
        }

        // Fall back to the raw user ID when no username is available.
        final String user = username == null ? firstMessage.getUser() : username;
        return this.verificationResult(ConfigVerificationResult.Outcome.SUCCESSFUL, "Successfully retrieved a message from " + user);
    }

    /**
     * Builds the result of the channel authorization verification step.
     *
     * @param outcome the outcome to report
     * @param explanation the human-readable explanation to report
     * @return the verification result for this channel
     */
    private ConfigVerificationResult verificationResult(ConfigVerificationResult.Outcome outcome, String explanation) {
        return new ConfigVerificationResult.Builder()
            .verificationStepName("Check authorization for Channel " + this.channelId)
            .outcome(outcome)
            .explanation(explanation)
            .build();
    }

    /**
     * Performs one round of consumption for this channel: either a scan for new replies to
     * existing threads or a fetch of the latest messages, depending on the stored state and on
     * how long ago the last reply scan finished.
     *
     * <p>Nothing is requested from Slack, and the context is yielded, while a back-off recorded
     * after an earlier failure is still in effect or when the processor state cannot be read.</p>
     *
     * @param context the process context, used for yielding
     * @param session the session used to read and update state and to create FlowFiles
     * @throws IOException if state cannot be updated or Slack cannot be reached
     * @throws SlackApiException if Slack rejects a request
     */
    public void consume(ProcessContext context, ProcessSession session) throws IOException, SlackApiException {
        final long earliestAllowedRequest = this.nextRequestTime.get();
        final boolean backingOff = earliestAllowedRequest > 0L && System.currentTimeMillis() < earliestAllowedRequest;
        if (backingOff) {
            context.yield();
            return;
        }

        final StateMap currentState;
        try {
            currentState = session.getState(Scope.CLUSTER);
        } catch (IOException ioe) {
            this.logger.error("Failed to determine current offset for channel {}; will not retrieve any messages until this is resolved", new Object[]{this.channelId, ioe});
            context.yield();
            return;
        }

        if (this.isCheckForReplies(currentState)) {
            this.consumeReplies(context, session, currentState);
            return;
        }
        this.consumeLatestMessages(context, session, currentState);
    }

    /**
     * Decides whether this round should scan for replies instead of fetching new messages.
     *
     * @param stateMap the current processor state
     * @return {@code true} if a reply scan is already recorded as in progress, or if more than the
     *         configured monitor frequency has passed since the last reply scan finished
     */
    private boolean isCheckForReplies(StateMap stateMap) {
        final boolean scanInProgress = CHECK_FOR_REPLIES.equals(stateMap.get(this.stateKeys.ACTION));
        if (scanInProgress) {
            return true;
        }

        final long nextScanDue = this.lastReplyMonitorPollEnd + this.replyMonitorFrequencyMillis;
        final long now = System.currentTimeMillis();
        return now > nextScanDue;
    }

    /**
     * Scans threads whose parent message falls inside the reply monitor window and emits replies
     * that have not been emitted before.
     *
     * <p>The scan only runs once the stored direction is "forward" and a latest timestamp has been
     * stored; otherwise it is immediately marked as completed. Parent messages themselves are
     * never emitted here, only replies that pass the timestamp filter.</p>
     *
     * @param context the process context
     * @param session the session used to update state and create FlowFiles
     * @param stateMap the state as it was at the start of this round
     * @throws IOException if state cannot be updated or Slack cannot be reached
     * @throws SlackApiException if Slack rejects a request
     */
    private void consumeReplies(ProcessContext context, ProcessSession session, StateMap stateMap) throws IOException, SlackApiException {
        final String storedDirection = stateMap.get(this.stateKeys.DIRECTION);
        if (!FORWARD.equals(storedDirection)) {
            this.onCompletedRepliesScan(session, new HashMap<String, String>(stateMap.toMap()), null);
            return;
        }

        final String latestTs = stateMap.get(this.stateKeys.LATEST_TS);
        if (latestTs == null) {
            this.onCompletedRepliesScan(session, new HashMap<String, String>(stateMap.toMap()), null);
            return;
        }

        // Record that a reply scan is in progress so that the next round resumes it.
        final Map<String, String> pendingState = new HashMap<String, String>(stateMap.toMap());
        final boolean alreadyScanning = CHECK_FOR_REPLIES.equals(stateMap.get(this.stateKeys.ACTION));
        if (!alreadyScanning) {
            pendingState.put(this.stateKeys.ACTION, CHECK_FOR_REPLIES);
            session.setState(pendingState, Scope.CLUSTER);
        }

        // Replies are accepted only when strictly between minTs and maxTs.
        final String storedMinTs = stateMap.get(this.stateKeys.REPLY_MIN_TS);
        final String storedMaxTs = stateMap.get(this.stateKeys.REPLY_MAX_TS);
        final SlackTimestamp minTs = new SlackTimestamp(storedMinTs == null ? latestTs : storedMinTs);
        final SlackTimestamp maxTs;
        if (storedMaxTs == null) {
            maxTs = new SlackTimestamp();
        } else {
            maxTs = new SlackTimestamp(storedMaxTs);
        }
        final SlackTimestamp maxParentTs = new SlackTimestamp(latestTs);
        final String oldestThreadTs = new SlackTimestamp(System.currentTimeMillis() - this.replyMonitorWindowMillis).getRawValue();

        String earliestThreadTs = stateMap.get(this.stateKeys.HISTORICAL_REPLIES_EARLIEST_THREAD_TS);
        if (earliestThreadTs == null) {
            earliestThreadTs = new SlackTimestamp(System.currentTimeMillis()).getRawValue();
        }
        String repliesCursor = stateMap.get(this.stateKeys.HISTORICAL_MESSAGES_REPLIES_CURSOR);

        // Neither filter depends on the loop variables, so both are built once.
        final Predicate<Message> messageFilter = message -> false;
        final Predicate<Message> replyFilter = reply -> {
            final SlackTimestamp replyTs = new SlackTimestamp(reply.getTs());
            if (replyTs.afterOrEqualTo(maxTs) || replyTs.beforeOrEqualTo(minTs)) {
                return false;
            }
            // Threads started after the latest consumed message are not handled by this scan.
            final SlackTimestamp threadTs = new SlackTimestamp(reply.getThreadTs());
            return !threadTs.after(maxParentTs);
        };

        ConsumptionResults results;
        do {
            final ConversationsHistoryRequest request = ConversationsHistoryRequest.builder()
                .channel(this.channelId)
                .limit(Integer.valueOf(500))
                .latest(earliestThreadTs)
                .oldest(oldestThreadTs)
                .inclusive(true)
                .build();

            results = this.consumeMessages(context, session, request, messageFilter, repliesCursor, minTs, replyFilter);

            final boolean scanFinished = !results.isMore() && !results.isFailure();
            if (scanFinished) {
                this.onCompletedRepliesScan(session, pendingState, maxTs);
                return;
            }

            final SlackTimestamp earliest = results.getEarliestTimestamp();
            if (earliest == null) {
                // No position to resume from; leave the stored state untouched.
                break;
            }
            earliestThreadTs = earliest.getRawValue();
            repliesCursor = results.getRepliesCursor();

            pendingState.put(this.stateKeys.HISTORICAL_REPLIES_EARLIEST_THREAD_TS, earliestThreadTs);
            if (repliesCursor != null) {
                pendingState.put(this.stateKeys.HISTORICAL_MESSAGES_REPLIES_CURSOR, repliesCursor);
            }
            session.setState(pendingState, Scope.CLUSTER);
            session.commitAsync();
        } while (results.isContinuePolling());
    }

    /**
     * Clears the state that tracks an in-progress reply scan, stores it, and records the time at
     * which the scan finished.
     *
     * @param session the session used to store the state
     * @param updatedStateMap the state to clean up and store; modified in place
     * @param replyTsCutoff if not {@code null}, stored as the new minimum reply timestamp
     * @throws IOException if the state cannot be stored
     */
    private void onCompletedRepliesScan(ProcessSession session, Map<String, String> updatedStateMap, SlackTimestamp replyTsCutoff) throws IOException {
        final String[] scanProgressKeys = {
            this.stateKeys.ACTION,
            this.stateKeys.HISTORICAL_REPLIES_EARLIEST_THREAD_TS,
            this.stateKeys.HISTORICAL_MESSAGES_REPLIES_CURSOR,
            this.stateKeys.REPLY_MAX_TS
        };
        for (final String progressKey : scanProgressKeys) {
            updatedStateMap.remove(progressKey);
        }

        if (replyTsCutoff != null) {
            updatedStateMap.put(this.stateKeys.REPLY_MIN_TS, replyTsCutoff.getRawValue());
        }

        session.setState(updatedStateMap, Scope.CLUSTER);
        this.lastReplyMonitorPollEnd = System.currentTimeMillis();
    }

    /**
     * Fetches messages (and their replies) page by page, starting from the position stored in
     * state, and stores the new position after every page.
     *
     * <p>While the stored direction is "backward" (or absent) the channel history is walked from
     * newest to oldest. When a page reports that no more messages exist, the direction is switched
     * to "forward" and later rounds only pick up messages newer than the latest one seen.</p>
     *
     * <p>The replies cursor in state is kept in sync with the latest result: it is stored while a
     * thread is only partially retrieved and removed once no cursor is outstanding, so that a
     * cursor that has already been consumed is not replayed by later rounds.</p>
     *
     * @param context the process context
     * @param session the session used to update state and create FlowFiles
     * @param stateMap the state as it was at the start of this round
     * @throws IOException if state cannot be updated or Slack cannot be reached
     * @throws SlackApiException if Slack rejects a request
     */
    private void consumeLatestMessages(ProcessContext context, ProcessSession session, StateMap stateMap) throws IOException, SlackApiException {
        final String startingRepliesCursor = stateMap.get(this.stateKeys.LATEST_REPLIES_CURSOR);
        final String storedDirection = stateMap.get(this.stateKeys.DIRECTION);
        final String direction = storedDirection == null ? BACKWARD : storedDirection;
        final boolean forward = FORWARD.equals(direction);

        final String startingTimestampKey = BACKWARD.equals(direction) ? this.stateKeys.EARLIEST_TS : this.stateKeys.LATEST_TS;
        String ts = stateMap.get(startingTimestampKey);
        // When resuming a partially retrieved thread, its parent message must be requested again.
        boolean includeLastMessage = startingRepliesCursor != null;
        String repliesCursor = startingRepliesCursor;
        final Map<String, String> updatedStateMap = new HashMap<String, String>(stateMap.toMap());

        ConsumptionResults results;
        do {
            final ConversationsHistoryRequest request = ConversationsHistoryRequest.builder()
                .channel(this.channelId)
                .limit(Integer.valueOf(this.batchSize))
                .inclusive(includeLastMessage)
                .build();
            if (forward) {
                request.setOldest(ts);
                request.setLatest(null);
            } else {
                request.setOldest(null);
                request.setLatest(ts);
            }

            // The message at the stored position has already been emitted; only its replies may be outstanding.
            final String firstMessageTs = ts;
            final Predicate<Message> messageFilter = message -> !Objects.equals(message.getTs(), firstMessageTs);
            final Predicate<Message> replyFilter = message -> true;
            results = this.consumeMessages(context, session, request, messageFilter, repliesCursor, null, replyFilter);

            final SlackTimestamp resultTimestamp;
            final String timestampKeyName;
            if (forward) {
                resultTimestamp = results.getLatestTimestamp();
                timestampKeyName = this.stateKeys.LATEST_TS;
            } else {
                resultTimestamp = results.getEarliestTimestamp();
                timestampKeyName = this.stateKeys.EARLIEST_TS;
            }
            if (resultTimestamp == null) {
                // Nothing was emitted, so there is no new position to record.
                break;
            }

            ts = resultTimestamp.getRawValue();
            repliesCursor = results.getRepliesCursor();
            includeLastMessage = repliesCursor != null;
            updatedStateMap.put(timestampKeyName, ts);

            // The first backward page also establishes where the later forward pass starts.
            if (updatedStateMap.get(this.stateKeys.LATEST_TS) == null) {
                final SlackTimestamp latestTimestamp = results.getLatestTimestamp();
                updatedStateMap.put(this.stateKeys.LATEST_TS, latestTimestamp == null ? null : latestTimestamp.getRawValue());
            }

            if (repliesCursor != null) {
                updatedStateMap.put(this.stateKeys.LATEST_REPLIES_CURSOR, repliesCursor);
            } else {
                // Drop a cursor left over from an earlier partial thread; keeping it would make
                // every later round re-request the last message and replay that cursor.
                updatedStateMap.remove(this.stateKeys.LATEST_REPLIES_CURSOR);
            }

            if (!results.isMore() && !results.isFailure()) {
                updatedStateMap.put(this.stateKeys.DIRECTION, FORWARD);
                updatedStateMap.remove(this.stateKeys.EARLIEST_TS);
                this.logger.info("Successfully completed initial load of messages for channel {}", new Object[]{this.channelId});
            }

            session.setState(updatedStateMap, Scope.CLUSTER);
            session.commitAsync();
        } while (results.isContinuePolling());
    }

    /**
     * Executes one conversations.history request and writes the accepted messages and replies to
     * a single FlowFile as a JSON array.
     *
     * <p>For every returned message that has replies, the replies are fetched as well. If a thread
     * can only be partially retrieved, or a username lookup fails, the remaining messages of the
     * page are skipped and the result reports where to resume.</p>
     *
     * @param context the process context, yielded on unexpected responses
     * @param session the session used to create, write and transfer the FlowFile
     * @param request the history request to execute
     * @param messageFilter decides whether a top-level message is written to the FlowFile
     * @param startingRepliesCursor the cursor to start from when fetching replies, may be {@code null}
     * @param oldestReplyTs lower bound passed on when fetching replies, may be {@code null}
     * @param replyFilter decides whether a reply is written to the FlowFile
     * @return the timestamps of the written messages, the cursor to resume from (if any), and
     *         flags describing failure and whether more data is available
     * @throws IOException if the FlowFile cannot be written or Slack cannot be reached
     * @throws SlackApiException if Slack rejects a request
     */
    private ConsumptionResults consumeMessages(ProcessContext context, ProcessSession session, ConversationsHistoryRequest request, Predicate<Message> messageFilter, String startingRepliesCursor, SlackTimestamp oldestReplyTs, Predicate<Message> replyFilter) throws IOException, SlackApiException {
        final ConversationsHistoryResponse response = this.client.fetchConversationsHistory(request);
        if (!response.isOk()) {
            final String error = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
            this.logger.error("Received unexpected response from Slack when attempting to retrieve messages for channel {}: {}", new Object[]{this.channelId, error});
            context.yield();
            return new StandardConsumptionResults(null, null, null, true, false, false);
        }

        final List<Message> messages = response.getMessages();
        if (messages.isEmpty()) {
            this.logger.debug("Received no new messages from Slack for channel {}", new Object[]{this.channelId});
            this.yield();
            return new StandardConsumptionResults(null, null, null, false, false, false);
        }

        FlowFile flowFile = session.create();
        int messageCount = 0;
        PartialThreadException partialThreadException = null;
        SlackTimestamp earliestTimestamp = null;
        SlackTimestamp latestTimestamp = null;
        try (OutputStream out = session.write(flowFile);
             JsonGenerator generator = this.objectMapper.createGenerator(out)) {
            generator.writeStartArray();

            final Iterator<Message> messageItr = messages.iterator();
            while (messageItr.hasNext()) {
                final Message message = messageItr.next();
                boolean enrichFailed = false;

                if (messageFilter.test(message)) {
                    final boolean success = this.enrichMessage(message);
                    enrichFailed = !success;
                    generator.writeObject((Object) message);
                    ++messageCount;

                    // Only messages that are actually written contribute to the reported range.
                    final SlackTimestamp msgTimestamp = new SlackTimestamp(message.getTs());
                    if (earliestTimestamp == null || msgTimestamp.before(earliestTimestamp)) {
                        earliestTimestamp = msgTimestamp;
                    }
                    if (latestTimestamp == null || msgTimestamp.after(latestTimestamp)) {
                        latestTimestamp = msgTimestamp;
                    }
                } else {
                    messageItr.remove();
                }

                if (!SlackResponseUtil.hasReplies(message)) {
                    continue;
                }

                List<Message> replies;
                try {
                    replies = this.fetchReplies(message, startingRepliesCursor, oldestReplyTs);
                } catch (PartialThreadException e) {
                    // Keep what was retrieved and remember the failure so the caller can resume.
                    this.yieldOnException(e, this.channelId, message, context);
                    partialThreadException = e;
                    replies = e.getRetrieved();
                }

                for (final Message reply : replies) {
                    // A reply whose timestamp equals the parent's is the parent itself; do not write it twice.
                    if (reply.getTs().equals(message.getTs()) || !replyFilter.test(reply)) {
                        continue;
                    }
                    final boolean success = this.enrichMessage(reply);
                    enrichFailed = enrichFailed || !success;
                    generator.writeObject((Object) reply);
                }
                // NOTE(review): this counts every retrieved reply, including those skipped by the
                // loop above, so the count can exceed the number of objects written - confirm intent.
                messageCount += replies.size();

                if (partialThreadException == null && !enrichFailed) {
                    continue;
                }

                // A failure occurred: drop the rest of the page and stop, still closing the array below.
                while (messageItr.hasNext()) {
                    messageItr.next();
                    messageItr.remove();
                }
                break;
            }

            generator.writeEndArray();
        }

        if (!response.isHasMore()) {
            this.yield();
        }

        if (messageCount == 0) {
            session.remove(flowFile);
            final boolean moreMessages = partialThreadException != null || response.isHasMore();
            return new StandardConsumptionResults(null, null, null, partialThreadException != null, false, moreMessages);
        }

        final Map<String, String> attributes = new HashMap<String, String>();
        attributes.put("slack.channel.id", this.channelId);
        attributes.put("slack.channel.name", this.channelName);
        attributes.put("slack.message.count", Integer.toString(messageCount));
        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        flowFile = session.putAllAttributes(flowFile, attributes);
        session.getProvenanceReporter().receive(flowFile, CONVERSATION_HISTORY_URL);
        session.transfer(flowFile, this.successRelationship);

        String repliesCursor = null;
        if (partialThreadException != null) {
            repliesCursor = partialThreadException.getNextCursor();
        }
        final boolean hasMoreReplies = repliesCursor != null;
        final boolean moreMessages = response.isHasMore() || hasMoreReplies;
        // After a partial thread the caller must back off rather than immediately poll again.
        final boolean continuePolling = partialThreadException == null && moreMessages;
        return new StandardConsumptionResults(earliestTimestamp, latestTimestamp, repliesCursor, partialThreadException != null, continuePolling, moreMessages);
    }

    /**
     * Adds the channel ID to the message, strips message blocks when they are not wanted, and,
     * when username resolution is enabled, fills in the sender's username and replaces user-ID
     * mentions in the text with username mentions.
     *
     * <p>A mention whose user cannot be resolved is left exactly as it was in the original text.</p>
     *
     * @param message the message to enrich; modified in place
     * @return {@code true} if every username lookup that was attempted succeeded
     */
    private boolean enrichMessage(Message message) {
        message.setChannel(this.channelId);
        if (!this.includeMessageBlocks) {
            message.setBlocks(null);
        }
        if (!this.resolveUsernames) {
            return true;
        }

        // Set from inside the replacement lambda, hence a mutable holder.
        final AtomicBoolean lookupFailed = new AtomicBoolean(false);

        if (message.getUsername() == null && message.getUser() != null) {
            final String username = this.usernameLookup.getUsername(message.getUser());
            if (username == null) {
                lookupFailed.set(true);
            }
            message.setUsername(username);
        }

        final String text = message.getText();
        if (text != null) {
            final Matcher matcher = MENTION_PATTERN.matcher(text);
            final String updatedText = matcher.replaceAll(matchResult -> {
                final String username = this.usernameLookup.getUsername(matchResult.group(1));
                if (username == null) {
                    lookupFailed.set(true);
                    // Keep the original mention instead of emitting "<@null>".
                    return Matcher.quoteReplacement(matchResult.group(0));
                }
                // Quote the replacement so that '$' or '\' in a username is taken literally.
                return Matcher.quoteReplacement("<@" + username + ">");
            });
            message.setText(updatedText);
        }

        return !lookupFailed.get();
    }

    /**
     * Logs a failure to retrieve a complete thread, postpones the next request by the retry delay
     * derived from the exception, and yields the context.
     *
     * @param e the failure that interrupted the retrieval of a thread
     * @param channelId the channel being consumed, used for logging
     * @param message the parent message of the thread, used for logging
     * @param context the process context to yield
     */
    private void yieldOnException(PartialThreadException e, String channelId, Message message, ProcessContext context) {
        final boolean rateLimited = SlackResponseUtil.isRateLimited(e.getCause());
        if (rateLimited) {
            final int advertisedDelaySeconds = SlackResponseUtil.getRetryAfterSeconds(e);
            this.logger.warn("Slack indicated that the Rate Limit has been exceeded when attempting to retrieve messages for channel {}; will continue in {} seconds", new Object[]{channelId, advertisedDelaySeconds});
        } else {
            this.logger.error("Encountered unexpected response from Slack when retrieving replies to message with thread timestamp {} due to: {}", new Object[]{message.getThreadTs(), e.getMessage(), e});
        }

        final long backoffMillis = (long) SlackResponseUtil.getRetryAfterSeconds(e) * 1000L;
        final long resumeAt = System.currentTimeMillis() + backoffMillis;
        // Never shorten a back-off that is already in effect.
        this.nextRequestTime.getAndUpdate(scheduled -> Math.max(scheduled, resumeAt));
        context.yield();
    }

    /**
     * Retrieves the replies of a thread, following the response cursor until Slack reports that
     * no more pages exist.
     *
     * @param message the parent message of the thread
     * @param startCursor the cursor to start from, or {@code null} to start at the first page
     * @param oldestTs if not {@code null}, passed to Slack as the oldest timestamp of interest; a
     *        thread whose latest reply is before this timestamp is not requested at all
     * @return the retrieved messages, possibly empty
     * @throws SlackApiException if the request fails before any reply was retrieved
     * @throws IOException if the request fails before any reply was retrieved
     * @throws PartialThreadException if Slack returns a non-OK response, or if a request fails
     *         after some replies were already retrieved; carries what was retrieved and the
     *         cursor of the page that failed
     */
    private List<Message> fetchReplies(Message message, String startCursor, SlackTimestamp oldestTs) throws SlackApiException, IOException, PartialThreadException {
        if (oldestTs != null) {
            final String latestReply = message.getLatestReply();
            final boolean nothingNewEnough = latestReply != null && new SlackTimestamp(latestReply).before(oldestTs);
            if (nothingNewEnough) {
                return Collections.emptyList();
            }
        }

        final List<Message> collected = new ArrayList<Message>();
        String cursor = startCursor;
        boolean morePages;
        do {
            final ConversationsRepliesRequest request = ConversationsRepliesRequest.builder()
                .channel(this.channelId)
                .ts(message.getThreadTs())
                .includeAllMetadata(true)
                .limit(Integer.valueOf(1000))
                .oldest(oldestTs == null ? null : oldestTs.getRawValue())
                .cursor(cursor)
                .build();

            final ConversationsRepliesResponse response;
            try {
                response = this.client.fetchConversationsReplies(request);
            } catch (Exception e) {
                // With nothing retrieved there is no partial result to report; surface the failure as-is.
                if (collected.isEmpty()) {
                    throw e;
                }
                throw new PartialThreadException(collected, cursor, e);
            }

            if (!response.isOk()) {
                final String errorMessage = SlackResponseUtil.getErrorMessage(response.getError(), response.getNeeded(), response.getProvided(), response.getWarning());
                throw new PartialThreadException(collected, cursor, errorMessage);
            }

            collected.addAll(response.getMessages());
            morePages = response.isHasMore();
            if (morePages) {
                cursor = response.getResponseMetadata().getNextCursor();
            }
        } while (morePages);

        return collected;
    }

    /**
     * Marks this channel as yielded for the next three seconds, as reported by {@link #isYielded()}.
     */
    public void yield() {
        final long now = System.currentTimeMillis();
        this.yieldExpiration = now + YIELD_MILLIS;
    }

    /**
     * Reports whether this channel is currently yielded. Once the yield has expired, the stored
     * expiration is reset so that later calls return quickly.
     *
     * @return {@code true} while a yield requested through {@link #yield()} has not yet expired
     */
    public boolean isYielded() {
        if (this.yieldExpiration == 0L) {
            return false;
        }

        final boolean stillYielded = System.currentTimeMillis() < this.yieldExpiration;
        if (!stillYielded) {
            this.yieldExpiration = 0L;
        }
        return stillYielded;
    }

    /**
     * Fluent builder for {@link ConsumeChannel}. Unless overridden, the batch size is 50, replies
     * are monitored every 60 seconds, and the reply monitor window is 7 days.
     */
    public static class Builder {
        // Collaborators
        private ConsumeSlackClient client;
        private UsernameLookup usernameLookup;
        private ObjectMapper objectMapper;
        private ComponentLog logger;
        private Relationship successRelationship;

        // Channel identity
        private String channelId;
        private String channelName;

        // Tuning and output options
        private int batchSize = 50;
        private long replyMonitorFrequencyMillis = TimeUnit.MINUTES.toMillis(1L);
        private long replyMonitorWindowMillis = TimeUnit.DAYS.toMillis(7L);
        private boolean includeMessageBlocks;
        private boolean resolveUsernames;

        /** Sets the ID of the channel to consume from. */
        public Builder channelId(String channelId) {
            this.channelId = channelId;
            return this;
        }

        /** Sets the channel name that is added to the FlowFile attributes. */
        public Builder channelName(String channelName) {
            this.channelName = channelName;
            return this;
        }

        /** Sets the client used to call Slack. */
        public Builder client(ConsumeSlackClient client) {
            this.client = client;
            return this;
        }

        /** Sets the number of messages requested per page when consuming the latest messages. */
        public Builder batchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /** Sets the logger used for diagnostics. */
        public Builder logger(ComponentLog logger) {
            this.logger = logger;
            return this;
        }

        /** Sets how often threads are scanned for new replies; stored in milliseconds. */
        public Builder replyMonitorFrequency(long value, TimeUnit timeUnit) {
            this.replyMonitorFrequencyMillis = timeUnit.toMillis(value);
            return this;
        }

        /** Sets how far back threads are scanned for new replies; stored in milliseconds. */
        public Builder replyMonitorWindow(long value, TimeUnit timeUnit) {
            this.replyMonitorWindowMillis = timeUnit.toMillis(value);
            return this;
        }

        /** Sets whether message blocks are kept on emitted messages. */
        public Builder includeMessageBlocks(boolean includeMessageBlocks) {
            this.includeMessageBlocks = includeMessageBlocks;
            return this;
        }

        /** Sets whether user IDs are resolved to usernames. */
        public Builder resolveUsernames(boolean resolveUsernames) {
            this.resolveUsernames = resolveUsernames;
            return this;
        }

        /** Sets the relationship that produced FlowFiles are transferred to. */
        public Builder successRelationship(Relationship relationship) {
            this.successRelationship = relationship;
            return this;
        }

        /** Sets the lookup used to resolve user IDs to usernames. */
        public Builder usernameLookup(UsernameLookup lookup) {
            this.usernameLookup = lookup;
            return this;
        }

        /** Sets the mapper used to serialise messages to JSON. */
        public Builder objectMapper(ObjectMapper objectMapper) {
            this.objectMapper = objectMapper;
            return this;
        }

        /**
         * @return a new {@link ConsumeChannel} configured with the values set so far
         */
        public ConsumeChannel build() {
            return new ConsumeChannel(this);
        }
    }

    /**
     * The names of the state entries used for one channel. Every key is the channel ID followed
     * by a fixed suffix, so that several channels can share one state map.
     */
    private static class StateKeys {
        public final String ACTION;
        public final String LATEST_TS;
        public final String EARLIEST_TS;
        public final String DIRECTION;
        public final String LATEST_REPLIES_CURSOR;
        public final String HISTORICAL_MESSAGES_REPLIES_CURSOR;
        public final String HISTORICAL_REPLIES_EARLIEST_THREAD_TS;
        public final String REPLY_MIN_TS;
        public final String REPLY_MAX_TS;

        /**
         * @param channelId the channel ID used as the prefix of every key
         */
        public StateKeys(String channelId) {
            final String prefix = channelId + ".";

            // Position and progress of the regular message consumption
            ACTION = prefix + "action";
            DIRECTION = prefix + "direction";
            LATEST_TS = prefix + "latest";
            EARLIEST_TS = prefix + "earliest";
            LATEST_REPLIES_CURSOR = prefix + "latest.replies.cursor";

            // Progress of a scan for replies to older threads
            HISTORICAL_MESSAGES_REPLIES_CURSOR = prefix + "historical.replies.cursor";
            HISTORICAL_REPLIES_EARLIEST_THREAD_TS = prefix + "historical.replies.ts";
            REPLY_MIN_TS = prefix + "historical.reply.min.ts";
            REPLY_MAX_TS = prefix + "historical.reply.max.ts";
        }
    }

    private static interface ConsumptionResults {
        public SlackTimestamp getEarliestTimestamp();

        public SlackTimestamp getLatestTimestamp();

        public String getRepliesCursor();

        public boolean isFailure();

        public boolean isContinuePolling();

        public boolean isMore();
    }

    /**
     * Immutable value holder implementing {@link ConsumptionResults}.
     */
    private static class StandardConsumptionResults implements ConsumptionResults {
        private final SlackTimestamp earliestTimestamp;
        private final SlackTimestamp latestTimestamp;
        private final String repliesCursor;
        private final boolean failure;
        private final boolean continuePolling;
        private final boolean more;

        /**
         * @param earliestTimestamp the earliest timestamp that was written, may be {@code null}
         * @param latestTimestamp the latest timestamp that was written, may be {@code null}
         * @param repliesCursor the cursor to resume a thread from, may be {@code null}
         * @param failure whether the consumption failed or was only partially successful
         * @param continuePolling whether the caller should immediately request the next page
         * @param moreMessages whether more messages or replies remain to be retrieved
         */
        public StandardConsumptionResults(SlackTimestamp earliestTimestamp, SlackTimestamp latestTimestamp, String repliesCursor, boolean failure, boolean continuePolling, boolean moreMessages) {
            this.earliestTimestamp = earliestTimestamp;
            this.latestTimestamp = latestTimestamp;
            this.repliesCursor = repliesCursor;
            this.failure = failure;
            this.continuePolling = continuePolling;
            this.more = moreMessages;
        }

        @Override
        public SlackTimestamp getEarliestTimestamp() {
            return earliestTimestamp;
        }

        @Override
        public SlackTimestamp getLatestTimestamp() {
            return latestTimestamp;
        }

        @Override
        public String getRepliesCursor() {
            return repliesCursor;
        }

        @Override
        public boolean isFailure() {
            return failure;
        }

        @Override
        public boolean isContinuePolling() {
            return continuePolling;
        }

        @Override
        public boolean isMore() {
            return more;
        }
    }
}

