package org.apache.beam.it.gcp.storage;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.text.ParseException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.it.gcp.LoadTestBase;
import org.apache.beam.it.truthmatchers.PipelineAsserts;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/it/gcp/storage/FileBasedIOLT.class */
public class FileBasedIOLT extends IOLoadTestBase {
    private static final String READ_ELEMENT_METRIC_NAME = "read_count";
    private static GcsResourceManager resourceManager;
    private String filePrefix;
    private Configuration configuration;

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    private static final Map<String, Configuration> TEST_CONFIGS_PRESET;

    /* loaded from: input_file:org/apache/beam/it/gcp/storage/FileBasedIOLT$Configuration.class */
    static class Configuration extends SyntheticSourceOptions {

        @JsonProperty
        public int numShards = 0;

        @JsonProperty
        public String compressionType = "UNCOMPRESSED";

        @JsonProperty
        public String runner = "DirectRunner";

        @JsonProperty
        public int pipelineTimeout = 20;

        Configuration() {
        }
    }

    /* loaded from: input_file:org/apache/beam/it/gcp/storage/FileBasedIOLT$MapKVToString.class */
    private static class MapKVToString extends DoFn<KV<byte[], byte[]>, String> {
        private MapKVToString() {
        }

        @DoFn.ProcessElement
        public void process(DoFn<KV<byte[], byte[]>, String>.ProcessContext processContext) {
            processContext.output(Base64.getEncoder().encodeToString((byte[]) ((KV) Objects.requireNonNull((KV) processContext.element())).getValue()));
        }
    }

    @BeforeClass
    public static void beforeClass() {
        resourceManager = GcsResourceManager.builder(TestProperties.artifactBucket(), "textiolt", CREDENTIALS).build();
    }

    @AfterClass
    public static void tearDownClass() {
        ResourceManagerUtils.cleanResources(new ResourceManager[]{resourceManager});
    }

    @Before
    public void setup() {
        String property = TestProperties.getProperty("configuration", "local", TestProperties.Type.PROPERTY);
        this.configuration = TEST_CONFIGS_PRESET.get(property);
        if (this.configuration == null) {
            try {
                this.configuration = Configuration.fromJsonString(property, Configuration.class);
            } catch (IOException e) {
                throw new IllegalArgumentException(String.format("Unknown test configuration: [%s]. Pass to a valid configuration json, or use config presets: %s", property, TEST_CONFIGS_PRESET.keySet()));
            }
        }
        String str = "textiolt-" + DateTimeFormatter.ofPattern("MMddHHmmssSSS").withZone(ZoneId.of("UTC")).format(Instant.now()) + UUID.randomUUID().toString().substring(0, 10);
        resourceManager.registerTempDir(str);
        this.filePrefix = String.format("gs://%s/%s/test", TestProperties.artifactBucket(), str);
    }

    @Test
    public void testTextIOWriteThenRead() throws IOException {
        TextIO.TypedWrite withCompression = TextIO.write().to(this.filePrefix).withOutputFilenames().withCompression(Compression.valueOf(this.configuration.compressionType));
        if (this.configuration.numShards > 0) {
            withCompression = withCompression.withNumShards(this.configuration.numShards);
        }
        this.writePipeline.apply("Read from source", Read.from(new SyntheticBoundedSource(this.configuration))).apply("Map records", ParDo.of(new MapKVToString())).apply("Write content to files", withCompression);
        this.readPipeline.apply(TextIO.read().from(this.filePrefix + "*")).apply("Counting element", ParDo.of(new IOLoadTestBase.CountingFn(READ_ELEMENT_METRIC_NAME)));
        PipelineLauncher.LaunchInfo launch = this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("write-textio").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.writePipeline).addParameter("runner", this.configuration.runner).build());
        PipelineAsserts.assertThatResult(this.pipelineOperator.waitUntilDone(createConfig(launch, Duration.ofMinutes(this.configuration.pipelineTimeout)))).isLaunchFinished();
        PipelineLauncher.LaunchInfo launch2 = this.pipelineLauncher.launch(project, region, PipelineLauncher.LaunchConfig.builder("read-textio").setSdk(PipelineLauncher.Sdk.JAVA).setPipeline(this.readPipeline).addParameter("runner", this.configuration.runner).build());
        PipelineAsserts.assertThatResult(this.pipelineOperator.waitUntilDone(createConfig(launch2, Duration.ofMinutes(this.configuration.pipelineTimeout)))).isLaunchFinished();
        Assert.assertEquals(this.configuration.numRecords, this.pipelineLauncher.getMetric(project, region, launch2.jobId(), getBeamMetricsName(IOLoadTestBase.PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME)).doubleValue(), 0.5d);
        LoadTestBase.MetricsConfiguration build = LoadTestBase.MetricsConfiguration.builder().setInputPCollection("Map records.out0").setInputPCollectionV2("Map records/ParMultiDo(MapKVToString).out0").setOutputPCollection("Counting element.out0").setOutputPCollectionV2("Counting element/ParMultiDo(Counting).out0").build();
        try {
            exportMetricsToBigQuery(launch, getMetrics(launch, build));
            exportMetricsToBigQuery(launch2, getMetrics(launch2, build));
        } catch (InterruptedException | ParseException e) {
            throw new RuntimeException(e);
        }
    }

    static {
        try {
            TEST_CONFIGS_PRESET = ImmutableMap.of("local", Configuration.fromJsonString("{\"numRecords\":1000,\"valueSizeBytes\":750,\"pipelineTimeout\":2,\"runner\":\"DirectRunner\"}", Configuration.class), "medium", Configuration.fromJsonString("{\"numRecords\":10000000,\"valueSizeBytes\":750,\"pipelineTimeout\":20,\"runner\":\"DataflowRunner\"}", Configuration.class), "large", Configuration.fromJsonString("{\"numRecords\":100000000,\"valueSizeBytes\":750,\"pipelineTimeout\":80,\"runner\":\"DataflowRunner\"}", Configuration.class));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
