定义存储池
/**
* 用于存储SseEmitter对象
*/
private static final Map<String, SseEmitter> pool = new ConcurrentHashMap<>();
发布消息
/**
* 描述 发布消息
* @param id
* @param message
*/
@PostMapping("/publisher/{id}")
public void publish(@PathVariable String id, @RequestBody String message) {
SseEmitter sseEmitter = pool.get(id);
if(Objects.nonNull(sseEmitter)){
try {
sseEmitter.send(message);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
订阅消息
/**
* 描述 订阅消息
* @param id
* @return
*/
@GetMapping("/subscribe/{id}")
public SseEmitter subscribe(@PathVariable String id) {
SseEmitter sseEmitter = pool.get(id);
if(Objects.isNull(sseEmitter)){
sseEmitter = new SseEmitter();
sseEmitter.onCompletion(() -> pool.remove(id));
sseEmitter.onTimeout(() -> pool.remove(id));
pool.put(id, sseEmitter);
}
return sseEmitter;
}
测试
// 接收消息
curl http://127.0.0.1:8080/subscribe/1
// 发送消息
curl -d "message=1" http://127.0.0.1:8080/publisher/1
curl -d "message=2" http://127.0.0.1:8080/publisher/1
curl -d "message=3" http://127.0.0.1:8080/publisher/1
本文由 小马哥 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为:
2024/06/15 18:21