Connecting to Cloud SQL from a Dataflow Job


Problem description

I'm struggling to use JdbcIO with Apache Beam 2.0 (Java) to connect from Dataflow to a Cloud SQL instance within the same project.

I'm getting the following error:

java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)

  • According to the documentation, the Dataflow service account *@dataflow-service-producer-prod.iam.gserviceaccount.com should have access to all resources within the same project if it has "Editor" permissions.

  • When I run the same Dataflow job with DirectRunner, everything works fine.

This is the code I'm using:

    private static String JDBC_URL = "jdbc:mysql://myip:3306/mydb?verifyServerCertificate=false&useSSL=true";

    PCollection<KV<String, Double>> exchangeRates = p.apply(JdbcIO.<KV<String, Double>>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", JDBC_URL)
            .withUsername(JDBC_USER).withPassword(JDBC_PW))
        .withQuery("SELECT CurrencyCode, ExchangeRate FROM mydb.mytable")
        .withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of()))
        .withRowMapper(new JdbcIO.RowMapper<KV<String, Double>>() {
            public KV<String, Double> mapRow(ResultSet resultSet) throws Exception {
                return KV.of(resultSet.getString(1), resultSet.getDouble(2));
            }
        }));
      

Using the following approach outside of Beam, in another Dataflow job, seems to work fine with DataflowRunner, which tells me that the database might not be the problem.

    java.sql.Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PW);
      

Recommended answer

I managed to get it working by following these instructions on how to connect to Cloud SQL from Java:

https://cloud.google.com/sql/docs/mysql/connect-external-app#java

This is what the code looks like (you must replace MYDBNAME, MYSQLINSTANCE, USER and PASSWORD with your own values):

Heads up: MYSQLINSTANCE is the instance connection name, in the format project:region:instancename.
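To make the shape of that URL easier to see, here is a small sketch that assembles it from its parts. The class `CloudSqlJdbcUrl` and its methods are hypothetical helpers written for illustration, not part of the original answer or of any library; only the URL parameters (`cloudSqlInstance`, `socketFactory`, `useUnicode`, `characterEncoding`) are taken from the answer's code. The sketch leaves the credentials out of the URL.

```java
// Hypothetical helper for illustration; not part of Beam or the Cloud SQL socket factory.
final class CloudSqlJdbcUrl {
    // Socket factory class name as it appears in the answer's JDBC URL.
    private static final String SOCKET_FACTORY = "com.google.cloud.sql.mysql.SocketFactory";

    private CloudSqlJdbcUrl() {}

    /** Joins the three parts into the project:region:instancename form. */
    static String instanceConnectionName(String project, String region, String instance) {
        return String.join(":", project, region, instance);
    }

    /** Builds the JDBC URL for the given database and instance connection name. */
    static String build(String database, String instanceConnectionName) {
        if (database.isEmpty() || instanceConnectionName.isEmpty()) {
            throw new IllegalArgumentException(
                "database and instance connection name must be non-empty");
        }
        return "jdbc:mysql://google/" + database
            + "?cloudSqlInstance=" + instanceConnectionName
            + "&socketFactory=" + SOCKET_FACTORY
            + "&useUnicode=true&characterEncoding=UTF-8";
    }
}
```

With a URL built this way, the user name and password could presumably be supplied through `withUsername(...)` and `withPassword(...)` on the data source configuration, as the code in the question does, instead of being embedded in the URL.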

I'm also using a custom class (Customer) to store the values for each row, instead of key-value pairs.

    p.apply(JdbcIO.<Customer>read()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://google/MYDBNAME?cloudSqlInstance=MYSQLINSTANCE&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=USER&password=PASSWORD&useUnicode=true&characterEncoding=UTF-8"
            )
        )
        .withQuery("SELECT CustomerId, Name, Location, Email FROM Customers")
        .withCoder(AvroCoder.of(Customer.class))
        .withRowMapper(
            new JdbcIO.RowMapper<Customer>()
            {
                @Override
                public Customer mapRow(java.sql.ResultSet resultSet) throws Exception
                {
                    final Logger LOG = LoggerFactory.getLogger(CloudSqlToBq.class);
                    // Log the Name column of each row that is read.
                    LOG.info(resultSet.getString(2));
                    // Columns follow the SELECT order: CustomerId, Name, Location, Email.
                    Customer customer = new Customer(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(4));
                    return customer;
                }
            }
        )
    );
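The answer does not show the Customer class itself. A minimal sketch of what it might look like follows; the field names and types are assumptions inferred from the SELECT list and the `getInt`/`getString` calls above, not the author's actual class. The no-argument constructor is included because, as far as I know, Avro's reflection-based decoding (which `AvroCoder.of(Customer.class)` relies on) needs one to instantiate the class.

```java
// Hypothetical Customer class; the original answer does not include its definition.
class Customer {
    private int customerId;
    private String name;
    private String location;
    private String email;

    // No-argument constructor, assumed to be needed for Avro reflection-based decoding.
    Customer() {}

    // Argument order matches the query: CustomerId, Name, Location, Email.
    Customer(int customerId, String name, String location, String email) {
        this.customerId = customerId;
        this.name = name;
        this.location = location;
        this.email = email;
    }

    int getCustomerId() { return customerId; }

    String getName() { return name; }

    String getLocation() { return location; }

    String getEmail() { return email; }
}
```

Keeping the constructor arguments in the same order as the columns in the query makes it easier to spot index mistakes in the row mapper.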
      

I hope this helps.
