package org.apache.storm.bolt;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.shade.com.google.common.base.Joiner;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.windowing.TimestampExtractor;
import org.apache.storm.windowing.TupleWindow;

/* loaded from: input_file:org/apache/storm/bolt/JoinBolt.class */
public class JoinBolt extends BaseWindowedBolt {
    private OutputCollector collector;
    HashMap<String, HashMap<Object, ArrayList<Tuple>>> hashedInputs;
    protected LinkedHashMap<String, JoinInfo> joinCriteria;
    protected FieldSelector[] outputFields;
    protected String outputStreamName;
    protected final Selector selectorType;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/storm/bolt/JoinBolt$FieldSelector.class */
    public static class FieldSelector implements Serializable {
        String streamName;
        String[] field;
        String outputName;

        public FieldSelector(String str) {
            int indexOf = str.indexOf(58);
            if (indexOf > 0) {
                this.streamName = str.substring(0, indexOf).trim();
                this.outputName = str.trim();
                this.field = str.substring(indexOf + 1, str.length()).split("\\.");
            } else {
                this.streamName = null;
                if (indexOf == 0) {
                    this.outputName = str.substring(1, str.length()).trim();
                } else if (indexOf < 0) {
                    this.outputName = str.trim();
                }
                this.field = this.outputName.split("\\.");
            }
        }

        public FieldSelector(String str, String str2) {
            this(str2);
            if (str2.indexOf(":") >= 0) {
                throw new IllegalArgumentException("Not expecting stream qualifier ':' in '" + str2 + "'. Stream name '" + str + "' is implicit in this context");
            }
            this.streamName = str;
        }

        public FieldSelector(String str, String[] strArr) {
            this(str, Joiner.on(".").join(strArr));
        }

        public String getStreamName() {
            return this.streamName;
        }

        public String[] getField() {
            return this.field;
        }

        public String getOutputName() {
            return toString();
        }

