package it;

import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.compute.v1.Instance;
import com.google.cloud.compute.v1.InstanceTemplatesClient;
import com.google.cloud.compute.v1.InstancesClient;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectId;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.ReservationName;
import com.google.cloud.pubsublite.ReservationPath;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.flogger.GoogleLogger;
import com.google.common.truth.Truth;
import com.google.protobuf.util.Durations;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import junit.framework.TestCase;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:it/StandaloneIT.class */
public class StandaloneIT extends Base {
    private static final String kafkaCpsSourceTestTopic = "cps-source-test-kafka-topic";
    private static final String kafkaCpsSinkTestTopic = "cps-sink-test-kafka-topic";
    private static final ReservationPath pslReservationPath;
    private static final String kafkaPslSinkTestTopic = "psl-sink-test-topic";
    private static final String pslSinkTopicId;
    private static final TopicPath pslSinkTopicPath;
    private static final String pslSinkSubscriptionId;
    private static final SubscriptionPath pslSinkSubscriptionPath;
    private static final String kafkaPslSourceTestTopic = "psl-source-test-topic";
    private static final String pslSourceTopicId;
    private static final TopicPath pslSourceTopicPath;
    private static final String pslSourceSubscriptionId;
    private static final SubscriptionPath pslSourceSubscriptionPath;
    private static final String instanceName;
    private static final String instanceTemplateName;
    private static AtomicBoolean initialized;
    private static Boolean cpsMessageReceived;
    private static Boolean pslMessageReceived;
    private static Instance gceKafkaInstance;
    private static String kafkaInstanceIpAddress;
    private static final Long KAFKA_PORT = 9092L;
    private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
    private static final String projectId = System.getenv("GOOGLE_CLOUD_PROJECT");
    private static final String projectNumber = System.getenv("GOOGLE_CLOUD_PROJECT_NUMBER");
    private static final String pslReservationId = System.getenv("PSL_RESERVATION_ID");
    private static final String cpsSourceTopicId = "cps-source-topic-" + runId;
    private static final String cpsSourceSubscriptionId = "cps-source-subscription-" + runId;
    private static final TopicName cpsSourceTopicName = TopicName.of(projectId, cpsSourceTopicId);
    private static final SubscriptionName cpsSourceSubscriptionName = SubscriptionName.of(projectId, cpsSourceSubscriptionId);
    private static final String cpsSinkSubscriptionId = "cps-sink-subscription-" + runId;
    private static final String cpsSinkTopicId = "cps-sink-topic-" + runId;
    private static final SubscriptionName cpsSinkSubscriptionName = SubscriptionName.of(projectId, cpsSinkSubscriptionId);
    private static final TopicName cpsSinkTopicName = TopicName.of(projectId, cpsSinkTopicId);

    private static void requireEnvVar(String str) {
        TestCase.assertNotNull("Environment variable " + str + " is required to perform these tests.", System.getenv(str));
    }

    @BeforeClass
    public static void checkRequirements() {
        requireEnvVar("GOOGLE_CLOUD_PROJECT");
        requireEnvVar("GOOGLE_CLOUD_PROJECT_NUMBER");
        requireEnvVar("BUCKET_NAME");
    }

    @BeforeClass
    public static void setUp() throws Exception {
        findMavenHome();
        setupVersions();
        mavenPackage(workingDir);
        log.atInfo().log("Packaged connector jar.");
        uploadGCSResources();
        setupCpsResources();
        setupPslResources();
        setupGceInstance();
    }

