Calculating the time taken by my worker threads while they are still processing

Problem description

I am not able to get the processing times for my worker threads. I have a thread that monitors the worker threads and reports how long each one has been processing and how many of them are in the processing state.

Questions

  1. My monitoring thread always reports that no workers are in the processing state. I suspect it is not able to evaluate the worker threads correctly.

I have synchronized some of the methods that fetch and record the processing times and the processing state. Is that good enough for my monitor thread to keep track of processing times without the threads overwriting each other's values?

Should the isProcessing boolean variable be volatile?

MyWorker class

public class MyWorker implements Runnable {

    private final int WAIT_INTERVAL = 200;
    private MyService myService;
    private MyProvider myProvider;
    private volatile boolean stopRequested = false; // volatile: set by another thread, read by the run() loop
    private boolean isProcessing; // Flag to indicate the worker is currently processing a message
    private long processingStartTime; //captures when the worker started processing a message
    private Logger logger = new Logger();

    public MyWorker(MyService myService, MyProvider myProvider){
        this.myService = myService;
        this.myProvider = myProvider;
        this.isProcessing = false;
        this.processingStartTime = -1;
    }

    public void setStopRequested() {
        stopRequested = true;
    }


    private synchronized void recordProcessingStart(long start){
        isProcessing = true;
        processingStartTime = start;
    }

    private synchronized void recordProcessingStop(){
        isProcessing = false;
        processingStartTime = -1;
    }

    public synchronized boolean isProcessing(){
        return isProcessing;
    }

    public synchronized long getProcessingStartTime(){
        return processingStartTime;
    }

    @Override
    public void run() {

        while (!stopRequested) {
            boolean processingMessage = false;
            List<Message> messages = myProvider.getPendingMessages();
            if (messages.size() != 0) {
                logger.log("We have " + messages.size() + " messages");
                recordProcessingStart(System.currentTimeMillis());
                for (Message message : messages) {
                    processMessage(message); // pass the current element, not the whole list
                }
                recordProcessingStop();
            }

            if (!(processingMessage || stopRequested)) {
                // this is to stop the thread from spinning when there are no messages
                try {
                    Thread.sleep(WAIT_INTERVAL);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private void processMessage(Message message){
        myService.process(message);
    }
}

WorkerManager class

public class WorkerManager implements Runnable {

    private MyWorker[] workers;
    private int workerCount;
    private volatile boolean stopRequested; // volatile: stop() is called from a different thread than run()
    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
    private Logger logger = new Logger();

    public WorkerManager(int count) {
        this.workerCount = count;
    }

    @Override
    public void run() {
        stopRequested = false;
        boolean managerStarted = false;

        while (!stopRequested) {
            if (!managerStarted) {
                workers = new MyWorker[workerCount];
                for (int i = 0; i < workerCount; i++) {
                    final Thread workerThread = new Thread(workers[i], "Worker-" + (i + 1));
                    workerThread.start();
                }
                startProcessMonitoringThread();
                managerStarted = true;
            }
        }
    }

    public void stop() {
        stopRequested = true;
    }

    public void cleanUpOnExit() {
        for (MyWorker w : workers) {
            w.setStopRequested();
        }
        stopProcessMonitoringThread();
    }

    private void startProcessMonitoringThread(){

        scheduledExecutorService = Executors.newScheduledThreadPool(1);
        scheduledExecutorService.scheduleWithFixedDelay( new WorkerMonitorThread(workers), 1, 1, TimeUnit.MINUTES);
        logger.info("Monitoring of worker threads initialized ...");
    }

    private void stopProcessMonitoringThread(){
        scheduledExecutorService.shutdown();
        logger.info("Successfully shutdown worker monitoring thread ...");
    }


    private class WorkerMonitorThread implements Runnable{

        private MyWorker[] workers;
        WorkerMonitorThread(MyWorker[] workers){
            this.workers = workers;

        }

        @Override
        public void run() {
            String trackingId = "["+ UUID.randomUUID().toString() + "] - ";
            logger.info(trackingId + "Calculating processing times of worker threads ...");

            StringBuilder sb = new StringBuilder();
            int totalWorkersInProcess = 0;
            for (int i = 0; i < workerCount; i++) {
                MyWorker worker = workers[i];
                if(worker.isProcessing()){
                    totalWorkersInProcess++;
                    long startTime = worker.getProcessingStartTime();
                    long currentTime = System.currentTimeMillis();
                    long duration = currentTime - startTime;
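                    // End each worker's report with a line separator so the entries do not run together
                    sb.append(trackingId + "Worker-" + (i + 1) + " has been processing for the last : " + duration + " ms").append(System.lineSeparator());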
                }
            }
            sb.append(trackingId + "Total worker(s) in progress : " + totalWorkersInProcess);
            logger.info(sb.toString());
        }
    }
}

Recommended answer

You haven't created your workers.

Look at this code in the run method of your WorkerManager:

            workers = new MyWorker[workerCount]; // Creates an array of MyWorker, but every element is still null.
            for (int i = 0; i < workerCount; i++) {
                final Thread workerThread = new Thread(workers[i], "Worker-" + (i + 1)); // <-- Creates a thread with a null MyWorker as its target.
                workerThread.start(); // Nothing runs in this thread; the NullPointerException shows up later, when the monitor calls a method on workers[i].
            }
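
The effect of that loop can be reproduced in isolation. The following is a minimal sketch, not the original application: `NullRunnableDemo` and its nested `Worker` are hypothetical stand-ins for `WorkerManager` and `MyWorker`, reduced to the one method the monitor calls. It shows that starting a thread with a null target completes without doing any work, and that the failure only appears when something later dereferences the empty array slot.

```java
public class NullRunnableDemo {

    // Hypothetical stand-in for MyWorker, reduced to the method the monitor calls.
    static class Worker implements Runnable {
        private boolean processing;

        public synchronized boolean isProcessing() {
            return processing;
        }

        @Override
        public void run() {
            // real message processing omitted in this sketch
        }
    }

    /** Starts one thread per array slot, the same way the original loop does. */
    static void startAll(Worker[] workers) {
        for (int i = 0; i < workers.length; i++) {
            // If workers[i] is null, the thread has no target and simply finishes.
            Thread t = new Thread(workers[i], "Worker-" + (i + 1));
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Returns true if reading worker state the way the monitor does throws a NullPointerException. */
    static boolean monitorFails(Worker[] workers) {
        try {
            for (Worker w : workers) {
                w.isProcessing();
            }
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Worker[] workers = new Worker[3]; // three null slots, never populated
        startAll(workers);
        System.out.println("Monitor fails on unpopulated array: " + monitorFails(workers));
    }
}
```

One more thing worth checking in the real application: a task scheduled with `scheduleWithFixedDelay` whose execution throws an exception has its subsequent executions suppressed, so a failing monitor can go quiet without any visible stack trace unless the returned future is inspected.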

To fix the bug, your WorkerManager must either create the workers itself or receive the workers to use. For example, you could pass the dependencies needed to create the workers into the constructor (with matching myService and myProvider fields declared in WorkerManager):

    public WorkerManager(int count, MyService myService, MyProvider myProvider) {
        this.workerCount = count;
        this.myService = myService;
        this.myProvider = myProvider;
    }

And then create the workers correctly:

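            workers = new MyWorker[workerCount];
            for (int i = 0; i < workerCount; i++) {
                workers[i] = new MyWorker(myService, myProvider); // populate the slot before using it
                final Thread workerThread = new Thread(workers[i], "Worker-" + (i + 1));
                workerThread.start();
            }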
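
On the synchronization and volatile questions: isProcessing and processingStartTime are only read and written inside synchronized methods of the same MyWorker instance, so the monitor already sees up-to-date values and volatile is not needed for isProcessing. The stopRequested flag is the one that is read without any synchronization. There is still a small race in the monitor, though: it calls isProcessing() and getProcessingStartTime() as two separate steps, so a worker can finish in between, and the monitor then computes a duration from a start time of -1. A minimal sketch of one way to close that gap, assuming a hypothetical helper class `ProcessingTracker` (not part of the original code), is to read both fields under a single lock:

```java
public class ProcessingTracker {

    private boolean processing = false;
    private long startTime = -1;

    /** Called by the worker just before it starts handling a batch. */
    public synchronized void recordStart(long now) {
        processing = true;
        startTime = now;
    }

    /** Called by the worker once the batch is done. */
    public synchronized void recordStop() {
        processing = false;
        startTime = -1;
    }

    /**
     * Called by the monitor. Returns the elapsed milliseconds if the worker is
     * processing, or -1 if it is idle. The flag and the start time are read
     * under the same lock, so they always belong to the same state.
     */
    public synchronized long elapsedMillis(long now) {
        return processing ? now - startTime : -1;
    }

    public static void main(String[] args) {
        ProcessingTracker tracker = new ProcessingTracker();
        tracker.recordStart(System.currentTimeMillis());
        System.out.println("Elapsed while processing: " + tracker.elapsedMillis(System.currentTimeMillis()) + " ms");
        tracker.recordStop();
        System.out.println("Elapsed when idle: " + tracker.elapsedMillis(System.currentTimeMillis()));
    }
}
```

With this shape, the monitor would make one call per worker and treat a negative result as "not processing", instead of combining two independent reads.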
