package com.taobao.txc.rpc.impl;

import com.taobao.txc.common.LoggerInit;
import com.taobao.txc.common.LoggerWrap;
import com.taobao.txc.common.NetUtil;
import com.taobao.txc.common.TxcConstants;
import com.taobao.txc.common.TxcXID;
import com.taobao.txc.common.config.DiamondUtil;
import com.taobao.txc.common.config.TxcConfigHolder;
import com.taobao.txc.common.exception.TxcErrCode;
import com.taobao.txc.common.exception.TxcException;
import com.taobao.txc.rpc.api.ConnectionEvent;
import com.taobao.txc.rpc.api.ConnectionEventListener;
import com.taobao.txc.rpc.api.ServerMessageListener;
import com.taobao.txc.rpc.api.TxcServerMessageSender;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.StringUtils;

@ChannelHandler.Sharable
/* loaded from: input_file:com/taobao/txc/rpc/impl/RpcServer.class */
public abstract class RpcServer extends RpcEndpoint implements TxcServerMessageSender {
    /** Shared logger obtained from {@link LoggerInit}. */
    private static final LoggerWrap logger = LoggerInit.logger;
    /** Listening port; used to build the bind address in {@code init()} and handed to {@link TxcXID}. */
    protected int port;
    /** Receives every message routed through {@code dispatch(...)}. */
    protected ServerMessageListener serverMessageListener;
    /** Notified (asynchronously, from {@code handleDisconnect}) when a registered connection goes away. */
    protected ConnectionEventListener connectionEventListener;
    /**
     * Looked up with {@code ip + ":" + key} to find the client app name behind an RM channel.
     * Not written by the code visible in this class - presumably maintained by a subclass (confirm).
     */
    protected ConcurrentHashMap<String, String> rmIpAndPortToClientAppName;
    /** Static id used by {@code getPeerMid()}, {@code getPeerBkupMid()} and {@code getBkupMid()}. */
    public static int mid;
    /** Netty event loop group passed as the parent ("boss") group to the server bootstrap. */
    private NioEventLoopGroup bossGroup;
    /** Netty event loop group passed as the child ("worker") group to the server bootstrap. */
    private NioEventLoopGroup workerGroup;
    /** Remote address string (as produced by {@code NetUtil.toStringAddress}) to registered client app name. */
    protected ConcurrentHashMap<String, String> ipAndPortToClientAppName;
    /** Client ip concatenated directly with the client app name (no separator) to the registered channel. */
    protected ConcurrentHashMap<String, Channel> ipAndClientAppNameToChannelMap;
    /**
     * Remote address string to a dbKey string; {@code handleDisconnect} splits the value on ",".
     * Not written by the code visible in this class.
     */
    protected ConcurrentHashMap<String, String> ipAndPortToDbKey;
    /** dbKey to (client ip to (Integer key to channel)); {@code handleDisconnect} uses the remote port as the Integer key. */
    protected ConcurrentHashMap<String, Map<String, Map<Integer, Channel>>> dbKeyToChannelMap;
    /** Same shape as {@link #dbKeyToChannelMap}; consulted by {@code sendRtRequest}. */
    protected ConcurrentHashMap<String, Map<String, Map<Integer, Channel>>> dbKeyToRtChannelMap;
    /** Remote address string to the non-empty value returned by {@code checkAuth} at registration. */
    protected ConcurrentHashMap<String, String> ipAndPortToVgroupName;
    /** Client ip to the {@code nowMills} value at which a disconnect of an unregistered connection was last recorded. */
    protected ConcurrentHashMap<String, Long> ipInactiveRecords;
    /** Largest {@code nowMills} value recorded in {@link #ipInactiveRecords}. */
    protected volatile long lastInactiveMills;
    /** Value for {@code SO_BACKLOG}; only applied in {@code init()} when greater than zero. */
    private int backlog;
    /** NOTE(review): equals the wrap-around value used by {@code getPeerMid()} - presumably the number of server ids; confirm. */
    public static final int LOOP_CAPACITY = 3;
    /** NOTE(review): equals the offset added by {@code getBkupMid()} / {@code getPeerBkupMid()}; confirm. */
    public static final int BKUP_MID_DIFF = 100;
    /** Monitor held while the four-argument {@code getChannel} closes and evicts an inactive channel. */
    private Object channelLock1;
    /** Monitor held while the two-argument {@code getChannel} closes and evicts an inactive channel. */
    private Object channelLock2;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/taobao/txc/rpc/impl/RpcServer$TargetHolder.class */
    public static class TargetHolder {
        /** Ip under which the selected channel is registered; {@code null} until one is recorded. */
        String targetIp = null;
        /** Key of the selected channel inside the per-ip map; {@code null} until one is recorded. */
        Integer targetKey = null;

        /** Only the enclosing class creates holders. */
        private TargetHolder() {
        }
    }

    /**
     * Creates the server with empty registries and fresh Netty event loop groups.
     * Nothing is bound until {@code init()} is called.
     *
     * @param threadPoolExecutor executor forwarded to the {@code RpcEndpoint} constructor
     */
    public RpcServer(ThreadPoolExecutor threadPoolExecutor) {
        super(threadPoolExecutor);

        // Netty event loops (boss first, then worker).
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();

        // Connection registries.
        rmIpAndPortToClientAppName = new ConcurrentHashMap<>();
        ipAndPortToClientAppName = new ConcurrentHashMap<>();
        ipAndClientAppNameToChannelMap = new ConcurrentHashMap<>();
        ipAndPortToDbKey = new ConcurrentHashMap<>();
        dbKeyToChannelMap = new ConcurrentHashMap<>();
        dbKeyToRtChannelMap = new ConcurrentHashMap<>();
        ipAndPortToVgroupName = new ConcurrentHashMap<>();

        // Disconnect bookkeeping.
        ipInactiveRecords = new ConcurrentHashMap<>();
        lastInactiveMills = 0L;

        // Misc.
        backlog = 0;
        channelLock1 = new Object();
        channelLock2 = new Object();
    }

