Netty源码看这篇就够了

Netty源码看这篇就够了

前言

后面打算开始撸其他框架源码,而Netty对Java NIO的一层封装,提供了一套简单易用的API,经常被其他框架拿来用,我先花了点时间研究了下。这里整理下对源码的解读,以及对几个关键对象的介绍。分析了之前两篇流水账式的源码分析的不足,这次尝试聚焦几个不同重点进行分析。

个人netty注释版本:https://gitee.com/Nortyr/netty

原netty地址:https://github.com/netty/netty

看完能收获什么

Java网络编程介绍

一个简单的EchoServerDemo

Bootstrap

Channel

ChannelPipeline & ChannelHandler

EventLoopGroup & EventLoop

ChannelFuture

Java网络编程介绍

BIO模型。庞大的线程消耗,消费消息如果很漫长,这个服务就是个灾难。

public void server(int port) throws IOException{

final ServerSocket socket=new ServerSocket(port);

for (;;){

//接受连接

final Socket clientSock = socket.accept();

System.out.println("Accept connection from"+ clientSock);

//创建一个线程来处理连接

new Thread(new Runnable() {

@Override

public void run() {

OutputStream out;

try {

out=clientSock.getOutputStream();

... doSomeThing...

//将消息写给已连接的客户端

out.write("Hello world".getBytes(Charset.forName("UTF-8")));

out.flush();

clientSock.close();

}

...略...

}

}).start();

}

}

故此Java设计出了NIO,这里找到个Doug Lea大神的一篇 NIOppt http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf 感兴趣的可以看一下,十分精简。下面这部分会结合这个ppt进行讲解,看完这个ppt的可以直接略过到下一个部分。

public static void main(String[] args) throws IOException {

//创建ServerSocketChannel,处理接入连接

ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();

//创建Selector

Selector selector=Selector.open();

//设置是否为非阻塞

serverSocketChannel.configureBlocking(false);

//创建注册channel进selector的创立连接时间

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

//绑定端口号

serverSocketChannel.socket().bind(new InetSocketAddress(8080));

while (true){

if(serverSocketChannel.isOpen()){

// 通过 Selector 选择 Channel

int selectNums = selector.select(1000L);

if (selectNums == 0) {

continue;

}

// 遍历可选择的 Channel 的 SelectionKey 集合

for (SelectionKey selectKey:selector.selectedKeys()) {

// 忽略无效的 SelectionKey

if (!selectKey.isValid()) {

continue;

}

//新建立的连接

if(selectKey.isAcceptable()){

//获取新连接创建的channel

SocketChannel socketChannel= ((ServerSocketChannel) selectKey.channel()).accept();

if(socketChannel!=null){

//设置为非阻塞

socketChannel.configureBlocking(false);

//注册进selector

socketChannel.register(selector,SelectionKey.OP_READ);

}

}

//处理读时间

if(selectKey.isReadable()){

SocketChannel socketChannel= (SocketChannel) selectKey.channel();

if(socketChannel!=null){

//读取数据

ByteBuffer buffer = ByteBuffer.allocate(1024);

int bytesRead = socketChannel.read(buffer);

if(bytesRead==-1){

socketChannel.register(selector,0);

socketChannel.close();

}else{

buffer.flip();

byte[] bytes = new byte[buffer.remaining()];

System.arraycopy(buffer.array(), buffer.position(), bytes, 0, buffer.remaining());

System.out.println(new String(bytes, "UTF-8"));

}

}

}

}

}

}

}

Java NIO的几个核心api

Channels

与支持非阻塞读取的文件,socket等建立连接。

Buffers

本质是一块内存,用于和NIO通道进行交互。

Selectors

把Channel和需要的事件注册到Selector上面,告诉一组channel中的哪一个有IO事件。

SelectionKeys

维护IO事件状态和绑定

几个核心api的关系

Channel和Buffer

2个交互关系如图所示

Selector/Channel/SelectionKey

一个Selector可以监听多个Channel

