package org.apache.apex.malhar.lib.fs.s3;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import java.io.File;
import java.io.IOException;
import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.class */
public class S3ReconcilerTest {

    @Rule
    public TestMeta testMeta = new TestMeta();

    /* loaded from: input_file:org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest$TestMeta.class */
    private class TestMeta extends TestWatcher {
        S3Reconciler underTest;
        Context.OperatorContext context;
        CollectorTestSink<Object> sink;

        @Mock
        AmazonS3 s3clientMock;
        String outputPath;

        private TestMeta() {
        }

        protected void starting(Description description) {
            super.starting(description);
            this.outputPath = new File("target/" + description.getClassName() + "/" + description.getMethodName()).getPath();
            Attribute.AttributeMap.DefaultAttributeMap defaultAttributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
            defaultAttributeMap.put(Context.DAGContext.APPLICATION_ID, description.getClassName());
            defaultAttributeMap.put(Context.DAGContext.APPLICATION_PATH, this.outputPath);
            this.context = OperatorContextTestHelper.mockOperatorContext(1, defaultAttributeMap);
            this.underTest = new S3Reconciler();
            this.underTest.setAccessKey("");
            this.underTest.setSecretKey("");
            this.underTest.setup(this.context);
            MockitoAnnotations.initMocks(this);
            PutObjectResult putObjectResult = new PutObjectResult();
            putObjectResult.setETag(this.outputPath);
            Mockito.when(this.s3clientMock.putObject((PutObjectRequest) Matchers.any())).thenReturn(putObjectResult);
            this.underTest.setS3client(this.s3clientMock);
        }

        protected void finished(Description description) {
            this.underTest.teardown();
            try {
                FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Test
    public void verifyS3ReconclierOutputTuple() throws Exception {
        String str = this.testMeta.outputPath + "/s3-compaction_1.0";
        File file = new File(str);
        File file2 = new File(str + "." + System.currentTimeMillis() + ".tmp");
        StringBuffer stringBuffer = new StringBuffer();
        for (int i = 0; i < 10; i++) {
            stringBuffer.append("Record" + i + "\n");
            if (i == 5) {
                FileUtils.write(file2, stringBuffer.toString());
            }
        }
        FileUtils.write(file, stringBuffer.toString());
        this.testMeta.sink = new CollectorTestSink<>();
        this.testMeta.underTest.outputPort.setSink(this.testMeta.sink);
        FSRecordCompactionOperator.OutputMetaData outputMetaData = new FSRecordCompactionOperator.OutputMetaData(str, "s3-compaction_1.0", 80L);
        this.testMeta.underTest.beginWindow(0L);
        this.testMeta.underTest.input.process(outputMetaData);
        this.testMeta.underTest.endWindow();
        for (int i2 = 1; i2 < 60; i2++) {
            this.testMeta.underTest.beginWindow(i2);
            this.testMeta.underTest.endWindow();
        }
        this.testMeta.underTest.committed(59L);
        this.testMeta.sink.waitForResultCount(1, 12000L);
        for (int i3 = 60; i3 < 70; i3++) {
            this.testMeta.underTest.beginWindow(i3);
            Thread.sleep(10L);
            this.testMeta.underTest.endWindow();
        }
        Assert.assertEquals(1L, this.testMeta.sink.getCount(false));
    }

    @Test
    public void testFileClearing() throws Exception {
        String str = this.testMeta.outputPath + "/s3-compaction_1.0";
        File file = new File(str);
        File file2 = new File(str + "." + System.currentTimeMillis() + ".tmp");
        StringBuffer stringBuffer = new StringBuffer();
        for (int i = 0; i < 10; i++) {
            stringBuffer.append("Record" + i + "\n");
            if (i == 5) {
                FileUtils.write(file2, stringBuffer.toString());
            }
        }
        FileUtils.write(file, stringBuffer.toString());
        FSRecordCompactionOperator.OutputMetaData outputMetaData = new FSRecordCompactionOperator.OutputMetaData(str, "s3-compaction_1.0", 80L);
        this.testMeta.underTest.beginWindow(0L);
        this.testMeta.underTest.input.process(outputMetaData);
        this.testMeta.underTest.endWindow();
        for (int i2 = 1; i2 < 60; i2++) {
            this.testMeta.underTest.beginWindow(i2);
            this.testMeta.underTest.endWindow();
        }
        this.testMeta.underTest.committed(59L);
        for (int i3 = 60; i3 < 70; i3++) {
            this.testMeta.underTest.beginWindow(i3);
            Thread.sleep(10L);
            this.testMeta.underTest.endWindow();
        }
        Assert.assertEquals(0L, FileUtils.listFiles(new File(this.testMeta.outputPath), (String[]) null, true).size());
    }
}
