javaeventloop的简单介绍

http://www.itjxue.com  2023-01-09 07:06  来源:未知  点击次数: 

EventLoopGroup

1,?EventLoopGroup,EventLoop,EventExecutorGroup,EventExecutor都实现了ScheduledExecutorService,粗略的说他们都是可调度线程池

2,顾名思义,EventLoopGroup包含一组EventLoop,EventExecutorGroup包含一组EventExecutor。

3,EventExecutorGroup主要覆盖了父接口里的几个方法,返回特定的ScheduledFuture类型。最关键的方法是next,返回一个可用的EventExecutor,EventLoopGroup提供了上个register方法用于注册channel到EventLoopGroup。

4,EventExecutor中最关键的是inEventLoop方法,用于判断一个线程是否是EventExecutor内部那个线程,EventExecutor和EventLoop都是单线程实现。inEventLoop的主要使用场景是,当IO变化时,通过channel关联的pipeline会触发对应的事件,这些事件对应的执行pipeline中的处理链中handler的回调方法,每个handler添加到pipeline都可以指定自己的EventLoop,如果没指定,默认使用要添加的pipeline关联的channel注册到的EventLoopGroup中的某个EventLoop。所以channel通过pipeline调用handler时,如果handler没有单独指定EventLoop,那inEventLoop就会返回true,他俩由同一个线程处理,直接调用handler。如果handler单独指定了EventLoop,inEventLoop就会返回false,channel调用handler时就把要调用的方法封装到Runnable里,然后添加到handler指定的EventLoop的任务队列里,稍后会由对应的EventLoop中的线程执行。

5,EventExecutorGroup有两个主要的实现类:

MultithreadEventExecutorGroup:看名字就知道有多个线程,哈哈,主要用于注册支持NIO的channel。在构造时根据指定的线程数量调用抽象方法newChild初始化EventExecutor数组(典型的模版方法设计模式),每个EventExecutorGroup类型都有对应的EventExecutor类型,每个EventLoopGroup都有对应的EventLoop类型。例如:最重要的NioEventLoopGroup的newChild实现方式如下

第3点里说EventExecutorGroup添加了一个非常重要的方法next,用于返回可用的EventExecutor,对应的选择策略接口是EventExecutorChooser(策略设计模式的典型应用场景)

如果executors数组的长度是2的指数就使用上面的,否则使用下面的,主要就是均匀的分布压力到多个线程。

ThreadPerChannelEventLoopGroup:看名字就知道每次有client连接时就启用一个新线程处理。构造时设置最大线程数,内部用Set存储当前活跃的线程,用ConcurrentLinkedQueue存储当前idle状态的线程,不支持next方法,有一个类似的nextChild方法,实现方法简单明了。

6, 注册channel到EventLoopGroup,使用EventLoopGroup中某个EventLoop处理channel的IO

一般regist都发生在主线程,所以eventLoop.inEventLoop()会返回false,eventLoop从ScheduledExecutorService继承了execute方法,第一次执行execute时,EventLoop内部的线程开始启动,启动之后循环调用抽象方法run执行任务队列中的任务。

更具体的register实现都在doRegister抽象方法中(模版方法设计模式真是很实用哦),重点是此时会pipeline.fireChannelRegistered()触发channelRegistered事件,如果是第一次注册,还会触发channelActive事件,事件实现细节稍后会有专门的文章分析。

NIO的doRegister是把channel注册到NioEventLoop内部的selector上,线程启动之后就不停的select注册的IO可用的channel。

handler在添加到channel关联的pipeline时,如果不指定EventLoop,默认使用channel注册到的EventLoop。所以EventLoop有两个职责:监控IO并触发相应的事件和执行handler的事件方法。ioRatio控制这两个职责消耗的时间比例。

OIO的doRegister是空的,啥也没干。

oio由于是一个线程处理一个channel,直接从任务队列里取任务执行就行了。

7,注册成功之后会触发channelActive事件,channlActive默认会触发channel和关联的pipeline的read事件,由于read事件属于outbound handler,所以从tail开始传播(关于pipeline稍后分析)。最后会传播到pipeline的head,head的read方法会调用unsafe.beginRead。

在NIO中的效果是添加对channel read事件的监听,OIO中的效果是开始读channel里的数据

