package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.regions.Regions;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.commons.lang.RandomStringUtils;
import org.assertj.core.api.Assertions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

/**
 * Integration test that runs a {@link KinesisIO} read pipeline against a real Kinesis stream.
 *
 * <p>The test is {@link Ignore}d. When run, it takes the stream name, credentials and region
 * from {@link KinesisTestOptions}, uploads randomly generated strings through
 * {@code KinesisUploader} and asserts that the pipeline outputs exactly those strings
 * (in any order).
 */
public class KinesisReaderIT {
    /** How long {@link #startTestPipeline} waits after submitting the pipeline, in milliseconds. */
    private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);

    /** Delay between two consecutive polls of the pipeline state, in milliseconds. */
    private static final long STATE_POLL_INTERVAL = TimeUnit.SECONDS.toMillis(1);

    /** Number of random records uploaded to the stream by the test. */
    private static final int TEST_RECORD_COUNT = 1000;

    /** Length of each random alphabetic record payload. */
    private static final int TEST_RECORD_LENGTH = 32;

    /** Upper bound on how long the Kinesis source keeps reading, in minutes. */
    private static final long MAX_READ_TIME_MINUTES = 3L;

    /** Runs the pipeline in the background so the test thread can upload data meanwhile. */
    private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    /** Converts the payload of each {@link KinesisRecord} into a UTF-8 decoded string. */
    public static class RecordDataToString extends DoFn<KinesisRecord, String> {
        private RecordDataToString() {
        }

        /**
         * Emits the record payload decoded as UTF-8.
         *
         * @param context supplies the input record and receives the decoded string
         * @throws NullPointerException if the input element is {@code null}
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context) throws Exception {
            KinesisRecord record = Preconditions.checkNotNull(context.element(), "Null record given");
            // NOTE(review): array() exposes the whole backing array and ignores the buffer's
            // position/limit; this assumes the buffer wraps exactly the payload - confirm.
            context.output(new String(record.getData().array(), StandardCharsets.UTF_8));
        }
    }

    /**
     * Starts the read pipeline, uploads the test data and waits for the pipeline to finish.
     *
     * <p>The background executor is always shut down afterwards so that its worker thread does
     * not outlive the test, even when the upload or the pipeline fails.
     *
     * @throws ExecutionException if the background pipeline run (or its final assertion) fails
     */
    @Test
    @Ignore
    public void readsDataFromRealKinesisStream() throws IOException, InterruptedException, ExecutionException {
        KinesisTestOptions options = readKinesisOptions();
        List<String> testData = prepareTestData(TEST_RECORD_COUNT);
        try {
            Future<?> pipelineRun = startTestPipeline(testData, options);
            KinesisUploader.uploadAll(testData, options);
            pipelineRun.get();
        } finally {
            // shutdownNow() also interrupts a still-polling task if the upload failed midway.
            singleThreadExecutor.shutdownNow();
        }
    }

    /**
     * Generates random alphabetic strings to be used as record payloads.
     *
     * @param count number of strings to generate
     * @return a mutable list holding {@code count} random strings
     */
    private List<String> prepareTestData(int count) {
        List<String> data = new ArrayList<>(Math.max(count, 0));
        for (int n = 0; n < count; n++) {
            data.add(RandomStringUtils.randomAlphabetic(TEST_RECORD_LENGTH));
        }
        return data;
    }

    /**
     * Builds the read pipeline, submits it to the background executor and then sleeps for
     * {@link #PIPELINE_STARTUP_TIME} before returning.
     *
     * @param testData strings the pipeline output is expected to contain, in any order
     * @param options source of the stream name, AWS credentials and region
     * @return a future that completes once the pipeline reached a terminal state and that state
     *     was asserted to be {@code DONE}
     * @throws InterruptedException if interrupted during the start-up wait
     */
    private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options) throws InterruptedException {
        PAssert.that(
                this.p
                    .apply(KinesisIO.Read
                        .from(options.getAwsKinesisStream(), Instant.now())
                        .using(
                            options.getAwsAccessKey(),
                            options.getAwsSecretKey(),
                            Regions.fromName(options.getAwsKinesisRegion()))
                        .withMaxReadTime(Duration.standardMinutes(MAX_READ_TIME_MINUTES)))
                    .apply(ParDo.of(new RecordDataToString())))
            .containsInAnyOrder(testData);

        Future<?> pipelineRun = this.singleThreadExecutor.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                PipelineResult result = KinesisReaderIT.this.p.run();
                PipelineResult.State state = result.getState();
                // Poll until ANY terminal state: waiting only for DONE/FAILED would spin
                // forever if the pipeline ended up e.g. CANCELLED.
                while (!state.isTerminal()) {
                    Thread.sleep(STATE_POLL_INTERVAL);
                    state = result.getState();
                }
                Assertions.assertThat(state).isEqualTo(PipelineResult.State.DONE);
                return null;
            }
        });

        // Presumably gives the reader time to start before the caller uploads data.
        Thread.sleep(PIPELINE_STARTUP_TIME);
        return pipelineRun;
    }

    /**
     * Registers {@link KinesisTestOptions} and returns the testing pipeline options viewed as
     * that interface.
     */
    private KinesisTestOptions readKinesisOptions() {
        PipelineOptionsFactory.register(KinesisTestOptions.class);
        return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
    }
}
