How Dataflow works with a BigQuery dataset


Question

I have not found a way to get the tables of a specified dataset. I want to use Dataflow to migrate tables from a dataset located in the US to a dataset located in the EU. I would like to read all the tables of the US dataset in parallel and write them to the EU dataset.

Beam 2.4 uses com.google.api.services.bigquery v2-rev374-1.22.0. This is also the library version that you should use with Beam 2.4.

The code runs successfully with DirectRunner, but with DataflowRunner it does not run and throws the following error:

jun 29, 2018 1:52:48 PM com.google.api.client.http.HttpRequest execute
ADVERTENCIA: exception thrown while executing request
java.net.SocketException: Unexpected end of file from server
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:851)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:347)
        at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
        at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeUploadInitiation(MediaHttpUploader.java:519)
        at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:384)
        at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

jun 29, 2018 1:52:55 PM com.google.api.client.http.HttpRequest execute
ADVERTENCIA: exception thrown while executing request
java.net.SocketException: Unexpected end of file from server
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:851)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1587)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:347)
        at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
        at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
        at com.google.api.client.googleapis.media.MediaHttpUploader.executeUploadInitiation(MediaHttpUploader.java:519)
        at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:384)
        at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
        at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Code sample

package emp.customerjourney.etls;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.*;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.*;


import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;


import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class Migracion {

    public static interface MyOptions extends DataflowPipelineOptions {
        @Description("BigQuery table to write to, specified as "+ "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
        @Default.String("customerjourney:prueba.weather_stations")
        @Validation.Required
        String getOutput();
        void setOutput(String s);

        @Description("Table to read from, specified as "+ "<project_id>:<dataset_id>.<table_id>")
        @Default.String("customerjourney:118678548.gsod3")
        String getInput();
        void setInput(String value);


    }


    public static Bigquery createAuthorizedClient() throws IOException {
        // Create the credential
        HttpTransport transport = new NetHttpTransport();
        JsonFactory jsonFactory = new JacksonFactory();
        GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);

        if (credential.createScopedRequired()) {
            credential = credential.createScoped(BigqueryScopes.all());
        }

        return new Bigquery.Builder(transport, jsonFactory, credential)
                .setApplicationName("Bigquery Samples")
                .build();
    }


    public static final void main(String args[]) throws Exception {


        String projectId = "customerjourney";
        String datasetName = "dsorigen";
        // Create a new Bigquery client authorized via Application Default Credentials.
        Bigquery bigquery = createAuthorizedClient();
        Bigquery.Tables.List lista=bigquery.tables().list(projectId,datasetName);
        TableList rp= lista.execute();
        List<TableList.Tables> tblista =rp.getTables();
        String  entrada=tblista.get(3).getId();

        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

        options.setTempLocation("gs://pruebasg/teststaging");
        options.setRegion("europe-west1");
        options.setStagingLocation("gs://pruebasg/temp_dataflow_tasks");
        Pipeline p = Pipeline.create(options);

        // Build the table schema for the output table.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
        TableSchema schema = new TableSchema().setFields(fields);

        // p.apply(BigQueryIO.readTableRows().from(options.getInput()))
        p.apply(BigQueryIO.readTableRows().from(entrada)) // table ID taken from the BigQuery v2 API table listing
                .apply(new BigQueryTornadoes.CountTornadoes())
                .apply(BigQueryIO.writeTableRows()
                        .to(options.getOutput())
                        .withSchema(schema)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));



        p.run().waitUntilFinish();

        options.getExecutorService().shutdown();
        try {
            options.getExecutorService().awaitTermination(3, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            System.out.println("Thread was interrupted waiting for execution service to shutdown.");
        }
        System.out.println("termino");

    }

}

<dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
        </dependency>

        <dependency>
            <groupId>com.google.cloud.dataflow</groupId>
            <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
            <version>[2.4.0, 2.99)</version>
        </dependency>

        <!-- slf4j API frontend binding with JUL backend -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.14</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-jdk14</artifactId>
            <version>1.7.14</version>
        </dependency>

        <dependency>
            <groupId>com.google.apis</groupId>
            <artifactId>google-api-services-bigquery</artifactId>
            <version>${bigquery.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>com.google.guava</groupId>
                    <artifactId>guava-jdk5</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>22.0</version>
        </dependency>

        <dependency>
            <groupId>com.google.oauth-client</groupId>
            <artifactId>google-oauth-client</artifactId>
            <version>1.21.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.http-client</groupId>
            <artifactId>google-http-client-jackson2</artifactId>
            <version>1.21.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.oauth-client</groupId>
            <artifactId>google-oauth-client-jetty</artifactId>
            <version>1.21.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.7</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.truth</groupId>
            <artifactId>truth</artifactId>
            <version>0.29</version>
            <scope>test</scope>
        </dependency>

</dependencies>

Does Dataflow need the GOOGLE_APPLICATION_CREDENTIALS JSON file to execute this code?

I have not found a method in BigQueryIO that lists the tables of an existing dataset. Reference post

Can anyone help me?

Recommended answer

This is a connection error. I suggest first verifying that you are setting your credentials properly, as explained in the other answers. If you still get this error after that, do the following.
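As a starting point for the credentials check, here is a minimal sketch that only inspects the value of the GOOGLE_APPLICATION_CREDENTIALS environment variable mentioned in the question. The class `CredentialsCheck`, its methods, and the status strings are hypothetical names made up for this illustration; they are not part of Beam or the Google client libraries. The sketch does not validate the contents of the key file, and a result of NOT_SET is not necessarily an error, because Application Default Credentials can fall back to other sources when the variable is absent.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper for illustration; not part of Beam or the Google client libraries. */
final class CredentialsCheck {

    private CredentialsCheck() {}

    /**
     * Classifies a candidate GOOGLE_APPLICATION_CREDENTIALS value.
     * Returns one of: NOT_SET, FILE_MISSING, FILE_UNREADABLE, OK.
     */
    static String describe(String value) {
        if (value == null || value.trim().isEmpty()) {
            return "NOT_SET";
        }
        Path path = Paths.get(value);
        if (!Files.isRegularFile(path)) {
            return "FILE_MISSING";
        }
        if (!Files.isReadable(path)) {
            return "FILE_UNREADABLE";
        }
        return "OK";
    }

    /** Applies the same classification to the variable in the current process environment. */
    static String describeEnvironment() {
        return describe(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
    }
}
```

Calling `CredentialsCheck.describeEnvironment()` at the top of `main`, before `createAuthorizedClient()`, would show whether the process that submits the pipeline sees the variable at all.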

First, this error indicates that the TCP socket was closed before the server was able to send a response. Some possible reasons are:

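  • The network connection was lost
  • The server decided to close the connection
  • Something between the client and the server terminated the request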

For the first and third cases, consider using tools such as netstat, traceroute, or ping to see whether you can find a problem along the route.
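If it is more convenient to probe from the same JVM that submits the job, a plain TCP connection attempt can complement those command-line tools. The following is only a sketch: `TcpProbe` and `canConnect` are hypothetical names, and a successful connection shows that the port is reachable, not that the HTTP upload in the stack trace will succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper for illustration: checks whether a TCP connection can be opened. */
final class TcpProbe {

    private TcpProbe() {}

    /**
     * Returns true if a TCP connection to host:port is established within timeoutMillis.
     * Any I/O failure (unknown host, refused connection, timeout) yields false.
     */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

For example, `TcpProbe.canConnect("www.googleapis.com", 443, 5000)` would test the route to a Google API endpoint; that host name is an assumption about which endpoint the failing upload targets.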

If the server is the one closing the connection, then either your request is incorrect or the server is throttling you to avoid congestion. In those cases quick retries do not help (although retrying can help when the server happened to have a transient problem at that moment). You can try an exponential back-off and retry strategy; if that does not work either, your request is probably not a correct one.
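The back-off idea can be sketched roughly as follows, using only the JDK. `BackoffRetry` and `callWithBackoff` are hypothetical names, and the doubling factor and jitter range are arbitrary choices for the illustration rather than values recommended by Beam or the Google client libraries. Only `IOException` (which includes the `SocketException` from the log) is retried; any other exception is rethrown immediately.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper for illustration: retries a call with exponential back-off. */
final class BackoffRetry {

    private BackoffRetry() {}

    /**
     * Runs the call up to maxAttempts times. After each IOException it sleeps for the
     * current delay plus random jitter, then doubles the delay before the next attempt.
     * The last IOException is rethrown once the attempts are exhausted.
     */
    static <T> T callWithBackoff(Callable<T> call, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1 || initialDelayMillis < 0) {
            throw new IllegalArgumentException("maxAttempts must be >= 1 and delay >= 0");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up: the request itself may be wrong
                }
                // Jitter of up to half the delay keeps parallel clients from retrying in lockstep.
                long jitter = ThreadLocalRandom.current().nextLong(delay / 2 + 1);
                Thread.sleep(delay + jitter);
                delay *= 2;
            }
        }
    }
}
```

In the question's code, a call such as `BackoffRetry.callWithBackoff(() -> lista.execute(), 5, 1000L)` would retry the table listing up to five times, starting with a delay of about one second.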