        public String toString() {
            return this.outputName;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/storm/bolt/JoinBolt$JoinAccumulator.class */
    public class JoinAccumulator {
        ArrayList<ResultRecord> records = new ArrayList<>();

        protected JoinAccumulator() {
        }

        public void insert(ResultRecord resultRecord) {
            this.records.add(resultRecord);
        }

        public Collection<ResultRecord> getRecords() {
            return this.records;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/storm/bolt/JoinBolt$JoinInfo.class */
    public static class JoinInfo implements Serializable {
        static final long serialVersionUID = 1;
        private JoinType joinType;
        private FieldSelector field;
        private FieldSelector other;

        public JoinInfo(FieldSelector fieldSelector) {
            this.joinType = null;
            this.field = fieldSelector;
            this.other = null;
        }

        public JoinInfo(FieldSelector fieldSelector, String str, JoinInfo joinInfo, JoinType joinType) {
            this.joinType = joinType;
            this.field = fieldSelector;
            this.other = new FieldSelector(str, joinInfo.field.getOutputName());
        }

        public FieldSelector getJoinField() {
            return this.field;
        }

        public String getOtherStream() {
            return this.other.getStreamName();
        }

        public String[] getOtherField() {
            return this.other.getField();
        }

        public JoinType getJoinType() {
            return this.joinType;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/storm/bolt/JoinBolt$JoinType.class */
    public enum JoinType {
        INNER,
        LEFT,
        RIGHT,
        OUTER
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/storm/bolt/JoinBolt$ResultRecord.class */
    public class ResultRecord {
        ArrayList<Tuple> tupleList = new ArrayList<>();
        ArrayList<Object> outFields;

        public ResultRecord(Tuple tuple, boolean z) {
            this.outFields = null;
            this.tupleList.add(tuple);
            if (z) {
                this.outFields = JoinBolt.this.doProjection(this.tupleList, JoinBolt.this.outputFields);
            }
        }

        public ResultRecord(ResultRecord resultRecord, Tuple tuple, boolean z) {
            this.outFields = null;
            if (resultRecord != null) {
                this.tupleList.addAll(resultRecord.tupleList);
            }
            if (tuple != null) {
                this.tupleList.add(tuple);
            }
            if (z) {
                this.outFields = JoinBolt.this.doProjection(this.tupleList, JoinBolt.this.outputFields);
            }
        }

        public ArrayList<Object> getOutputFields() {
            return this.outFields;
        }

        public Object getField(FieldSelector fieldSelector) {
            Iterator<Tuple> it = this.tupleList.iterator();
            while (it.hasNext()) {
                Object lookupField = JoinBolt.this.lookupField(fieldSelector, it.next());
                if (lookupField != null) {
                    return lookupField;
                }
            }
            return null;
        }
    }

    /* loaded from: input_file:org/apache/storm/bolt/JoinBolt$Selector.class */
    public enum Selector {
        STREAM,
        SOURCE
    }

    public JoinBolt(String str, String str2) {
        this(Selector.SOURCE, str, str2);
    }

    public JoinBolt(Selector selector, String str, String str2) {
        this.hashedInputs = new HashMap<>();
        this.joinCriteria = new LinkedHashMap<>();
        this.selectorType = selector;
        this.joinCriteria.put(str, new JoinInfo(new FieldSelector(str, str2)));
    }

    public JoinBolt withOutputStream(String str) {
        this.outputStreamName = str;
        return this;
    }

    public JoinBolt join(String str, String str2, String str3) {
        return joinCommon(str, str2, str3, JoinType.INNER);
    }

    public JoinBolt leftJoin(String str, String str2, String str3) {
        return joinCommon(str, str2, str3, JoinType.LEFT);
    }

    private JoinBolt joinCommon(String str, String str2, String str3, JoinType joinType) {
        if (this.hashedInputs.containsKey(str)) {
            throw new IllegalArgumentException("'" + str + "' is already part of join. Cannot join with it more than once.");
        }
        this.hashedInputs.put(str, new HashMap<>());
        JoinInfo joinInfo = this.joinCriteria.get(str3);
        if (joinInfo == null) {
            throw new IllegalArgumentException("Stream '" + str3 + "' was not previously declared");
        }
        this.joinCriteria.put(str, new JoinInfo(new FieldSelector(str, str2), str3, joinInfo, joinType));
        return this;
    }

    public JoinBolt select(String str) {
        String[] split = str.split(",");
        this.outputFields = new FieldSelector[split.length];
        for (int i = 0; i < split.length; i++) {
            this.outputFields[i] = new FieldSelector(split[i]);
        }
        return this;
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt, org.apache.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        String[] strArr = new String[this.outputFields.length];
        for (int i = 0; i < this.outputFields.length; i++) {
            strArr[i] = this.outputFields[i].getOutputName();
        }
        if (this.outputStreamName != null) {
            outputFieldsDeclarer.declareStream(this.outputStreamName, new Fields(strArr));
        } else {
            outputFieldsDeclarer.declare(new Fields(strArr));
        }
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt, org.apache.storm.topology.IWindowedBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        int i = 0;
        for (String str : this.joinCriteria.keySet()) {
            if (i > 0) {
                this.hashedInputs.put(str, new HashMap<>());
            }
            i++;
        }
        if (this.outputFields == null) {
            throw new IllegalArgumentException("Must specify output fields via .select() method.");
        }
    }

    @Override // org.apache.storm.topology.IWindowedBolt
    public void execute(TupleWindow tupleWindow) {
        for (ResultRecord resultRecord : hashJoin(tupleWindow.get()).getRecords()) {
            ArrayList<Object> outputFields = resultRecord.getOutputFields();
            if (this.outputStreamName == null) {
                this.collector.emit(resultRecord.tupleList, outputFields);
            } else {
                this.collector.emit(this.outputStreamName, resultRecord.tupleList, outputFields);
            }
        }
    }

    private void clearHashedInputs() {
        Iterator<HashMap<Object, ArrayList<Tuple>>> it = this.hashedInputs.values().iterator();
        while (it.hasNext()) {
            it.next().clear();
        }
    }

    protected JoinAccumulator hashJoin(List<Tuple> list) {
        clearHashedInputs();
        JoinAccumulator joinAccumulator = new JoinAccumulator();
        String next = this.joinCriteria.keySet().iterator().next();
        for (Tuple tuple : list) {
            String streamSelector = getStreamSelector(tuple);
            if (streamSelector.equals(next)) {
                joinAccumulator.insert(new ResultRecord(tuple, this.joinCriteria.size() == 1));
            } else {
                Object joinField = getJoinField(streamSelector, tuple);
                ArrayList<Tuple> arrayList = this.hashedInputs.get(streamSelector).get(joinField);
                if (arrayList == null) {
                    arrayList = new ArrayList<>();
                    this.hashedInputs.get(streamSelector).put(joinField, arrayList);
                }
                arrayList.add(tuple);
            }
        }
        int i = 0;
        for (String str : this.joinCriteria.keySet()) {
            boolean z = i == this.joinCriteria.size() - 1;
            if (i > 0) {
                joinAccumulator = doJoin(joinAccumulator, this.hashedInputs.get(str), this.joinCriteria.get(str), z);
            }
            i++;
        }
        return joinAccumulator;
    }

    protected JoinAccumulator doJoin(JoinAccumulator joinAccumulator, HashMap<Object, ArrayList<Tuple>> hashMap, JoinInfo joinInfo, boolean z) {
        JoinType joinType = joinInfo.getJoinType();
        switch (joinType) {
            case INNER:
                return doInnerJoin(joinAccumulator, hashMap, joinInfo, z);
            case LEFT:
                return doLeftJoin(joinAccumulator, hashMap, joinInfo, z);
            case RIGHT:
            case OUTER:
            default:
                throw new RuntimeException("Unsupported join type : " + joinType.name());
        }
    }

    protected JoinAccumulator doInnerJoin(JoinAccumulator joinAccumulator, Map<Object, ArrayList<Tuple>> map, JoinInfo joinInfo, boolean z) {
        ArrayList<Tuple> arrayList;
        String[] otherField = joinInfo.getOtherField();
        JoinAccumulator joinAccumulator2 = new JoinAccumulator();
        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), otherField);
        for (ResultRecord resultRecord : joinAccumulator.getRecords()) {
            Object field = resultRecord.getField(fieldSelector);
            if (field != null && (arrayList = map.get(field)) != null) {
                Iterator<Tuple> it = arrayList.iterator();
                while (it.hasNext()) {
                    joinAccumulator2.insert(new ResultRecord(resultRecord, it.next(), z));
                }
            }
        }
        return joinAccumulator2;
    }

    protected JoinAccumulator doLeftJoin(JoinAccumulator joinAccumulator, Map<Object, ArrayList<Tuple>> map, JoinInfo joinInfo, boolean z) {
        String[] otherField = joinInfo.getOtherField();
        JoinAccumulator joinAccumulator2 = new JoinAccumulator();
        FieldSelector fieldSelector = new FieldSelector(joinInfo.other.getStreamName(), otherField);
        for (ResultRecord resultRecord : joinAccumulator.getRecords()) {
            Object field = resultRecord.getField(fieldSelector);
            if (field != null) {
                ArrayList<Tuple> arrayList = map.get(field);
                if (arrayList == null || arrayList.isEmpty()) {
                    joinAccumulator2.insert(new ResultRecord(resultRecord, null, z));
                } else {
                    Iterator<Tuple> it = arrayList.iterator();
                    while (it.hasNext()) {
                        joinAccumulator2.insert(new ResultRecord(resultRecord, it.next(), z));
                    }
                }
            }
        }
        return joinAccumulator2;
    }

    private Object getJoinField(String str, Tuple tuple) {
        JoinInfo joinInfo = this.joinCriteria.get(str);
        if (joinInfo == null) {
            throw new RuntimeException("Join information for '" + str + "' not found. Check the join clauses.");
        }
        return lookupField(joinInfo.getJoinField(), tuple);
    }

    private String getStreamSelector(Tuple tuple) {
        switch (this.selectorType) {
            case STREAM:
                return tuple.getSourceStreamId();
            case SOURCE:
                return tuple.getSourceComponent();
            default:
                throw new RuntimeException(this.selectorType + " stream selector type not yet supported");
        }
    }

    protected ArrayList<Object> doProjection(ArrayList<Tuple> arrayList, FieldSelector[] fieldSelectorArr) {
        ArrayList<Object> arrayList2 = new ArrayList<>(fieldSelectorArr.length);
        for (FieldSelector fieldSelector : fieldSelectorArr) {
            boolean z = true;
            Iterator<Tuple> it = arrayList.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Object lookupField = lookupField(fieldSelector, it.next());
                if (lookupField != null) {
                    arrayList2.add(lookupField);
                    z = false;
                    break;
                }
            }
            if (z) {
                arrayList2.add(null);
            }
        }
        return arrayList2;
    }

    protected Object lookupField(FieldSelector fieldSelector, Tuple tuple) {
        if (fieldSelector.streamName != null && !fieldSelector.streamName.equalsIgnoreCase(getStreamSelector(tuple))) {
            return null;
        }
        Object obj = null;
        for (int i = 0; i < fieldSelector.field.length; i++) {
            if (i != 0) {
                obj = ((Map) obj).get(fieldSelector.field[i]);
                if (obj == null) {
                    return null;
                }
            } else {
                if (!tuple.contains(fieldSelector.field[i])) {
                    return null;
                }
                obj = tuple.getValueByField(fieldSelector.field[i]);
            }
        }
        return obj;
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public JoinBolt withWindow(BaseWindowedBolt.Count count, BaseWindowedBolt.Count count2) {
        return (JoinBolt) super.withWindow(count, count2);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public JoinBolt withWindow(BaseWindowedBolt.Count count, BaseWindowedBolt.Duration duration) {
        return (JoinBolt) super.withWindow(count, duration);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public JoinBolt withWindow(BaseWindowedBolt.Duration duration, BaseWindowedBolt.Count count) {
        return (JoinBolt) super.withWindow(duration, count);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public JoinBolt withWindow(BaseWindowedBolt.Duration duration, BaseWindowedBolt.Duration duration2) {
        return (JoinBolt) super.withWindow(duration, duration2);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public JoinBolt withWindow(BaseWindowedBolt.Count count) {
        return (JoinBolt) super.withWindow(count);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public JoinBolt withWindow(BaseWindowedBolt.Duration duration) {
        return (JoinBolt) super.withWindow(duration);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public JoinBolt withTumblingWindow(BaseWindowedBolt.Count count) {
        return (JoinBolt) super.withTumblingWindow(count);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public JoinBolt withTumblingWindow(BaseWindowedBolt.Duration duration) {
        return (JoinBolt) super.withTumblingWindow(duration);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public JoinBolt withTimestampField(String str) {
        return (JoinBolt) super.withTimestampField(str);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public JoinBolt withTimestampExtractor(TimestampExtractor timestampExtractor) {
        return (JoinBolt) super.withTimestampExtractor(timestampExtractor);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public JoinBolt withLateTupleStream(String str) {
        return (JoinBolt) super.withLateTupleStream(str);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public BaseWindowedBolt withLag(BaseWindowedBolt.Duration duration) {
        return (JoinBolt) super.withLag(duration);
    }

    @Override // org.apache.storm.topology.base.BaseWindowedBolt
    public BaseWindowedBolt withWatermarkInterval(BaseWindowedBolt.Duration duration) {
        return (JoinBolt) super.withWatermarkInterval(duration);
    }
}