之前说过EventLoop的职责不只是处理IO,但是OIO的IO操作都是阻塞的,一旦阻塞就无法处理handler回调了,所以需要设置超时来模拟非阻塞。

每次读完一个请求之后,都会关闭输入(OIO是关闭InputStream,NIO是取消监控channel的read事件),之后如果设置了autoread(默认为true,如果设置了false,必须自己在handler中调用channel.read,否则不会收到请求),会再次打开(打开之后,如果有请求到来(同一个连接如果还没收到响应,在超时之前是不能再次发送请求的吧???),但是现在的请求还没处理完,就会在EventLoop的任务队列里排队)。然后开始处理这个请求,一般的pipeline流程是:各种ByteToMessageDecode,接着各种MessageToMessageDecode,转为java对象之后就是正常的业务处理了,每次有要响应的数据时都可以调用write*中间经过各种MessageToMessageEncode和MessageToByteEncode把响应写入ChannelOutboundBuffer,等全部处理完之后,再调用flush,把整个buffer写入channel。

NIO flush的过程是先直接写入channel,写入失败之后开始监听write事件,等write事件触发了,写完所有请求之后,取消监控write事件

OIO的doWrite就比较简单了,就是死循环拼命写。

一次完整的读写过程就结束啦。

netty如何判断当前eventloop正在监听哪个端口

使用netty框架,怎么在一个进程内监听多个端口

当监听一个端口后,监听会进入阻塞,那另一个端口就没法监听了。

示例程序如下

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, 100)

.handler(new LoggingHandler(LogLevel.INFO))

.childHandler(new ChannelInitializerSocketChannel() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(

//new LoggingHandler(LogLevel.INFO),

new EchoServerHandler());

}

});

// Start the server.

ChannelFuture f = b.bind(port).sync();

ChannelFuture fq = b.bind(9802).sync();

// Wait until the server socket is closed.

f.channel().closeFuture().sync(); -这里会进入阻塞下边的wait没法进行

fq .channel().closeFuture().sync();

Bpel(java)无法找到函数源码

在runtime的地方,比如service,点击后,有一个source的tab页,里面可以添加源码。在debug和run中,也有相关的设置,你把源码包设置上,应该就能看到了。

eventloopgroup 在哪个jar

在Netty框架初探中,当我们启动客户端或者服务端时,都要声明一个Group对象

[java] view plain copy

EventLoopGroup group = new NioEventLoopGroup();

这里我们就以NioEventLoopGroup来说明。先看一下它的继承关系

[java] view plain copy

NioEventLoopGroup extends MultithreadEventLoopGroup extends MultithreadEventExecutorGroup

看看NioEventLoopGroup的构造函数

[java] view plain copy

public NioEventLoopGroup() {

this(0);

}

//他会连续调用内部的构造函数,直到用下面的构造去执行父类的构造

//nThreads此时为0,马上就会提到这个参数的用处

public NioEventLoopGroup(

int nThreads, Executor executor, final SelectorProvider selectorProvider) {

super(nThreads, executor, selectorProvider);

}

基类MultithreadEventLoopGroup的构造

[java] view plain copy

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {

super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);

}

