Connecting to Cloud SQL from Dataflow Job


Problem description

I'm struggling to use JdbcIO with Apache Beam 2.0 (Java) to connect to a Cloud SQL instance from Dataflow within the same project.

I get the following error:

java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)

  • According to the documentation, the Dataflow service account *@dataflow-service-producer-prod.iam.gserviceaccount.com should have access to all resources within the same project if it has "Editor" permissions.

      When I run the same Dataflow job with DirectRunner, everything works fine.

      This is the code I'm using:

      private static String JDBC_URL = "jdbc:mysql://myip:3306/mydb?verifyServerCertificate=false&useSSL=true";

      PCollection<KV<String, Double>> exchangeRates = p.apply(JdbcIO.<KV<String, Double>>read()
          .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", JDBC_URL)
              .withUsername(JDBC_USER).withPassword(JDBC_PW))
          .withQuery("SELECT CurrencyCode, ExchangeRate FROM mydb.mytable")
          .withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of()))
          .withRowMapper(new JdbcIO.RowMapper<KV<String, Double>>() {
              public KV<String, Double> mapRow(ResultSet resultSet) throws Exception {
                  return KV.of(resultSet.getString(1), resultSet.getDouble(2));
              }
          }));


      Using the following approach outside of Beam, within another Dataflow job, works fine with DataflowRunner, which tells me that the database itself is probably not the problem:

      java.sql.Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PW);
      

      Answer

      Following these instructions on how to connect to Cloud SQL from Java:

      https://cloud.google.com/sql/docs/mysql/connect-external-app#java
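      Concretely, that guide has you pull Google's Cloud SQL socket-factory library onto the classpath. With Maven this is roughly the following (the version number here is illustrative, not from the answer; check the guide for a current one):

      ```xml
      <!-- Cloud SQL MySQL socket factory; version shown is an assumption -->
      <dependency>
        <groupId>com.google.cloud.sql</groupId>
        <artifactId>mysql-socket-factory</artifactId>
        <version>1.0.5</version>
      </dependency>
      ```

      Without this artifact, the `socketFactory=com.google.cloud.sql.mysql.SocketFactory` URL parameter fails with a ClassNotFoundException.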

      I managed to get it working.

      This is what the code looks like (you must replace MYDBNAME, MYSQLINSTANCE, USER and PASSWORD with your values).

      Heads up: the MYSQLINSTANCE format is project:region:instancename.
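      To make the format concrete, here is a small self-contained sketch (the class, method and parameter names are mine, not from the answer, and the region value is a placeholder) that assembles the instance connection name and the socket-factory JDBC URL:

      ```java
      // Hypothetical helper: builds the socket-factory JDBC URL from its parts.
      public class CloudSqlUrl {
          static String jdbcUrl(String db, String project, String region, String instance) {
              // The instance connection name has the form project:region:instancename
              String connectionName = project + ":" + region + ":" + instance;
              return "jdbc:mysql://google/" + db
                      + "?cloudSqlInstance=" + connectionName
                      + "&socketFactory=com.google.cloud.sql.mysql.SocketFactory";
          }

          public static void main(String[] args) {
              System.out.println(jdbcUrl("mydb", "my-project", "us-central1", "my-instance"));
          }
      }
      ```

      The hostname part (`//google/`) is a dummy value; the socket factory ignores it and opens the connection through the Cloud SQL proxy path instead.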

      And I'm using a custom class (Customer) to store the values for each row, instead of key-value pairs.

      p.apply(JdbcIO.<Customer>read()
          .withDataSourceConfiguration(
              JdbcIO.DataSourceConfiguration.create(
                  "com.mysql.jdbc.Driver",
                  "jdbc:mysql://google/MYDBNAME?cloudSqlInstance=MYSQLINSTANCE&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=USER&password=PASSWORD&useUnicode=true&characterEncoding=UTF-8"
              )
          )
          .withQuery("SELECT CustomerId, Name, Location, Email FROM Customers")
          .withCoder(AvroCoder.of(Customer.class))
          .withRowMapper(new JdbcIO.RowMapper<Customer>() {
              @Override
              public Customer mapRow(java.sql.ResultSet resultSet) throws Exception {
                  final Logger LOG = LoggerFactory.getLogger(CloudSqlToBq.class);
                  LOG.info(resultSet.getString(2));
                  // Email is column 4; it was mistakenly read with getString(3) twice before.
                  return new Customer(resultSet.getInt(1), resultSet.getString(2),
                      resultSet.getString(3), resultSet.getString(4));
              }
          })
      );
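      The answer doesn't show the Customer class itself; a minimal sketch that works with AvroCoder (which uses reflection and therefore needs a no-argument constructor) could look like the following. The field names are my assumption, matched to the SELECT columns above:

      ```java
      // Hypothetical Customer POJO compatible with AvroCoder.of(Customer.class).
      public class Customer {
          int customerId;
          String name;
          String location;
          String email;

          public Customer() {}  // no-arg constructor required by Avro reflection

          public Customer(int customerId, String name, String location, String email) {
              this.customerId = customerId;
              this.name = name;
              this.location = location;
              this.email = email;
          }

          public String getName() { return name; }
      }
      ```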
      

      I hope this helps.
