Connecting to Cloud SQL from Dataflow Job
Problem Description
I'm struggling to use JdbcIO with Apache Beam 2.0 (Java) to connect to a Cloud SQL instance from Dataflow within the same project.
I get the following error:
java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
According to the documentation, the Dataflow service account *@dataflow-service-producer-prod.iam.gserviceaccount.com should have access to all resources within the same project if it has the "Editor" role.
When I run the same job with DirectRunner everything works fine.
This is the code I'm using:
private static String JDBC_URL =
    "jdbc:mysql://myip:3306/mydb?verifyServerCertificate=false&useSSL=true";

PCollection<KV<String, Double>> exchangeRates = p.apply(
    JdbcIO.<KV<String, Double>>read()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", JDBC_URL)
                .withUsername(JDBC_USER)
                .withPassword(JDBC_PW))
        .withQuery("SELECT CurrencyCode, ExchangeRate FROM mydb.mytable")
        .withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of()))
        .withRowMapper(new JdbcIO.RowMapper<KV<String, Double>>() {
          public KV<String, Double> mapRow(ResultSet resultSet) throws Exception {
            return KV.of(resultSet.getString(1), resultSet.getDouble(2));
          }
        }));
Connecting with the following approach outside of Beam, in another Dataflow job, works fine with DataflowRunner, which tells me the database itself is probably not the problem:
java.sql.Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PW);
Recommended Answer
Following these instructions on how to connect to Cloud SQL from Java:
https://cloud.google.com/sql/docs/mysql/connect-external-app#java
I managed to get it working.
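The JDBC URL in the code below relies on the Cloud SQL socket factory being on the worker classpath. As a sketch (the version number here is an assumption; check Maven Central for one compatible with your MySQL driver), the Maven dependencies look roughly like this:

```xml
<!-- Cloud SQL socket factory for MySQL: provides
     com.google.cloud.sql.mysql.SocketFactory referenced in the JDBC URL -->
<dependency>
  <groupId>com.google.cloud.sql</groupId>
  <artifactId>mysql-socket-factory</artifactId>
  <version>1.0.5</version> <!-- assumption: pick a current version -->
</dependency>
<!-- MySQL JDBC driver (com.mysql.jdbc.Driver) -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.44</version> <!-- assumption: pick a current version -->
</dependency>
```

The socket factory authenticates to the Cloud SQL instance over the Cloud SQL API using the job's service account credentials, which is why no IP allowlisting is needed on the instance.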
This is what the code looks like (you must replace MYDBNAME, MYSQLINSTANCE, USER and PASSWORD with your own values).
Heads up: the MYSQLINSTANCE format is project:region:instancename.
And I'm using a custom class (Customer) to store the values of each row, instead of key-value pairs.
p.apply(
    JdbcIO.<Customer>read()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://google/MYDBNAME?cloudSqlInstance=MYSQLINSTANCE"
                    + "&socketFactory=com.google.cloud.sql.mysql.SocketFactory"
                    + "&user=USER&password=PASSWORD"
                    + "&useUnicode=true&characterEncoding=UTF-8"))
        .withQuery("SELECT CustomerId, Name, Location, Email FROM Customers")
        .withCoder(AvroCoder.of(Customer.class))
        .withRowMapper(new JdbcIO.RowMapper<Customer>() {
          @Override
          public Customer mapRow(java.sql.ResultSet resultSet) throws Exception {
            final Logger LOG = LoggerFactory.getLogger(CloudSqlToBq.class);
            LOG.info(resultSet.getString(2));
            // Columns 1-4 map to CustomerId, Name, Location, Email.
            return new Customer(
                resultSet.getInt(1),
                resultSet.getString(2),
                resultSet.getString(3),
                resultSet.getString(4));
          }
        }));
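The Customer class itself isn't shown in the answer; a minimal sketch might look like the following (field names are assumptions inferred from the query above). Note that AvroCoder instantiates the class reflectively when decoding, so a no-arg constructor is needed:

```java
// Hypothetical sketch of the Customer POJO used with AvroCoder.of(Customer.class).
public class Customer {
  private int customerId;
  private String name;
  private String location;
  private String email;

  // No-arg constructor required so AvroCoder can instantiate the class.
  public Customer() {}

  public Customer(int customerId, String name, String location, String email) {
    this.customerId = customerId;
    this.name = name;
    this.location = location;
    this.email = email;
  }

  public int getCustomerId() { return customerId; }
  public String getName() { return name; }
  public String getLocation() { return location; }
  public String getEmail() { return email; }
}
```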
I hope this helps.