Cloud Function running multiple times instead of once

Problem description

I upload 10 files every day at 11 p.m. with a cron job to a bucket on GCS. Each file is a .csv with a size of 2 to 30 KB. The file name is always YYYY-MM-DD-ID.csv

A Cloud Function is called every time I upload a file into that bucket, to send those .csv files to BigQuery. The trigger type is Cloud Storage on finalize/create events.

My issue is the following: on BigQuery, each value in each row/column is multiplied by some factor. Sometimes it's 1 (so the value is the same), often 2, and sometimes 3. I attached one example below showing the difference between BigQuery (BQ) and Google Cloud Storage (GCS).

It seems that the Cloud Function is called multiple times. It's not the code; rather, there are duplicate message deliveries to the Cloud Function during the trigger. When I go to the Logs tab for today, I can see the Cloud Function upload_to_bigquery has been called multiple times.

I have tried to fix it, but I made a mistake. I thought we could write temporary files to Cloud Functions, but we cannot. My solution was to write the name of each file I upload to BigQuery into a .txt file. Before uploading a new file to BigQuery, I read that .txt file and check whether the current file already exists in that list. If the filename is already present, skip it. Otherwise, append the filename to the list and do my stuff.

if file_to_upload not in text:
    text.append(file_to_upload)
    with open("all_uploaded_files.txt", "w") as text_file:
        for item in text:
            text_file.write(item + "\n")

    bucket = storage_client.bucket('sfr-test-data')
    blob = bucket.blob("all_uploaded_files.txt")
    blob.upload_from_filename("all_uploaded_files.txt")
    ## do my things here

else:
    print("file already uploaded")
    # skip to new file to upload

But even if I could do that, this solution is not viable. The temporary file would become so large after months or years that it would be a mess. Do you know what's the easiest way to fix this issue?

Cloud Function: upload_to_big_query - main.py

BUCKET = "xxx"
GOOGLE_PROJECT = "xxx"
HEADER_MAPPING = {
    "Source/Medium": "source_medium",
    "Campaign": "campaign",
    "Last Non-Direct Click Conversions": "last_non_direct_click_conversions",
    "Last Non-Direct Click Conversion Value": "last_non_direct_click_conversion_value",
    "Last Click Prio Conversions": "last_click_prio_conversions",
    "Last Click Prio Conversion Value": "last_click_prio_conversion_value",
    "Data-Driven Conversions": "dda_conversions",
    "Data-Driven Conversion Value": "dda_conversion_value",
    "% Change in Conversions from Last Non-Direct Click to Last Click Prio": "last_click_prio_vs_last_click",
    "% Change in Conversions from Last Non-Direct Click to Data-Driven": "dda_vs_last_click"
}

SPEND_HEADER_MAPPING = {
    "Source/Medium": "source_medium",
    "Campaign": "campaign",
    "Spend": "spend"
}

tables_schema = {
    "google-analytics": [
            bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE, mode='REQUIRED'),
            bigquery.SchemaField("week", bigquery.enums.SqlTypeNames.INT64, mode='REQUIRED'),
            bigquery.SchemaField("goal", bigquery.enums.SqlTypeNames.STRING, mode='REQUIRED'),
            bigquery.SchemaField("source", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("medium", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("campaign", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("last_non_direct_click_conversions", bigquery.enums.SqlTypeNames.INT64, mode='NULLABLE'),
            bigquery.SchemaField("last_non_direct_click_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_conversions", bigquery.enums.SqlTypeNames.INT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_conversions", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_conversion_value", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("last_click_prio_vs_last_click", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
            bigquery.SchemaField("dda_vs_last_click", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE')
    ],
    "google-analytics-spend": [
            bigquery.SchemaField("date", bigquery.enums.SqlTypeNames.DATE, mode='REQUIRED'),
            bigquery.SchemaField("week", bigquery.enums.SqlTypeNames.INT64, mode='REQUIRED'),
            bigquery.SchemaField("source", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("medium", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("campaign", bigquery.enums.SqlTypeNames.STRING, mode='NULLABLE'),
            bigquery.SchemaField("spend", bigquery.enums.SqlTypeNames.FLOAT64, mode='NULLABLE'),
    ]
}


def download_from_gcs(file):
    client = storage.Client()
    bucket = client.get_bucket(BUCKET)
    blob = bucket.get_blob(file['name'])
    file_name = os.path.basename(os.path.normpath(file['name']))
    blob.download_to_filename(f"/tmp/{file_name}")
    return file_name


def load_in_bigquery(file_object, dataset: str, table: str):
    client = bigquery.Client()
    table_id = f"{GOOGLE_PROJECT}.{dataset}.{table}"
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
        schema=tables_schema[table]
    )

    job = client.load_table_from_file(file_object, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.


def __order_columns(df: pd.DataFrame, spend=False) ->pd.DataFrame:
    # We want the source and medium columns at the third position for a
    # spend data frame and at the fourth position for other data frames,
    # because a spend data frame doesn't have a goal column.
    pos = 2 if spend else 3

    cols = df.columns.tolist()
    cols[pos:2] = cols[-2:]
    cols = cols[:-2]
    return df[cols]


def __common_transformation(df: pd.DataFrame, date: str, goal: str) -> pd.DataFrame:
    # for any kind of dataframe, we add date and week columns
    # based on the file name and we split Source/Medium from the csv
    # into two different columns

    week_of_the_year = datetime.strptime(date, '%Y-%m-%d').isocalendar()[1]
    df.insert(0, 'date', date)
    df.insert(1, 'week', week_of_the_year)
    mapping = SPEND_HEADER_MAPPING if goal == "spend" else HEADER_MAPPING
    print(df.columns.tolist())
    df = df.rename(columns=mapping)
    print(df.columns.tolist())
    print(df)
    df["source_medium"] = df["source_medium"].str.replace(' ', '')
    df[["source", "medium"]] = df["source_medium"].str.split('/', expand=True)
    df = df.drop(["source_medium"], axis=1)
    df["week"] = df["week"].astype(int, copy=False)
    return df


def __transform_spend(df: pd.DataFrame) -> pd.DataFrame:
    df["spend"] = df["spend"].astype(float, copy=False)
    df = __order_columns(df, spend=True)
    return df[df.columns[:6]]


def __transform_attribution(df: pd.DataFrame, goal: str) -> pd.DataFrame:
    df.insert(2, 'goal', goal)
    df["last_non_direct_click_conversions"] = df["last_non_direct_click_conversions"].astype(int, copy=False)
    df["last_click_prio_conversions"] = df["last_click_prio_conversions"].astype(int, copy=False)
    df["dda_conversions"] = df["dda_conversions"].astype(float, copy=False)
    return __order_columns(df)


def transform(df: pd.DataFrame, file_name) -> pd.DataFrame:
    goal, date, *_ = file_name.split('_')
    df = __common_transformation(df, date, goal)
    # we only add goal in attribution df (google-analytics table).
    return __transform_spend(df) if "spend" in file_name else __transform_attribution(df, goal)


def main(event, context):
    """Triggered by a change to a Cloud Storage bucket.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    file = event

    file_name = download_from_gcs(file)
    df = pd.read_csv(f"/tmp/{file_name}")

    transformed_df = transform(df, file_name)

    with open(f"/tmp/bq_{file_name}", "w") as file_object:
        file_object.write(transformed_df.to_csv(index=False))

    with open(f"/tmp/bq_{file_name}", "rb") as file_object:
        table = "google-analytics-spend" if "spend" in file_name else "google-analytics"
        load_in_bigquery(file_object, dataset='attribution', table=table)

Recommended answer

You might prefer to check this thread:

BigQuery displaying wrong results - Duplicating data from Cloud Function?

In short: the function should be idempotent, and the state of the process (whether the data/file has already been uploaded into BQ or not) should be kept outside of the cloud function. A text file is an option (in some GCS bucket, not inside the cloud function's memory, which can be erased as soon as the cloud function's execution is finished), but GCS has plenty of drawbacks in this particular case. Firestore, for example, is a much, much better choice.

You could consider the following algorithm:

When your cloud function starts, it should calculate some hash code based on the input data: the file/object metadata, the file/object data, or a combination of both. That hash should be unique for the given set of data.
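
For example, a minimal sketch of such a hash, assuming the standard fields of the Cloud Storage finalize event payload (bucket, name, md5Hash) and a helper name (compute_event_hash) chosen here only for illustration:

import hashlib

def compute_event_hash(event: dict) -> str:
    # Build a deterministic id for this upload from the object metadata
    # delivered in the finalize event payload.
    key = f"{event['bucket']}/{event['name']}/{event.get('md5Hash', '')}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()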

Your cloud function connects to a predefined Firestore collection (the project and the name can be provided in environment variables) and checks whether a document/record with the given hash as its id already exists.
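
A minimal sketch of that check with the google-cloud-firestore client; the collection name, its environment variable, and the helper name are assumptions for illustration:

import os

from google.cloud import firestore

FIRESTORE_COLLECTION = os.environ.get("FIRESTORE_COLLECTION", "bq_uploads")

def already_processed(event_hash: str) -> bool:
    # True if a document with this hash as its id already exists.
    db = firestore.Client()
    snapshot = db.collection(FIRESTORE_COLLECTION).document(event_hash).get()
    return snapshot.exists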

If that hash already exists in the Firestore collection (the document exists), the cloud function finishes its execution and does not do anything else (it can do logging, add some additional details to the Firestore document if required, etc.). It simply finishes its execution.

If that hash is not found (the document does not exist), the cloud function creates a new document with the given hash as its id. Some metadata details can be added to that document if needed.
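
A sketch of that step. DocumentReference.create() fails if the document already exists, so the existence check and the write can also be collapsed into one call; the collection name, the helper name, and the exception type caught here are my assumptions:

from google.api_core import exceptions
from google.cloud import firestore

def claim_upload(event_hash: str, event: dict) -> bool:
    # Try to record this upload; returns False if another invocation
    # already created the document with the same hash.
    db = firestore.Client()
    doc_ref = db.collection("bq_uploads").document(event_hash)
    try:
        doc_ref.create({
            "file_name": event["name"],
            "bucket": event["bucket"],
        })
        return True
    except exceptions.Conflict:  # AlreadyExists is a subclass of Conflict
        return False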

Once the document is created, the cloud function continues with the main 'workflow'.

Some things to keep in mind:

1/ IAM permissions: the service account under which the cloud function runs should have the relevant permissions on Firestore. Obviously, the Firestore API has to be enabled in the given project...

2/ What happens if the cloud function creates a new Firestore document but then fails to load the data into BigQuery (for any reason)? A simple check on the Firestore document's existence may not be enough, so a proper 'state' should be maintained in the Firestore document. For example, when a new document is created (in Firestore), there should be a field __state with a value of (for example) IN_PROGRESS assigned to it. Then, when the data is loaded, the cloud function comes back to Firestore and updates that field with the value DONE (for example). But even that is not enough. As you have a load job, there may be cases when the load actually succeeds but the cloud function fails (for any reason, including a timeout). You might want to think about what to do in that case as well. In any case, having some 'state' monitoring in Firestore may help you understand/investigate the situation with the loading process. Automating that monitoring might require developing a separate cloud function, but that is a separate story. A sketch of such state handling follows below.
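
For illustration, a sketch of such state handling wrapped around the existing load_in_bigquery() call, using the field name and state values described above (the collection name and the helper names are assumptions):

from google.cloud import firestore

def set_state(event_hash: str, state: str) -> None:
    # Record the processing state of this upload in its Firestore document.
    db = firestore.Client()
    db.collection("bq_uploads").document(event_hash).set(
        {"__state": state}, merge=True
    )

def load_with_state(event_hash: str, file_object, table: str) -> None:
    set_state(event_hash, "IN_PROGRESS")
    try:
        load_in_bigquery(file_object, dataset="attribution", table=table)
        set_state(event_hash, "DONE")
    except Exception:
        set_state(event_hash, "FAILURE")
        raise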

3/ As I mentioned in the thread linked above (see the reasoning in that answer), loading data from inside the cloud function's memory is a risky idea. I would suggest thinking about that part of your algorithm again.
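
One possible alternative, sketched under the assumption that you first write the transformed CSV to a (hypothetical) staging bucket, is to let BigQuery pull the file straight from GCS with load_table_from_uri instead of streaming it from the function; GOOGLE_PROJECT and tables_schema are reused from the question's code:

from google.cloud import bigquery, storage

def load_from_gcs(local_path: str, staging_bucket: str, blob_name: str,
                  dataset: str, table: str) -> None:
    # Upload the transformed CSV to a staging bucket...
    storage.Client().bucket(staging_bucket).blob(blob_name).upload_from_filename(local_path)

    # ...and let BigQuery read it directly from GCS.
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        schema=tables_schema[table],
    )
    job = client.load_table_from_uri(
        f"gs://{staging_bucket}/{blob_name}",
        f"{GOOGLE_PROJECT}.{dataset}.{table}",
        job_config=job_config,
    )
    job.result()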

4/ It might be a good idea to move the loaded file/object from the "input" bucket to some "processed" (or "archive") bucket in case of success, and to a "failure" bucket in case the load failed. That is done in the cloud function code. A failure outcome can also be reflected in the Firestore document (i.e. set the value of the __state field to FAILURE).
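
A sketch of such a move with the google-cloud-storage client (the destination bucket names and the helper name are hypothetical):

from google.cloud import storage

def move_blob(source_bucket_name: str, blob_name: str, destination_bucket_name: str) -> None:
    # Copy the object to the destination bucket, then delete the original,
    # so the "input" bucket only ever contains unprocessed files.
    client = storage.Client()
    source_bucket = client.bucket(source_bucket_name)
    destination_bucket = client.bucket(destination_bucket_name)
    blob = source_bucket.blob(blob_name)
    source_bucket.copy_blob(blob, destination_bucket, blob_name)
    blob.delete()

On success you would call, for example, move_blob(event["bucket"], event["name"], "processed-bucket"); on failure, point it at the "failure" bucket instead.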
