300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > Vert.x实战 Verticles:Vert.x的基本处理单元

Vert.x实战 Verticles:Vert.x的基本处理单元

时间:2019-01-28 17:55:12

相关推荐

Vert.x实战 Verticles:Vert.x的基本处理单元

本章包括:

verticles是什么如何写【write】、配置【configure】以及部署【deploy】一个verticlesVert.x的线程模型【threading model】如何混合【mix】Vert.x线程以及非Vert.x线程

简单来讲,verticle是Vert.x中的基本处理单元【fundamental processing unit】。verticle的作用是用于封装处理事件的技术功能单元【technical functional unit】,例如暴露一个HTTP API并响应请求、在数据库之上提供仓储接口【 repository interface】,或向第三方系统发出请求。与其他技术(如Enterprise Java Beans【EJB】)中的组件【components 】非常相似,verticles同样可以部署,并且具有生命周期。

异步编程是构建响应式应用程序的关键,因为它们必须可伸缩【scale】,而在Vert.x中,verticles是构造(异步)事件处理代码和业务逻辑的基础。

2.1 你的第一个verticle【Writing a verticle】

熟悉actor并发模型【actor concurrency model】的读者,将会发现Vert.x的verticles与actors[1]具有相似之处。简而言之,在actor模型中,其中自治实体【autonomous entities】(即actors)通过发送和响应消息与其他实体进行专有通信。Vert.x的verticles与actor之间的相似绝非偶然:verticles具有私有状态,并可以在接收事件时进行更新,它们可以部署其他verticles,并且它们可以通过消息传递进行通信(下一章将详细介绍)。Verticles不一定遵循actors正统的传统定义【orthodox definition】,但我们可以公平地认为,Vert.x至少是受到了actors的启发。

因为verticles是Ver.x中的一个重要概念,我们将研究它们是如何工作的。在此之前,让我们编写一个简单的verticle,它可以处理两种类型的事件 :

定期计时器事件【 periodic timers event】HTTP请求事件【HTTP requests event】

2.1.1 项目准备【Preparing the project】

我们将使用清单2.1中的Gradle项目描述符【Gradle project descriptor 】,为本章中的所有示例使用一个通用的工程:

plugins {java}repositories {mavenCentral()}dependencies {implementation("io.vertx:vertx-core:3.8.0")implementation("ch.qos.logback:logback-classic:1.2.3")}tasks.create<JavaExec>("run") {main = project.properties.getOrDefault("mainClass", "chapter2.hello.HelloVerticle") as Stringclasspath = sourceSets["main"].runtimeClasspathsystemProperties["vertx.logger-delegate-factory-class-name"] = "io.vertx.core.logging.SLF4JLogDelegateFactory"}java {sourceCompatibility = JavaVersion.VERSION_1_8}

Listing 2.1. Gradle build.gradle.kts for the samples in chapter 2

Gradle对于构建Java项目来说是非常简单的。因为我们将有几个例子要运行,我们不依赖Gradle的application插件,而是定义我们自己的自定义run任务,我们可以通过传递启动类的名称来执行。我们还将利用它来确保日志被正确配置并统一使用SLF4J。

<configuration><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%level [%thread] %logger{0} - %msg%n</pattern></encoder></appender><logger name="ty" level="warn"/><root level="debug"><appender-ref ref="STDOUT"/></root></configuration>

Listing 2.2. Logback configuration to reduce Netty verbosity

TIP

Vert.x uses Netty and logging in Netty is quite verbose with the default Logback configuration. We can reduce the amount of log entries by creating a src/main/resources/logback.xml file and adding the configuration as in listing 2.2. To make the log samples shorter in this book we also removed event dates and shortened logger class names ($logger{0}). Please refer to the Logback documentation to understand how to configure it [LogbackDoc].Vert.x使用了Netty,而Netty中的日志的默认配置是相当冗长的。我们可以通过创建src/main/resources/logback.xml文件并添加如清单2.2所示的配置来减少日志条目的数量。具体请参阅Logback的官方文档来了解如何配置它。

2.1.2 verticle 类【The verticle class】

清单2-3中是使用Java类来构建的一个完整verticle和应用程序:

package chapter2.hello;import io.vertx.core.AbstractVerticle;import io.vertx.core.Vertx;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class HelloVerticle extends AbstractVerticle {private final Logger logger = LoggerFactory.getLogger(HelloVerticle.class);private long counter = 1;@Overridepublic void start() {vertx.setPeriodic(5000, id -> {logger.info("tick");});vertx.createHttpServer().requestHandler(req -> {logger.info("Request #{} from {}", counter++, req.remoteAddress().host());req.response().end("Hello!");}).listen(8080);logger.info("Open http://localhost:8080/");}public static void main(String[] args) {Vertx vertx = Vertx.vertx();vertx.deployVerticle(new HelloVerticle());}}

该verticle定义了两个事件处理程序【event handlers】:

一个是用于处理5秒一次的周期性任务另一个用于在HTTP服务器中处理HTTP请求

