package org.apache.beam.sdk.io.tfrecord;

import com.google.cloud.Timestamp;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.io.common.FileBasedIOITHelper;
import org.apache.beam.sdk.io.common.FileBasedIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.ByteMonitor;
import org.apache.beam.sdk.testutils.metrics.CountMonitor;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/tfrecord/TFRecordIOIT.class */
public class TFRecordIOIT {
    private static final String TFRECORD_NAMESPACE = TFRecordIOIT.class.getName();
    private static String filenamePrefix;
    private static Integer numberOfTextLines;
    private static Compression compressionType;
    private static String bigQueryDataset;
    private static String bigQueryTable;

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();

    /* loaded from: input_file:org/apache/beam/sdk/io/tfrecord/TFRecordIOIT$ByteArrayToString.class */
    static class ByteArrayToString extends SimpleFunction<byte[], String> {
        ByteArrayToString() {
        }

        public String apply(byte[] bArr) {
            return new String(bArr, StandardCharsets.UTF_8);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/tfrecord/TFRecordIOIT$StringToByteArray.class */
    static class StringToByteArray extends SimpleFunction<String, byte[]> {
        StringToByteArray() {
        }

        public byte[] apply(String str) {
            return str.getBytes(StandardCharsets.UTF_8);
        }
    }

    @BeforeClass
    public static void setup() {
        FileBasedIOTestPipelineOptions readFileBasedIOITPipelineOptions = FileBasedIOITHelper.readFileBasedIOITPipelineOptions();
        numberOfTextLines = readFileBasedIOITPipelineOptions.getNumberOfRecords();
        filenamePrefix = FileBasedIOITHelper.appendTimestampSuffix(readFileBasedIOITPipelineOptions.getFilenamePrefix());
        compressionType = Compression.valueOf(readFileBasedIOITPipelineOptions.getCompressionType());
        bigQueryDataset = readFileBasedIOITPipelineOptions.getBigQueryDataset();
        bigQueryTable = readFileBasedIOITPipelineOptions.getBigQueryTable();
    }

    private static String createFilenamePattern() {
        return filenamePrefix + "*";
    }

    @Test
    public void writeThenReadAll() {
        this.writePipeline.apply("Generate sequence", GenerateSequence.from(0L).to(numberOfTextLines.intValue())).apply("Produce text lines", ParDo.of(new FileBasedIOITHelper.DeterministicallyConstructTestTextLineFn())).apply("Transform strings to bytes", MapElements.via(new StringToByteArray())).apply("Record time before writing", ParDo.of(new TimeMonitor(TFRECORD_NAMESPACE, "writeTime"))).apply("Collect byte count", ParDo.of(new ByteMonitor(TFRECORD_NAMESPACE, "byteCount"))).apply("Collect element count", ParDo.of(new CountMonitor(TFRECORD_NAMESPACE, "itemCount"))).apply("Write content to files", TFRecordIO.write().to(filenamePrefix).withCompression(compressionType).withSuffix(".tfrecord"));
        PipelineResult run = this.writePipeline.run();
        run.waitUntilFinish();
        String createFilenamePattern = createFilenamePattern();
        PCollection apply = this.readPipeline.apply(TFRecordIO.read().from(createFilenamePattern).withCompression(Compression.AUTO)).apply("Record time after reading", ParDo.of(new TimeMonitor(TFRECORD_NAMESPACE, "readTime"))).apply("Transform bytes to strings", MapElements.via(new ByteArrayToString())).apply("Calculate hashcode", Combine.globally(new HashingFn())).apply(Reshuffle.viaRandomKey());
        PAssert.thatSingleton(apply).isEqualTo(FileBasedIOITHelper.getExpectedHashForLineCount(numberOfTextLines.intValue()));
        this.readPipeline.apply(Create.of(createFilenamePattern, new String[0])).apply("Delete test files", ParDo.of(new FileBasedIOITHelper.DeleteFileFn()).withSideInputs(new PCollectionView[]{(PCollectionView) apply.apply(View.asSingleton())}));
        PipelineResult run2 = this.readPipeline.run();
        run2.waitUntilFinish();
        collectAndPublishMetrics(run2, run);
    }

    private void collectAndPublishMetrics(PipelineResult pipelineResult, PipelineResult pipelineResult2) {
        String uuid = UUID.randomUUID().toString();
        String timestamp = Timestamp.now().toString();
        new IOITMetrics(getReadMetricSuppliers(uuid, timestamp), pipelineResult, TFRECORD_NAMESPACE, uuid, timestamp).publish(bigQueryDataset, bigQueryTable);
        new IOITMetrics(getWriteMetricSuppliers(uuid, timestamp), pipelineResult2, TFRECORD_NAMESPACE, uuid, timestamp).publish(bigQueryDataset, bigQueryTable);
    }

    private Set<Function<MetricsReader, NamedTestResult>> getWriteMetricSuppliers(String str, String str2) {
        HashSet hashSet = new HashSet();
        hashSet.add(metricsReader -> {
            return NamedTestResult.create(str, str2, "read_time", (metricsReader.getEndTimeMetric("readTime") - metricsReader.getStartTimeMetric("readTime")) / 1000.0d);
        });
        hashSet.add(metricsReader2 -> {
            return NamedTestResult.create(str, str2, "byte_count", metricsReader2.getCounterMetric("byteCount"));
        });
        hashSet.add(metricsReader3 -> {
            return NamedTestResult.create(str, str2, "item_count", metricsReader3.getCounterMetric("itemCount"));
        });
        return hashSet;
    }

    private Set<Function<MetricsReader, NamedTestResult>> getReadMetricSuppliers(String str, String str2) {
        HashSet hashSet = new HashSet();
        hashSet.add(metricsReader -> {
            return NamedTestResult.create(str, str2, "write_time", (metricsReader.getEndTimeMetric("writeTime") - metricsReader.getStartTimeMetric("writeTime")) / 1000.0d);
        });
        return hashSet;
    }
}
