/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.bolt;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TimestampExtractor;
import org.apache.storm.windowing.TupleWindow;

public class JoinBolt
extends BaseWindowedBolt {
    protected final Selector selectorType;
    protected LinkedHashMap<String, JoinInfo> joinCriteria = new LinkedHashMap();
    protected FieldSelector[] outputFields;
    protected String outputStreamName;
    HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs = new HashMap();
    private OutputCollector collector;

    public JoinBolt(String sourceId, String fieldName) {
        this(Selector.SOURCE, sourceId, fieldName);
    }

    public JoinBolt(Selector type, String srcOrStreamId, String fieldName) {
        this.selectorType = type;
        this.joinCriteria.put(srcOrStreamId, new JoinInfo(new FieldSelector(srcOrStreamId, fieldName)));
    }

    public JoinBolt withOutputStream(String streamName) {
        this.outputStreamName = streamName;
        return this;
    }

    public JoinBolt join(String newStream, String field, String priorStream) {
        return this.joinCommon(newStream, field, priorStream, JoinType.INNER);
    }

    public JoinBolt leftJoin(String newStream, String field, String priorStream) {
        return this.joinCommon(newStream, field, priorStream, JoinType.LEFT);
    }

    private JoinBolt joinCommon(String newStream, String fieldDescriptor, String priorStream, JoinType joinType) {
        if (this.hashedInputs.containsKey(newStream)) {
            throw new IllegalArgumentException("'" + newStream + "' is already part of join. Cannot join with it more than once.");
        }
        this.hashedInputs.put(newStream, new HashMap());
        JoinInfo joinInfo = this.joinCriteria.get(priorStream);
        if (joinInfo == null) {
            throw new IllegalArgumentException("Stream '" + priorStream + "' was not previously declared");
        }
        FieldSelector field = new FieldSelector(newStream, fieldDescriptor);
        this.joinCriteria.put(newStream, new JoinInfo(field, priorStream, joinInfo, joinType));
        return this;
    }

    public JoinBolt select(String commaSeparatedKeys) {
        String[] fieldNames = commaSeparatedKeys.split(",");
        this.outputFields = new FieldSelector[fieldNames.length];
        for (int i = 0; i < fieldNames.length; ++i) {
            this.outputFields[i] = new FieldSelector(fieldNames[i]);
        }
        return this;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String[] outputFieldNames = new String[this.outputFields.length];
        for (int i = 0; i < this.outputFields.length; ++i) {
            outputFieldNames[i] = this.outputFields[i].getOutputName();
        }
        if (this.outputStreamName != null) {
            declarer.declareStream(this.outputStreamName, new Fields(outputFieldNames));
        } else {
            declarer.declare(new Fields(outputFieldNames));
        }
    }

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        int i = 0;
        for (String stream : this.joinCriteria.keySet()) {
            if (i > 0) {
                this.hashedInputs.put(stream, new HashMap());
            }
            ++i;
        }
        if (this.outputFields == null) {
            throw new IllegalArgumentException("Must specify output fields via .select() method.");
        }
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        List<Tuple> currentWindow = inputWindow.get();
        JoinAccumulator joinResult = this.hashJoin(currentWindow);
        for (ResultRecord resultRecord : joinResult.getRecords()) {
            ArrayList<Object> outputTuple = resultRecord.getOutputFields();
            if (this.outputStreamName == null) {
                this.collector.emit(resultRecord.tupleList, outputTuple);
                continue;
            }
            this.collector.emit(this.outputStreamName, resultRecord.tupleList, outputTuple);
        }
    }

    private void clearHashedInputs() {
        for (HashMap<Object, ArrayList<Tuple>> mappings : this.hashedInputs.values()) {
            mappings.clear();
        }
    }

    protected JoinAccumulator hashJoin(List<Tuple> tuples) {
        this.clearHashedInputs();
        JoinAccumulator probe = new JoinAccumulator();
        String firstStream = this.joinCriteria.keySet().iterator().next();
        for (Tuple tuple : tuples) {
            String streamId = this.getStreamSelector(tuple);
            if (!streamId.equals(firstStream)) {
                Object field = this.getJoinField(streamId, tuple);
                ArrayList<Tuple> recs = this.hashedInputs.get(streamId).get(field);
                if (recs == null) {
                    recs = new ArrayList();
                    this.hashedInputs.get(streamId).put(field, recs);
                }
                recs.add(tuple);
                continue;
            }
            ResultRecord probeRecord = new ResultRecord(tuple, this.joinCriteria.size() == 1);
            probe.insert(probeRecord);
        }
        int i = 0;
        for (String streamName : this.joinCriteria.keySet()) {
            boolean finalJoin;
            boolean bl = finalJoin = i == this.joinCriteria.size() - 1;
            if (i > 0) {
                probe = this.doJoin(probe, this.hashedInputs.get(streamName), this.joinCriteria.get(streamName), finalJoin);
            }
            ++i;
        }
        return probe;
    }

    protected JoinAccumulator doJoin(JoinAccumulator probe, HashMap<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
        JoinType joinType = joinInfo.getJoinType();
        switch (joinType) {
            case INNER: {
                return this.doInnerJoin(probe, buildInput, joinInfo, finalJoin);
            }
            case LEFT: {
                return this.doLeftJoin(probe, buildInput, joinInfo, finalJoin);
            }
        }
        throw new RuntimeException("Unsupported join type : " + joinType.name());
    }

    protected JoinAccumulator doInnerJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
        String[] probeKeyName = joinInfo.getOtherField();
        JoinAccumulator result2 = new JoinAccumulator();
        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
        for (ResultRecord rec : probe.getRecords()) {
            ArrayList<Tuple> matchingBuildRecs;
            Object probeKey = rec.getField(fieldSelector);
            if (probeKey == null || (matchingBuildRecs = buildInput.get(probeKey)) == null) continue;
            for (Tuple matchingRec : matchingBuildRecs) {
                ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
                result2.insert(mergedRecord);
            }
        }
        return result2;
    }

    protected JoinAccumulator doLeftJoin(JoinAccumulator probe, Map<Object, ArrayList<Tuple>> buildInput, JoinInfo joinInfo, boolean finalJoin) {
        String[] probeKeyName = joinInfo.getOtherField();
        JoinAccumulator result2 = new JoinAccumulator();
        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), probeKeyName);
        for (ResultRecord rec : probe.getRecords()) {
            Object probeKey = rec.getField(fieldSelector);
            if (probeKey == null) continue;
            ArrayList<Tuple> matchingBuildRecs = buildInput.get(probeKey);
            if (matchingBuildRecs != null && !matchingBuildRecs.isEmpty()) {
                for (Tuple matchingRec : matchingBuildRecs) {
                    ResultRecord mergedRecord = new ResultRecord(rec, matchingRec, finalJoin);
                    result2.insert(mergedRecord);
                }
                continue;
            }
            ResultRecord mergedRecord = new ResultRecord(rec, null, finalJoin);
            result2.insert(mergedRecord);
        }
        return result2;
    }

    private Object getJoinField(String streamId, Tuple tuple) {
        JoinInfo ji = this.joinCriteria.get(streamId);
        if (ji == null) {
            throw new RuntimeException("Join information for '" + streamId + "' not found. Check the join clauses.");
        }
        return this.lookupField(ji.getJoinField(), tuple);
    }

    private String getStreamSelector(Tuple ti) {
        switch (this.selectorType) {
            case STREAM: {
                return ti.getSourceStreamId();
            }
            case SOURCE: {
                return ti.getSourceComponent();
            }
        }
        throw new RuntimeException(this.selectorType + " stream selector type not yet supported");
    }

    protected ArrayList<Object> doProjection(ArrayList<Tuple> tuples, FieldSelector[] projectionFields) {
        ArrayList<Object> result2 = new ArrayList<Object>(projectionFields.length);
        for (int i = 0; i < projectionFields.length; ++i) {
            boolean missingField = true;
            for (Tuple tuple : tuples) {
                Object field = this.lookupField(projectionFields[i], tuple);
                if (field == null) continue;
                result2.add(field);
                missingField = false;
                break;
            }
            if (!missingField) continue;
            result2.add(null);
        }
        return result2;
    }

    protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) {
        if (fieldSelector.streamName != null && !fieldSelector.streamName.equalsIgnoreCase(this.getStreamSelector(tuple))) {
            return null;
        }
        Object curr = null;
        for (int i = 0; i < fieldSelector.field.length; ++i) {
            if (i == 0) {
                if (tuple.contains(fieldSelector.field[i])) {
                    curr = tuple.getValueByField(fieldSelector.field[i]);
                    continue;
                }
                return null;
            }
            if ((curr = (Object)((Map)curr).get(fieldSelector.field[i])) != null) continue;
            return null;
        }
        return curr;
    }

    @Override
    public JoinBolt withWindow(BaseWindowedBolt.Count windowLength, BaseWindowedBolt.Count slidingInterval) {
        return (JoinBolt)super.withWindow(windowLength, slidingInterval);
    }

    @Override
    public JoinBolt withWindow(BaseWindowedBolt.Count windowLength, BaseWindowedBolt.Duration slidingInterval) {
        return (JoinBolt)super.withWindow(windowLength, slidingInterval);
    }

    @Override
    public JoinBolt withWindow(BaseWindowedBolt.Duration windowLength, BaseWindowedBolt.Count slidingInterval) {
        return (JoinBolt)super.withWindow(windowLength, slidingInterval);
    }

    @Override
    public JoinBolt withWindow(BaseWindowedBolt.Duration windowLength, BaseWindowedBolt.Duration slidingInterval) {
        return (JoinBolt)super.withWindow(windowLength, slidingInterval);
    }

    @Override
    public JoinBolt withWindow(BaseWindowedBolt.Count windowLength) {
        return (JoinBolt)super.withWindow(windowLength);
    }

    @Override
    public JoinBolt withWindow(BaseWindowedBolt.Duration windowLength) {
        return (JoinBolt)super.withWindow(windowLength);
    }

    @Override
    public JoinBolt withTumblingWindow(BaseWindowedBolt.Count count) {
        return (JoinBolt)super.withTumblingWindow(count);
    }

    @Override
    public JoinBolt withTumblingWindow(BaseWindowedBolt.Duration duration) {
        return (JoinBolt)super.withTumblingWindow(duration);
    }

    @Override
    public JoinBolt withTimestampField(String fieldName) {
        return (JoinBolt)super.withTimestampField(fieldName);
    }

    @Override
    public JoinBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
        return (JoinBolt)super.withTimestampExtractor(timestampExtractor);
    }

    @Override
    public JoinBolt withLateTupleStream(String streamId) {
        return (JoinBolt)super.withLateTupleStream(streamId);
    }

    @Override
    public BaseWindowedBolt withLag(BaseWindowedBolt.Duration duration) {
        return (JoinBolt)super.withLag(duration);
    }

    @Override
    public BaseWindowedBolt withWatermarkInterval(BaseWindowedBolt.Duration interval) {
        return (JoinBolt)super.withWatermarkInterval(interval);
    }

    protected class JoinAccumulator {
        ArrayList<ResultRecord> records = new ArrayList();

        protected JoinAccumulator() {
        }

        public void insert(ResultRecord tuple) {
            this.records.add(tuple);
        }

        public Collection<ResultRecord> getRecords() {
            return this.records;
        }
    }

    protected class ResultRecord {
        ArrayList<Tuple> tupleList = new ArrayList();
        ArrayList<Object> outFields = null;

        public ResultRecord(Tuple tuple, boolean generateOutputFields) {
            this.tupleList.add(tuple);
            if (generateOutputFields) {
                this.outFields = JoinBolt.this.doProjection(this.tupleList, JoinBolt.this.outputFields);
            }
        }

        public ResultRecord(ResultRecord lhs, Tuple rhs, boolean generateOutputFields) {
            if (lhs != null) {
                this.tupleList.addAll(lhs.tupleList);
            }
            if (rhs != null) {
                this.tupleList.add(rhs);
            }
            if (generateOutputFields) {
                this.outFields = JoinBolt.this.doProjection(this.tupleList, JoinBolt.this.outputFields);
            }
        }

        public ArrayList<Object> getOutputFields() {
            return this.outFields;
        }

        public Object getField(FieldSelector fieldSelector) {
            for (Tuple tuple : this.tupleList) {
                Object result2 = JoinBolt.this.lookupField(fieldSelector, tuple);
                if (result2 == null) continue;
                return result2;
            }
            return null;
        }
    }

    protected static class FieldSelector
    implements Serializable {
        String streamName;
        String[] field;
        String outputName;

        public FieldSelector(String fieldDescriptor) {
            int pos = fieldDescriptor.indexOf(58);
            if (pos > 0) {
                this.streamName = fieldDescriptor.substring(0, pos).trim();
                this.outputName = fieldDescriptor.trim();
                this.field = fieldDescriptor.substring(pos + 1, fieldDescriptor.length()).split("\\.");
                return;
            }
            this.streamName = null;
            if (pos == 0) {
                this.outputName = fieldDescriptor.substring(1, fieldDescriptor.length()).trim();
            } else if (pos < 0) {
                this.outputName = fieldDescriptor.trim();
            }
            this.field = this.outputName.split("\\.");
        }

        public FieldSelector(String stream, String fieldDescriptor) {
            this(fieldDescriptor);
            if (fieldDescriptor.indexOf(":") >= 0) {
                throw new IllegalArgumentException("Not expecting stream qualifier ':' in '" + fieldDescriptor + "'. Stream name '" + stream + "' is implicit in this context");
            }
            this.streamName = stream;
        }

        public FieldSelector(String stream, String[] field) {
            this(stream, String.join((CharSequence)".", field));
        }

        public String getStreamName() {
            return this.streamName;
        }

        public String[] getField() {
            return this.field;
        }

        public String getOutputName() {
            return this.toString();
        }

        public String toString() {
            return this.outputName;
        }
    }

    protected static class JoinInfo
    implements Serializable {
        static final long serialVersionUID = 1L;
        private JoinType joinType;
        private FieldSelector field;
        private FieldSelector other;

        public JoinInfo(FieldSelector field) {
            this.joinType = null;
            this.field = field;
            this.other = null;
        }

        public JoinInfo(FieldSelector field, String otherStream, JoinInfo otherStreamJoinInfo, JoinType joinType) {
            this.joinType = joinType;
            this.field = field;
            this.other = new FieldSelector(otherStream, otherStreamJoinInfo.field.getOutputName());
        }

        public FieldSelector getJoinField() {
            return this.field;
        }

        public String getOtherStream() {
            return this.other.getStreamName();
        }

        public String[] getOtherField() {
            return this.other.getField();
        }

        public JoinType getJoinType() {
            return this.joinType;
        }
    }

    protected static enum JoinType {
        INNER,
        LEFT,
        RIGHT,
        OUTER;

    }

    public static enum Selector {
        STREAM,
        SOURCE;

    }
}