一个Selector和Channel的绑定关系为SelectionKey

Channel

这里我们以NioServerSocketChannel为例,看一下Channel

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable {

ChannelId id();

EventLoop eventLoop();

Channel parent();

ChannelConfig config();

boolean isOpen();

boolean isRegistered();

boolean isActive();

ChannelMetadata metadata();

SocketAddress localAddress();

SocketAddress remoteAddress();

ChannelFuture closeFuture();

boolean isWritable();

long bytesBeforeUnwritable();

long bytesBeforeWritable();

Unsafe unsafe();

ChannelPipeline pipeline();

ByteBufAllocator alloc();

@Override

Channel read();

@Override

Channel flush();

/**

* 调用Javanio方法封装

*/

interface Unsafe {

RecvByteBufAllocator.Handle recvBufAllocHandle();

/**

* 返回地址

*/

SocketAddress localAddress();

/**

* 返回远程地址

*/

SocketAddress remoteAddress();

/**

* 注册Channel,注册完成后通知ChannelPromise

*/

void register(EventLoop eventLoop, ChannelPromise promise);

/**

* 将ip地址绑定到Channel,完成后通知ChannelPromise

*/

void bind(SocketAddress localAddress, ChannelPromise promise);

/**

* 连接远程ip地址

*/

void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

/**

* 断开连接,完成后通知ChannelPromise

*/

void disconnect(ChannelPromise promise);

/**

* 关闭channel,通知ChannelPromise

*/

void close(ChannelPromise promise);

/**

* 关闭,不处罚任何事件

*/

void closeForcibly();

/**

* 注销channel,通知ChannelPromise

*/

void deregister(ChannelPromise promise);

/**

* 调用读取操作

*/

void beginRead();

/**

* 调用写操作

*/

void write(Object msg, ChannelPromise promise);

/**

* 清空所有通过ChannelPromise预定的写操作

*/

void flush();

ChannelPromise voidPromise();

/**

* 返回存储待处理写入请求的Channel的ChannelOutboundBuffer。

*/

ChannelOutboundBuffer outboundBuffer();

}

}

public abstract class AbstractNioChannel extends AbstractChannel {

private final SelectableChannel ch;

protected final int readInterestOp;

volatile SelectionKey selectionKey;

boolean readPending;

private final Runnable clearReadPendingRunnable = new Runnable() {

@Override

public void run() {

clearReadPending0();

}

};

private ChannelPromise connectPromise;

private Future connectTimeoutFuture;

private SocketAddress requestedRemoteAddress;

}

以上是Channel接口和AbstractNioChannel的抽象类,这里给大家精简了下,从Channel定义的各个方法可以看出,netty的Channel是对原始Channel的一层封装。其中所有的nio的操作封装在了Unsafe中,并进行了一定的增强,例如回调之类的。从AbstractNioChannel可以更加直观的看出,netty对Channel SelectionKey的封装,并添加了自己的回调ChannelPromise从而使方法更加易于使用。

ChannelPipeline & ChannelHandler

基础讲解

ChannelPipeline的初始化

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

...其余略...

private final DefaultChannelPipeline pipeline;

protected AbstractChannel(Channel parent) {

this.parent = parent;

id = newId();

unsafe = newUnsafe();

pipeline = newChannelPipeline();

}

protected DefaultChannelPipeline newChannelPipeline() {

return new DefaultChannelPipeline(this);

}

}

ChannelPipeline内部结构概述

public class DefaultChannelPipeline implements ChannelPipeline {

final AbstractChannelHandlerContext head;

final AbstractChannelHandlerContext tail;

private final Channel channel;

private final ChannelFuture succeededFuture;

protected DefaultChannelPipeline(Channel channel) {

this.channel = ObjectUtil.checkNotNull(channel, "channel");

succeededFuture = new SucceededChannelFuture(channel, null);

voidPromise = new VoidChannelPromise(channel, true);

tail = new TailContext(this);

head = new HeadContext(this);

head.next = tail;

tail.prev = head;

}

...略

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

TailContext(DefaultChannelPipeline pipeline) {

super(pipeline, null, TAIL_NAME, TailContext.class);

setAddComplete();

}

... 略

}

