Google.api_core.exceptions.BadRequest: 400 Invalid credential

Problem description

I am trying to run a Docker image using Airflow, but I am unable to authenticate to GCP. I also tried setting the credentials through os.environ, but that didn't work either.

What would be the best way to include the service key so that the script can run?
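
For reference, the two usual ways to point the Google client libraries at a service-account key are sketched below. This is only a rough sketch reusing the cert/key.json path from the script, not a tested fix; note that the environment variable has to be set before any client object is constructed, and the key file has to actually exist inside the container.

import os

from google.cloud import bigquery
from google.oauth2 import service_account

# Option 1: let Application Default Credentials pick up the key file.
# This assignment must happen before any client is created.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "cert/key.json"
client = bigquery.Client()

# Option 2: load the key explicitly and hand it to each client.
credentials = service_account.Credentials.from_service_account_file(
    "cert/key.json",
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)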

Below is the script that the image executes. I would appreciate any pointers on what needs to be adjusted in the code so that I can get the process to run.

import gspread
from oauth2client.service_account import ServiceAccountCredentials
import base64
import io
import avro.io
from avro.datafile import DataFileWriter
import os
import gcloud
from gcloud import storage
from google.cloud import bigquery
from datetime import datetime, timedelta

#Bigquery Credentials and settings
scope = ["https://spreadsheets.google.com/feeds",
         'https://www.googleapis.com/auth/spreadsheets',
         "https://www.googleapis.com/auth/drive.file",
         "https://www.googleapis.com/auth/drive",
         "https://www.googleapis.com/auth/urlshortener",
         "https://www.googleapis.com/auth/sqlservice.admin",
         "https://www.googleapis.com/auth/cloud-platform",
         "https://www.googleapis.com/auth/compute",
         "https://www.googleapis.com/auth/devstorage.full_control",
         "https://www.googleapis.com/auth/logging.admin",
         "https://www.googleapis.com/auth/logging.write",
         "https://www.googleapis.com/auth/monitoring",
         "https://www.googleapis.com/auth/servicecontrol",
         "https://www.googleapis.com/auth/service.management.readonly",
         "https://www.googleapis.com/auth/bigquery",
         "https://www.googleapis.com/auth/datastore",
         "https://www.googleapis.com/auth/taskqueue",
         "https://www.googleapis.com/auth/userinfo.email",
         "https://www.googleapis.com/auth/trace.append",
         "https://www.googleapis.com/auth/plus.login",
         "https://www.googleapis.com/auth/plus.me",
         "https://www.googleapis.com/auth/userinfo.email",
         "https://www.googleapis.com/auth/userinfo.profile"]


creds = ServiceAccountCredentials.from_json_keyfile_name('cert/key.json', scope)
client = gspread.authorize(creds)

folder = str((datetime.now() - timedelta(days=15)).strftime('%Y-%m-%d'))
data_folder = str((datetime.now() - timedelta(days=15)).strftime('%Y%m%d'))
bucket_name = 'gs://bucket/*.csv'
dataset = 'dataset'
tabela = 'table'

new_file = 'cert/register_' + data_folder + '.avro'
file_schema = 'cert/schema.avsc'
new_filename = 'register_' + data_folder + '.avro'


# Check whether the file exists before trying to delete it
if os.path.exists(new_file):
    os.remove(new_file)
    print("Delete file", new_file)
else:
    print("Can not delete the file as it doesn't exists")

bq1 = bigquery.Client()
#Delete IDs
query1 = """DELETE FROM dataset.ids WHERE ID IS NOT NULL"""
query_job1 = bq1.query(query1)

def insert_bigquery(target_uri, dataset_id, table_id):
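    # Note: bigquery.Client() without arguments falls back to Application Default
    # Credentials; the key file loaded above is only used by gspread.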
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('id','STRING',mode='REQUIRED')
    ]
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.field_delimiter = ";"
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table(table_id),
        job_config=job_config
        )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')

insert_bigquery(bucket_name, dataset, tabela)

def get_data_from_bigquery():
    """query bigquery to get data to import to PSQL"""
    bq = bigquery.Client()
    #Busca IDs
    query = """SELECT id FROM dataset.ids"""
    query_job = bq.query(query)
    data = query_job.result()
    rows = list(data)
    return rows

a = get_data_from_bigquery()
length = len(a)
line_count = 0
schema = avro.schema.Parse(open(file_schema, "rb").read())  # the writer needs the schema (Apache Avro 1.8.2 API)
writer = DataFileWriter(open(new_file, "wb"), avro.io.DatumWriter(), schema)

for row in range(length):
    bytes = base64.b64decode(str(a[row][0]))
    bytes = bytes[5:]
    buf = io.BytesIO(bytes)
    decoder = avro.io.BinaryDecoder(buf)
    rec_reader = avro.io.DatumReader(avro.schema.Parse(open(file_schema).read()))
    out=rec_reader.read(decoder)
    writer.append(out)
writer.close()

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob("insert/" + destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print('File {} uploaded to {}'.format(
        source_file_name,
        destination_blob_name
    ))

upload_blob('bucket', new_file, new_filename)

def insert_bigquery_avro(target_uri, dataset_id, table_id):
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.AVRO
    job_config.use_avro_logical_types = True
    time_partitioning = bigquery.table.TimePartitioning()
    job_config.time_partitioning = time_partitioning
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table(table_id),
        job_config=job_config
        )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')

dataset1 = 'dataset'
tabela1 = 'test'
bucket_name1 = 'gs://bucket/insert/' + new_filename

insert_bigquery_avro(bucket_name1, dataset1, tabela1)

if os.path.exists(new_file):
    os.remove(new_file)
    print("Delete file", new_file)
else:
    print("Can not delete the file as it doesn't exists")

Error message:

[2019-12-03 18:18:55,176] {taskinstance.py:859} INFO - Executing <Task(KubernetesPodOperator): script> on 2019-12-03T18:17:47.034888+00:00
[2019-12-03 18:18:55,176] {base_task_runner.py:133} INFO - Running: ['airflow', 'run', 'test_script', 'script', '2019-12-03T18:17:47.034888+00:00', '--job_id', '37988', '--pool', 'default_pool', '--raw', '-sd', '/airflow/dags/git/test_script.py', '--cfg_path', '/tmp/tmp0j3b1n2u']
[2019-12-03 18:18:55,649] {base_task_runner.py:115} INFO - Job 37988: Subtask script /usr/local/lib/python3.7/site-packages/airflow/config_templates/airflow_local_settings.py:65: DeprecationWarning: The elasticsearch_host option in [elasticsearch] has been renamed to host - the old setting has been used, but please update your config.
[2019-12-03 18:18:55,650] {base_task_runner.py:115} INFO - Job 37988: Subtask script   ELASTICSEARCH_HOST = conf.get('elasticsearch', 'HOST')
[2019-12-03 18:18:55,650] {base_task_runner.py:115} INFO - Job 37988: Subtask script /usr/local/lib/python3.7/site-packages/airflow/config_templates/airflow_local_settings.py:67: DeprecationWarning: The elasticsearch_log_id_template option in [elasticsearch] has been renamed to log_id_template - the old setting has been used, but please update your config.
[2019-12-03 18:18:55,650] {base_task_runner.py:115} INFO - Job 37988: Subtask script   ELASTICSEARCH_LOG_ID_TEMPLATE = conf.get('elasticsearch', 'LOG_ID_TEMPLATE')
[2019-12-03 18:18:55,650] {base_task_runner.py:115} INFO - Job 37988: Subtask script /usr/local/lib/python3.7/site-packages/airflow/config_templates/airflow_local_settings.py:69: DeprecationWarning: The elasticsearch_end_of_log_mark option in [elasticsearch] has been renamed to end_of_log_mark - the old setting has been used, but please update your config.
[2019-12-03 18:18:55,650] {base_task_runner.py:115} INFO - Job 37988: Subtask script   ELASTICSEARCH_END_OF_LOG_MARK = conf.get('elasticsearch', 'END_OF_LOG_MARK')
[2019-12-03 18:18:55,817] {base_task_runner.py:115} INFO - Job 37988: Subtask script /usr/local/lib/python3.7/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-12-03 18:18:55,817] {base_task_runner.py:115} INFO - Job 37988: Subtask script   """)
[2019-12-03 18:18:55,932] {base_task_runner.py:115} INFO - Job 37988: Subtask script [2019-12-03 18:18:55,932] {__init__.py:51} INFO - Using executor LocalExecutor
[2019-12-03 18:18:56,233] {base_task_runner.py:115} INFO - Job 37988: Subtask script [2019-12-03 18:18:56,233] {dagbag.py:90} INFO - Filling up the DagBag from /airflow/dags/git/test_script.py
[2019-12-03 18:18:56,979] {base_task_runner.py:115} INFO - Job 37988: Subtask script [2019-12-03 18:18:56,979] {cli.py:516} INFO - Running <TaskInstance: bexs_script.script 2019-12-03T18:17:47.034888+00:00 [running]> on host bexspaytransferpaytransfer-c5050aad788b4547974f8ec02ca25232
[2019-12-03 18:18:57,040] {logging_mixin.py:95} INFO - [2019-12-03 18:18:57,040] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:18:58,047] {logging_mixin.py:95} INFO - [2019-12-03 18:18:58,047] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:18:59,054] {logging_mixin.py:95} INFO - [2019-12-03 18:18:59,054] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:19:00,061] {logging_mixin.py:95} INFO - [2019-12-03 18:19:00,060] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:19:01,066] {logging_mixin.py:95} INFO - [2019-12-03 18:19:01,066] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:19:02,072] {logging_mixin.py:95} INFO - [2019-12-03 18:19:02,072] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:19:03,078] {logging_mixin.py:95} INFO - [2019-12-03 18:19:03,078] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:19:04,084] {logging_mixin.py:95} INFO - [2019-12-03 18:19:04,084] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:19:05,090] {logging_mixin.py:95} INFO - [2019-12-03 18:19:05,090] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:19:06,097] {logging_mixin.py:95} INFO - [2019-12-03 18:19:06,097] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:19:07,107] {logging_mixin.py:95} INFO - [2019-12-03 18:19:07,107] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:19:08,114] {logging_mixin.py:95} INFO - [2019-12-03 18:19:08,114] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:19:09,121] {logging_mixin.py:95} INFO - [2019-12-03 18:19:09,121] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Pending
[2019-12-03 18:19:10,128] {logging_mixin.py:95} INFO - [2019-12-03 18:19:10,128] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Running
[2019-12-03 18:19:12,738] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,735] {pod_launcher.py:105} INFO - b'Traceback (most recent call last):\n'
[2019-12-03 18:19:12,738] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,738] {pod_launcher.py:105} INFO - b'  File "script.py", line 101, in <module>\n'
[2019-12-03 18:19:12,738] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,738] {pod_launcher.py:105} INFO - b'    insert_bigquery(bucket_name, dataset, tabela)\n'
[2019-12-03 18:19:12,739] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,738] {pod_launcher.py:105} INFO - b'  File "script.py", line 98, in insert_bigquery\n'
[2019-12-03 18:19:12,739] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,739] {pod_launcher.py:105} INFO - b'    load_job.result()\n'
[2019-12-03 18:19:12,740] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,739] {pod_launcher.py:105} INFO - b'  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 697, in result\n'
[2019-12-03 18:19:12,740] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,740] {pod_launcher.py:105} INFO - b'    return super(_AsyncJob, self).result(timeout=timeout)\n'
[2019-12-03 18:19:12,740] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,740] {pod_launcher.py:105} INFO - b'  File "/usr/local/lib/python3.7/site-packages/google/api_core/future/polling.py", line 127, in result\n'
[2019-12-03 18:19:12,740] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,740] {pod_launcher.py:105} INFO - b'    raise self._exception\n'
[2019-12-03 18:19:12,740] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,740] {pod_launcher.py:105} INFO - b'google.api_core.exceptions.BadRequest: 400 Invalid credential\n'
[2019-12-03 18:19:12,741] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,741] {pod_launcher.py:105} INFO - b"Can not delete the file as it doesn't exists\n"
[2019-12-03 18:19:12,742] {logging_mixin.py:95} INFO - [2019-12-03 18:19:12,741] {pod_launcher.py:105} INFO - b'Starting job 71ea5742-37c6-4152-a171-8f558b83da76\n'
[2019-12-03 18:19:17,752] {logging_mixin.py:95} INFO - [2019-12-03 18:19:17,752] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Failed
[2019-12-03 18:19:17,753] {logging_mixin.py:95} INFO - [2019-12-03 18:19:17,752] {pod_launcher.py:208} INFO - Event with job id dag_test-f50779cc Failed
[2019-12-03 18:19:17,757] {logging_mixin.py:95} INFO - [2019-12-03 18:19:17,757] {pod_launcher.py:122} INFO - Event: dag_test-f50779cc had an event of type Failed

Recommended answer

Based on the recommendations above, I was able to get the image running successfully. The key change is that every Google Cloud client (bigquery.Client and storage.Client) now receives the service-account credentials explicitly instead of relying on Application Default Credentials, which evidently were not usable inside the container. Below is the script with the necessary corrections:

import gspread
from oauth2client.service_account import ServiceAccountCredentials
import base64
import io
import avro.io
from avro.datafile import DataFileWriter
from google.oauth2 import service_account
import os
import gcloud
from gcloud import storage
from google.cloud import bigquery
from datetime import datetime, timedelta

key_path = 'cert/key.json'

credentials = service_account.Credentials.from_service_account_file(
key_path,
scopes=["https://www.googleapis.com/auth/cloud-platform",
     "https://spreadsheets.google.com/feeds",
     'https://www.googleapis.com/auth/spreadsheets',
     "https://www.googleapis.com/auth/drive.file",
     "https://www.googleapis.com/auth/drive",
     "https://www.googleapis.com/auth/urlshortener",
     "https://www.googleapis.com/auth/sqlservice.admin",
     "https://www.googleapis.com/auth/cloud-platform",
     "https://www.googleapis.com/auth/compute",
     "https://www.googleapis.com/auth/devstorage.full_control",
     "https://www.googleapis.com/auth/logging.admin",
     "https://www.googleapis.com/auth/logging.write",
     "https://www.googleapis.com/auth/monitoring",
     "https://www.googleapis.com/auth/servicecontrol",
     "https://www.googleapis.com/auth/service.management.readonly",
     "https://www.googleapis.com/auth/bigquery",
     "https://www.googleapis.com/auth/datastore",
     "https://www.googleapis.com/auth/taskqueue",
     "https://www.googleapis.com/auth/userinfo.email",
     "https://www.googleapis.com/auth/trace.append",
     "https://www.googleapis.com/auth/plus.login",
     "https://www.googleapis.com/auth/plus.me",
     "https://www.googleapis.com/auth/userinfo.email",
     "https://www.googleapis.com/auth/userinfo.profile"],
)
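# Every client below is constructed with these credentials explicitly,
# rather than relying on Application Default Credentials.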

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

folder = str((datetime.now() - timedelta(days=15)).strftime('%Y-%m-%d'))
data_folder = str((datetime.now() - timedelta(days=15)).strftime('%Y%m%d'))
bucket_name = 'gs://bucket/*.csv'
dataset = 'dataset'
tabela = 'table'

new_file = 'cert/register_' + data_folder + '.avro'
file_schema = 'cert/schema.avsc'
new_filename = 'register_' + data_folder + '.avro'


# Check whether the file exists before trying to delete it
if os.path.exists(new_file):
    os.remove(new_file)
    print("Delete file", new_file)
else:
    print("Can not delete the file as it doesn't exists")

bq1 = bigquery.Client(credentials=credentials, project=credentials.project_id)
#Delete IDs
query1 = """DELETE FROM dataset.ids WHERE ID IS NOT NULL"""
query_job1 = bq1.query(query1)

def insert_bigquery(target_uri, dataset_id, table_id):
    bigquery_client = bigquery.Client(credentials=credentials, project=credentials.project_id)
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('id','STRING',mode='REQUIRED')
    ]
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.field_delimiter = ";"
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table(table_id),
        job_config=job_config
        )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')

insert_bigquery(bucket_name, dataset, tabela)

def get_data_from_bigquery():
    """query bigquery to get data to import to PSQL"""
    bq = bigquery.Client(credentials=credentials, project=credentials.project_id)
    #Busca IDs
    query = """SELECT id FROM dataset.ids"""
    query_job = bq.query(query)
    data = query_job.result()
    rows = list(data)
    return rows

a = get_data_from_bigquery()
length = len(a)
line_count = 0
schema = avro.schema.Parse(open(file_schema, "rb").read())  # the writer needs the schema (Apache Avro 1.8.2 API)
writer = DataFileWriter(open(new_file, "wb"), avro.io.DatumWriter(), schema)

for row in range(length):
    bytes = base64.b64decode(str(a[row][0]))
    bytes = bytes[5:]
    buf = io.BytesIO(bytes)
    decoder = avro.io.BinaryDecoder(buf)
    rec_reader = avro.io.DatumReader(avro.schema.Parse(open(file_schema).read()))
    out=rec_reader.read(decoder)
    writer.append(out)
writer.close()

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client.from_service_account_json('cert/key.json')
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob("insert/" + destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print('File {} uploaded to {}'.format(
        source_file_name,
        destination_blob_name
    ))

upload_blob('bucket', new_file, new_filename)

def insert_bigquery_avro(target_uri, dataset_id, table_id):
    bigquery_client = bigquery.Client(credentials=credentials, project=credentials.project_id)
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.AVRO
    job_config.use_avro_logical_types = True
    time_partitioning = bigquery.table.TimePartitioning()
    job_config.time_partitioning = time_partitioning
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table(table_id),
        job_config=job_config
        )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')

dataset1 = 'dataset'
tabela1 = 'test'
bucket_name1 = 'gs://bucket/insert/' + new_filename

insert_bigquery_avro(bucket_name1, dataset1, tabela1)

if os.path.exists(new_file):
    os.remove(new_file)
    print("Delete file", new_file)
else:
    print("Can not delete the file as it doesn't exists")