//nThreads:内部线程数,如果为0,就取默认值,通常我们会设置为处理器个数*2

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(

"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

继续调用再上一级的MultithreadEventExecutorGroup的构造

[java] view plain copy

//这里会根据nThreads创建执行者数组

private final EventExecutor[] children;

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {

if (nThreads = 0) {

throw new IllegalArgumentException(String.format("nThreads: %d (expected: 0)", nThreads));

}

if (executor == null) {

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

}

//这里创建EventExecutor数组对象

children = new EventExecutor[nThreads];

if (isPowerOfTwo(children.length)) {

chooser = new PowerOfTwoEventExecutorChooser();

} else {

chooser = new GenericEventExecutorChooser();

}

//此处循环children数组,来创建内部的NioEventLoop对象

for (int i = 0; i nThreads; i ++) {

boolean success = false;

try {

//newChild是abstract方法,运行期会执行具体的实例对象的重载

children[i] = newChild(executor, args);

success = true;

} catch (Exception e) {

// TODO: Think about if this is a good exception type

throw new IllegalStateException("failed to create a child event loop", e);

} finally {

//如果没有成功,做关闭处理

if (!success) {

for (int j = 0; j i; j ++) {

children[j].shutdownGracefully();

}

for (int j = 0; j i; j ++) {

EventExecutor e = children[j];

try {

while (!e.isTerminated()) {

e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

}

} catch (InterruptedException interrupted) {

// Let the caller handle the interruption.

Thread.currentThread().interrupt();

break;

}

}

}

}

}

......

}

因为我们最初创建的是NioEventLoopGroup对象,所以newChild会执行NioEventLoopGroup的newChild方法,创建NioEventLoop对象。

[java] view plain copy

@Override

protected EventLoop newChild(Executor executor, Object... args) throws Exception {

return new NioEventLoop(this, executor, (SelectorProvider) args[0]);

}

看看NioEventLoop的继承关系

[java] view plain copy

NioEventLoop extends SingleThreadEventLoop extends SingleThreadEventExecutor

NioEventLoop通过自己的构造行数,一直调用到SingleThreadEventExecutor的构造

[java] view plain copy

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {

super(parent);

if (executor == null) {

throw new NullPointerException("executor");

}

this.addTaskWakesUp = addTaskWakesUp;

this.executor = executor;

taskQueue = newTaskQueue();

}

至此,Group和内部的Loop对象以及Executor就创建完毕,那么他们是什么时候被调用的呢?就是在服务端bind和客户端connect时。

服务端的bind方法执行的是AbstractBootstrap的bind方法,bind方法中先会调用validate()方法检查是否有group

[java] view plain copy

@SuppressWarnings("unchecked")

public B validate() {

if (group == null) {

throw new IllegalStateException("group not set");

}

if (channelFactory == null) {

throw new IllegalStateException("channel or channelFactory not set");

}

return (B) this;

}

所以,假如初始时,没有设置Bootstrap的group的话,就会报错。

最终bind调用doBind,然后调用doBind0,启动一个Runnable线程

[java] view plain copy

private static void doBind0(

final ChannelFuture regFuture, final Channel channel,

final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up

// the pipeline in its channelRegistered() implementation.

channel.eventLoop().execute(new Runnable() {

@Override

public void run() {

if (regFuture.isSuccess()) {

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

} else {

promise.setFailure(regFuture.cause());

}

}

});

}

我们这里,channel.eventLoop()得到的是NioEventLoop对象,所以执行NioEventLoop的run方法,开始线程运行。

我们在创建Bootstrap初期,会调用它的group方法,绑定一个group,这样,一个循环就开始运行了。

inEventLoop()方法有什么

线程模型啊

NioEventLoop相对NioEventLoopGroup来说就复杂很多了,需要一定的耐心来看这篇文章。

首先从NioEventLoop的启动讲起,对于线程池来说,启动一般都是从第一个任务的添加开始的。经过跟踪,找到execute()方法在SingleThreadEventExecutor类中:

[java] view plain copy

public void execute(Runnable task) {

if (task == null) {

throw new NullPointerException("task");

}

// inEventLoop表示启动线程与当前线程相同,相同表示已经启动,不同则有两种可能:未启动或者线程不同

boolean inEventLoop = inEventLoop();

if (inEventLoop) {

// 运行中则直接添加任务到队列中

addTask(task);

} else {

// 尝试启动任务

startExecution();

// 将任务加到任务队列taskQueue中

addTask(task);

// 发现已经关闭则移除任务并拒绝

if (isShutdown() removeTask(task)) {

reject();

}

}

if (!addTaskWakesUp wakesUpForTask(task)) {

// 唤醒执行线程

wakeup(inEventLoop);

}

}

private void startExecution() {

// 未启动的状态下才进行启动

if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {

if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {

// 增加一个定时任务,该任务将定时任务队列中的已取消任务从队列中移除,该任务每间隔1秒执行1次

schedule(new ScheduledFutureTaskVoid(

this, Executors.Voidcallable(new PurgeTask(), null),

ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));

// 开始执行

scheduleExecution();

}

}

}

// 如果已经关闭了,则不能再加任务,否则加入到任务队列中

protected void addTask(Runnable task) {

if (task == null) {

throw new NullPointerException("task");

}

if (isShutdown()) {

reject();

}

taskQueue.add(task);

}

(责任编辑:IT教学网)

更多

推荐综合特效文章