在main方法中,我们实例化了一个全局 Vert.x实例,并部署了一个verticle实例。

Defining a verticle code in Java is typically done by specializing the AbstractVerticle class. There exists a Verticle interface that one could in theory implement, but AbstractVerticle provides all the event processing, configuration and execution plumbing that Vert.x users need.

在Java中通常通过专门的AbstractVerticle类来完成对verticle的定义。虽然存在一个理论上可以实现的Verticle接口,但是AbstractVerticle提供了所有的事件处理【event processing】、配置【configuration】和执行管道【execution plumbing】,而这些是Vert.x使用者定然需要的功能。

NOTE

因为Vert.x是一个库,而不是一个框架,因此你既可以在main方法中创建一个Vert.x实例,亦可以在其他类中创建,然后再部署**verticles**。

verticle的生命周期由启动事件【start event】和停止事件【stop event】组成。AbstractVerticle类提供了可以重写的start方法和stop方法。默认情况下,这些方法不执行任何操作。

The start method typically contains setup and handlers initialization, like setting a periodic task handler and starting a HTTP server in listing 2.3.start方法通常包含设置初始化【setup initialization】和处理程序初始化【 handlers initialization】,比如我们在清单 2.3 中所示的设置周期性任务处理程序【periodic task handler】和启动一个HTTP服务器。The stop method is implemented when housekeeping tasks are required, such as closing open database connections.stop方法多在需要家务管理任务【housekeeping tasks】时实现,比如关闭打开的数据库连接。

2.1.3 Running and first observations

通过从集成开发环境或命令行运行main方法,即可以将应用程序作为常规Java应用程序来启动。要使用Gradle在命令行中运行它,可以使用以下命令:

$ ./gradlew run -PmainClass=chapter2.hello.HelloVerticle

我们再次假设您将在Unix shell中运行该程序,无论是在Linux、macOS还是适用于Linux的Windows子系统。如果您在传统的Windows终端上运行该命令,那么Gradle有一个.bat文件,所以您需要将命令行中的./gradlew替换为gradlew.bat。

应用程序运行后,我们可以使用web浏览器在localhost:8080/上执行一些HTTP请求,或者使用cUrl和HTTPie等命令行工具。日志应该类似于清单2.4。

INFO [vert.x-eventloop-thread-0] HelloVerticle - Open http://localhost:8080/ ① HTTP服务器现在就绪 INFO [vert.x-eventloop-thread-0] HelloVerticle - tick② 周期性任务事件日志INFO [vert.x-eventloop-thread-0] HelloVerticle - Request #1 from 0:0:0:0:0:0:0:1 ③ HTTP请求事件日志INFO [vert.x-eventloop-thread-0] HelloVerticle - Request #2 from 0:0:0:0:0:0:0:1INFO [vert.x-eventloop-thread-0] HelloVerticle - Request #3 from 0:0:0:0:0:0:0:1INFO [vert.x-eventloop-thread-0] HelloVerticle - Request #4 from 0:0:0:0:0:0:0:1INFO [vert.x-eventloop-thread-0] HelloVerticle - tick

Listing 2.4. Sample log output when running HelloVerticle

TIP

在剩下的一些示例中,我们缩短了类的定义。我们特地删除了与清单2.3中类似的包定义【package definitions】、导入【imports】和main方法。如果有疑问,请参考样本的完整源代码。

我们所使用的Logback配置打印出了与事件关联的线程的名称。现在我们已经可以在日志中校验Vert.x的一个重要的属性:

事件处理【event processing】发生在单独的事件回环【event-loop】线程上,如日志所示,周期性任务和HTTP请求的处理【periodic tasks and HTTP request processing 】都发生在日志中打印为vert.x-eventloop-thread-0的线程上。

这种设计的一个明显好处是,verticle实例总是在同一线程上执行事件处理,因此不需要使用任何线程同步原语【 thread synchronization primitives】。在多线程设计中,更新 counter 字段要么需要一个synchronized 块,要么需要使用java.util.concurrent.AtomicLong。但这里没有这样的问题,所以一个普通的long字段也可以安全地使用。

预备/准备方法【Preparation methods】,如createHttpServer或setTimer,除了可以在AbstractVerticle实现类中调用之外,也可以被一个非Vert.x线程【non-vert.x thread】调用。这在直接使用没有verticleVertx对象时(译者注:即不创建Verticle实现,直接用Vertx来绑定事件处理程序),或者在编写单元测试时,可能会发生这种情况。不过,这不是问题,因为Vertx类中的方法是线程安全的。

Figure 2.1. Execution of listing 2.3

图 2.1 中简单地展示了verticle、处理程序【handler】、Vertx和事件源【event sources】之间的交互。每个箭头表示参与者之间的一个方法调用。例如,HelloVerticle通过在Vertx对象上调用setPeriodic来创建一个周期性任务处理程序【 periodic task handler】,后者使用Vert.x内部的计时器创建一个周期性任务【 periodic task 】。反过来,Vert.x的内部计时器会定期的回调HelloVerticle中的timerHandler处理程序。

