package game.network; import java.net.InetAddress; import java.net.SocketAddress; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; import com.google.common.util.concurrent.ThreadFactoryBuilder; import game.log.Log; import game.network.NetHandler.ThreadQuickExitException; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalServerChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.TimeoutException; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; public class NetConnection extends SimpleChannelInboundHandler { private static final Pattern IP_REPLACER = Pattern.compile("([0-9]*)\\.([0-9]*)\\.[0-9]*\\.[0-9]*"); public static final AttributeKey ATTR_STATE = AttributeKey.valueOf("protocol"); public static final LazyLoadBase CLIENT_NIO_EVENTLOOP = new LazyLoadBase() { protected NioEventLoopGroup load() { return new NioEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Client IO #%d").setDaemon(true).build()); } }; // public static final LazyLoadBase CLIENT_EPOLL_EVENTLOOP = new LazyLoadBase() // { // protected EpollEventLoopGroup load() // { // return new EpollEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Epoll Client IO #%d").setDaemon(true).build()); // } // }; public static final LazyLoadBase CLIENT_LOCAL_EVENTLOOP = new LazyLoadBase() { protected LocalEventLoopGroup load() { return new LocalEventLoopGroup(0, (new ThreadFactoryBuilder()).setNameFormat("Netty Local Client IO #%d").setDaemon(true).build()); } }; // private final PacketDirection direction; private final Queue outboundPacketsQueue = new ConcurrentLinkedQueue(); private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Channel channel; private SocketAddress socketAddress; private NetHandler packetListener; private String terminationReason; private boolean disconnected; private boolean local; // public NetConnection(PacketDirection packetDirection) // { // this.direction = packetDirection; // } public void channelActive(ChannelHandlerContext p_channelActive_1_) throws Exception { super.channelActive(p_channelActive_1_); this.channel = p_channelActive_1_.channel(); this.local = this.channel instanceof LocalChannel || this.channel instanceof LocalServerChannel; this.socketAddress = this.channel.remoteAddress(); try { this.setConnectionState(PacketRegistry.HANDSHAKE); } catch (Throwable throwable) { Log.JNI.error(throwable, "Fehler beim Aufbauen der Verbindung für Handshake"); } } /** * Sets the new connection state and registers which packets this channel may send and receive */ public void setConnectionState(PacketRegistry newState) { this.channel.attr(ATTR_STATE).set(newState); this.channel.config().setAutoRead(true); // Log.debug("Automatisches Lesen eingeschaltet"); } public void channelInactive(ChannelHandlerContext p_channelInactive_1_) throws Exception { this.closeChannel("Ende der Datenübertragung"); } public void exceptionCaught(ChannelHandlerContext p_exceptionCaught_1_, Throwable p_exceptionCaught_2_) throws Exception { String comp; if (p_exceptionCaught_2_ instanceof TimeoutException) { comp = "Zeitüberschreitung"; } else { comp = "Interner Fehler: " + p_exceptionCaught_2_; } this.closeChannel(comp); } protected void channelRead0(ChannelHandlerContext p_channelRead0_1_, Packet p_channelRead0_2_) throws Exception { if (this.channel.isOpen()) { try { p_channelRead0_2_.processPacket(this.packetListener); } catch (ThreadQuickExitException e) { ; } } } /** * Sets the NetHandler for this NetworkManager, no checks are made if this handler is suitable for the particular * connection state (protocol) */ public void setNetHandler(NetHandler handler) { if (handler == null) { throw new NullPointerException("Handler ist Null"); } // Log.debug("Setze Handler von " + this + " auf " + handler); this.packetListener = handler; } public void sendPacket(Packet packetIn) { if (this.isChannelOpen()) { this.flushOutboundQueue(); this.dispatchPacket(packetIn, null); } else { this.readWriteLock.writeLock().lock(); try { this.outboundPacketsQueue.add(new NetConnection.InboundHandlerTuplePacketListener(packetIn, null)); } finally { this.readWriteLock.writeLock().unlock(); } } } public void sendPacket(Packet packetIn, GenericFutureListener > listener) { if (this.isChannelOpen()) { this.flushOutboundQueue(); this.dispatchPacket(packetIn, new GenericFutureListener[] {listener}); } else { this.readWriteLock.writeLock().lock(); try { this.outboundPacketsQueue.add(new NetConnection.InboundHandlerTuplePacketListener(packetIn, listener)); } finally { this.readWriteLock.writeLock().unlock(); } } } /** * Will commit the packet to the channel. If the current thread 'owns' the channel it will write and flush the * packet, otherwise it will add a task for the channel eventloop thread to do that. */ private void dispatchPacket(final Packet inPacket, final GenericFutureListener > [] futureListeners) { final PacketRegistry enumconnectionstate = PacketRegistry.getType(inPacket); final PacketRegistry enumconnectionstate1 = (PacketRegistry)this.channel.attr(ATTR_STATE).get(); if (enumconnectionstate1 != enumconnectionstate) { // Log.debug("Automatisches Lesen ausgeschaltet"); this.channel.config().setAutoRead(false); } if (this.channel.eventLoop().inEventLoop()) { if (enumconnectionstate != enumconnectionstate1) { this.setConnectionState(enumconnectionstate); } ChannelFuture channelfuture = this.channel.writeAndFlush(inPacket); if (futureListeners != null) { channelfuture.addListeners(futureListeners); } channelfuture.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); } else { this.channel.eventLoop().execute(new Runnable() { public void run() { if (enumconnectionstate != enumconnectionstate1) { NetConnection.this.setConnectionState(enumconnectionstate); } ChannelFuture channelfuture1 = NetConnection.this.channel.writeAndFlush(inPacket); if (futureListeners != null) { channelfuture1.addListeners(futureListeners); } channelfuture1.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); } }); } } /** * Will iterate through the outboundPacketQueue and dispatch all Packets */ private void flushOutboundQueue() { if (this.channel != null && this.channel.isOpen()) { this.readWriteLock.readLock().lock(); try { while (!this.outboundPacketsQueue.isEmpty()) { NetConnection.InboundHandlerTuplePacketListener networkmanager$inboundhandlertuplepacketlistener = (NetConnection.InboundHandlerTuplePacketListener)this.outboundPacketsQueue.poll(); if(networkmanager$inboundhandlertuplepacketlistener != null) { // NPE Fix this.dispatchPacket(networkmanager$inboundhandlertuplepacketlistener.packet, networkmanager$inboundhandlertuplepacketlistener.futureListeners); } } } finally { this.readWriteLock.readLock().unlock(); } } } /** * Checks timeouts and processes all packets received */ public void processReceivedPackets() { this.flushOutboundQueue(); this.packetListener.update(); // if (this.packetListener instanceof ITickable) // { // ((ITickable)this.packetListener).update(); // } this.channel.flush(); } // /** // * Returns the socket address of the remote side. Server-only. // */ // public SocketAddress getRemoteAddress() // { // return this.socketAddress; // } public String getCutAddress() { return this.socketAddress == null ? "?.?.*.*" : IP_REPLACER.matcher(this.socketAddress.toString()).replaceAll("$1.$2.*.*"); } /** * Closes the channel, the parameter can be used for an exit message (not certain how it gets sent) */ public void closeChannel(String message) { if (this.channel.isOpen()) { this.channel.close().awaitUninterruptibly(); this.terminationReason = message; } } /** * True if this NetworkManager uses a memory connection (single player game). False may imply both an active TCP * connection or simply no active connection at all */ public boolean isLocalChannel() { return this.local; } /** * Create a new NetworkManager from the server host and connect it to the server * * @param address The address of the server * @param serverPort The server port */ public static NetConnection createNetworkManagerAndConnect(InetAddress address, int serverPort) { final NetConnection networkmanager = new NetConnection(); Class oclass; LazyLoadBase lazyloadbase; // if (Epoll.isAvailable()) // { // oclass = EpollSocketChannel.class; // lazyloadbase = CLIENT_EPOLL_EVENTLOOP; // } // else // { oclass = NioSocketChannel.class; lazyloadbase = CLIENT_NIO_EVENTLOOP; // } ((Bootstrap)((Bootstrap)((Bootstrap)(new Bootstrap()).group((EventLoopGroup)lazyloadbase.getValue())).handler(new ChannelInitializer() { protected void initChannel(Channel p_initChannel_1_) throws Exception { try { p_initChannel_1_.config().setOption(ChannelOption.TCP_NODELAY, Boolean.valueOf(true)); } catch (ChannelException var3) { ; } p_initChannel_1_.pipeline().addLast((String)"timeout", (ChannelHandler)(new ReadTimeoutHandler(30))).addLast((String)"splitter", (ChannelHandler)(new PacketSplitter())).addLast((String)"decoder", (ChannelHandler)(new PacketDecoder(false))).addLast((String)"prepender", (ChannelHandler)(new PacketPrepender())).addLast((String)"encoder", (ChannelHandler)(new PacketEncoder(true))).addLast((String)"packet_handler", (ChannelHandler)networkmanager); } })).channel(oclass)).connect(address, serverPort).syncUninterruptibly(); return networkmanager; } /** * Prepares a clientside NetworkManager: establishes a connection to the socket supplied and configures the channel * pipeline. Returns the newly created instance. */ public static NetConnection provideLocalClient(SocketAddress address) { final NetConnection networkmanager = new NetConnection(); ((Bootstrap)((Bootstrap)((Bootstrap)(new Bootstrap()).group((EventLoopGroup)CLIENT_LOCAL_EVENTLOOP.getValue())).handler(new ChannelInitializer() { protected void initChannel(Channel p_initChannel_1_) throws Exception { p_initChannel_1_.pipeline().addLast((String)"packet_handler", (ChannelHandler)networkmanager); } })).channel(LocalChannel.class)).connect(address).syncUninterruptibly(); return networkmanager; } /** * Returns true if this NetworkManager has an active channel, false otherwise */ public boolean isChannelOpen() { return this.channel != null && this.channel.isOpen(); } public boolean hasNoChannel() { return this.channel == null; } /** * Gets the current handler for processing packets */ public NetHandler getNetHandler() { return this.packetListener; } /** * Switches the channel to manual reading modus */ public void disableAutoRead() { this.channel.config().setAutoRead(false); } public void setCompressionTreshold(int treshold) { if (treshold >= 0) { if (this.channel.pipeline().get("decompress") instanceof NettyCompressionDecoder) { ((NettyCompressionDecoder)this.channel.pipeline().get("decompress")).setCompressionTreshold(treshold); } else { this.channel.pipeline().addBefore("decoder", "decompress", new NettyCompressionDecoder(treshold)); } if (this.channel.pipeline().get("compress") instanceof NettyCompressionEncoder) { ((NettyCompressionEncoder)this.channel.pipeline().get("compress")).setCompressionTreshold(treshold); } else { this.channel.pipeline().addBefore("encoder", "compress", new NettyCompressionEncoder(treshold)); } } else { if (this.channel.pipeline().get("decompress") instanceof NettyCompressionDecoder) { this.channel.pipeline().remove("decompress"); } if (this.channel.pipeline().get("compress") instanceof NettyCompressionEncoder) { this.channel.pipeline().remove("compress"); } } } public void checkDisconnected() { if (this.channel != null && !this.channel.isOpen()) { if (!this.disconnected) { this.disconnected = true; if (this.terminationReason != null) { this.getNetHandler().onDisconnect(this.terminationReason); } else if (this.getNetHandler() != null) { this.getNetHandler().onDisconnect("Verbindung getrennt"); } } else { Log.JNI.warn("handleDisconnection() zweifach aufgerufen"); } } } static class InboundHandlerTuplePacketListener { private final Packet packet; private final GenericFutureListener > [] futureListeners; public InboundHandlerTuplePacketListener(Packet inPacket, GenericFutureListener > inFutureListener) { this.packet = inPacket; this.futureListeners = inFutureListener == null ? null : new GenericFutureListener[] {inFutureListener}; } } }