/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.coordination;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.Constants;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.task.IOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.TimeCacheMap;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoordinatedBolt
implements IRichBolt {
    /** Logger for this class. */
    public static final Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
    /**
     * Per-source settings, keyed by source component id. Never null after construction:
     * the constructor substitutes an empty map, and an empty map means there are no
     * coordination reports to wait for.
     */
    private Map<String, SourceArgs> sourceArgs;
    /** Optional id stream; when non-null, tuples arriving on it are classified as ID tuples. */
    private IdStreamSpec idStreamSpec;
    /** The wrapped bolt that receives every regular tuple. */
    private IRichBolt delegate;
    /**
     * Number of coordination reports expected per id. Computed in prepare(); stays null
     * when {@link #sourceArgs} is empty.
     */
    private Integer numSourceReports;
    /** Task ids that receive this bolt's coordinated stream; filled in prepare(). */
    private List<Integer> countOutTasks = new ArrayList<Integer>();
    /** The collector handed to prepare(), used for coordination acks, fails and count emits. */
    private OutputCollector collector;
    /**
     * Tracking state per id (field 0 of each tuple). This object is also the monitor that
     * guards every read and write of the tracking state.
     */
    private TimeCacheMap<Object, TrackingInfo> tracked;

    /**
     * Wraps {@code delegate} without any coordinating sources and without an id stream.
     *
     * @param delegate the bolt that will receive the regular tuples
     */
    public CoordinatedBolt(IRichBolt delegate) {
        // The casts only spell out which overload the two nulls are meant for.
        this(delegate, (Map<String, SourceArgs>) null, (IdStreamSpec) null);
    }

    /**
     * Wraps {@code delegate} with exactly one coordinating source component.
     *
     * @param delegate        the bolt that will receive the regular tuples
     * @param sourceComponent id of the single source component
     * @param sourceArgs      settings for that source component
     * @param idStreamSpec    optional id stream specification, may be null
     */
    public CoordinatedBolt(
            IRichBolt delegate,
            String sourceComponent,
            SourceArgs sourceArgs,
            IdStreamSpec idStreamSpec) {
        this(delegate, singleSourceArgs(sourceComponent, sourceArgs), idStreamSpec);
    }

    /**
     * Wraps {@code delegate} with an arbitrary set of coordinating source components.
     *
     * @param delegate     the bolt that will receive the regular tuples
     * @param sourceArgs   settings per source component id; null is treated as "no sources"
     * @param idStreamSpec optional id stream specification, may be null
     */
    public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) {
        this.delegate = delegate;
        this.idStreamSpec = idStreamSpec;
        // Normalise a missing map to an empty one so later code can call isEmpty() safely.
        this.sourceArgs = sourceArgs != null ? sourceArgs : new HashMap<String, SourceArgs>();
    }

    /**
     * Builds a mutable one-entry map from a source component id to its settings.
     *
     * @param sourceComponent id of the source component (used as the key)
     * @param sourceArgs      settings stored under that key
     * @return a new {@link HashMap} holding just that entry
     */
    private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent, SourceArgs sourceArgs) {
        final Map<String, SourceArgs> bySource = new HashMap<String, SourceArgs>();
        bySource.put(sourceComponent, sourceArgs);
        return bySource;
    }

    /**
     * Sets up the tracking map, prepares the delegate with a counting collector, and
     * works out which tasks consume the coordinated stream and how many coordination
     * reports to expect per id.
     *
     * @param config    topology configuration, passed through to the delegate
     * @param context   topology context used to look up targets and component tasks
     * @param collector the real collector; the delegate receives a wrapper around it
     */
    @Override
    public void prepare(Map<String, Object> config, TopologyContext context, OutputCollector collector) {
        // Only register an expiry callback when the delegate wants timeout notifications.
        TimeoutItems callback = null;
        if (this.delegate instanceof TimeoutCallback) {
            callback = new TimeoutItems();
        }
        this.tracked = new TimeCacheMap<Object, TrackingInfo>(context.maxTopologyMessageTimeout(), callback);
        this.collector = collector;
        this.delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector)));

        // Components subscribed to the coordinated stream; a missing entry means nobody
        // is subscribed. The typed lookup replaces a raw-typed cast whose key set could
        // not be iterated as String.
        Map<String, ?> coordinatedTargets = context.getThisTargets().get(Constants.COORDINATED_STREAM_ID);
        if (coordinatedTargets != null) {
            for (String component : coordinatedTargets.keySet()) {
                this.countOutTasks.addAll(context.getComponentTasks(component));
            }
        }

        // With no sources configured numSourceReports stays null; checkFinishId never
        // reads it in that case because it tests sourceArgs.isEmpty() first.
        if (!this.sourceArgs.isEmpty()) {
            int expectedReports = 0;
            for (Map.Entry<String, SourceArgs> entry : this.sourceArgs.entrySet()) {
                if (entry.getValue().singleCount) {
                    // A "single" source contributes exactly one report.
                    expectedReports++;
                } else {
                    // Otherwise one report is expected from every task of the source.
                    expectedReports += context.getComponentTasks(entry.getKey()).size();
                }
            }
            this.numSourceReports = expectedReports;
        }
    }

    /**
     * Re-evaluates the tracking state of the id carried in field 0 of {@code tup} and,
     * depending on that state, fails the id, finishes it, or just acks/fails {@code tup}.
     * All work happens while holding the monitor of {@link #tracked}.
     *
     * @param tup  the tuple that triggered the check
     * @param type how {@code tup} was classified (regular, id or coordination tuple)
     * @return true when the id was already marked failed, or when a
     *         {@link FailedException} was caught while processing it; false otherwise
     * @throws IllegalStateException if the finish condition is met by a regular tuple
     *         while source components are configured
     */
    private boolean checkFinishId(Tuple tup, TupleType type) {
        Object id = tup.getValue(0);
        boolean failed = false;
        TimeCacheMap<Object, TrackingInfo> timeCacheMap = this.tracked;
        synchronized (timeCacheMap) {
            TrackingInfo track = this.tracked.get(id);
            try {
                if (track != null) {
                    boolean delayed = false;
                    // The tuple kind that drives completion (COORD without an id stream, ID
                    // with one) is parked in ackTuples and only acked or failed once the
                    // whole id finishes or fails.
                    if (this.idStreamSpec == null && type == TupleType.COORD || this.idStreamSpec != null && type == TupleType.ID) {
                        track.ackTuples.add(tup);
                        delayed = true;
                    }
                    if (track.failed) {
                        // The id has failed: fail every parked tuple and forget the id.
                        failed = true;
                        for (Tuple t : track.ackTuples) {
                            this.collector.fail(t);
                        }
                        this.tracked.remove(id);
                    } else if (track.receivedId && (this.sourceArgs.isEmpty() || track.reportCount == this.numSourceReports && track.expectedTupleCount == track.receivedTuples)) {
                        // Finished: the id was seen and, if there are sources, every expected
                        // report arrived and the received tuple count matches the expected one.
                        if (this.delegate instanceof FinishedCallback) {
                            ((FinishedCallback)((Object)this.delegate)).finishedId(id);
                        }
                        if (!this.sourceArgs.isEmpty() && type == TupleType.REGULAR) {
                            throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
                        }
                        // Tell each task on the coordinated stream how many tuples were
                        // emitted to it for this id (0 if none), anchored on tup.
                        for (int task : this.countOutTasks) {
                            int numTuples = Utils.get(track.taskEmittedTuples, task, 0);
                            this.collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, (List<Object>)new Values(id, numTuples));
                        }
                        for (Tuple t : track.ackTuples) {
                            this.collector.ack(t);
                        }
                        // Mark finished before removal so TimeoutItems.expire skips the
                        // timeout callback for this record.
                        track.finished = true;
                        this.tracked.remove(id);
                    }
                    // Id/coordination tuples that were not parked are settled immediately.
                    if (!delayed && type != TupleType.REGULAR) {
                        if (track.failed) {
                            this.collector.fail(tup);
                        } else {
                            this.collector.ack(tup);
                        }
                    }
                } else if (type != TupleType.REGULAR) {
                    // No tracking record for this id: an id/coordination tuple is failed.
                    this.collector.fail(tup);
                }
            }
            catch (FailedException e) {
                // A FailedException raised while handling the id fails the whole id.
                LOG.error("Failed to finish batch", (Throwable)e);
                for (Tuple t : track.ackTuples) {
                    this.collector.fail(t);
                }
                this.tracked.remove(id);
                failed = true;
            }
        }
        return failed;
    }

    /**
     * Routes an incoming tuple: id and coordination tuples update the tracking record
     * for their id and trigger a finish check, everything else goes to the delegate.
     *
     * @param tuple the incoming tuple; field 0 is used as the id
     */
    @Override
    public void execute(Tuple tuple) {
        final Object id = tuple.getValue(0);
        final TupleType type = this.getTupleType(tuple);

        // Fetch the tracking record for this id, creating it on first sight.
        final TrackingInfo info;
        synchronized (this.tracked) {
            TrackingInfo existing = this.tracked.get(id);
            if (existing == null) {
                existing = new TrackingInfo();
                // Without an id stream there is nothing to wait for, so the id counts as received.
                existing.receivedId = this.idStreamSpec == null;
                this.tracked.put(id, existing);
            }
            info = existing;
        }

        // Regular tuples are handed to the delegate while holding the tracking lock.
        if (type != TupleType.ID && type != TupleType.COORD) {
            synchronized (this.tracked) {
                this.delegate.execute(tuple);
            }
            return;
        }

        if (type == TupleType.ID) {
            synchronized (this.tracked) {
                info.receivedId = true;
            }
        } else {
            // Coordination tuple: field 1 carries the number of tuples the sender emitted.
            final int reported = (Integer) tuple.getValue(1);
            synchronized (this.tracked) {
                info.reportCount++;
                info.expectedTupleCount += reported;
            }
        }
        this.checkFinishId(tuple, type);
    }

    /**
     * Cleans up the delegate and then the tracking map.
     *
     * <p>The tracking map is cleaned up even when the delegate's cleanup throws; the
     * delegate's exception still propagates to the caller. The map is skipped when it
     * was never created, i.e. when prepare() has not run.
     */
    @Override
    public void cleanup() {
        try {
            this.delegate.cleanup();
        } finally {
            // tracked is only assigned in prepare(), so it can still be null here.
            if (this.tracked != null) {
                this.tracked.cleanup();
            }
        }
    }

    /**
     * Declares the delegate's streams and then the direct coordinated stream carrying
     * the fields {@code id} and {@code count}.
     *
     * @param declarer the declarer to register streams with
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        this.delegate.declareOutputFields(declarer);

        final boolean direct = true;
        final Fields coordinationFields = new Fields("id", "count");
        declarer.declareStream(Constants.COORDINATED_STREAM_ID, direct, coordinationFields);
    }

    /**
     * Returns the delegate's component configuration unchanged.
     *
     * @return whatever the delegate reports as its component configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        final Map<String, Object> delegateConfig = this.delegate.getComponentConfiguration();
        return delegateConfig;
    }

    /**
     * Classifies a tuple by the stream it arrived on.
     *
     * @param tuple the incoming tuple
     * @return {@code ID} when an id stream is configured and the tuple came from it;
     *         {@code COORD} when sources are configured and the tuple came in on the
     *         coordinated stream; {@code REGULAR} otherwise
     */
    private TupleType getTupleType(Tuple tuple) {
        // The id stream is checked first, so it wins over the coordinated stream.
        if (this.idStreamSpec != null) {
            final GlobalStreamId origin = tuple.getSourceGlobalStreamId();
            if (origin.equals(this.idStreamSpec.id)) {
                return TupleType.ID;
            }
        }
        if (this.sourceArgs.isEmpty()) {
            return TupleType.REGULAR;
        }
        final String streamId = tuple.getSourceStreamId();
        return streamId.equals(Constants.COORDINATED_STREAM_ID) ? TupleType.COORD : TupleType.REGULAR;
    }

    /**
     * Expiry callback for the tracking map: marks the expired record as failed and, if
     * the record was not finished, forwards the id to the delegate's timeout callback.
     * prepare() only creates it when the delegate implements {@link TimeoutCallback}.
     */
    private class TimeoutItems
    implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
        private TimeoutItems() {
        }

        /**
         * Handles one expired tracking record while holding the tracking lock.
         *
         * @param id  the id whose record expired
         * @param val the expired record
         */
        @Override
        public void expire(Object id, TrackingInfo val) {
            synchronized (CoordinatedBolt.this.tracked) {
                val.failed = true;
                if (val.finished) {
                    // Finished records need no timeout notification.
                    return;
                }
                final TimeoutCallback listener = (TimeoutCallback) CoordinatedBolt.this.delegate;
                listener.timeoutId(id);
            }
        }
    }

    /**
     * Collector handed to the delegate bolt. It forwards every call to the wrapped
     * collector and additionally keeps the tracking record up to date: tuples emitted
     * per target task, tuples acked, and failures. The id is always taken from field 0
     * of the tuple involved.
     */
    public class CoordinatedOutputCollector
    implements IOutputCollector {
        /** The collector every call is forwarded to. */
        IOutputCollector delegate;

        /**
         * @param delegate the collector to forward to
         */
        public CoordinatedOutputCollector(IOutputCollector delegate) {
            this.delegate = delegate;
        }

        /** Emits through the wrapped collector, then counts one tuple per task it reports. */
        @Override
        public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
            final List<Integer> targetTasks = this.delegate.emit(stream, anchors, tuple);
            this.updateTaskCounts(tuple.get(0), targetTasks);
            return targetTasks;
        }

        /** Counts one tuple for {@code task}, then emits directly through the wrapped collector. */
        @Override
        public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
            final Object id = tuple.get(0);
            final List<Integer> singleTarget = Arrays.asList(task);
            this.updateTaskCounts(id, singleTarget);
            this.delegate.emitDirect(task, stream, anchors, tuple);
        }

        /**
         * Counts the tuple as received for its id, re-checks the id, and then acks the
         * tuple, or fails it when the check reports the id as failed.
         */
        @Override
        public void ack(Tuple tuple) {
            final Object id = tuple.getValue(0);
            synchronized (CoordinatedBolt.this.tracked) {
                final TrackingInfo info = CoordinatedBolt.this.tracked.get(id);
                if (info != null) {
                    info.receivedTuples++;
                }
            }
            if (CoordinatedBolt.this.checkFinishId(tuple, TupleType.REGULAR)) {
                this.delegate.fail(tuple);
            } else {
                this.delegate.ack(tuple);
            }
        }

        /** Marks the tuple's id as failed, re-checks the id, and fails the tuple. */
        @Override
        public void fail(Tuple tuple) {
            final Object id = tuple.getValue(0);
            synchronized (CoordinatedBolt.this.tracked) {
                final TrackingInfo info = CoordinatedBolt.this.tracked.get(id);
                if (info != null) {
                    info.failed = true;
                }
            }
            CoordinatedBolt.this.checkFinishId(tuple, TupleType.REGULAR);
            this.delegate.fail(tuple);
        }

        @Override
        public void flush() {
            this.delegate.flush();
        }

        @Override
        public void resetTimeout(Tuple tuple) {
            this.delegate.resetTimeout(tuple);
        }

        @Override
        public void reportError(Throwable error) {
            this.delegate.reportError(error);
        }

        /**
         * Adds one to the emitted-tuple count of every task in {@code tasks} for the
         * given id. Does nothing when the id has no tracking record.
         */
        private void updateTaskCounts(Object id, List<Integer> tasks) {
            synchronized (CoordinatedBolt.this.tracked) {
                final TrackingInfo info = CoordinatedBolt.this.tracked.get(id);
                if (info == null) {
                    return;
                }
                final Map<Integer, Integer> emittedPerTask = info.taskEmittedTuples;
                for (Integer task : tasks) {
                    final int previous = Utils.get(emittedPerTask, task, 0);
                    emittedPerTask.put(task, previous + 1);
                }
            }
        }
    }

    /**
     * Identifies the stream (component id plus stream id) whose tuples are treated as
     * id tuples.
     */
    public static class IdStreamSpec
    implements Serializable {
        /** The global stream id built from the component and stream names. */
        GlobalStreamId id;

        /**
         * @param component id of the component that emits the id stream
         * @param stream    id of the stream within that component
         */
        protected IdStreamSpec(String component, String stream) {
            final GlobalStreamId streamId = new GlobalStreamId(component, stream);
            this.id = streamId;
        }

        /**
         * Factory for a spec pointing at the given component and stream.
         *
         * @param component id of the component that emits the id stream
         * @param stream    id of the stream within that component
         * @return a new spec for that stream
         */
        public static IdStreamSpec makeDetectSpec(String component, String stream) {
            final IdStreamSpec spec = new IdStreamSpec(component, stream);
            return spec;
        }

        /**
         * @return the global stream id this spec points at
         */
        public GlobalStreamId getGlobalStreamId() {
            return id;
        }
    }

    /**
     * Mutable bookkeeping for one id. Numeric fields start at zero and flags at false
     * (the Java defaults); both collections start empty.
     */
    public static class TrackingInfo {
        /** Number of coordination reports received so far. */
        int reportCount;
        /** Sum of the counts carried by the coordination reports received so far. */
        int expectedTupleCount;
        /** Number of tuples acked for this id so far. */
        int receivedTuples;
        /** Set once the id has been failed or its record has expired. */
        boolean failed;
        /** Number of tuples emitted for this id, per target task id. */
        Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
        /** Whether the id counts as received. */
        boolean receivedId;
        /** Set once the id has been finished successfully. */
        boolean finished;
        /** Tuples held back until the id finishes or fails. */
        List<Tuple> ackTuples = new ArrayList<Tuple>();

        /**
         * @return a multi-line summary of the counters, the failed flag and the per-task
         *         emit counts
         */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder();
            text.append("reportCount: ").append(this.reportCount);
            text.append("\nexpectedTupleCount: ").append(this.expectedTupleCount);
            text.append("\nreceivedTuples: ").append(this.receivedTuples);
            text.append("\nfailed: ").append(this.failed);
            text.append("\n").append(this.taskEmittedTuples.toString());
            return text.toString();
        }
    }

    /**
     * Settings for one source component: whether it sends a single coordination report
     * in total ({@code singleCount == true}) or one report per task.
     */
    public static class SourceArgs
    implements Serializable {
        /** True when the source contributes exactly one coordination report. */
        public boolean singleCount;

        /**
         * @param singleCount true for a single report, false for one report per task
         */
        protected SourceArgs(boolean singleCount) {
            this.singleCount = singleCount;
        }

        /**
         * @return settings for a source that sends a single report
         */
        public static SourceArgs single() {
            final boolean oneReport = true;
            return new SourceArgs(oneReport);
        }

        /**
         * @return settings for a source whose every task sends a report
         */
        public static SourceArgs all() {
            final boolean oneReport = false;
            return new SourceArgs(oneReport);
        }

        /**
         * @return a short description of the form {@code <Single: true>}
         */
        @Override
        public String toString() {
            return new StringBuilder("<Single: ").append(this.singleCount).append(">").toString();
        }
    }

    /**
     * Implemented by delegate bolts that want to be told when the tracking record of an
     * unfinished id expires.
     */
    public interface TimeoutCallback {
        /**
         * Called for an id whose tracking record expired before it finished.
         *
         * @param id the id that timed out
         */
        void timeoutId(Object id);
    }

    /**
     * Implemented by delegate bolts that want to be told when all work for an id is
     * complete.
     */
    public interface FinishedCallback {
        /**
         * Called once the finish condition for an id is met.
         *
         * @param id the id that finished
         */
        void finishedId(Object id);
    }

    /** How an incoming tuple is classified by the stream it arrived on. */
    enum TupleType {
        /** An ordinary data tuple that is passed to the delegate. */
        REGULAR,
        /** A tuple from the configured id stream. */
        ID,
        /** A tuple from the coordinated stream carrying an emit count. */
        COORD
    }
}

