/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.storage;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class GcsMatchIT {
    @Test
    public void testGcsMatchContinuously() {
        TestPipelineOptions options = (TestPipelineOptions)TestPipeline.testingPipelineOptions().as(TestPipelineOptions.class);
        Assert.assertNotNull((Object)options.getTempRoot());
        options.setTempLocation(options.getTempRoot() + "/GcsMatchIT");
        GcsOptions gcsOptions = (GcsOptions)options.as(GcsOptions.class);
        String dstFolderName = gcsOptions.getGcpTempLocation() + String.format("/testGcsMatchContinuously.%tF-%<tH-%<tM-%<tS-%<tL/", new Date());
        GcsPath watchPath = GcsPath.fromUri((String)dstFolderName);
        long waitSec = 7L;
        if (options.getRunner() == DirectRunner.class) {
            waitSec = 1L;
        }
        Pipeline p = Pipeline.create((PipelineOptions)options);
        PCollection path = (PCollection)p.apply((PTransform)Create.of(Collections.singletonList(watchPath)));
        PCollection filenames = (PCollection)((PCollection)((PCollection)path.apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.strings()).via((SerializableFunction & Serializable)gcsPath -> gcsPath.resolve("*").toString()))).apply("matchAll updated", (PTransform)FileIO.matchAll().continuously(Duration.millis((long)250L), (Watch.Growth.TerminationCondition)Watch.Growth.afterTimeSinceNewOutput((ReadableDuration)Duration.standardSeconds((long)(waitSec + 2L))), true))).apply("pick up file names", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptors.strings()).via((SerializableFunction & Serializable)metadata -> metadata.resourceId().getFilename()));
        path.apply((PTransform)ParDo.of((DoFn)new WriteToGcsFn(waitSec)));
        List<String> expectedMatchAllUpdated = Arrays.asList("first", "first", "first", "second", "second", "third");
        PAssert.that((PCollection)filenames).containsInAnyOrder(expectedMatchAllUpdated);
        PipelineResult result = p.run();
        PipelineResult.State state = result.waitUntilFinish();
        Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)state);
    }

    private static void writeBytesToFile(String gcsPath, int length) {
        ResourceId newFileResourceId = FileSystems.matchNewResource((String)gcsPath, (boolean)false);
        try (ByteArrayInputStream in = new ByteArrayInputStream(new byte[length]);
             ReadableByteChannel readerChannel = Channels.newChannel(in);
             WritableByteChannel writerChannel = FileSystems.create((ResourceId)newFileResourceId, (String)"text/plain");){
            ByteStreams.copy((ReadableByteChannel)readerChannel, (WritableByteChannel)writerChannel);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static class WriteToGcsFn
    extends DoFn<GcsPath, Void> {
        private final long waitSec;

        public WriteToGcsFn(long waitSec) {
            this.waitSec = waitSec;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) {
            GcsPath writePath = (GcsPath)context.element();
            assert (writePath != null);
            Thread writer = new Thread(() -> {
                try {
                    Thread.sleep(this.waitSec * 1000L);
                    GcsMatchIT.writeBytesToFile(writePath.resolve("first").toString(), 42);
                    Thread.sleep(1000L);
                    GcsMatchIT.writeBytesToFile(writePath.resolve("first").toString(), 99);
                    GcsMatchIT.writeBytesToFile(writePath.resolve("second").toString(), 42);
                    Thread.sleep(1000L);
                    GcsMatchIT.writeBytesToFile(writePath.resolve("first").toString(), 37);
                    GcsMatchIT.writeBytesToFile(writePath.resolve("second").toString(), 42);
                    GcsMatchIT.writeBytesToFile(writePath.resolve("third").toString(), 99);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            writer.start();
        }
    }
}

