当我重新启动 twitterStream 时,为什么 java.util.concurrent.RejectedExecutionException 使用 Twitter4J 对推文进行采样? [英] Why java.util.concurrent.RejectedExecutionException using Twitter4J to sample tweets, when I restart twitterStream?

查看:35
本文介绍了当我重新启动 twitterStream 时,为什么 java.util.concurrent.RejectedExecutionException 使用 Twitter4J 对推文进行采样?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在下面的 java 应用程序中,我使用 TwitterStream 使用示例函数收集推文.我需要在用户需要时启动和停止流,但出现以下异常:

java.util.concurrent.RejectedExecutionException:任务 twitter4j.StatusStreamBase$1@74e75335 从 java.util.concurrent.ThreadPoolExecutor@5117b235 被拒绝[终止,池大小 = 0,活动线程 = 0,排队任务 = 0,已完成的任务 = 2]在 java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)在 java.util.concurrent.ThreadPoolExecutor.reject(未知来源)在 java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)在 twitter4j.DispatcherImpl.invokeLater(DispatcherImpl.java:58)在 twitter4j.StatusStreamBase.handleNextElement(StatusStreamBase.java:80)在 twitter4j.StatusStreamImpl.next(StatusStreamImpl.java:56)在 twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:568)

当用户按下Crawl"或Stop Crawling"时,方法 actionPerformed 被正确调用.但是,如果用户按爬网然后按停止然后再次按爬网,我会收到上面的错误

我有几个类,但主要的如下:

第一个创建接口并与爬虫类通信.

