如何在Python中的Google Cloud Function中运行子流程 [英] How to run a subprocess inside Google Cloud Function in Python

查看:96
本文介绍了如何在Python中的Google Cloud Function中运行子流程的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在GCP Function中运行bash脚本,但是某种程度上它不起作用.这是我的功能,该功能基本上将文件(代理)导出到Google Apigee:

  def test2(request):cmd ="python ./my-proxy/tools/deploy.py -n myProxy -u用户名:!密码-o myOrg -e测试-d ./my-proxy -p/"#no block,它启动一个子进程.p = subprocess.Popen(cmd,shell = True,stdout = subprocess.PIPE,stderr = subprocess.PIPE)#并且您可以阻止util cmd执行完成p.wait()#或stdout,stderr = p.communicate()返回将代理部署到Apigee" 

这是我的 deploy.py 文件如下:

 <代码>!/usr/bin/env python导入base64导入getopt导入httplib导入json汇入导入操作系统导入系统导入StringIO导入urlparse导入xml.dom.minidom导入压缩文件def httpCall(动词,uri,标题,正文):如果httpScheme =='https':conn = httplib.HTTPSConnection(httpHost)别的:conn = httplib.HTTPConnection(httpHost)如果标题==无:hdrs = dict()别的:hdrs =标头hdrs ['Authorization'] ='基本%s'%base64.b64encode(UserPW)conn.request(动词,uri,body,hdrs)返回conn.getresponse()def getElementText(n):c = n.firstChildstr = StringIO.StringIO()而c!= None:如果c.nodeType == xml.dom.Node.TEXT_NODE:str.write(c.data)c = c.nextSibling返回str.getvalue().strip()def getElementVal(n,名称):c = n.firstChild而c!= None:如果c.nodeName ==名称:返回getElementText(c)c = c.nextSibling不返回#如果文件路径的任何组件包含目录名称,则返回TRUE#以."开头.例如".svn",而不是."或者 '..'def pathContainsDot(p):c = re.compile('\.\ w +')对于p.split('/')中的pc:如果c.match(pc)!=无:返回True返回Falsedef getDeployments():#打印部署信息hdrs = {'Accept':'application/xml'}resp = httpCall('GET','/v1/organizations/%s/apis/%s/deployments'\% (机构名称),hdrs,无)如果resp.status!= 200:不返回ret = list()部署= xml.dom.minidom.parse(resp)环境= deployments.getElementsByTagName('Environment')用于环境中的环境:envName = env.getAttribute('名称')版本= env.getElementsByTagName('Revision')对于修订版:revNum = int(rev.getAttribute('name'))错误=无状态= getElementVal(rev,'State')basePaths = rev.getElementsByTagName('BasePath')如果len(basePaths)>0:basePath = getElementText(basePaths [0])别的:basePath ='未知'#svrs = rev.getElementsByTagName('服务器')状态= {'环境':envName,修订":revNum,'basePath':basePath,'状态':状态}如果错误!=无:状态['错误'] =错误ret.append(状态)返回retdef printDeployments(dep):对于d中的d:打印'Environment:%s'%d ['environment']打印'修订:%i BasePath =%s'%(d ['revision'],d ['basePath'])打印'State:%s'%d ['state']如果d中的错误":打印'错误:%s'%d ['错误']ApigeeHost ='https://api.enterprise.apigee.com'UserPW =无目录=无组织=无环境=无名称=无BasePath ='/'应该部署=真选项='h:u:d:e:n:p:o:i:z:'opts = getopt.getopt(sys.argv [1:],选项)[0]对于选择中的o:如果o [0] =='-n':名称= o [1]elif o [0] =='-o':组织= o [1]elif o [0] =='-h':ApigeeHost = o [1]elif o [0] =='-d':目录= o [1]elif o [0] =='-e':环境= o [1]elif o [0] =='-p':BasePath = o [1]elif o [0] =='-u':UserPW = o [1]elif o [0] =='-i':ShouldDeploy = Falseelif o [0] =='-z':ZipFile = o [1]如果UserPW == None或\(目录==无,ZipFile ==无)或\环境==无或\名称==无或\组织==无:打印"用法:部署-n [名称](-d [目录名称] | -z [zip文件])-e [环境] -u [用户名:密码] -o [组织][-p [基本路径] -h [apigee API网址] -i]基本路径默认为"/"Apigee URL默认为"https://api.enterprise.apigee.com"-i表示仅导入而不实际部署"sys.exit(1)url = urlparse.urlparse(ApigeeHost)httpScheme =网址[0]httpHost = url [1]身体=无如果Directory!= None:#在内存中构造捆绑包的ZIP副本tf = StringIO.StringIO()zipout = zipfile.ZipFile(tf,'w')dirList = os.walk(目录)对于dirList中的dirEntry:如果不是pathContainsDot(dirEntry [0]):对于dirEntry [2]中的fileEntry:如果不是,则fileEntry.endswith('〜'):fn = os.path.join(dirEntry [0],fileEntry)en = os.path.join(os.path.relpath(dirEntry [0],Directory),fileEntry)打印'将%s写入%s'%(fn,en)zipout.write(fn,en)zipout.close()身体= tf.getvalue()elif ZipFile!=无:f =打开(ZipFile,'r')身体= f.read()f.close()#将捆绑包上传到APIhdrs = {'Content-Type':'application/octet-stream','Accept':'application/json'}uri ='/v1/organizations/%s/apis?action = import& name =%s'%\(机构名称)resp = httpCall('POST',uri,hdrs,body)如果resp.status!= 200和resp.status!= 201:打印导入到%s的状态为%i的失败:\ n%s"%\(uri,resp.status,resp.read())sys.exit(2)部署= json.load(resp)版本= int(部署['版本'])打印导入的新代理版本%i"%修订版如果应该部署:#取消重复deps = getDeployments()对于d中的d:如果d ['environment'] ==环境和\d ['basePath'] == BasePath和\d ['revision']!=版本:打印在相同的环境和路径中取消部署版本%i:"%\d ['revision']conn = httplib.HTTPSConnection(httpHost)resp = httpCall('POST',('/v1/organizations/%s/apis/%s/deployments'+'?action = undeploy'+'& env =%s'+'& revision =%i')%\(组织,名称,环境,d ['修订版]]),无,无)如果resp.status!= 200和resp.status!= 204:在取消部署时显示错误%i:\ n%s"%\(resp.status,resp.read())#部署捆绑hdrs = {'Accept':'application/json'}resp = httpCall('POST',('/v1/organizations/%s/apis/%s/deployments'+'?action = deploy'+'& env =%s'+'& revision =%i'+'& basepath =%s')%\(组织,名称,环境,修订版,BasePath),hdrs,无)如果resp.status!= 200和resp.status!= 201:打印'部署失败,状态为%i:\ n%s'%(resp.status,resp.read())sys.exit(2)deps = getDeployments()printDeployments(deps) 

