并发程序中的I / O. [英] I/O in concurrent program

查看:106
本文介绍了并发程序中的I / O.的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在制作并发计划;它有两个线程,其中一个线程侦听来自服务器的消息,另一个线程向其发送消息。
我需要从用户那里获取命令(使用cin?)并同时显示来自服务器的消息。



我该如何处理情况?问题是,如果我在收到消息时正在读取用户的命令,则用户的输入会与其他内容混淆。



提前致谢

解决方案

我拿了旧的示例代码并尝试将其转换为



请记住,我写了这段代码,简单比完美或舒适更重要。


I'm working on a concurrent program; it has two threads, one of which listens messages from a server and the other one sends messages to it. I need to obtain commands from the user (using cin?) and show messages coming from the server both at the same time.

How can I handle that situation? The problem is that if I'm reading a command from the user when a message comes, the user's input is messed up with other stuff.

Thanks in advance

解决方案

I took my old sample code and tried to turn it into an MCVE. ("Minimal" does not necessarily mean "short", does it?)

This is a very simple concept of a "shell" which supports one thread for input while multiple threads may do output.

  1. The keyboard input is done non-echoing. This is non-portable. Therefore I provide two implementations of function getChar() – one for MS Windows and another for non-MS Windows (which considers actually only *ix OSes). The latter is "strongly inspired" by SO: How to implement getch() function of C in Linux?.

  2. The input characters are stored in a std::string.

  3. The output erases the prompt and the current input text (repeating the output of "\b \b" resp.), prints the output text (incl. newline), and prints the prompt and current input buffer again.

The output is mutex guarded to grant thread-safety.

This is the sample code miniShell.cc:

// system header:
#ifdef _WIN32
#include <conio.h>
#else // (not) _WIN32
#include <termios.h>
#include <unistd.h>
#include <stdio.h>
#endif // _WIN32

/// reads a character from console without echo.
#ifdef _WIN32
inline int getChar() { return _getch(); }
#else // (not) _WIN32
int getChar()
{
  struct termios oldattr;
  tcgetattr(STDIN_FILENO, &oldattr);
  struct termios newattr = oldattr;
  newattr.c_lflag &= ~(ICANON | ECHO);
  tcsetattr(STDIN_FILENO, TCSANOW, &newattr);
  const int ch = getchar();
  tcsetattr(STDIN_FILENO, TCSANOW, &oldattr);
  return ch;
}
#endif // _WIN32

// standard C/C++ header:
#include <cstring>
#include <mutex>
#include <string>

/* provides a class for a simple thread-safe mini-shell.
 *
 * It is assumed that one thread may await user input (read()) while
 * another thread may (or may not) output text from time to time.
 * The mini-shell grants that the input line is always the last line.
 */
class Console {

  // variable:
  private:
    // mutex for console I/O
    std::mutex _mtx;
    // current input
    std::string _input;
    // prompt output
    std::string _prompt;

  // methods:
  public:
    /// constructor.
    Console() { }

    // disabled:
    Console(const Console&) = delete;
    Console& operator = (const Console&) = delete;

    // reads a line from console and returns input string
    std::string read();

    /* writes text to console.
     *
     * text the text
     * size size of text
     */
    void write(const char *text, size_t size);
    void write(const char *text) { write(text, strlen(text)); }
    void write(const std::string &text) { write(text.c_str(), text.size()); }
};

// standard C/C++ header:
#include <atomic>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <thread>

std::string Console::read()
{
  { // activate prompt
    std::lock_guard<std::mutex> lock(_mtx);
    _prompt = "> "; _input.clear();
    std::cout << _prompt << std::flush;
  }
#ifdef _WIN32
  enum { Enter = '\r', BackSpc = '\b' };
#else // (not) _WIN32
  enum { Enter = '\n', BackSpc = 127 };
#endif // _WIN32
  // input loop
  for (;;) {
    switch (int c = getChar()) {
      case Enter: {
        std::lock_guard<std::mutex> lock(_mtx);
        std::string input = _input;
        _prompt.clear(); _input.clear();
        std::cout << std::endl;
        return input;
      } // unreachable: break;
      case BackSpc: {
        std::lock_guard<std::mutex> lock(_mtx);
        if (_input.empty()) break; // nothing to do
        _input.pop_back();
        std::cout << "\b \b" << std::flush;
      } break;
      default: {
        if (c < ' ' || c >= '\x7f') break;
        std::lock_guard<std::mutex> lock(_mtx);
        _input += c;
        std::cout << (char)c << std::flush;
      } break;
    }
  }
}

void Console::write(const char *text, size_t len)
{
  if (!len) return; // nothing to do
  bool eol = text[len - 1] == '\n';
  std::lock_guard<std::mutex> lock(_mtx);
  // remove current input echo
  if (size_t size = _prompt.size() + _input.size()) {
    std::cout
      << std::setfill('\b') << std::setw(size) << ""
      << std::setfill(' ') << std::setw(size) << ""
      << std::setfill('\b') << std::setw(size) << "";
  }
  // print text
  std::cout << text;
  if (!eol) std::cout << std::endl;
  // print current input echo
  std::cout << _prompt << _input << std::flush;
}

// a sample application

// shared data for main thread and data processing thread
struct Shared {
  // flag: true ... exit communication thread and main loop
  std::atomic<bool> exit;
  // flag: true ... start data processing
  std::atomic<bool> start;
  // the mini console
  Console console;

  // constructor.
  Shared(): exit(false), start(true) { }
};

void dataProc(Shared &shared)
{
  while (!shared.exit) {
    // "busy" wait for start (condition would be more elegant)
    while (!shared.start) {
      if (shared.exit) return;
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    // do data processing
    shared.console.write("Starting data processing.");
    for (int i = 0, n = 20; i < n; ++i) {
      // "busy" wait for start (condition would be more elegant)
      if (!shared.start) {
        shared.console.write("Data processing stopped.");
        while (!shared.start) {
          if (shared.exit) return;
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        shared.console.write("Data processing restarted.");
      }
      // consume some time (to simulate heavy computation)
      std::this_thread::sleep_for(std::chrono::milliseconds(250));
      // do some console output about progress
      { std::ostringstream fmt;
        fmt << "Step " << i + 1 << '/' << n;
        shared.console.write(fmt.str());
      }
    }
    shared.console.write("Data processing done.");
    shared.start = false;
  }
}

void processInput(const std::string &input, Shared &shared)
{
  if (input == "start") shared.start = true;
  else if (input == "stop") shared.start = false;
  else if (input == "exit") shared.exit = true;
  else if (input.size()) shared.console.write("Wrong command!");
}

int main()
{
  Shared shared;
  // start a thread for some kind of data processing
  std::thread threadDataProc(&dataProc, std::ref(shared));
  // main loop
  while (!shared.exit) {
    shared.console.write("Commands: start stop exit");
    std::string input = shared.console.read();
    processInput(input, shared);
  }
  // join data processing thread
  threadDataProc.join();
  // done
  return 0;
}

I compiled and tested in VS2013 vs. bash/Xterm of cygwin on Windows 10. (cygwin was the closest to Linux I have at hand.)

Please, keep in mind that I wrote this code whereby simplicity was more important than perfection or comfort.

这篇关于并发程序中的I / O.的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