package org.apache.flink.streaming.api.functions.sink;

import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.ContentDump;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.class */
public class TwoPhaseCommitSinkFunctionTest {
    /** Sink function under test; (re)created by {@code setUpTestHarness()}. */
    private ContentDumpSinkFunction sinkFunction;
    /** Test harness wrapping a {@code StreamSink} around {@link #sinkFunction}. */
    private OneInputStreamOperatorTestHarness<String, Object> harness;
    /** While {@code true}, {@code ContentDumpSinkFunction.commit} throws a {@code RuntimeException}. */
    private AtomicBoolean throwException = new AtomicBoolean();
    /** Dump that receives a transaction's content when the sink function commits it. */
    private ContentDump targetDirectory;
    /** Dump in which the sink function creates one writer per transaction. */
    private ContentDump tmpDirectory;
    /** Manually adjustable clock handed to the sink function's superclass constructor. */
    private SettableClock clock;
    /** Logger of {@code TwoPhaseCommitSinkFunction} to which {@link #testAppender} is attached. */
    private Logger logger;
    /** Appender that records every received event in {@link #loggingEvents}. */
    private AppenderSkeleton testAppender;
    /** Log events captured during the current test; reset in {@code setUp()}, cleared in {@code tearDown()}. */
    private List<LoggingEvent> loggingEvents;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest$ContentDumpSinkFunction.class */
    /**
     * Two-phase-commit sink used by the tests: every transaction writes into its own entry of
     * the enclosing test's {@code tmpDirectory}, and committing moves that entry into
     * {@code targetDirectory}.
     *
     * <p>It is an inner (non-static) class because it reads the enclosing test's {@code clock},
     * {@code tmpDirectory}, {@code targetDirectory} and {@code throwException} fields.
     */
    public class ContentDumpSinkFunction extends TwoPhaseCommitSinkFunction<String, ContentTransaction, Void> {

        /**
         * Creates the sink with a Kryo serializer for {@link ContentTransaction}, a void context
         * serializer and the enclosing test's clock.
         */
        public ContentDumpSinkFunction() {
            super(
                    new KryoSerializer<>(ContentTransaction.class, new ExecutionConfig()),
                    VoidSerializer.INSTANCE,
                    TwoPhaseCommitSinkFunctionTest.this.clock);
        }

        /**
         * Appends the incoming element to the transaction's temporary writer.
         *
         * @param contentTransaction transaction the element belongs to
         * @param value element to write
         * @param context sink context (unused)
         * @throws Exception if the write fails
         */
        public void invoke(ContentTransaction contentTransaction, String value, SinkFunction.Context context) throws Exception {
            contentTransaction.tmpContentWriter.write(value);
        }

        /**
         * Starts a transaction backed by a new writer in the temporary dump, named by a random UUID.
         *
         * <p>The decompiler had renamed this method to {@code m5beginTransaction} after merging it
         * with its bridge method; the original name is restored so that it implements the
         * superclass hook again.
         *
         * @return the newly created transaction
         * @throws Exception if the writer cannot be created
         */
        public ContentTransaction beginTransaction() throws Exception {
            String transactionName = UUID.randomUUID().toString();
            return new ContentTransaction(TwoPhaseCommitSinkFunctionTest.this.tmpDirectory.createWriter(transactionName));
        }

        /**
         * Flushes and closes the transaction's temporary writer.
         *
         * @param contentTransaction transaction to pre-commit
         * @throws Exception if flushing or closing fails
         */
        public void preCommit(ContentTransaction contentTransaction) throws Exception {
            contentTransaction.tmpContentWriter.flush();
            contentTransaction.tmpContentWriter.close();
        }

        /**
         * Moves the transaction's content from the temporary dump to the target dump.
         *
         * @param contentTransaction transaction to commit
         * @throws RuntimeException with message "Expected exception" while the enclosing test's
         *     {@code throwException} flag is set
         */
        public void commit(ContentTransaction contentTransaction) {
            // Lets tests simulate a commit that keeps failing.
            if (TwoPhaseCommitSinkFunctionTest.this.throwException.get()) {
                throw new RuntimeException("Expected exception");
            }
            ContentDump.move(
                    contentTransaction.tmpContentWriter.getName(),
                    TwoPhaseCommitSinkFunctionTest.this.tmpDirectory,
                    TwoPhaseCommitSinkFunctionTest.this.targetDirectory);
        }