注意,为简单起见,我们将对requestHandler以及listen方法的的调用表示为对Vertx对象的快捷方式,而实际上它们位于实现了HttpServer接口的对象上。该实现类是Vert.x包内部的,由于它不适合作为一个参与者添加到该图表中,所以我们暂且将它合并到Vertx中。

2.2 More on verticles

关于编写和部署verticles还有很多需要了解的事情:

当事件回环被阻塞时会发生什么【what happens when the event-loop is being blocked】在存在异步初始化工作的情况下,如何延迟生命周期完成的通知?【how to defer notification of life-cycle completion in presence of asynchronous initialization work?】,如何部署verticles以及取消对verticles的部署,【how to deploy and un-deploy verticles】如何传递配置数据【how to pass configuration data】

我们将使用非常简单但重点突出的例子来涵盖每个主题。

2.2.1 Blocking and the event-loop

处理程序回调【Handler callbacks】在事件回环【event-loop】线程中运行。在事件回环【event-loop】上运行的代码尽可能少花时间是很重要的,这样事件回环【event-loop】线程在处理的事件数量上就可以有更高的吞吐量。这就是为什么在事件回环【event-loop】中不应该发生长时间运行或阻塞的I/O的操作。

尽管如此,发现阻塞代码并不总是那么容易,尤其是在使用第三方库时。Vert.x提供了一个检查器,用于检测事件回环【event-loop】是否被阻塞过长时间。

为了说明这一点,让我们看看当在事件处理程序回调【 event handler callback】中引入无限循环时会发生什么。

public class BlockEventLoop extends AbstractVerticle {@Overridepublic void start() {vertx.setTimer(1000, id -> {while (true);});}public static void main(String[] args) {Vertx vertx = Vertx.vertx();vertx.deployVerticle(new BlockEventLoop());}}

Listing 2.5. Sample where the event-loop is being blocked

清单 2.5 中的代码定义了一个1秒的计时器,然后处理程序回调【handler callback】进入一个无限循环。

WARN [vertx-blocked-thread-checker] BlockedThreadChecker - Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2871 ms, time limit is 2000WARN [vertx-blocked-thread-checker] BlockedThreadChecker - Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 3871 ms, time limit is 2000(...)WARN [vertx-blocked-thread-checker] BlockedThreadChecker - Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 5879 ms, time limit is 2000io.vertx.core.VertxException: Thread blockedat chapter2.blocker.BlockEventLoop.lambda$start$0(BlockEventLoop.java:11)at chapter2.blocker.BlockEventLoop$$Lambda$10/152379791.handle(Unknown Source)(...)

Listing 2.6. Sample log output when running listing 2.5

清单 2.6中 提供了运行清单 2.5 中的代码时的标准日志输出。正如我们所看到的,当事件回环【event-loop】线程运行着无限循环,而无法处理其他事件时,警告日志开始出现。经过一些迭代之后(默认为5秒),警告会被堆栈跟踪转储,内容变得更加丰富,因此我们可以清楚地识别出代码中的罪魁祸首。注意,这只是一个警告,事件回环【event-loop】线程检查器不会杀死需要花费太长时间来完成其任务的处理程序。

当然,有时需要使用阻塞代码或长时间运行的代码,Vert.x提供了在不阻塞事件循环的情况下运行这些代码的解决方案。这就是第2.3章节的主题。

TIP 配置Vert.x阻塞检查器

默认情况下,阻塞线程检查器发出警告前的时间限制为两秒,但可以将其配置为不同的值。有些环境(如嵌入式设备)的处理能力较慢,因此提高它们的线程阻塞检查器的阈值是正常的。

可以使用系统属性更改设置:

时间限制默认为2秒,但可以配置为不同的值。在处理能力较慢的环境中,比如嵌入式设备,增加线程检查器阈值是正常的。

-Dvertx.options.blockedThreadCheckInterval=5000 将告警间隔设置为5秒-Dvertx.threadChecks=false 禁用线程检查器

注意,此配置是全局的,因此不能在每个verticle的基础上进行微调。

2.2.2 Asynchronous notification of life-cycle events

到目前为止,我们已经看到了使用**start()**生命周期方法的示例。关于生命周期方法中的约定是:除非该方法抛出异常,否则verticle成功完成其start生命周期事件处理。stop()方法也是如此。

但是有一个问题:startstop方法中的一些操作可能是异步的,因此它们可能在调用**start()stop()**返回后才完成。

让我们看看如何正确地将延迟的成功或失败通知给调用者。一个很好的例子就是启动HTTP服务器,这是一个非阻塞操作。

