可以说,Netty把NIO和异步编程的哲学发挥到了淋漓尽致。在Netty中,几乎所有涉及网络操作的地方均采用异步回调的方式。图2-12展示了Netty的工作原理。首先,Netty用reactor线程监听ServerSocketChannel,每个ServerSocketChannel对应一个实际的端口。如果需要监听多个端口,则需要为reactor线程池配置多个线程。
当reactor线程监听的ServerSocketChannel监测到连接请求事件(OP_ACCEPT)时,就为接收到的连接套接字建立一个SocketChannel,并将该SocketChannel委托给工作线程池中的某个工作线程做后续处理。之后,当工作线程监测到SocketChannel上有数据可读(OP_READ)时,就调用相关的回调句柄(handler)对数据进行读取和处理,并返回最终的处理结果。另外,在Netty相关代码中,通常将reactor线程池称为boss group,而将工作线程池称为work group,大家在阅读Netty相关代码时知晓这两个概念即可。
图2-12 Netty的工作原理

使用Netty实现数据采集API
下面我们使用Netty来实现数据采集服务器。
public static void main(String[] args) {
final int port = 8081;
final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
final EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerInitializer())
.option(ChannelOption.SO_BACKLOG, 1024);
final ChannelFuture f = bootstrap.bind(port).sync();
logger.info(String.format("NettyDataCollector: running on port[%d]",
port));
f.channel().closeFuture().sync();
} catch (final InterruptedException e) {
logger.error("NettyDataCollector: an error occurred while running", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
private static final int MAX_CONTENT_LENGTH = 1024 1024;
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("http-codec", new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
ch.pipeline().addLast("handler", new ServerHandler());
}
}
public class ServerHandler extends
SimpleChannelInboundHandler<HttpRequest> {
private static final Logger logger = LoggerFactory.getLogger(NettyData
Collector.class);
private final String kafkaBroker = "127.0.0.1:9092";
private final String topic = "collector_event";
private final KafkaSender kafkaSender = new KafkaSender(kafkaBroker);
private JSONObject doExtractCleanTransform(JSONObject event) {
// TODO: 实现抽取、清洗、转化具体逻辑
return event;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req)
throws Exception {
byte[] body = readRequestBodyAsString((HttpContent) req); // step1: 对消息进行解码
JSONObject bodyJson = JSONObject.parseObject(new String(body,
Charsets.UTF_8));
// step2: 对消息进行抽取、清洗、转化
JSONObject normEvent = doExtractCleanTransform(bodyJson);
// step3: 将格式规整化的消息发送到消息中间件Kafka
kafkaSender.send(topic,
normEvent.toJSONString().getBytes(Charsets.UTF_8));
// 通知客户端数据采集成功
sendResponse(ctx, OK, RestHelper.genResponse(200, "ok").toJSONString());
}
}
在上面的代码中,我们分别在boss group和work group中设置了一个和两倍CPU 核数的线程数。在Netty服务器只监听一个端口时,启用一个ServerBootstrap实例即可,这时boss group也只需配置一个线程,更多的线程并不会提升性能。而如果Netty服务器启动多个端口,则需要为每一个端口启动一个ServerBootstrap实例,并最好给bossgroup配置与端口数相同的线程数,更多的线程不会提升性能,更少的线程则会降低性能。
在工作线程的回调处理过程中,我们使用HttpServerCodec将接收的数据按照HTTP格式解码,解码后的数据再交由ServerHandler处理,之后的处理逻辑就与用Spring Boot实现数据采集服务器的处理逻辑相同了。
异步编程
Netty实现的数据采集服务器在处理网络I/O时,充分发挥出了异步的潜力,但是不是这样就让CPU和I/O的能力彻底释放出来了呢?这可不一定。仔细查看前面的实现过程会发现,虽然采用NIO使请求的接收和请求的处理隔离开了,但是在处理请求的时候依然使用的是同步方式。
也就是说,对消息读取、解码、ECT、发送至消息中间件及最终将结果返回给客户端这全部的步骤都是在工作线程中依次执行完成的,并且我们只给work group分配了两倍于CPU核心数的线程数。很明显,这种处理逻辑还是将网络I/O数据读取的过程与具体请求处理的过程耦合起来了。我们可以通过增加work group线程数的方式来提升服务器的处理能力,但显然这不是正确的方法。毕竟Netty花了九牛二虎之力为我们构建了异步处理网络I/O事件的完整框架,但到最后我们依旧用耗时的同步处理逻辑阻塞了本应该用于快速处理网络读写事件的工作线程,严重影响工作线程处理网络读写事件的效率,实在是暴殄天物了!
所以,我们应该将请求处理的逻辑从工作线程的职责中剥离出来,让工作线程专心于处理网络读写事件,而用其他线程来执行请求的处理逻辑。图2-13就说明了这种异步方案。
下面我们就来看看如何将请求处理改造成异步执行的方式。
private static class RefController {
private final ChannelHandlerContext ctx;
private final HttpRequest req;
public RefController(ChannelHandlerContext ctx, HttpRequest req) {
this.ctx = ctx;
this.req = req;
}
public void retain() {
ReferenceCountUtil.retain(ctx);
ReferenceCountUtil.retain(req);
}
public void release() {
ReferenceCountUtil.release(req);
ReferenceCountUtil.release(ctx); }
}
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req)
throws Exception {
logger.info(String.format("current thread[%s]",
Thread.currentThread().toString()));
final RefController refController = new RefController(ctx, req);
refController.retain();
CompletableFuture
.supplyAsync(() -> this.decode(ctx, req), this.decoderExecutor)
.thenApplyAsync(e -> this.doExtractCleanTransform(ctx, req, e),
this.ectExecutor)
.thenApplyAsync(e -> this.send(ctx, req, e), this.senderExecutor)
.thenAccept(v -> refController.release())
.exceptionally(e -> {
try {
logger.error("exception caught", e);
sendResponse(ctx, INTERNAL_SERVER_ERROR,
RestHelper.genResponseString(500, "服务器内部错误"));
return null;
} finally {
refController.release();
}
});
}
在上面的代码中,channelRead0函数的输入参数是一个ChannelHandlerContext对象和一个HttpRequest对象,它们针对每次的请求处理而创建,所以并非全局的,可以在不同线程间自由地传递和使用它们,并且不用担心并发安全的问题。
虽然channelRead0函数依旧在工作线程中被执行,但是这个函数是将具体的处理逻辑委托给其他线程后就立刻返回了。channelRead0函数执行的耗时极短,不会影响工作线程继续快速处理其他的网络读写事件。
我们将请求的处理逻辑细分为解码、ECT和发送到消息队列3个步骤,然后使用CompletableFuture的各种异步执行API将这3个步骤构造成异步执行链。具体来说,首先使用supplyAsync方法将解码过程委托给专门的解码执行器decoderExecutor。然后连续使用两次thenApplyAsync指定当解码和ECT结束之后,分别委托给ectExecutor和senderExecutor执行器进行ETL和发送。最后由thenAccept指定当ETL完成时,由发送执行器将数据发送给消息中间件。
以上整个过程都只是在制订异步执行的计划,不涉及真实的执行过程,所以channelRead0耗时极少,可以立刻返回。
图2-13 将回调句柄由同步改成异步执行
由于Netty会对其使用的部分对象进行分配和回收管理,在channelRead0方法返回时,Netty框架会立刻释放HttpRequest对象。而channelRead0方法将请求提交异步处理后立刻返回,此时请求处理可能尚未结束。因此,在将请求提交异步处理之前,必须先调用refController.retain()来保持对象,而在请求处理完后,再调用refController.release()来释放HttpRequest对象。
流量控制和反向压力
上面的改造已经将请求处理的过程彻底异步化,至此CPU和I/O才可以毫无阻碍地尽情“干活”,它们的生产力得到充分解放。但是,有关异步的问题还没有彻底解决。
由于请求处理使用了异步执行方案,请求的具体逻辑实际上交由各个步骤的执行器(executor)进行处理。这个过程中没有任何阻塞的地方,只不过各个步骤待处理的任务都被隐式地存放在了各个执行器的任务队列中。如果各执行器处理得足够快,那么它们的任务队列都能被及时消费,这样不会存在问题。但是,一旦某个步骤的处理速度比不上请求接收线程接收新请求的速度,那么必定有部分执行器任务队列中的任务会不停增长。由于执行器任务队列默认是非阻塞且不限容量的,这样当任务队列积压的任务越来越多时,终有一刻,JVM的内存会被耗尽,抛出OOM系统错误后程序异常退出。图2-14说明了这种问题。
图2-14 任务在各个执行器任务队列中积压
实际上,这是所有异步系统都普遍存在且必须引起我们重视的问题。在纤程中,可以通过指定最大纤程数量来限制内存的使用量,非常自然地控制了内存和流量。但是,在一般的异步系统中,如果不对执行的各个环节做流量控制,就很容易出现OOM问题。因为当每个环节都不管其下游环节处理速度是否跟得上,不停将其输出塞给下游的任务队列时,只要上游输出速度超过下游处理速度的状况持续一段时间,必然会导致内存不断被占用,直至最终耗尽,抛出OOM灾难性系统错误。
为了避免OOM问题,我们必须对上游输出给下游的速度做流量控制。一种方式是严格控制上游的发送速度,如每秒控制其只能发1000条消息,但是这种粗糙的处理方案非常低效。例如,实际下游每秒处理2000条消息,那么上游每秒1000条消息的速度就使得下游一半的性能没发挥出来。又如,下游因为某种原因性能降级为每秒只能处理500条,那么在一段时间后同样会发生OOM问题。
更优雅的一种解决方法是反向压力方案,即上游能够根据下游的处理能力动态调整输出速度。当下游处理不过来时,上游就减慢发送速度;当下游处理能力提高时,上游就加快发送速度。反向压力方案的思想实际上正逐渐成为流计算领域的共识,如与反向压力相关的标准Reactive Streams正在形成过程中。图2-15演示了Reactive Streams的工作原理,下游的消息订阅者从上游的消息发布者接收消息前,会先通知消息发布者自己能够接收多少消息,消息发布者之后就按照这个数量向下游的消息订阅者发送消息。这样,整个消息传递的过程都是量力而行的,不存在上下游处理能力不匹配造成的OOM问题了。
图2-15 Reactive Streams的工作原理
实现反向压力
回到Netty数据采集服务器的实现问题,那该怎样加上反向压力功能呢?
由于请求接收线程接收的新请求及其触发的各项任务被隐式地存放在各步骤的执行器任务队列中,并且执行器默认使用的任务队列是非阻塞和不限容量的,因此要加上反向压力功能,只需要从以下两个方面来控制。
·执行器任务队列容量必须有限。
·当执行器任务队列中的任务已满时,就阻塞上游继续向其提交新的任务,直到任务队列重新有空间可用为止。
图2-16 使用容量有限的阻塞队列实现反向压力
按照上面这种思路,我们可以很容易地实现反向压力。图2-16展示了使用容量有限的阻塞队列实现反向压力的过程,当“处理”这个步骤比“解码”步骤慢时,位于“处理”前的容量有限的阻塞队列会被塞满。当“解码”操作继续要往其写入消息时,就会被阻塞,直到“处理”操作从队列中取走消息为止。下面是一个具备反向压力能力的ExecutorService的具体实现细节。
private final List<ExecutorService> executors;
private final Partitioner partitioner;
private Long rejectSleepMills = 1L;
public BackPressureExecutor (String name, int executorNumber, int coreSize, int
maxSize, int capacity, long rejectSleepMills) {
this.rejectSleepMills = rejectSleepMills;
this.executors = new ArrayList<>(executorNumber);
for (int i = 0; i < executorNumber; i++) {
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(capacity);
this.executors.add(new ThreadPoolExecutor(
coreSize, maxSize, 0L, TimeUnit.MILLISECONDS,
queue,
new ThreadFactoryBuilder().setNameFormat(name + "-" + i + "-
%d").build(),
new ThreadPoolExecutor.AbortPolicy()));
}
this.partitioner = new RoundRobinPartitionSelector(executorNumber);
}
@Override
public void execute(Runnable command) {
boolean rejected;
do {
try {
rejected = false;
executors.get(partitioner.getPartition()).execute(command);
} catch (RejectedExecutionException e) {
rejected = true;
try {
TimeUnit.MILLISECONDS.sleep(rejectSleepMills);
} catch (InterruptedException e1) {
logger.warn("Reject sleep has been interrupted.", e1);
}
}
} while (rejected);
}
@Override
public Future<?> submit(Runnable task) {
boolean rejected;
Future<?> future = null;
do {
try {
rejected = false;
future = executors.get(partitioner.getPartition()).submit(task);
} catch (RejectedExecutionException e) {
rejected = true;
try {
TimeUnit.MILLISECONDS.sleep(rejectSleepMills);
} catch (InterruptedException e1) {
logger.warn("Reject sleep has been interrupted.", e1);
}
}
} while (rejected);
return future;
}
在上面的代码中,BackPressureExecutor类在初始化时新建ThreadPoolExecutor对象作为实际执行任务的执行器。创建ThreadPoolExecutor对象时采用ArrayBlockingQueue,这是实现反向压力的关键之一。将ThreadPoolExecutor拒绝任务时采取的策略设置为AbortPolicy,这样在任务队列已满再执行execute或submit方法时会抛出RejectedExecutionException异常。在execute和submit方法中,通过一个do...while循环,循环体内捕获表示任务队列已满的RejectedExecutionException异常,直到新任务提交成功才退出,这是实现反向压力的关键之二。
接下来就可以在数据采集服务器中使用这个带有反向压力功能的MultiQueueExecutor-Service了。
final private Executor decoderExecutor = new
BackPressureExecutor("decoderExecutor",
1, 2, 1024, 1024, 1);
final private Executor ectExecutor = new BackPressureExecutor("ectExecutor",
1, 8, 1024, 1024, 1);
final private Executor senderExecutor = new BackPressureExecutor("senderExecutor",
1, 2, 1024, 1024, 1);
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req)
throws Exception {
logger.info(String.format("current thread[%s]",
Thread.currentThread().toString()));
final RefController refController = new RefController(ctx, req);
refController.retain();
CompletableFuture
.supplyAsync(() -> this.decode(ctx, req), this.decoderExecutor)
.thenApplyAsync(e -> this.doExtractCleanTransform(ctx, req, e),
this.ectExecutor)
.thenApplyAsync(e -> this.send(ctx, req, e), this.senderExecutor)
.thenAccept(v -> refController.release())
.exceptionally(e -> {
try {
logger.error("exception caught", e);
if (RequestException.class.isInstance(e.getCause())) {
RequestException re = (RequestException) e.getCause();
sendResponse(ctx, HttpResponseStatus.valueOf(re.getCode()),
re.getResponse());
} else {
sendResponse(ctx, INTERNAL_SERVER_ERROR,
RestHelper.genResponseString(500, "服务器内部错误"));
}
return null;
} finally {
refController.release();
}
});
}
从上面的代码可以看出,我们只需把decode、doExtractCleanTransform和send等各个步骤用到的执行器替换成BackPressureExecutor,即可实现反向压力功能,其他部分的代码不需要做任何改变。
通过以上改造,当上游步骤往下游步骤提交新任务时,如果下游处理较慢,则上游会停下来等待,直到下游将执行器队列中的任务取走,上游才能继续提交新任务。如此一来,上游自动匹配下游的处理速度,最终实现了反向压力功能。
在BackPressureExecutor的实现中,之所以采用封装多个执行器的方式,是考虑到要使用M×N个线程,有下面3种不同的使用场景。
·每个执行器使用1个线程,使用M×N个执行器;
·每个执行器使用M×N个线程,使用1个执行器;
·每个执行器使用M个线程,使用N个执行器。
在不同场景下,3种使用方式的性能表现也会稍有不同。读者如果
需要使用这个类,请根据实际场景做出合理设置和必要测试。
异步的不足之处
在前面有关异步的讨论中,我们总在“鼓吹”异步比同步更好,能够更有效地使用CPU和I/O资源,提高程序性能。那是不是同步就一无是处,而异步毫无缺点呢?
其实不然。从一开始我们就说过,理论上讲纤程是最完美的线程。纤程虽然在内部使用异步机制实现,但基于纤程开发程序只需采用同步的方式,完全不需要考虑异步问题。这说明我们在程序开发的时候,并不是为了异步而异步,而是为了提高资源使用效率、提升程序性能才使用异步的。如果有纤程这种提供同步编程方式,而且保留非阻塞I/O优势的方案,那么我们大可不必选择异步编程方式。毕竟通常情况下,异步编程相比同步编程复杂太多,稍有不慎就会出现各种问题,如资源泄漏和反向压力等。
除了编程更复杂外,异步方式相比同步方式,对同一个请求的处理也会有更多的额外开销。这些开销包括任务在不同步骤(也就是不同线程)之间的辗转、在任务队列中的排队和等待等。所以,对于一次请求的完整处理过程,异步方式相比同步方式通常会花费更多的时间。
还有些时候,系统会更加强调请求处理的时延。这时一定要注意,应该先保证处理的时延能够达到性能指标要求,在满足时延要求的情况下,再尽可能提升每秒处理请求数。这是因为,在CPU和I/O等资源有限的情况下,为了提升每秒处理请求数,CPU和I/O都应尽可能处于忙碌状态,这就需要用到类似于异步编程中用任务队列缓存未完成任务的方法。这样做的结果是,系统每秒处理的请求数可能通过拼命“压榨”CPU和I/O得到了提升,但同时各个环节任务队列中的任务过多,增加了请求处理的时延。因此,如果系统强调的是请求处理的时延,那么异步方式几乎不会对降低请求处理时延带来任何好处。
这时只能先通过优化算法和I/O操作来降低请求处理时延,然后通过提高并行度以提升系统每秒处理的请求数。提高并行度既可以在JVM内实现,也可以在JVM外实现。在JVM内增加线程数,直到再增加线程时处理时延满足不了时延指标要求为止;在JVM外,在多个主机上部署多个JVM进程,直到整个系统的每秒请求处理数满足TPS指标要求为止。需要注意的是,在提高并行度的整个过程中,任何时候都必须保证请求处理的时延是满足时延指标要求的。
本篇文章给大家讲解的内容是使用Netty实现数据采集服务器下篇文章给大家讲解的内容是实现单节点流计算应用:自己动手写实时流计算框架