异步增强
字数
699 字
阅读
4 分钟
基于 Guava 、transmittable-thread-local 扩展
- 使用
ListenableFuture监听任务,任务完成时可以得到计算结果 - 线程池等复用线程的场景下无法正确传递线程本地变量的问题
示例
java
public static class MyRunnable implements Runnable {
public String result;
public MyRunnable(String result) {
this.result = result;
}
@Override
public void run() {
// 耗时运算
ThreadUtil.safeSleep(2000);
}
}java
public static class MyFutureCallback implements FutureCallback<Result> {
@Override
public void onSuccess(@Nullable Result result) {
log.info(result.getMsg());
}
@Override
public void onFailure(Throwable t) {
log.info(t.getMessage());
}
}java
@AllArgsConstructor
public static class MyFutureCallable extends AsyncFutureCallback<Result> {
public String keyword;
@Override
public Result call() throws Exception {
log.info(keyword);
// 耗时运算
ThreadUtil.safeSleep(2000);
return R.succ(keyword);
}
@Override
public void onSuccess(@Nullable Result result) {
log.info(result.msg());
}
@Override
public void onFailure(@Nullable Throwable t) {
log.info(t.getMessage());
}
}基础
java
@Test
@Order(1)
public void executor() {
// 执行任务,不监听
Async.submit(new MyRunnable("1"));
// 执行任务,监听结果
Async.submit(new MyFutureCallable("2"));
Async.submit(new MyFutureCallable("3"), new MyFutureCallback());
Async.submit(() -> R.succ("4"), new MyFutureCallback());
}log
2025-10-14 15:18:41 INFO fastboot:3944 AsyncTest.java:39 - 1
2025-10-14 15:18:41 INFO fastboot:3944 AsyncTest.java:65 - 2
2025-10-14 15:18:41 INFO fastboot:3945 AsyncTest.java:65 - 3
2025-10-14 15:18:41 INFO fastboot:3946 AsyncTest.java:49 - 4转换
java
@Test
@Order(2)
public void transform() {
// 初始任务
ListenableFuture<Result> future1 = Async.submit(() -> R.succ(4));
// 处理任务
ListenableFuture<Integer> future2 =
Futures.transform(
future1,
input -> {
return input.isSuccess() ? Convert.toInt(input.data(), 0) + 1 : 0;
},
MoreExecutors.directExecutor());
// 监听结果
Futures.addCallback(
future2,
new FutureCallback<Integer>() {
@Override
public void onSuccess(@Nullable Integer result) {
log.info(result);
}
@Override
public void onFailure(@Nullable Throwable t) {
log.info(t.getMessage());
}
},
Async.getExecutorService());
}log
2025-10-14 15:47:15 INFO fastboot:3854 AsyncTest.java:113 - 5执行器
简易版执行器 1->2->[31,32]->4
java
@Test
@Order(1)
public void executor() {
// 数据
String json =
"[{\"name\":\"node1\",\"type\":\"HTTP\",\"method\":\"GET\",\"url\":\"http://rest.apizza.net/mock/46b8b8197618d143b5a76eeae002abbd/test\",\"params\":{},\"timeout\":5000,\"ret\":\"node1\",\"rule\":\"\",\"ignoreFailure\":false},{\"name\":\"node2\",\"type\":\"HTTP\",\"method\":\"GET\",\"url\":\"http://rest.apizza.net/mock/46b8b8197618d143b5a76eeae002abbd/test\",\"params\":{\"a\":\"$.node1.x\",\"b\":\"$.node1.y\"},\"timeout\":5000,\"ret\":\"node2\",\"rule\":\"\",\"ignoreFailure\":false},[{\"name\":\"node3-1\",\"type\":\"HTTP\",\"method\":\"GET\",\"url\":\"http://rest.apizza.net/mock/46b8b8197618d143b5a76eeae002abbd/test\",\"params\":{\"p1\":311,\"p2\":312},\"timeout\":5000,\"ret\":\"node31\",\"rule\":\"compare($.env.a,2)\",\"ignoreFailure\":false},{\"name\":\"node3-2\",\"type\":\"HTTP\",\"method\":\"GET\",\"url\":\"http://rest.apizza.net/mock/46b8b8197618d143b5a76eeae002abbd/test\",\"params\":{\"p1\":321,\"p2\":322},\"timeout\":5000,\"ret\":\"node32\",\"rule\":\"compare(sum($.env.b,1),2)\",\"ignoreFailure\":false}]]";
// 上下文
ActuatorContext context =
ActuatorContext.builder()
.trace(IdUtil.fastSimpleUUID())
.title("测试")
.env(
Envar.builder()
.custom(Dict.create().set("a", 1).set("b", 2))
.saveDir("D:")
.debug(true)
.build())
.build();
// 执行器
ActuatorDispatch dispatch =
new ActuatorDispatch(
context,
gtl.parseObject(json),
new ActuatorFutureCallback() {
@Override
public void onSuccess(ActuatorContext ctx, Dict result) {
Console.log(ctx);
Console.log(result);
}
@Override
public void onFailure(ActuatorContext ctx, Throwable t) {
Console.log(ctx);
}
});
dispatch.execute();
}
}json
[
{
"name": "node1",
"type": "HTTP",
"method": "GET",
"url": "http://rest.apizza.net/mock/46b8b8197618d143b5a76eeae002abbd/test",
"params": {},
"timeout": 5000,
"ret": "node1",
"rule": "",
"ignoreFailure": false
},
{
"name": "node2",
"type": "HTTP",
"method": "GET",
"url": "http://rest.apizza.net/mock/46b8b8197618d143b5a76eeae002abbd/test",
"params": {
"a": "$.node1.x",
"b": "$.node1.y"
},
"timeout": 5000,
"ret": "node2",
"rule": "",
"ignoreFailure": false
},
[
{
"name": "node3-1",
"type": "HTTP",
"method": "GET",
"url": "http://rest.apizza.net/mock/46b8b8197618d143b5a76eeae002abbd/test",
"params": {
"p1": 311,
"p2": 312
},
"timeout": 5000,
"ret": "node31",
"rule": "compare($.env.a,2)",
"ignoreFailure": false
},
{
"name": "node3-2",
"type": "HTTP",
"method": "GET",
"url": "http://rest.apizza.net/mock/46b8b8197618d143b5a76eeae002abbd/test",
"params": {
"p1": 321,
"p2": 322
},
"timeout": 5000,
"ret": "node32",
"rule": "compare(sum($.env.b,1),2)",
"ignoreFailure": false
}
],
{
"name": "node4",
"type": "HTTP",
"method": "GET",
"url": "http://rest.apizza.net/mock/46b8b8197618d143b5a76eeae002abbd/test",
"params": {
"c": "$.node32.x",
"d": "$.node32.y"
},
"timeout": 5000,
"ret": "node4",
"rule": "",
"ignoreFailure": false
}
]json
{
"trace": "e0ae580c3d404d169c34d667f22ee4a4",
"title": "测试",
"env": {
"custom": {
"a": 1,
"b": 2
},
"saveDir": "D:",
"debug": true
},
"context": {
"env": {
"saveDir": "D:",
"debug": true,
"a": 1,
"b": 2
},
"system": {
"os": "Windows 11",
"user": "xiesx",
"host": "DELL-WORK",
"java": "21.0.8",
"runtime": 83886080,
"datatime": "2025-10-22 17:09:18"
},
"node1": {
"x": 1,
"y": 2
},
"node2": {
"x": 1,
"y": 2
},
"node31": "ignore",
"node32": {
"x": 1,
"y": 2
},
"node4": {
"x": 1,
"y": 2
}
},
"result": {
"node4": {
"x": 1,
"y": 2
}
},
"error": ""
}
