How Dataflow works with a BigQuery dataset

Question

I can't find how to get the tables of a specified dataset. I want to use Dataflow to migrate tables from a dataset located in the US to a dataset located in the EU. I would like to read all tables of the US dataset in parallel and write them to the EU dataset.

Beam 2.4 is using com.google.api.services.bigquery v2-rev374-1.22.0. This is also the library that you should use with Beam 2.4.

The code runs successfully with DirectRunner, but with DataflowRunner it fails and throws this error:

jun 29, 2018 1:52:48 PM com.google.api.client.http.HttpRequest execute
ADVERTENCIA: exception thrown while executing request
java.net.SocketException: Unexpected end of file from server
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:851)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:347)
        at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
        at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeUploadInitiation(MediaHttpUploader.java:519)
        at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:384)
        at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

jun 29, 2018 1:52:55 PM com.google.api.client.http.HttpRequest execute
ADVERTENCIA: exception thrown while executing request
java.net.SocketException: Unexpected end of file from server
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:851)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:347)
        at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
        at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeUploadInitiation(MediaHttpUploader.java:519)
        at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:384)
        at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

The code example:

package emp.customerjourney.etls;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.*;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.*;


import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;


import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class Migracion {

    public static interface MyOptions extends DataflowPipelineOptions {
        @Description("BigQuery table to write to, specified as "+ "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
        @Default.String("customerjourney:prueba.weather_stations")
        @Validation.Required
        String getOutput();
        void setOutput(String s);

        @Description("Table to read from, specified as "+ "<project_id>:<dataset_id>.<table_id>")
        @Default.String("customerjourney:118678548.gsod3")
        String getInput();
        void setInput(String value);


    }


    public static Bigquery createAuthorizedClient() throws IOException {
        // Create the credential
        HttpTransport transport = new NetHttpTransport();
        JsonFactory jsonFactory = new JacksonFactory();
        GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);

        if (credential.createScopedRequired()) {
            credential = credential.createScoped(BigqueryScopes.all());
        }

        return new Bigquery.Builder(transport, jsonFactory, credential)
                .setApplicationName("Bigquery Samples")
                .build();
    }


    public static final void main(String args[]) throws Exception {


        String projectId = "customerjourney";
        String datasetName = "dsorigen";
        // Create a new Bigquery client authorized via Application Default Credentials.
        Bigquery bigquery = createAuthorizedClient();
        Bigquery.Tables.List lista=bigquery.tables().list(projectId,datasetName);
        TableList rp= lista.execute();
        List<TableList.Tables> tblista =rp.getTables();
        String  entrada=tblista.get(3).getId();

        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

        options.setTempLocation("gs://pruebasg/teststaging");
        options.setRegion("europe-west1");
        options.setStagingLocation("gs://pruebasg/temp_dataflow_tasks");
        Pipeline p = Pipeline.create(options);

        // Build the table schema for the output table.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
        TableSchema schema = new TableSchema().setFields(fields);

        // p.apply(BigQueryIO.readTableRows().from(options.getInput()))
        p.apply(BigQueryIO.readTableRows().from(entrada)) // table ID obtained from the BigQuery v2 API
                .apply(new BigQueryTornadoes.CountTornadoes())
                .apply(BigQueryIO.writeTableRows()
                        .to(options.getOutput())
                        .withSchema(schema)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));



        p.run().waitUntilFinish();

        options.getExecutorService().shutdown();
        try {
            options.getExecutorService().awaitTermination(3, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            System.out.println("Thread was interrupted waiting for execution service to shutdown.");
        }
        System.out.println("termino");

    }

}

<dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.cloud.dataflow</groupId>
            <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
            <version>[2.4.0, 2.99)</version>
        </dependency>

        <!-- slf4j API frontend binding with JUL backend -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.14</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>1.7.14</version>
        </dependency>

        <dependency>
            <groupId>com.google.apis</groupId>
            <artifactId>google-api-services-bigquery</artifactId>
            <version>${bigquery.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava-jdk5</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>22.0</version>
        </dependency>

        <dependency>
            <groupId>com.google.oauth-client</groupId>
            <artifactId>google-oauth-client</artifactId>
            <version>1.21.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.http-client</groupId>
            <artifactId>google-http-client-jackson2</artifactId>
            <version>1.21.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.oauth-client</groupId>
            <artifactId>google-oauth-client-jetty</artifactId>
            <version>1.21.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.7</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.truth</groupId>
            <artifactId>truth</artifactId>
            <version>0.29</version>
            <scope>test</scope>
        </dependency>
</dependencies>

Does Dataflow need the GOOGLE_APPLICATION_CREDENTIALS JSON file to execute this code?

I can't find a method in BigQueryIO that lists the tables of an existing dataset. Reference post

Could you help me, please?

Solution

This is a connection error. I suggest first verifying that your credentials are set properly, as explained in other answers. If you still get this error, do the following:

First, this error indicates that the TCP socket has been closed before the server was able to send a response. Some possible reasons are:

  • Network connection was lost
  • The server decided to close the connection
  • Something in between the client and the server terminated the request

Consider using tools like netstat, traceroute, or ping to check for a problem along the route in the first and third cases.
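To complement those command-line tools, a TCP reachability check can also be run from Java on the machine that launches the job. The following is only a minimal sketch: the class name ConnectivityCheck, the 3-second timeout, and the two host names in main are assumptions chosen for illustration, not something taken from the question. A successful connect only shows that the route is open at that moment; it does not rule out a proxy or firewall cutting longer uploads.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP connection to host:port can be opened. */
final class ConnectivityCheck {

    private ConnectivityCheck() {}

    /** Returns true if a TCP connection could be established within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // connect() throws an IOException on refusal, timeout, or an unresolved host.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed endpoints; replace them with the hosts your job actually talks to.
        String[] hosts = {"bigquery.googleapis.com", "storage.googleapis.com"};
        for (String host : hosts) {
            System.out.println(host + ":443 reachable = " + canConnect(host, 443, 3000));
        }
    }
}
```

If the check fails only from certain networks (for example behind a corporate proxy), that points to the first or third cause rather than to the request itself.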

If the server is the one closing the connection, then your request may be incorrect, or the server is throttling you to avoid congestion. In those cases quick retries don't help (although if the server merely had a transient issue at that moment, retrying does help). You can try an exponential back-off and retry strategy; if that doesn't work, your request is probably incorrect.
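An exponential back-off and retry strategy could look roughly like the sketch below. It is not part of Beam or of the Google API client; the class BackoffRetry, its method names, and the delay values are hypothetical and chosen only for illustration. It retries on IOException (which includes java.net.SocketException), doubles the wait after each failure up to a cap, and adds random jitter so that parallel clients do not retry in lockstep.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper: retries an operation with exponential back-off and jitter. */
final class BackoffRetry {

    private BackoffRetry() {}

    /** Base delay before retry number 'attempt' (0-based): initial * 2^attempt, capped at max. */
    static long delayMillis(int attempt, long initialDelayMillis, long maxDelayMillis) {
        // Cap the exponent so the multiplication stays in range for realistic delays.
        long factor = 1L << Math.min(attempt, 20);
        return Math.min(maxDelayMillis, initialDelayMillis * factor);
    }

    /** Calls the operation up to maxAttempts times, sleeping between failed attempts. */
    static <T> T call(Callable<T> operation, int maxAttempts,
                      long initialDelayMillis, long maxDelayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        IOException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (IOException e) {
                // Only I/O failures are retried; any other exception propagates immediately.
                last = e;
                if (attempt == maxAttempts - 1) {
                    break;
                }
                long base = delayMillis(attempt, initialDelayMillis, maxDelayMillis);
                // Jitter: sleep roughly between half the base delay and the full base delay.
                long sleep = base / 2 + ThreadLocalRandom.current().nextLong(base / 2 + 1);
                Thread.sleep(sleep);
            }
        }
        throw last;
    }
}
```

A call such as BackoffRetry.call(() -> lista.execute(), 5, 1000, 30000) would wrap the table listing from the question. If every attempt still fails with the same error, that supports the conclusion above that the request itself is the problem.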