    /** @return the listener that receives dispatched messages, or {@code null} if none was set */
    public ServerMessageListener getServerMessageListener() {
        return serverMessageListener;
    }

    /** @param serverMessageListener listener that {@code dispatch(...)} forwards messages to */
    public void setServerMessageListener(ServerMessageListener serverMessageListener) {
        ServerMessageListener listener = serverMessageListener;
        this.serverMessageListener = listener;
    }

    /** @return the listener notified about disconnects, or {@code null} if none was set */
    public ConnectionEventListener getConnectionEventListener() {
        return connectionEventListener;
    }

    /** @param connectionEventListener listener that receives a {@code ConnectionEvent} when a registered peer disconnects */
    public void setConnectionEventListener(ConnectionEventListener connectionEventListener) {
        ConnectionEventListener listener = connectionEventListener;
        this.connectionEventListener = listener;
    }

    /**
     * Sets the accept backlog used when the server is initialised.
     *
     * @param i requested backlog; zero and negative values are ignored and the previous value is kept
     */
    public void setBacklog(int i) {
        if (i <= 0) {
            return;
        }
        backlog = i;
    }

    /**
     * @return {@code mid - 1}, wrapped to {@link #LOOP_CAPACITY} when that is zero,
     *         plus {@link #BKUP_MID_DIFF}
     */
    public static int getPeerBkupMid() {
        int peer = mid - 1;
        int wrapped = (peer == 0) ? LOOP_CAPACITY : peer;
        return wrapped + BKUP_MID_DIFF;
    }

    /** @return {@code mid - 1}, or {@link #LOOP_CAPACITY} when {@code mid - 1} is zero */
    public static int getPeerMid() {
        int peer = mid - 1;
        return (peer == 0) ? LOOP_CAPACITY : peer;
    }

    /** @return {@code mid} shifted by {@link #BKUP_MID_DIFF} */
    public static int getBkupMid() {
        return BKUP_MID_DIFF + mid;
    }