final class HeadContext extends AbstractChannelHandlerContext

implements ChannelOutboundHandler, ChannelInboundHandler {

private final Unsafe unsafe;

HeadContext(DefaultChannelPipeline pipeline) {

super(pipeline, null, HEAD_NAME, HeadContext.class);

unsafe = pipeline.channel().unsafe();

setAddComplete();

}

...略

}

}

上面列举了ChannelPipeline的创建,以及ChannelPipeline的内部结构。可以看出它维护了一个双向链表。我们在添加handler的时候就是往这个链表中添加的。

ChannelHandler 添加进ChannelPipeline后会被封装成ChannelHandlerContext,会判断是ChannelInboundHandler还是ChannelOutboundHandler的子类,对inbound和outbound这两个属性进行赋值,ChannelInboundHandler的子类inbound为true,outbound为false,ChannelOutboundHandler反之。ChannelPipeline内部调用方法时,会使用fireXXXXX()的方法,会利用责任链模式进行调用,这时候会用到这个属性进行判断,是否有对应方法,从而进行调用(后面会详细讲解下)。

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

DefaultChannelHandlerContext(

DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {

super(pipeline, executor, name, isInbound(handler), isOutbound(handler));

if (handler == null) {

throw new NullPointerException("handler");

}

this.handler = handler;

}

private static boolean isInbound(ChannelHandler handler) {

return handler instanceof ChannelInboundHandler;

}

private static boolean isOutbound(ChannelHandler handler) {

return handler instanceof ChannelOutboundHandler;

}

private AbstractChannelHandlerContext findContextInbound() {

AbstractChannelHandlerContext ctx = this;

do {

ctx = ctx.next;

} while (!ctx.inbound);

return ctx;

}

private AbstractChannelHandlerContext findContextOutbound() {

AbstractChannelHandlerContext ctx = this;

do {

ctx = ctx.prev;

} while (!ctx.outbound);

return ctx;

}

}

方法调用

这里就用了责任链的方式调用方法,确定下一个调用哪一个节点,就是通过inbound outbound

这两个字段决定的。

public final void read() {

...省略掉部分无用代码

final ChannelConfig config = config();

final ChannelPipeline pipeline = pipeline();

ByteBuf byteBuf = null;

boolean close = false;

try {

do {

byteBuf = allocHandle.allocate(allocator);

pipeline.fireChannelRead(byteBuf);

byteBuf = null;

} while (allocHandle.continueReading());

pipeline.fireChannelReadComplete();

if (close) {

closeOnRead(pipeline);

}

}

}

@Override

public ChannelHandlerContext fireChannelRead(final Object msg) {

invokeChannelRead(findContextInbound(), msg);

return this;

}

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {

//如果msg实现了ReferenceCounted,进行特殊操作

final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);

EventExecutor executor = next.executor();

if (executor.inEventLoop()) {

next.invokeChannelRead(m);

} else {

executor.execute(new Runnable() {

@Override

public void run() {

next.invokeChannelRead(m);

}

});

}

}

private void invokeChannelRead(Object msg) {

if (invokeHandler()) {

try {

//调用下一个节点的channelRead方法

((ChannelInboundHandler) handler()).channelRead(this, msg);

} catch (Throwable t) {

invokeExceptionCaught(t);

}

} else {

fireChannelRead(msg);

}

}

下面就是调用你自定义的ChannelInboundHandler子类的覆盖方法了,这里就不过多赘述。

EventLoopGroup & EventLoop

初始化

EventLoopGroup初始化创建的时候,会创建对应数量的EventLoop,如果没有指定,默认创建cpu核心数量*2个EventLoop

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

//默认线程数是cpu核心的2倍