import java.awt.FlowLayout;导入 java.awt.event.ActionEvent;导入 java.awt.event.ActionListener;导入 javax.swing.JButton;导入 javax.swing.JFrame;导入 javax.swing.JTextArea;公共类 StackOv 扩展 JFrame 实现 ActionListener{私人 JTextArea usersSaved;私有布尔值已经爬行;私人布尔停止接收;私有流;私人 JButton 抓取;私人 JButton stopCrawl;私人蒙戈米;公共 StackOv(){this.stopReceived = false;this.alreadyCrawling = false;setLayout(new FlowLayout(FlowLayout.CENTER));爬行 = new JButton("爬行");Crawl.setActionCommand("爬行");Crawl.addActionListener(this);stopCrawl = new JButton("停止爬行");stopCrawl.setActionCommand("停止爬行");stopCrawl.addActionListener(this);m = new Mongo();//使用MongoDB的类的实例/***bla bla bla 根据需要创建界面的其余部分*添加(按钮)*添加(按钮)*等等...*/}public void setOut(String out){usersSaved.setText(out);}公共无效 setOffAlreadyCrawling(){this.alreadyCrawling = false;}@覆盖public void actionPerformed(ActionEvent e){if(e.getActionCommand().equals("Stop Crawling") && !this.stopReceived){this.stopReceived = true;流.setStop();}else if(e.getActionCommand().equals("Crawl") && !alreadyCrawling){if(stream != null && stream.isAlive()){流中断();}已经爬行 = 真;流 = 新流(米,这);//独立于使用以下两个调用之一,我得到了上面相同的异常流.execute1();//stream.start();this.stopReceived = false;}}公共无效主(字符串[]参数){StackOv so = new StackOv();so.setSize(800, 800);so.setVisible(true);}}

以下类是爬虫类,当 stopCrawl 为 true 或 twitterStream 采样的推文数量超过最大限制时关闭 twitterStream.

import java.awt.TextArea;导入 java.util.ArrayList;导入 java.util.List;导入 javax.swing.JTextArea;导入 javax.swing.JTextField;导入 twitter4j.FilterQuery;导入 twitter4j.StallWarning;导入 twitter4j.Status;导入 twitter4j.StatusDeletionNotice;导入 twitter4j.StatusListener;导入 twitter4j.Twitter;导入 twitter4j.TwitterException;导入 twitter4j.TwitterFactory;导入 twitter4j.TwitterStream;导入 twitter4j.TwitterStreamFactory;公共类流扩展线程{私人爬虫cr;私人 TwitterStream twitterStream;私人 int maxTweets;private int usersSaved;私有 Mongo 数据库;私有创建索引 ci;私人 TwitterSearch twitterSearch;私有静态布尔停止爬行;公共流(Mongo 数据库,TwitterSearch twitterSearch){Stream.stopCrawl = false;this.database = 数据库;this.cr = 新的爬虫(数据库);this.twitterStream = new TwitterStreamFactory(DefaultConfiguration.getConfiguration()).getInstance();this.maxTweets = 1000;ci = 新创建索引(数据库);this.twitterSearch = twitterSearch;}公共无效集停止(){Stream.stopCrawl = true;}public void execute() 抛出 TwitterException {最终列表<状态>statuses = new ArrayList();StatusListener 监听器 = new StatusListener() {公共无效onStatus(状态状态){状态.添加(状态);System.out.println(statuses.size() + ":" + status.getText());int usersIndexed = cr.retrieve(status.getUser());usersSaved = database.countDocuments();twitterSearch.setOut("usersSaved:"+usersSaved);if(usersIndexed > maxTweets || Stream.stopCrawl){//ci.load();ci.load();//这个调用创建了我的索引twitterSearch.setOut("索引创建");System.out.println("关机...");twitterSearch.setOffAlreadyCrawling();twitterStream.shutdown();}}公共无效 onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}public void onTrackLimitedNotice(int numberOfLimitedStatuses) {}public void onScrubGeo(long userId, long upToStatusId) {}public void onException(Exception ex) {ex.printStackTrace();}@覆盖公共无效 onStallWarning(StallWarning arg0) {//TODO 自动生成的方法存根}};twitterStream.addListener(listener);twitterStream.sample("en");}公共无效执行1(){尝试{this.execute();}catch(TwitterException e){e.printStackTrace();}}公共无效运行(){尝试 {this.execute();} catch (TwitterException e) {//TODO 自动生成的 catch 块e.printStackTrace();}}}

解决方案

当您的线程关闭/关闭时,它会阻止它重新启动",就像其他 Java IO 类一样.换句话说,一旦你关闭它,你就不能真正再次启动它.我很确定在 Twitter 代码或您的代码中的某个地方,您的线程已被停止.为了防止这种情况发生,这里有一个可能有效的代码片段:http://pastebin.com/APByKuiY另外,试试这个堆栈溢出的东西:RejectedExecutionException 的原因可能是什么.>

In the following java application, I use TwitterStream to gather tweets using sample function. I need to start and stop the stream whenever user wants, but I get the following exception:

java.util.concurrent.RejectedExecutionException: Task twitter4j.StatusStreamBase$1@74e75335 rejected from java.util.concurrent.ThreadPoolExecutor@5117b235[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
    at twitter4j.DispatcherImpl.invokeLater(DispatcherImpl.java:58)
    at twitter4j.StatusStreamBase.handleNextElement(StatusStreamBase.java:80)
    at twitter4j.StatusStreamImpl.next(StatusStreamImpl.java:56)
    at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:568)

When the user presses "Crawl" or "Stop Crawling", the method actionPerformed is correctly called. However, if the user presses Crawl and then presses Stop and then again presses Crawl, I get the error above

I have several classes, but the principal ones are the followings:

The first one creates the interface and comunicates with the crawler class.

import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;

import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JTextArea;

public class StackOv extends JFrame implements ActionListener{

    private JTextArea usersSaved;
    private boolean alreadyCrawling;
    private boolean stopReceived;
    private Stream stream;
    private JButton Crawl;
    private JButton stopCrawl;
    private Mongo m;

    public StackOv(){
        this.stopReceived = false;
        this.alreadyCrawling = false;
        setLayout(new FlowLayout(FlowLayout.CENTER));
        Crawl = new JButton("Crawl");
        Crawl.setActionCommand("Crawl");
        Crawl.addActionListener(this);
        stopCrawl = new JButton("Stop Crawling");
        stopCrawl.setActionCommand("Stop Crawling");
        stopCrawl.addActionListener(this);
        m = new Mongo(); //instance of class that uses MongoDB
       /*
       *
       *bla bla bla create the rest of the interface as you wish
       *add(button)
       *add(button)
       *etc...
       */

    }


    public void setOut(String out){
        usersSaved.setText(out);
    }

    public void setOffAlreadyCrawling(){
        this.alreadyCrawling = false;
    }
    @Override
    public void actionPerformed(ActionEvent e){
        if(e.getActionCommand().equals("Stop Crawling") && !this.stopReceived){
            this.stopReceived = true;
            stream.setStop();
        }
        else if(e.getActionCommand().equals("Crawl") && !alreadyCrawling){
            if(stream != null && stream.isAlive()){
                stream.interrupt();
            }
            alreadyCrawling = true;
            stream = new Stream(m, this);
            //independently of using one of the following two calls, I get the same exception above
            stream.execute1();
            //stream.start();
            this.stopReceived = false;
        }
    }

    public void main(String[] args){
        StackOv so = new StackOv();
        so.setSize(800, 800);
        so.setVisible(true);
    }

}

The following class is the crawler class, that shutdown twitterStream when stopCrawl is true or when twitterStream has sampled a number of tweets over the maximum limit.

import java.awt.TextArea;
import java.util.ArrayList;
import java.util.List;

import javax.swing.JTextArea;
import javax.swing.JTextField;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

public class Stream extends Thread{

    private Crawler cr;
    private TwitterStream twitterStream;
    private int maxTweets;
    private int usersSaved;
    private Mongo database;
    private CreateIndex ci;
    private TwitterSearch twitterSearch;
    private static boolean stopCrawl;

    public Stream(Mongo database, TwitterSearch twitterSearch){

        Stream.stopCrawl = false;
        this.database = database;
        this.cr = new Crawler(database);
        this.twitterStream = new TwitterStreamFactory(DefaultConfiguration.getConfiguration()).getInstance();
        this.maxTweets = 1000;
        ci = new CreateIndex(database);
        this.twitterSearch = twitterSearch;


    }

    public void setStop(){
        Stream.stopCrawl = true;
    }

    public void execute() throws TwitterException {

        final List<Status> statuses = new ArrayList<Status>();

        StatusListener listener = new StatusListener() {

            public void onStatus(Status status) {
                statuses.add(status);
                System.out.println(statuses.size() + ":" + status.getText());
                int usersIndexed = cr.retrieve(status.getUser());
                usersSaved = database.countDocuments();
                twitterSearch.setOut("usersSaved: "+usersSaved);
                if(usersIndexed > maxTweets || Stream.stopCrawl){
                    //ci.load();
                    ci.load(); //this call creates my index
                    twitterSearch.setOut("INDEX CREATED");
                    System.out.println("shutdown...");
                    twitterSearch.setOffAlreadyCrawling();
                    twitterStream.shutdown();
                }
            }

            public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {

            }

            public void onTrackLimitationNotice(int numberOfLimitedStatuses) {

            }

            public void onScrubGeo(long userId, long upToStatusId) {

            }

            public void onException(Exception ex) {
                ex.printStackTrace();
            }
            @Override
            public void onStallWarning(StallWarning arg0) {
                // TODO Auto-generated method stub

            }

        };


        twitterStream.addListener(listener);
        twitterStream.sample("en");
    }

    public void execute1(){
        try{
            this.execute();
        }catch(TwitterException e){
            e.printStackTrace();
        }
    }

    public void run(){
        try {
            this.execute();
        } catch (TwitterException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

解决方案

When your thread is shut down/closed, it prevents it from being "restarted", as with other java IO classes. In other words, once you close it, you can't really start it up again. I'm pretty sure somewhere in either the Twitter code or your code, your thread is being stopped. To prevent this from happening, here's a code snippet that may work: http://pastebin.com/APByKuiY Also, try this stack overflow thingy: What could be the cause of RejectedExecutionException.

这篇关于当我重新启动 twitterStream 时,为什么 java.util.concurrent.RejectedExecutionException 使用 Twitter4J 对推文进行采样?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