insert operation into a bigquery table


Problem Description



I want to insert all rows of a SQL Server table into a BigQuery table with the same schema. Streaming the inserts row by row is very slow: inserting 1000 rows with the code below took about 10 minutes. In this code I loop over the first 10 files in a certain folder and insert the content of each file into a single SQL Server table. Once I have looped over the desired files, I loop over the SQL Server table (which contains all the rows from all the files) and insert its content row by row into a BigQuery table. Finally I delete those files and empty the SQL Server table.

This operation is very slow.

Does anyone have a better solution for inserting the content of a SQL Server table into a BigQuery table automatically (via code)? For example, inserting all the content of the SQL Server table into the BigQuery table in one block (rather than row by row).

Thanks

This is my code (in ColdFusion):

<cfsilent>
    <cfinclude template="app_locals.cfm" />
    <cfinclude template="act_BigqueryApiAccess.cfm" />
</cfsilent>

<!--- First BQ step: insert the processed parcels --->
<!--- log the start of the first BQ step (TShipping) --->
<cfset BigqueryTShipping_StartDate=now()>
<cfset QueryName = "InsertBigqueryLogTShippingStartDate">
<cfinclude template="qry_item.cfm"> 

<cfdirectory action="list" directory="#FileRoot#\_data\_Bigquery\TShipping" listinfo="all" type="file" name="FList" sort="datelastmodified">
<cfset FileList = Valuelist(FList.name)>
<cfoutput><h3>FileList: #FileList#</h3></cfoutput>

<cfif len(trim(FileList))>
    <!--- process the last 10 files (the MaxNbFile least recent) --->
    <cfset FileLoop = 1>
    <cfloop list="#FileList#" index="FileName"> 
        <cfset PathFile="#FileRoot#\_data\_Bigquery\TShipping\#FileName#">
        <cfset QueryName = "InsertTShipping">
        <cfinclude template="qry_item.cfm"> 
        <cfset FileLoop = FileLoop+1>
        <cfif FileLoop GT Attributes.MaxNbFile>
            <cfbreak />
        </cfif>
    </cfloop>
</cfif>

<!--- instantiate an object of type (class) TableRow --->
<cfobject action="create" type="java" class="com.google.api.services.bigquery.model.TableRow" name="row">
<!--- <cfdump var="#row#"> --->

<cfset QueryName = "GetParcels">
<cfinclude template="qry_item.cfm"> 
<cfloop query="GetParcels"> 
    <cfset row.set("Tracking_Date",mid(Tracking_Date,6,19))>
    <cfset row.set("TShipping_ID", TShipping_ID)>
    <cfset row.set("TShipping_Tracking", TShipping_Tracking)>
    <cfset row.set("Shipper_ID", Shipper_ID)>

    <cfset rows.setInsertId(sys.currentTimeMillis())>
    <cfset rows.setJson(row)>

    <cfset rowList.add(rows)>

    <cfset content=rqst.setRows(rowList)>

    <cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID, content).execute()>  
</cfloop>

<!--- empty the TShipping_BQ table --->
<cfset QueryName = "DeleteOldTShipping_BQParcels">
<cfinclude template="qry_item.cfm">

<!--- remove the processed files --->
<cfif len(trim(FileList))>
    <cfset TShippingFileNb=len(trim(FileList))>
    <cfset FileLoop = 1>
    <cfloop list="#FileList#" index="FileName"> 
        <cfset PathFile="#FileRoot#\_data\_Bigquery\TShipping\#FileName#">
        <cffile action="move" source="#PathFile#" destination="#FileRoot#\_data\_Bigquery\TShippingArchive">
        <!--- <cffile action="delete" file="#PathFile#"> --->
        <cfset FileLoop = FileLoop+1>
        <cfif FileLoop GT Attributes.MaxNbFile>
            <cfbreak />
        </cfif>
    </cfloop>
<cfelse>
    <cfset TShippingFileNb=0>
</cfif>


<!--- log the number of TShipping files processed --->
<cfset QueryName = "InsertBigqueryLogTShippingNb">
<cfinclude template="qry_item.cfm"> 
<!--- log the end of the first BQ step --->
<cfset BigqueryTShipping_EndDate=now()>
<cfset QueryName = "InsertBigqueryLogTShippingEndDate">
<cfinclude template="qry_item.cfm">

Solution

You should be able to move your insertAll() outside the loop. There may be a point where you're trying to insert too many records at once and you need to batch them out; that is, once you've hit 1000 records, insert them and reset your rowList array.

<cfloop query="GetParcels"> 
  <cfset row = something()><!--- you need to re-create row on each iteration or else you're just updating the same reference every time --->
  <cfset row.set("Tracking_Date",mid(Tracking_Date,6,19))>
  <cfset row.set("TShipping_ID", TShipping_ID)>
  <cfset row.set("TShipping_Tracking", TShipping_Tracking)>
  <cfset row.set("Shipper_ID", Shipper_ID)>
  <cfset rows.setInsertId(sys.currentTimeMillis())>
  <cfset rows.setJson(row)>
  <cfset rowList.add(rows)>
</cfloop>
<cfset content=rqst.setRows(rowList)>
<cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID,content).execute()>  
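
For reference, both snippets assume that row, rows, rowList, rqst, and sys already exist. A minimal sketch of that setup, assuming the google-api-services-bigquery Java client (only TableRow appears in the question's code; TableDataInsertAllRequest and its nested Rows class are inferred from the same com.google.api.services.bigquery.model package):

<!--- assumed setup: class names other than TableRow are inferred --->
<cfset sys = createObject("java", "java.lang.System")>
<cfset rqst = createObject("java", "com.google.api.services.bigquery.model.TableDataInsertAllRequest").init()>
<cfset rowList = createObject("java", "java.util.ArrayList").init()>
<!--- and inside the loop, re-create both per-row objects on every iteration --->
<cfset row = createObject("java", "com.google.api.services.bigquery.model.TableRow").init()>
<cfset rows = createObject("java", "com.google.api.services.bigquery.model.TableDataInsertAllRequest$Rows").init()>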

An example of what I mean by batching:

<cfloop query="GetParcels"> 
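  <!--- as in the first example, row and rows should be re-created here on each iteration --->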
  <cfset row.set("Tracking_Date",mid(Tracking_Date,6,19))>
  <cfset row.set("TShipping_ID", TShipping_ID)>
  <cfset row.set("TShipping_Tracking", TShipping_Tracking)>
  <cfset row.set("Shipper_ID", Shipper_ID)>
  <cfset rows.setInsertId(sys.currentTimeMillis())>
  <cfset rows.setJson(row)>
  <cfset rowList.add(rows)>
  <cfif arrayLen(rowList) EQ 1000>
    <cfset content=rqst.setRows(rowList)>
    <cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID,content).execute()>  
    <cfset rowList = []>
  </cfif>
</cfloop>
<!--- insert any remaining rows; the check avoids an empty insert when the row count is an exact multiple of 1000 --->
<cfif ! arrayIsEmpty(rowList)>
  <cfset content=rqst.setRows(rowList)>
  <cfset response = bq.tabledata().insertAll(Project_ID,Dataset_ID,Table_ID,content).execute()>  
</cfif>
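
One caveat with the insertId values: BigQuery uses insertId for best-effort de-duplication, so two rows that get the same sys.currentTimeMillis() value (anything built within the same millisecond, which is likely inside a tight loop) can be treated as duplicates and dropped. A unique value per row, for example CFML's createUUID(), avoids this:

<cfset rows.setInsertId(createUUID())>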