static {

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(

"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

if (logger.isDebugEnabled()) {

logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);

}

}

}

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,

EventExecutorChooserFactory chooserFactory, Object... args) {

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {

boolean success = false;

try {

//创建对应数量的EventLoop

children[i] = newChild(executor, args);

success = true;

}

}

chooser = chooserFactory.newChooser(children);

final FutureListener terminationListener = new FutureListener() {

@Override

public void operationComplete(Future future) throws Exception {

if (terminatedChildren.incrementAndGet() == children.length) {

terminationFuture.setSuccess(null);

}

}

};

for (EventExecutor e: children) {

e.terminationFuture().addListener(terminationListener);

}

Set childrenSet = new LinkedHashSet(children.length);

Collections.addAll(childrenSet, children);

readonlyChildren = Collections.unmodifiableSet(childrenSet);

}

}

将EventLoop封装进EventExecutorChooser

@Override

public EventExecutorChooser newChooser(EventExecutor[] executors) {

if (isPowerOfTwo(executors.length)) {

return new PowerOfTwoEventExecutorChooser(executors);

} else {

return new GenericEventExecutorChooser(executors);

}

}

方法调用

此处借助EchoServer启动分析EventLoop方法执行过程(不感兴趣的可以跳过)

如果你服务设置了主从线程,在启动的时候,就会使用主线程启动服务。

final ChannelFuture initAndRegister() {

...省略部分代码

ChannelFuture regFuture = config().group().register(channel);

}

@Override

public ChannelFuture register(Channel channel) {

//从EventExecutorChooser获取到EventLoop注册Channel

return next().register(channel);

}

protected abstract class AbstractUnsafe implements Unsafe {

public final void register(EventLoop eventLoop, final ChannelPromise promise) {

try {

eventLoop.execute(new Runnable() {

@Override

public void run() {

register0(promise);

}

});

}

}

}

//将任务先添加进队列,

private void execute(Runnable task, boolean immediate) {

boolean inEventLoop = inEventLoop();

addTask(task);

if (!inEventLoop) {

//主线程轮循,监听事件

startThread();

}

...省略无用代码

}

protected void addTask(Runnable task) {

ObjectUtil.checkNotNull(task, "task");

if (!offerTask(task)) {

reject(task);

}

}

final boolean offerTask(Runnable task) {

if (isShutdown()) {

reject();

}

return taskQueue.offer(task);

}

startThread(); 比较核心单独说一下,他会启动一个线程

private void doStartThread() {

assert thread == null;

executor.execute(new Runnable() {

@Override

public void run() {

...省略无用代码

try {

SingleThreadEventExecutor.this.run();

success = true;

}

}

});

}

//execute没啥好说的了,就是启动线程

public void execute(Runnable command) {

threadFactory.newThread(command).start();

}

protected void run() {

for (;;) {

else if (strategy > 0) {

final long ioStartTime = System.nanoTime();

try {

processSelectedKeys();

} finally {

// Ensure we always run tasks.

final long ioTime = System.nanoTime() - ioStartTime;

ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

}

} else {

//执行前天添加的任务

ranTasks = runAllTasks(0); // This will run the minimum number of tasks

}

}

}

processSelectedKeys();就是正常执行读取连接的操作入口,runAllTasks( ); 就是上面添加的匿名内部类的执行入口

new Runnable() {

@Override

public void run() {

register0(promise);

}

}

ChannelFuture

这个从图中可以看出它就是对java.util.concurrent.Future的拓展。

这里我们主要看一下它扩展的回调机制

public Promise await() throws InterruptedException {

//根据有无结果判断当前任务是否完成

if (isDone()) {

return this;

}

if (Thread.interrupted()) {

throw new InterruptedException(toString());

}

checkDeadLock();

synchronized (this) {

while (!isDone()) {

incWaiters();

try {

//线程进入等待状态

wait();

} finally {

decWaiters();

}

}

}

return this;

}

private static boolean isDone0(Object result) {

return result != null && result != UNCANCELLABLE;

}

