package org.apache.flink.runtime.operators.lifecycle.validation;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorFinishedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEvent;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/lifecycle/validation/TestJobDataFlowValidator.class */
public class TestJobDataFlowValidator {
    private static final Logger LOG = LoggerFactory.getLogger(TestJobDataFlowValidator.class);

    public static void checkDataFlow(TestJobWithDescription testJobWithDescription, boolean z) {
        HashMap hashMap = new HashMap();
        for (TestEvent testEvent : testJobWithDescription.eventQueue.getAll()) {
            if (testEvent instanceof OperatorFinishedEvent) {
                ((Map) hashMap.computeIfAbsent(testEvent.operatorId, str -> {
                    return new HashMap();
                })).put(Integer.valueOf(testEvent.subtaskIndex), (OperatorFinishedEvent) testEvent);
            }
        }
        for (JobVertex jobVertex : testJobWithDescription.jobGraph.getVertices()) {
            Iterator it = jobVertex.getProducedDataSets().iterator();
            while (it.hasNext()) {
                JobEdge consumer = ((IntermediateDataSet) it.next()).getConsumer();
                Optional<String> trackedOperatorID = getTrackedOperatorID(jobVertex, true, testJobWithDescription);
                Optional<String> trackedOperatorID2 = getTrackedOperatorID(consumer.getTarget(), false, testJobWithDescription);
                if (trackedOperatorID.isPresent() && trackedOperatorID2.isPresent()) {
                    String str2 = trackedOperatorID.get();
                    String str3 = trackedOperatorID2.get();
                    if (testJobWithDescription.sources.contains(str2)) {
                        LOG.debug("Legacy sources do not have the finish() method and thus do not emit FinishEvent");
                    } else {
                        checkDataFlow(str2, str3, consumer, hashMap, z);
                    }
                } else {
                    LOG.debug("Ignoring edge (untracked operator): {}", consumer);
                }
            }
        }
    }

    private static void checkDataFlow(String str, String str2, JobEdge jobEdge, Map<String, Map<Integer, OperatorFinishedEvent>> map, boolean z) {
        LOG.debug("Checking {} edge\n  from {} ({})\n  to {} ({})", new Object[]{jobEdge.getDistributionPattern(), jobEdge.getSource().getProducer().getName(), str, jobEdge.getTarget().getName(), str2});
        Map<Integer, OperatorFinishedEvent> forOperator = getForOperator(str2, map, z);
        Map<Integer, OperatorFinishedEvent> forOperator2 = getForOperator(str, map, z);
        if (z) {
            forOperator2.forEach((num, operatorFinishedEvent) -> {
                Assert.assertTrue(String.format("No downstream received %s from %s[%d]; received: %s", Long.valueOf(operatorFinishedEvent.lastSent), str, num, forOperator), anySubtaskReceived(str, num.intValue(), operatorFinishedEvent.lastSent, forOperator.values()));
            });
        }
    }

    private static boolean anySubtaskReceived(String str, int i, long j, Collection<OperatorFinishedEvent> collection) {
        return collection.stream().anyMatch(operatorFinishedEvent -> {
            return operatorFinishedEvent.getLastReceived(str, i) == j;
        });
    }

    private static Map<Integer, OperatorFinishedEvent> getForOperator(String str, Map<String, Map<Integer, OperatorFinishedEvent>> map, boolean z) {
        Map<Integer, OperatorFinishedEvent> map2 = map.get(str);
        if (z) {
            Assert.assertNotNull(String.format("Operator finish info wasn't collected with draining: %s (collected: %s)", str, map), map2);
        } else {
            Assert.assertNull(String.format("Operator finish info was collected without draining: %s (collected: %s)", str, map), map2);
        }
        return map2;
    }

    /* JADX WARN: Removed duplicated region for block: B:11:0x0038  */
    /* JADX WARN: Removed duplicated region for block: B:19:0x0044  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private static java.util.Optional<java.lang.String> getTrackedOperatorID(org.apache.flink.runtime.jobgraph.JobVertex r3, boolean r4, org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription r5) {
        /*
            r0 = r3
            java.util.List r0 = r0.getOperatorIDs()
            r1 = r4
            if (r1 == 0) goto Lc
            r1 = 0
            goto L15
        Lc:
            r1 = r3
            java.util.List r1 = r1.getOperatorIDs()
            int r1 = r1.size()
        L15:
            java.util.ListIterator r0 = r0.listIterator(r1)
            r6 = r0
        L1b:
            r0 = r4
            if (r0 == 0) goto L2b
            r0 = r6
            boolean r0 = r0.hasNext()
            if (r0 == 0) goto L7b
            goto L34
        L2b:
            r0 = r6
            boolean r0 = r0.hasPrevious()
            if (r0 == 0) goto L7b
        L34:
            r0 = r4
            if (r0 == 0) goto L44
            r0 = r6
            java.lang.Object r0 = r0.next()
            org.apache.flink.runtime.OperatorIDPair r0 = (org.apache.flink.runtime.OperatorIDPair) r0
            goto L4d
        L44:
            r0 = r6
            java.lang.Object r0 = r0.previous()
            org.apache.flink.runtime.OperatorIDPair r0 = (org.apache.flink.runtime.OperatorIDPair) r0
        L4d:
            r7 = r0
            r0 = r7
            java.util.Optional r0 = r0.getUserDefinedOperatorID()
            r1 = r7
            org.apache.flink.runtime.jobgraph.OperatorID r1 = r1.getGeneratedOperatorID()
            java.lang.Object r0 = r0.orElse(r1)
            org.apache.flink.runtime.jobgraph.OperatorID r0 = (org.apache.flink.runtime.jobgraph.OperatorID) r0
            java.lang.String r0 = r0.toString()
            r8 = r0
            r0 = r5
            java.util.Set<java.lang.String> r0 = r0.operatorsWithDataFlowTracking
            r1 = r8
            boolean r0 = r0.contains(r1)
            if (r0 == 0) goto L78
            r0 = r8
            java.util.Optional r0 = java.util.Optional.of(r0)
            return r0
        L78:
            goto L1b
        L7b:
            java.util.Optional r0 = java.util.Optional.empty()
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.operators.lifecycle.validation.TestJobDataFlowValidator.getTrackedOperatorID(org.apache.flink.runtime.jobgraph.JobVertex, boolean, org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription):java.util.Optional");
    }
}
