package org.apache.omid.tso;

import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ThreadFactory;
import org.apache.omid.proto.TSOProto;
import org.apache.phoenix.shaded.io.netty.bootstrap.ServerBootstrap;
import org.apache.phoenix.shaded.io.netty.channel.Channel;
import org.apache.phoenix.shaded.io.netty.channel.ChannelHandler;
import org.apache.phoenix.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.phoenix.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.phoenix.shaded.io.netty.channel.ChannelInitializer;
import org.apache.phoenix.shaded.io.netty.channel.ChannelPipeline;
import org.apache.phoenix.shaded.io.netty.channel.group.ChannelGroup;
import org.apache.phoenix.shaded.io.netty.channel.group.DefaultChannelGroup;
import org.apache.phoenix.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.phoenix.shaded.io.netty.channel.socket.SocketChannel;
import org.apache.phoenix.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.phoenix.shaded.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.phoenix.shaded.io.netty.handler.codec.LengthFieldPrepender;
import org.apache.phoenix.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.phoenix.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
import org.apache.phoenix.shaded.io.netty.util.AttributeKey;
import org.apache.phoenix.shaded.io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.phoenix.shaded.javax.inject.Inject;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
/* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer.class */
public class ProgrammableTSOServer extends ChannelInboundHandlerAdapter {
    private ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private Queue<Response> responseQueue = new LinkedList();
    private static final Logger LOG = LoggerFactory.getLogger(ProgrammableTSOServer.class);
    private static final AttributeKey<TSOChannelContext> TSO_CTX = AttributeKey.valueOf("TSO_CTX");

    /* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer$AbortResponse.class */
    public static class AbortResponse extends Response {
        final long startTS;

        public AbortResponse(long j) {
            super(Response.ResponseType.ABORT);
            this.startTS = j;
        }
    }

    /* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer$CommitResponse.class */
    public static class CommitResponse extends Response {
        final long startTS;
        final long commitTS;

