将 SSE-Emitter 对象保存到 MongoDB/Redis,从数据库中获取并通过它发送事件 [英] Save SSE-Emitter object into MongoDB/Redis, Fetch it from Database and send event over it

查看:101
本文介绍了将 SSE-Emitter 对象保存到 MongoDB/Redis,从数据库中获取并通过它发送事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使我的 REST API 无状态.为此,我需要的是,将客户端的 SSE-Emitter 对象保存到 mongo 或 redis,以便其他实例可以集中访问.

当前行为:

我能够将 SSE 发射器对象保存到 mongoDb,但我认为该对象正在以某种方式被修改,因此,从 mongoDb 获取它后,我无法向客户端发送事件.如果我在本地保存 Map/List 中的相同发射器对象,则事件正在成功发送.

预期行为:

我应该能够从 mongoDb 获取发射器对象并将 EventData 发送到客户端.

源代码:

客户端订阅的控制器:

@GetMapping("/memory/{userName}")public SseEmitter handle(@PathVariable("userName") String userName) {SseEmitter 发射器 = new SseEmitter();尝试{MongoSession session = new MongoSession();session.setId(用户名);session.setAttribute("发射器",发射器);mongoSessionRepo.save(session);}catch(异常e){e.printStackTrace();}this.emitters.add(emitter);//将它添加到列表中也只是为了测试.Emitter.onCompletion(() -> this.emitters.remove(emitter));Emitter.onTimeout(() -> this.emitters.remove(emitter));返回发射器;}

MongoSession 类,代表 mongoDb 中的文档:

package ch.rasc.sse;导入 java.io.ByteArrayInputStream;导入 java.io.ByteArrayOutputStream;导入 java.io.IOException;导入 java.io.ObjectInputStream;导入 java.io.ObjectOutputStream;导入 java.io.Serializable;导入 java.util.HashMap;导入 java.util.Map;导入 java.util.Set;导入 org.springframework.data.annotation.Id;导入 org.springframework.data.mongodb.core.index.Indexed;导入 org.springframework.data.mongodb.core.mapping.Document;导入 org.springframework.session.ExpiringSession;@Document(collection = "springMongoSession")公共类 MongoSession 实现 ExpiringSession{public static final int DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS = 1800;/*** MongoDB 对象 ID*/@Indexed(unique = true)@ID私人字符串ID;公共无效setId(字符串ID){this.id = id;}/*** 会话 ID*/public static final String KEY_SESSION_ID = "_id";/*** 序列化会话属性*/私有字节[] serializedAttributes;/*** 会话属性(未保存到 MongoDB)*/私有地图属性;/*** 创建时间(以毫秒为单位的纪元)*/私有长创建时间;/*** 最后访问时间(以毫秒为单位的纪元)*/私人 long lastAccessedTime;/*** 最大非活动间隔(秒)*/私人 int maxInactiveIntervalInSeconds;/*** 过期时间(以毫秒为单位的纪元)*/@索引私人长expireTime;public static final String KEY_EXPIRE_TIME = "expireTime";/*** 构造函数*/公共 MongoSession() {属性 = 新的 HashMap<>();创建时间 = System.currentTimeMillis();lastAccessedTime = 创建时间;maxInactiveIntervalInSeconds = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;updateExpireTime();}/*** 构造函数*/公共 MongoSession(String sessionId) {this.id = sessionId;//this.sessionId = sessionId;属性 = 新的 HashMap<>();创建时间 = System.currentTimeMillis();lastAccessedTime = 创建时间;maxInactiveIntervalInSeconds = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;updateExpireTime();}公共字符串 getId() {返回标识;}public void setLastAccessedTime(long lastAccessedTime) {this.lastAccessedTime = lastAccessedTime;updateExpireTime();}公共长 getCreationTime() {返回创建时间;}公共长 getLastAccessedTime() {返回 lastAccessedTime;}公共无效 setMaxInactiveIntervalInSeconds(int 间隔) {maxInactiveIntervalInSeconds = 间隔;updateExpireTime();}公共 int getMaxInactiveIntervalInSeconds() {返回 maxInactiveIntervalInSeconds;}受保护的长 getExpireTime() {返回过期时间;}私有无效 updateExpireTime() {expireTime = lastAccessedTime + maxInactiveIntervalInSeconds * 1000;}公共布尔 isExpired() {long now = System.currentTimeMillis();返回 expireTime <= 现在;}公共 <T>T getAttribute(String attributeName) {返回 (T)attributes.get(attributeName);}公共集getAttributeNames() {返回attributes.keySet();}public void setAttribute(String attributeName, Object attributeValue) {attributes.put(attributeName, attributeValue);}public void removeAttribute(String attributeName) {attributes.remove(attributeName);}/*** 序列化会话属性*/public void serializeAttributes() {试试 (ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = 新 ObjectOutputStream(bos)) {oos.writeObject(属性);oos.flush();serializedAttributes = bos.toByteArray();} catch (IOException e) {//e.printStackTrace();serializedAttributes = 新字节[0];}}public void serializeAttributesThis(Object attributeValue) {试试 (ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = 新 ObjectOutputStream(bos)) {oos.writeObject(attributeValue);oos.flush();serializedAttributes = bos.toByteArray();} catch (IOException e) {//e.printStackTrace();serializedAttributes = 新字节[0];}}/*** 反序列化会话属性*/公共无效反序列化属性(){尝试 (ByteArrayInputStream bis = new ByteArrayInputStream(serializedAttributes);ObjectInputStream ois = 新的 ObjectInputStream(bis)) {属性 = (Map)ois.readObject();} catch (IOException | ClassNotFoundException e) {//e.printStackTrace();属性 = 新的 HashMap<>();}}}

在下面的请求中,我想将 eventData 发送回客户端:

 @RequestMapping("/qmevents/{sessionId}")public void readQmEvents(@PathVariable("sessionId") String userName){尝试{System.out.println("发射器对象:"+mongoSessionRepo._getSession(userName));System.out.println("发射器对象:"+mongoSessionRepo._getSession(userName).getAttributeNames());System.out.println("发射器对象:"+mongoSessionRepo._getSession(userName).getAttribute("发射器").toString());sessionRepo.getSessionAttributes(userName, "emitter");SseEmitter 发射器 =mongoSessionRepo._getSession(userName).getAttribute("发射器");MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();MemoryUsage heap = memBean.getHeapMemoryUsage();MemoryUsage nonHeap = memBean.getNonHeapMemoryUsage();MemoryInfo mi = new MemoryInfo(heap.getUsed(), nonHeap.getUsed());mi.setForUserName("QM 引发的事件");System.out.println("来自地图的发射器:"+SSEControllerPerUser.emitters.get(0));SSEControllerPerUser.emitters.get(0).send(mi);//发射器.send(mi);}catch(异常e){e.printStackTrace();}}

解决方案

子类化 Spring SseEmitter(请参阅下文)并使用该组件,我已将此解决方案用于您所描述的类似场景(服务器崩溃).

>

public class SerializableSSE extends SseEmitter 实现Serializable{公共 SerializableSSE() {}公共 SerializableSSE(长超时){超级(超时);}}

希望有帮助!

I am trying to make my REST API stateless. For that what I need is, to save client's SSE-Emitter object to mongo or redis so that it will be centrally accessible by other instances.

Current behavior:

I am able to save the SSE-emitter object to mongoDb but I think that object is being modified somehow because of which, after fetching it from mongoDb I am not able to send an event to client. If I save same emitter object in Map/List locally, the events are being sent out successfully.

Expected behaviour:

I should be able to fetch the emitter object from mongoDb and send EventData to client over it.

Source Code:

Controller where client subcribes:

@GetMapping("/memory/{userName}")
public SseEmitter handle(@PathVariable("userName") String userName) {
 SseEmitter emitter = new SseEmitter();
 try{
         MongoSession session = new MongoSession();
         session.setId(userName);
         session.setAttribute("emitter", emitter);
         mongoSessionRepo.save(session);
 }catch(Exception e){
         e.printStackTrace();
 }
 this.emitters.add(emitter);// adding it to list as well just for testing.
 emitter.onCompletion(() -> this.emitters.remove(emitter));
 emitter.onTimeout(() -> this.emitters.remove(emitter));

 return emitter;
}

MongoSession class which represents document in mongoDb:

package ch.rasc.sse;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.session.ExpiringSession;

@Document(collection = "springMongoSession")
public class MongoSession implements ExpiringSession{

  public static final int DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS = 1800;

  /**
  * MongoDB Object ID
  */
  @Indexed(unique = true)
  @Id
  private String id;

  public void setId(String id) {
    this.id = id;
  }
  /**
  * Session ID
  */
  public static final String KEY_SESSION_ID = "_id";

  /**
   * Serialized session attributes
   */
  private byte[] serializedAttributes;

  /**
  * Session attributes (not saved to MongoDB)
   */
  private Map<String,Object> attributes;

/**
 * Creation time (epoch in ms)
 */
  private long creationTime;

/**
 * Last accessed time (epoch in ms)
 */
  private long lastAccessedTime;

/**
 * Max inactive interval (sec)
 */
  private int maxInactiveIntervalInSeconds;

/**
 * Expire time (epoch in ms)
 */
  @Indexed
  private long expireTime;
  public static final String KEY_EXPIRE_TIME = "expireTime";

/**
 * Constructor
 */
 public MongoSession() {
    attributes = new HashMap<>();
    creationTime = System.currentTimeMillis();
    lastAccessedTime = creationTime;
    maxInactiveIntervalInSeconds = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;
    updateExpireTime();
 }

/**
 * Constructor
 */
 public MongoSession(String sessionId) {
    this.id = sessionId;
    //this.sessionId = sessionId;
    attributes = new HashMap<>();
    creationTime = System.currentTimeMillis();
    lastAccessedTime = creationTime;
    maxInactiveIntervalInSeconds = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;
    updateExpireTime();
 }


public String getId() {
    return id;
}

public void setLastAccessedTime(long lastAccessedTime) {
    this.lastAccessedTime = lastAccessedTime;
    updateExpireTime();
}


public long getCreationTime() {
    return creationTime;
}

public long getLastAccessedTime() {
    return lastAccessedTime;
}

public void setMaxInactiveIntervalInSeconds(int interval) {
    maxInactiveIntervalInSeconds = interval;
    updateExpireTime();
}

public int getMaxInactiveIntervalInSeconds() {
    return maxInactiveIntervalInSeconds;
}

protected long getExpireTime() {
    return expireTime;
}

private void updateExpireTime() {
    expireTime = lastAccessedTime + maxInactiveIntervalInSeconds * 1000;
}

public boolean isExpired() {
    long now = System.currentTimeMillis();
    return expireTime <= now;
}

public <T> T getAttribute(String attributeName) {
    return (T)attributes.get(attributeName);
}

public Set<String> getAttributeNames() {
    return attributes.keySet();
}

public void setAttribute(String attributeName, Object attributeValue) {

   attributes.put(attributeName, attributeValue);
}

public void removeAttribute(String attributeName) {
    attributes.remove(attributeName);
}
/**
 * Serialize session attributes
 */
 public void serializeAttributes() {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(attributes);
        oos.flush();
        serializedAttributes = bos.toByteArray();
    } catch (IOException e) {
        //e.printStackTrace();
        serializedAttributes = new byte[0];
    }
}
public void serializeAttributesThis(Object attributeValue) {
   try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos)) {
       oos.writeObject(attributeValue);
       oos.flush();
       serializedAttributes = bos.toByteArray();
   } catch (IOException e) {
       //e.printStackTrace();
       serializedAttributes = new byte[0];
   }
 }
   /**
   * Deserialize session attributes
   */
  public void deserializeAttributes() {
    try (ByteArrayInputStream bis = new ByteArrayInputStream(serializedAttributes);
         ObjectInputStream ois = new ObjectInputStream(bis))  {
        attributes = (Map<String,Object>)ois.readObject();
    } catch (IOException | ClassNotFoundException e) {
        //e.printStackTrace();
        attributes = new HashMap<>();
    }
  }
 }

On below request I want to send eventData back to client:

    @RequestMapping("/qmevents/{sessionId}")
    public void readQmEvents(@PathVariable("sessionId") String userName)
    {
       try{
        System.out.println("Emitter Object: 
         "+mongoSessionRepo._getSession(userName));
        System.out.println("Emitter Object: 
  "+mongoSessionRepo._getSession(userName).getAttributeNames());
        System.out.println("Emitter Object: 
  "+mongoSessionRepo._getSession(userName)
    .getAttribute("emitter").toString());
    sessionRepo.getSessionAttributes(userName, "emitter");
    SseEmitter emitter =mongoSessionRepo._getSession(userName).
            getAttribute("emitter");
    MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
    MemoryUsage heap = memBean.getHeapMemoryUsage();
    MemoryUsage nonHeap = memBean.getNonHeapMemoryUsage();
    MemoryInfo mi = new MemoryInfo(heap.getUsed(), nonHeap.getUsed());
        mi.setForUserName("Event raised by QM");
        System.out.println("Emitter from map: 
     "+SSEControllerPerUser.emitters.get(0));
        SSEControllerPerUser.emitters.get(0).send(mi);
        //emitter.send(mi);
    }catch(Exception e){
        e.printStackTrace();
    }

} 

解决方案

Subclass the Spring SseEmitter(refer below) and use that component, I have used this solution for the similar scenario that you have described(server crash).

public class SerializableSSE extends SseEmitter implements Serializable{

    public SerializableSSE() {
    }

    public SerializableSSE(Long timeout) {
        super(timeout);
    }
}

Hope it helps!

这篇关于将 SSE-Emitter 对象保存到 MongoDB/Redis,从数据库中获取并通过它发送事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