public class SomeVerticle extends AbstractVerticle {@Overridepublic void start(Promise<Void> promise) {① Promise是void类型的,因为Vert.x只对部署完成感兴趣,无需携带任何值vertx.createHttpServer().requestHandler(req -> req.response().end("Ok")).listen(8080, ar -> {② 支持异步结果的listen变体,可以指示操作是否失败if (ar.succeeded()) {plete(); ③ complete()用于将Promise标记为已完成(当Promise不是void类型时,可以传递一个值)} else {promise.fail(ar.cause()); ④ 如果listen操作失败,我们将Promise标记为失败并传播错误。}});}}

Listing 2.7. Example of an asynchronous start life-cycle method

清单 2.7 提供了这样一个示例,其中verticle在启动时报告一个异步通知。这一点很重要,因为启动HTTP服务器可能会失败。实际上,TCP端口可能被另一个进程使用,在这种情况下,HTTP服务器无法启动,因此verticle没有成功部署。为此,我们使用listen方法的一个变体,该变体listen方法在操作完成时调用回调。

The start and stop methods in AbstractVerticle support variants with an argument of type io.vertx.core.Promise. As the name suggest, a Vert.x Promise is an adaptation of the future and promises model for processing asynchronous results [Liskov88]. A promise is used to write an asynchronous result, while a future is used to view an asynchronous result. Given a Promise object, you can call the future() method to obtain a future or type io.vertx.core.Future.

AbstractVerticle中的startstop方法还有一套重载的方法变体,该变体方法的形参类型为io.vertx.core.Promise。顾名思义,Vert.x 中的Promise是对future和promises模型的一种改编,用于处理异步结果[Liskov88]。promise用于写异步结果,future用于查看异步结果。给定一个Promise对象,您可以调用future()方法来获取一个类型为io.vertx.core.Future的future。

在清单 2.7 中,当在verticle成功完成其startstop生命周期方法时,Promise对象才被设置为完成。如果出现错误,则Promise对象被置为失败,并有一个描述器失败原因的错误【error】,然后verticle部署失败。

为了更好地理解这里发生了什么,图 2.3 阐明了当使用start(Promise startPromise)重载方法时,verticle、Vertx对象以及内部的Vert.x部署器对象(负责调用start/stop方法)之间的交互。我们可以通过检查部署器是否还在等待promise完成,以知道部署是否成功,即使在方法start的调用已经返回之后。

相比之下,图2.2展示了使用**start()**重载方法时,verticle、Vertx对象以及内部的Vert.x部署器对象(负责调用start/stop方法)之间的交互,此时,部署器不可能收到错误通知。

TIP

使用接受一个回调来通知错误的异步方法变体【asynchronous method variants 】是一个很好的健壮性实践,就像清单 2.7 中的**listen**方法一样。在本书的其余部分中,我们不会总是这样做,因为这样可以减少代码示例的冗长。

2.2.3 Deploying verticles

到目前为止,我们一直在使用嵌入在某一Verticle实现类中的main方法来部署verticles

Verticles are always deployed (and un-deployed) through the Vertx object. You may do so from any method, but the typical way to deploy an application composed of verticles is to:

Verticles总是通过Vertx对象来部署(和取消部署)。你可以通过任何方法来实现这一点,但是部署由多个verticles组成的应用程序的典型方式是:

部署一个main verticle【deploy a main verticle】这个main verticle 部署其他的verticles【the main verticle deploys other verticles】被部署的verticles转而部署进一步的verticles【the deployed verticles may in turn deploy further verticles】

请注意,虽然这听起来有层次感【hierarchical】,但是Vert.x 中其实并没有父【parent】/子【child 】verticles的正式概念。

为了说明这一点,让我们定义一些verticles

public class EmptyVerticle extends AbstractVerticle {private final Logger logger = LoggerFactory.getLogger(EmptyVerticle.class);@Overridepublic void start() {logger.info("Start");}@Overridepublic void stop() {logger.info("Stop");}}

Listing 2.8. A sample verticle to deploy

清单2.8定义了一个简单的verticle。它不做任何有趣的事情,只是在启动和停止时打印日志记录。

public class Deployer extends AbstractVerticle {private final Logger logger = LoggerFactory.getLogger(Deployer.class);@Overridepublic void start() {long delay = 1000;for (int i = 0; i < 50; i++) {vertx.setTimer(delay, id -> deploy()); ① 每秒部署一个EmptyVerticle实例delay = delay + 1000;}}private void deploy() {vertx.deployVerticle(new EmptyVerticle(), ar -> {② 部署一个Verticle是一个异步操作,并且deploy方法的一个变体支持异步结果if (ar.succeeded()) {String id = ar.result();logger.info("Successfully deployed {}", id);vertx.setTimer(5000, tid -> undeployLater(id)); ③ 5秒后对Verticle取消部署} else {logger.error("Error while deploying", ar.cause());}});}private void undeployLater(String id) {④ 取消部署与部署类似vertx.undeploy(id, ar -> {if (ar.succeeded()) {logger.info("{} was undeployed", id);} else {logger.error("{} could not be undeployed", id);}});}}

Listing 2.9. A verticle that deploys and un-deploys other verticles.