        public CommitResponse(long j, long j2) {
            super(Response.ResponseType.COMMIT);
            this.startTS = j;
            this.commitTS = j2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer$Response.class */
    public static abstract class Response {
        final ResponseType type;

        /* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer$Response$ResponseType.class */
        enum ResponseType {
            TIMESTAMP,
            COMMIT,
            ABORT
        }

        public Response(ResponseType responseType) {
            this.type = responseType;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer$TSOChannelContext.class */
    public static class TSOChannelContext {
        boolean handshakeComplete = false;

        TSOChannelContext() {
        }

        boolean getHandshakeComplete() {
            return this.handshakeComplete;
        }

        void setHandshakeComplete() {
            this.handshakeComplete = true;
        }
    }

    /* loaded from: input_file:org/apache/omid/tso/ProgrammableTSOServer$TimestampResponse.class */
    public static class TimestampResponse extends Response {
        final long startTS;

        public TimestampResponse(long j) {
            super(Response.ResponseType.TIMESTAMP);
            this.startTS = j;
        }
    }

    /* JADX WARN: Type inference failed for: r0v25, types: [org.apache.phoenix.shaded.io.netty.channel.ChannelFuture] */
    @Inject
    public ProgrammableTSOServer(int i) {
        int availableProcessors = ((Runtime.getRuntime().availableProcessors() * 2) + 1) * 2;
        ThreadFactory build = new ThreadFactoryBuilder().setNameFormat("tsoserver-boss-%d").build();
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(availableProcessors, new ThreadFactoryBuilder().setNameFormat("tsoserver-worker-%d").build());
        NioEventLoopGroup nioEventLoopGroup2 = new NioEventLoopGroup(build);
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(nioEventLoopGroup2, nioEventLoopGroup);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.omid.tso.ProgrammableTSOServer.1
            @Override // org.apache.phoenix.shaded.io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(10485760, 0, 4, 0, 4));
                pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
                pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Request.getDefaultInstance()));
                pipeline.addLast("protobufencoder", new ProtobufEncoder());
                pipeline.addLast("handler", ProgrammableTSOServer.this);
            }
        });
        this.allChannels.add(serverBootstrap.bind(new InetSocketAddress(i)).syncUninterruptibly2().channel());
        LOG.info("********** Dumb TSO Server running on port {} **********", Integer.valueOf(i));
    }

    public void queueResponse(Response response) {
        this.responseQueue.add(response);
    }

    public void cleanResponses() {
        this.responseQueue.clear();
    }

    @Override // org.apache.phoenix.shaded.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.phoenix.shaded.io.netty.channel.ChannelInboundHandler
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.allChannels.add(channelHandlerContext.channel());
    }

    @Override // org.apache.phoenix.shaded.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.phoenix.shaded.io.netty.channel.ChannelInboundHandler
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        if (!(obj instanceof TSOProto.Request)) {
            LOG.error("Unknown message type", obj);
            return;
        }
        TSOProto.Request request = (TSOProto.Request) obj;
        Channel channel = channelHandlerContext.channel();
        if (request.hasHandshakeRequest()) {
            checkHandshake(channelHandlerContext, request.getHandshakeRequest());
            return;
        }
        if (!handshakeCompleted(channelHandlerContext)) {
            LOG.info("handshake not completed");
            channel.close();
        }
        Response poll = this.responseQueue.poll();
        if (request.hasTimestampRequest()) {
            if (poll == null || poll.type != Response.ResponseType.TIMESTAMP) {
                throw new IllegalStateException("Expecting TS response to send but got " + poll);
            }
            sendTimestampResponse(((TimestampResponse) poll).startTS, channel);
            return;
        }
        if (!request.hasCommitRequest()) {
            LOG.error("Invalid request {}", request);
            channelHandlerContext.channel().close();
        } else {
            if (poll == null) {
                throw new IllegalStateException("Expecting COMMIT response to send but got null");
            }
            switch (poll.type) {
                case COMMIT:
                    CommitResponse commitResponse = (CommitResponse) poll;
                    sendCommitResponse(commitResponse.startTS, commitResponse.commitTS, channel);
                    return;
                case ABORT:
                    sendAbortResponse(((AbortResponse) poll).startTS, channel);
                    return;
                default:
                    throw new IllegalStateException("Expecting COMMIT response to send but got " + poll.type);
            }
        }
    }

    @Override // org.apache.phoenix.shaded.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.phoenix.shaded.io.netty.channel.ChannelHandlerAdapter, org.apache.phoenix.shaded.io.netty.channel.ChannelHandler, org.apache.phoenix.shaded.io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        if (th instanceof ClosedChannelException) {
            return;
        }
        LOG.warn("TSOHandler: Unexpected exception.", th);
        channelHandlerContext.channel().close();
    }

    private void checkHandshake(ChannelHandlerContext channelHandlerContext, TSOProto.HandshakeRequest handshakeRequest) {
        TSOProto.HandshakeResponse.Builder newBuilder = TSOProto.HandshakeResponse.newBuilder();
        if (handshakeRequest.hasClientCapabilities()) {
            newBuilder.setClientCompatible(true).setServerCapabilities(TSOProto.Capabilities.newBuilder().build());
            TSOChannelContext tSOChannelContext = new TSOChannelContext();
            tSOChannelContext.setHandshakeComplete();
            channelHandlerContext.channel().attr(TSO_CTX).set(tSOChannelContext);
        } else {
            newBuilder.setClientCompatible(false);
        }
        channelHandlerContext.channel().writeAndFlush(TSOProto.Response.newBuilder().setHandshakeResponse(newBuilder.build()).build());
    }

    private boolean handshakeCompleted(ChannelHandlerContext channelHandlerContext) {
        Object obj = channelHandlerContext.channel().attr(TSO_CTX).get();
        if (obj instanceof TSOChannelContext) {
            return ((TSOChannelContext) obj).getHandshakeComplete();
        }
        return false;
    }

    private void sendTimestampResponse(long j, Channel channel) {
        TSOProto.Response.Builder newBuilder = TSOProto.Response.newBuilder();
        TSOProto.TimestampResponse.Builder newBuilder2 = TSOProto.TimestampResponse.newBuilder();
        newBuilder2.setStartTimestamp(j);
        newBuilder.setTimestampResponse(newBuilder2.build());
        channel.writeAndFlush(newBuilder.build());
    }

    private void sendCommitResponse(long j, long j2, Channel channel) {
        TSOProto.Response.Builder newBuilder = TSOProto.Response.newBuilder();
        TSOProto.CommitResponse.Builder newBuilder2 = TSOProto.CommitResponse.newBuilder();
        newBuilder2.setAborted(false).setStartTimestamp(j).setCommitTimestamp(j2);
        newBuilder.setCommitResponse(newBuilder2.build());
        channel.writeAndFlush(newBuilder.build());
    }

    private void sendAbortResponse(long j, Channel channel) {
        TSOProto.Response.Builder newBuilder = TSOProto.Response.newBuilder();
        TSOProto.CommitResponse.Builder newBuilder2 = TSOProto.CommitResponse.newBuilder();
        newBuilder2.setAborted(true).setStartTimestamp(j);
        newBuilder.setCommitResponse(newBuilder2.build());
        channel.writeAndFlush(newBuilder.build());
    }
}
