Chin的博客

程序的世界多奇妙

Netty初步分析

网络通讯是分布式架构的基础,而netty作为网络通讯的基础库,被广泛应用在各种中间件中。深入分析netty很有必要。

从一个demo来了解netty,先看server端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

 public class TcpServer {

    private int port;

    public TcpServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8), new StringDecoder(CharsetUtil.UTF_8)).
                                    addLast(new ServerHandler(SessionManager.getManager()));
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true);

            //启动推送消息线程
            Thread t = new Thread(new PushRun(new PushMessage()));
            t.start();
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    class PushRun implements Runnable {

        private PushMessage pushMessage;

        public PushRun(PushMessage pushMessage) {
            this.pushMessage = pushMessage;
        }

        public void run() {
            int i = 0;
            while (true) {
                i++;
                pushMessage.sendMsg("send msg:" + i, 34445L);
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 4199;
        }
        new TcpServer(port).run();
    }
}

我们看到先是创建了两个EventLoopGroup,eventLoopGroup是什么,先姑且理解为线程组的概念,后面的文章再详细分析netty的线程模型。然后创建ServerBootStrap,设置上面的两个eventGroup,再就是设置handler,channelInit的时候加入StringEncoder,StringDecoder,业务逻辑处理ServerHandler,再就是一些参数的设置,是否启用NODELAY等。

服务端执行b.bind()之后就开始监听客户端的连接,具体的业务逻辑的处理在ServerHandler里做。我们看下ServerHandler的代码,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ServerHandler extends ChannelInboundHandlerAdapter {

    private SessionManager sessionManager;

    public ServerHandler(SessionManager sessionManager) {
        super();
        this.sessionManager = sessionManager;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("receive from client:" + msg);
        TcpProtocol tcpProtocol = JSON.parseObject((String) msg, TcpProtocol.class);
        sessionManager.addSession(String.valueOf(tcpProtocol.getUid()), ctx);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        System.out.println("channel registered....");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("channel active.....");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        cause.printStackTrace();
        ctx.close();
    }
}

ServerHandler继承自ChannelInboundHandlerAdapter,覆盖了一些方法,channelRegistered、channelActive、channelRead、channelInactive,这些方法分别在channel生命周期的不同阶段调用,由此看来,netty是事件编程模型的。

接着看客户端的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
ublic class TcpClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8),
                            new StringDecoder(CharsetUtil.UTF_8)).addLast(new ClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)
            TcpProtocol tcpProtocol = new TcpProtocol();
            tcpProtocol.setUid(34445L);
            f.channel().writeAndFlush(JSON.toJSONString(tcpProtocol));
            synchronized (TcpClient.class) {
                while (true) {
                    TcpClient.class.wait();
                }
            }
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

客户端因为不需要处理连接的监听,所以只创建了一个EventLoopGroup。同样具体的业务处理的逻辑放在Handler里处理,ClientHandler代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package netty; /**
 * Created by Administrator on 2016/7/13.
 */

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(msg);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
}

覆盖channelRead方法只是输出接受到的消息。

我们看到,不管是Server还是Client,netty都是基于事件来做业务的处理,那事件是传播开来的,事件的触发源在哪里呢?下一篇文章详细分析netty的事件传播。