解决 Java 异常 “java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking”
1、简介
Spring Webflux 是一个非阻塞的 Web 框架,从底层开始构建,旨在利用多核、下一代处理器的优势,处理大量并发连接(既然是非阻塞框架,线程就不应该被阻塞)。
本文将带你了解在使用 Spring Webflux 时常犯的一个错误。
2、Spring Webflux 线程模型
为了更好地理解这个问题,我们需要了解 Spring Webflux 的线程模型。
在 Spring Webflux 中,一个小型工作线程池负责处理传入请求。这与 Servlet 模型不同,在 Servlet 模型中,每个请求都有一个专用线程。因此,框架会保护(隔离)这些接受(处理)请求的线程。
理解了这一点后,继续往下看。
3、通过线程阻塞了解 IllegalStateException
让我们通过一个示例来了解 Spring Webflux 中何时以及为何会出现异常:“java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread”。
以文件搜索 API 为例。该应用从文件系统读取文件,并在文件中搜索用户提供的文本。
3.1、FileService
先定义一个 FileService
类,它能以字符串形式读取文件内容:
@Service
public class FileService {
@Value("${files.base.dir:/tmp/bael-7724}")
private String filesBaseDir;
public Mono<String> getFileContentAsString(String fileName) {
return DataBufferUtils.read(Paths.get(filesBaseDir + "/" + fileName), DefaultDataBufferFactory.sharedInstance, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY)
.map(dataBuffer -> dataBuffer.toString(StandardCharsets.UTF_8))
.reduceWith(StringBuilder::new, StringBuilder::append)
.map(StringBuilder::toString);
}
}
注意,FileService
是以响应式(异步)从文件系统读取文件的。
3.2、FileContentSearchService
使用 FileService
来实现文件搜索服务:
@Service
public class FileContentSearchService {
@Autowired
private FileService fileService;
public Mono<Boolean> blockingSearch(String fileName, String searchTerm) {
String fileContent = fileService
.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. BlockingSearch"))
.block();
boolean isSearchTermPresent = fileContent.contains(searchTerm);
return Mono.just(isSearchTermPresent);
}
}
文件搜索服务会根据是否在文件中找到搜索词返回一个 boolean
值。为此,我们调用了 FileService
的 getFileContentAsString()
方法。由于我们以异步方式(即 Mono<String>
)获取结果,因此我们调用 block()
来获取 String
值。然后,检查 fileContent
是否包含 searchTerm
。最后,将结果封装在 Mono
中并返回。
3.3、FileController
最后,定义 FileController
,它调用了 FileContentSearchService
的 blockingSearch()
方法:
@RestController
@RequestMapping("bael7724/v1/files")
public class FileController {
...
@GetMapping(value = "/{name}/blocking-search")
Mono<Boolean> blockingSearch(@PathVariable("name") String fileName, @RequestParam String term) {
return fileContentSearchService.blockingSearch(fileName, term);
}
}
3.4、重现异常
我们可以看到,Controller
调用了 FileContentSearchService
的方法,而 FileContentSearchService
又调用了 block()
方法。由于这是在一个接受请求的线程上,如果按照当前的设计调用我们的 API,将遇到我们想要避免的臭名昭著的异常:
12:28:51.610 [reactor-http-epoll-2] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [ea98e542-1] 500 Server Error for HTTP GET "/bael7724/v1/files/a/blocking-search?term=a"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ com.baeldung.filters.TraceWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ com.baeldung.filters.ExceptionalTraceFilter [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP GET "/bael7724/v1/files/a/blocking-search?term=a" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
at reactor.core.publisher.Mono.block(Mono.java:1712)
at com.baeldung.bael7724.service.FileContentSearchService.blockingSearch(FileContentSearchService.java:20)
at com.baeldung.bael7724.controller.FileController.blockingSearch(FileController.java:35)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
3.5、根本原因
导致此异常的根本原因是在接受请求的线程上调用了 block()
。在上面的示例代码中,block()
方法是在接受请求的线程池中的一个线程上调用的。具体地说,是在标记为 “仅限非阻塞操作” 的线程上调用了 block()
,也就是在实现了 Reactor
的 NonBlocking
记接口的线程上调用了 block()
,比如那些由 Schedulers.parallel()
启动的线程。
4、解决办法
来看看如何解决这一异常。
4.1、拥抱响应式业务
惯用的方法是使用响应式操作,而不是调用 block()
。
更新代码,使用 map()
操作将 String
转换为 Boolean
值:
public Mono<Boolean> nonBlockingSearch(String fileName, String searchTerm) {
return fileService.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. NonBlockingSearch"))
.map(content -> content.contains(searchTerm))
.doOnNext(content -> ThreadLogger.log("2. NonBlockingSearch"));
}
这样,就完全不需要调用 block()
了。当我们运行上述方法时,会发现线程上下文如下:
[1. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506215299Z
[2. NonBlockingSearch] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506361786Z
[1. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506465805Z
[2. In Controller] ThreadName: Thread-4, Time: 2024-06-17T07:40:59.506543145Z
上述日志表明,我们在接受请求的同一个线程池中执行了操作。
注意,尽管我们没有遇到异常,但最好还是在不同的线程池中运行 I/O 操作,例如从文件中读取数据。
4.2、在 Schedulers.boundedElastic() 线程池上阻塞
假设由于某种原因,我们无法避开 block()
。那该怎么做呢?
我们的结论是,当在接受请求的线程池上调用 block()
时,异常就发生了。因此,要调用 block()
,需要切换线程池:
public Mono<Boolean> workableBlockingSearch(String fileName, String searchTerm) {
return Mono.just("")
.doOnNext(s -> ThreadLogger.log("1. WorkableBlockingSearch"))
.publishOn(Schedulers.boundedElastic())
.doOnNext(s -> ThreadLogger.log("2. WorkableBlockingSearch"))
.map(s -> fileService.getFileContentAsString(fileName)
.block()
.contains(searchTerm))
.doOnNext(s -> ThreadLogger.log("3. WorkableBlockingSearch"));
}
要切换线程池,Spring Webflux 提供了两个操作 publishOn()
和 subscribeOn()
。
我们使用 publishOn()
,它可以为 publishOn()
之后的操作更改线程,而不会影响订阅或上游操作。由于线程池现在已切换为 Schedulers.boundedElastic()
线程池,因此可以调用 block()
。
现在,如果我们运行 workableBlockingSearch()
方法,就会看到以下线程日志:
[1. WorkableBlockingSearch] ThreadName: parallel-2, Time: 2024-06-17T07:40:59.440562518Z
[2. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442161018Z
[3. WorkableBlockingSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.442891230Z
[1. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443058091Z
[2. In Controller] ThreadName: boundedElastic-1, Time: 2024-06-17T07:40:59.443181770Z
可以看到,从第 2 项开始,操作确实发生在有 Schedulers.boundedElastic()
线程池上,因此没有发生 IllegalStateException
异常。
4.3、注意事项
这种阻塞解决方案有一些注意事项。
在调用 block()
时,可能会在很多方面出错。
举一个例子,尽管我们使用了 Scheduler
来切换线程上下文,但它并没有按照我们期望的方式运行:
public Mono<Boolean> incorrectUseOfSchedulersSearch(String fileName, String searchTerm) {
String fileContent = fileService.getFileContentAsString(fileName)
.doOnNext(content -> ThreadLogger.log("1. IncorrectUseOfSchedulersSearch"))
.publishOn(Schedulers.boundedElastic())
.doOnNext(content -> ThreadLogger.log("2. IncorrectUseOfSchedulersSearch"))
.block();
boolean isSearchTermPresent = fileContent.contains(searchTerm);
return Mono.just(isSearchTermPresent);
}
示例如下,我们使用了 publishOn()
,但 block()
方法仍会导致异常。运行上述代码时,可以看到以下日志。
[1. IncorrectUseOfSchedulersSearch] ThreadName: Thread-4, Time: 2024-06-17T08:57:02.490298417Z
[2. IncorrectUseOfSchedulersSearch] ThreadName: boundedElastic-1, Time: 2024-06-17T08:57:02.491870410Z
14:27:02.495 [parallel-1] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler - [53e4bce1] 500 Server Error for HTTP GET "/bael7724/v1/files/robots.txt/incorrect-use-of-schedulers-search?term=r-"
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ com.baeldung.filters.TraceWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ com.baeldung.filters.ExceptionalTraceFilter [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP GET "/bael7724/v1/files/robots.txt/incorrect-use-of-schedulers-search?term=r-" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86)
at reactor.core.publisher.Mono.block(Mono.java:1712)
at com.baeldung.bael7724.service.FileContentSearchService.incorrectUseOfSchedulersSearch(FileContentSearchService.java:64)
at com.baeldung.bael7724.controller.FileController.incorrectUseOfSchedulersSearch(FileController.java:48)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
这表明第二条日志语句确实在 Schedulers.boundedElastic()
线程池上运行。但是,仍然遇到了异常。原因是 block()
仍在同一个接受请求的线程池上运行。
再来看看另一个注意事项。即使我们切换了线程池,也不能使用并行线程池,即 Schedulers.parallel()
。如前所述,某些线程池不允许在其线程上调用 block()
,并行线程池就是其中之一。
最后,在我们的示例中我们仅使用了 Schedulers.boundedElastic()
。相反,我们也可以通过 Schedulers.fromExecutorService()
使用任何自定义线程池。
5、总结
总之,要有效解决 Spring Webflux 在使用 block()
等阻塞操作时出现 IllegalStateException
的问题,我们应该采用非阻塞的响应式方法。通过利用 map()
等响应式操作符,我们可以在同一个响应式线程池上执行操作,从而无需显式 block()
。如果无法避免 block()
,那么将执行上下文切换到 boundedElastic
Scheduler 或使用 publishOn()
的自定义线程池,就可以将这些操作与接受请求的响应式线程池隔离开来,从而防止异常的发生。
必须注意有些线程池不支持阻塞调用,并确保应用正确的上下文切换,以保持应用的稳定性和性能。
Ref:https://www.baeldung.com/java-fix-illegalstateexception-blocking