        /**
         * Closes the transaction's writer and deletes its entry from the temporary dump.
         *
         * @param contentTransaction transaction to abort
         */
        public void abort(ContentTransaction contentTransaction) {
            contentTransaction.tmpContentWriter.close();
            TwoPhaseCommitSinkFunctionTest.this.tmpDirectory.delete(contentTransaction.tmpContentWriter.getName());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest$ContentTransaction.class */
    /** Transaction handle of the test sink: wraps the writer that holds the transaction's temporary content. */
    public static class ContentTransaction {
        /** Writer for the temporary content; read directly by the enclosing test's sink function. */
        private ContentDump.ContentWriter tmpContentWriter;

        /**
         * @param tmpContentWriter writer backing this transaction
         */
        public ContentTransaction(ContentDump.ContentWriter tmpContentWriter) {
            this.tmpContentWriter = tmpContentWriter;
        }

        /** Renders the transaction as {@code ContentTransaction[<writer name>]}. */
        public String toString() {
            return "ContentTransaction[" + tmpContentWriter.getName() + "]";
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest$SettableClock.class */
    /** {@link Clock} whose current time is whatever was last set via {@link #setEpochMilli(long)}. */
    private static class SettableClock extends Clock {
        private final ZoneId zoneId;
        private long epochMilli;

        /** Creates a UTC clock positioned at epoch millisecond 0. */
        private SettableClock() {
            this(ZoneOffset.UTC, 0L);
        }

        /**
         * @param zone time zone reported by {@link #getZone()}
         * @param initialEpochMilli initial time in milliseconds since the epoch
         */
        public SettableClock(ZoneId zone, long initialEpochMilli) {
            this.zoneId = zone;
            this.epochMilli = initialEpochMilli;
        }

        /**
         * Moves this clock to the given point in time.
         *
         * @param newEpochMilli new time in milliseconds since the epoch
         */
        public void setEpochMilli(long newEpochMilli) {
            this.epochMilli = newEpochMilli;
        }

        @Override
        public ZoneId getZone() {
            return zoneId;
        }

        /**
         * Returns this clock when the zone is unchanged; otherwise a new clock in the requested
         * zone, initialized with this clock's current time.
         */
        @Override
        public Clock withZone(ZoneId zone) {
            if (zone.equals(zoneId)) {
                return this;
            }
            return new SettableClock(zone, epochMilli);
        }

        @Override
        public Instant instant() {
            return Instant.ofEpochMilli(epochMilli);
        }
    }

    /**
     * Prepares a fresh fixture for every test: log capturing, empty target and temporary dumps,
     * a new settable clock and a new test harness around a new sink function.
     */
    @Before
    public void setUp() throws Exception {
        loggingEvents = new ArrayList<>();
        setupLogger();

        targetDirectory = new ContentDump();
        tmpDirectory = new ContentDump();

        // The clock must exist before the harness is built: the sink's constructor reads it.
        clock = new SettableClock();
        setUpTestHarness();
    }

    /**
     * Closes the test harness and detaches the log-capturing appender.
     *
     * <p>The logger cleanup runs in a {@code finally} block so that a failure while closing the
     * harness does not leave the appender attached to the logger for subsequent tests.
     *
     * @throws Exception if closing the harness fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            closeTestHarness();
        } finally {
            if (logger != null) {
                logger.removeAppender(testAppender);
            }
            loggingEvents = null;
        }
    }

    /**
     * Removes all appenders from the root logger and attaches an appender to the logger of
     * {@code TwoPhaseCommitSinkFunction} that records every event in {@code loggingEvents}.
     * The logger level is set to WARN.
     */
    private void setupLogger() {
        Logger.getRootLogger().removeAllAppenders();

        final Logger sinkLogger = Logger.getLogger(TwoPhaseCommitSinkFunction.class);
        this.logger = sinkLogger;

        final AppenderSkeleton capturingAppender = new AppenderSkeleton() {
            /** Stores the event so tests can inspect its rendered message later. */
            protected void append(LoggingEvent event) {
                TwoPhaseCommitSinkFunctionTest.this.loggingEvents.add(event);
            }

            /** Nothing to release. */
            public void close() {
            }

            /** Events are stored as-is, so no layout is needed. */
            public boolean requiresLayout() {
                return false;
            }
        };
        this.testAppender = capturingAppender;

        sinkLogger.addAppender(capturingAppender);
        sinkLogger.setLevel(Level.WARN);
    }

    /**
     * Creates a new sink function, wraps it in a {@code StreamSink} operator and sets up a new
     * test harness around that operator.
     */
    private void setUpTestHarness() throws Exception {
        sinkFunction = new ContentDumpSinkFunction();

        // Raw-typed locals take the place of the raw casts at the constructor call.
        final OneInputStreamOperator sinkOperator = new StreamSink(sinkFunction);
        final TypeSerializer inputSerializer = StringSerializer.INSTANCE;

        harness = new OneInputStreamOperatorTestHarness<>(sinkOperator, inputSerializer);
        harness.setup();
    }

    /**
     * Closes the current test harness, if one has been created.
     *
     * <p>The null guard matters for {@code tearDown()}: JUnit runs {@code @After} methods even
     * when {@code @Before} failed, in which case the harness may not exist yet.
     *
     * @throws Exception if closing the harness fails
     */
    private void closeTestHarness() throws Exception {
        if (this.harness != null) {
            this.harness.close();
        }
    }

    /**
     * Processes three elements, snapshotting after each, and confirms only checkpoint 1:
     * exactly the first two elements must then be present in the target dump.
     */
    @Test
    public void testNotifyOfCompletedCheckpoint() throws Exception {
        harness.open();

        // One element per checkpoint (checkpoint ids 0, 1 and 2).
        harness.processElement("42", 0L);
        harness.snapshot(0L, 1L);
        harness.processElement("43", 2L);
        harness.snapshot(1L, 3L);
        harness.processElement("44", 4L);
        harness.snapshot(2L, 5L);

        harness.notifyOfCompletedCheckpoint(1L);

        final List<String> expectedCommitted = Arrays.asList("42", "43");
        assertExactlyOnce(expectedCommitted);

        // Presumably the transaction of checkpoint 2 plus the currently open one - TODO confirm.
        Assert.assertEquals(2L, tmpDirectory.listFiles().size());
    }

    /**
     * Makes the temporary dump read-only after checkpoint 1 so that further processing fails,
     * then restores a new harness from the checkpoint-1 state and expects the first two
     * elements in the target dump and no files left in the temporary dump.
     */
    @Test
    public void testFailBeforeNotify() throws Exception {
        harness.open();
        harness.processElement("42", 0L);
        harness.snapshot(0L, 1L);
        harness.processElement("43", 2L);
        final OperatorSubtaskState checkpointOneState = harness.snapshot(1L, 3L);

        tmpDirectory.setWritable(false);
        try {
            harness.processElement("44", 4L);
            harness.snapshot(2L, 5L);
            Assert.fail("something should fail");
        } catch (Exception failure) {
            // Only a failure caused by the read-only dump is expected; anything else is rethrown.
            final boolean causedByReadOnlyDump = failure.getCause() instanceof ContentDump.NotWritableException;
            if (!causedByReadOnlyDump) {
                throw failure;
            }
        }
        closeTestHarness();

        // Recover with a fresh harness from the last successful snapshot.
        tmpDirectory.setWritable(true);
        setUpTestHarness();
        harness.initializeState(checkpointOneState);

        assertExactlyOnce(Arrays.asList("42", "43"));
        closeTestHarness();

        Assert.assertEquals(0L, tmpDirectory.listFiles().size());
    }

    /**
     * With commits failing and "ignore failures after transaction timeout" enabled, restoring
     * state must throw while the clock is at 0 ms, and must succeed once the clock has been
     * moved one millisecond past the configured transaction timeout.
     */
    @Test
    public void testIgnoreCommitExceptionDuringRecovery() throws Exception {
        final long transactionTimeout = 1000L;

        clock.setEpochMilli(0L);

        harness.open();
        harness.processElement("42", 0L);
        final OperatorSubtaskState checkpointState = harness.snapshot(0L, 1L);
        harness.notifyOfCompletedCheckpoint(1L);

        // From here on every commit attempt of the sink throws.
        throwException.set(true);

        closeTestHarness();
        setUpTestHarness();
        sinkFunction.setTransactionTimeout(transactionTimeout);
        sinkFunction.ignoreFailuresAfterTransactionTimeout();

        try {
            harness.initializeState(checkpointState);
            Assert.fail("Expected exception not thrown");
        } catch (RuntimeException commitFailure) {
            Assert.assertEquals("Expected exception", commitFailure.getMessage());
        }

        clock.setEpochMilli(transactionTimeout + 1L);
        harness.initializeState(checkpointState);

        assertExactlyOnce(Collections.singletonList("42"));
    }

    /**
     * With a 1000 ms transaction timeout and timeout warnings enabled at ratio 0.5, completing
     * a checkpoint when the clock reads 502 ms must produce the "close to or even exceeding the
     * transaction timeout" log message.
     */
    @Test
    public void testLogTimeoutAlmostReachedWarningDuringCommit() throws Exception {
        clock.setEpochMilli(0L);
        sinkFunction.setTransactionTimeout(1000L);
        sinkFunction.enableTransactionTimeoutWarnings(0.5d);

        harness.open();
        harness.snapshot(0L, 1L);

        clock.setEpochMilli(502L);
        harness.notifyOfCompletedCheckpoint(1L);

        final List<String> logMessages = loggingEvents.stream()
                .map(LoggingEvent::getRenderedMessage)
                .collect(Collectors.toList());

        Assert.assertThat(
                logMessages,
                CoreMatchers.hasItem(CoreMatchers.containsString("has been open for 502 ms. This is close to or even exceeding the transaction timeout of 1000 ms.")));
    }

    /**
     * Same expectation as the commit variant, but the warning must be logged while a new
     * harness is restored from a snapshot taken at 0 ms and the clock reads 502 ms.
     */
    @Test
    public void testLogTimeoutAlmostReachedWarningDuringRecovery() throws Exception {
        clock.setEpochMilli(0L);
        sinkFunction.setTransactionTimeout(1000L);
        sinkFunction.enableTransactionTimeoutWarnings(0.5d);

        harness.open();
        final OperatorSubtaskState checkpointState = harness.snapshot(0L, 1L);

        clock.setEpochMilli(502L);

        // The replacement sink function is a new instance, so it is configured again.
        closeTestHarness();
        setUpTestHarness();
        sinkFunction.setTransactionTimeout(1000L);
        sinkFunction.enableTransactionTimeoutWarnings(0.5d);

        harness.initializeState(checkpointState);
        harness.open();

        // Collect the messages before closing the harness, then assert on that snapshot.
        final List<String> logMessages = loggingEvents.stream()
                .map(LoggingEvent::getRenderedMessage)
                .collect(Collectors.toList());

        closeTestHarness();

        Assert.assertThat(
                logMessages,
                CoreMatchers.hasItem(CoreMatchers.containsString("has been open for 502 ms. This is close to or even exceeding the transaction timeout of 1000 ms.")));
    }

    /**
     * Asserts that the contents of all files in the target dump, taken together, are exactly
     * the expected values, ignoring order.
     *
     * <p>The expected values are copied before sorting, so the caller's list is never modified
     * and unmodifiable lists can be passed safely.
     *
     * @param expectedValues values expected in the target dump, in any order
     * @throws IOException if reading from the target dump fails
     */
    private void assertExactlyOnce(List<String> expectedValues) throws IOException {
        List<String> actualValues = new ArrayList<>();
        for (String fileName : this.targetDirectory.listFiles()) {
            actualValues.addAll(this.targetDirectory.read(fileName));
        }
        Collections.sort(actualValues);

        // Sort a private copy instead of the argument.
        List<String> sortedExpected = new ArrayList<>(expectedValues);
        Collections.sort(sortedExpected);

        Assert.assertEquals(sortedExpected, actualValues);
    }
}
