package org.apache.omid.tso;

import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.tso.ReplyProcessorImpl;
import org.apache.phoenix.shaded.com.lmax.disruptor.BlockingWaitStrategy;
import org.apache.phoenix.shaded.io.netty.channel.Channel;
import org.apache.phoenix.shaded.org.apache.commons.pool2.ObjectPool;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.SettableFuture;
import org.mockito.InOrder;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/omid/tso/TestReplyProcessor.class */
public class TestReplyProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(TestReplyProcessor.class);
    private static final long ANY_DISRUPTOR_SEQUENCE = 1234;
    public static final int BATCH_POOL_SIZE = 3;
    private static final long FIRST_ST = 0;
    private static final long FIRST_CT = 1;
    private static final long SECOND_ST = 2;
    private static final long SECOND_CT = 3;
    private static final long THIRD_ST = 4;
    private static final long THIRD_CT = 5;
    private static final long FOURTH_ST = 6;
    private static final long FOURTH_CT = 7;
    private static final long FIFTH_ST = 8;
    private static final long FIFTH_CT = 9;
    private static final long SIXTH_ST = 10;
    private static final long SIXTH_CT = 11;

    @Mock
    private Panicker panicker;

    @Mock
    private MonitoringContextImpl monCtx;
    private MetricsRegistry metrics;
    private ObjectPool<Batch> batchPool;
    private ReplyProcessorImpl replyProcessor;
    private LowWatermarkWriter lowWatermarkWriter;

    @BeforeMethod(alwaysRun = true, timeOut = 30000)
    public void initMocksAndComponents() throws Exception {
        MockitoAnnotations.initMocks(this);
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        tSOServerConfig.setNumConcurrentCTWriters(3);
        this.metrics = new NullMetricsProvider();
        this.batchPool = (ObjectPool) Mockito.spy(new BatchPoolModule(tSOServerConfig).getBatchPool());
        this.lowWatermarkWriter = (LowWatermarkWriter) Mockito.mock(LowWatermarkWriter.class);
        SettableFuture create = SettableFuture.create();
        create.set(null);
        ((LowWatermarkWriter) Mockito.doReturn(create).when(this.lowWatermarkWriter)).persistLowWatermark(((Long) Matchers.any(Long.class)).longValue());
        this.replyProcessor = (ReplyProcessorImpl) Mockito.spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), this.metrics, this.panicker, this.batchPool, this.lowWatermarkWriter));
    }

    @AfterMethod
    void afterMethod() {
    }

    @Test(timeOut = 10000)
    public void testBadFormedPackageThrowsException() throws Exception {
        this.replyProcessor = (ReplyProcessorImpl) Mockito.spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), this.metrics, new RuntimeExceptionPanicker(), this.batchPool, this.lowWatermarkWriter));
        Batch borrowObject = this.batchPool.borrowObject();
        borrowObject.addCommitRetry(0L, null, this.monCtx);
        ReplyProcessorImpl.ReplyBatchEvent newInstance = ReplyProcessorImpl.ReplyBatchEvent.EVENT_FACTORY.newInstance();
        ReplyProcessorImpl.ReplyBatchEvent.makeReplyBatch(newInstance, borrowObject, 0L);
        Assert.assertEquals(this.replyProcessor.nextIDToHandle.get(), 0L);
        Assert.assertEquals(this.replyProcessor.futureEvents.size(), 0);
        Assert.assertEquals(this.batchPool.getNumActive(), 1);
        Assert.assertEquals(this.batchPool.getNumIdle(), 2);
        try {
            this.replyProcessor.onEvent(newInstance, ANY_DISRUPTOR_SEQUENCE, false);
            Assert.fail();
        } catch (RuntimeException e) {
        }
        Assert.assertEquals(this.replyProcessor.nextIDToHandle.get(), 0L);
        Assert.assertEquals(this.replyProcessor.futureEvents.size(), 0);
        Assert.assertEquals(this.batchPool.getNumActive(), 1);
        Assert.assertEquals(this.batchPool.getNumIdle(), 2);
    }

    @Test(timeOut = 10000)
    public void testUnorderedBatchSequenceGetsSaved() throws Exception {
        Batch borrowObject = this.batchPool.borrowObject();
        ReplyProcessorImpl.ReplyBatchEvent newInstance = ReplyProcessorImpl.ReplyBatchEvent.EVENT_FACTORY.newInstance();
        ReplyProcessorImpl.ReplyBatchEvent.makeReplyBatch(newInstance, borrowObject, ANY_DISRUPTOR_SEQUENCE);
        Assert.assertEquals(this.replyProcessor.nextIDToHandle.get(), 0L);
        Assert.assertEquals(this.replyProcessor.futureEvents.size(), 0);
        Assert.assertEquals(this.batchPool.getNumActive(), 1);
        Assert.assertEquals(this.batchPool.getNumIdle(), 2);
        this.replyProcessor.onEvent(newInstance, ANY_DISRUPTOR_SEQUENCE, false);
        Assert.assertEquals(this.replyProcessor.nextIDToHandle.get(), 0L);
        Assert.assertEquals(this.replyProcessor.futureEvents.size(), 1);
        Assert.assertEquals(this.batchPool.getNumActive(), 1);
        Assert.assertEquals(this.batchPool.getNumIdle(), 2);
        Assert.assertTrue(borrowObject.isEmpty());
        ((ReplyProcessorImpl) Mockito.verify(this.replyProcessor, Mockito.times(0))).handleReplyBatchEvent((ReplyProcessorImpl.ReplyBatchEvent) Matchers.any(ReplyProcessorImpl.ReplyBatchEvent.class));
    }

    @Test(timeOut = 10000)
    public void testProcessingOfEmptyBatchReplyEvent() throws Exception {
        Batch borrowObject = this.batchPool.borrowObject();
        ReplyProcessorImpl.ReplyBatchEvent newInstance = ReplyProcessorImpl.ReplyBatchEvent.EVENT_FACTORY.newInstance();
        ReplyProcessorImpl.ReplyBatchEvent.makeReplyBatch(newInstance, borrowObject, 0L);
        Assert.assertEquals(this.replyProcessor.nextIDToHandle.get(), 0L);
        Assert.assertEquals(this.replyProcessor.futureEvents.size(), 0);
        Assert.assertEquals(this.batchPool.getNumActive(), 1);
        Assert.assertEquals(this.batchPool.getNumIdle(), 2);
        this.replyProcessor.onEvent(newInstance, ANY_DISRUPTOR_SEQUENCE, false);
        Assert.assertEquals(this.replyProcessor.nextIDToHandle.get(), 1L);
        Assert.assertEquals(this.replyProcessor.futureEvents.size(), 0);
        Assert.assertEquals(this.batchPool.getNumActive(), 0);
        Assert.assertEquals(this.batchPool.getNumIdle(), 3);
        Assert.assertTrue(borrowObject.isEmpty());
        ((ReplyProcessorImpl) Mockito.verify(this.replyProcessor, Mockito.times(1))).handleReplyBatchEvent((ReplyProcessorImpl.ReplyBatchEvent) Matchers.eq(newInstance));
    }

    @Test(timeOut = 10000)
    public void testUnorderedSequenceOfBatchReplyEventsThatMustBeOrderedBeforeSendingReplies() throws Exception {
        Batch borrowObject = this.batchPool.borrowObject();
        borrowObject.addTimestamp(0L, (Channel) Mockito.mock(Channel.class), this.monCtx);
        borrowObject.addCommit(2L, 3L, (Channel) Mockito.mock(Channel.class), this.monCtx, Optional.absent());
        ReplyProcessorImpl.ReplyBatchEvent newInstance = ReplyProcessorImpl.ReplyBatchEvent.EVENT_FACTORY.newInstance();
        ReplyProcessorImpl.ReplyBatchEvent.makeReplyBatch(newInstance, borrowObject, 2L);
        Assert.assertEquals(this.replyProcessor.nextIDToHandle.get(), 0L);
        Assert.assertEquals(this.replyProcessor.futureEvents.size(), 0);
        Assert.assertEquals(this.batchPool.getNumActive(), 1);
        Assert.assertEquals(this.batchPool.getNumIdle(), 2);
        this.replyProcessor.onEvent(newInstance, ANY_DISRUPTOR_SEQUENCE, false);
        Assert.assertEquals(this.replyProcessor.nextIDToHandle.get(), 0L);
        Assert.assertEquals(this.replyProcessor.futureEvents.size(), 1);
        Assert.assertEquals(this.batchPool.getNumActive(), 1);
        Assert.assertEquals(this.batchPool.getNumIdle(), 2);
        Assert.assertFalse(borrowObject.isEmpty());
        ((ReplyProcessorImpl) Mockito.verify(this.replyProcessor, Mockito.never())).handleReplyBatchEvent((ReplyProcessorImpl.ReplyBatchEvent) Matchers.eq(newInstance));
        Batch borrowObject2 = this.batchPool.borrowObject();
        borrowObject2.addTimestamp(4L, (Channel) Mockito.mock(Channel.class), this.monCtx);
        borrowObject2.addCommit(FOURTH_ST, 7L, (Channel) Mockito.mock(Channel.class), this.monCtx, Optional.absent());
        ReplyProcessorImpl.ReplyBatchEvent newInstance2 = ReplyProcessorImpl.ReplyBatchEvent.EVENT_FACTORY.newInstance();
        ReplyProcessorImpl.ReplyBatchEvent.makeReplyBatch(newInstance2, borrowObject2, 1L);
        this.replyProcessor.onEvent(newInstance2, ANY_DISRUPTOR_SEQUENCE, false);
        Assert.assertEquals(this.replyProcessor.nextIDToHandle.get(), 0L);
        Assert.assertEquals(this.replyProcessor.futureEvents.size(), 2);
        Assert.assertEquals(this.batchPool.getNumActive(), 2);
        Assert.assertEquals(this.batchPool.getNumIdle(), 1);
        Assert.assertFalse(borrowObject2.isEmpty());
        Assert.assertFalse(borrowObject.isEmpty());
        Batch borrowObject3 = this.batchPool.borrowObject();
        borrowObject3.addAbort(8L, (Channel) Mockito.mock(Channel.class), this.monCtx);
        ReplyProcessorImpl.ReplyBatchEvent newInstance3 = ReplyProcessorImpl.ReplyBatchEvent.EVENT_FACTORY.newInstance();
        ReplyProcessorImpl.ReplyBatchEvent.makeReplyBatch(newInstance3, borrowObject3, 0L);
        this.replyProcessor.onEvent(newInstance3, ANY_DISRUPTOR_SEQUENCE, false);
        Assert.assertEquals(this.replyProcessor.nextIDToHandle.get(), 3L);
        Assert.assertEquals(this.replyProcessor.futureEvents.size(), 0);
        Assert.assertEquals(this.batchPool.getNumActive(), 0);
        Assert.assertEquals(this.batchPool.getNumIdle(), 3);
        Assert.assertTrue(borrowObject3.isEmpty());
        Assert.assertTrue(borrowObject2.isEmpty());
        Assert.assertTrue(borrowObject.isEmpty());
        InOrder inOrder = Mockito.inOrder(new Object[]{this.replyProcessor, this.replyProcessor, this.replyProcessor});
        ((ReplyProcessorImpl) inOrder.verify(this.replyProcessor, Mockito.times(1))).handleReplyBatchEvent((ReplyProcessorImpl.ReplyBatchEvent) Matchers.eq(newInstance3));
        ((ReplyProcessorImpl) inOrder.verify(this.replyProcessor, Mockito.times(1))).handleReplyBatchEvent((ReplyProcessorImpl.ReplyBatchEvent) Matchers.eq(newInstance2));
        ((ReplyProcessorImpl) inOrder.verify(this.replyProcessor, Mockito.times(1))).handleReplyBatchEvent((ReplyProcessorImpl.ReplyBatchEvent) Matchers.eq(newInstance));
        InOrder inOrder2 = Mockito.inOrder(new Object[]{this.replyProcessor, this.replyProcessor, this.replyProcessor, this.replyProcessor, this.replyProcessor});
        ((ReplyProcessorImpl) inOrder2.verify(this.replyProcessor, Mockito.times(1))).sendAbortResponse(Matchers.eq(8L), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.eq(this.monCtx));
        ((ReplyProcessorImpl) inOrder2.verify(this.replyProcessor, Mockito.times(1))).sendTimestampResponse(Matchers.eq(4L), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.eq(this.monCtx));
        ((ReplyProcessorImpl) inOrder2.verify(this.replyProcessor, Mockito.times(1))).sendCommitResponse(Matchers.eq(FOURTH_ST), Matchers.eq(7L), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.eq(this.monCtx), (Optional) Matchers.any(Optional.class));
        ((ReplyProcessorImpl) inOrder2.verify(this.replyProcessor, Mockito.times(1))).sendTimestampResponse(Matchers.eq(0L), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.eq(this.monCtx));
        ((ReplyProcessorImpl) inOrder2.verify(this.replyProcessor, Mockito.times(1))).sendCommitResponse(Matchers.eq(2L), Matchers.eq(3L), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.eq(this.monCtx), (Optional) Matchers.any(Optional.class));
    }

    @Test
    public void testUpdateLowWaterMarkOnlyForMaxInBatch() throws Exception {
        Batch borrowObject = this.batchPool.borrowObject();
        borrowObject.addTimestamp(0L, (Channel) Mockito.mock(Channel.class), this.monCtx);
        borrowObject.addCommit(2L, 3L, (Channel) Mockito.mock(Channel.class), this.monCtx, Optional.of(100L));
        borrowObject.addCommit(4L, 5L, (Channel) Mockito.mock(Channel.class), this.monCtx, Optional.of(50L));
        borrowObject.addCommit(FOURTH_ST, 7L, (Channel) Mockito.mock(Channel.class), this.monCtx, Optional.absent());
        borrowObject.addCommit(8L, 9L, (Channel) Mockito.mock(Channel.class), this.monCtx, Optional.of(100L));
        borrowObject.addCommit(SIXTH_ST, SIXTH_CT, (Channel) Mockito.mock(Channel.class), this.monCtx, Optional.of(150L));
        ReplyProcessorImpl.ReplyBatchEvent newInstance = ReplyProcessorImpl.ReplyBatchEvent.EVENT_FACTORY.newInstance();
        ReplyProcessorImpl.ReplyBatchEvent.makeReplyBatch(newInstance, borrowObject, 0L);
        this.replyProcessor.onEvent(newInstance, ANY_DISRUPTOR_SEQUENCE, false);
        InOrder inOrder = Mockito.inOrder(new Object[]{this.lowWatermarkWriter, this.lowWatermarkWriter, this.lowWatermarkWriter});
        ((LowWatermarkWriter) inOrder.verify(this.lowWatermarkWriter, Mockito.times(1))).persistLowWatermark(Matchers.eq(100L));
        ((LowWatermarkWriter) inOrder.verify(this.lowWatermarkWriter, Mockito.times(1))).persistLowWatermark(Matchers.eq(150L));
        ((LowWatermarkWriter) Mockito.verify(this.lowWatermarkWriter, Mockito.timeout(100L).times(0))).persistLowWatermark(Matchers.eq(50L));
        InOrder inOrder2 = Mockito.inOrder(new Object[]{this.replyProcessor, this.replyProcessor, this.replyProcessor, this.replyProcessor, this.replyProcessor});
        ((ReplyProcessorImpl) inOrder2.verify(this.replyProcessor, Mockito.times(1))).updateLowWatermark(Optional.of(100L));
        ((ReplyProcessorImpl) inOrder2.verify(this.replyProcessor, Mockito.times(1))).updateLowWatermark(Optional.of(50L));
        ((ReplyProcessorImpl) inOrder2.verify(this.replyProcessor, Mockito.times(1))).updateLowWatermark(Optional.absent());
        ((ReplyProcessorImpl) inOrder2.verify(this.replyProcessor, Mockito.times(1))).updateLowWatermark(Optional.of(100L));
        ((ReplyProcessorImpl) inOrder2.verify(this.replyProcessor, Mockito.times(1))).updateLowWatermark(Optional.of(150L));
    }
}