当我在计算机上本地运行但在GCP上不运行时,此方法有效.不知道我是否通过此功能连接到Google Apigee.奇怪的是,GCP上的日志未显示任何错误,但是我没有将代理导出到Apigee.

感谢帮助!

已更新:尝试使用 subprocess.check_output(),如此处所鼓励:

  def测试(请求):输出=无尝试:输出= subprocess.check_output(["./my-proxy/tools/deploy.py",'-n','myProxy','-u','myUserName:myPassword','-o','myOrgName','-e','test','-d','./my-proxy','-p','/'])除了:打印(输出)返回输出 

并且仍然无法在GCP上运行.就像我之前提到的,它在我的机器上就像一个魅力(以上两种解决方案一样),但在GCP中却没有.从下图可以看到,从GCP执行 deploy.py 后,我得到200,但我的文件没有转到Apigee:

GCP日志也未显示任何错误:

解决方案

这是可能的!

在Cloud Function运行时中未安装或链接 python 可执行文件,但安装了 python3 .因此,有几种方法可以解决此问题:

  1. python3 指定为要运行的程序:"python3 ./my-proxy/tools/deploy.py ..." ;

  2. deploy.py 脚本中添加#!运算符:#!/usr/bin/env python3 ;

  3. 将python解释器指定为 Popen .您可以使用 sys.executable 引用当前使用的可执行文件:

     进程= subprocess.Popen([sys.executable,"./deploy.py","-n","myProxy","-u","myUserName:myPassword","-o","myOrgName","-e",测试","-d","./my-proxy","-p","/",],stdout = subprocess.PIPE,stderr = subprocess.PIPE,Universal_newlines =是,) 

您没有看到错误,因为它是在子进程中生成,打印到其stderr,然后由您的程序使用 process.communicate() process.check_output()捕获的...),但未打印出来.要查看遇到的错误,可以打印出stdout和stderr的内容:

  out,err = process.communicate()log.debug("returncode =%s",process.returncode)log.debug("stdout =%s",输出)log.debug("stderr =%s",err) 

Here's my deploy.py file looks like:

!/usr/bin/env python

import base64
import getopt
import httplib
import json
import re
import os
import sys
import StringIO
import urlparse
import xml.dom.minidom
import zipfile


def httpCall(verb, uri, headers, body):
    if httpScheme == 'https':
        conn = httplib.HTTPSConnection(httpHost)
    else:
        conn = httplib.HTTPConnection(httpHost)

    if headers == None:
        hdrs = dict()
    else:
        hdrs = headers

    hdrs['Authorization'] = 'Basic %s' % base64.b64encode(UserPW)
    conn.request(verb, uri, body, hdrs)

    return conn.getresponse()


def getElementText(n):
    c = n.firstChild
    str = StringIO.StringIO()

    while c != None:
        if c.nodeType == xml.dom.Node.TEXT_NODE:
            str.write(c.data)
        c = c.nextSibling

    return str.getvalue().strip()


def getElementVal(n, name):
    c = n.firstChild

    while c != None:
        if c.nodeName == name:
            return getElementText(c)
        c = c.nextSibling

    return None


# Return TRUE if any component of the file path contains a directory name that
# starts with a "." like '.svn', but not '.' or '..'
def pathContainsDot(p):
    c = re.compile('\.\w+')

    for pc in p.split('/'):
        if c.match(pc) != None:
            return True

    return False


def getDeployments():
    # Print info on deployments
    hdrs = {'Accept': 'application/xml'}
    resp = httpCall('GET',
            '/v1/organizations/%s/apis/%s/deployments' \
                % (Organization, Name),
            hdrs, None)

    if resp.status != 200:
        return None

    ret = list()
    deployments = xml.dom.minidom.parse(resp)
    environments = deployments.getElementsByTagName('Environment')

    for env in environments:
        envName = env.getAttribute('name')
        revisions = env.getElementsByTagName('Revision')
        for rev in revisions:
            revNum = int(rev.getAttribute('name'))
            error = None
            state = getElementVal(rev, 'State')
            basePaths = rev.getElementsByTagName('BasePath')

            if len(basePaths) > 0:
                basePath = getElementText(basePaths[0])
            else:
                basePath = 'unknown'

            # svrs = rev.getElementsByTagName('Server')
            status = {'environment': envName,
                    'revision': revNum,
                    'basePath': basePath,
                    'state': state}

            if error != None:
                status['error'] = error

            ret.append(status)

    return ret


def printDeployments(dep):
    for d in dep:
        print 'Environment: %s' % d['environment']
        print '  Revision: %i BasePath = %s' % (d['revision'], d['basePath'])
        print '  State: %s' % d['state']
        if 'error' in d:
            print '  Error: %s' % d['error']

ApigeeHost = 'https://api.enterprise.apigee.com'
UserPW = None
Directory = None
Organization = None
Environment = None
Name = None
BasePath = '/'
ShouldDeploy = True

Options = 'h:u:d:e:n:p:o:i:z:'

opts = getopt.getopt(sys.argv[1:], Options)[0]

for o in opts:
    if o[0] == '-n':
        Name = o[1]
    elif o[0] == '-o':
        Organization = o[1]
    elif o[0] == '-h':
        ApigeeHost = o[1]
    elif o[0] == '-d':
        Directory = o[1]
    elif o[0] == '-e':
        Environment = o[1]
    elif o[0] == '-p':
        BasePath = o[1]
    elif o[0] == '-u':
        UserPW = o[1]
    elif o[0] == '-i':
        ShouldDeploy = False
    elif o[0] == '-z':
        ZipFile = o[1]

if UserPW == None or \
        (Directory == None and ZipFile == None) or \
        Environment == None or \
        Name == None or \
        Organization == None:
    print """Usage: deploy -n [name] (-d [directory name] | -z [zipfile])
              -e [environment] -u [username:password] -o [organization]
              [-p [base path] -h [apigee API url] -i]
    base path defaults to "/"
    Apigee URL defaults to "https://api.enterprise.apigee.com"
    -i denotes to import only and not actually deploy
    """
    sys.exit(1)

url = urlparse.urlparse(ApigeeHost)
httpScheme = url[0]
httpHost = url[1]

body = None

if Directory != None:
    # Construct a ZIPped copy of the bundle in memory
    tf = StringIO.StringIO()
    zipout = zipfile.ZipFile(tf, 'w')

    dirList = os.walk(Directory)
    for dirEntry in dirList:
        if not pathContainsDot(dirEntry[0]):
            for fileEntry in dirEntry[2]:
                if not fileEntry.endswith('~'):
                    fn = os.path.join(dirEntry[0], fileEntry)
                    en = os.path.join(
                            os.path.relpath(dirEntry[0], Directory),
                            fileEntry)
                    print 'Writing %s to %s' % (fn, en)
                    zipout.write(fn, en)

    zipout.close()
    body = tf.getvalue()
elif ZipFile != None:
    f = open(ZipFile, 'r')
    body = f.read()
    f.close()

# Upload the bundle to the API
hdrs = {'Content-Type': 'application/octet-stream',
        'Accept': 'application/json'}
uri = '/v1/organizations/%s/apis?action=import&name=%s' % \
            (Organization, Name)
resp = httpCall('POST', uri, hdrs, body)

if resp.status != 200 and resp.status != 201:
    print 'Import failed to %s with status %i:\n%s' % \
            (uri, resp.status, resp.read())
    sys.exit(2)

deployment = json.load(resp)
revision = int(deployment['revision'])

print 'Imported new proxy version %i' % revision

if ShouldDeploy:
    # Undeploy duplicates
    deps = getDeployments()
    for d in deps:
        if d['environment'] == Environment and \
            d['basePath'] == BasePath and \
            d['revision'] != revision:
            print 'Undeploying revision %i in same environment and path:' % \
                    d['revision']
            conn = httplib.HTTPSConnection(httpHost)
            resp = httpCall('POST',
                    ('/v1/organizations/%s/apis/%s/deployments' +
                            '?action=undeploy' +
                            '&env=%s' +
                            '&revision=%i') % \
                        (Organization, Name, Environment, d['revision']),
                 None, None)
            if resp.status != 200 and resp.status != 204:
                print 'Error %i on undeployment:\n%s' % \
                        (resp.status, resp.read())

    # Deploy the bundle
    hdrs = {'Accept': 'application/json'}
    resp = httpCall('POST',
        ('/v1/organizations/%s/apis/%s/deployments' +
                '?action=deploy' +
                '&env=%s' +
                '&revision=%i' +
                '&basepath=%s') % \
            (Organization, Name, Environment, revision, BasePath),
        hdrs, None)

    if resp.status != 200 and resp.status != 201:
        print 'Deploy failed with status %i:\n%s' % (resp.status, resp.read())
        sys.exit(2)

deps = getDeployments()
printDeployments(deps)

This works when I run locally on my machine, but not on GCP. Don't know if have anything to do with the fact I'm connecting to Google Apigee with this function. It's weird that the logs on GCP doesn't show any error, however I don't have my proxy exported to Apigee.

Thanks the help!

UPDATED: tried using subprocess.check_output() as encouraged by some here:

def test(request):
    output = None
    try:
        output = subprocess.check_output([
        "./my-proxy/tools/deploy.py", 
        '-n',  'myProxy',
        '-u', 'myUserName:myPassword',
        '-o', 'myOrgName',
        '-e', 'test',
        '-d', './my-proxy',
        '-p', '/'])

    except:
        print(output)    

    return output 

And still not working on GCP. Like I mentioned before, it works like a charm (both solutions above) in my machine, but in GCP doesn't. As you can see from the image below, I get a 200 after executing deploy.py from GCP but my file doesn't go to Apigee:

GCP logs doesn't show any error as well:

解决方案

This is possible!

The python executable is not installed or linked in the Cloud Function runtime, but python3 is. Hence, there are a few ways to solve this:

  1. specify python3 as the program to run: "python3 ./my-proxy/tools/deploy.py ...";

  2. add the #! operator in the deploy.py script: #!/usr/bin/env python3;

  3. specify the python interpreter to Popen. You can use sys.executable to refer to the currently used executable:

     process = subprocess.Popen(
         [
             sys.executable,
             "./deploy.py",
             "-n",
             "myProxy",
             "-u",
             "myUserName:myPassword",
             "-o",
             "myOrgName",
             "-e",
             "test",
             "-d",
             "./my-proxy",
             "-p",
             "/",
         ],
         stdout=subprocess.PIPE,
         stderr=subprocess.PIPE,
         universal_newlines=True,
     )
    

You did not see an error because it was generated in a subprocess, printed to its stderr, and subsequently captured by your program with process.communicate() or process.check_output(...), but not printed. To see the error you are experiencing, you can print out the content of stdout and stderr:

    out, err = process.communicate()
    log.debug("returncode = %s", process.returncode)
    log.debug("stdout = %s", out)
    log.debug("stderr = %s", err)

Check out our source code we used to analyze, reproduce and solve your question on github

这篇关于如何在Python中的Google Cloud Function中运行子流程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