    protected static void uploadGCSResources() throws Exception {
        Storage service = StorageOptions.newBuilder().setProjectId(projectId).build().getService();
        uploadGCS(service, connectorJarNameInGCS, cpsConnectorJarLoc);
        log.atInfo().log("Uploaded CPS connector jar to GCS.");
        uploadGCS(service, cpsSinkConnectorPropertiesGCSName, testResourcesDirLoc + cpsSinkConnectorPropertiesName);
        log.atInfo().log("Uploaded CPS sink connector properties file to GCS.");
        uploadGCS(service, cpsSourceConnectorPropertiesGCSName, testResourcesDirLoc + cpsSourceConnectorPropertiesName);
        log.atInfo().log("Uploaded CPS source connector properties file to GCS.");
        uploadGCS(service, pslSinkConnectorPropertiesGCSName, testResourcesDirLoc + pslSinkConnectorPropertiesName);
        log.atInfo().log("Uploaded PSL sink connector properties file to GCS.");
        uploadGCS(service, pslSourceConnectorPropertiesGCSName, testResourcesDirLoc + pslSourceConnectorPropertiesName);
        log.atInfo().log("Uploaded PSL source connector properties file to GCS.");
    }

    protected static void setupCpsResources() throws IOException {
        TopicAdminClient create = TopicAdminClient.create();
        Throwable th = null;
        try {
            create.createTopic(cpsSourceTopicName);
            log.atInfo().log("Created CPS source topic: " + cpsSourceTopicName);
            create.createTopic(cpsSinkTopicName);
            log.atInfo().log("Created CPS sink topic: " + cpsSinkTopicName);
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    create.close();
                }
            }
            SubscriptionAdminClient create2 = SubscriptionAdminClient.create();
            Throwable th3 = null;
            try {
                create2.createSubscription(Subscription.newBuilder().setName(cpsSinkSubscriptionName.toString()).setTopic(cpsSinkTopicName.toString()).build());
                log.atInfo().log("Created CPS sink subscription: " + cpsSinkSubscriptionName);
                create2.createSubscription(Subscription.newBuilder().setName(cpsSourceSubscriptionName.toString()).setTopic(cpsSourceTopicName.toString()).build());
                log.atInfo().log("Created CPS source subscription: " + cpsSourceSubscriptionName);
                if (create2 != null) {
                    if (0 == 0) {
                        create2.close();
                        return;
                    }
                    try {
                        create2.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
            } catch (Throwable th5) {
                if (create2 != null) {
                    if (0 != 0) {
                        try {
                            create2.close();
                        } catch (Throwable th6) {
                            th3.addSuppressed(th6);
                        }
                    } else {
                        create2.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    create.close();
                }
            }
            throw th7;
        }
    }

    protected static void setupPslResources() throws Exception {
        AdminClient create = AdminClient.create(AdminClientSettings.newBuilder().setRegion(CloudRegion.of("us-central1")).build());
        Throwable th = null;
        try {
            Topic.Builder retentionConfig = Topic.newBuilder().setName(pslSinkTopicPath.toString()).setPartitionConfig(Topic.PartitionConfig.newBuilder().setCount(2L).setCapacity(Topic.PartitionConfig.Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4).build())).setRetentionConfig(Topic.RetentionConfig.newBuilder().setPerPartitionBytes(32212254720L).setPeriod(Durations.fromHours(1L)));
            if (pslReservationId != null) {
                retentionConfig.setReservationConfig(Topic.ReservationConfig.newBuilder().setThroughputReservation(pslReservationPath.toString()).build());
            }
            log.atInfo().log("Created PSL sink topic: " + pslSinkTopicPath);
            log.atInfo().log("Created PSL sink subscription: " + pslSinkSubscriptionPath.toString());
            Topic.Builder retentionConfig2 = Topic.newBuilder().setName(pslSourceTopicPath.toString()).setPartitionConfig(Topic.PartitionConfig.newBuilder().setCount(2L).setCapacity(Topic.PartitionConfig.Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4).build())).setRetentionConfig(Topic.RetentionConfig.newBuilder().setPerPartitionBytes(32212254720L).setPeriod(Durations.fromHours(1L)));
            if (pslReservationId != null) {
                retentionConfig2.setReservationConfig(Topic.ReservationConfig.newBuilder().setThroughputReservation(pslReservationPath.toString()).build());
            }
            log.atInfo().log("Created PSL source topic: " + pslSourceTopicPath);
            log.atInfo().log("Created PSL source subscription:  " + pslSinkSubscriptionPath.toString());
            if (create != null) {
                if (0 == 0) {
                    create.close();
                    return;
                }
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    protected static void setupGceInstance() throws IOException, ExecutionException, InterruptedException, TimeoutException {
        createInstanceTemplate(projectId, projectNumber, instanceTemplateName);
        log.atInfo().log("Created Compute Engine instance template.");
        createInstanceFromTemplate(projectId, location, instanceName, instanceTemplateName);
        log.atInfo().log("Created Compute Engine instance from instance template");
        gceKafkaInstance = getInstance(projectId, location, instanceName);
        kafkaInstanceIpAddress = gceKafkaInstance.getNetworkInterfaces(0).getAccessConfigs(0).getNatIP();
        log.atInfo().log("Kafka GCE Instance: " + gceKafkaInstance.getSelfLink() + " " + gceKafkaInstance.getDescription());
    }

    @AfterClass
    public static void tearDown() throws Exception {
        log.atInfo().log("Attempting teardown!");
        Function<Runnable, Void> function = new Function<Runnable, Void>() { // from class: it.StandaloneIT.1
            @Override // java.util.function.Function
            public Void apply(Runnable runnable) {
                try {
                    runnable.run();
                    return null;
                } catch (NotFoundException e) {
                    StandaloneIT.log.atInfo().log(e.getMessage());
                    StandaloneIT.log.atInfo().log("Ignored. Resource not found!");
                    return null;
                }
            }
        };
        SubscriptionAdminClient create = SubscriptionAdminClient.create();
        Throwable th = null;
        try {
            try {
                function.apply(() -> {
                    create.deleteSubscription(cpsSinkSubscriptionName);
                });
                log.atInfo().log("Deleted CPS subscriptions.");
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                TopicAdminClient create2 = TopicAdminClient.create();
                Throwable th3 = null;
                try {
                    function.apply(() -> {
                        create2.deleteTopic(cpsSourceTopicName.toString());
                    });
                    function.apply(() -> {
                        create2.deleteTopic(cpsSinkTopicName.toString());
                    });
                    log.atInfo().log("Deleted CPS topics.");
                    if (create2 != null) {
                        if (0 != 0) {
                            try {
                                create2.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            create2.close();
                        }
                    }
                    AdminClient create3 = AdminClient.create(AdminClientSettings.newBuilder().setRegion(CloudRegion.of("us-central1")).build());
                    Throwable th5 = null;
                    try {
                        function.apply(() -> {
                            create3.deleteSubscription(pslSinkSubscriptionPath);
                        });
                        function.apply(() -> {
                            create3.deleteSubscription(pslSourceSubscriptionPath);
                        });
                        function.apply(() -> {
                            create3.deleteTopic(pslSinkTopicPath);
                        });
                        function.apply(() -> {
                            create3.deleteTopic(pslSourceTopicPath);
                        });
                        log.atInfo().log("Deleted PSL topics and subscriptions.");
                        if (create3 != null) {
                            if (0 != 0) {
                                try {
                                    create3.close();
                                } catch (Throwable th6) {
                                    th5.addSuppressed(th6);
                                }
                            } else {
                                create3.close();
                            }
                        }
                        InstancesClient create4 = InstancesClient.create();
                        Throwable th7 = null;
                        try {
                            create4.deleteAsync(projectId, location, instanceName).get(3L, TimeUnit.MINUTES);
                            if (create4 != null) {
                                if (0 != 0) {
                                    try {
                                        create4.close();
                                    } catch (Throwable th8) {
                                        th7.addSuppressed(th8);
                                    }
                                } else {
                                    create4.close();
                                }
                            }
                            log.atInfo().log("Deleted Compute Engine instance.");
                            InstanceTemplatesClient create5 = InstanceTemplatesClient.create();
                            Throwable th9 = null;
                            try {
                                create5.deleteAsync(projectId, instanceTemplateName).get(3L, TimeUnit.MINUTES);
                                if (create5 != null) {
                                    if (0 != 0) {
                                        try {
                                            create5.close();
                                        } catch (Throwable th10) {
                                            th9.addSuppressed(th10);
                                        }
                                    } else {
                                        create5.close();
                                    }
                                }
                                log.atInfo().log("Deleted Compute Engine instance template.");
                            } catch (Throwable th11) {
                                if (create5 != null) {
                                    if (0 != 0) {
                                        try {
                                            create5.close();
                                        } catch (Throwable th12) {
                                            th9.addSuppressed(th12);
                                        }
                                    } else {
                                        create5.close();
                                    }
                                }
                                throw th11;
                            }
                        } catch (Throwable th13) {
                            if (create4 != null) {
                                if (0 != 0) {
                                    try {
                                        create4.close();
                                    } catch (Throwable th14) {
                                        th7.addSuppressed(th14);
                                    }
                                } else {
                                    create4.close();
                                }
                            }
                            throw th13;
                        }
                    } catch (Throwable th15) {
                        if (create3 != null) {
                            if (0 != 0) {
                                try {
                                    create3.close();
                                } catch (Throwable th16) {
                                    th5.addSuppressed(th16);
                                }
                            } else {
                                create3.close();
                            }
                        }
                        throw th15;
                    }
                } catch (Throwable th17) {
                    if (create2 != null) {
                        if (0 != 0) {
                            try {
                                create2.close();
                            } catch (Throwable th18) {
                                th3.addSuppressed(th18);
                            }
                        } else {
                            create2.close();
                        }
                    }
                    throw th17;
                }
            } finally {
            }
        } catch (Throwable th19) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th20) {
                        th.addSuppressed(th20);
                    }
                } else {
                    create.close();
                }
            }
            throw th19;
        }
    }

    @Test
    public void testCpsSinkConnector() throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaInstanceIpAddress + ":" + KAFKA_PORT);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        Future send = kafkaProducer.send(new ProducerRecord(kafkaCpsSinkTestTopic, "key0", "value0"));
        kafkaProducer.flush();
        send.get();
        kafkaProducer.metrics().forEach((metricName, metric) -> {
            if (metricName.name() == "record-send-total") {
                log.atInfo().log("record-send-total: " + metric.metricValue().toString());
            }
        });
        kafkaProducer.close();
        Thread.sleep(60000L);
        Subscriber subscriber = null;
        try {
            subscriber = Subscriber.newBuilder(ProjectSubscriptionName.of(projectId, cpsSinkSubscriptionId), (pubsubMessage, ackReplyConsumer) -> {
                Truth.assertThat(pubsubMessage.getData().toStringUtf8()).isEqualTo("value0");
                Truth.assertThat((String) pubsubMessage.getAttributesMap().get("key")).isEqualTo("key0");
                cpsMessageReceived = true;
                ackReplyConsumer.ack();
            }).build();
            subscriber.startAsync().awaitRunning();
            subscriber.awaitTerminated(30L, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            subscriber.stopAsync();
        }
        Truth.assertThat(cpsMessageReceived).isTrue();
    }

    /* JADX WARN: Code restructure failed: missing block: B:9:0x0103, code lost:
    
        r0 = r0.records(it.StandaloneIT.kafkaCpsSourceTestTopic).iterator();
        com.google.common.truth.Truth.assertThat(java.lang.Boolean.valueOf(r0.hasNext())).isTrue();
        com.google.common.truth.Truth.assertThat((java.lang.String) ((org.apache.kafka.clients.consumer.ConsumerRecord) r0.next()).value()).isEqualTo("msg0");
        r16 = true;
     */
    @org.junit.Test(timeout = 300000)
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void testCpsSourceConnector() throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 359
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: it.StandaloneIT.testCpsSourceConnector():void");
    }

    @Test
    public void testPslSinkConnector() throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaInstanceIpAddress + ":" + KAFKA_PORT);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        Future send = kafkaProducer.send(new ProducerRecord(kafkaPslSinkTestTopic, "key0", "value0"));
        kafkaProducer.flush();
        send.get();
        kafkaProducer.metrics().forEach((metricName, metric) -> {
            if (metricName.name() == "record-send-total") {
                log.atInfo().log("record-send-total: " + metric.metricValue().toString());
            }
        });
        kafkaProducer.close();
        Thread.sleep(60000L);
        com.google.cloud.pubsublite.cloudpubsub.Subscriber create = com.google.cloud.pubsublite.cloudpubsub.Subscriber.create(SubscriberSettings.newBuilder().setSubscriptionPath(pslSinkSubscriptionPath).setReceiver((pubsubMessage, ackReplyConsumer) -> {
            log.atInfo().log("Received message: " + pubsubMessage);
            Truth.assertThat(pubsubMessage.getData().toStringUtf8()).isEqualTo("value0");
            Truth.assertThat(pubsubMessage.getOrderingKey()).isEqualTo("key0");
            pslMessageReceived = true;
            log.atInfo().log("this.pslMessageReceived: " + pslMessageReceived);
            ackReplyConsumer.ack();
        }).setPerPartitionFlowControlSettings(FlowControlSettings.builder().setBytesOutstanding(10485760L).setMessagesOutstanding(1000L).build()).build());
        try {
            create.startAsync().awaitRunning();
            create.awaitTerminated(3L, TimeUnit.MINUTES);
        } catch (TimeoutException e) {
            create.stopAsync();
        }
        Truth.assertThat(pslMessageReceived).isTrue();
    }

    /* JADX WARN: Code restructure failed: missing block: B:9:0x0124, code lost:
    
        r0 = r0.records(it.StandaloneIT.kafkaPslSourceTestTopic).iterator();
        com.google.common.truth.Truth.assertThat(java.lang.Boolean.valueOf(r0.hasNext())).isTrue();
        com.google.common.truth.Truth.assertThat((java.lang.String) ((org.apache.kafka.clients.consumer.ConsumerRecord) r0.next()).value()).isEqualTo("msg0");
        r16 = true;
     */
    @org.junit.Test(timeout = 300000)
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void testPslSourceConnector() throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 393
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: it.StandaloneIT.testPslSourceConnector():void");
    }

    static {
        pslReservationPath = pslReservationId == null ? null : ReservationPath.newBuilder().setProject(ProjectNumber.of(Long.valueOf(projectNumber).longValue())).setLocation(CloudRegion.of("us-central1")).setName(ReservationName.of(pslReservationId)).build();
        pslSinkTopicId = "psl-sink-topic-" + runId;
        pslSinkTopicPath = TopicPath.newBuilder().setProject(ProjectId.of(projectId)).setLocation(CloudZone.of(CloudRegion.of("us-central1"), zone.charValue())).setName(com.google.cloud.pubsublite.TopicName.of(pslSinkTopicId)).build();
        pslSinkSubscriptionId = "psl-sink-subscription-" + runId;
        pslSinkSubscriptionPath = SubscriptionPath.newBuilder().setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSinkSubscriptionId)).setProject(ProjectId.of(projectId)).setLocation(CloudZone.of(CloudRegion.of("us-central1"), zone.charValue())).build();
        pslSourceTopicId = "psl-source-topic-" + runId;
        pslSourceTopicPath = TopicPath.newBuilder().setProject(ProjectId.of(projectId)).setLocation(CloudZone.of(CloudRegion.of("us-central1"), zone.charValue())).setName(com.google.cloud.pubsublite.TopicName.of(pslSourceTopicId)).build();
        pslSourceSubscriptionId = "psl-source-subscription-" + runId;
        pslSourceSubscriptionPath = SubscriptionPath.newBuilder().setName(com.google.cloud.pubsublite.SubscriptionName.of(pslSourceSubscriptionId)).setProject(ProjectId.of(projectId)).setLocation(CloudZone.of(CloudRegion.of("us-central1"), zone.charValue())).build();
        instanceName = "kafka-it-" + runId;
        instanceTemplateName = "kafka-it-template-" + runId;
        initialized = new AtomicBoolean(false);
        cpsMessageReceived = false;
        pslMessageReceived = false;
    }
}
