package org.apache.kafka.connect.util.clusters;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import javax.ws.rs.core.Response;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.class */
/**
 * Assertion helpers for tests that run against an {@link EmbeddedConnectCluster}.
 *
 * <p>Each {@code assert*} method hands a condition to {@code TestUtils.waitForCondition} together
 * with a timeout and a default failure message. If that call raises an {@link AssertionError},
 * it is rethrown with the caller-supplied detail message and the original error as its cause.
 */
public class EmbeddedConnectClusterAssertions {
    private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectClusterAssertions.class);

    /** Timeout, in milliseconds, passed to the wait when asserting on the number of workers (60 seconds). */
    public static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);

    /** Timeout, in milliseconds, passed to the wait when asserting on connector/task state (30 seconds). */
    private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);

    private final EmbeddedConnectCluster connect;

    /**
     * Creates the assertion helper for the given cluster.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally package-private;
     * it is kept public here so existing callers of this source keep compiling.
     *
     * @param connect the embedded Connect cluster that every assertion queries
     */
    public EmbeddedConnectClusterAssertions(EmbeddedConnectCluster connect) {
        this.connect = connect;
    }

    /**
     * Asserts that at least {@code numWorkers} workers are active.
     *
     * @param numWorkers the minimum number of active workers required
     * @param detailMessage message of the {@link AssertionError} thrown when the wait fails
     * @throws InterruptedException propagated from the underlying wait
     */
    public void assertAtLeastNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException {
        try {
            TestUtils.waitForCondition(
                () -> checkWorkersUp(numWorkers, (actual, expected) -> actual >= expected).orElse(false),
                WORKER_SETUP_DURATION_MS,
                "Didn't meet the minimum requested number of online workers: " + numWorkers);
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Asserts that exactly {@code numWorkers} workers are active.
     *
     * @param numWorkers the exact number of active workers required
     * @param detailMessage message of the {@link AssertionError} thrown when the wait fails
     * @throws InterruptedException propagated from the underlying wait
     */
    public void assertExactlyNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException {
        try {
            TestUtils.waitForCondition(
                // equals(), not ==: the operands are boxed Integers, and == would compare references,
                // which only happens to work for values inside the Integer cache.
                () -> checkWorkersUp(numWorkers, (actual, expected) -> actual.equals(expected)).orElse(false),
                WORKER_SETUP_DURATION_MS,
                "Didn't meet the exact requested number of online workers: " + numWorkers);
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Compares the current number of active workers with the requested number.
     *
     * @param numWorkers the requested number of workers, passed as the second argument of {@code comp}
     * @param comp comparison applied as {@code comp.apply(activeWorkerCount, numWorkers)}
     * @return the comparison result, or an empty {@link Optional} if querying the cluster or
     *         evaluating the comparison threw an exception (the exception is logged)
     */
    protected Optional<Boolean> checkWorkersUp(int numWorkers, BiFunction<Integer, Integer, Boolean> comp) {
        try {
            int activeWorkers = this.connect.activeWorkers().size();
            return Optional.of(comp.apply(activeWorkers, numWorkers));
        } catch (Exception e) {
            log.error("Could not check active workers.", e);
            return Optional.empty();
        }
    }

    /**
     * Asserts that the connector is RUNNING, has at least {@code numTasks} tasks, and that all of
     * its tasks are RUNNING.
     *
     * @param connectorName name of the connector to inspect
     * @param numTasks the minimum number of tasks required
     * @param detailMessage message of the {@link AssertionError} thrown when the wait fails
     * @throws InterruptedException propagated from the underlying wait
     */
    public void assertConnectorAndAtLeastNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
            throws InterruptedException {
        try {
            TestUtils.waitForCondition(
                () -> checkConnectorState(
                    connectorName,
                    AbstractStatus.State.RUNNING,
                    numTasks,
                    AbstractStatus.State.RUNNING,
                    (actual, expected) -> actual >= expected
                ).orElse(false),
                CONNECTOR_SETUP_DURATION_MS,
                "The connector or at least " + numTasks + " of tasks are not running.");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Asserts that the connector is RUNNING, has exactly {@code numTasks} tasks, and that all of
     * its tasks are RUNNING.
     *
     * @param connectorName name of the connector to inspect
     * @param numTasks the exact number of tasks required
     * @param detailMessage message of the {@link AssertionError} thrown when the wait fails
     * @throws InterruptedException propagated from the underlying wait
     */
    public void assertConnectorAndExactlyNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
            throws InterruptedException {
        try {
            TestUtils.waitForCondition(
                () -> checkConnectorState(
                    connectorName,
                    AbstractStatus.State.RUNNING,
                    numTasks,
                    AbstractStatus.State.RUNNING,
                    // equals(), not ==: boxed Integers must be compared by value.
                    (actual, expected) -> actual.equals(expected)
                ).orElse(false),
                CONNECTOR_SETUP_DURATION_MS,
                "The connector or exactly " + numTasks + " tasks are not running.");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Asserts that the connector is RUNNING, has at least {@code numTasks} tasks, and that all of
     * its tasks are FAILED.
     *
     * @param connectorName name of the connector to inspect
     * @param numTasks the minimum number of tasks required
     * @param detailMessage message of the {@link AssertionError} thrown when the wait fails
     * @throws InterruptedException propagated from the underlying wait
     */
    public void assertConnectorIsRunningAndTasksHaveFailed(String connectorName, int numTasks, String detailMessage)
            throws InterruptedException {
        try {
            TestUtils.waitForCondition(
                () -> checkConnectorState(
                    connectorName,
                    AbstractStatus.State.RUNNING,
                    numTasks,
                    AbstractStatus.State.FAILED,
                    (actual, expected) -> actual >= expected
                ).orElse(false),
                CONNECTOR_SETUP_DURATION_MS,
                "Either the connector is not running or not all the " + numTasks + " tasks have failed.");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Asserts that neither the connector nor any of its tasks is RUNNING.
     *
     * @param connectorName name of the connector to inspect
     * @param detailMessage message of the {@link AssertionError} thrown when the wait fails
     * @throws InterruptedException propagated from the underlying wait
     */
    public void assertConnectorAndTasksAreStopped(String connectorName, String detailMessage)
            throws InterruptedException {
        try {
            TestUtils.waitForCondition(
                () -> checkConnectorAndTasksAreStopped(connectorName),
                CONNECTOR_SETUP_DURATION_MS,
                "At least the connector or one of its tasks is still running");
        } catch (AssertionError e) {
            throw new AssertionError(detailMessage, e);
        }
    }

    /**
     * Checks whether the connector and all of its tasks have left the RUNNING state.
     *
     * @param connectorName name of the connector to inspect
     * @return {@code true} if the status lookup returns {@code null}, if it fails with a
     *         {@link ConnectRestException} whose status code is 404 (NOT_FOUND), or if neither the
     *         connector nor any task reports RUNNING; {@code false} otherwise, including when any
     *         other exception is thrown (that exception is logged)
     */
    protected boolean checkConnectorAndTasksAreStopped(String connectorName) {
        final String running = AbstractStatus.State.RUNNING.toString();
        try {
            ConnectorStateInfo info = this.connect.connectorStatus(connectorName);
            if (info == null) {
                return true;
            }
            boolean connectorStopped = !info.connector().state().equals(running);
            return connectorStopped
                && info.tasks().stream().noneMatch(task -> task.state().equals(running));
        } catch (ConnectRestException e) {
            // Must precede the generic handler below, otherwise this clause is unreachable.
            // A connector that cannot be found is treated as stopped.
            return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
        } catch (Exception e) {
            log.error("Could not check connector state info.", e);
            return false;
        }
    }

    /**
     * Checks the connector's state, its number of tasks, and the state of every task.
     *
     * @param connectorName name of the connector to inspect
     * @param connectorState the state the connector itself must report
     * @param numTasks the requested number of tasks, passed as the second argument of {@code comp}
     * @param tasksState the state every task must report
     * @param comp comparison applied as {@code comp.apply(actualTaskCount, numTasks)}
     * @return {@code true} wrapped in an {@link Optional} when the status is non-null, the task
     *         count satisfies {@code comp}, the connector is in {@code connectorState} and all
     *         tasks are in {@code tasksState}; {@code false} wrapped in an {@link Optional} when any
     *         of those checks fails; an empty {@link Optional} if an exception was thrown (the
     *         exception is logged)
     */
    protected Optional<Boolean> checkConnectorState(
            String connectorName,
            AbstractStatus.State connectorState,
            int numTasks,
            AbstractStatus.State tasksState,
            BiFunction<Integer, Integer, Boolean> comp) {
        try {
            ConnectorStateInfo info = this.connect.connectorStatus(connectorName);
            boolean result = info != null
                && comp.apply(info.tasks().size(), numTasks)
                && info.connector().state().equals(connectorState.toString())
                && info.tasks().stream().allMatch(task -> task.state().equals(tasksState.toString()));
            return Optional.of(result);
        } catch (Exception e) {
            log.error("Could not check connector state info.", e);
            return Optional.empty();
        }
    }
}
