package org.apache.beam.it.gcp.datastream;

import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.datastream.v1.ConnectionProfile;
import com.google.cloud.datastream.v1.CreateConnectionProfileRequest;
import com.google.cloud.datastream.v1.CreateStreamRequest;
import com.google.cloud.datastream.v1.DatastreamClient;
import com.google.cloud.datastream.v1.DeleteConnectionProfileRequest;
import com.google.cloud.datastream.v1.DeleteStreamRequest;
import com.google.cloud.datastream.v1.DestinationConfig;
import com.google.cloud.datastream.v1.SourceConfig;
import com.google.cloud.datastream.v1.Stream;
import com.google.cloud.datastream.v1.UpdateStreamRequest;
import com.google.common.truth.Truth;
import java.util.concurrent.ExecutionException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

/**
 * Unit tests for {@link DatastreamResourceManager}.
 *
 * <p>The {@link DatastreamClient} is a Mockito deep-stub mock, so chained calls such as
 * {@code createStreamAsync(...).get()} can be stubbed directly and no real Datastream API call is
 * made by these tests.
 */
@RunWith(JUnit4.class)
public class DatastreamResourceManagerTest {

    private static final String CONNECTION_PROFILE_ID = "test-connection-profile-id";
    private static final String STREAM_ID = "test-stream-id";
    private static final String PROJECT_ID = "test-project";
    private static final String LOCATION = "test-location";
    private static final String BUCKET = "test-bucket";
    private static final String ROOT_PATH = "/test-root-path";

    /** Number of loop iterations used by the cleanup test when creating resources. */
    private static final int RESOURCE_COUNT = 5;

    /** Initializes the {@code @Mock} fields before each test. */
    @Rule
    public final MockitoRule mockito = MockitoJUnit.rule();

    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private DatastreamClient datastreamClient;

    private DatastreamResourceManager testManager;

    /** Builds a fresh manager around the mocked client for every test. */
    @Before
    public void setup() {
        this.testManager = new DatastreamResourceManager(
                this.datastreamClient,
                DatastreamResourceManager.builder(PROJECT_ID, LOCATION, (CredentialsProvider) null));
    }

    // ---------------------------------------------------------------------------------------
    // Builder validation
    // ---------------------------------------------------------------------------------------

    /** An empty project ID must be rejected by the builder. */
    @Test
    public void testBuilderWithInvalidProjectShouldFail() {
        IllegalArgumentException exception = Assert.assertThrows(
                IllegalArgumentException.class,
                () -> DatastreamResourceManager.builder("", LOCATION, (CredentialsProvider) null));

        Truth.assertThat(exception).hasMessageThat().contains("projectID can not be null or empty");
    }

    /** An empty location must be rejected by the builder. */
    @Test
    public void testBuilderWithInvalidLocationShouldFail() {
        IllegalArgumentException exception = Assert.assertThrows(
                IllegalArgumentException.class,
                () -> DatastreamResourceManager.builder(PROJECT_ID, "", (CredentialsProvider) null));

        Truth.assertThat(exception).hasMessageThat().contains("location can not be null or empty");
    }

    // ---------------------------------------------------------------------------------------
    // BigQuery destination connection profile
    // ---------------------------------------------------------------------------------------

    /** An {@link ExecutionException} from the client future surfaces as a manager exception. */
    @Test
    public void testCreateBQDestinationConnectionProfileExecutionExceptionShouldFail() throws ExecutionException, InterruptedException {
        Mockito.when(this.datastreamClient
                        .createConnectionProfileAsync(ArgumentMatchers.any(CreateConnectionProfileRequest.class))
                        .get())
                .thenThrow(ExecutionException.class);

        DatastreamResourceManagerException exception = Assert.assertThrows(
                DatastreamResourceManagerException.class,
                () -> this.testManager.createBQDestinationConnectionProfile(CONNECTION_PROFILE_ID));

        Truth.assertThat(exception).hasMessageThat().contains("Failed to create BQ destination connection profile.");
    }

    /** An {@link InterruptedException} from the client future surfaces as a manager exception. */
    @Test
    public void testCreateBQDestinationConnectionInterruptedExceptionShouldFail() throws ExecutionException, InterruptedException {
        Mockito.when(this.datastreamClient
                        .createConnectionProfileAsync(ArgumentMatchers.any(CreateConnectionProfileRequest.class))
                        .get())
                .thenThrow(InterruptedException.class);

        DatastreamResourceManagerException exception = Assert.assertThrows(
                DatastreamResourceManagerException.class,
                () -> this.testManager.createBQDestinationConnectionProfile(CONNECTION_PROFILE_ID));

        Truth.assertThat(exception).hasMessageThat().contains("Failed to create BQ destination connection profile.");
    }