    /**
     * Initialises the endpoint, configures the Netty server bootstrap and binds the listening socket.
     *
     * <p>Each accepted channel gets an {@link IdleStateHandler} with a 15 second reader-idle timeout,
     * a {@code TxcMessageCodec} and this (sharable) handler. The server binds to the local ip unless
     * the {@code com.taobao.txc.server.group.<group>.listenall} configuration is {@code "1"} or
     * {@code "true"}, in which case it binds to {@code 0.0.0.0}.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting for the bind;
     *                          the interrupt status is restored and the cause is attached
     */
    @Override // com.taobao.txc.rpc.impl.RpcEndpoint, com.taobao.txc.resourcemanager.IRmRpcClient
    public void init() {
        super.init();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(this.bossGroup, this.workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() { // from class: com.taobao.txc.rpc.impl.RpcServer.1
                    public void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new ChannelHandler[]{new IdleStateHandler(15, 0, 0)})
                                .addLast(new ChannelHandler[]{new TxcMessageCodec(), RpcServer.this});
                    }
                });
        if (this.backlog > 0) {
            logger.info("set server backlog: {}", Integer.valueOf(this.backlog));
            serverBootstrap.option(ChannelOption.SO_BACKLOG, Integer.valueOf(this.backlog));
        }
        TxcXID.setIpAddress(NetUtil.getLocalIp());
        TxcXID.setPort(this.port);
        String str = NetUtil.getLocalIp() + ":" + this.port;
        try {
            String config = TxcConfigHolder.getInstance().getConfig(String.format("com.taobao.txc.server.group.%s.listenall", getGroup()), "TXC_GROUP");
            if (!StringUtils.isEmpty(config) && ("1".equals(config) || config.equalsIgnoreCase("true"))) {
                str = "0.0.0.0:" + this.port;
            }
        } catch (Exception ignored) {
            // Best effort: when the switch cannot be read, fall back to the local-ip binding.
            logger.warn("listen on internal network only!");
        }
        try {
            serverBootstrap.bind(NetUtil.toInetSocketAddress(str)).sync();
            logger.info("txc server begin to listen address:" + str);
        } catch (InterruptedException e2) {
            // Keep the interrupt visible to callers and do not lose the original cause.
            Thread.currentThread().interrupt();
            throw new RuntimeException("txc server can not bind to local address:" + str, e2);
        }
    }

    /**
     * Closes connections that have been reader-idle.
     *
     * <p>Only {@link IdleStateEvent}s whose state is {@link IdleState#READER_IDLE} are acted on;
     * every other user event is ignored here. Failures while closing are logged at info level.
     *
     * @param channelHandlerContext context of the channel the event was raised on
     * @param obj                   the user event
     */
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof IdleStateEvent)) {
            return;
        }
        IdleStateEvent idleEvent = (IdleStateEvent) obj;
        if (idleEvent.state() != IdleState.READER_IDLE) {
            return;
        }

        logger.info("channel" + channelHandlerContext.channel() + " read idle.");
        // Drop the bookkeeping first, then tear the connection down.
        handleDisconnect(channelHandlerContext, false);
        try {
            channelHandlerContext.disconnect();
            channelHandlerContext.close();
            TxcConstants.removeChannelVersion(channelHandlerContext.channel());
        } catch (Exception closeFailure) {
            logger.info(closeFailure.getMessage());
        }
    }

    /**
     * Unpublishes this server's {@code localIp:port} address for its group, destroys the endpoint
     * and then shuts both Netty event loop groups down gracefully.
     */
    @Override // com.taobao.txc.rpc.impl.RpcEndpoint
    public void destroy() {
        String publishedAddress = NetUtil.getLocalIp() + ":" + port;
        addressManager.unpublish(getGroup(), publishedAddress);

        super.destroy();
        logger.info("destoy rpcserver");

        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }

    /**
     * Convenience overload of the four-argument {@code getChannel} for callers that do not need to
     * know which entry was selected.
     *
     * @param map  ip to (key to channel) registry to search; may be {@code null}
     * @param str  preferred client ip
     * @param str2 client app name to match, or {@code null}
     * @return the selected channel, or {@code null} when none is usable
     */
    public Channel getChannel(Map<String, Map<Integer, Channel>> map, String str, String str2) {
        final TargetHolder noHolder = null;
        return getChannel(map, str, str2, noHolder);
    }

    /**
     * Selects a channel from an ip to (key to channel) registry.
     *
     * <ol>
     *   <li>Channels registered under the preferred ip {@code str} are tried first. When that ip has
     *       more than one channel and {@code str2} is given, the (last) channel whose entry in
     *       {@code rmIpAndPortToClientAppName} equals {@code str2} is taken; otherwise the first
     *       channel of that ip is taken.</li>
     *   <li>If nothing was found, all ips are scanned. For an ip with several channels (and a
     *       non-null {@code str2}) the first channel is a fallback that a later app-name match
     *       overrides; an ip with exactly one channel contributes that channel.</li>
     *   <li>An inactive selection is polled every 10 ms for up to 1000 rounds. If it never becomes
     *       active it is closed, evicted from {@code map}, and {@code null} is returned.</li>
     * </ol>
     *
     * <p>Compared with a plain lookup this method tolerates a {@code null} preferred ip, missing
     * app-name entries and inner maps that are emptied concurrently, and it restores the caller's
     * interrupt status if the wait was interrupted (the wait itself is not cut short).
     *
     * @param map          registry to search; {@code null} yields {@code null}
     * @param str          preferred client ip; may be {@code null}
     * @param str2         client app name to match; may be {@code null}
     * @param targetHolder if non-null, receives the ip and key of the selected entry
     * @return an active channel, or {@code null} when none is available
     */
    public Channel getChannel(Map<String, Map<Integer, Channel>> map, String str, String str2, TargetHolder targetHolder) {
        if (map == null) {
            return null;
        }
        Channel channel = null;
        Integer num = 0;
        String str3 = DiamondUtil.DEFAULT_TENANT_ID;

        // Step 1: the preferred ip. Look the inner map up once so a concurrent removal cannot
        // slip in between the check and the use.
        Map<Integer, Channel> preferred = str == null ? null : map.get(str);
        if (preferred != null && !preferred.isEmpty()) {
            if (preferred.size() > 1 && str2 != null) {
                for (Integer key : preferred.keySet()) {
                    // equals() on the requested name is null-safe when no app name is recorded.
                    if (str2.equals(this.rmIpAndPortToClientAppName.get(str + ":" + key))) {
                        channel = preferred.get(key);
                        num = key;
                        str3 = str;
                        rememberTarget(targetHolder, str3, num);
                    }
                }
            } else {
                Iterator<Integer> keys = preferred.keySet().iterator();
                if (keys.hasNext()) {
                    num = keys.next();
                    channel = preferred.get(num);
                    str3 = str;
                    rememberTarget(targetHolder, str3, num);
                }
            }
        }

        // Step 2: any ip.
        if (channel == null && !map.isEmpty()) {
            for (Map.Entry<String, Map<Integer, Channel>> entry : map.entrySet()) {
                String ip = entry.getKey();
                Map<Integer, Channel> byKey = entry.getValue();
                if (byKey == null) {
                    continue;
                }
                if (byKey.size() > 1 && str2 != null) {
                    for (Integer key : byKey.keySet()) {
                        if (channel == null || str2.equals(this.rmIpAndPortToClientAppName.get(ip + ":" + key))) {
                            channel = byKey.get(key);
                            str3 = ip;
                            num = key;
                            rememberTarget(targetHolder, str3, num);
                        }
                    }
                } else if (byKey.size() == 1) {
                    Iterator<Integer> keys = byKey.keySet().iterator();
                    if (keys.hasNext()) {
                        num = keys.next();
                        channel = byKey.get(num);
                        str3 = ip;
                        rememberTarget(targetHolder, str3, num);
                    }
                }
                if (channel != null) {
                    break;
                }
            }
        }

        if (channel == null) {
            return null;
        }
        if (channel.isActive()) {
            if (str != null && !str.equals(str3)) {
                logger.info("target ip is:" + str3);
            }
            return channel;
        }

        // Step 3: give the channel up to ~10 seconds to become active.
        boolean interrupted = false;
        try {
            for (int i = 0; i < 1000; i++) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
                if (channel.isActive()) {
                    return channel;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        synchronized (this.channelLock1) {
            if (str3 != null && num != null) {
                Map<Integer, Channel> registered = map.get(str3);
                if (registered != null && registered.get(num) == null) {
                    // Someone else already evicted this entry.
                    return null;
                }
            }
            try {
                logger.warn("channel " + channel + " is not active after long wait, close it.");
                channel.disconnect();
                channel.close();
                TxcConstants.removeChannelVersion(channel);
            } catch (Exception e2) {
                logger.error(DiamondUtil.DEFAULT_TENANT_ID, "failed to close channle", e2);
            }
            if (str3 != null && num != null) {
                Map<Integer, Channel> registered = map.get(str3);
                if (registered != null) {
                    registered.remove(num);
                    if (registered.isEmpty()) {
                        synchronized (map) {
                            // Re-check under the lock before dropping the now-empty ip entry.
                            Map<Integer, Channel> current = map.get(str3);
                            if (current != null && current.isEmpty()) {
                                map.remove(str3);
                            }
                        }
                    }
                }
            }
        }
        return null;
    }

    /** Records the selected ip/key in {@code holder} when the caller supplied one. */
    private static void rememberTarget(TargetHolder holder, String ip, Integer key) {
        if (holder != null) {
            holder.targetIp = ip;
            holder.targetKey = key;
        }
    }

    /**
     * Finds an active RM channel for a response, restricted to the given client ip.
     *
     * <p>The selected entry is recorded in {@code targetHolder}. Previously the holder was never
     * filled, so the caller's eviction of a non-writable channel (which requires a non-null
     * {@code targetIp}/{@code targetKey}) could never run.
     *
     * @param str          dbKey whose channel registry is searched
     * @param str2         client ip the channel must be registered under
     * @param str3         client app name to match against {@code rmIpAndPortToClientAppName};
     *                     {@code null} accepts any channel of that ip
     * @param targetHolder if non-null, receives the ip and key of the returned channel
     * @return the first matching active channel, or {@code null} when there is none
     */
    private Channel getRMResponseChannel(String str, String str2, String str3, TargetHolder targetHolder) {
        Map<String, Map<Integer, Channel>> byIp = this.dbKeyToChannelMap.get(str);
        if (byIp == null || byIp.isEmpty()) {
            logger.info("No channel for RM dbKey: " + str);
            return null;
        }
        Map<Integer, Channel> byKey = byIp.get(str2);
        if (byKey == null || byKey.isEmpty()) {
            logger.info("No channel for RM client: " + str2 + ", dbKey: " + str);
            return null;
        }
        for (Map.Entry<Integer, Channel> entry : byKey.entrySet()) {
            Integer key = entry.getKey();
            Channel candidate = entry.getValue();
            boolean appMatches = str3 == null || str3.equals(this.rmIpAndPortToClientAppName.get(str2 + ":" + key));
            if (!appMatches || candidate == null) {
                continue;
            }
            if (candidate.isActive()) {
                if (targetHolder != null) {
                    targetHolder.targetIp = str2;
                    targetHolder.targetKey = key;
                }
                return candidate;
            }
            logger.info("skip inactive RM channel " + candidate);
        }
        return null;
    }

    /**
     * Sends a request over a channel taken from {@code dbKeyToRtChannelMap}.
     *
     * @param str  dbKey; when {@code null} no channel is looked up and the call fails
     * @param str2 preferred client ip
     * @param str3 client app name
     * @param obj  message to send
     * @throws RuntimeException if no channel could be selected
     */
    @Override // com.taobao.txc.rpc.api.TxcServerMessageSender
    public void sendRtRequest(String str, String str2, String str3, Object obj) {
        Channel target = (str == null)
                ? null
                : getChannel(this.dbKeyToRtChannelMap.get(str), str2, str3);
        if (target != null) {
            super.sendRequest(target, obj);
            return;
        }
        throw new RuntimeException("client is not connected, client ip:" + str2 + ",client app name:" + str3);
    }

    /**
     * Looks up the channel registered for a client ip and app name in
     * {@code ipAndClientAppNameToChannelMap}.
     *
     * <p>An inactive channel is polled every 10 ms for up to 100 rounds. If it is still inactive it
     * is removed from the registry and closed. If the wait is interrupted the polling continues, but
     * the thread's interrupt status is restored before returning. Failures while closing are logged
     * instead of being silently dropped.
     *
     * @param str  client ip
     * @param str2 client app name
     * @return an active channel, or {@code null} if none is registered or it had to be closed
     */
    private Channel getChannel(String str, String str2) {
        final String registryKey = str + str2;
        Channel channel = this.ipAndClientAppNameToChannelMap.get(registryKey);
        if (channel == null) {
            return null;
        }
        if (channel.isActive()) {
            return channel;
        }

        boolean interrupted = false;
        try {
            for (int i = 0; i < 100; i++) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
                if (channel.isActive()) {
                    return channel;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        synchronized (this.channelLock2) {
            Channel remove = this.ipAndClientAppNameToChannelMap.remove(registryKey);
            if (remove == null) {
                return null;
            }
            if (remove.isActive()) {
                // A newer, healthy channel was registered meanwhile: put it back and use it.
                this.ipAndClientAppNameToChannelMap.putIfAbsent(registryKey, remove);
                return remove;
            }
            try {
                logger.warn("channel " + remove + " is not active after long wait, close it.");
                remove.disconnect();
                remove.close();
                TxcConstants.removeChannelVersion(remove);
            } catch (Exception e2) {
                logger.error(DiamondUtil.DEFAULT_TENANT_ID, "failed to close channel", e2);
            }
            return null;
        }
    }

    /**
     * Sends a request to a client.
     *
     * <p>With a dbKey the channel comes from {@code dbKeyToChannelMap}; without one it comes from the
     * ip/app-name registry. If sending fails with a {@link TxcException} whose message contains
     * {@code TxcConstants.CHANNEL_NOT_WRITABLE}, the channel selected through the dbKey registry is
     * evicted before the exception is rethrown.
     *
     * @param str  dbKey, or {@code null} to address the client by ip and app name
     * @param str2 client ip
     * @param str3 client app name
     * @param obj  message to send
     * @throws RuntimeException if both {@code str} and {@code str2} are {@code null}
     * @throws TxcException     if no channel is available, or sending fails
     */
    @Override // com.taobao.txc.rpc.api.TxcServerMessageSender
    public void sendRequest(String str, String str2, String str3, Object obj) {
        final TargetHolder picked = new TargetHolder();
        Map<String, Map<Integer, Channel>> channelsByIp = null;
        Channel target;
        if (str == null) {
            if (str2 == null) {
                throw new RuntimeException("client is not connected, client ip:" + str2 + ",client app name:" + str3);
            }
            target = getChannel(str2, str3);
        } else {
            channelsByIp = this.dbKeyToChannelMap.get(str);
            target = getChannel(channelsByIp, str2, str3, picked);
        }
        if (target == null) {
            throw new TxcException("rm client is not connected. dbkey:" + str + ",clientIp:" + str2 + ",chMap:" + channelsByIp);
        }

        try {
            super.sendRequest(target, obj);
        } catch (TxcException sendFailure) {
            boolean notWritable = sendFailure.getMessage() != null
                    && sendFailure.getMessage().contains(TxcConstants.CHANNEL_NOT_WRITABLE);
            boolean targetKnown = str != null && picked.targetIp != null && picked.targetKey != null;
            if (notWritable && targetKnown) {
                Map<Integer, Channel> byKey = channelsByIp.get(picked.targetIp);
                if (byKey != null) {
                    byKey.remove(picked.targetKey);
                    if (byKey.size() == 0) {
                        synchronized (channelsByIp) {
                            // Re-check under the lock before dropping the empty ip entry.
                            Map<Integer, Channel> current = channelsByIp.get(picked.targetIp);
                            if (current != null && current.size() == 0) {
                                channelsByIp.remove(picked.targetIp);
                            }
                        }
                    }
                }
            }
            throw sendFailure;
        }
    }

    /**
     * Sends a response identified by message id to a client.
     *
     * <p>Without a dbKey the channel is looked up by ip and app name; a missing channel is only
     * logged. With a dbKey an RM channel is required and a missing one raises an exception. On a
     * {@link TxcException} whose message contains {@code TxcConstants.CHANNEL_NOT_WRITABLE} the
     * corresponding registry entry is cleaned up before the failure is propagated.
     *
     * @param j    id of the message being answered
     * @param str  dbKey, or {@code null} for the ip/app-name path
     * @param str2 client ip
     * @param str3 client app name
     * @param obj  response body
     * @throws RuntimeException if {@code str} is {@code null} and either {@code str2} or {@code str3}
     *                          is {@code null}, or if no RM channel can be used
     * @throws TxcException     if sending on the ip/app-name path fails
     */
    @Override // com.taobao.txc.rpc.api.TxcServerMessageSender
    public void sendResponse(long j, String str, String str2, String str3, Object obj) {
        if (str == null) {
            // ---- ip / app-name path ----
            if (str3 == null || str2 == null) {
                throw new RuntimeException("dbKey is null, clientAppName is" + str3 + ",clientIp is" + str2);
            }
            Channel tmChannel = getChannel(str2, str3);
            if (tmChannel == null) {
                logger.info("Unable to send response back to TM since already disconnected [" + str3 + "] [" + str2 + "]");
                return;
            }
            try {
                super.sendResponse(j, tmChannel, obj);
            } catch (TxcException sendFailure) {
                boolean notWritable = sendFailure.getMessage() != null
                        && sendFailure.getMessage().contains(TxcConstants.CHANNEL_NOT_WRITABLE);
                if (notWritable) {
                    Channel evicted = this.ipAndClientAppNameToChannelMap.remove(str2 + str3);
                    // A different channel was registered meanwhile: put it back.
                    if (evicted != null && tmChannel.compareTo(evicted) != 0) {
                        this.ipAndClientAppNameToChannelMap.putIfAbsent(str2 + str3, evicted);
                    }
                }
                throw sendFailure;
            }
            return;
        }

        // ---- dbKey (RM) path ----
        TargetHolder picked = new TargetHolder();
        Channel rmChannel = getRMResponseChannel(str, str2, str3, picked);
        if (rmChannel == null) {
            logger.info("Failed to send response through " + rmChannel);
            logger.info("SNAPSHOT - msgId:" + j + ", dbKey:" + str + ", clientIp:" + str2 + ", clientAppName:" + str3 + ", msg:" + obj + ", rpcserver:" + this + ", dbKeyToChannelMap:" + this.dbKeyToChannelMap + ", rmIpAndPortToClientAppName:" + this.rmIpAndPortToClientAppName);
            throw new RuntimeException("RM is not connected.");
        }
        try {
            super.sendResponse(j, rmChannel, obj);
        } catch (TxcException sendFailure) {
            logger.info("Failed to send response through " + rmChannel);
            logger.info("SNAPSHOT - msgId:" + j + ", dbKey:" + str + ", clientIp:" + str2 + ", clientAppName:" + str3 + ", msg:" + obj + ", rpcserver:" + this + ", dbKeyToChannelMap:" + this.dbKeyToChannelMap + ", rmIpAndPortToClientAppName:" + this.rmIpAndPortToClientAppName);
            boolean notWritable = sendFailure.getMessage() != null
                    && sendFailure.getMessage().contains(TxcConstants.CHANNEL_NOT_WRITABLE);
            if (notWritable) {
                Map<String, Map<Integer, Channel>> channelsByIp = this.dbKeyToChannelMap.get(str);
                if (channelsByIp != null && picked.targetIp != null && picked.targetKey != null) {
                    Map<Integer, Channel> byKey = channelsByIp.get(picked.targetIp);
                    if (byKey != null) {
                        byKey.remove(picked.targetKey);
                        if (byKey.size() == 0) {
                            synchronized (channelsByIp) {
                                Map<Integer, Channel> current = channelsByIp.get(picked.targetIp);
                                if (current != null && current.size() == 0) {
                                    channelsByIp.remove(picked.targetIp);
                                }
                            }
                        }
                    }
                }
            }
            throw new RuntimeException("RM is not connected.", sendFailure);
        }
    }

    /**
     * Sends a response on an explicit channel after verifying that the channel is usable.
     *
     * @param j       id of the message being answered
     * @param channel channel to write to
     * @param obj     response body
     * @throws RuntimeException if {@code channel} is {@code null} or not active
     */
    @Override // com.taobao.txc.rpc.impl.RpcEndpoint, com.taobao.txc.rpc.api.TxcServerMessageSender
    public void sendResponse(long j, Channel channel, Object obj) {
        boolean usable = channel != null && channel.isActive();
        if (usable) {
            super.sendResponse(j, channel, obj);
            return;
        }
        throw new RuntimeException("channel is not active. channel:" + channel);
    }

    /**
     * Performs a synchronous call to a client.
     *
     * <p>With a dbKey the channel is taken from {@code dbKeyToChannelMap}; otherwise, when an app
     * name is given, from the ip/app-name registry.
     *
     * @param str  dbKey, or {@code null}
     * @param str2 client ip
     * @param str3 client app name
     * @param obj  request body
     * @param j    timeout value forwarded to the endpoint's {@code invoke}
     * @return whatever the endpoint's {@code invoke} returns
     * @throws RuntimeException if no channel could be selected
     */
    @Override // com.taobao.txc.rpc.api.TxcServerMessageSender
    public Object invoke(String str, String str2, String str3, Object obj, long j) throws IOException, TimeoutException {
        Map<String, Map<Integer, Channel>> channelsByIp = null;
        Channel target = null;
        if (str != null) {
            channelsByIp = this.dbKeyToChannelMap.get(str);
            target = getChannel(channelsByIp, str2, str3);
        } else if (str3 != null) {
            target = getChannel(str2, str3);
        }
        if (target == null) {
            throw new RuntimeException("client is not connected. dbkey:" + str + ",clientIp:" + str2 + ",chMap:" + channelsByIp);
        }
        return super.invoke((String) null, target, obj, j);
    }

    /**
     * Same as the five-argument {@code invoke} with a timeout argument of {@code 30000}.
     */
    @Override // com.taobao.txc.rpc.api.TxcServerMessageSender
    public Object invoke(String str, String str2, String str3, Object obj) throws IOException, TimeoutException {
        final long defaultTimeout = 30000L;
        return invoke(str, str2, str3, obj, defaultTimeout);
    }

    /**
     * Routes an incoming message to the {@code ServerMessageListener}.
     *
     * <p>Channels that {@code TxcConstants.isHandledChannel} does not recognise are closed and the
     * message is dropped. For recognised channels the registered app name, vgroup and dbKey of the
     * remote address are looked up; if neither app name nor dbKey is known the channel-based
     * listener callback is used, otherwise the identity-based one.
     *
     * @param j                     message id
     * @param channelHandlerContext context of the channel the message arrived on
     * @param obj                   message body
     */
    @Override // com.taobao.txc.rpc.impl.RpcEndpoint
    public void dispatch(long j, ChannelHandlerContext channelHandlerContext, Object obj) {
        if (!TxcConstants.isHandledChannel(channelHandlerContext.channel())) {
            try {
                logger.warn("unknown connection. " + channelHandlerContext.channel());
                channelHandlerContext.disconnect();
                channelHandlerContext.close();
                TxcConstants.removeChannelVersion(channelHandlerContext.channel());
            } catch (Exception e) {
                // Best effort close; report the failure instead of hiding it.
                logger.warn("failed to close unknown connection: " + e.getMessage());
            }
            logger.warn(String.format("close a unhandled connection! [%s]", channelHandlerContext.channel().toString()));
            return;
        }
        String stringAddress = NetUtil.toStringAddress(channelHandlerContext.channel().remoteAddress());
        String str = this.ipAndPortToClientAppName.get(stringAddress);
        String str2 = this.ipAndPortToVgroupName.get(stringAddress);
        String str3 = this.ipAndPortToDbKey.get(stringAddress);
        if (str == null && str3 == null) {
            this.serverMessageListener.onMessage(j, channelHandlerContext.channel(), obj);
        } else {
            // Strip the ":port" suffix; fall back to the whole string if there is no ':'.
            int colon = stringAddress.indexOf(':');
            String clientIp = colon < 0 ? stringAddress : stringAddress.substring(0, colon);
            this.serverMessageListener.onMessage(j, str3, clientIp, str, str2, obj);
        }
    }

    /**
     * Cleans up after a channel has become inactive.
     *
     * <p>Does nothing once the message executor has been shut down. Otherwise the channel's version
     * entry is removed, the disconnect bookkeeping runs (quietly when no version entry existed) and
     * the event is passed to the superclass.
     *
     * @param channelHandlerContext context of the channel that became inactive
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (!this.messageExecutor.isShutdown()) {
            String version = TxcConstants.removeChannelVersion(channelHandlerContext.channel());
            boolean hadVersion = version != null;
            if (hadVersion) {
                logger.info("channel" + channelHandlerContext.channel() + " inactive.");
            }
            handleDisconnect(channelHandlerContext, !hadVersion);
            super.channelInactive(channelHandlerContext);
        }
    }

    /**
     * Removes every registry entry that belongs to a disconnected channel and notifies the
     * {@code connectionEventListener} asynchronously on {@code messageExecutor}.
     *
     * <p>Three cases are distinguished by what was registered for the remote address:
     * a client app name (ip/app-name channel is removed, event carries the app name),
     * a dbKey string (the channel is removed from both dbKey registries for each comma-separated
     * key, event carries the dbKeys), or nothing (only a debug log is written).
     *
     * @param channelHandlerContext context of the channel that is going away
     * @param z                     when {@code true}, the "to server channel inactive" info log is suppressed
     */
    private void handleDisconnect(ChannelHandlerContext channelHandlerContext, boolean z) {
        final String stringAddress = NetUtil.toStringAddress(channelHandlerContext.channel().remoteAddress());
        // Drop the per-address registrations up front; the removed values decide which case applies.
        final String remove = this.ipAndPortToClientAppName.remove(stringAddress);
        final String remove2 = this.ipAndPortToDbKey.remove(stringAddress);
        this.ipAndPortToVgroupName.remove(stringAddress);
        if ((remove == null || remove.isEmpty()) && (remove2 == null || remove2.isEmpty())) {
            // Unregistered connection: record/log at most once per ip within 60000 ms (by nowMills).
            // NOTE(review): nowMills is not declared in this class - presumably a clock field of RpcEndpoint; confirm.
            String ipAddress = NetUtil.toIpAddress(channelHandlerContext.channel().remoteAddress());
            Long l = this.ipInactiveRecords.get(ipAddress);
            if (l == null || this.nowMills - l.longValue() > 60000) {
                this.ipInactiveRecords.put(ipAddress, Long.valueOf(this.nowMills));
                if (this.nowMills > this.lastInactiveMills) {
                    this.lastInactiveMills = this.nowMills;
                }
                if (!z) {
                    logger.info(stringAddress + " to server channel inactive.");
                }
            }
        } else {
            // Registered connection: opportunistically clear the per-ip records once the newest
            // one is more than 120000 ms old.
            if (this.nowMills - this.lastInactiveMills > 120000 && !this.ipInactiveRecords.isEmpty()) {
                this.ipInactiveRecords.clear();
            }
            if (!z) {
                logger.info(stringAddress + " to server channel inactive.");
            }
        }
        if (remove != null) {
            // Case 1: a client app name was registered for this address.
            // 58 is ':' - the part before it is used as the client ip.
            final String substring = stringAddress.substring(0, stringAddress.indexOf(58));
            try {
                Channel channel = this.ipAndClientAppNameToChannelMap.get(substring + remove);
                // Only evict the registry entry if it still refers to this very connection.
                if (channel != null && channel.remoteAddress().equals(channelHandlerContext.channel().remoteAddress())) {
                    Channel remove3 = this.ipAndClientAppNameToChannelMap.remove(substring + remove);
                    if (remove3 == null || remove3.remoteAddress().equals(channelHandlerContext.channel().remoteAddress())) {
                        logger.info("remove cient channel:" + remove3);
                    } else {
                        // The entry changed between get and remove: put the other channel back.
                        this.ipAndClientAppNameToChannelMap.putIfAbsent(substring + remove, remove3);
                    }
                }
            } catch (Exception e) {
                logger.error(DiamondUtil.DEFAULT_TENANT_ID, DiamondUtil.DEFAULT_TENANT_ID, e);
            }
            // Notify the listener off the calling thread; any failure is logged, not propagated.
            this.messageExecutor.execute(new Runnable() { // from class: com.taobao.txc.rpc.impl.RpcServer.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        ConnectionEvent connectionEvent = new ConnectionEvent();
                        connectionEvent.setClientAppName(remove);
                        connectionEvent.setClientIp(substring);
                        connectionEvent.setClientIpAndPort(stringAddress);
                        connectionEvent.setConnected(false);
                        RpcServer.this.connectionEventListener.onEvent(connectionEvent);
                    } catch (Throwable th) {
                        RpcServer.logger.error(TxcErrCode.NetOnMessage.errCode, th.getMessage(), th);
                    }
                }
            });
            return;
        }
        if (remove2 == null) {
            // Case 3: nothing was registered for this address.
            if (logger.isDebugEnabled()) {
                logger.debug("Unknown connection is disconnected [" + stringAddress + "]" + channelHandlerContext.channel());
                return;
            }
            return;
        }
        // Case 2: a dbKey string was registered. Split the address into ip and numeric port;
        // the port is the Integer key inside the per-ip channel maps.
        final String substring2 = stringAddress.substring(0, stringAddress.indexOf(58));
        int parseInt = Integer.parseInt(stringAddress.substring(stringAddress.indexOf(58) + 1));
        for (String str : remove2.split(",")) {
            Map<String, Map<Integer, Channel>> map = this.dbKeyToChannelMap.get(str);
            if (map != null && map.get(substring2) != null) {
                logger.info("remove rm channel:" + map.get(substring2).remove(Integer.valueOf(parseInt)) + " for dbkey:" + str);
            }
            Map<String, Map<Integer, Channel>> map2 = this.dbKeyToRtChannelMap.get(str);
            if (map2 != null && map2.get(substring2) != null) {
                map2.get(substring2).remove(Integer.valueOf(parseInt));
            }
        }
        // Notify the listener off the calling thread; any failure is logged, not propagated.
        this.messageExecutor.execute(new Runnable() { // from class: com.taobao.txc.rpc.impl.RpcServer.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ConnectionEvent connectionEvent = new ConnectionEvent();
                    connectionEvent.setDbKeys(remove2);
                    connectionEvent.setClientIp(substring2);
                    connectionEvent.setClientIpAndPort(stringAddress);
                    connectionEvent.setConnected(false);
                    RpcServer.this.connectionEventListener.onEvent(connectionEvent);
                } catch (Throwable th) {
                    RpcServer.logger.error("0105", th.getMessage(), th);
                }
            }
        });
    }

    /**
     * Authentication hook invoked by {@code channelRead} while a client registers.
     *
     * @param map the udata parsed from the registration message (may be {@code null}, as the caller
     *            null-checks it afterwards)
     * @param str the remote address string of the registering client
     * @return a value that, when non-null and non-empty, is stored as the vgroup name for the address
     */
    protected abstract String checkAuth(Map<String, String> map, String str);

    /**
     * Handles client registration and heartbeat messages; everything else is passed to the superclass.
     *
     * <p>Registration: records the channel version, app name and channel for the remote address,
     * runs {@code checkAuth}, stores a non-empty result as the vgroup name, replies with a positive
     * {@code RegisterClientAppNameResultMessage} and logs a summary at warn level.
     * Heartbeat: a {@code PING} is answered with {@code PONG}; send failures are logged.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param obj                   the decoded inbound object
     */
    @Override // com.taobao.txc.rpc.impl.RpcEndpoint
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof RpcMessage)) {
            super.channelRead(channelHandlerContext, obj);
            return;
        }
        RpcMessage rpcMessage = (RpcMessage) obj;

        if (rpcMessage.getBody() instanceof RegisterClientAppNameMessage) {
            Channel clientChannel = channelHandlerContext.channel();
            String remote = NetUtil.toStringAddress(clientChannel.remoteAddress());
            RegisterClientAppNameMessage registration = (RegisterClientAppNameMessage) rpcMessage.getBody();

            TxcConstants.addChannelVersion(channelHandlerContext.channel(), registration.getVersion());
            this.ipAndPortToClientAppName.put(remote, registration.getClientAppName());
            String clientIp = remote.substring(0, remote.indexOf(58));
            this.ipAndClientAppNameToChannelMap.put(clientIp + registration.getClientAppName(), channelHandlerContext.channel());

            Map<String, String> udata = registration.parseUdata();
            String vgroup = checkAuth(udata, remote);
            if (vgroup != null && !vgroup.isEmpty()) {
                this.ipAndPortToVgroupName.put(remote, vgroup);
                logger.info(String.format("checkAuth for client:%s vgroup:%s ok", remote, vgroup));
            }
            sendResponse(rpcMessage.getId(), channelHandlerContext.channel(), new RegisterClientAppNameResultMessage(true));

            // Summary line; missing udata is rendered as the literal text "null".
            StringBuilder summary = new StringBuilder();
            summary.append("app ").append(registration.getClientAppName());
            summary.append(" is connected from ").append(remote);
            summary.append(", version:").append(registration.getVersion());
            summary.append(", vgroup:").append(udata == null ? "null" : udata.get(RegisterClientAppNameMessage.UDATA_VGROUP));
            summary.append(", ak:").append(udata == null ? "null" : udata.get(RegisterClientAppNameMessage.UDATA_AK));
            summary.append(", digest:").append(udata == null ? "null" : udata.get(RegisterClientAppNameMessage.UDATA_DIGEST));
            summary.append(", type:").append(udata == null ? "null" : udata.get(RegisterClientAppNameMessage.UDATA_TYPE));
            summary.append(", ip:").append(udata == null ? "null" : udata.get(RegisterClientAppNameMessage.UDATA_IP));
            summary.append(", insts:[").append(registration.getTxcInsts()).append(']');
            summary.append(", timestamp:").append(udata == null ? "null" : udata.get(RegisterClientAppNameMessage.UDATA_TIMESTAMP));
            logger.warn(summary.toString());
            return;
        }

        if (rpcMessage.getBody() == HeartbeatMessage.PING) {
            try {
                sendResponse(rpcMessage.getId(), channelHandlerContext.channel(), HeartbeatMessage.PONG);
            } catch (Throwable sendFailure) {
                logger.error(DiamondUtil.DEFAULT_TENANT_ID, "send response error", sendFailure);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("received PING from " + channelHandlerContext.channel().remoteAddress());
            }
            return;
        }

        super.channelRead(channelHandlerContext, obj);
    }

    /** @return the port this server is configured to listen on */
    public int getPort() {
        return port;
    }

    /** @param i port to listen on; read when {@code init()} builds the bind address */
    public void setPort(int i) {
        port = i;
    }

    /**
     * Handles an exception raised on a channel.
     *
     * <p>The remote address is removed from the first registry that knows it, checked in the order
     * app name, dbKey, vgroup; the other registries are left untouched. The exception is then
     * forwarded to the superclass. If none of the three registries knows the address, the method
     * returns without forwarding the exception.
     *
     * @param channelHandlerContext context of the channel the exception occurred on
     * @param th                    the exception
     */
    @Override // com.taobao.txc.rpc.impl.RpcEndpoint
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        final String remote = NetUtil.toStringAddress(channelHandlerContext.channel().remoteAddress());
        if (this.ipAndPortToClientAppName.containsKey(remote)) {
            this.ipAndPortToClientAppName.remove(remote);
        } else if (this.ipAndPortToDbKey.containsKey(remote)) {
            this.ipAndPortToDbKey.remove(remote);
        } else if (this.ipAndPortToVgroupName.containsKey(remote)) {
            this.ipAndPortToVgroupName.remove(remote);
        } else {
            // Unknown peer: nothing to clean up and nothing is forwarded.
            return;
        }
        super.exceptionCaught(channelHandlerContext, th);
    }
}
