如何使用 Python 异步运行终端命令? [英] How can I run a terminal command asynchronously using Python?

查看:61
本文介绍了如何使用 Python 异步运行终端命令?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在 Linux 环境中工作,尝试使用 Python 脚本自动执行这 2 个 Linux 命令:

I am working on a Linux environment, trying to automate these 2 Linux commands using a Python script:

  1. sudo rfcomm connect 0 XX:XX:XX:XX:XX:XX
  2. hcitool rssi XX:XX:XX:XX:XX:XX

命令 1 连接到我的蓝牙设备,只要设备已连接,命令 2 就会返回接收到的信号强度指示 (RSSI).因此,脚本必须能够在没有任何人工干预的情况下连接并提取 RSSI.我正在使用 subprocess 模块运行这些命令.

Command 1 connects to my bluetooth device, and command 2 returns the received signal strength inidication (RSSI) as long as the device is connected. So the script must be able to connect and then extract RSSI without any human intervention. I am running these commands using the subprocess module.

问题:当运行命令 1 以建立连接时,它说Connected/dev/rfcomm0 to XX:XX:XX:XX:XX:XX on频道 1按 CTRL-C 挂断",但它不会返回.它只会在连接丢失时返回,此时无法提取 RSSI.目前,python 脚本在运行命令 1 后不会继续到下一行代码,所以我之后无法运行命令 2.这是我的代码:

The problem: When command 1 is run such that a connection is made, it says "Connected /dev/rfcomm0 to XX:XX:XX:XX:XX:XX on channel 1 Press CTRL-C for hangup", however it does not return. It only returns once the connection is lost, at which point it would be impossible to extract RSSI. Currently the python script does not continue to the next line of code after running command 1, so I am unable to run command 2 afterwards. Here is my code:

def get_rssi(): 
    while True:
        p1 = subprocess.run(['hcitool', 'rssi', shimmer_id], capture_output=True, text=True)
        
        if p1.stderr:
            print('stderr:', p1.stderr)
            reconnect()
        if p1.stdout:
            print('stdout:', p1.stdout)
            time.sleep(0.3)


def reconnect():
    reconnected = False
    print('[-] lost connection, reconnecting...')
    
    while reconnected is False:
        p2 = subprocess.run(['sudo', 'rfcomm', 'connect', '0', shimmer_id], capture_output=True, text=True)
        
# If the connection on the previous line is successful, nothing is executed from  here onward! The script only continues once the above line returns (i.e. when the connection is lost)

        if p2.stderr:
            if 'Host is down' in p2.stderr.decode():
                print('Host is down')
                time.sleep(0.3)
                reconnect()
        else:
            reconnected = True
            
    get_rssi()

我尝试过线程和多处理,但它们无济于事,因为命令 1 从不连接"在 python 线程术语中.我知道我可能需要异步运行命令 1,但即使在参考 asyncio 文档时,我也很难在 python 中正确实现它.非常感谢任何帮助!

I have tried threading and multiprocessing, but they do not help since command 1 never "joins" in python threading lingo. I am aware that I probably need to run command 1 asynchronously, but I am struggling to implement it properly in python even while referring to the asyncio docs. Any help is much appreciated!

推荐答案

您似乎正在尝试建立 RFCOMM 连接.

It looks like you are trying to make an RFCOMM connection.

rfcommhcitool 都已经已于 2017 年被 BlueZ 项目弃用,因此我不建议使用这些来启动项目.

Both rfcomm and hcitool have been deprecated by the BlueZ project back in 2017 so I wouldn't recommend starting a project using these.

BlueZ 具有旨在从 Python 等语言访问的 API,这些 API 记录在:https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc 和官方示例位于:https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/测试

BlueZ has APIs that are designed to be accessed from languages like Python and these are documented at: https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc and the official examples are at: https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/test

但是,RFCOMM(或串行端口配置文件)连接可以通过标准 Python 版本中的套接字库完成.https://stackoverflow.com/a/63382835/7721752

However, RFCOMM (or Serial Port Profile) connections can be done with the socket library in the standard Python releases. There are examples at https://stackoverflow.com/a/63382835/7721752

上述答案中也有使用 Bluedot 图书馆做同样的事情.

There are also examples in the above answer of using the Bluedot library to do the same.

这篇关于如何使用 Python 异步运行终端命令?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