    /** The profile produced by the client future is returned to the caller. */
    @Test
    public void testCreateBQDestinationConnectionShouldCreateSuccessfully() throws ExecutionException, InterruptedException {
        ConnectionProfile expectedProfile = ConnectionProfile.getDefaultInstance();
        Mockito.when(this.datastreamClient
                        .createConnectionProfileAsync(ArgumentMatchers.any(CreateConnectionProfileRequest.class))
                        .get())
                .thenReturn(expectedProfile);

        ConnectionProfile actualProfile = this.testManager.createBQDestinationConnectionProfile(CONNECTION_PROFILE_ID);

        Truth.assertThat(actualProfile).isEqualTo(expectedProfile);
    }

    // ---------------------------------------------------------------------------------------
    // GCS destination connection profile
    // ---------------------------------------------------------------------------------------

    /** A non-empty root path that does not start with '/' must be rejected. */
    @Test
    public void testCreateGCSDestinationConnectionProfileWithInvalidGCSRootPathShouldFail() {
        IllegalArgumentException exception = Assert.assertThrows(
                IllegalArgumentException.class,
                () -> this.testManager.createGCSDestinationConnectionProfile(CONNECTION_PROFILE_ID, BUCKET, "invalid"));

        Truth.assertThat(exception).hasMessageThat().contains("gcsRootPath must either be an empty string or start with a '/'");
    }

    /** An {@link ExecutionException} from the client future surfaces as a manager exception. */
    @Test
    public void testCreateGCSDestinationConnectionProfileExecutionExceptionShouldFail() throws ExecutionException, InterruptedException {
        Mockito.when(this.datastreamClient
                        .createConnectionProfileAsync(ArgumentMatchers.any(CreateConnectionProfileRequest.class))
                        .get())
                .thenThrow(ExecutionException.class);

        DatastreamResourceManagerException exception = Assert.assertThrows(
                DatastreamResourceManagerException.class,
                () -> this.testManager.createGCSDestinationConnectionProfile(CONNECTION_PROFILE_ID, BUCKET, ROOT_PATH));

        Truth.assertThat(exception).hasMessageThat().contains("Failed to create GCS source connection profile.");
    }

    /** An {@link InterruptedException} from the client future surfaces as a manager exception. */
    @Test
    public void testCreateGCSDestinationConnectionProfileInterruptedExceptionShouldFail() throws ExecutionException, InterruptedException {
        Mockito.when(this.datastreamClient
                        .createConnectionProfileAsync(ArgumentMatchers.any(CreateConnectionProfileRequest.class))
                        .get())
                .thenThrow(InterruptedException.class);

        DatastreamResourceManagerException exception = Assert.assertThrows(
                DatastreamResourceManagerException.class,
                () -> this.testManager.createGCSDestinationConnectionProfile(CONNECTION_PROFILE_ID, BUCKET, ROOT_PATH));

        Truth.assertThat(exception).hasMessageThat().contains("Failed to create GCS source connection profile.");
    }

    /** The profile produced by the client future is returned to the caller. */
    @Test
    public void testCreateGCSDestinationConnectionShouldCreateSuccessfully() throws ExecutionException, InterruptedException {
        ConnectionProfile expectedProfile = ConnectionProfile.getDefaultInstance();
        Mockito.when(this.datastreamClient
                        .createConnectionProfileAsync(ArgumentMatchers.any(CreateConnectionProfileRequest.class))
                        .get())
                .thenReturn(expectedProfile);

        ConnectionProfile actualProfile =
                this.testManager.createGCSDestinationConnectionProfile(CONNECTION_PROFILE_ID, BUCKET, ROOT_PATH);

        Truth.assertThat(actualProfile).isEqualTo(expectedProfile);
    }

    // ---------------------------------------------------------------------------------------
    // Stream creation
    // ---------------------------------------------------------------------------------------

    /** An {@link ExecutionException} from the client future surfaces as a manager exception. */
    @Test
    public void testCreateStreamExecutionExceptionShouldFail() throws ExecutionException, InterruptedException {
        Mockito.when(this.datastreamClient
                        .createStreamAsync(ArgumentMatchers.any(CreateStreamRequest.class))
                        .get())
                .thenThrow(ExecutionException.class);

        DatastreamResourceManagerException exception = Assert.assertThrows(
                DatastreamResourceManagerException.class,
                () -> this.testManager.createStream(
                        STREAM_ID, SourceConfig.getDefaultInstance(), DestinationConfig.getDefaultInstance()));

        Truth.assertThat(exception).hasMessageThat().contains("Failed to create stream.");
    }

    /** An {@link InterruptedException} from the client future surfaces as a manager exception. */
    @Test
    public void testCreateStreamInterruptedExceptionShouldFail() throws ExecutionException, InterruptedException {
        Mockito.when(this.datastreamClient
                        .createStreamAsync(ArgumentMatchers.any(CreateStreamRequest.class))
                        .get())
                .thenThrow(InterruptedException.class);

        DatastreamResourceManagerException exception = Assert.assertThrows(
                DatastreamResourceManagerException.class,
                () -> this.testManager.createStream(
                        STREAM_ID, SourceConfig.getDefaultInstance(), DestinationConfig.getDefaultInstance()));

        Truth.assertThat(exception).hasMessageThat().contains("Failed to create stream.");
    }

