将 SSE-Emitter 对象保存到 MongoDB/Redis,从数据库中获取并通过它发送事件 [英] Save SSE-Emitter object into MongoDB/Redis, Fetch it from Database and send event over it
问题描述
我正在尝试使我的 REST API 无状态.为此,我需要的是,将客户端的 SSE-Emitter 对象保存到 mongo 或 redis,以便其他实例可以集中访问.
当前行为:
我能够将 SSE 发射器对象保存到 mongoDb,但我认为该对象正在以某种方式被修改,因此,从 mongoDb 获取它后,我无法向客户端发送事件.如果我在本地保存 Map/List 中的相同发射器对象,则事件正在成功发送.
预期行为:
我应该能够从 mongoDb 获取发射器对象并将 EventData 发送到客户端.
源代码:
客户端订阅的控制器:
@GetMapping("/memory/{userName}")public SseEmitter handle(@PathVariable("userName") String userName) {SseEmitter 发射器 = new SseEmitter();尝试{MongoSession session = new MongoSession();session.setId(用户名);session.setAttribute("发射器",发射器);mongoSessionRepo.save(session);}catch(异常e){e.printStackTrace();}this.emitters.add(emitter);//将它添加到列表中也只是为了测试.Emitter.onCompletion(() -> this.emitters.remove(emitter));Emitter.onTimeout(() -> this.emitters.remove(emitter));返回发射器;}
MongoSession 类,代表 mongoDb 中的文档:
package ch.rasc.sse;导入 java.io.ByteArrayInputStream;导入 java.io.ByteArrayOutputStream;导入 java.io.IOException;导入 java.io.ObjectInputStream;导入 java.io.ObjectOutputStream;导入 java.io.Serializable;导入 java.util.HashMap;导入 java.util.Map;导入 java.util.Set;导入 org.springframework.data.annotation.Id;导入 org.springframework.data.mongodb.core.index.Indexed;导入 org.springframework.data.mongodb.core.mapping.Document;导入 org.springframework.session.ExpiringSession;@Document(collection = "springMongoSession")公共类 MongoSession 实现 ExpiringSession{public static final int DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS = 1800;/*** MongoDB 对象 ID*/@Indexed(unique = true)@ID私人字符串ID;公共无效setId(字符串ID){this.id = id;}/*** 会话 ID*/public static final String KEY_SESSION_ID = "_id";/*** 序列化会话属性*/私有字节[] serializedAttributes;/*** 会话属性(未保存到 MongoDB)*/私有地图属性;/*** 创建时间(以毫秒为单位的纪元)*/私有长创建时间;/*** 最后访问时间(以毫秒为单位的纪元)*/私人 long lastAccessedTime;/*** 最大非活动间隔(秒)*/私人 int maxInactiveIntervalInSeconds;/*** 过期时间(以毫秒为单位的纪元)*/@索引私人长expireTime;public static final String KEY_EXPIRE_TIME = "expireTime";/*** 构造函数*/公共 MongoSession() {属性 = 新的 HashMap<>();创建时间 = System.currentTimeMillis();lastAccessedTime = 创建时间;maxInactiveIntervalInSeconds = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;updateExpireTime();}/*** 构造函数*/公共 MongoSession(String sessionId) {this.id = sessionId;//this.sessionId = sessionId;属性 = 新的 HashMap<>();创建时间 = System.currentTimeMillis();lastAccessedTime = 创建时间;maxInactiveIntervalInSeconds = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;updateExpireTime();}公共字符串 getId() {返回标识;}public void setLastAccessedTime(long lastAccessedTime) {this.lastAccessedTime = lastAccessedTime;updateExpireTime();}公共长 getCreationTime() {返回创建时间;}公共长 getLastAccessedTime() {返回 lastAccessedTime;}公共无效 setMaxInactiveIntervalInSeconds(int 间隔) {maxInactiveIntervalInSeconds = 间隔;updateExpireTime();}公共 int getMaxInactiveIntervalInSeconds() {返回 maxInactiveIntervalInSeconds;}受保护的长 getExpireTime() {返回过期时间;}私有无效 updateExpireTime() {expireTime = lastAccessedTime + maxInactiveIntervalInSeconds * 1000;}公共布尔 isExpired() {long now = System.currentTimeMillis();返回 expireTime <= 现在;}公共 <T>T getAttribute(String attributeName) {返回 (T)attributes.get(attributeName);}公共集getAttributeNames() {返回attributes.keySet();}public void setAttribute(String attributeName, Object attributeValue) {attributes.put(attributeName, attributeValue);}public void removeAttribute(String attributeName) {attributes.remove(attributeName);}/*** 序列化会话属性*/public void serializeAttributes() {试试 (ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = 新 ObjectOutputStream(bos)) {oos.writeObject(属性);oos.flush();serializedAttributes = bos.toByteArray();} catch (IOException e) {//e.printStackTrace();serializedAttributes = 新字节[0];}}public void serializeAttributesThis(Object attributeValue) {试试 (ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = 新 ObjectOutputStream(bos)) {oos.writeObject(attributeValue);oos.flush();serializedAttributes = bos.toByteArray();} catch (IOException e) {//e.printStackTrace();serializedAttributes = 新字节[0];}}/*** 反序列化会话属性*/公共无效反序列化属性(){尝试 (ByteArrayInputStream bis = new ByteArrayInputStream(serializedAttributes);ObjectInputStream ois = 新的 ObjectInputStream(bis)) {属性 = (Map)ois.readObject();} catch (IOException | ClassNotFoundException e) {//e.printStackTrace();属性 = 新的 HashMap<>();}}}
在下面的请求中,我想将 eventData 发送回客户端:
@RequestMapping("/qmevents/{sessionId}")public void readQmEvents(@PathVariable("sessionId") String userName){尝试{System.out.println("发射器对象:"+mongoSessionRepo._getSession(userName));System.out.println("发射器对象:"+mongoSessionRepo._getSession(userName).getAttributeNames());System.out.println("发射器对象:"+mongoSessionRepo._getSession(userName).getAttribute("发射器").toString());sessionRepo.getSessionAttributes(userName, "emitter");SseEmitter 发射器 =mongoSessionRepo._getSession(userName).getAttribute("发射器");MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();MemoryUsage heap = memBean.getHeapMemoryUsage();MemoryUsage nonHeap = memBean.getNonHeapMemoryUsage();MemoryInfo mi = new MemoryInfo(heap.getUsed(), nonHeap.getUsed());mi.setForUserName("QM 引发的事件");System.out.println("来自地图的发射器:"+SSEControllerPerUser.emitters.get(0));SSEControllerPerUser.emitters.get(0).send(mi);//发射器.send(mi);}catch(异常e){e.printStackTrace();}}
子类化 Spring SseEmitter(请参阅下文)并使用该组件,我已将此解决方案用于您所描述的类似场景(服务器崩溃).
>
public class SerializableSSE extends SseEmitter 实现Serializable{公共 SerializableSSE() {}公共 SerializableSSE(长超时){超级(超时);}}
希望有帮助!
I am trying to make my REST API stateless. For that what I need is, to save client's SSE-Emitter object to mongo or redis so that it will be centrally accessible by other instances.
Current behavior:
I am able to save the SSE-emitter object to mongoDb but I think that object is being modified somehow because of which, after fetching it from mongoDb I am not able to send an event to client. If I save same emitter object in Map/List locally, the events are being sent out successfully.
Expected behaviour:
I should be able to fetch the emitter object from mongoDb and send EventData to client over it.
Source Code:
Controller where client subcribes:
@GetMapping("/memory/{userName}")
public SseEmitter handle(@PathVariable("userName") String userName) {
SseEmitter emitter = new SseEmitter();
try{
MongoSession session = new MongoSession();
session.setId(userName);
session.setAttribute("emitter", emitter);
mongoSessionRepo.save(session);
}catch(Exception e){
e.printStackTrace();
}
this.emitters.add(emitter);// adding it to list as well just for testing.
emitter.onCompletion(() -> this.emitters.remove(emitter));
emitter.onTimeout(() -> this.emitters.remove(emitter));
return emitter;
}
MongoSession class which represents document in mongoDb:
package ch.rasc.sse;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.session.ExpiringSession;
@Document(collection = "springMongoSession")
public class MongoSession implements ExpiringSession{
public static final int DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS = 1800;
/**
* MongoDB Object ID
*/
@Indexed(unique = true)
@Id
private String id;
public void setId(String id) {
this.id = id;
}
/**
* Session ID
*/
public static final String KEY_SESSION_ID = "_id";
/**
* Serialized session attributes
*/
private byte[] serializedAttributes;
/**
* Session attributes (not saved to MongoDB)
*/
private Map<String,Object> attributes;
/**
* Creation time (epoch in ms)
*/
private long creationTime;
/**
* Last accessed time (epoch in ms)
*/
private long lastAccessedTime;
/**
* Max inactive interval (sec)
*/
private int maxInactiveIntervalInSeconds;
/**
* Expire time (epoch in ms)
*/
@Indexed
private long expireTime;
public static final String KEY_EXPIRE_TIME = "expireTime";
/**
* Constructor
*/
public MongoSession() {
attributes = new HashMap<>();
creationTime = System.currentTimeMillis();
lastAccessedTime = creationTime;
maxInactiveIntervalInSeconds = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;
updateExpireTime();
}
/**
* Constructor
*/
public MongoSession(String sessionId) {
this.id = sessionId;
//this.sessionId = sessionId;
attributes = new HashMap<>();
creationTime = System.currentTimeMillis();
lastAccessedTime = creationTime;
maxInactiveIntervalInSeconds = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;
updateExpireTime();
}
public String getId() {
return id;
}
public void setLastAccessedTime(long lastAccessedTime) {
this.lastAccessedTime = lastAccessedTime;
updateExpireTime();
}
public long getCreationTime() {
return creationTime;
}
public long getLastAccessedTime() {
return lastAccessedTime;
}
public void setMaxInactiveIntervalInSeconds(int interval) {
maxInactiveIntervalInSeconds = interval;
updateExpireTime();
}
public int getMaxInactiveIntervalInSeconds() {
return maxInactiveIntervalInSeconds;
}
protected long getExpireTime() {
return expireTime;
}
private void updateExpireTime() {
expireTime = lastAccessedTime + maxInactiveIntervalInSeconds * 1000;
}
public boolean isExpired() {
long now = System.currentTimeMillis();
return expireTime <= now;
}
public <T> T getAttribute(String attributeName) {
return (T)attributes.get(attributeName);
}
public Set<String> getAttributeNames() {
return attributes.keySet();
}
public void setAttribute(String attributeName, Object attributeValue) {
attributes.put(attributeName, attributeValue);
}
public void removeAttribute(String attributeName) {
attributes.remove(attributeName);
}
/**
* Serialize session attributes
*/
public void serializeAttributes() {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(attributes);
oos.flush();
serializedAttributes = bos.toByteArray();
} catch (IOException e) {
//e.printStackTrace();
serializedAttributes = new byte[0];
}
}
public void serializeAttributesThis(Object attributeValue) {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(attributeValue);
oos.flush();
serializedAttributes = bos.toByteArray();
} catch (IOException e) {
//e.printStackTrace();
serializedAttributes = new byte[0];
}
}
/**
* Deserialize session attributes
*/
public void deserializeAttributes() {
try (ByteArrayInputStream bis = new ByteArrayInputStream(serializedAttributes);
ObjectInputStream ois = new ObjectInputStream(bis)) {
attributes = (Map<String,Object>)ois.readObject();
} catch (IOException | ClassNotFoundException e) {
//e.printStackTrace();
attributes = new HashMap<>();
}
}
}
On below request I want to send eventData back to client:
@RequestMapping("/qmevents/{sessionId}")
public void readQmEvents(@PathVariable("sessionId") String userName)
{
try{
System.out.println("Emitter Object:
"+mongoSessionRepo._getSession(userName));
System.out.println("Emitter Object:
"+mongoSessionRepo._getSession(userName).getAttributeNames());
System.out.println("Emitter Object:
"+mongoSessionRepo._getSession(userName)
.getAttribute("emitter").toString());
sessionRepo.getSessionAttributes(userName, "emitter");
SseEmitter emitter =mongoSessionRepo._getSession(userName).
getAttribute("emitter");
MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heap = memBean.getHeapMemoryUsage();
MemoryUsage nonHeap = memBean.getNonHeapMemoryUsage();
MemoryInfo mi = new MemoryInfo(heap.getUsed(), nonHeap.getUsed());
mi.setForUserName("Event raised by QM");
System.out.println("Emitter from map:
"+SSEControllerPerUser.emitters.get(0));
SSEControllerPerUser.emitters.get(0).send(mi);
//emitter.send(mi);
}catch(Exception e){
e.printStackTrace();
}
}
Subclass the Spring SseEmitter(refer below) and use that component, I have used this solution for the similar scenario that you have described(server crash).
public class SerializableSSE extends SseEmitter implements Serializable{
public SerializableSSE() {
}
public SerializableSSE(Long timeout) {
super(timeout);
}
}
Hope it helps!
这篇关于将 SSE-Emitter 对象保存到 MongoDB/Redis,从数据库中获取并通过它发送事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!