SubmissionPublisher on submit not invoking onNext of subscriber


Problem description

At a fixed interval I retrieve tweets matching a certain query. These tweets have to be passed to services that calculate statistics on them and manipulate them, so these services are subscribed to my publisher and publisher.hasSubscribers() returns true. However, neither submit nor offer invokes the onNext of my subscribers. As a "fix", I loop over the subscribers and invoke onNext myself, but that shouldn't be necessary.

This is the constructor of my publisher:

public TwitterStreamer(Executor executor, int maxBufferCapacity, long period, TimeUnit unit, String searchQuery) {
    super(executor, maxBufferCapacity);
    this.searchQuery = searchQuery;
    scheduler = new ScheduledThreadPoolExecutor(1);
    this.tweetGetter = scheduler.scheduleAtFixedRate(
            () -> {
                List<String> tweets = getTweets(searchQuery);
                /* this.lastCall = LocalDateTime.now();
                for (Flow.Subscriber sub : this.getSubscribers()) {
                    sub.onNext(tweets);
                }*/
                this.submit(tweets);
                if (tweets.size() >= 20) this.close();
            }, 0, period, unit);
}

This is my subscriber:

package myFlowAPI;

import Interfaces.IProcess;
import Services.LogToFileService;

import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class MySubscriber implements Flow.Subscriber<List<String>> {
    private Flow.Subscription subscription;
    private AtomicInteger count;

    private IProcess processor;

    private String name;
    private int DEMAND = 0;

    public MySubscriber(String name, IProcess processor) {
        this.name = name;
        this.processor = processor;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
    }

    @Override
    public void onNext(List<String> item) {
        Object result = this.processor.process(item);
        this.readResult(result);

        switch (this.processor.getClass().getSimpleName()) {
            case "CalculateTweetStatsService":
                if ((Integer) result >= 20) {
                    this.subscription.cancel();
                }
                break;
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("Error is thrown " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        if (this.processor instanceof LogToFileService) {
            ((LogToFileService) processor).closeResource();
        }
        System.out.println("complete");
    }

    private void readResult(Object result) {
        System.out.println("Result of " + this.processor.getClass().getSimpleName() + " processor is " + result.toString());
    }
}

This is the main method where I subscribe to the publisher:

public static void main(String[] args) {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

    String searchQuery;
    try{
       searchQuery = args[0] != null ? args[0] : "#capgemini50";
    }catch (ArrayIndexOutOfBoundsException ex){
        searchQuery = "#capgemini50";
    }

    TwitterStreamer streamer = new TwitterStreamer(executor, 5, 15L, SECONDS, searchQuery);

    MySubscriber subscriber1 = new MySubscriber("LogFileSubscriber", new LogToFileService("./tweetsLogger.txt"));
    MySubscriber subscriber2 = new MySubscriber("TotalTweetSubscriber", new CalculateTweetStatsService());
    streamer.subscribe(subscriber1);
    streamer.subscribe(subscriber2);

}

Recommended answer

The subscriber has to request data explicitly, for example upon subscription (see http://download.java.net/java/jdk9/docs/api/java/util/concurrent/Flow.Subscription.html#request-long-). Until the subscriber signals demand through Subscription.request(n), the publisher does not deliver any items to it, even though hasSubscribers() already returns true:

@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    this.subscription.request(1);
}

Do the same in onNext() once an item has been processed, so that the next item is requested.
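To make the request-in-onNext step concrete, here is a minimal, self-contained sketch. It does not use the classes from the question: RequestingSubscriberDemo, OneAtATimeSubscriber, the run helper and the "tweet-N" placeholder strings are all hypothetical names made up for illustration, and a plain SubmissionPublisher<String> with its default executor stands in for TwitterStreamer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class RequestingSubscriberDemo {

    /** Hypothetical subscriber that signals demand one item at a time. */
    static class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand; without it onNext is never called
        }

        @Override
        public void onNext(String item) {
            received.add(item);      // stand-in for the real processing
            subscription.request(1); // ask for the next item once this one is handled
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes n placeholder items and returns what the subscriber received. */
    static List<String> run(int n) throws InterruptedException {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < n; i++) {
                publisher.submit("tweet-" + i);
            }
        } // close() lets onComplete fire once the buffered items have been consumed
        subscriber.done.await(5, TimeUnit.SECONDS);
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3));
    }
}
```

Requesting one item at a time keeps the subscriber in control of its own pace. A subscriber that can never be overwhelmed could instead call request(Long.MAX_VALUE) once in onSubscribe, which effectively switches off backpressure.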