清单2.9定义了一个verticle,它部署了50个清单2.8中的EmptyVerticle类的实例。计时器【timer】的使用使得我们可以每隔1秒部署一个verticle。部署【deploy】方法中则使用另一个计时器【timer】,用于在部署后5秒取消对verticle的部署。部署时会为每一个verticle分配一个惟一标识符字符串,该标识符后面可以用于取消部署。

public static void main(String[] args) {Vertx vertx = Vertx.vertx();vertx.deployVerticle(new Deployer());}

Listing 2.10. Main class to deploy the Deployer verticle`

最后,但并非最重要的是,部署器【Deployer 】verticle自身可以从清单2-10的类的main方法中部署。运行此示例会生成日志条目,如下面的列表所示:

INFO [vert.x-eventloop-thread-1] EmptyVerticle - StartINFO [vert.x-eventloop-thread-0] Deployer - Successfully deployed 05553394-b6ce-4f47-9076-2c6648d65329 ① 一个verticle被部署INFO [vert.x-eventloop-thread-2] EmptyVerticle - StartINFO [vert.x-eventloop-thread-0] Deployer - Successfully deployed 6d920f33-f317-4964-992f-e712185fe514(...)INFO [vert.x-eventloop-thread-0] Deployer - 8153abb7-fc64-496e-8155-75c27a93b56d was undeployed ② 一个verticle被取消部署INFO [vert.x-eventloop-thread-13] EmptyVerticle - StartINFO [vert.x-eventloop-thread-0] Deployer - Successfully deployed 0f69ccd8-1344-4b70-8245-020a4815cc96(...)

Listing 2.11. Logs excerpts of running the code in listing 2.10

运行此示例将生成如清单2.11所示的日志记录,我们可以看到来自于vert.x-eventloop-thread-0线程的日志:它们对应于部署器【Deployer 】verticle。然后,我们可以从EmptyVerticle实例中看到生命周期日志事件,它们使用其他事件回环线程【event-loop threads】。

有趣的是,我们在部署器【Deployer 】verticle中部署了50个verticles,但是在日志中出现的线程可能比verticles还少。在默认情况下 Vert.x 创建两倍于CPU内核数量的事件循环线程。如果你有8个核心,然后一 个Vert.x 应用程序将会有16个事件循环。将verticles分配给事件回环是以轮询【round-robin】的方式完成的。

This teaches us an interesting lesson: while a verticle always uses the same event-loop thread, the event-loop threads are being shared by multiple verticles. This design results in a predictable number of threads for running an application.

这给我们上了一个有趣的课:虽然verticle总是使用相同的事件回环线程【event-loop thread】,但事件回环线程【event-loop thread】是由多个verticles共享的。这种设计导致运行应用程序所需的线程数量可以预测。

TIP

我们可以调整应该有多少个事件回环线程。但是不可能手动将给定的**verticle**分配给某个事件回环。这在实践中应该不会成为问题,但在最坏的情况下,您总是可以计划verticles部署顺序。

2.2.4 Passing configuration data

应用程序代码经常需要使用到配置数据【configuration data】。一个很好的例子就是连接到数据库服务器的代码:它通常需要主机名【host】、TCP端口【port】、登录名【login】和密码【password】。由于每一个部署配置中的值一般不同,因此需要一个配置API用于访问此类配置数据。

Vert.x的verticle可以在部署时传递此类配置数据【configuration data】。我们将在本书后面看到一些更高级的配置形式可以使用,但是Vert.x core API已经提供了一个非常有用的通用API。

配置【Configuration】需要使用由 io.vertx.core.json包中的JsonObject和JsonArray类实现的Vert JSON API,以JSON 形式进行传递。

public class SampleVerticle extends AbstractVerticle {private final Logger logger = LoggerFactory.getLogger(SampleVerticle.class);@Overridepublic void start() {logger.info("n = {}", config().getInteger("n", -1));}public static void main(String[] args) {Vertx vertx = Vertx.vertx();for (int n = 0; n < 4; n++) {JsonObject conf = new JsonObject().put("n", n);DeploymentOptions opts = new DeploymentOptions().setConfig(conf).setInstances(n);vertx.deployVerticle("chapter2.opts.SampleVerticle", opts);}}}

Listing 2.12. Passing configuration data to a verticle

清单2.12给出了一个部署多个传递配置数据的verticles的示例:

INFO [vert.x-eventloop-thread-2] SampleVerticle - n = 2INFO [vert.x-eventloop-thread-5] SampleVerticle - n = 3INFO [vert.x-eventloop-thread-4] SampleVerticle - n = 3INFO [vert.x-eventloop-thread-1] SampleVerticle - n = 2INFO [vert.x-eventloop-thread-3] SampleVerticle - n = 3INFO [vert.x-eventloop-thread-0] SampleVerticle - n = 1

Listing 2.13. Sample execution output when running the code in listing 2.12

运行这个示例会得到如清单2.13中的输出,我们可以检查配置数据【configuration data】的不同值。

2.3 When code needs to block

在事件回环【event-loop】中运行代码的基本规则是它不应该阻塞,并且它应该运行得“足够快【fast enough】”。我们之前已经看到,默认情况下,当事件回环【event-loop】被阻塞太久时,Vert.x会检测并发出警告。

在一些不可避免的情况下,您将很难避免阻塞性代码。这可能是因为您引入的第三方库使用了另一种线程模型,例如某些网络服务的驱动程序。Vert.x提供了两个选项来处理这种情:

worker verticlesexecuteBlocking

2.3.1 Worker verticles

Worker verticles是一种特殊形式的verticles,它不在事件回环【event-loop】上工作。相反,它们在工作线程【worker threads】上执行,即从特殊的工作线程池【worker pools】中获取的线程。您可以定义自己的工作线程池【worker pools】并向其部署worker verticles,但在大多数情况下,使用Vert.x默认的工作线程池【worker pools】就可以了。

worker verticle处理事件的方式与event-loop verticle处理事件的方式基本相同,不同之处在于它可以花费任意长的时间来完成这一操作。重要的是要了解:

worker verticle不绑定到单个工作线程,因此与event-loop verticle不同,后续事件可能不会在同一个线程上执行worker verticles在给定的时间内只能通过一个工作线程【worker thread】访问到

简单地说:

相同点:与event-loop verticle相同的是,worker verticle是单线程的不同点:与event-loop verticle不同的是,线程可能不总是相同的

public class WorkerVerticle extends AbstractVerticle {private final Logger logger = LoggerFactory.getLogger(WorkerVerticle.class);@Overridepublic void start() {vertx.setPeriodic(10_000, id -> {try {logger.info("Zzz...");Thread.sleep(8000);logger.info("Up!");} catch (InterruptedException e) {logger.error("Woops", e);}});}public static void main(String[] args) {Vertx vertx = Vertx.vertx();DeploymentOptions opts = new DeploymentOptions().setInstances(2).setWorker(true);vertx.deployVerticle("chapter2.worker.WorkerVerticle", opts);}}

Listing 2.14. Sample worker verticle

清单2.14给出了一个示例,其中部署了两个有2个worker verticle实例。 每10秒钟,代码就会阻塞8秒钟。运行此示例会得到类似于清单2.15的输出。 正如我们所看到的,不同的工作线程正在用于后续事件。

INFO [vert.x-worker-thread-2] WorkerVerticle - Zzz...INFO [vert.x-worker-thread-3] WorkerVerticle - Zzz...INFO [vert.x-worker-thread-3] WorkerVerticle - Up!INFO [vert.x-worker-thread-2] WorkerVerticle - Up!INFO [vert.x-worker-thread-5] WorkerVerticle - Zzz...INFO [vert.x-worker-thread-4] WorkerVerticle - Zzz...INFO [vert.x-worker-thread-4] WorkerVerticle - Up!INFO [vert.x-worker-thread-5] WorkerVerticle - Up!(...)

Listing 2.15. Sample output of running listing 2.14

WARNING

部署verticle时,有一个选项可以为worker verticles启用多线程,这允许一个verticles同时并发处理多个事件,从而打破了一直以来的单线程处理的假设。 人们一直认为这是相当高级的用法,许多用户最终以错误的方式使用它并捕获到并发错误。 这个功能现在是隐蔽的,甚至可能在将来的Vert.x版本中消失。 鼓励用户简单地调整工作程序池【worker pool】大小以匹配工作负载,而不是启用工作程序多线程【worker multi-threading】。

2.3.2 The executeBlocking operation

Worker verticles是运行阻塞任务的明智选择,但将阻塞代码提取到worker verticles中可能并不总是有意义的。这样做会导致执行小任务的worker verticles类数量激增,而且每个类可能不会形成一个合理的独立功能单元。

运行阻塞代码的另一种选择是使用Vertx类中的executeBlocking方法。该方法需要执行一些阻塞代码,将其卸载到工作线程【worker thread】,并将结果作为一个新事件发送回事件回环【event loop】,如图2.4所示。

清单2.16提供了一个示例用法:

public class Offload extends AbstractVerticle {private final Logger logger = LoggerFactory.getLogger(Offload.class);@Overridepublic void start() {vertx.setPeriodic(5000, id -> {logger.info("Tick");vertx.executeBlocking(this::blockingCode, this::resultHandler);});}private void blockingCode(Promise<String> promise) {logger.info("Blocking code running");try {Thread.sleep(4000);logger.info("Done!");plete("Ok!");} catch (InterruptedException e) {promise.fail(e);}}private void resultHandler(AsyncResult<String> ar) {if (ar.succeeded()) {logger.info("Blocking code result: {}", ar.result());} else {logger.error("Woops", ar.cause());}}}

Listing 2.16. Using executeBlocking

清单2.17提供了运行清单2.16中的代码时的示例输出。正如我们所看到的,执行被卸载到工作线程【worker thread】,但是结果处理仍然发生在事件回环【event loop】上。

INFO [vert.x-eventloop-thread-0] Offload - TickINFO [vert.x-worker-thread-0] Offload - Blocking code runningINFO [vert.x-worker-thread-0] Offload - Done!INFO [vert.x-eventloop-thread-0] Offload - Blocking code result: Ok!INFO [vert.x-eventloop-thread-0] Offload - TickINFO [vert.x-worker-thread-1] Offload - Blocking code runningINFO [vert.x-worker-thread-1] Offload - Done!INFO [vert.x-eventloop-thread-0] Offload - Blocking code result: Ok!INFO [vert.x-eventloop-thread-0] Offload - TickINFO [vert.x-worker-thread-2] Offload - Blocking code running(...)

Listing 2.17. Sample output when running listing 2.16

TIP

默认情况下,连续的executeBlocking操作的结果,将按照与executeBlocking调用相同的顺序来处理。executeBlocking有一个变体,带有一个附加的布尔参数,当它被设置为false时,结果将在事件回环【event loop】事件可用时立即可用,而不管executeBlocking调用的顺序是什么。

2.4 So what is really in a verticle?

到目前为止,我们已经了解了如何编写verticle,如何部署和配置它们以及如何处理阻塞代码。 通过在示例中使用信息日志,我们从经验上目睹了Vert.x线程模型中的元素。

现在是时候回过头来剖析verticle的内部内容了,并确保在本章结束时对verticle的工作方式以及如何正确使用它们有了全面的了解。

2.4.1 Verticles and their environment

图2.5给出了verticle及其环境之间关系的概述。

一个verticle对象本质上是两个对象的组合:

verticle实例所属的Vert.x实例允许将事件分派给处理程序的专用上下文实例【dedicated context instance】

Figure 2.5. An event-loop verticle and its environment

Event handling in user-defined callbacks happens through the context. The context instance allows calling the handler back on the verticle event-loop thread, hence respecting the Vert.x threading model.

Vert.x实例公开了用于声明事件处理程序的核心API。在先前的代码示例中,我们已经通过setTimer,setPeriodic,createHttpServer,deployVerticle等方法使用了它。 Vert.x实例由多个verticles共享,并且每个JVM进程通常只有一个Vertx实例。

上下文实例持有对线程的访问权,可以再这些线程上执行处理程序【handlers】。事件可能来自各种来源,比如计时器、数据库驱动程序、HTTP服务器等等。因此,它们通常是由其他线程触发的,比如Netty接受线程【Netty accepting threads】或计时器线程【timer threads】。

用户定义的回调中的事件处理通过上下文进行/发生。上下文实例使我们可以在verticle的事件回环线程上回调处理程序,从而遵守Vert.x线程模型。

worker verticles的情况差别不大,除了处理程序【handlers】是使用工作线程池【worker pool】中的一个工作线程【worker thread】执行的,如图2.6所示。它们仍然是verticles,就像它们的事件回环对等物一样,并且代码可以假定是单线程访问的。使用工作线程【worker thread】来处理worker verticle的事件具有不稳定性,即后续事件可能在不同的工作线程【worker thread】上执行。

Figure 2.6. A worker verticle and its environment

The case of worker verticles is not much different, except that handlers are being executed using one worker thread in a worker thread pool, as illustrated in figure 2.6. They are still verticles just like their event-loop counterparts and the code can assume single-threaded access. There is just no stability in which worker thread is going to be used for processing a worker verticle events.

2.4.2 More on contexts

Context objects can be accessed using the getOrCreateContext() method from the Vertx class. While a context is mostly always associated to a verticle, it is possible to create event-loop contexts outside of a verticle. As the name of the method suggests:

我们可以使用Vertx类中的getOrCreateContext()方法访问上下文对象。 尽管上下文几乎总是与某个verticle相关联,但是可以在verticle之外创建事件回环上下文【event-loop contexts】。 正如方法的名称所示

从上下文线程【context thread】(比如verticle中)中调用getOrCreateContext(),将返回上下文从非上下文线程【non-context thread】中调用getOrCreateContext()将会创建一个新的上下文

Vertx vertx = Vertx.vertx();vertx.getOrCreateContext().runOnContext(v -> logger.info("ABC")); ① 在Vertx.x上下文线程被执行的lambdavertx.getOrCreateContext().runOnContext(v -> logger.info("123"));

Listing 2.18. Creating contexts without a verticle

清单2.18展示了一个示例,其中创建了一个全局Vertx实例,并在JVM进程的main线程上进行了两次对getOrCreateContext的调用。 每次调用后都会调用runOnContext,这使我们可以在上下文线程上运行代码块。

INFO [vert.x-eventloop-thread-1] ThreadsAndContexts - 123INFO [vert.x-eventloop-thread-0] ThreadsAndContexts - ABC

Listing 2.19. Sample output of running listing 2.18

正如你在清单2.19所看到的,每一个上下文被分配给一个事件回环上。

上下文对象还支持更多操作,例如保存上下文范围内的任意键值对数据【key/value data】和声明异常处理程序【exception handlers】。 下面的清单显示了一个示例,其中foo键包含字符串bar,并且声明了一个异常处理程序【exception handler】,以在事件回环线程【event-loop thread】上执行处理程序【handler】时捕获并处理异常。

Vertx vertx = Vertx.vertx();Context ctx = vertx.getOrCreateContext();ctx.put("foo", "bar");ctx.exceptionHandler(t -> {if ("Tada".equals(t.getMessage())) {logger.info("Got a _Tada_ exception");} else {logger.error("Woops", t);}});ctx.runOnContext(v -> {throw new RuntimeException("Tada");});ctx.runOnContext(v -> {logger.info("foo = {}", (String) ctx.get("foo"));});

Listing 2.20. Using context data and exception handling

当事件处理【event processing】分布在多个类中时,上下文数据可能会很有用。 否则,使用类字段要简单得多(而且更快!)。

当事件处理可能引发异常时,异常处理程序【exception handler】很重要。 默认情况下,异常仅由Vert.x记录,但在执行自定义操作以处理错误时,重写上下文的异常处理程序【exception handler】非常有用。

INFO [vert.x-eventloop-thread-0] ThreadsAndContexts - Got a _Tada_ exceptionINFO [vert.x-eventloop-thread-0] ThreadsAndContexts - foo = bar

Listing 2.21. Sample output of running listing 2.20

运行代码会得到类似于清单2.21的日志输出。

2.4.3 Bridging Vert.x and non-Vert.x threading models

在编写Vert.x应用程序时,您可能不必处和Vert.x上下文打交道。 尽管如此,有一种情况下它还是有意义的:当你不得不使用第三方代码,而它有自己的线程模型,并且您希望它能正确地与Vert.x一起工作时

public class MixedThreading extends AbstractVerticle {private final Logger logger = LoggerFactory.getLogger(MixedThreading.class);@Overridepublic void start() {Context context = vertx.getOrCreateContext();new Thread(() -> {try {run(context);} catch (InterruptedException e) {logger.error("Woops", e);}}).start();}private void run(Context context) throws InterruptedException {CountDownLatch latch = new CountDownLatch(1);logger.info("I am in a non-Vert.x thread");context.runOnContext(v -> {logger.info("I am on the event-loop");vertx.setTimer(1000, id -> {logger.info("This is the final countdown");latch.countDown();});});logger.info("Waiting on the countdown latch...");latch.await();logger.info("Bye!");}}

Listing 2.22. Mixing different threading models

清单2.22中的代码是创建非Vert.x线程【non-Vert.x thread】的示例。 通过传递从verticle中获得的上下文,我们可以在非Vert.x线程上运行一些代码,然后再回到事件回环【event-loop】上执行一些代码。清单2.23中的日志证实了这一点。

INFO [Thread-3] MixedThreading - I am in a non-Vert.x threadINFO [Thread-3] MixedThreading - Waiting on the countdown latch...INFO [vert.x-eventloop-thread-0] MixedThreading - I am on the event-loopINFO [vert.x-eventloop-thread-0] MixedThreading - This is the final countdownINFO [Thread-3] MixedThreading - Bye!

Listing 2.23. Sample output when running listing 2.22

当您需要将非Vert.x线程模型【non-Vert.x threading models】集成到应用程序中时,您可以使用示例中的技术:

持有对verticle上下文的引用调用verticle上下文的runOnContext方法

TIP

这个例子向我们展示了上下文的另一个重要属性:在定义处理程序时,它们(即verticle上下文)可以被传播。 实际上,使用runOnContext运行的代码块会在一秒钟后设置一个计时器处理程序【timer handler】。 我们可以看到,处理程序【handler 】的执行上下文与用于定义它的上下文是相同的。

下一章将讨论事件总线,它是verticles在Vert.x应用程序中相互通信和阐明事件处理的特权方式。

2.5 Summary

Verticles是Vert.x中事件处理的单元事件回环verticles【Event-loop verticles】处理异步I/O事件,它应该没有阻塞和长时间运行的操作Worker verticles可以用来处理阻塞的I/O和长时间运行的操作通过使用事件回环上下文【event-loop contexts】,我们可以在代码中混合使用Vert.x线程和非Vert.x线程

2.6 References

[ActorModel] Carl Hewitt, Peter Bishop, and Richard Steiger. 1973. A universal modular ACTOR formalism for artificial intelligence. In Proceedings of the 3rd international joint conference on Artificial intelligence (IJCAI’73). Morgan Kaufmann Publishers Inc., San Francisco, CA, USA, 235-245.

[Liskov88] B. Liskov and L. Shrira. 1988. Promises: linguistic support for efficient asynchronous procedure calls in distributed systems. In Proceedings of the ACM SIGPLAN 1988 conference on Programming language design and implementation (PLDI’88), R. L. Wexelblat (Ed.). ACM, New York, NY, USA, 260-267.

[LogbackDoc] The logback manual. Retrieved from logback.qos.ch/manual/index.html

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。