package com.twitter.heron.dsl.impl.operators;

import com.twitter.heron.api.bolt.OutputCollector;
import com.twitter.heron.api.topology.TopologyContext;
import com.twitter.heron.api.tuple.Tuple;
import com.twitter.heron.api.tuple.Values;
import com.twitter.heron.api.windowing.TupleWindow;
import com.twitter.heron.dsl.KeyValue;
import com.twitter.heron.dsl.KeyedWindow;
import com.twitter.heron.dsl.SerializableBiFunction;
import com.twitter.heron.dsl.Window;
import java.util.HashMap;
import java.util.Map;

/* loaded from: input_file:com/twitter/heron/dsl/impl/operators/JoinOperator.class */
public class JoinOperator<K, V1, V2, VR> extends DslWindowOperator {
    private static final long serialVersionUID = 4875450390444745407L;
    public static final String LEFT_COMPONENT_NAME = "_dsl_joinbolt_left_component_name_";
    public static final String RIGHT_COMPONENT_NAME = "_dsl_joinbolt_right_component_name_";
    private JoinType joinType;
    private String leftComponent;
    private String rightComponent;
    private SerializableBiFunction<? super V1, ? super V2, ? extends VR> joinFn;
    private OutputCollector collector;

    /* loaded from: input_file:com/twitter/heron/dsl/impl/operators/JoinOperator$JoinType.class */
    public enum JoinType {
        INNER,
        LEFT,
        OUTER
    }

    public JoinOperator(JoinType joinType, String str, String str2, SerializableBiFunction<? super V1, ? super V2, ? extends VR> serializableBiFunction) {
        this.joinType = joinType;
        this.leftComponent = str;
        this.rightComponent = str2;
        this.joinFn = serializableBiFunction;
    }

    @Override // com.twitter.heron.api.bolt.BaseWindowedBolt, com.twitter.heron.api.bolt.IWindowedBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override // com.twitter.heron.api.bolt.BaseWindowedBolt, com.twitter.heron.api.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> componentConfiguration = super.getComponentConfiguration();
        componentConfiguration.put(LEFT_COMPONENT_NAME, this.leftComponent);
        componentConfiguration.put(RIGHT_COMPONENT_NAME, this.rightComponent);
        return componentConfiguration;
    }

    @Override // com.twitter.heron.api.bolt.IWindowedBolt
    public void execute(TupleWindow tupleWindow) {
        HashMap hashMap = new HashMap();
        for (Tuple tuple : tupleWindow.get()) {
            if (tuple.getSourceComponent().equals(this.leftComponent)) {
                KeyValue<K, V1> keyValue = (KeyValue) tuple.getValue(0);
                if (keyValue.getKey() != null) {
                    addMapLeft(hashMap, keyValue);
                }
            } else {
                KeyValue<K, V2> keyValue2 = (KeyValue) tuple.getValue(0);
                if (keyValue2.getKey() != null) {
                    addMapRight(hashMap, keyValue2);
                }
            }
        }
        evaluateJoinMap(hashMap, tupleWindow);
    }

    private void evaluateJoinMap(Map<K, KeyValue<V1, V2>> map, TupleWindow tupleWindow) {
        for (K k : map.keySet()) {
            KeyValue<V1, V2> keyValue = map.get(k);
            switch (this.joinType) {
                case INNER:
                    if (keyValue.getKey() != null && keyValue.getValue() != null) {
                        joinAndEmit(k, tupleWindow, keyValue);
                        break;
                    }
                    break;
                case LEFT:
                    if (keyValue.getKey() != null) {
                        joinAndEmit(k, tupleWindow, keyValue);
                        break;
                    } else {
                        break;
                    }
                case OUTER:
                    joinAndEmit(k, tupleWindow, keyValue);
                    break;
                default:
                    throw new RuntimeException("Unknown join type " + this.joinType.name());
            }
        }
    }

    private void addMapLeft(Map<K, KeyValue<V1, V2>> map, KeyValue<K, V1> keyValue) {
        if (map.containsKey(keyValue.getKey())) {
            map.get(keyValue.getKey()).setKey(keyValue.getValue());
        } else {
            map.put(keyValue.getKey(), new KeyValue<>(keyValue.getValue(), null));
        }
    }

    private void addMapRight(Map<K, KeyValue<V1, V2>> map, KeyValue<K, V2> keyValue) {
        if (map.containsKey(keyValue.getKey())) {
            map.get(keyValue.getKey()).setValue(keyValue.getValue());
        } else {
            map.put(keyValue.getKey(), new KeyValue<>(null, keyValue.getValue()));
        }
    }

    private void joinAndEmit(K k, TupleWindow tupleWindow, KeyValue<V1, V2> keyValue) {
        this.collector.emit(new Values(new KeyValue(new KeyedWindow(k, new Window(tupleWindow.getStartTimestamp() == null ? 0L : tupleWindow.getStartTimestamp().longValue(), tupleWindow.getEndTimestamp() == null ? 0L : tupleWindow.getEndTimestamp().longValue(), tupleWindow.get().size())), this.joinFn.apply(keyValue.getKey(), keyValue.getValue()))));
    }
}