与之对应的就是notify了

调用回调最终会调用

private void notifyListenersNow() {

Object listeners;

synchronized (this) {

// Only proceed if there are listeners to notify and we are not already notifying listeners.

if (notifyingListeners || this.listeners == null) {

return;

}

//用完就删

notifyingListeners = true;

listeners = this.listeners;

this.listeners = null;

}

for (;;) {

if (listeners instanceof DefaultFutureListeners) {

notifyListeners0((DefaultFutureListeners) listeners);

} else {

notifyListener0(this, (GenericFutureListener) listeners);

}

synchronized (this) {

if (this.listeners == null) {

// Nothing can throw from within this method, so setting notifyingListeners back to false does not

// need to be in a finally block.

notifyingListeners = false;

return;

}

listeners = this.listeners;

this.listeners = null;

}

}

}

private static void notifyListener0(Future future, GenericFutureListener l) {

try {

//调用自定义listener

l.operationComplete(future);

} catch (Throwable t) {

if (logger.isWarnEnabled()) {

logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);

}

}

}

Bootstrap

最后我们再来看下Bootstrap,核心部分上面已经讲完了,这里就不多赘述,这就简述下

public abstract class AbstractBootstrap{

private static final Map.Entry, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];

private static final Map.Entry, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];

volatile EventLoopGroup group;

private volatile ChannelFactory channelFactory;

private volatile SocketAddress localAddress;

private final Map, Object> options = new LinkedHashMap, Object>();

private final Map, Object> attrs = new ConcurrentHashMap, Object>();

private volatile ChannelHandler handler;

}

public class ServerBootstrap extends AbstractBootstrap{

private final Map, Object> childOptions = new LinkedHashMap, Object>();

private final Map, Object> childAttrs = new ConcurrentHashMap, Object>();

private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

private volatile EventLoopGroup childGroup;

private volatile ChannelHandler childHandler;

}

private ChannelFuture doBind(final SocketAddress localAddress) {

//初始化并注册一个 Channel 对象,pipeline中添加ServerBootstrapAcceptor,处理连理连接事件,

// ChannelFuture regFuture = config().group().register(channel);启动线程循环监听事件

final ChannelFuture regFuture = initAndRegister();

final Channel channel = regFuture.channel();

if (regFuture.cause() != null) {

return regFuture;

}

//因为是异步,不能保证是否完成

//绑定Channel端口,并注册channel到selectionKey中

if (regFuture.isDone()) {

// 注册完成

ChannelPromise promise = channel.newPromise();

doBind0(regFuture, channel, localAddress, promise);

return promise;

} else {

// 注册还未完成

final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);

regFuture.addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

Throwable cause = future.cause();

if (cause != null) {

// EventLoop 上的注册失败,因此一旦我们尝试访问 Channel 的 EventLoop,就直接使 ChannelPromise 失败,以免导致 IllegalStateException。

promise.setFailure(cause);

} else {

// 注册成功,所以设置正确的执行器来使用。

// See https://github.com/netty/netty/issues/2586

promise.registered();

//绑定端口

doBind0(regFuture, channel, localAddress, promise);

}

}

});

return promise;

}

}

至此,Netty源码分析就结束了,大部分都已经讲完,感兴趣的朋友可以跟着ServerBootstrap的源码跑一下,大部分都明白了,本来上一篇博客写了ServerBootstrap启动过程分析,但是觉得又臭又长,就给删了。就是跑一边代码,谁不会呢,这里就简述下关键的部分。还有其他部分,后面看心情决定要不要写博客了,反正也没人看~~~

相关文章

陈翔六点半腿腿之死,若在天堂,请不要哭泣
365bet在线登录

陈翔六点半腿腿之死,若在天堂,请不要哭泣

📅 02-17 👁️ 1991
如何区分进排气凸轮轴
365bet在线登录

如何区分进排气凸轮轴

📅 10-06 👁️ 9749