    /** The stream produced by the client future is returned to the caller. */
    @Test
    public void testCreateStreamShouldCreateSuccessfully() throws ExecutionException, InterruptedException {
        Stream expectedStream = Stream.getDefaultInstance();
        Mockito.when(this.datastreamClient
                        .createStreamAsync(ArgumentMatchers.any(CreateStreamRequest.class))
                        .get())
                .thenReturn(expectedStream);

        Stream actualStream = this.testManager.createStream(
                STREAM_ID, SourceConfig.getDefaultInstance(), DestinationConfig.getDefaultInstance());

        Truth.assertThat(actualStream).isEqualTo(expectedStream);
    }

    // ---------------------------------------------------------------------------------------
    // Stream state update
    // ---------------------------------------------------------------------------------------

    /** An {@link InterruptedException} from the client future surfaces as a manager exception. */
    @Test
    public void testUpdateStreamStateInterruptedExceptionShouldFail() throws ExecutionException, InterruptedException {
        Mockito.when(this.datastreamClient
                        .updateStreamAsync(ArgumentMatchers.any(UpdateStreamRequest.class))
                        .get())
                .thenThrow(InterruptedException.class);

        DatastreamResourceManagerException exception = Assert.assertThrows(
                DatastreamResourceManagerException.class,
                () -> this.testManager.updateStreamState(STREAM_ID, Stream.State.RUNNING));

        Truth.assertThat(exception).hasMessageThat().contains("Failed to update stream.");
    }

    /** An {@link ExecutionException} from the client future surfaces as a manager exception. */
    @Test
    public void testUpdateStreamStateExecutionExceptionShouldFail() throws ExecutionException, InterruptedException {
        Mockito.when(this.datastreamClient
                        .updateStreamAsync(ArgumentMatchers.any(UpdateStreamRequest.class))
                        .get())
                .thenThrow(ExecutionException.class);

        DatastreamResourceManagerException exception = Assert.assertThrows(
                DatastreamResourceManagerException.class,
                () -> this.testManager.updateStreamState(STREAM_ID, Stream.State.RUNNING));

        Truth.assertThat(exception).hasMessageThat().contains("Failed to update stream.");
    }

    /** The stream produced by the client future is returned to the caller. */
    @Test
    public void testUpdateStreamStateShouldCreateSuccessfully() throws ExecutionException, InterruptedException {
        Stream expectedStream = Stream.getDefaultInstance();
        Mockito.when(this.datastreamClient
                        .updateStreamAsync(ArgumentMatchers.any(UpdateStreamRequest.class))
                        .get())
                .thenReturn(expectedStream);

        Stream actualStream = this.testManager.updateStreamState(STREAM_ID, Stream.State.RUNNING);

        Truth.assertThat(actualStream).isEqualTo(expectedStream);
    }

    // ---------------------------------------------------------------------------------------
    // Cleanup
    // ---------------------------------------------------------------------------------------

    /**
     * Creates {@code RESOURCE_COUNT} streams and twice as many connection profiles, then checks
     * that {@code cleanupAll()} issues one delete call per created resource and closes the client.
     *
     * <p>No create call is stubbed here; the test relies on whatever the deep-stub mock answers by
     * default.
     */
    @Test
    public void testCleanupAllShouldDeleteSuccessfullyWhenNoErrorIsThrown() {
        Mockito.doNothing().when(this.datastreamClient).close();

        for (int i = 0; i < RESOURCE_COUNT; i++) {
            this.testManager.createGCSDestinationConnectionProfile("gcs-test-connection-profile-id" + i, BUCKET, ROOT_PATH);
            this.testManager.createBQDestinationConnectionProfile("bq-test-connection-profile-id" + i);
            this.testManager.createStream(
                    STREAM_ID + i, SourceConfig.getDefaultInstance(), DestinationConfig.getDefaultInstance());
        }

        this.testManager.cleanupAll();

        Mockito.verify(this.datastreamClient, Mockito.times(RESOURCE_COUNT))
                .deleteStreamAsync(ArgumentMatchers.any(DeleteStreamRequest.class));
        // Each iteration above creates two connection profiles (one GCS, one BQ), so the expected
        // number of deletions is derived from RESOURCE_COUNT instead of being hard-coded.
        Mockito.verify(this.datastreamClient, Mockito.times(2 * RESOURCE_COUNT))
                .deleteConnectionProfileAsync(ArgumentMatchers.any(DeleteConnectionProfileRequest.class));
        Mockito.verify(this.datastreamClient).close();
    }
}
